cQube | Flow of Data

1. Ingestion Specification

Ingestion is a simple API that allows you to ingest your data as events. There are two steps for this.

  1. Define and Event Grammar

{ "instrument_details": { "type": "COUNTER", "key": "count" }, "name": "attendance_by_school_grade_for_a_day", "is_active": true, "event_schema": { "$schema": "https://json-schema.org/draft/2019-09/schema", "$id": "http://example.com/example.json", "type": "object", "default": {}, "title": "attendance_by_school_grade_for_a_day", "required": ["date", "grade", "school_id", "count"], "properties": { "grade": { "type": "integer", "default": 0, "title": "The grade Schema", "examples": [1] }, "school_id": { "type": "integer", "default": 0, "title": "The school_id Schema", "examples": [901] } }, "examples": [ { "date": "2019-01-01", "grade": 1, "school_id": 901, "count": 23 } ] } }

Instruments: Currently only a COUNTER instrument is supported.

2. Publish an event that satisfies the grammar

{ "date": "2019-01-01", "grade": 1, "school_id": 901, "count": 23 }

If the data is not aggregated at a daily level, an event could also be shared at a week, month, year level as well by just defining a start_date and end_date. Not the start_date and end_date should cleanly fall in a week, month, year, last 7 days, last 15 days, or last 30 days.

{ "start_date": "2019-01-01", "end_date": "2019-02-01", "grade": 1, "school_id": 901, "count": 23 }

if you cannot guarantee this, it would be better to use an alternative API to push an event as below.

{ "week": "2019-W21", // or "month": "2019-M11" or "year": "2022" "grade": 1, "school_id": 901, "count": 23 }

Keywords and preferences

The following keywords in an event will be treated as special keywords and don’t need to be part of the schema - date, start_date, end_date, week, month, and year and are preferred in the same order. They cannot be mappings to a dimension or dataset.

2. Transformer Specification

The transformer is defined as an actor and can be sync as well as async. This allows you to write any transformer in any language as long as it gets the job done. For the current implementation in cQube we create custom processors for every single transformer in Nifi. Events are transformed in bulk every few mins (A config needs to be exposed to for defining how long do we wait for bulk transformations).

const transformer = { name: "transformer_name", event_grammar: "es11", //EventGrammar on which the transformer can act. dataset_grammar: "ds23", //Dataset that this transformer which modify. config: { actor((callback, transform, transformBulk) => { // manage side effects here // Examples - logging, pushing to an externa event bus, etc. callback('SOME_EVENT'); // receive from parent transform(async (event: Event) => { // transform event to dataset; Has access to event and dataset grammars for validation }); // for speedups transformBulk(async (event: Event[]) => { // transform events to dataset; Has access to event and dataset grammars for validation }); // disposal return () => { /* do cleanup here */ /* push this to dataset */ }; }), } }

How this event gets processed later is defined by a pipe. A pipe connects then event to a source. An example of a pipe is shown below

{ "event": "es11", "transformer": "tr33", "dataset": "ds23" }

You can also use keep the transformer as null to just bypass everything and not transform data and directly insert it in a dataset.

Defining Dimensions

cQube supports arbitrary dimensions to be stored. There are only two categories of dimensions that are supported:

  1. Time-based dimensions

  2. Dynamic dimensions

For the Attendance Example, the following dimensions are viable dimensions

  1. School (Dynamic)

  2. District (Dynamic)

  3. Time (Time Based) - week, month, year, last 7 days, last 15 days, last 30 days.

School as a dimension

Below is the specifications on how they should be defined in cQube.

  1. Define the grammar for how the dimension data needs to be stored. Given that Schools and Districts are very similar, they could be combined as well. For the school dimension, sample data looks like the following -

{ "data": { "school_id": 901, "name": "A green door", "type": "GSSS", "enrollement_count": 345, "district": "District", "block": "Block", "cluster": "Cluster", "village": "Village" } }

The actual schema also includes the metadata that is required by cQube to store this dimension in the database.

  • The indexes field is used to create indexes on the database. This creates a simple BTree index for the list of columns shared.

  • The primary_id field is used to create a primary key on the database.

The corresponding JSON Schema would need to be entered to accept the dimension as a valid input

{ "$schema": "<https://json-schema.org/draft/2019-09/schema",> "$id": "<http://example.com/example.json",> "type": "object", "default": {}, "title": "Root Schema", "required": [ "name", "type", "storage", "data" ], "properties": { "name": { "type": "string", "default": "", "title": "The name Schema", "examples": [ "Schools" ] }, "type": { "type": "string", "default": "", "title": "The type Schema", "examples": [ "dynamic" ] }, "storage": { "type": "object", "default": {}, "title": "The storage Schema", "required": [ "indexes", "primaryId", "retention", "bucket_size" ], "properties": { "indexes": { "type": "array", "default": [], "title": "The indexes Schema", "items": { "type": "string", "title": "A Schema", "examples": [ "name", "type" ] }, "examples": [ ["name", "type" ] ] }, "primaryId": { "type": "string", "default": "", "title": "The primaryId Schema", "examples": [ "school_id" ] }, "retention": { "type": "null", "default": null, "title": "The retention Schema", "examples": [ null ] }, "bucket_size": { "type": "null", "default": null, "title": "The bucket_size Schema", "examples": [ null ] } }, "examples": [{ "indexes": [ "name", "type" ], "primaryId": "school_id", "retention": null, "bucket_size": null }] }, "data": { "type": "object", "default": {}, "title": "The data Schema", "required": [ "school_id", "name", "type", "enrollement_count", "district", "block", "cluster", "village" ], "properties": { "school_id": { "type": "integer", "default": 0, "title": "The school_id Schema", "examples": [ 901 ] }, "name": { "type": "string", "default": "", "title": "The name Schema", "examples": [ "A green door" ] }, "type": { "type": "string", "default": "", "title": "The type Schema", "examples": [ "GSSS" ] }, "enrollement_count": { "type": "integer", "default": 0, "title": "The enrollement_count Schema", "examples": [ 345 ] }, "district": { "type": "string", "default": "", "title": "The district Schema", "examples": [ "District" ] }, "block": { "type": "string", "default": "", "title": "The block Schema", "examples": [ "Block" ] }, "cluster": { "type": "string", "default": "", "title": "The cluster Schema", "examples": [ "Cluster" ] }, "village": { "type": "string", "default": "", "title": "The village Schema", "examples": [ "Village" ] } }, "examples": [{ "school_id": 901, "name": "A green door", "type": "GSSS", "enrollement_count": 345, "district": "District", "block": "Block", "cluster": "Cluster", "village": "Village" }] } }, "examples": [{ "name": "Schools", "type": "dynamic", "storage": { "indexes": [ "name", "type" ], "primaryId": "school_id", "retention": null, "bucket_size": null }, "data": { "school_id": 901, "name": "A green door", "type": "GSSS", "enrollement_count": 345, "district": "District", "block": "Block", "cluster": "Cluster", "village": "Village" } }] }
  1. Once the dimension is defined, it can be added to cQube by using the dimension schema (definition grammar) API.

  2. The next step is to insert data in the dimensions which can be done through a POST request to the dimension API. In this case it would be inserting the schools in the school dimension table.

Time as a dimension

Example of a time dimension

{ "name": "Last 7 Days", "type": "time", "storage": { "retention": "30 days", "bucket_size": "7 days", } }

Here the retention is the time for which the data will be stored in the database and the bucket_size is the time interval for which the data will be aggregated.

cQube has an upper limit to the amount of data that can stored based on the retention policy - 30M records. This is not enforced at the storage layer but the system is not optimized for datasets larger than this. This is kept to keep the architecture simple and not adding an additonal layer of archival storage complexity. This is a soft limit for now and can be increased in the future.

If you have a requirement to increase this limit, please reach out to us. The lowest bucket_size that cQube would support would be a day.

Impact of Dimensions on Datasets

  1. Impact on Datasets

    • cQube Datasets are optimized for query based on dimensions. Indexes are defined as part of the schema.

  2. Impact on Input Data

    • The input data is validated against the dimension data before being persisted.

    • The event schema should always reference a dimension for this to work.

Notes:

  1. An update to the schema is not supported yet. To update please create a new schema and tie it up to the Dataset Model. Please note that this is a slower process and would lead to a downtime of the cQube instance until the migration is completed. Currently, it can only happen through private APIs exposed to a developer.

    1. If this needs to absolutely happen, create a new dimension, insert the dimension data and run the entire event source through the transformers to update the datasets.

  2. A relational database is used to store the data in a flat manner. All fields are flattened and used as column names when creating a dimension table.

  3. The schema provided to insert data is used to validate the data before inserting it into the database. This is to ensure that the data is in the correct format and that the data types are correct. AJV is used to validate the data.

4. Datasets

Dataset insert requires an input that looks like the following.

{ "name": "attendance_school", "data": { "date": "2019-01-01", // This is a dimension "school_id": 901, // This is a dimension "grade": 1, }, "counter": 190 }

Note: id, created_at, updated_at, count, sum, and percentage are all automatically added to the dataset. Mapping a dataset to a dimension happens the DatasetGrammar which is defined ahead of time.

Dataset Grammar

Defines

  1. How the dataset is defined

  2. How it is linked to dimensions

  3. Which fields need to be cached/indexed to be queried faster

{ "indexes": [["date"], ["school_id"]] // For faster queries "title": "attendance_school_grade", //name of the dataset "schema": { // Will be a JSONSchema. Right now just sharing an example here "date": "2019-01-01", // Dimension "grade": 1, // Addtional filterable data => indexed "school_id": 901, // Dimension }, "dimensionMappings": [ { "key": "school_id", "dimension": { "name": "Dimension1", "mapped_to": "Dimension1.school_id" //Optional in case of time. } }, { "key": "date", "dimension": { "name": "Last 7 Days" } } ] }

5. Instruments

cQube implements Opentelemetry Instruments which can be of the following types. A single instrument allows for a lot of functionalities downstream so please ensure you have all the use cases covered before adding a new instrument

  • counter (def) - allows for aggregations as sum, count, percentage, min, max, avg, percentile

  • Future instruments to be implemented - meter, gauge, histogram.

6. Event Management

  1. All events are stored as CSV files in S3. They can be rerun to repopulate Datasets and Dimensions.

  2. Time dimensions-based Datasets are rolled up and bucketed using an OLAP extension in PSQL. This provides the best of both worlds - faster inserts (for data with an older timestamp compared to OLAPs) and almost comparable OLAP rolling aggregations on time dimensions.

Assumptions

  1. Since it’s not a frequent use case, Dimension updates and deletion are not supported. In case this is needed they can just create a new dimension, update the mapping to datasets and refresh the datasets.

  2. Event Sourcing is used throughout the design.

Next Steps

  1. SDKs can be added to push events to Opentelemetry Receivers

  2. The current collection service can be deprecated to use Opentelemetry Receivers.

  3. Transformers can be moved to an Opentelemetry Transformer.

  4. The collector can send data to any destination using official plugins.