1. Ingestion Specification

Ingestion is a simple API that lets you ingest your data as events. It involves two steps.

...

Code Block
languagejson
{
  "instrument_details": {
    "type": "COUNTER",
    "key": "count"
  },
  "name": "attendance_by_school_grade_for_a_day",
  "is_active": true,
  "event_schema": {
    "$schema": "https://json-schema.org/draft/2019-09/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "default": {},
    "title": "attendance_by_school_grade_for_a_day",
    "required": ["date", "grade", "school_id", "count"],
    "properties": {
      "grade": {
        "type": "integer",
        "default": 0,
        "title": "The grade Schema",
        "examples": [1]
      },
      "school_id": {
        "type": "integer",
        "default": 0,
        "title": "The school_id Schema",
        "examples": [901]
      }
    },
    "examples": [
      {
        "date": "2019-01-01",
        "grade": 1,
        "school_id": 901,
        "count": 23
      }
    ]
  }
}

Instruments: Currently only a COUNTER instrument is supported.

2. Publish an event that satisfies the grammar

Code Block
languagejson
{
    "date": "2019-01-01",
    "grade": 1,
    "school_id": 901,
    "count": 23
}
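To make "satisfies the grammar" concrete, here is a hand-rolled TypeScript check of the event above against the grammar's required list and the primitive type of each declared property. It is only a sketch: cQube validates with AJV, which covers full JSON Schema, and the names EventGrammar and satisfiesGrammar are hypothetical.

```typescript
// Minimal subset of JSON Schema read by this sketch (hypothetical type names).
type PrimitiveType = "string" | "integer" | "number" | "boolean";

interface EventGrammar {
  required: string[];
  properties: Record<string, { type: PrimitiveType }>;
}

type EventPayload = Record<string, unknown>;

// Returns a list of problems; an empty list means the event satisfies the grammar.
function satisfiesGrammar(event: EventPayload, grammar: EventGrammar): string[] {
  const problems: string[] = [];

  // Every field listed in `required` must be present.
  for (const field of grammar.required) {
    if (!(field in event)) problems.push(`missing required field "${field}"`);
  }

  // A declared property, when present, must match its primitive type.
  for (const [field, spec] of Object.entries(grammar.properties)) {
    if (!(field in event)) continue;
    const value = event[field];
    let ok: boolean;
    if (spec.type === "integer") ok = Number.isInteger(value);
    else if (spec.type === "number") ok = typeof value === "number";
    else ok = typeof value === spec.type; // "string" or "boolean"
    if (!ok) problems.push(`field "${field}" should be of type ${spec.type}`);
  }
  return problems;
}

// The parts of the attendance grammar and event above that this sketch reads.
const attendanceGrammar: EventGrammar = {
  required: ["date", "grade", "school_id", "count"],
  properties: {
    grade: { type: "integer" },
    school_id: { type: "integer" },
  },
};

const attendanceEvent: EventPayload = { date: "2019-01-01", grade: 1, school_id: 901, count: 23 };
console.log(satisfiesGrammar(attendanceEvent, attendanceGrammar)); // []
```

Note that date and count are required but not declared under properties, so this check (like JSON Schema itself) only verifies that they are present.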


...

If the data is not aggregated at a daily level, an event can also be shared at a week, month, or year level by defining a start_date and end_date instead. Note that the start_date and end_date must fall cleanly on a week, month, year, last 7 days, last 15 days, or last 30 days window.

Code Block
languagejson
{
    "start_date": "2019-01-01",
    "end_date": "2019-02-01",
    "grade": 1,
    "school_id": 901,
    "count": 23
}
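One possible reading of "fall cleanly" is sketched below. It assumes end_date is exclusive (so the example above covers exactly January 2019), that weeks start on Monday as in ISO 8601, and that "last N days" means any window of exactly N days. The function classifyWindow and the window labels are hypothetical; cQube's actual rules are not stated here.

```typescript
// Hypothetical labels for the windows listed in the text.
type TimeWindow = "week" | "month" | "year" | "last_7_days" | "last_15_days" | "last_30_days";

const DAY_MS = 24 * 60 * 60 * 1000;

// Parse "YYYY-MM-DD" as UTC midnight so the local time zone cannot shift the day.
function parseDay(s: string): Date {
  const [y, m, d] = s.split("-").map(Number);
  return new Date(Date.UTC(y, m - 1, d));
}

// Assumes end_date is exclusive, so 2019-01-01 .. 2019-02-01 is exactly January 2019.
function classifyWindow(startDate: string, endDate: string): TimeWindow | null {
  const start = parseDay(startDate);
  const end = parseDay(endDate);
  const days = Math.round((end.getTime() - start.getTime()) / DAY_MS);
  const bothFirstOfMonth = start.getUTCDate() === 1 && end.getUTCDate() === 1;
  const monthsApart =
    (end.getUTCFullYear() - start.getUTCFullYear()) * 12 + (end.getUTCMonth() - start.getUTCMonth());

  if (bothFirstOfMonth && start.getUTCMonth() === 0 && monthsApart === 12) return "year";
  if (bothFirstOfMonth && monthsApart === 1) return "month";
  if (days === 7 && start.getUTCDay() === 1) return "week"; // Monday-to-Monday, as in ISO 8601
  if (days === 7) return "last_7_days";
  if (days === 15) return "last_15_days";
  if (days === 30) return "last_30_days";
  return null; // does not fall cleanly into any supported window
}

console.log(classifyWindow("2019-01-01", "2019-02-01")); // month
```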

If you cannot guarantee this, it is better to use an alternative API and push the event with an explicit week, month, or year field, as below.

Code Block
languagejson
{
    "week": "2019-W21", // or "month": "2019-M11", or "year": "2022"
    "grade": 1,
    "school_id": 901,
    "count": 23
}
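The alternative format names the period directly instead of giving two dates. The sketch below shows how such labels could map to date ranges, assuming week labels look like 2019-W21 (an ISO 8601 week), month labels like 2019-M11, and year labels like 2022, with the end of each range exclusive. periodRange is a hypothetical helper, and it does not check that the week number exists in the given year.

```typescript
interface Period {
  start: Date;
  end: Date; // exclusive
}

const DAY = 24 * 60 * 60 * 1000;

// Convert a period label into a UTC date range (hypothetical helper).
function periodRange(label: string): Period {
  let m: RegExpMatchArray | null;

  if ((m = label.match(/^(\d{4})-W(\d{1,2})$/))) {
    const year = Number(m[1]);
    const week = Number(m[2]);
    // ISO 8601: week 1 is the week containing 4 January, and weeks start on Monday.
    const jan4 = new Date(Date.UTC(year, 0, 4));
    const isoDay = jan4.getUTCDay() || 7; // Sunday (0) becomes 7
    const week1Monday = jan4.getTime() - (isoDay - 1) * DAY;
    const start = new Date(week1Monday + (week - 1) * 7 * DAY);
    return { start, end: new Date(start.getTime() + 7 * DAY) };
  }
  if ((m = label.match(/^(\d{4})-M(\d{1,2})$/))) {
    const year = Number(m[1]);
    const month = Number(m[2]) - 1; // JavaScript months are 0-based
    // Date.UTC rolls month 12 over into January of the next year.
    return { start: new Date(Date.UTC(year, month, 1)), end: new Date(Date.UTC(year, month + 1, 1)) };
  }
  if ((m = label.match(/^(\d{4})$/))) {
    const year = Number(m[1]);
    return { start: new Date(Date.UTC(year, 0, 1)), end: new Date(Date.UTC(year + 1, 0, 1)) };
  }
  throw new Error(`unsupported period label: ${label}`);
}
```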


...

Keywords and preferences

The following keywords in an event are treated as special keywords and do not need to be part of the schema: date, start_date, end_date, week, month, and year. When more than one is present, they are preferred in the order listed. They cannot be mapped to a dimension or dataset.
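A sketch of one reading of the preference order, assuming it means that the first keyword in the list that is present in the event wins. TIME_KEYWORDS and splitTimeFields are hypothetical names, not cQube APIs. The helper also separates the time keywords from the rest of the payload, since only the remaining fields need to be described by the event schema.

```typescript
// The special time keywords, highest preference first, as listed above.
const TIME_KEYWORDS = ["date", "start_date", "end_date", "week", "month", "year"] as const;
type TimeKeyword = (typeof TIME_KEYWORDS)[number];

// Hypothetical helper: reports the preferred time keyword present in an event and
// returns the remaining fields, which are the ones the event schema has to describe.
function splitTimeFields(event: Record<string, unknown>): {
  preferred: TimeKeyword | null;
  payload: Record<string, unknown>;
} {
  const preferred = TIME_KEYWORDS.find((keyword) => keyword in event) ?? null;
  const payload: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(event)) {
    if (!(TIME_KEYWORDS as readonly string[]).includes(key)) payload[key] = value;
  }
  return { preferred, payload };
}

// An event carrying both "week" and "date" resolves to "date".
console.log(splitTimeFields({ week: "2019-W21", date: "2019-01-01", grade: 1 }));
```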

2. Transformer Specification

The transformer is defined as an actor and can be synchronous or asynchronous. This lets you write a transformer in any language as long as it gets the job done. In the current cQube implementation, a custom NiFi processor is created for every transformer. Events are transformed in bulk every few minutes; a config option needs to be exposed for how long to wait between bulk transformations.

Code Block
languagetypescript
const transformer = {
    name: "transformer_name",
    event_grammar: "es11", // EventGrammar on which the transformer can act.
    dataset_grammar: "ds23", // Dataset that this transformer modifies.
    config: {
        // `actor` is assumed to be the actor factory provided by the transformer runtime.
        actor: actor((callback, transform, transformBulk) => {

            // Manage side effects here.
            // Examples: logging, pushing to an external event bus, etc.
            callback('SOME_EVENT');

            // Receive a single event from the parent.
            transform(async (event: Event) => {
              // Transform the event into the dataset; has access to the event and dataset grammars for validation.
            });

            // Bulk variant, for speedups.
            transformBulk(async (events: Event[]) => {
              // Transform the events into the dataset; has access to the event and dataset grammars for validation.
            });

            // Disposal
            return () => {
              /* do cleanup here */
              /* push this to dataset */
            };
        }),
    },
};
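The bulk mode described above can be sketched as a small buffer. This is an illustration under stated assumptions, not cQube's NiFi processor: BulkBuffer, windowMs (standing in for the wait-time config the text says needs to be exposed), and the attendance event shape are all hypothetical, and sumAttendance is just one possible transformBulk that merges events with the same date, grade, and school by summing their counts.

```typescript
// Hypothetical event and dataset row shapes for the attendance example.
interface AttendanceEvent { date: string; grade: number; school_id: number; count: number }
interface DatasetRow { date: string; grade: number; school_id: number; count: number }

type TransformBulk = (events: AttendanceEvent[]) => DatasetRow[];

// Buffers events and hands them to transformBulk once windowMs has elapsed
// since the first buffered event.
class BulkBuffer {
  private pending: AttendanceEvent[] = [];
  private windowStart: number | null = null;
  private readonly windowMs: number;
  private readonly transformBulk: TransformBulk;
  private readonly now: () => number;

  constructor(windowMs: number, transformBulk: TransformBulk, now: () => number = Date.now) {
    this.windowMs = windowMs;
    this.transformBulk = transformBulk;
    this.now = now; // injectable clock, useful for testing
  }

  // Adds an event; returns dataset rows if this push closed the window, else [].
  push(event: AttendanceEvent): DatasetRow[] {
    if (this.windowStart === null) this.windowStart = this.now();
    this.pending.push(event);
    return this.now() - this.windowStart >= this.windowMs ? this.flush() : [];
  }

  // Transforms everything buffered so far and resets the window.
  flush(): DatasetRow[] {
    const batch = this.pending;
    this.pending = [];
    this.windowStart = null;
    return batch.length > 0 ? this.transformBulk(batch) : [];
  }
}

// Example transformBulk: merges events for the same date, grade and school by summing counts.
const sumAttendance: TransformBulk = (events) => {
  const rows = new Map<string, DatasetRow>();
  for (const e of events) {
    const key = `${e.date}|${e.grade}|${e.school_id}`;
    const row = rows.get(key);
    if (row) row.count += e.count;
    else rows.set(key, { ...e }); // copy so the incoming event is not mutated
  }
  return [...rows.values()];
};
```

In this sketch a window only closes when a new event arrives after it has elapsed; a real scheduler would also call flush on a timer so that a quiet stream is not held back indefinitely.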

...

You can also set the transformer to null to bypass everything and not store the data.

...

transform data and directly insert it in a dataset.

Defining Dimensions

cQube can store arbitrary dimensions. Only two categories of dimensions are supported:

...

  1. School (Dynamic)

  2. District (Dynamic)

  3. Time (Time Based) - week, month, year, last 7 days, last 15 days, last 30 days.

School as a dimension

Below are the specifications for how they should be defined in cQube.

...

  1. Once the dimension is defined, it can be added to cQube by using the dimension schema (definition grammar) API.

  2. The next step is to insert data into the dimension, which is done through a POST request to the dimension API. In this case, that means inserting the schools into the school dimension table.

Time as a dimension

Example of a time dimension

...

If you have a requirement to increase this limit, please reach out to us. The lowest bucket_size cQube supports is one day.

Impact of Dimensions on Datasets

  1. Impact on Datasets

    • cQube Datasets are optimized for queries based on dimensions. Indexes are defined as part of the schema.

  2. Impact on Input Data

    • The input data is validated against the dimension data before being persisted.

    • The event schema should always reference a dimension for this to work.
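The validation step above can be sketched as splitting incoming events by whether their dimension key exists in the dimension table. partitionByDimension is a hypothetical helper, and the set of known school IDs stands in for a lookup against the school dimension.

```typescript
// Splits events into those whose dimension value is known and those that
// would be rejected before persisting (hypothetical helper).
function partitionByDimension<T extends Record<string, unknown>>(
  events: T[],
  key: string,
  knownValues: ReadonlySet<unknown>,
): { accepted: T[]; rejected: T[] } {
  const accepted: T[] = [];
  const rejected: T[] = [];
  for (const event of events) {
    (knownValues.has(event[key]) ? accepted : rejected).push(event);
  }
  return { accepted, rejected };
}
```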

Notes:

  1. Schema updates are not supported yet. To update, create a new schema and tie it to the Dataset Model. Note that this is a slow process and leads to downtime of the cQube instance until the migration is complete. Currently, it can only be done through private APIs exposed to developers.

    1. If this absolutely has to happen, create a new dimension, insert the dimension data, and run the entire event source through the transformers to update the datasets.

  2. A relational database is used to store the data in a flat manner. All fields are flattened and used as column names when creating a dimension table.

  3. The schema provided to insert data is used to validate the data before inserting it into the database. This is to ensure that the data is in the correct format and that the data types are correct. AJV is used to validate the data.
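The flattening described in note 2 can be sketched as below. The "_" separator used to build column names from nested keys is an assumption; the document does not say which separator cQube uses. flatten is a hypothetical helper.

```typescript
// Flattens nested objects into one level; nested keys are joined with "_"
// (assumed separator). Arrays and primitives are stored as-is.
function flatten(
  value: Record<string, unknown>,
  prefix = "",
  out: Record<string, unknown> = {},
): Record<string, unknown> {
  for (const [key, child] of Object.entries(value)) {
    const column = prefix ? `${prefix}_${key}` : key;
    if (child !== null && typeof child === "object" && !Array.isArray(child)) {
      flatten(child as Record<string, unknown>, column, out);
    } else {
      out[column] = child;
    }
  }
  return out;
}
```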

4. Datasets

Dataset insert requires an input that looks like the following.

...

Note: id, created_at, updated_at, count, sum, and percentage are all added to the dataset automatically. A dataset is mapped to a dimension through the DatasetGrammar, which is defined ahead of time.

Dataset Grammar

Defines

  1. How the dataset is defined

  2. How it is linked to dimensions

  3. Which fields need to be cached/indexed to be queried faster

Code Block
{
  "indexes": [["date"], ["school_id"]], // For faster queries
  "title": "attendance_school_grade", // Name of the dataset
  "schema": {
    // Will be a JSONSchema. Right now just sharing an example here
    "date": "2019-01-01", // Dimension
    "grade": 1, // Additional filterable data => indexed
    "school_id": 901 // Dimension
  },
  "dimensionMappings": [
    {
      "key": "school_id",
      "dimension": {
        "name": "Dimension1",
        "mapped_to": "Dimension1.school_id" //Optional in case of time.
      }
    },
    {
      "key": "date",
      "dimension": {
        "name": "Last 7 Days"
      }
    }
  ]
}
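To show what the indexes entry implies for the relational store, here is a sketch that turns it into PostgreSQL CREATE INDEX statements. The table name (taken from title) and the index naming scheme (title, then the columns, then _idx) are assumptions; the document does not say how cQube names them. indexStatements is a hypothetical helper.

```typescript
// Subset of the Dataset Grammar read by this sketch.
interface DatasetGrammar {
  title: string;
  indexes: string[][];
}

// Emits one CREATE INDEX statement per entry in `indexes` (hypothetical naming).
function indexStatements(grammar: DatasetGrammar): string[] {
  // Double-quote identifiers, escaping any embedded double quotes.
  const quote = (id: string) => `"${id.replace(/"/g, '""')}"`;
  return grammar.indexes.map((columns) => {
    const name = quote(`${grammar.title}_${columns.join("_")}_idx`);
    const cols = columns.map(quote).join(", ");
    return `CREATE INDEX IF NOT EXISTS ${name} ON ${quote(grammar.title)} (${cols});`;
  });
}
```

Each inner array becomes one index, so [["date", "school_id"]] would produce a single composite index rather than two separate ones.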

5. Instruments

cQube implements OpenTelemetry instruments, which can be of the following types. A single instrument enables a lot of downstream functionality, so make sure all your use cases are covered before adding a new instrument.

  • counter (def) - allows aggregations such as sum, count, percentage, min, max, avg, and percentile

  • Future instruments to be implemented - meter, gauge, histogram.
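As an illustration of the aggregations a counter enables, here is a sketch over the count values of events that share the same dimension values. The percentile method (nearest rank) and the meaning of percentage (a group's share of the total across groups) are assumptions, and aggregate and percentages are hypothetical helpers.

```typescript
// Aggregations over the counter values of one group of events.
function aggregate(values: number[]) {
  if (values.length === 0) throw new Error("no values to aggregate");
  const sorted = [...values].sort((a, b) => a - b);
  const sum = sorted.reduce((acc, v) => acc + v, 0);
  // Nearest-rank percentile: the smallest value with at least p% of values at or below it.
  const percentile = (p: number) => sorted[Math.max(0, Math.ceil((p / 100) * sorted.length) - 1)];
  return {
    sum,
    count: sorted.length,
    min: sorted[0],
    max: sorted[sorted.length - 1],
    avg: sum / sorted.length,
    p50: percentile(50),
    p90: percentile(90),
  };
}

// Percentage: each group's sum as a share of the total across all groups.
function percentages(sums: Record<string, number>): Record<string, number> {
  const total = Object.values(sums).reduce((acc, v) => acc + v, 0);
  return Object.fromEntries(
    Object.entries(sums).map(([group, v]) => [group, total === 0 ? 0 : (100 * v) / total]),
  );
}
```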

6. Event Management

  1. All events are stored as CSV files in S3. They can be rerun to repopulate Datasets and Dimensions.

  2. Datasets based on time dimensions are rolled up and bucketed using an OLAP extension in PostgreSQL. This gives the best of both worlds: faster inserts than an OLAP store (for data with an older timestamp) and rolling aggregations on time dimensions that are almost comparable to an OLAP's.
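Because the stored events are the source of truth, a dataset can always be rebuilt by replaying them. The sketch below replays events into a month-by-school roll-up in memory; it only illustrates the idea, since the text says the actual bucketing happens in an OLAP extension inside the database. StoredEvent and replayMonthly are hypothetical.

```typescript
interface StoredEvent { date: string; school_id: number; count: number }

// Rebuilds a month-level dataset purely from the event log, so replaying the
// same events always yields the same result.
function replayMonthly(events: Iterable<StoredEvent>): Map<string, number> {
  const rollup = new Map<string, number>();
  for (const e of events) {
    const month = e.date.slice(0, 7); // "2019-01-15" -> "2019-01"
    const key = `${month}|${e.school_id}`;
    rollup.set(key, (rollup.get(key) ?? 0) + e.count);
  }
  return rollup;
}
```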

Assumptions

  1. Since it is not a frequent use case, dimension updates and deletions are not supported. If needed, you can create a new dimension, update the dataset mappings, and refresh the datasets.

  2. Event Sourcing is used throughout the design.

Next Steps

  1. SDKs can be added to push events to OpenTelemetry receivers.

  2. The current collection service can be deprecated in favor of OpenTelemetry receivers.

  3. Transformers can be moved to an OpenTelemetry transformer.

  4. The collector can send data to any destination using official plugins.