cQube | LLD - v12Dec2022
Purpose
cQube solution architecture document: cQube | Design Document (Nov 2022)
In this document, the focus is on evolving the Design Specs, which can be used by the development team.
Audience
Solution development team
How to use this document
This document is organized by topics. The various illustrations/models used here are available in: cQube | Design Specs | Models - v24Nov2022
Topics
Data processing pipeline alternatives
Two possible solutions were considered:
Using Postgres
Using Nifi
Alt: Postgres model:
v4.0 - as-is
ingestion: Python API, storage: S3, processing: NiFi, aggregate data storage: Postgres, exhaust: S3 (JSON), viz engine: Node.js, viz: Angular, spec: Python-based generator (fragments of SQL)
v5: ingestion: Node.js API, store (until processed): Postgres DB
POST /spec/* → when a new spec is added (spec=event + dataset + pipe)
/event - create table <input> ⇒ e
Acts as a queue (Q)
Helps with type validation
/dataset - create table <input> ⇒ c
/pipe -
create a new pipe in table <pipeline>
pipe connects event to cube via transformer ⇒ event → transformer → dataset
transformer ⇒ create trigger <transformer> on event
POST /event → when a new event arrives
j ⇒ select e.c1, c.c2 + 1, c.c3 + e.c3 from cqb1 c inner join evt1 e on c.c1 = e.c1
update cqb1 set c1 = j.c1, c2 = j.c2, c3 = j.c3 from j where cqb1.c1 = j.c1
The transformer is a PG function connected via a trigger on the event table
The transformer updates the dataset
Alt: Nifi model:
Events are passed to the API (AJV-based validation)
API writes to event_batch_file
Every 5 min, trigger intermediate_cube_processor
The API will write to a flowfile, and ideally that should feed a NiFi process group
Process group - transformer - event_data, event_specs, cube_schema, cube_data - Py/Js
Receives Event data
Pull all required data (from store)
Perform transformation
Store in an intermediate cube.csv
Upsert the cube once every 5 min
Based on the following criteria:
Traceability (where in the flow is my data)
Horizontal scalability
Flexibility - ability to add additional steps or pipelines or create side-effects
The NiFi-based data processing model was chosen for cQube.
Physical Architecture
Kong
Used as an API gateway
Used for AuthN & AuthZ <<TBD>>
Auth: handles AuthN and AuthZ <<TBD - Alternatives>>
AuthN (identity and role assignment):
Implementation choices
Location
Internal
External
Solution
Keycloak
Using a 3rd party - Auth0
Design Factors/Requirements
State may have their own - duplication
Management Overhead
Users
Adapter
Dashboard Users
Admin
Public?
Internal Service Communication?
Keeping services stateless, assuming access is already authenticated - JWT being the easiest to manage (a minimal verification sketch follows below)
Should support any OIDC based authentication (allow for external SSO integration)
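A minimal sketch of how a stateless service could verify an incoming JWT before handling a request (illustrative only; the middleware name, env variable and claim names are assumptions, not the final cQube implementation):

// Hypothetical Express middleware: verifies a JWT issued by any OIDC-compliant provider.
// JWT_PUBLIC_KEY and the role claim are illustrative assumptions.
import express from "express";
import jwt from "jsonwebtoken";

const app = express();

function requireAuth(req: express.Request, res: express.Response, next: express.NextFunction) {
  const header = req.headers.authorization ?? "";
  const token = header.startsWith("Bearer ") ? header.slice(7) : null;
  if (!token) return res.status(401).json({ error: "missing token" });
  try {
    // In practice the OIDC provider's public key / JWKS endpoint would be used here.
    const claims = jwt.verify(token, process.env.JWT_PUBLIC_KEY as string);
    (req as any).user = claims; // e.g. { sub, role }, used later for AuthZ
    return next();
  } catch {
    return res.status(401).json({ error: "invalid token" });
  }
}

app.get("/spec/dimension", requireAuth, (_req, res) => res.json({ ok: true }));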
AuthZ: access to resources based on role
Implementation Choices
OPA - I am leaning towards this as it fulfills all requirements
…
Requirements
Need it to be internal to cQube
Role based authorization
Works on
UI
SQL - an SQL-based filtering layer which dynamically filters rows and columns based on role and configuration (see the sketch after this list)
APIs
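A minimal sketch of the SQL filtering idea, assuming a hypothetical role-to-filter configuration (the role names, columns and table below are illustrative, not the final design):

// Hypothetical role-based query builder: restricts columns and appends a row predicate
// from configuration before the query reaches the database.
type RoleFilter = { allowedColumns: string[]; rowPredicate?: string };

const roleConfig: Record<string, RoleFilter> = {
  district_admin: {
    allowedColumns: ["school_id", "date", "average_attendance"],
    rowPredicate: "district_id = :user_district_id",
  },
  state_admin: { allowedColumns: ["*"] },
};

function buildFilteredQuery(role: string, table: string): string {
  const filter = roleConfig[role];
  if (!filter) throw new Error(`no access configured for role ${role}`);
  const columns = filter.allowedColumns.join(", ");
  const where = filter.rowPredicate ? ` WHERE ${filter.rowPredicate}` : "";
  return `SELECT ${columns} FROM ${table}${where}`;
}

// buildFilteredQuery("district_admin", "datasets.school_attendance")
// => "SELECT school_id, date, average_attendance FROM datasets.school_attendance WHERE district_id = :user_district_id"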
Infrastructure
Flow Engine: manages various data ingestion pipelines
User MS: To manage users and roles
Spec MS: Exposes APIs to process specifications - events, dimensions, datasets, transformers, schedules and pipelines
Ingest MS: Exposes APIs to ingest event, dimension and dataset data
Spec db: stores various specification details
Config db: stores cQube specific data such as usage, roles, access control etc
Ingest db: stores ingested data created by collecting events, dimensions and datasets before updating dataset (KPI) db
Dataset db: stores KPI data. It is updated from ingested data <<TBD - should we consider Timescale>>
SQL Access: <<TBD - potentially Hasura>>
Deployment Architecture
AWS - Network Architecture
The following steps define the cQube setup and workflow completion processes in AWS. cQube mainly comprises the areas mentioned below:
EC2 Server
IAM user and Role creation for S3 connectivity.
The cQube network setup process is described in the block diagram below:
Microservices Details
Following are the details of the microservices which get installed in the cqube server.
Ingestion-ms: The ingestion-ms is used to upload the data for events, datasets, dimensions, transformers and pipelines. All these APIs are used to ingest data into cQube.
Spec-ms: The spec-ms is used to import the schemas of events, datasets, dimensions, transformers and pipelines. All these specs are defined by the cQube platform prior to ingesting data into cQube. These specifications are derived by considering the KPIs as the indicator.
Generator-ms: The generator-ms is used to create the specs & transformers for the derived datasets; it performs the aggregation logic, updates data into datasets based on the transformations, and updates the status of file processing.
Nifi-ms: Apache NiFi is used as a real-time integrated data logistics and simple event processing platform
Postgres-ms: Postgres microservice contains the schema and tables
Nginx-ms: It is commonly used as a reverse proxy and load balancer to manage incoming traffic and distribute it to the upstream servers
Kong-ms: It is a lightweight API Gateway that secures, manages, and extends APIs and microservices.
Spec pipeline
Defines how specs are processed into cQube.
Specs can be added to cQube via Spec microservice
Spec MS interacts with the data flow engine (NiFi) and the spec db (PostgreSQL)
Spec microservice namespace: /spec/*
Key APIs & processing steps
POST /spec/dimension -
Compile and verify validity
Store in spec.dimension table
Create the actual dimension table
The data type given in the spec will be mapped to the equivalent Postgres data type; for example, if the datatype in the spec is “string”, the column datatype in the database will be “VARCHAR”.
Similarly, all the columns and their respective data types are collected to generate a dynamic query that creates the dimension table (see the sketch below).
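A minimal sketch of the dynamic table creation described above (the spec shape and type map are simplified assumptions; only the idea of mapping spec datatypes to Postgres column types is taken from the text):

// Illustrative only: maps spec datatypes to Postgres types and builds the CREATE TABLE
// statement for a dimension. Field names are assumptions.
const TYPE_MAP: Record<string, string> = {
  string: "VARCHAR",
  number: "NUMERIC",
  integer: "INTEGER",
  date: "DATE",
};

interface DimensionSpec {
  name: string;
  columns: { name: string; type: string }[];
}

function buildDimensionTableDDL(spec: DimensionSpec): string {
  const cols = spec.columns
    .map((c) => `${c.name} ${TYPE_MAP[c.type] ?? "VARCHAR"}`)
    .join(", ");
  return `CREATE TABLE ingestion.dimension_${spec.name} (${cols});`;
}

// buildDimensionTableDDL({ name: "district", columns: [
//   { name: "district_id", type: "string" }, { name: "district_name", type: "string" } ] })
// => "CREATE TABLE ingestion.dimension_district (district_id VARCHAR, district_name VARCHAR);"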
POST /spec/event -
Compile and verify validity
Store in spec.event table
POST /spec/dataset -
Compile and verify validity
Store in spec.dataset table
Create the actual dataset table
The data type given in the spec will be mapped to the equivalent Postgres data type; for example, if the datatype in the spec is “number”, the column datatype in the database will be “NUMERIC”.
Similarly, all the columns and their respective data types are collected to generate a dynamic query that creates the dataset table.
POST /spec/transformer -
This is accessible to only superadmins
Compile and verify validity - aware of required ingestion type, key_file, program name and operation.
Where applicable, internally, a code generator will be used to create a transformer
It is assumed that each transformer is tested and coming from a trusted source
Store in spec.transformer table
POST /spec/pipeline -
Compile and verify validity
Store in spec.pipeline table
Nifi compilation - abstracted by a wrapper layer which hides the NiFi calls (a sketch of the wrapper follows below)
Create a new NiFi processor group
Add processors to the newly created processor group
Connect the transformer to the relevant processor
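A minimal sketch of the wrapper layer around the NiFi REST API (the endpoint paths follow the standard NiFi REST API; the wrapper shape, base URL and revision handling are assumptions):

// Hypothetical NiFi wrapper used while compiling a pipeline spec (Node 18+ global fetch).
const NIFI_URL = process.env.NIFI_URL ?? "http://localhost:8080/nifi-api";

async function createProcessorGroup(parentGroupId: string, name: string) {
  const res = await fetch(`${NIFI_URL}/process-groups/${parentGroupId}/process-groups`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ revision: { version: 0 }, component: { name } }),
  });
  return res.json(); // response contains the new group's id
}

async function addProcessor(groupId: string, type: string, name: string) {
  const res = await fetch(`${NIFI_URL}/process-groups/${groupId}/processors`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ revision: { version: 0 }, component: { type, name } }),
  });
  return res.json();
}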
POST /spec/schedule -
Compile and verify validity
Create/Update schedule for a pipeline
Spec Flow
Yaml Link -> https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml
The specifications flow diagram represents the schemas of the events, datasets, dimensions, transformers and pipelines. All these specs are defined by the cQube platform prior to ingesting data into cQube. These specifications are derived by considering the KPIs as the indicator.
Dimension Specification
The dimension spec will be validated against the predefined specification using the AJV framework (see the validation sketch below).
When the validation is successful, the spec will be checked for duplication.
If a spec already exists with the same data, an error response will be sent.
Otherwise, it is stored in the spec.dimension table.
Add a record into the spec.pipeline table
Once the spec is stored, a dimension table will be created.
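A minimal sketch of the AJV validation step (the dimension spec schema shown here is a simplified placeholder, not the actual predefined cQube schema):

// Illustrative AJV validation of an incoming dimension spec.
import Ajv from "ajv";

const ajv = new Ajv({ allErrors: true });

// Placeholder schema; the real predefined specification lives in spec-ms.
const dimensionSpecSchema = {
  type: "object",
  properties: {
    dimension_name: { type: "string" },
    schema: { type: "object" },
  },
  required: ["dimension_name", "schema"],
  additionalProperties: false,
};

const validateDimensionSpec = ajv.compile(dimensionSpecSchema);

function checkDimensionSpec(payload: unknown) {
  if (!validateDimensionSpec(payload)) {
    // Mirrors the error response described above.
    return { valid: false, errors: validateDimensionSpec.errors };
  }
  return { valid: true };
}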
Event Specification
The event spec will be validated against the predefined specification using the AJV framework.
When the validation is successful, the spec will be checked for duplication.
If a spec already exists with the same data, an error response will be sent.
Otherwise, it is stored in the spec.event table.
Add a record into the spec.pipeline table
Dataset Specification
The dataset spec will be validated against the predefined specification using the AJV framework.
When the validation is successful, the spec will be checked for duplication.
If a spec already exists with the same data, an error response will be sent.
Otherwise, it is stored in the spec.dataset table.
Add a record into the spec.pipeline table
Once the spec is stored, a dataset table will be created
Transformer Specification
The transformer spec will be validated against the predefined specification using the AJV framework.
When the validation is successful, a Python API is called internally, passing ingestion_name, key_file, program and operation, to generate the transformer files and store them for later use by the given program.
If the transformer file is successfully created, a record is added into the spec.transformer table with the file name
Pipeline Specification
Event processing
Event collection pipeline - /ingestion/event
Event collection to aggregation pipeline - /ingestion/pipeline <event name>
Aggregation to dataset upsert pipeline - /ingestion/pipeline <dataset name>
Dimension processing
Dimension collection pipeline - /ingestion/dimension
Dimension collection to dimension upsert pipeline - /ingestion/pipeline <dimension name>
Dataset processing
Dataset collection pipeline - /ingestion/dataset
Dataset collection to Dataset upsert pipeline - /ingestion/pipeline <dataset name>
The pipeline spec will be validated against the predefined specification using the AJV framework.
When the validation is successful, the spec will be checked for duplication.
If a spec already exists with the same data, an error response will be sent.
Otherwise, it is stored in the spec.pipeline table.
Pipeline types are listed below
Dimension to Collection
Dimension collection to db
Event to Collection
Dataset to collection
Event collection to Aggregate
Aggregate to DB
If the pipeline type is Aggregate to DB, a processor group will be created and processors will be added inside the processor group by calling NiFi APIs
The processor group will be mapped to the transformer.
A connection will be established between the processors
Spec DB Schema
Below are the tables included in spec schema
spec.dimension
spec.event
spec.dataset
spec.transformer
spec.pipeline
spec.schedule
Ingestion pipeline
Data is ingested into cQube via Ingest MS
Ingest MS stores the data into Ingest Store
Based on the schedule, the Ingest Flow engine interacts with the Ingest Store and updates the Dataset DB
Ingestion microservice namespace: /ingestion/*
Key APIs & processing steps
POST /ingestion/event
Validate event data
Append to the corresponding event file (csv) - see the sketch below
POST /ingestion/dimension
Validate dimension data
Append to corresponding dimension file (csv)
POST /ingestion/dataset
Validate dataset data
Append to corresponding aggregate file (csv)
POST /ingestion/pipeline
Triggers a pipeline by event, dimension, aggregate or dataset: reads from the ingestion store, transforms, and upserts into the dataset store
POST /ingestion/csv
i. Upload the CSV file with event, dimension or dataset data
ii. Validates the data
iii. Internally calls the /ingestion/event, /ingestion/dimension or /ingestion/dataset API
GET /ingestion/file-status
i. Gives the status of the file which was uploaded using the /ingestion/csv API
PUT /ingestion/file-status
i. This is an internal API, called from the transformer files when processing starts and when it completes
Flow engine is configured to trigger pipelines based on schedule
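A minimal sketch of the POST /ingestion/event step of validating against the event spec and appending to a CSV file (file paths, field names and the naive CSV serialisation are assumptions):

// Illustrative handler body for POST /ingestion/event.
import * as fs from "fs";
import * as path from "path";
import { ValidateFunction } from "ajv";

const CSV_DIR = process.env.CSV_DIR ?? "/tmp/ingestion";

function appendEvent(eventName: string, validate: ValidateFunction, record: Record<string, unknown>) {
  // 1. Validate the event payload against the compiled event spec (AJV).
  if (!validate(record)) {
    return { status: 400, errors: validate.errors };
  }
  // 2. Append to the corresponding event CSV file.
  const file = path.join(CSV_DIR, `${eventName}-event.csv`);
  const line = Object.values(record).join(",") + "\n"; // naive CSV; a real implementation would escape values
  fs.appendFileSync(file, line);
  return { status: 200 };
}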
Ingestion Flow
Dimension Data
The dimension data will be added using POST /ingestion/dimension API.
Data will be validated with the dimension spec using AJV.
When the validation is successful then the dimension data will be stored in the CSV file.
Based on the scheduled time, the processor group will be triggered and the data will be added into the DB.
Event Data
The Event data will be added using POST /ingestion/event API.
Data will be validated with the event spec using AJV.
When the validation is successful then the event data will be stored in the CSV file.
Based on the scheduled time, the processor group will be triggered and the data will be ingested into the DB.
Dataset
The Dataset data will be added using POST /ingestion/dataset API.
Data will be validated with the dataset spec using AJV.
When the validation is successful then the dataset data will be stored in the CSV file.
Based on the scheduled time, the processor group will be triggered and the data will be ingested into the DB.
Upload CSV Flow
Data can be ingested in two ways
Through ingestion APIs like /ingestion/dimension, /ingestion/event and /ingestion/dataset
Using /ingestion/csv API
The CSV file will be imported using the POST /ingestion/csv API
When the validation is successful, a record will be added to the ingestion.file_tracker table with file details like file_name, file_size and uploaded time
An asynchronous call will be made and a response will be sent
In the asynchronous call, if the ingestion_type is event, records will be inserted into the ingestion.file_pipeline_tracker table for all the datasets which will be generated using that event data.
The spec schema will be fetched from the DB for the given ingestion type and the CSV data will be validated against the datatypes
If there is any error, ingestion.file_tracker will be updated with the error status
If there is no error, the data will be read in batches with a batch limit of 1 lakh (100,000) records and passed as input to the /ingestion/event, /ingestion/dimension or /ingestion/dataset API based on the ingestion type (see the sketch below).
Once all the data is processed, the Uploaded status will be updated in the ingestion.file_tracker table.
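A minimal sketch of the batching step (the batch limit of 100,000 comes from the flow above; the service URL, CSV parsing and payload shape are assumptions):

// Illustrative batch reader for an uploaded CSV (Node 18+ global fetch).
import * as fs from "fs";
import * as readline from "readline";

const BATCH_LIMIT = 100_000; // 1 lakh records per batch

async function processCsv(filePath: string, ingestionType: "event" | "dimension" | "dataset") {
  const rl = readline.createInterface({ input: fs.createReadStream(filePath) });
  let header: string[] = [];
  let batch: Record<string, string>[] = [];

  const flush = async () => {
    if (batch.length === 0) return;
    await fetch(`http://localhost:3000/ingestion/${ingestionType}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ data: batch }),
    });
    batch = [];
  };

  for await (const line of rl) {
    const values = line.split(","); // naive split; a real implementation would use a CSV parser
    if (header.length === 0) { header = values; continue; }
    batch.push(Object.fromEntries(header.map((h, i) => [h, values[i] ?? ""])));
    if (batch.length >= BATCH_LIMIT) await flush();
  }
  await flush();
}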
Pipeline Flow
The POST /ingestion/scheduler API will add/update the schedule for the NiFi scheduler.
The NiFi scheduler will call the POST /ingestion/pipeline API
The pipeline API will have the following functionalities
i. Read the data from the dimension collection and write to the Dimension DB (S3)
ii. Read from the Ingest DB and store the data into the Dataset DB
File Status Flow
The file status will be checked using the GET /ingestion/file-status API
The file status is updated using the PUT /ingestion/file-status API
If the file is of ingestion type event, update the file status in the ingestion.file_tracker table and update the status in the ingestion.file_pipeline_tracker table for the dataset for which the event is processed.
If all the datasets have completed processing, change the status in the ingestion.file_tracker table to “Ready_to_archive”.
If the ingestion_type is dimension or dataset, update the file status in the ingestion.file_tracker table to “Ready_to_archive”.
Ingestion schema
In the illustration below, two dimensions have been used as examples.
ingestion.dimension_state
ingestion.dimension_district
ingestion.sac_stds_atd_cmp_by_school -> one example table has been used to illustrate. Dataset tables will have dynamic columns and static columns. Dynamic columns will be added when the dataset spec is added. Static columns are sum, count, min, max, avg.
KPIs Flow
KPI (Key Performance Indicator)
KPIs are the ones which define what has to be derived from the input data or events
Each item in the viz can be called a KPI.
Example: School_attendance
KPI Category : Student attendance Compliance
KPI VisualisationType : Table
KPI Indicator : School-wise average attendance compliance %
KPI Levels : School
If there are other indicators that need to be shown on the dashboard, for example:
Cluster-wise Average attendance compliance %
Block-wise Average attendance compliance %
District-wise Average attendance compliance %
State-wise Average attendance compliance %
Events
A data structure that records an occurrence at a particular time for an entity (eg: school, etc). It is a combination of simple data types (eg: integer, varchar, etc.). An event should always contain a column/set of columns that helps you calculate the Indicator. A table with a timestamp doesn’t necessarily mean that it is an event; it should contribute to either aggregation or filtering of the dataset.
An event should be defined in such a way that it is able to update the dataset in some form
Transformation of events into data that updates datasets - the transformation happens through a transformer: f(eventDetails, eventSchema, datasetSchema, dimensionConfig) = [array of columns] (see the sketch after the example below)
Updating datasets
Example:
school_id | class | date | total_students | attendance_marked |
<Unique id> | <Integer Value> | <dd/mm/yy> | <integer Value> | <integer Value> |
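A minimal TypeScript sketch of the transformer contract described above, using the attendance event from the example (the types, names and aggregation shown are illustrative assumptions):

// Illustrative typing of f(eventDetails, eventSchema, datasetSchema, dimensionConfig) => array of columns.
type Row = Record<string, string | number>;
interface Schema { properties: Record<string, { type: string }> }

type Transformer = (
  eventDetails: Row[],
  eventSchema: Schema,
  datasetSchema: Schema,
  dimensionConfig: Record<string, unknown>
) => Row[];

// Example: roll attendance events up into per-school, per-date dataset rows.
const schoolAttendanceTransformer: Transformer = (events) => {
  const grouped = new Map<string, Row>();
  for (const e of events) {
    const key = `${e.school_id}|${e.date}`;
    const acc = grouped.get(key) ?? { school_id: e.school_id, date: e.date, total_students: 0, attendance_marked: 0 };
    acc.total_students = Number(acc.total_students) + Number(e.total_students);
    acc.attendance_marked = Number(acc.attendance_marked) + Number(e.attendance_marked);
    grouped.set(key, acc);
  }
  return [...grouped.values()];
};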
Dimensions
Dimensions describe events.
Dimensions are static records
In the final aggregation we will be mapping Dimensions with Datasets.
Example:
Dimensions for the above-mentioned KPI are:
School : School_id, School_name, Cluster_Id, Block_id, District_id, State_Id
Cluster : Cluster_id, Cluster_name
Block : Block_id, Block_name
District : District_id, District_name
State : State_Id, State_name
Datasets
High-level data which is computed by aggregating events. It is a data representation of the indicator. Datasets are persistent within cQube Ed. A dataset is created for at least one indicator.
A dataset can be derived from one or more specified events, independently.
It may additionally contain dimensions and derived values from other datasets during the mapping process.
Example : School_attendance
Dataset for the above mentioned KPI is defined below:
date | school_id | Sum(total_students) | sum(attendance_marked) | Average Attendance % |
<yy-mm-dd> | <unique id> | <sum of parameters> | <sum of parameters> | <Sum of parameter/ sum of parameter> |
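For example, assuming attendance_marked records the number of students marked present, the last column would be computed as Average Attendance % = (sum(attendance_marked) / sum(total_students)) × 100.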
Datasets for the above mentioned Indicators :
Cluster-attendance
Block_attendance
District_attendance
State_attendance
Note:
There is a list of transformers. In this dataset we are using Sum
There is a list of dimensions. In the above mentioned dataset School is Dimension.
These are some sample KPIs derived as a set of events, dimensions, and datasets from the PAT & SAT data source, which were used in the earlier cQube version v4.0. Link
Developer & QC WorkFlow
The Developer & QC workflow is described in the above diagram, from how a task is identified & assigned until it is merged to the respective branch in the developer workflow. Once the code is merged, QC will run their test scripts, execute the test cases and produce the test results.
Devops
Installation Process
The following technologies will be used to implement cQube as a one-step installation.
Shell Scripting - A shell script is used to install the basic software required to run Ansible and Docker, to generate a configuration file by prompting the user with questions, and to run the Ansible playbooks.
Docker - For containerising the microservices
Docker Swarm - A swarm consists of multiple Docker hosts, one of which acts as a manager handling multiple nodes. This ensures the system continues to run even when one node fails
Ansible - To easily build the code and to reuse it.
GitHub Actions - For Continuous Integration and Continuous Deployment
Steps: Users clone the code from GitHub, check out the latest branch and run the shell script install.sh after giving it executable permission.
Install.sh File:
It checks whether the user is running the script using sudo and throws an error accordingly.
Installs the basic software such as Ansible, Docker and Python dependencies
Triggers a configuration.sh file which is used to generate a config.yml file by prompting the user with the respective questions.
Each value entered by the user will be validated, and the script loops on the same variable until a correct value is entered.
Once the config file is created, it will be previewed to the user to re-check the entered values.
Once the user confirms the configuration values, the Ansible scripts will be triggered.
Once all Ansible scripts have run, the shell script shows the message “cqube installed successfully”
Ansible Script:
The Ansible script will be triggered and it builds the Docker images from the respective Dockerfiles provided.
Ansible triggers a docker-compose file to start all the Docker containers within the server.
An Ansible script triggers the Nginx setup and configuration on the remote machine (Nginx server).
Docker Containers:
Spec-ms
Ingestion-ms
Generator-ms
Nifi-ms
Postgres-ms
Kong-ms
Nginx-ms
Flow Diagram of One step Installation
Github Actions for CI/CD
GitHub Actions gives developers the ability to automate their workflows across issues, pull requests, and more—plus native CI/CD Functionality.
All GitHub Actions automations are handled via workflows, which are YAML files placed under the .github/workflows directory in a repository that define automated processes.
Every workflow consists of several different core concepts. These include:
● Events: Events are defined triggers that kick off a workflow.
● Jobs: Jobs are a set of steps that execute on the same runner.
● Steps: Steps are individual tasks that run commands in a job.
● Actions: An action is a command that is executed on a runner, and is the core element of GitHub Actions.
● Runners: A runner is a GitHub Actions server. It listens for available jobs, runs each in parallel, and reports back progress, logs and results.
The below diagram shows the flow of the GitHub Actions workflow.
Build and Push Docker Image to AWS EC2 Using GitHub Actions:
The below diagram shows how GitHub Actions is implemented for CI/CD. Whenever a user commits code to the repository, the deployment gets triggered based on the event defined in the workflow file.
GitHub Actions builds the code and performs unit testing for code coverage in its own virtual runners.
If the code is successfully built in the runners without any errors, it will build the Docker image using the Dockerfile which is provided.
GitHub Actions then logs in to the Docker Hub account using the credentials provided as environment variables in its secrets.
Once the image is built, it will push the image to Docker Hub with the tags specified in the workflow.
Once the image is pushed to Docker Hub, GitHub Actions triggers the deployment of the code to the EC2 server. A predefined SSH action is used to log in to the EC2 server, pull the latest code from the GitHub repository and deploy it on the server.
Deployment strategy in Dev, QA, Demo Environments
The below process is followed in each environment using its respective GitHub repository branch. Two workflow files will be created in the GitHub repository.
On the event of a pull request: a workflow action will get triggered which checks the image build and code coverage in the GitHub Actions runners.
On the event of a merge request:
A workflow action will get triggered which builds the Docker image from the Dockerfile >> logs in to Docker Hub >> pushes the latest image to the Docker Hub repository >> connects to the EC2 server >> pulls the latest images from Docker Hub >> deploys them on the server.
Developers raise a pull request
On the event of a pull request, the GitHub Actions workflow triggers Continuous Integration, where it will build and test the code.
Once the build is successful, the code reviewer checks the code and merges it into the repository
In the event of a merge, GitHub Actions triggers Continuous Deployment, where it will deploy the code to the server.
Once the code is merged to dev, the code will be merged to QA, which again follows the same process, followed by the demo environment.
Visualization Design
A common generic function reads the metadata from the configuration file and generates a visualization.
This configuration file consists of various queries which will be applied on cubes on load of the report and also on various levels mentioned in the filters section of the configuration file.
A complete configuration file can be found here : student attendance module configuration file
Generic functions have been developed to construct filters and tooltips, along with a few functions to build queries using the dynamic values from the filters.
A Time Series component was developed which takes the min and max dates of the available data and lets the user choose the range of dates he/she needs to filter within the available dates.
For all the above functionalities to work there will be different queries which need to be configured in the file in a certain way.
Example of the queries needed for the table and big number visualizations of the student attendance compliance report on selection of a district-level filter.
Here, on selection of the district level, there will be 2 collections of queries that need to be executed. One is to get the filter options for the next level of filter.
And the other is a set of queries which will be selected depending on whether the time series filter is already selected or not.
timeSeriesQueries are executed whenever the time series is selected; otherwise the queries in the actions section will be executed.
Here we can provide multiple queries depending on the data needed for the current visualization; this visualization needs 3 queries: 1 for the table and 2 for the big number representation.
Also, for a specific visualization we need to provide the metadata required to convert the data from the queries into the standard format that the specific report type component expects (see the sketch below).
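A hedged sketch of what such a configuration fragment might look like (all keys, query text and table/column names here are illustrative assumptions; the authoritative structure is the linked student attendance module configuration file):

// Hypothetical shape of one report's district-level configuration entry.
const districtLevelConfig = {
  filters: [
    {
      level: "district",
      // Query that fetches the options for the next level of filter (blocks).
      optionsQuery: "SELECT DISTINCT block_id, block_name FROM dimensions.block WHERE district_id = {district_id}",
      actions: {
        // Executed when no time-series range is selected: 1 table + 2 big numbers.
        queries: {
          table: "SELECT school_id, average_attendance FROM datasets.school_attendance WHERE district_id = {district_id}",
          bigNumber1: "SELECT AVG(average_attendance) FROM datasets.school_attendance WHERE district_id = {district_id}",
          bigNumber2: "SELECT COUNT(DISTINCT school_id) FROM datasets.school_attendance WHERE district_id = {district_id}",
        },
      },
      // Selected instead when the time-series filter is active.
      timeSeriesQueries: {
        table: "SELECT school_id, average_attendance FROM datasets.school_attendance WHERE district_id = {district_id} AND date BETWEEN {start_date} AND {end_date}",
      },
    },
  ],
};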
For Table:
Here we need to add metadata for every column in the table, along with some options for sorting the data on load of the report.
For Big Number:
For big numbers, a value suffix can be provided as an option for the indicator.
For Bar Chart:
In the bar chart we can configure the label, title and values for both axes; specifically for the xAxis we can have more than one label-value pair as metrics, as shown below.