cQube | LLD - v12Dec2022

Purpose

cQube solution architecture document: cQube | Design Document (Nov 2022)

This document focuses on the evolving design specs, which the development team can use.

Audience

Solution development team

How to use this document

This document is organized by topic. The illustrations and models it refers to are available in: cQube | Design Specs | Models - v24Nov2022

Topics

Data processing pipeline alternatives

Two possible solutions were considered:

  1. Using Postgres 

  2. Using Nifi

 

Alt: Postgres model:

  • v4.0 - as-is

    • ingestion: python api, storage: s3, processing: nifi, aggregate data storage: postgres, exhaust: s3 (json), viz engine: nodejs, viz: angular, spec: python based generator (fragments of SQL)

  • v5: ingestion: nodejs api, store (till processed): postgres db

  • POST /spec/* → when a new spec is added (spec=event + dataset + pipe)

    • /event - create table <input> ⇒ e

      • Acts as a Q

      • Helps type validation

    • /dataset - create table <input> ⇒ c

    • /pipe - 

      • create a new pipe in table <pipeline>

      • pipe connects event to cube via transformer ⇒ event → transformer → dataset

      • transformer ⇒ create trigger <transformer> on event

  • POST /event → when a new event arrives

    • j ⇒ select e.c1, c.c2+1, c.c3+e.c3 from cqb1, evt1 inner join cqb1.c1=evt.c1 and cqb1

    • update cqb1 set c1=j.c1, c2=j.c2, c3=j.c3 from j

    • Transformer is a PG function connected via trigger on event table

    • Transformer updates dataset
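
The trigger-based transformer described above can be sketched as follows. This is a minimal illustration and not the cQube implementation: it uses Python's built-in sqlite3 module in place of Postgres so that it runs without a server, and the column meanings (c2 as an event counter, c3 as a running sum) are assumptions read off the pseudo-SQL above. The table names evt1 and cqb1 are taken from that pseudo-SQL; the key value school_1 is hypothetical.

```python
import sqlite3

# In-memory SQLite database standing in for Postgres (assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Event table: acts as the queue and enforces column types.
    CREATE TABLE evt1 (c1 TEXT NOT NULL, c3 INTEGER NOT NULL);

    -- Dataset (cube) table that the transformer keeps up to date.
    CREATE TABLE cqb1 (c1 TEXT PRIMARY KEY, c2 INTEGER NOT NULL, c3 INTEGER NOT NULL);

    -- Transformer: fires for every new event row and updates the matching cube row.
    CREATE TRIGGER transformer AFTER INSERT ON evt1
    BEGIN
        UPDATE cqb1 SET c2 = c2 + 1, c3 = c3 + NEW.c3 WHERE c1 = NEW.c1;
    END;
    """
)

# Seed one cube row, then post two events for the same key.
conn.execute("INSERT INTO cqb1 (c1, c2, c3) VALUES ('school_1', 0, 0)")
conn.executemany("INSERT INTO evt1 (c1, c3) VALUES (?, ?)", [("school_1", 5), ("school_1", 7)])

row = conn.execute("SELECT c1, c2, c3 FROM cqb1").fetchone()
print(row)  # ('school_1', 2, 12)
```

In Postgres the same idea would be expressed as a function attached to the event table with CREATE TRIGGER; the sketch only shows the event → transformer → dataset wiring.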

Alt: Nifi model:

  1. Events are passed to API (ajv based validation)

  2. API writes to event_batch_file

  3. Every 5 min, trigger intermediate_cube_processor

  4. API will write to a flowfile; ideally this should be handled by a NiFi process group

  5. Process group - transformer - event_data, event_specs, cube_schema, cube_data - Py/Js

    1. Receives Event data

    2. Pull all required data (from store)

    3. Perform transformation

    4. Store in an intermediate cube.csv

    5. Upsert the cube once every 5 min
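
The transformer steps above (receive events, transform, store in an intermediate cube CSV, upsert the cube) can be sketched roughly as below. The event fields (school_id, present), the cube columns (count, sum) and all function names are hypothetical, and the cube is an in-memory dict rather than a database table; the 5-minute scheduling is left out.

```python
import csv
import io

def transform(events):
    """Aggregate a batch of events into intermediate cube rows keyed by school_id."""
    rows = {}
    for event in events:
        key = event["school_id"]
        row = rows.setdefault(key, {"school_id": key, "count": 0, "sum": 0})
        row["count"] += 1
        row["sum"] += event["present"]
    return list(rows.values())

def write_intermediate_csv(rows):
    """Serialise the intermediate cube rows as CSV text (stands in for cube.csv)."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=["school_id", "count", "sum"])
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()

def upsert_cube(cube, intermediate_csv):
    """Merge the intermediate CSV into the cube: insert new keys, add to existing ones."""
    for row in csv.DictReader(io.StringIO(intermediate_csv)):
        target = cube.setdefault(row["school_id"], {"count": 0, "sum": 0})
        target["count"] += int(row["count"])
        target["sum"] += int(row["sum"])
    return cube

# One batch of events, as would be collected in event_batch_file.
batch = [
    {"school_id": "s1", "present": 20},
    {"school_id": "s2", "present": 5},
    {"school_id": "s1", "present": 30},
]
cube = {"s1": {"count": 1, "sum": 10}}  # existing cube contents
cube = upsert_cube(cube, write_intermediate_csv(transform(batch)))
print(cube)  # {'s1': {'count': 3, 'sum': 60}, 's2': {'count': 1, 'sum': 5}}
```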

 

The decision was based on:

  • Traceability (where in the flow is my data)

  • Horizontal scalability

  • Flexibility - ability to add additional steps or pipelines or create side-effects

 

The NiFi-based data processing model is chosen for cQube.

 

Physical Architecture

   

  1. Kong

    1. Used as an API gateway

    2. Used for AuthN & AuthZ <<TBD>>

  2. Auth: handles AuthN and AuthZ <<TBD - Alternatives>>

    1. AuthN (identity and role assignment): 

      1. Implementation choices

        1. Location

          1. Internal

          2. External

        2. Solution

          1. Keycloak

          2. Using a 3rd party - Auth0

      2. Design Factors/Requirements

        1. States may have their own identity solution - risk of duplication

        2. Management Overhead

        3. Users

          1. Adapter

          2. Dashboard Users

            1. Admin

            2. Public?

          3. Internal Service Communication?

        4. Keeping services stateless assuming access is authenticated already - JWT being the easiest to manage.

        5. Should support any OIDC based authentication (allow for external SSO integration)

    2. AuthZ: access to resources based on role

      1. Implementation Choices

        1. OPA - I am leaning towards this as it fulfills all requirements

      2. Requirements

        1. Need it to be internal to cQube

        2. Role based authorization

        3. Works on

          1. UI

          2. SQL - SQL based filtering layer which dynamically filters rows and columns, based on role and configuration

          3. APIs

          4. Infrastructure

  3. Flow Engine: manages various data ingestion pipelines

  4. User MS: To manage users and roles

  5. Spec MS: Exposes APIs to process specifications - events, dimensions, datasets, transformers, schedules and pipelines

  6. Ingest MS: Exposes APIs to ingest event, dimension and dataset data

  7. Spec db: stores various specification details

  8. Config db: stores cQube specific data such as usage, roles, access control etc

  9. Ingest db: stores ingested data created by collecting events, dimensions and datasets before updating dataset (KPI) db

  10. Dataset db: stores KPI data. It is updated from ingested data <<TBD - should we consider Timescale>>

  11. SQL Access: <<TBD - potentially Hasura>>

Deployment Architecture

AWS - Network Architecture

The following steps define the cQube setup and workflow completion processes in AWS. cQube mainly comprises the areas mentioned below:

  1. EC2 Server

  2. IAM user and Role creation for S3 connectivity.

The cQube network setup process is described in the block diagram below:

 

Microservices Details

The following microservices are installed on the cQube server.

  • Ingestion-ms: Used to upload the data for events, datasets, dimensions, transformers and pipelines. All of these APIs are used to ingest data into cQube.

  • Spec-ms: Used to import the schema of events, datasets, dimensions, transformers and pipelines. All of these specs are defined by the cQube platform before data is ingested into cQube. The specifications are derived by considering the KPIs as the indicator.

  • Generator-ms: Used to create the specs and transformers for the derived datasets. It performs the aggregation logic, updates datasets based on the transformation, and updates the file processing status.

  • Nifi-ms: Apache NiFi is used as a real-time integrated data logistics and simple event processing platform

  • Postgres-ms: Postgres microservice contains the schema and tables

  • Nginx-ms: It is commonly used as a reverse proxy and load balancer to manage incoming traffic and distribute it to slower upstream servers

  • Kong-ms: It is a lightweight API Gateway that secures, manages, and extends APIs and microservices.

 

Spec pipeline

Defines how specs are processed into cQube.

 

  1. Specs can be added to cQube via Spec microservice

  2. Spec MS interacts with the data flow engine (NiFi) and the spec db (PostgreSQL)

  3. Spec microservice namespace: /spec/*

  4. Key APIs & processing steps

    1. POST /spec/dimension - 

      1. Compile and verify validity

      2. Store in spec.dimension table

      3. Create the actual dimension table

        • The data type given in the spec is mapped to the equivalent Postgres data type; for example, if the data type in the spec is “string”, the column data type in the database is “VARCHAR”.

        • In the same way, all the columns and their data types are collected, and a dynamic query is generated to create the dimension table.

    2. POST /spec/event -

      1. Compile and verify validity

      2. Store in spec.event table

    3. POST /spec/dataset -

      1. Compile and verify validity

      2. Store in spec.dataset table

      3. Create the actual dataset table

        • The data type given in the spec is mapped to the equivalent Postgres data type; for example, if the data type in the spec is “number”, the column data type in the database is “NUMERIC”.

        • In the same way, all the columns and their data types are collected, and a dynamic query is generated to create the dataset table.

    4. POST /spec/transformer -

      1. This is accessible only to superadmins

      2. Compile and verify validity - aware of required ingestion type, key_file, program name and operation.

      3. Where applicable, internally, a code generator will be used to create a transformer

      4. It is assumed that each transformer is tested and comes from a trusted source

      5. Store in spec.transformer table

    5. POST /spec/pipeline -

      1. Compile and verify validity

      2. Store in spec.pipeline table

      3. Nifi compilation - abstracted by a wrapper layer which hides nifi calls

        1. Create a new nifi processor_group

        2. Add processors to the newly created processor group

        3. Connect the transformer to the relevant processor

    6. POST /spec/schedule -

      1. Compile and verify validity

      2. Create/Update schedule for a pipeline
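
The data type mapping and dynamic query generation described for POST /spec/dimension and POST /spec/dataset above might look like the sketch below. Only the two mappings named in this document (string → VARCHAR, number → NUMERIC) are included; the function name, the shape of the columns argument and the identifier check are assumptions made for illustration.

```python
import re

# Spec data type -> Postgres column type. Only the mappings stated in this
# document are listed; any further types would have to be added explicitly.
TYPE_MAP = {"string": "VARCHAR", "number": "NUMERIC"}

# Conservative identifier check so that spec values cannot inject SQL.
_IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")

def build_create_table(schema, table, columns):
    """Build a CREATE TABLE statement from a {column_name: spec_type} mapping."""
    for name in (schema, table, *columns):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"invalid identifier: {name!r}")
    column_defs = []
    for name, spec_type in columns.items():
        if spec_type not in TYPE_MAP:
            raise ValueError(f"unsupported spec type: {spec_type!r}")
        column_defs.append(f"{name} {TYPE_MAP[spec_type]}")
    return f"CREATE TABLE IF NOT EXISTS {schema}.{table} ({', '.join(column_defs)})"

ddl = build_create_table(
    "ingestion",
    "dimension_district",
    {"district_id": "string", "district_name": "string", "latitude": "number"},
)
print(ddl)
```

Because table and column names cannot be passed as bound parameters, the sketch validates them against a strict pattern before they are placed into the statement.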

                     

  

Spec Flow

YAML link: spec-ms/spec.yaml at dev · Sunbird-cQube/spec-ms

The specifications flow diagram represents the schema of the events, datasets, dimensions, transformers and pipelines. All of these specs are defined by the cQube platform before data is ingested into cQube. The specifications are derived by considering the KPIs as the indicator.

Dimension Specification

  1. The dimension spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful, the spec is checked for duplicates.

  3. If a spec with the same data already exists, an error response is sent.

  4. Otherwise, the spec is stored in the spec.dimension table.

  5. Add a record into spec.pipeline table

  6. Once the spec is stored, a dimension table will be created.
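
The dimension spec steps above can be sketched as a single handler. This is an illustrative outline only: a hand-written check stands in for AJV schema validation, in-memory structures stand in for the spec.dimension and spec.pipeline tables, and the spec shape (name, columns), the status codes and the ingestion.<name> table naming are assumptions.

```python
class SpecStore:
    """In-memory stand-in for the spec schema tables."""

    def __init__(self):
        self.dimension = {}  # stands in for spec.dimension
        self.pipeline = []   # stands in for spec.pipeline
        self.tables = set()  # names of dimension tables that were "created"

def validate_dimension_spec(spec):
    """Stand-in for AJV validation: return a list of error messages."""
    errors = []
    name = spec.get("name")
    if not isinstance(name, str) or not name:
        errors.append("name must be a non-empty string")
    columns = spec.get("columns")
    if not isinstance(columns, dict) or not columns:
        errors.append("columns must be a non-empty object")
    return errors

def add_dimension_spec(store, spec):
    # Step 1: validate against the predefined specification.
    errors = validate_dimension_spec(spec)
    if errors:
        return {"status": 400, "errors": errors}
    # Steps 2-3: reject a spec that already exists with the same data.
    # (A spec with the same name but different data is overwritten here;
    # this document does not say how that case is handled.)
    if store.dimension.get(spec["name"]) == spec:
        return {"status": 400, "errors": ["spec already exists"]}
    # Step 4: store the spec.
    store.dimension[spec["name"]] = spec
    # Step 5: add a pipeline record.
    store.pipeline.append({"pipeline_name": spec["name"], "type": "dimension"})
    # Step 6: create the actual dimension table (recorded by name only).
    store.tables.add(f"ingestion.{spec['name']}")
    return {"status": 200, "message": "spec added"}

store = SpecStore()
spec = {"name": "dimension_state", "columns": {"state_id": "string"}}
first = add_dimension_spec(store, spec)         # accepted
second = add_dimension_spec(store, dict(spec))  # rejected as a duplicate
print(first["status"], second["status"])  # 200 400
```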

Event Specification

  1. The event spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful, the spec is checked for duplicates.

  3. If a spec with the same data already exists, an error response is sent.

  4. Otherwise, the spec is stored in the event spec table.

  5. Add a record into spec.pipeline table

Dataset Specification

  1. The Dataset spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful, the spec is checked for duplicates.

  3. If a spec with the same data already exists, an error response is sent.

  4. Otherwise, the spec is stored in the dataset spec table.

  5. Add a record into spec.pipeline table

  6. Once the spec is stored, dataset table will be created

Transformer Specification

  1. The Transformer spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful, a Python API is called internally to generate the transformer files for the given program, passing ingestion_name, key_file, program and operation. The files are stored for later use.

  3. If the transformer file is created successfully, a record with the file name is added to the spec.transformer table.

Pipeline Specification

  1. Event processing

    1. Event collection pipeline - /ingestion/event

    2. Event collection to aggregation pipeline - /ingestion/pipeline <event name> 

    3. Aggregation to dataset upsert pipeline - /ingestion/pipeline <dataset name> 

  2. Dimension processing

    1. Dimension collection pipeline - /ingestion/dimension

    2. Dimension collection to dimension upsert pipeline - /ingestion/pipeline <dimension name> 

  3. Dataset processing 

    1. Dataset collection pipeline - /ingestion/dataset

    2. Dataset collection to Dataset upsert pipeline - /ingestion/pipeline <dataset name> 

 

  1. The Pipeline spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful, the spec is checked for duplicates.

  3. If a spec with the same data already exists, an error response is sent.

  4. Otherwise, the spec is stored in the pipeline spec table.

  5. Pipeline types are listed below

    • Dimension to collection

    • Dimension collection to DB

    • Event to collection

    • Dataset to collection

    • Event collection to aggregate

    • Aggregate to DB

 

If the pipeline type is Aggregate to DB, a processor group is created and processors are added to it by calling the NiFi APIs.

  1. The processor group is mapped to the transformer.

  2. Connections are established between the processors.

Spec DB Schema

Below are the tables included in spec schema

  1. spec.dimension

  2. spec.event

  3. spec.dataset

  4. spec.transformer

  5. spec.pipeline

  6. spec.schedule

Ingestion pipeline

  1. Data is ingested into cQube via Ingest MS

  2. Ingest MS stores the data in the Ingest Store

  3. Based on the schedule, the Ingest Flow engine interacts with the Ingest Store and updates the Dataset DB

  4. Ingestion microservice namespace: /ingestion/*

  5. Key APIs & processing steps

    1. POST /ingestion/event

      1. Validate event data

      2. Append to corresponding event file (csv)

    2. POST /ingestion/dimension

      1. Validate dimension data

      2. Append to corresponding dimension file (csv)

    3. POST /ingestion/dataset

      1. Validate dataset data

      2. Append to corresponding aggregate file (csv)

    4. POST /ingestion/pipeline

      1. Trigger a pipeline by event, dimension, aggregate or dataset: read from the ingestion store, transform, and upsert into the dataset store

    5. POST /ingestion/csv

      1. Upload the CSV file with event, dimension or dataset data

      2. Validate the data

      3. Internally call the /ingestion/event, /ingestion/dimension or /ingestion/dataset API

    6. GET /ingestion/file-status

      1. Gives the status of the file which was uploaded using the /ingestion/csv API

    7. PUT /ingestion/file-status

      1. This is an internal API called from transformer files when the processing starts and when the processing completes

  6. Flow engine is configured to trigger pipelines based on schedule

Ingestion Flow

Dimension Data
  1. The dimension data will be added using POST /ingestion/dimension API.

  2. Data will be validated with the dimension spec using AJV.

  3. When the validation is successful then the dimension data will be stored in the CSV file.

  4. At the scheduled time, the processor group is triggered and the data is added to the DB.

Event Data 
  1. The Event data will be added using POST /ingestion/event API.

  2. Data will be validated with the event spec using AJV.

  3. When the validation is successful then the event data will be stored in the CSV file.

  4. At the scheduled time, the processor group is triggered and the data is ingested into the DB.

Dataset
  1. The Dataset data will be added using POST /ingestion/dataset API.

  2. Data will be validated with the dataset spec using AJV.

  3. When the validation is successful then the dataset data will be stored in the CSV file.

  4. At the scheduled time, the processor group is triggered and the data is ingested into the DB.
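
The dimension, event and dataset flows above share the same first three steps: validate the incoming record against its spec, then append it to a CSV file. A minimal sketch of that shared part follows. The spec shape ({column: type}), the error messages and the function names are hypothetical, a hand-written type check stands in for AJV, and an in-memory buffer stands in for the CSV file on disk.

```python
import csv
import io

def validate_row(row, spec_columns):
    """Return a list of validation errors for one record (empty when valid)."""
    errors = []
    for name, spec_type in spec_columns.items():
        value = row.get(name)
        if value is None:
            errors.append(f"{name} is required")
        elif spec_type == "string" and not isinstance(value, str):
            errors.append(f"{name} must be a string")
        elif spec_type == "number" and (
            isinstance(value, bool) or not isinstance(value, (int, float))
        ):
            errors.append(f"{name} must be a number")
    return errors

def ingest(row, spec_columns, csv_file):
    """Validate a record and, if valid, append it to the open CSV file."""
    errors = validate_row(row, spec_columns)
    if errors:
        return errors
    writer = csv.DictWriter(csv_file, fieldnames=list(spec_columns))
    if csv_file.tell() == 0:  # empty file: write the header once
        writer.writeheader()
    writer.writerow({name: row[name] for name in spec_columns})
    return []

event_spec = {"school_id": "string", "present": "number"}
event_file = io.StringIO()  # stands in for the event CSV file
accepted = ingest({"school_id": "s1", "present": 40}, event_spec, event_file)
rejected = ingest({"school_id": "s2", "present": "forty"}, event_spec, event_file)
print(accepted, rejected)  # [] ['present must be a number']
```

The scheduled step that moves the CSV contents into the DB is handled by the processor group and is not part of this sketch.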

Upload CSV Flow
  1. Data can be ingested in two ways

    1. Through ingestion APIs such as /ingestion/dimension, /ingestion/event and /ingestion/dataset

    2. Using the /ingestion/csv API

  2. The CSV file is imported using the POST /ingestion/csv API

  3. When the validation is successful, a record is added to the ingestion.file_tracker table with file details such as file_name, file_size and uploaded time

  4. An asynchronous call is made and the response is sent

  5. In the asynchronous call, if the ingestion_type is event, records are inserted into the ingestion.file_pipeline_tracker table for all the datasets which will be generated using that event data.

  6. The spec schema is fetched from the DB for the given ingestion type, and the CSV data is validated against its data types

  7. If there is any error, ingestion.file_tracker is updated with the error status

  8. If there is no error, the data is read in batches with a batch limit of 1 lakh (100,000) records and passed as input to the /ingestion/event, /ingestion/dimension or /ingestion/dataset API, based on the ingestion type.

  9. Once all the data is processed, the Uploaded status is set in the ingestion.file_tracker table.
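
Reading the uploaded CSV in fixed-size batches, as described in the flow above, could be done along these lines. The function name and the sample data are hypothetical; the batch limit of 100,000 is the one stated in this document, and the example overrides it with a small value so the behaviour is visible.

```python
import csv
import io
from itertools import islice

BATCH_LIMIT = 100_000  # 1 lakh records per batch, as stated in the flow above

def read_batches(csv_file, batch_limit=BATCH_LIMIT):
    """Yield lists of at most batch_limit records from an open CSV file."""
    reader = csv.DictReader(csv_file)
    while True:
        batch = list(islice(reader, batch_limit))
        if not batch:
            return
        yield batch

# Five sample records, read with a batch limit of 2 for demonstration.
sample = io.StringIO("school_id,present\n" + "".join(f"s{i},{i}\n" for i in range(5)))
sizes = [len(batch) for batch in read_batches(sample, batch_limit=2)]
print(sizes)  # [2, 2, 1]
```

Each yielded batch would then be passed to the ingestion API that matches the ingestion type. Reading lazily keeps memory use bounded by the batch size rather than the file size.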

Pipeline Flow
  1. The POST /ingestion/scheduler API adds or updates the schedule for the NiFi scheduler.

  2. The NiFi scheduler calls the POST /ingestion/pipeline API.

  3. The pipeline API has the following functions:

    1. Read the data from the dimension collection and write it to the Dimension DB (S3)

    2. Read the data from the Ingest DB and store it in the Dataset DB

 

File Status Flow

  1. File status will be checked using GET /ingestion/file-status API

  2. File status is updated using the PUT /ingestion/file-status API

  3. If the file is of ingestion type event, update the file status in the ingestion.file_tracker table, and update the status in the ingestion.file_pipeline_tracker table for the dataset for which the event was processed.

  4. If all the datasets have completed processing, change the status in the ingestion.file_tracker table to “Ready_to_archive”.

  5. If the ingestion_type is dimension or dataset, update the file status in the ingestion.file_tracker table to “Ready_to_archive”.
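
The status rules above can be read as a small state update, sketched below. Only the “Ready_to_archive” status comes from this document; the “Pending”, “Processing” and “Completed” status names, the function signature, and the choice to archive dimension and dataset files only once processing completes are assumptions. Dicts stand in for the ingestion.file_tracker and ingestion.file_pipeline_tracker tables.

```python
READY_TO_ARCHIVE = "Ready_to_archive"

def update_file_status(file_tracker, pipeline_tracker, file_name,
                       ingestion_type, status, dataset_name=None):
    """Apply one PUT /ingestion/file-status call and return the file's new status."""
    if ingestion_type == "event":
        # Update the file status and the status of the dataset being processed.
        file_tracker[file_name] = status
        pipeline_tracker[(file_name, dataset_name)] = status
        # When every dataset fed by this file has completed, archive the file.
        dataset_statuses = [
            s for (name, _), s in pipeline_tracker.items() if name == file_name
        ]
        if all(s == "Completed" for s in dataset_statuses):
            file_tracker[file_name] = READY_TO_ARCHIVE
    elif status == "Completed":
        # Dimension and dataset files have no per-dataset tracking.
        file_tracker[file_name] = READY_TO_ARCHIVE
    else:
        file_tracker[file_name] = status
    return file_tracker[file_name]

file_tracker = {"attendance.csv": "Uploaded"}
pipeline_tracker = {
    ("attendance.csv", "by_school"): "Pending",
    ("attendance.csv", "by_district"): "Pending",
}
first = update_file_status(file_tracker, pipeline_tracker,
                           "attendance.csv", "event", "Completed", "by_school")
second = update_file_status(file_tracker, pipeline_tracker,
                            "attendance.csv", "event", "Completed", "by_district")
print(first, second)  # Completed Ready_to_archive
```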

Ingestion schema

Two dimensions are used as examples in the illustration below.

  1. ingestion.dimension_state

  2. ingestion.dimension_district

  3. ingestion.sac_stds_atd_cmp_by_school -> one dataset is used as an example. Dataset tables have dynamic columns and static columns. Dynamic columns are added when the dataset_spec is added. Static columns are sum, count, min, max and avg.
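
To illustrate how the static columns of a dataset table could be maintained as new values arrive, here is a small sketch. It assumes each dataset row is keyed by its dynamic columns (here a single hypothetical school_id) and that avg is recomputed as sum / count on every update; the function name and the in-memory dict are stand-ins, not the actual upsert logic.

```python
def upsert_aggregate(dataset, key, value):
    """Insert or update the static columns (sum, count, min, max, avg) for one key."""
    row = dataset.get(key)
    if row is None:
        # First value for this key: every static column starts from it.
        dataset[key] = {"sum": value, "count": 1, "min": value, "max": value, "avg": value}
    else:
        row["sum"] += value
        row["count"] += 1
        row["min"] = min(row["min"], value)
        row["max"] = max(row["max"], value)
        row["avg"] = row["sum"] / row["count"]
    return dataset[key]

dataset = {}  # stands in for a table such as ingestion.sac_stds_atd_cmp_by_school
for pct in (80, 90, 100):
    upsert_aggregate(dataset, ("school_1",), pct)
print(dataset[("school_1",)])
# {'sum': 270, 'count': 3, 'min': 80, 'max': 100, 'avg': 90.0}
```

Keeping sum and count alongside avg means the average can be updated incrementally without re-reading earlier events.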

 

KPIs Flow

KPI (Key Performance Indicator)

  • A KPI defines what has to be derived from the input data or events.

  • Each item in the Viz can be called a KPI.

    Example: School_attendance

    KPI Category           : Student attendance compliance
    KPI Visualisation Type : Table
    KPI Indicator          : School-wise average attendance compliance %
    KPI Levels             : School

 

Other indicators may also need to be shown on the dashboard, for example:

Cluster-wise Average attendance compliance %

Block-wise Average attendance compliance %

District-wise Average attendance compliance %

State-wise Average attendance compliance %
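
The relationship between these indicators can be sketched as the same measure grouped at different levels of the school hierarchy. The hierarchy data, the compliance values and the function name below are all hypothetical, and the sketch takes an unweighted average of school-level percentages; whether cQube weights by student count is not stated in this document.

```python
from collections import defaultdict

# Hypothetical dimension data: where each school sits in the hierarchy.
SCHOOLS = {
    "sch1": {"cluster": "c1", "block": "b1", "district": "d1", "state": "st1"},
    "sch2": {"cluster": "c1", "block": "b1", "district": "d1", "state": "st1"},
    "sch3": {"cluster": "c2", "block": "b1", "district": "d1", "state": "st1"},
}

def average_by_level(compliance_by_school, level):
    """Average the school-level compliance % over the given KPI level."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [running sum, count]
    for school_id, pct in compliance_by_school.items():
        key = school_id if level == "school" else SCHOOLS[school_id][level]
        totals[key][0] += pct
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

compliance = {"sch1": 80.0, "sch2": 90.0, "sch3": 70.0}
by_cluster = average_by_level(compliance, "cluster")
by_block = average_by_level(compliance, "block")
print(by_cluster)  # {'c1': 85.0, 'c2': 70.0}
print(by_block)    # {'b1': 80.0}
```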

Events
  • An event is a data structure that records an occurrence at a particular time for an entity (e.g. a school). It is a combination of simple data types (e.g. integer, varchar). An event should always contain a column or set of columns that helps calculate the indicator. A table with a timestamp is not necessarily an event; it must contribute to either aggregation or filtering of the dataset.

  • An event should be structured so that it can update the dataset in some form.

  • Transformation of events into data that updates datasets - transformation happens through a transformer: f(eventDetails, eventSchema, datasetSchema, dimensionConfig) = [array of columns]

  • Updating datasets

 

Example: