cQube | LLD - v12Dec2022
Purpose
cQube solution architecture document: cQube | Design Document (Nov 2022)
This document focuses on evolving the Design Specs, which can be used by the development team.
Audience
Solution development team
How to use this document
This document is organized by topics. The various illustrations/models used in this document are available in: cQube | Design Specs | Models - v24Nov2022
Topics
Data processing pipeline alternatives
2 possible solutions were considered:
Using Postgres
Using Nifi
Alt: Postgres model:
v4.0 - as-is
ingestion: python api, storage: s3, processing: nifi, aggregate data storage: postgres, exhaust: s3 (json), viz engine: nodejs, viz: angular, spec: python based generator (fragments of SQL)
v5: ingestion: nodejs api, store (till processed): postgres db
POST /spec/* → when a new spec is added (spec=event + dataset + pipe)
/event - create table <input> ⇒ e
Acts as a queue
Helps type validation
/dataset - create table <input> ⇒ c
/pipe -
create a new pipe in table <pipeline>
pipe connects event to cube via transformer ⇒ event → transformer → dataset
transformer ⇒ create trigger <transformer> on event
POST /event → when a new event arrives
j ⇒ select e.c1, c.c2+1, c.c3+e.c3 from cqb1 c inner join evt1 e on c.c1 = e.c1
update cqb1 set c1=j.c1, c2=j.c2, c3=j.c3 from j
Transformer is a PG function connected via trigger on event table
Transformer updates dataset
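A minimal sketch of this alternative, assuming node-postgres (pg) and PostgreSQL 11+; the table/column names (evt1, cqb1, c1..c3) are the illustrative ones used above, not a real spec:

import { Client } from "pg";

// Sketch of the Postgres-only alternative: the transformer is a PL/pgSQL
// function attached as a trigger on the event table, and it updates the dataset.
// Table and column names (evt1, cqb1, c1..c3) are illustrative only.
async function createTransformerTrigger(db: Client): Promise<void> {
  await db.query(`
    CREATE OR REPLACE FUNCTION transformer_evt1_to_cqb1() RETURNS trigger AS $$
    BEGIN
      -- join the incoming event row with the matching dataset row and update it
      UPDATE cqb1
         SET c2 = cqb1.c2 + 1,
             c3 = cqb1.c3 + NEW.c3
       WHERE cqb1.c1 = NEW.c1;
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
  `);
  await db.query(`
    CREATE TRIGGER trg_transformer_evt1
    AFTER INSERT ON evt1
    FOR EACH ROW EXECUTE FUNCTION transformer_evt1_to_cqb1();
  `);
}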
Alt: Nifi model:
Events are passed to API (ajv based validation)
API writes to event_batch_file
Every 5 min, trigger intermediate_cube_processor
API will write to a flowfile & ideally that should be a nifi process group
Process group - transformer - event_data, event_specs, cube_schema, cube_data - Py/Js
Receives Event data
Pull all required data (from store)
Perform transformation
Store in an intermediate cube.csv
Upsert the cube once every 5 min
Based on the following factors:
Traceability (where in the flow is my data)
Horizontal scalability
Flexibility - ability to add additional steps or pipelines or create side-effects
the Nifi-based data processing model was chosen for cQube.
Physical Architecture
Kong
Used as an API gateway
Used for AuthN & AuthZ <<TBD>>
Auth: handles AuthN and AuthZ <<TBD - Alternatives>>
AuthN (identity and role assignment):
Implementation choices
Location
Internal
External
Solution
Keycloak
Using a 3rd party - Auth0
Design Factors/Requirements
States may have their own - duplication
Management Overhead
Users
Adapter
Dashboard Users
Admin
Public?
Internal Service Communication?
Keeping services stateless assuming access is authenticated already - JWT being the easiest to manage.
Should support any OIDC based authentication (allow for external SSO integration)
AuthZ: access to resources based on role
Implementation Choices
OPA - I am leaning towards this as it fulfills all requirements
…
Requirements
Need it to be internal to cQube
Role based authorization
Works on
UI
SQL - SQL based filtering layer which dynamically filters rows and columns, based on role and configuration
APIs
Infrastructure
Flow Engine: manages various data ingestion pipelines
User MS: To manage users and roles
Spec MS: Exposes APIs to process specifications - events, dimensions, datasets, transformers, schedules and pipelines
Ingest MS: Exposes APIs to ingest event, dimension and dataset data
Spec db: stores various specification details
Config db: stores cQube specific data such as usage, roles, access control etc
Ingest db: stores ingested data created by collecting events, dimensions and datasets before updating dataset (KPI) db
Dataset db: stores KPI data. It is updated from ingested data <<TBD - should we consider Timescale>>
SQL Access: <<TBD - potentially Hasura>>
Deployment Architecture
AWS - Network Architecture
The following steps define the cQube setup and workflow completion processes in AWS. cQube mainly comprises the areas mentioned below:
EC2 Server
IAM user and Role creation for S3 connectivity.
The cQube network setup process is described in the block diagram below:
Microservices Details
Following are the details of the microservices which get installed in the cQube server.
Ingestion-ms: The ingestion-ms is used to upload the data of the events, datasets, dimensions, transformers and pipelines. All these APIs are used to ingest the data into cQube.
Spec-ms: The spec-ms is used to import the schema of the events, datasets, dimensions, transformers and pipelines. All these specs will be defined by the cQube platform prior to ingesting the data into cQube. These specifications are derived by considering the KPIs as the indicators.
Generator-ms: The generator-ms is used to create the specs & transformers for the derived datasets. It performs the aggregation logic, updates data into the datasets based on the transformation, and updates the status of file processing.
Nifi-ms: Apache NiFi is used as a real-time integrated data logistics and simple event processing platform
Postgres-ms: Postgres microservice contains the schema and tables
Nginx-ms: It is commonly used as a reverse proxy and load balancer to manage incoming traffic and distribute it to upstream servers
Kong-ms: It is a lightweight API Gateway that secures, manages, and extends APIs and microservices.
Spec pipeline
Defines how specs are processed into cQube.
Specs can be added to cQube via Spec microservice
Spec MS interacts with data flow engine (nifi) and spec db (postgresql)
Spec microservice namespace: /spec/*
Key APIs & processing steps
POST /spec/dimension -
Compile and verify validity
Store in spec.dimension table
Create the actual dimension table
The data type given in the spec will be mapped to the equivalent Postgres data type; for example, if the datatype in the spec is “string” then the column datatype in the database will be “VARCHAR”
Similarly, all the columns and their respective data types are collected to generate a dynamic query that creates the dimension table.
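A minimal sketch of this mapping and the dynamic DDL generation, assuming a hypothetical spec shape ({ name, columns: [{ name, type }] }); names and types here are illustrative, not the actual spec-ms code:

// Hypothetical spec shape; the real dimension spec is a JSON document validated by AJV.
interface ColumnSpec { name: string; type: "string" | "number" | "date" | "boolean"; }
interface DimensionSpec { name: string; columns: ColumnSpec[]; }

// Map spec data types to their Postgres equivalents (e.g. "string" -> VARCHAR).
const PG_TYPE: Record<ColumnSpec["type"], string> = {
  string: "VARCHAR",
  number: "NUMERIC",
  date: "DATE",
  boolean: "BOOLEAN",
};

// Build the dynamic CREATE TABLE statement for a dimension from its spec.
function dimensionDDL(spec: DimensionSpec): string {
  const cols = spec.columns.map((c) => `${c.name} ${PG_TYPE[c.type]}`).join(", ");
  return `CREATE TABLE IF NOT EXISTS dimension_${spec.name} (${cols});`;
}

// dimensionDDL({ name: "district", columns: [{ name: "district_id", type: "string" },
//   { name: "district_name", type: "string" }] })
// => CREATE TABLE IF NOT EXISTS dimension_district (district_id VARCHAR, district_name VARCHAR);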
POST /spec/event -
Compile and verify validity
Store in spec.event table
POST /spec/dataset -
Compile and verify validity
Store spec.dataset table
Create the actual dataset table
The data type given in the spec will be mapped to the equivalent Postgres data type; for example, if the datatype in the spec is “number” then the column datatype in the database will be “NUMERIC”
Similarly, all the columns and their respective data types are collected to generate a dynamic query that creates the dataset table.
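The generated dataset DDL follows the same pattern; a hedged illustration is below, where the table name and dynamic columns are hypothetical and the static aggregate columns (sum, count, min, max, avg) follow the ingestion schema described later in this document:

// Illustrative only: a dataset table combines spec-driven (dynamic) columns
// with the fixed aggregate columns sum/count/min/max/avg.
const exampleDatasetDDL = `
  CREATE TABLE IF NOT EXISTS dataset_school_attendance (
    school_id VARCHAR,   -- dynamic column from the dataset spec (hypothetical)
    date      DATE,      -- dynamic column from the dataset spec (hypothetical)
    sum       NUMERIC,   -- static aggregate columns
    count     NUMERIC,
    min       NUMERIC,
    max       NUMERIC,
    avg       NUMERIC
  );`;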
POST /spec/transformer -
This is accessible to only superadmins
Compile and verify validity - aware of required ingestion type, key_file, program name and operation.
Where applicable, internally, a code generator will be used to create a transformer
It is assumed that each transformer is tested and coming from a trusted source
Store in spec.transformer table
POST /spec/pipeline -
Compile and verify validity
Store in spec.pipeline table
Nifi compilation - abstracted by a wrapper layer which hides nifi calls
Create a new nifi processor_group
Add processors to the newly created processor group
Connect the transformer into relevant processor
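A hedged sketch of that wrapper layer, assuming NiFi's 1.x REST API and Node 18+ (global fetch); the NIFI_URL value and the trimmed payloads are assumptions, not the actual flow-engine code:

// Sketch of the NiFi wrapper used by POST /spec/pipeline. The endpoint paths are
// standard NiFi REST API routes; payloads are trimmed to the minimum fields.
const NIFI_URL = "http://localhost:8080/nifi-api"; // assumed deployment URL

async function createProcessorGroup(parentId: string, name: string): Promise<any> {
  const res = await fetch(`${NIFI_URL}/process-groups/${parentId}/process-groups`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ revision: { version: 0 }, component: { name } }),
  });
  return res.json(); // response carries the id of the new processor group
}

async function addProcessor(groupId: string, type: string, name: string): Promise<any> {
  const res = await fetch(`${NIFI_URL}/process-groups/${groupId}/processors`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ revision: { version: 0 }, component: { type, name } }),
  });
  return res.json();
}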
POST /spec/schedule -
Compile and verify validity
Create/Update schedule for a pipeline
Spec Flow
Yaml Link -> spec-ms/spec.yaml at dev · Sunbird-cQube/spec-ms
The specifications flow diagram represents the schema of the events, datasets, dimensions, transformers and pipelines. All these specs will be defined by the cQube platform prior to ingesting the data into cQube. These specifications are derived by considering the KPIs as the indicators.
Dimension Specification
The dimension spec will be validated with the predefined specification using AJV framework.
When the validation is successful then the spec will be checked for duplication.
If the spec already exists with the same data then an error response will be sent.
Else stored in the spec.dimension table.
Add a record into spec.pipeline table
Once the spec is stored, a dimension table will be created.
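A minimal sketch of the AJV validation step for a dimension spec, assuming an illustrative predefined schema (the real schema is maintained in spec-ms):

import Ajv from "ajv";

// Illustrative predefined schema for a dimension spec; the actual schema lives in spec-ms.
const dimensionSpecSchema = {
  type: "object",
  properties: {
    name: { type: "string" },
    columns: {
      type: "array",
      items: {
        type: "object",
        properties: { name: { type: "string" }, type: { type: "string" } },
        required: ["name", "type"],
      },
    },
  },
  required: ["name", "columns"],
  additionalProperties: false,
};

const ajv = new Ajv();
const validateDimensionSpec = ajv.compile(dimensionSpecSchema);

// On POST /spec/dimension: return the AJV errors if validation fails; otherwise
// continue with the duplicate check and the insert into spec.dimension.
export function checkDimensionSpec(body: unknown): { ok: boolean; errors?: string } {
  const ok = validateDimensionSpec(body);
  return ok ? { ok } : { ok, errors: ajv.errorsText(validateDimensionSpec.errors) };
}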
Event Specification
The event spec will be validated with the predefined specification using AJV framework.
When the validation is successful then the spec will be checked for duplication.
If the spec already exists with the same data then an error response will be sent.
Else stored in the event spec table.
Add a record into spec.pipeline table
Dataset Specification
The Dataset spec will be validated with the predefined specification using AJV framework.
When the validation is successful then the spec will be checked for duplication.
If the spec already exists with the same data then an error response will be sent.
Else stored in the dataset spec table.
Add a record into spec.pipeline table
Once the spec is stored, a dataset table will be created
Transformer Specification
The Transformer spec will be validated with the predefined specification using AJV framework.
When the validation is successful then internally a python api is called to generate the transformer files and store them for later use for the given program, by passing ingestion_name, key_file, program and operation.
If the transformer file is successfully created then a record is added into spec.transformer table with the file name
Pipeline Specification
Event processing
Event collection pipeline - /ingestion/event
Event collection to aggregation pipeline - /ingestion/pipeline <event name>
Aggregation to dataset upsert pipeline - /ingestion/pipeline <dataset name>
Dimension processing
Dimension collection pipeline - /ingestion/dimension
Dimension collection to dimension upsert pipeline - /ingestion/pipeline <dimension name>
Dataset processing
Dataset collection pipeline - /ingestion/dataset
Dataset collection to Dataset upsert pipeline - /ingestion/pipeline <dataset name>
The Pipeline spec will be validated with the predefined specification using AJV framework.
When the validation is successful then the spec will be checked for duplication.
If the spec already exists with the same data then an error response will be sent.
Else stored in the pipeline spec table.
Pipeline types are listed below
Dimension to Collection
Dimension collection to db
Event to Collection
Dataset to collection
Event collection to Aggregate
Aggregate to DB
If the Pipeline type is Aggregate to DB then a processor group will be created and processors will be added inside the processor group by calling Nifi APIs
Processor group will be mapped to the transformer.
And connection will be established between the processors
Spec DB Schema
Below are the tables included in spec schema
spec.dimension
spec.event
spec.dataset
spec.transformer
spec.pipeline
spec.schedule
Ingestion pipeline
Data is ingested into cQube via Ingest MS
Ingest MS stores the data into Ingest Store
Based on the schedule, the Ingest Flow engine interacts with the Ingest Store and updates the Dataset DB
Ingestion microservice namespace: /ingestion/*
Key APIs & processing steps
POST /ingestion/event
Validate event data
Append to corresponding event file (csv)
POST /ingestion/dimension
Validate dimension data
Append to corresponding dimension file (csv)
POST /ingestion/dataset
Validate dataset data
Append to corresponding aggregate file (csv)
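All three collection APIs above share the same validate-then-append pattern; a minimal sketch is below, assuming an AJV-compiled validator per spec and a flat CSV layout (the file path and the naive CSV row building are illustrative):

import { appendFile } from "fs/promises";
import { ValidateFunction } from "ajv";

// Shared pattern behind /ingestion/event, /ingestion/dimension and /ingestion/dataset:
// validate the payload against the compiled spec, then append one row to the CSV
// collection file for that spec. File layout and naming are illustrative.
async function collect(
  kind: "event" | "dimension" | "dataset",
  specName: string,
  validate: ValidateFunction,
  payload: Record<string, unknown>
): Promise<void> {
  if (!validate(payload)) {
    throw new Error(`Invalid ${kind} data: ${JSON.stringify(validate.errors)}`);
  }
  // Naive CSV row; real code must handle column order, quoting and escaping.
  const row = Object.values(payload).join(",") + "\n";
  await appendFile(`./collection/${kind}_${specName}.csv`, row);
}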
POST /ingestion/pipeline
Trigger a pipeline by event, dimension, aggregate or dataset: read from the ingestion store, transform, and upsert into the dataset store
POST /ingestion/csv
i. Upload the CSV file with event, dimension and dataset data
ii. Validates the data
iii. Internally makes /ingestion/event, /ingestion/dimension and /ingestion/dataset API calls
GET /ingestion/file-status
i. Gives the Status of the file which was uploaded using /ingestion/csv API
PUT /ingestion/file-status
ii. This is an internal API called from transformer files when the processing starts and when the processing completes
Flow engine is configured to trigger pipelines based on schedule
Ingestion Flow
Dimension Data
The dimension data will be added using POST /ingestion/dimension API.
Data will be validated with the dimension spec using AJV.
When the validation is successful then the dimension data will be stored in the CSV file.
Based on the scheduled time, the processor group will be triggered and data will be added into the DB.
Event Data
The Event data will be added using POST /ingestion/event API.
Data will be validated with the event spec using AJV.
When the validation is successful then the event data will be stored in the CSV file.
Based on the scheduled time, the processor group will be triggered and data will be ingested into the DB.
Dataset
The Dataset data will be added using POST /ingestion/dataset API.
Data will be validated with the dataset spec using AJV.
When the validation is successful then the dataset data will be stored in the CSV file.
Based on the scheduled time, the processor group will be triggered and data will be ingested into the DB.
Upload CSV Flow
Data can be ingested in two ways
Through Ingestion APIs like /ingestion/dimension, /ingestion/event and /ingestion/dataset
Using /ingestion/csv API
CSV file will be imported using POST /ingestion/csv api
When the validation is successful then a record will be added to ingestion.file_tracker table with file details like file_name, file_size and uploaded time
Asynchronous call will be made and response will be sent
In the asynchronous call, if the ingestion_type is event then records will be inserted into ingestion.file_pipeline_tracker table for all the datasets which will be generated using that event data.
The spec schema will be fetched from the DB for the given ingestion type and the CSV data will be validated against the datatypes
If there is any error then ingestion.file_tracker will be updated with the error status
If there is no error, then the data will be read in batches with a batch limit of 1 lakh (100,000) records and passed as input to the /ingestion/event, /ingestion/dimension or /ingestion/dataset API based on the ingestion type.
Once all the data is processed then the Uploaded status will be updated in ingestion.file_tracker table.
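A hedged sketch of that asynchronous loop, assuming the csv-parse package for streaming and a batch limit of 1 lakh (100,000) records; postBatch and updateFileTracker are illustrative stand-ins for the real API call and the ingestion.file_tracker update:

import { createReadStream } from "fs";
import { parse } from "csv-parse";

// Illustrative stubs standing in for the real HTTP call and DB status update.
declare function postBatch(endpoint: string, spec: string, rows: object[]): Promise<void>;
declare function updateFileTracker(file: string, status: string): Promise<void>;

const BATCH_LIMIT = 100_000; // 1 lakh records per batch, as described above

// Reads the uploaded CSV in batches and forwards each batch to the matching
// collection API (/ingestion/event | /ingestion/dimension | /ingestion/dataset).
async function processCsv(path: string, ingestionType: string, specName: string): Promise<void> {
  let batch: Record<string, string>[] = [];
  const parser = createReadStream(path).pipe(parse({ columns: true }));
  for await (const record of parser) {
    batch.push(record);
    if (batch.length === BATCH_LIMIT) {
      await postBatch(`/ingestion/${ingestionType}`, specName, batch);
      batch = [];
    }
  }
  if (batch.length > 0) await postBatch(`/ingestion/${ingestionType}`, specName, batch);
  await updateFileTracker(path, "Uploaded");
}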
Pipeline Flow
POST /ingestion/scheduler API will add/update the schedule for Nifi Scheduler.
Nifi Scheduler will call the POST /ingestion/pipeline API
The Pipeline API will have the following functionalities
Read the data from the dimension collection and write to the Dimension DB (S3)
Get the data from the Ingest DB and store it into the Dataset DB
File Status Flow
File status will be checked using GET /ingestion/file-status API
The file status is updated using the PUT /ingestion/file-status API
If the file is of ingestion type event then update the file status in ingestion.file_tracker table and update the status in ingestion.file_pipeline_tracker table for the dataset for which the event is processed.
If all the datasets have completed processing then the status in the ingestion.file_tracker table is changed to “Ready_to_archive”.
If the ingestion_type is dimension or dataset then the file status in the ingestion.file_tracker table is updated to “Ready_to_archive”.
Ingestion schema
In the illustration below, two dimensions have been used as examples.
ingestion.dimension_state
ingestion.dimension_district
ingestion.sac_stds_atd_cmp_by_school -> one example has been used to illustrate. Dataset tables will have dynamic columns and static columns. Dynamic columns will be added when the dataset_spec is added. Static columns are sum, count, min, max, avg.
KPIs Flow
KPI (Key Performance Indicator)
KPIs are the ones which define what has to be derived from the input data or events
Each item in the Viz can be called a KPI.
Example: School_attendance
KPI Category : Student attendance Compliance
KPI VisualisationType : Table
KPI Indicator : School-wise average attendance compliance %
KPI Levels : School
If there are other Indicators that need to be shown on the dashboard, for example:
Cluster-wise Average attendance compliance %
Block-wise Average attendance compliance %
District-wise Average attendance compliance %
State-wise Average attendance compliance %
Events
A data structure that records an occurrence at a particular time for an entity (eg: school, etc). It is a combination of simple data types (eg: integer, varchar, etc.). An event should always contain a column/set of columns that helps you calculate the Indicator. A table with a timestamp doesn’t necessarily mean that it is an event; it should contribute to either aggregation or filtering of the dataset.
An event should be defined in such a way that it is able to update the dataset in some form
Transformation of Events to data that updates datasets - Transformation happens through a transformer: f(eventDetails, eventSchema, datasetSchema, dimensionConfig) = [array of columns]
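A hedged TypeScript rendering of that transformer signature; the grouping column and metric below are hypothetical placeholders for whatever the event and dataset specs actually define:

// f(eventDetails, eventSchema, datasetSchema, dimensionConfig) = [array of columns]
type Row = Record<string, string | number>;

function transform(
  eventDetails: Row[],     // incoming event records
  eventSchema: object,     // spec of the event
  datasetSchema: object,   // spec of the target dataset (KPI)
  dimensionConfig: object  // dimension join/filter configuration
): Row[] {
  // Aggregate events into dataset rows; "school_id" and "attendance" are
  // hypothetical columns used only to illustrate the shape of the output.
  const byKey = new Map<string, number>();
  for (const e of eventDetails) {
    const key = String(e["school_id"]);
    byKey.set(key, (byKey.get(key) ?? 0) + Number(e["attendance"] ?? 0));
  }
  return [...byKey.entries()].map(([school_id, sum]) => ({ school_id, sum }));
}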
Updating datasets
Example: