cQube | LLD - v12Dec2022

Purpose

cQube solution architecture document: cQube | Design Document (Nov 2022)

This document focuses on the evolving design specs, which the development team can use.

Audience

Solution development team

How to use this document

This document is organized by topics. The illustrations and models used here are available in: cQube | Design Specs | Models - v24Nov2022

Topics

Data processing pipeline alternatives

Two possible solutions were considered:

  1. Using Postgres 

  2. Using Nifi

 

Alt: Postgres model:

  • v4.0 - as-is

    • ingestion: python api, storage: s3, processing: nifi, aggregate data storage: postgres, exhaust: s3 (json), viz engine: nodejs, viz: angular, spec: python based generator (fragments of SQL)

  • v5 - ingestion: nodejs api, store (till processed): postgres db

  • POST /spec/* → when a new spec is added (spec=event + dataset + pipe)

    • /event - create table <input> ⇒ e

      • Acts as a Q

      • Helps type validation

    • /dataset - create table <input> ⇒ c

    • /pipe - 

      • create a new pipe in table <pipeline>

      • pipe connects event to cube via transformer ⇒ event → transformer → dataset

      • transformer ⇒ create trigger <transformer> on event

  • POST /event → when a new event arrives

    • j ⇒ select e.c1, c.c2+1 as c2, c.c3+e.c3 as c3 from cqb1 c inner join evt1 e on c.c1=e.c1

    • update cqb1 set c1=j.c1, c2=j.c2, c3=j.c3 from j where cqb1.c1=j.c1

    • Transformer is a PG function connected via trigger on event table

    • Transformer updates dataset
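The event → transformer → dataset idea behind this alternative can be illustrated with a small sketch. It is not the cQube implementation: it uses Python's built-in sqlite3 module in place of Postgres and a plain SQL trigger in place of a PG function, and the table and column names (evt1, cqb1, c1, c2, c3) are simply borrowed from the pseudo-SQL above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE evt1 (c1 TEXT, c3 INTEGER);
CREATE TABLE cqb1 (c1 TEXT PRIMARY KEY, c2 INTEGER, c3 INTEGER);

CREATE TRIGGER transformer AFTER INSERT ON evt1
BEGIN
    INSERT OR IGNORE INTO cqb1 (c1, c2, c3) VALUES (NEW.c1, 0, 0);
    UPDATE cqb1 SET c2 = c2 + 1, c3 = c3 + NEW.c3 WHERE c1 = NEW.c1;
END;
""")

# evt1 is the event table (acts as a queue); cqb1 is the dataset (cube).
# Every inserted event fires the trigger, which updates the matching cube row:
# c2 counts events per key and c3 accumulates the event value.
conn.executemany(
    "INSERT INTO evt1 (c1, c3) VALUES (?, ?)",
    [("s1", 10), ("s1", 5), ("s2", 7)],
)
rows = conn.execute("SELECT c1, c2, c3 FROM cqb1 ORDER BY c1").fetchall()
print(rows)
```

In this sketch the dataset is always up to date after each event, which is the main property of the trigger-based model.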

Alt: Nifi model:

  1. Events are passed to API (ajv based validation)

  2. API writes to event_batch_file

  3. Every 5 min, trigger intermediate_cube_processor

  4. API will write to a flowfile; ideally this should be picked up by a nifi process group

  5. Process group - transformer - event_data, event_specs, cube_schema, cube_data - Py/Js

    1. Receives Event data

    2. Pull all required data (from store)

    3. Perform transformation

    4. Store in an intermediate cube.csv

    5. Upsert the cube, once in every 5 min
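The transformer steps above (receive a batch, transform, hold an intermediate cube, upsert periodically) can be sketched as below. This is only an illustration under assumptions: the cube is an in-memory dict rather than cube.csv or a database table, the field names school_id and attendance_marked are taken from the event example later in this document, and the 5-minute trigger is represented by a simple loop over batches.

```python
def transform(events):
    """Aggregate one batch of events into an intermediate cube (school_id -> totals)."""
    intermediate = {}
    for event in events:
        cell = intermediate.setdefault(event["school_id"], {"count": 0, "total": 0})
        cell["count"] += 1
        cell["total"] += event["attendance_marked"]
    return intermediate


def upsert(cube, intermediate):
    """Merge the intermediate cube into the persistent cube (insert or update)."""
    for key, cell in intermediate.items():
        target = cube.setdefault(key, {"count": 0, "total": 0})
        target["count"] += cell["count"]
        target["total"] += cell["total"]
    return cube


cube = {}
batch_1 = [
    {"school_id": "S1", "attendance_marked": 38},
    {"school_id": "S2", "attendance_marked": 20},
]
batch_2 = [{"school_id": "S1", "attendance_marked": 35}]

# Each batch stands for the events collected between two scheduled triggers.
for batch in (batch_1, batch_2):
    upsert(cube, transform(batch))
print(cube)
```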

 

The two alternatives were compared on:

  • Traceability (where in the flow is my data)

  • Horizontal scalability

  • Flexibility - ability to add additional steps or pipelines or create side-effects

 

The Nifi-based data processing model was chosen for cQube.

 

Physical Architecture

   

  1. Kong

    1. Used as an API gateway

    2. Used for AuthN & AuthZ <<TBD>>

  2. Auth: handles AuthN and AuthZ <<TBD - Alternatives>>

    1. AuthN (identity and role assignment): 

      1. Implementation choices

        1. Location

          1. Internal

          2. External

        2. Solution

          1. Keycloak

          2. Using a 3rd party - Auth0

      2. Design Factors/Requirements

        1. A state may already have its own identity solution (risk of duplication)

        2. Management Overhead

        3. Users

          1. Adapter

          2. Dashboard Users

            1. Admin

            2. Public?

          3. Internal Service Communication?

        4. Keeping services stateless assuming access is authenticated already - JWT being the easiest to manage.

        5. Should support any OIDC based authentication (allow for external SSO integration)

    2. AuthZ: access to resources based on role

      1. Implementation Choices

        1. OPA - I am leaning towards this as it fulfills all requirements

      2. Requirements

        1. Need it to be internal to cQube

        2. Role based authorization

        3. Works on

          1. UI

          2. SQL - SQL based filtering layer which dynamically filters rows and columns, based on role and configuration

          3. APIs

          4. Infrastructure

  3. Flow Engine: manages various data ingestion pipelines

  4. User MS: To manage users and roles

  5. Spec MS: Exposes APIs to process specifications - events, dimensions, datasets, transformers, schedules and pipelines

  6. Ingest MS: Exposes APIs to ingest event, dimension and dataset data

  7. Spec db: stores various specification details

  8. Config db: stores cQube specific data such as usage, roles, access control etc

  9. Ingest db: stores ingested data created by collecting events, dimensions and datasets before updating dataset (KPI) db

  10. Dataset db: stores KPI data. It is updated from ingested data <<TBD - should we consider Timescale>>

  11. SQL Access: <<TBD - potentially Hasura>>

Deployment Architecture

AWS - Network Architecture

The following steps define the cQube setup and workflow completion processes in AWS. cQube mainly comprises the areas mentioned below:

  1. EC2 Server

  2. IAM user and Role creation for S3 connectivity.

The cQube network setup process is described in the block diagram below:

 

Microservices Details

The following microservices are installed on the cQube server:

  • Ingestion-ms: The ingestion-ms is used to upload the data of the events, datasets, dimensions, transformers and pipeline. All these APIs are used to ingest data into cQube.

  • Spec-ms: The spec-ms is used to import schema of the events, datasets, dimensions, transformers and pipeline. All these specs will be defined by the cQube platform prior to ingesting the data into the cQube. These specifications are derived by considering the KPIs as the Indicator.

  • Generator-ms: The generator-ms is used to create the specs and transformers for the derived datasets. It also performs the aggregation logic, updates datasets based on the transformation, and updates the file processing status.

  • Nifi-ms: Apache NiFi is used as a real-time integrated data logistics and simple event processing platform

  • Postgres-ms: Postgres microservice contains the schema and tables

  • Nginx-ms: It is commonly used as a reverse proxy and load balancer to manage incoming traffic and distribute it to slower upstream servers

  • Kong-ms: It is a lightweight API Gateway that secures, manages, and extends APIs and microservices.

 

Spec pipeline

Defines how specs are processed into cQube.

 

  1. Specs can be added to cQube via Spec microservice

  2. Spec MS interacts with data flow engine (nifi) and spec db (postgresql)

  3. Spec microservice namespace: /spec/*

  4. Key APIs & processing steps

    1. POST /spec/dimension -

      1. Compile and verify validity

      2. Store in spec.dimension table

      3. Create the actual dimension table

        • The data type given in the spec is mapped to the equivalent data type in postgres. For example, if the datatype in the spec is “string”, the column datatype in the database will be “VARCHAR”.

        • In the same way, all the columns and their respective data types are collected, and a dynamic query is generated to create the dimension table.

    2. POST /spec/event -

      1. Compile and verify validity

      2. Store in spec.event table

    3. POST /spec/dataset -

      1. Compile and verify validity

      2. Store in spec.dataset table

      3. Create the actual dataset table

        • The data type given in the spec is mapped to the equivalent data type in postgres. For example, if the datatype in the spec is “number”, the column datatype in the database will be “NUMERIC”.

        • In the same way, all the columns and their respective data types are collected, and a dynamic query is generated to create the dataset table.

    4. POST /spec/transformer -

      1. This is accessible only to superadmins

      2. Compile and verify validity - aware of required ingestion type, key_file, program name and operation.

      3. Where applicable, internally, a code generator will be used to create a transformer

      4. It is assumed that each transformer is tested and coming from a trusted source

      5. Store in spec.transformer table

    5. POST /spec/pipeline -

      1. Compile and verify validity

      2. Store in spec.pipeline table

      3. Nifi compilation - abstracted by a wrapper layer which hides nifi calls

        1. Create a new nifi processor_group

        2. Add processors to the newly created processor group

        3. Connect the transformer into relevant processor

    6. POST /spec/schedule -

      1. Compile and verify validity

      2. Create/Update schedule for a pipeline
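The data type mapping and dynamic table creation described for POST /spec/dimension and POST /spec/dataset could look roughly like the sketch below. Only the “string” → VARCHAR and “number” → NUMERIC mappings come from this document; the other entries in TYPE_MAP, the function name create_table_sql and the shape of the columns argument are assumptions made for illustration.

```python
import re

# Spec data type -> PostgreSQL column type.
# "string" and "number" are from the text above; the rest are assumed.
TYPE_MAP = {
    "string": "VARCHAR",
    "number": "NUMERIC",
    "integer": "INTEGER",
    "boolean": "BOOLEAN",
}

# Identifiers are interpolated into SQL, so only plain names are accepted.
IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")


def create_table_sql(schema, table, columns):
    """Build a CREATE TABLE statement from {column_name: spec_type}."""
    for name in (schema, table, *columns):
        if not IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    definitions = []
    for name, spec_type in columns.items():
        if spec_type not in TYPE_MAP:
            raise ValueError(f"unsupported spec type: {spec_type!r}")
        definitions.append(f"{name} {TYPE_MAP[spec_type]}")
    return f"CREATE TABLE IF NOT EXISTS {schema}.{table} ({', '.join(definitions)});"


print(create_table_sql(
    "ingestion",
    "dimension_state",
    {"state_id": "string", "state_name": "string"},
))
```

Rejecting unknown types and unsafe identifiers before building the statement keeps the dynamically generated query predictable.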

                     

  

Spec Flow

YAML link: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml

The specifications flow diagram represents the schema of the events, datasets, dimensions, transformers and pipeline. All these specs will be defined by the cQube platform prior to ingesting the data into the cQube. These specifications are derived by considering the KPIs as the Indicator. 

Dimension Specification

  1. The dimension spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful then the spec will be checked for duplication.

  3. If the spec already exists with the same data then an error response will be sent.

  4. Else stored in the spec.dimension table.

  5. Add a record into spec.pipeline table

  6. Once the spec is stored, a dimension table will be created.
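The six steps above can be sketched as a single function. This is a simplified illustration, not the spec-ms code: validation is a hand-written check standing in for AJV, the spec.dimension and spec.pipeline tables are in-memory lists, table creation is only recorded by name, and the field names dimension_name and columns are hypothetical.

```python
class DuplicateSpecError(Exception):
    """Raised when an identical spec has already been stored."""


def new_store():
    # In-memory stand-ins for spec.dimension, spec.pipeline and created tables.
    return {"spec.dimension": [], "spec.pipeline": [], "tables": set()}


def add_dimension_spec(spec, store):
    # Step 1: validate the spec (stand-in for AJV schema validation).
    if not isinstance(spec.get("dimension_name"), str) or not spec.get("columns"):
        raise ValueError("invalid dimension spec")
    # Steps 2-3: reject a spec that already exists with the same data.
    if spec in store["spec.dimension"]:
        raise DuplicateSpecError(spec["dimension_name"])
    # Step 4: store the spec.
    store["spec.dimension"].append(spec)
    # Step 5: add a record to spec.pipeline.
    store["spec.pipeline"].append({"pipeline_name": spec["dimension_name"]})
    # Step 6: create the dimension table (only its name is recorded here).
    store["tables"].add("dimension_" + spec["dimension_name"])
    return store


store = new_store()
state_spec = {"dimension_name": "state", "columns": {"state_id": "string"}}
add_dimension_spec(state_spec, store)
print(sorted(store["tables"]))
```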

Event Specification

  1. The event spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful then the spec will be checked for duplication.

  3. If the spec already exists with the same data then an error response will be sent.

  4. Else stored in the event spec table.

  5. Add a record into spec.pipeline table

Dataset Specification

  1. The Dataset spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful then the spec will be checked for duplication.

  3. If the spec already exists with the same data then an error response will be sent.

  4. Else stored in the dataset spec table.

  5. Add a record into spec.pipeline table

  6. Once the spec is stored, dataset table will be created

Transformer Specification

  1. The Transformer spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful, an internal python api is called with ingestion_name, key_file, program and operation. It generates the transformer files for the given program and stores them for later use.

  3. If the transformer file is successfully created then a record is added into spec.transformer table with the file name

Pipeline Specification

  1. Event processing

    1. Event collection pipeline - /ingestion/event

    2. Event collection to aggregation pipeline - /ingestion/pipeline <event name> 

    3. Aggregation to dataset upsert pipeline - /ingestion/pipeline <dataset name> 

  2. Dimension processing

    1. Dimension collection pipeline - /ingestion/dimension

    2. Dimension collection to dimension upsert pipeline - /ingestion/pipeline <dimension name> 

  3. Dataset processing 

    1. Dataset collection pipeline - /ingestion/dataset

    2. Dataset collection to Dataset upsert pipeline - /ingestion/pipeline <dataset name> 

 

  1. The Pipeline spec will be validated with the predefined specification using AJV framework.

  2. When the validation is successful then the spec will be checked for duplication.

  3. If the spec already exists with the same data then an error response will be sent.

  4. Else stored in the pipeline spec table.

  5. Pipeline types are listed below

  • Dimension to Collection

  • Dimension collection to db

  •  Event to Collection  

  • Dataset to collection

  • Event collection to Aggregate

  • Aggregate to DB

 

If the pipeline type is Aggregate to DB, a processor group is created and processors are added inside it by calling the Nifi APIs.

  1. The processor group is mapped to the transformer.

  2. Connections are established between the processors.
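The wrapper layer that hides the Nifi calls could expose an interface similar to the one sketched below. Everything here is hypothetical: InMemoryFlowEngine only records what would be created and does not call Nifi, and the method names, processor names and the transformer property are illustrative rather than part of any real Nifi API.

```python
import itertools


class InMemoryFlowEngine:
    """Stand-in for the wrapper layer; a real one would call the Nifi APIs."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.groups = {}

    def create_processor_group(self, name):
        group_id = f"pg-{next(self._ids)}"
        self.groups[group_id] = {"name": name, "processors": {}, "connections": []}
        return group_id

    def add_processor(self, group_id, name, properties=None):
        processor_id = f"p-{next(self._ids)}"
        self.groups[group_id]["processors"][processor_id] = {
            "name": name,
            "properties": properties or {},
        }
        return processor_id

    def connect(self, group_id, source_id, target_id):
        self.groups[group_id]["connections"].append((source_id, target_id))


def compile_pipeline(engine, pipeline_name, transformer_file, processor_names):
    """Create a group, add its processors, attach the transformer, connect them."""
    group_id = engine.create_processor_group(pipeline_name)
    ids = []
    for name in processor_names:
        # The transformer is attached to the (hypothetical) run_transformer step.
        props = {"transformer": transformer_file} if name == "run_transformer" else {}
        ids.append(engine.add_processor(group_id, name, props))
    # Connect the processors in the order they were listed.
    for source_id, target_id in zip(ids, ids[1:]):
        engine.connect(group_id, source_id, target_id)
    return group_id


engine = InMemoryFlowEngine()
gid = compile_pipeline(
    engine, "school_attendance", "school_attendance.py",
    ["read_file", "run_transformer", "log_result"],
)
print(engine.groups[gid]["connections"])
```

Keeping the engine behind a small interface like this means the spec microservice can be tested without a running Nifi instance.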

Spec DB Schema

Below are the tables included in spec schema

  1. spec.dimension

  2. spec.event

  3. spec.dataset

  4. spec.transformer

  5. spec.pipeline

  6. spec.schedule

Ingestion pipeline

  1. Data is ingested into cQube via Ingest MS

  2. Ingest MS stores the data into Ingest Store

  3. Based on schedule, the Ingest Flow engine interacts with the Ingest Store and updates Dataset DB

  4. Ingestion microservice namespace: /ingestion/*

  5. Key APIs & processing steps

    1. POST /ingestion/event

      1. Validate event data

      2. Append to corresponding event file (csv)

    2. POST /ingestion/dimension

      1. Validate dimension data

      2. Append to corresponding dimension file (csv)

    3. POST /ingestion/dataset

      1. Validate dataset data

      2. Append to corresponding aggregate file (csv)

    4. POST /ingestion/pipeline

      1. Trigger a pipeline by event, dimension, aggregate or dataset: read from the ingestion store, transform, and upsert into the dataset store

    5. POST /ingestion/csv

      1. Upload the CSV file with event, dimension and dataset data

      2. Validate the data

      3. Internally call the /ingestion/event, /ingestion/dimension and /ingestion/dataset APIs

    6. GET /ingestion/file-status

      1. Gives the status of the file which was uploaded using the /ingestion/csv API

    7. PUT /ingestion/file-status

      1. This is an internal API called from transformer files when the processing starts and when the processing completes

  6. Flow engine is configured to trigger pipelines based on schedule

Ingestion Flow

Dimension Data
  1. The dimension data will be added using POST /ingestion/dimension API.

  2. Data will be validated with the dimension spec using AJV.

  3. When the validation is successful then the dimension data will be stored in the CSV file.

  4. Based on the schedule time processor group will be triggered and data will be added into DB.

Event Data 
  1. The Event data will be added using POST /ingestion/event API.

  2. Data will be validated with the event spec using AJV.

  3. When the validation is successful then the event data will be stored in the CSV file.

  4. Based on the schedule time processor group will be triggered and data will be ingested into DB.

Dataset
  1. The Dataset data will be added using POST /ingestion/dataset API.

  2. Data will be validated with the dataset spec using AJV.

  3. When the validation is successful then the dataset data will be stored in the CSV file.

  4. Based on the schedule time processor group will be triggered and data will be ingested into DB.
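The validate-then-append pattern shared by the three flows above can be sketched as follows. This is an assumption-laden illustration: the real services validate with AJV against the stored spec, whereas this sketch uses a hypothetical EVENT_SPEC that maps column names to Python types, and an in-memory io.StringIO buffer stands in for the CSV file.

```python
import csv
import io

# Hypothetical event spec: column name -> expected Python type.
EVENT_SPEC = {
    "school_id": str,
    "class": int,
    "date": str,
    "total_students": int,
    "attendance_marked": int,
}


def validate(record, spec):
    """Return a list of error strings; an empty list means the record is valid."""
    errors = [f"missing: {col}" for col in spec if col not in record]
    errors += [f"unexpected: {col}" for col in record if col not in spec]
    errors += [
        f"wrong type: {col}"
        for col, expected in spec.items()
        if col in record and not isinstance(record[col], expected)
    ]
    return errors


def ingest(record, spec, out):
    """Validate one record and, if valid, append it as a CSV row to `out`."""
    errors = validate(record, spec)
    if errors:
        return {"status": "rejected", "errors": errors}
    csv.DictWriter(out, fieldnames=list(spec)).writerow(record)
    return {"status": "accepted"}


event_file = io.StringIO()
print(ingest(
    {"school_id": "S1", "class": 5, "date": "01/12/22",
     "total_students": 40, "attendance_marked": 38},
    EVENT_SPEC, event_file,
))
print(ingest({"school_id": "S1", "class": "five"}, EVENT_SPEC, event_file))
```

Only valid records reach the file, so the scheduled processor group can read the CSV without re-validating each row.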

Upload CSV Flow
  1. Data can be ingested in two ways

    1. Through ingestion APIs like /ingestion/dimension, /ingestion/event and /ingestion/dataset

    2. Using the /ingestion/csv API

  2. The CSV file is imported using the POST /ingestion/csv API

  3. When the validation is successful, a record is added to the ingestion.file_tracker table with file details like file_name, file_size and uploaded time

  4. An asynchronous call is made and the response is sent

  5. In the asynchronous call, if the ingestion_type is event, records are inserted into the ingestion.file_pipeline_tracker table for all the datasets which will be generated using that event data

  6. The spec schema is fetched from the DB for the given ingestion type, and the CSV data is validated against its data types

  7. If there is any error, ingestion.file_tracker is updated with the error status

  8. If there is no error, the data is read in batches with a batch limit of 1 lakh (100,000) rows and passed as input to the /ingestion/event, /ingestion/dimension or /ingestion/dataset API, based on the ingestion type

  9. Once all the data is processed, the Uploaded status is updated in the ingestion.file_tracker table
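Reading the uploaded CSV in batches, with the batch limit of 1 lakh (100,000) rows mentioned above, can be sketched with a small generator. The function name read_batches and the sample columns are illustrative assumptions; each yielded batch would then be passed to the ingestion API that matches the ingestion type.

```python
import csv
import io
from itertools import islice

BATCH_LIMIT = 100_000  # 1 lakh rows per batch


def read_batches(csv_file, batch_limit=BATCH_LIMIT):
    """Yield lists of at most `batch_limit` rows (as dicts) from a CSV file."""
    reader = csv.DictReader(csv_file)
    while True:
        batch = list(islice(reader, batch_limit))
        if not batch:
            return
        yield batch


# Small demonstration with a batch limit of 2 instead of 1 lakh.
sample = io.StringIO(
    "school_id,total_students\n" + "".join(f"S{i},{i}\n" for i in range(5))
)
for batch in read_batches(sample, batch_limit=2):
    print(len(batch), [row["school_id"] for row in batch])
```

Because the generator reads lazily, the whole file never has to be held in memory at once.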

Pipeline Flow
  1. POST /ingestion/scheduler API will add/update the schedule for  Nifi Scheduler.

  2. Nifi Scheduler will call the POST /ingestion/pipeline API

  3. The Pipeline API has the following functionalities

    1. Read the data from the dimension collection and write to Dimension DB (S3)

    2. Get the data from the Ingest DB and store it into the Dataset DB

 

File Status Flow

  1. File status will be checked using GET /ingestion/file-status API

  2. The file status is updated using the PUT /ingestion/file-status API

  3. If the file is of ingestion type event, update the file status in the ingestion.file_tracker table and update the status in the ingestion.file_pipeline_tracker table for the dataset for which the event is processed.

  4. If all the datasets have completed processing, change the status in the ingestion.file_tracker table to “Ready_to_archive”.

  5. If the ingestion_type is dimension or dataset, update the file status in the ingestion.file_tracker table to “Ready_to_archive”.
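The status transitions above can be sketched with two in-memory dictionaries standing in for the ingestion.file_tracker and ingestion.file_pipeline_tracker tables. The status values “Uploaded” and “Ready_to_archive” come from this document; “Processing” and “Completed”, the function name, and the choice to archive a dimension or dataset file only once it is reported as completed are assumptions made for the example.

```python
def update_file_status(file_tracker, pipeline_tracker, file_name,
                       ingestion_type, status, dataset=None):
    """Apply one PUT /ingestion/file-status call to the in-memory trackers."""
    if ingestion_type == "event":
        file_tracker[file_name] = status
        pipeline_tracker[(file_name, dataset)] = status
        # The file is ready to archive once every dataset fed by it is done.
        statuses = [s for (name, _), s in pipeline_tracker.items() if name == file_name]
        if all(s == "Completed" for s in statuses):
            file_tracker[file_name] = "Ready_to_archive"
    elif status == "Completed":
        # Dimension and dataset files have no per-dataset tracking.
        file_tracker[file_name] = "Ready_to_archive"
    else:
        file_tracker[file_name] = status
    return file_tracker[file_name]


file_tracker = {}
pipeline_tracker = {
    ("events.csv", "school_attendance"): "Uploaded",
    ("events.csv", "cluster_attendance"): "Uploaded",
}
print(update_file_status(file_tracker, pipeline_tracker, "events.csv",
                         "event", "Completed", "school_attendance"))
print(update_file_status(file_tracker, pipeline_tracker, "events.csv",
                         "event", "Completed", "cluster_attendance"))
```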

Ingestion schema

The illustration below uses two dimensions as examples.

  1. ingestion.dimension_state

  2. ingestion.dimension_district

  3. ingestion.sac_stds_atd_cmp_by_school (one dataset is used as an example). Dataset tables have dynamic columns and static columns. Dynamic columns are added when the dataset_spec is added. Static columns are sum, count, min, max, avg.

 

KPIs Flow

KPI (Key Performance Indicator)

  • KPIs define what has to be derived from the input data or events

  • Each item in the Viz can be called a KPI.

    Example: School_attendance

    KPI Category: Student attendance Compliance

    KPI VisualisationType: Table

    KPI Indicator: School-wise average attendance compliance %

    KPI Levels: School

 

Other indicators may also need to be shown on the dashboard, for example:

Cluster-wise Average attendance compliance %

Block-wise Average attendance compliance %

District-wise Average attendance compliance %

State-wise Average attendance compliance %

Events
  • A data structure that records an occurrence at a particular time for an entity (eg: school, etc). It is a combination of simple data types (eg: integer, varchar, etc.).  An event should always contain a column/set of columns that helps you calculate the Indicator. A table with a timestamp doesn’t necessarily mean that it is an event; it should contribute to either aggregation or filtering of the dataset.

  • An event should be structured so that it can update the dataset in some form

  • Transformation of Events to data that updates datasets - Transformation happens through a transformer: f(eventDetails, eventSchema, datasetSchema, dimensionConfig) = [array of columns]

  • Updating datasets

 

Example:

school_id   | class           | date       | total_students  | attendance_marked
<Unique id> | <Integer Value> | <dd/mm/yy> | <integer Value> | <integer Value>

Dimensions
  • Dimensions describe events.

  • Dimensions are static records.

  • In the final aggregation we will be mapping Dimensions with Datasets.

Example:

Dimensions for above mentioned KPI are :

School   : School_id, School_name, Cluster_Id, Block_id, District_id, State_Id

Cluster  : Cluster_id, Cluster_name

Block    : Block_id, Block_name

District : District_id, District_name

State    : State_Id, State_name

Datasets 
  • High-level data which is computed by aggregating events. It is a data representation of the indicator. Datasets are persistent within cQube Ed. A dataset is created for at least one indicator. 

  • A dataset can be derived from one or more specified events, independently.

  • It may additionally contain dimensions and derived values from other datasets during the mapping process.

 

Example: School_attendance

The dataset for the above mentioned KPI is defined below:

date       | school_id   | sum(total_students) | sum(attendance_marked) | Average Attendance %
<yy-mm-dd> | <unique id> | <sum of parameters> | <sum of parameters>    | <sum of parameter / sum of parameter>

Datasets for the above mentioned Indicators:

Cluster_attendance

Block_attendance

District_attendance

State_attendance

Note: 

  • There is a list of transformers. In this dataset we are using Sum

  • There is a list of dimensions. In the above mentioned dataset School is Dimension. 
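How events roll up into the School_attendance dataset can be sketched as below. The column names follow the event and dataset examples above; the output key names, and the reading of “Average Attendance %” as sum(attendance_marked) divided by sum(total_students), are assumptions made for this illustration.

```python
from collections import defaultdict


def aggregate_school_attendance(events):
    """Aggregate attendance events into one dataset row per (date, school_id)."""
    totals = defaultdict(lambda: [0, 0])  # key -> [total_students, attendance_marked]
    for event in events:
        key = (event["date"], event["school_id"])
        totals[key][0] += event["total_students"]
        totals[key][1] += event["attendance_marked"]
    return [
        {
            "date": date,
            "school_id": school_id,
            "sum_total_students": total,
            "sum_attendance_marked": marked,
            # Assumed formula: marked / total, expressed as a percentage.
            "avg_attendance_pct": round(100 * marked / total, 2) if total else None,
        }
        for (date, school_id), (total, marked) in sorted(totals.items())
    ]


events = [
    {"school_id": "S1", "class": 5, "date": "22-12-01",
     "total_students": 40, "attendance_marked": 38},
    {"school_id": "S1", "class": 6, "date": "22-12-01",
     "total_students": 60, "attendance_marked": 52},
]
for row in aggregate_school_attendance(events):
    print(row)
```

The cluster, block, district and state datasets would follow the same pattern, grouping by the corresponding dimension id instead of school_id.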

 

These are some sample KPIs, derived as a set of events, dimensions, and datasets from the PAT & SAT data sources used in the earlier cQube version v4.0: Link

Developer & QC WorkFlow

 

The above diagram describes the Developer & QC workflow, from how a task is identified and assigned until it is merged to the respective branch in the Developer workflow. Once the code is merged, QC tests their scripts, executes the test cases and produces the test results.

 

 

 

 

 

Devops 

Installation Process     

Following technologies will be used to implement cQube as a one step installation.

  1. Shell Scripting - Shell scripts install the basic software required to run ansible and docker, generate a configuration file by prompting the user with questions, and run the ansible playbooks.

  2. Docker - For Containerising the micro services

  3. Docker Swarm - A swarm consists of multiple docker hosts, some of which act as managers that handle multiple nodes. This helps the system continue to run even when one node fails.

  4. Ansible - To easily build the code and to reuse the code.

  5. Github Actions - For Continuous Integration and Continuous Deployment 

Steps: Users clone the code from github, check out the latest code, give the install.sh shell script executable permission, and run it.

Install.sh File:

  • It checks whether the user is running the script using sudo and throws an error accordingly.

  • Installs all the basic software such as ansible, docker and python dependencies

  • Triggers a configuration.sh file, which generates a config.yml file by prompting the user with the relevant questions.

  • Each value entered by the user is validated, and the same question is repeated until a valid value is entered.

  • Once the config file is created, it is previewed to the user so that the entered values can be re-checked.

  • Once the user confirms the configuration values, the ansible scripts are triggered.

  • Once all ansible scripts have been triggered, the shell script shows the message “cqube installed successfully”
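The validate-and-repeat behaviour of the configuration prompts can be sketched as below. The actual installer is a shell script; this sketch uses Python purely to illustrate the loop, and the question name api_port, the port validator and the injectable read function are hypothetical.

```python
def prompt_until_valid(question, is_valid, read=input):
    """Ask the same question until `is_valid` accepts the answer."""
    while True:
        value = read(f"{question}: ").strip()
        if is_valid(value):
            return value
        print(f"Invalid value for {question!r}, please try again.")


def is_port(value):
    """Accept only whole numbers in the valid TCP port range."""
    return value.isdigit() and 1 <= int(value) <= 65535


# Scripted answers stand in for a user typing at the terminal.
answers = iter(["abc", "70000", "8080"])
config = {"api_port": prompt_until_valid("api_port", is_port, read=lambda _: next(answers))}
print(config)
```

Passing the reader as a parameter lets the loop be exercised without an interactive terminal.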

 

Ansible Script:

  • Ansible script will be triggered and it will help to build the docker images with the respective docker files provided.

  • Ansible triggers a docker-compose file to start all the docker containers within the server.

  • An ansible script triggers the nginx setup and configuration on the remote machine (nginx server).

 

 

Docker Containers:

  1. Spec-ms

  2. Ingestion-ms

  3. Generator-ms

  4. Nifi-ms

  5. Postgres-ms

  6. Kong-ms

  7. Nginx-ms

Flow Diagram of One step Installation

 

 

Github Actions for CI/CD

GitHub Actions lets developers automate their workflows across issues, pull requests, and more, and provides native CI/CD functionality.

All GitHub Actions automations are handled via workflows, which are YAML files placed under the .github/workflows directory in a repository that define automated processes.

Every workflow consists of several different core concepts. These include:

  • Events: Events are defined triggers that kick off a workflow.

  • Jobs: Jobs are a set of steps that execute on the same runner.

  • Steps: Steps are individual tasks that run commands in a job.

  • Actions: An action is a command that’s executed on a runner, and the core element of GitHub Actions.

  • Runners: A runner is a GitHub Actions server. It listens for available jobs, runs each in parallel, and reports back progress, logs and results.

The below diagram shows the flow of github actions workflow.

 

 

Build and Push Docker Image to AWS EC2 Using GitHub Actions:

The below diagram shows how github actions are implemented for CI/CD. Whenever a user commits code to the repository, the deployment is triggered based on the event defined in the workflow file.

Github actions builds the code and runs unit tests for code coverage on its own virtual runners.

If the code builds on the runners without any errors, it then builds the docker image using the docker file which is provided.

Github actions then logs in to the docker hub account using the credentials stored in its secrets.

Once the image is built, it pushes the image to the docker hub with the tags specified in the workflow.

Once the image is pushed to dockerhub, github actions triggers the deployment to the ec2 server. It uses a predefined ssh action which lets github actions log in to the ec2 server, pull the latest code from the github repository and deploy it on the server.

 

Deployment strategy in Dev, QA, Demo Environments

The below process is followed in each environment using its respective github repository branch. Two workflow files will be created in the github repository.

  • On the event of a pull request: a workflow action is triggered which checks the image build and code coverage on the github actions runners.

  • On the event of a merge request: a workflow action is triggered which builds the docker image from the dockerfile >> logs in to the docker hub >> pushes the latest image to the docker hub repository >> connects to the ec2 server >> pulls the latest images from the dockerhub >> deploys them on the server.

 

 

  • Developers raise a pull request

  • On the event of a pull request, the github actions workflow triggers Continuous Integration, which builds and tests the code.

  • Once the build is successful, the code reviewer checks the code and merges it to the repository

  • On the event of a merge, github actions triggers Continuous Deployment, which deploys the code to the server.

  • Once the code is merged to dev, it is merged to QA, which follows the same process, and then to the demo environment.

Visualization Design

A common generic function reads the metadata from the configuration file and generates a visualization.

 

This configuration file consists of various queries which will be applied on cubes on load of the report and also on various levels mentioned in the filters section of the configuration file.

 

A complete configuration file can be found here : student attendance module configuration file

 

There are generic functions developed to construct filters, tooltips and also few functions to build queries provided with the dynamic values from filters.

 

A Time Series Component was developed which takes the min and max dates of the available data and lets the user choose a date range to filter on within those dates.

 

For all the above functionalities to work there will be different queries which need to be configured in the file in a certain way.

 

Example of the queries needed for table and bignumber visualization of student attendance compliance report on the selection of a district level filter.

 

 

Here on selection of the district level, there will be 2 collections of queries that need to be executed. One is to get the filter options for the next level of filter.

 

And another is a set of queries which will be selected depending on whether the time series filter is already selected or not.

 

The queries in timeSeriesQueries are executed whenever the time series is selected; otherwise the queries in the actions section are executed.

 

Here we can provide multiple queries depending on the data needed for the current visualization. This visualization needs 3 queries: 1 for the table and 2 for the big number representation.
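The choice between the two query sets can be sketched as below. Only the section names timeSeriesQueries and actions come from this document; the shape of the configuration, the query keys and the placeholder query strings are assumptions, and the actual configuration file may be organized differently.

```python
def select_queries(level_config, time_series_selected):
    """Pick the query set for a filter level based on the time series state."""
    section = "timeSeriesQueries" if time_series_selected else "actions"
    return level_config[section]


# Hypothetical configuration for the district level filter:
# one query for the table and two for the big numbers, in both sections.
district_level = {
    "actions": {
        "table": "<table query without date range>",
        "bigNumber1": "<first big number query without date range>",
        "bigNumber2": "<second big number query without date range>",
    },
    "timeSeriesQueries": {
        "table": "<table query with date range>",
        "bigNumber1": "<first big number query with date range>",
        "bigNumber2": "<second big number query with date range>",
    },
}

for name, query in select_queries(district_level, time_series_selected=True).items():
    print(name, "->", query)
```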

 

Also for a specific visualization we need to provide the metadata needed to convert the data from the queries to a standard format that specific report type component expects.

 

 

For Table: 

 

Ex: 

 

Here we need to add metadata for every column in the table and some options for sorting data on load of the report.

 

For Big Number:

 

For big numbers, a value suffix can be provided as an option for the indicator.

 

For Bar Chart: 

 

In the Bar Chart we can configure the label, title and values for both axes, and specifically for xAxis we can have more than one label-values as metrics as shown below.