1. Ingestion Specification
Ingestion is a simple API that allows you to ingest your data as events. There are two steps for this.
...
```json
{
  "instrument_details": {
    "type": "COUNTER",
    "key": "count"
  },
  "name": "attendance_by_school_grade_for_a_day",
  "is_active": true,
  "event_schema": {
    "$schema": "https://json-schema.org/draft/2019-09/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "default": {},
    "title": "attendance_by_school_grade_for_a_day",
    "required": ["date", "grade", "school_id", "count"],
    "properties": {
      "grade": {
        "type": "integer",
        "default": 0,
        "title": "The grade Schema",
        "examples": [1]
      },
      "school_id": {
        "type": "integer",
        "default": 0,
        "title": "The school_id Schema",
        "examples": [901]
      }
    },
    "examples": [
      {
        "date": "2019-01-01",
        "grade": 1,
        "school_id": 901,
        "count": 23
      }
    ]
  }
}
```
Instruments: Currently, only a COUNTER instrument is supported.
2. Publish an event that satisfies the grammar
```json
{ "date": "2019-01-01", "grade": 1, "school_id": 901, "count": 23 }
```
If the data is not aggregated at a daily level, an event can also be shared at a week, month, or year level by defining a `start_date` and an `end_date`. Note that the `start_date` and `end_date` should cleanly fall in a week, month, year, last 7 days, last 15 days, or last 30 days.
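The rule that `start_date` and `end_date` must cleanly fall into a bucket can be sketched as a small classifier. This is a minimal TypeScript sketch that rests on three assumptions: dates are `YYYY-MM-DD` in UTC, `end_date` is exclusive (which makes the `2019-01-01` to `2019-02-01` example a single month), and weeks start on Monday. The function name `classifyRange` is hypothetical.

```typescript
// Hypothetical helper: checks whether a start_date/end_date pair cleanly falls
// into one supported bucket. Assumes "YYYY-MM-DD" UTC dates, an exclusive
// end_date, and Monday-based weeks; none of this is confirmed by cQube.
type Bucket = "week" | "month" | "year" | "last 7 days" | "last 15 days" | "last 30 days";

const DAY_MS = 24 * 60 * 60 * 1000;

function parseDate(s: string): Date {
  const [y, m, d] = s.split("-").map(Number);
  return new Date(Date.UTC(y, m - 1, d));
}

export function classifyRange(startDate: string, endDate: string): Bucket | null {
  const start = parseDate(startDate);
  const end = parseDate(endDate);
  const days = Math.round((end.getTime() - start.getTime()) / DAY_MS);
  if (days <= 0) return null;

  // Calendar year: Jan 1 up to Jan 1 of the following year.
  if (
    start.getUTCMonth() === 0 && start.getUTCDate() === 1 &&
    end.getUTCMonth() === 0 && end.getUTCDate() === 1 &&
    end.getUTCFullYear() === start.getUTCFullYear() + 1
  ) {
    return "year";
  }

  // Calendar month: the 1st of a month up to the 1st of the next month.
  // Date.UTC rolls month 12 over into January of the next year.
  const nextMonth = Date.UTC(start.getUTCFullYear(), start.getUTCMonth() + 1, 1);
  if (start.getUTCDate() === 1 && end.getTime() === nextMonth) return "month";

  // Calendar week: starts on a Monday (getUTCDay() === 1) and spans 7 days.
  if (days === 7 && start.getUTCDay() === 1) return "week";

  // Rolling windows of fixed length.
  if (days === 7) return "last 7 days";
  if (days === 15) return "last 15 days";
  if (days === 30) return "last 30 days";
  return null;
}
```

A calendar week is checked before the rolling 7-day window, so a Monday-to-Monday range is reported as `week`.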
```json
{ "start_date": "2019-01-01", "end_date": "2019-02-01", "grade": 1, "school_id": 901, "count": 23 }
```
...
If you cannot guarantee this, it is better to use an alternative API to push an event, as shown below.
```jsonc
{
  "week": "2019-W21", // or "month": "2019-M11", or "year": "2022"
  "grade": 1,
  "school_id": 901,
  "count": 23
}
```
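The keys used by the alternative API can be derived from a calendar date. This sketch is hypothetical. It assumes ISO 8601 week numbering and two-digit zero padding for weeks and months. The examples above (`2019-W21`, `2019-M11`, `2022`) neither confirm nor rule out that padding.

```typescript
// Hypothetical helpers that derive week/month/year keys from a "YYYY-MM-DD" date.
// Assumes ISO 8601 weeks and two-digit zero padding ("2019-W05", "2019-M03").
const DAY_MS = 24 * 60 * 60 * 1000;

function parseDate(s: string): Date {
  const [y, m, d] = s.split("-").map(Number);
  return new Date(Date.UTC(y, m - 1, d));
}

const pad2 = (n: number): string => String(n).padStart(2, "0");

// ISO week: weeks start on Monday; week 1 contains the year's first Thursday.
function isoWeek(date: Date): { year: number; week: number } {
  const d = new Date(date.getTime());
  const dayNum = d.getUTCDay() || 7; // Sunday (0) becomes 7
  d.setUTCDate(d.getUTCDate() + 4 - dayNum); // move to the Thursday of this ISO week
  const yearStart = Date.UTC(d.getUTCFullYear(), 0, 1);
  const week = Math.ceil(((d.getTime() - yearStart) / DAY_MS + 1) / 7);
  return { year: d.getUTCFullYear(), week };
}

export function weekKey(date: string): string {
  const { year, week } = isoWeek(parseDate(date));
  return `${year}-W${pad2(week)}`;
}

export function monthKey(date: string): string {
  const d = parseDate(date);
  return `${d.getUTCFullYear()}-M${pad2(d.getUTCMonth() + 1)}`;
}

export function yearKey(date: string): string {
  return String(parseDate(date).getUTCFullYear());
}
```

The ISO week year can differ from the calendar year near January 1, so `weekKey` takes the year from the shifted Thursday rather than from the input date.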
...
Keywords and preferences
The following keywords in an event are treated as special keywords and don't need to be part of the schema: `date`, `start_date`, `end_date`, `week`, `month`, and `year`. They are preferred in the order listed, and they cannot be mapped to a dimension or dataset.
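Here is a minimal sketch of how a consumer might apply these keywords. It assumes "preferred in the order listed" means that when several are present, the one listed first wins. The helper names are hypothetical.

```typescript
// Hypothetical helpers for the special time keywords, in preference order.
const TIME_KEYWORDS = ["date", "start_date", "end_date", "week", "month", "year"] as const;
type TimeKeyword = (typeof TIME_KEYWORDS)[number];

// Returns the most preferred time keyword present in the event, or null.
export function resolveTimeKeyword(event: Record<string, unknown>): TimeKeyword | null {
  for (const key of TIME_KEYWORDS) {
    if (event[key] !== undefined) return key;
  }
  return null;
}

// Separates the special time fields from the remaining fields; only the
// remaining fields need to appear in the event schema.
export function splitSpecialKeywords(event: Record<string, unknown>) {
  const time: Record<string, unknown> = {};
  const rest: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(event)) {
    if ((TIME_KEYWORDS as readonly string[]).includes(key)) time[key] = value;
    else rest[key] = value;
  }
  return { time, rest };
}
```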
2. Transformer Specification
The transformer is defined as an actor and can be synchronous or asynchronous. This allows you to write a transformer in any language, as long as it gets the job done. In the current cQube implementation, we create a custom NiFi processor for every transformer. Events are transformed in bulk every few minutes (a config needs to be exposed for defining how long to wait between bulk transformations).
```typescript
// Placeholder event type; exporting it makes this file a module, so it does not clash with the global Event.
export type Event = Record<string, unknown>;

const transformer = {
  name: "transformer_name",
  event_grammar: "es11", // EventGrammar on which the transformer can act.
  dataset_grammar: "ds23", // DatasetGrammar of the dataset this transformer modifies.
  config: {
    actor: (
      callback: (signal: string) => void,
      transform: (handler: (event: Event) => Promise<void>) => void,
      transformBulk: (handler: (events: Event[]) => Promise<void>) => void,
    ) => {
      // Manage side effects here.
      // Examples: logging, pushing to an external event bus, etc.
      callback("SOME_EVENT");

      // Receive single events from the parent.
      transform(async (event: Event) => {
        // Transform the event into the dataset; has access to event and dataset grammars for validation.
      });

      // For speedups, transform events in bulk.
      transformBulk(async (events: Event[]) => {
        // Transform the events into the dataset; has access to event and dataset grammars for validation.
      });

      // Disposal
      return () => {
        /* Do cleanup here. */
        /* Push this to the dataset. */
      };
    },
  },
};
```
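To illustrate the actor contract, the following hypothetical harness shows one way a host (such as a NiFi processor) could work with an actor. It registers the actor's handlers, feeds it a batch of events, and always calls its disposal function. The `Actor` type and `runActor` are assumptions for illustration. The harness also assumes handlers resolve to dataset rows, which the spec above does not state.

```typescript
// Hypothetical harness for an actor-style transformer; names and types are illustrative only.
export type Event = Record<string, unknown>;
export type Row = Record<string, unknown>;
type Handler<T> = (input: T) => Promise<Row[]>;

export type Actor = (
  callback: (signal: string) => void,
  transform: (handler: Handler<Event>) => void,
  transformBulk: (handler: Handler<Event[]>) => void,
) => () => void;

export async function runActor(actor: Actor, events: Event[]) {
  const signals: string[] = [];
  const handlers: { single?: Handler<Event>; bulk?: Handler<Event[]> } = {};

  // The actor registers its handlers and may emit signals for side effects.
  const dispose = actor(
    (signal) => { signals.push(signal); },
    (handler) => { handlers.single = handler; },
    (handler) => { handlers.bulk = handler; },
  );

  try {
    const rows: Row[] = [];
    const { bulk, single } = handlers;
    if (bulk) {
      // Prefer the bulk handler when one is registered (the "speedups" path).
      rows.push(...(await bulk(events)));
    } else if (single) {
      for (const event of events) rows.push(...(await single(event)));
    }
    return { rows, signals };
  } finally {
    dispose(); // Always run the actor's cleanup function.
  }
}
```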
How this event gets processed later is defined by a pipe.
A pipe connects an event to a dataset, optionally through a transformer. An example of a pipe is shown below:
```json
{ "event": "es11", "transformer": "tr33", "dataset": "ds23" }
```
You can also keep the transformer as `null` to skip transformation and insert the data directly into the dataset.
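The routing implied by a pipe can be sketched as follows. `routeEvent`, the in-memory maps, and the assumption that a transformer returns dataset rows are all hypothetical. The sketch only shows the two paths: through the named transformer, or straight into the dataset when `transformer` is `null`.

```typescript
// Hypothetical pipe routing; transformers and datasets are in-memory stand-ins.
type Event = Record<string, unknown>;
type Row = Record<string, unknown>;

export interface Pipe {
  event: string; // event grammar id, e.g. "es11"
  transformer: string | null; // transformer id, e.g. "tr33", or null to bypass
  dataset: string; // dataset grammar id, e.g. "ds23"
}

export type Transformer = (event: Event) => Row[];

export function routeEvent(
  pipe: Pipe,
  event: Event,
  transformers: Map<string, Transformer>,
  datasets: Map<string, Row[]>,
): void {
  let rows: Row[];
  if (pipe.transformer === null) {
    rows = [event]; // No transformer: the event data goes straight into the dataset.
  } else {
    const transform = transformers.get(pipe.transformer);
    if (!transform) throw new Error(`Unknown transformer: ${pipe.transformer}`);
    rows = transform(event);
  }
  const target = datasets.get(pipe.dataset) ?? [];
  target.push(...rows);
  datasets.set(pipe.dataset, target);
}
```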
Defining Dimensions
cQube supports storing arbitrary dimensions. Only two categories of dimensions are supported:
Time-based dimensions
Dynamic dimensions
For the attendance example, the following dimensions are viable:
School (Dynamic)
District (Dynamic)
Time (Time-based): week, month, year, last 7 days, last 15 days, last 30 days
School as a dimension
Below is the specification for how they should be defined in cQube.
Define the grammar for how the dimension data needs to be stored. Given that schools and districts are very similar, they could be combined as well. For the school dimension, sample data looks like the following:
...
The `indexes` field is used to create indexes in the database; it creates a simple B-tree index for the list of columns provided. The `primary_id` field is used to create a primary key in the database.
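The following sketch shows how `indexes` and `primary_id` could translate into PostgreSQL DDL. The definition shape and the `toDDL` name are hypothetical. So are the sample columns (`school_name`, `district_id`), because the actual sample is not reproduced here. Identifiers are quoted but not escaped, so the sketch assumes trusted input.

```typescript
// Hypothetical dimension definition; the shape mirrors the fields described above, not cQube's grammar.
export interface DimensionDefinition {
  name: string;
  columns: Record<string, "integer" | "string" | "number" | "boolean">;
  primary_id: string;
  indexes: string[][]; // each inner list becomes one B-tree index
}

const SQL_TYPES = {
  integer: "integer",
  string: "text",
  number: "double precision",
  boolean: "boolean",
} as const;

// Produces one CREATE TABLE statement plus one CREATE INDEX per index entry.
// Identifiers are quoted but not escaped: assumes trusted names.
export function toDDL(def: DimensionDefinition): string[] {
  const cols = Object.entries(def.columns).map(([col, type]) => `  "${col}" ${SQL_TYPES[type]}`);
  cols.push(`  PRIMARY KEY ("${def.primary_id}")`);
  const statements = [`CREATE TABLE "${def.name}" (\n${cols.join(",\n")}\n);`];
  for (const columns of def.indexes) {
    const indexName = `${def.name}_${columns.join("_")}_idx`;
    const list = columns.map((c) => `"${c}"`).join(", ");
    statements.push(`CREATE INDEX "${indexName}" ON "${def.name}" USING btree (${list});`);
  }
  return statements;
}
```

B-tree is PostgreSQL's default index type, so `USING btree` only makes the choice explicit.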
...
Once the dimension is defined, it can be added to cQube by using the dimension schema (definition grammar) API.
The next step is to insert data into the dimension, which can be done through a POST request to the dimension API. In this case, that means inserting the schools into the school dimension table.
Time as a dimension
Example of a time dimension
...
If you have a requirement to increase this limit, please reach out to us. The smallest `bucket_size` that cQube supports is one day.
Impact of Dimensions on Datasets
Impact on Datasets
cQube Datasets are optimized for queries based on dimensions. Indexes are defined as part of the schema.
Impact on Input Data
The input data is validated against the dimension data before being persisted.
The event schema should always reference a dimension for this to work.
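The dimension check can be sketched as a lookup of each dimension-referencing field against the stored dimension rows. The `DimensionReference` shape and the function name are hypothetical. The linear scan stands in for what a database would do with an index or a foreign key.

```typescript
// Hypothetical check that dimension-referencing fields point at existing dimension rows.
export type Row = Record<string, unknown>;

export interface DimensionReference {
  field: string; // field in the event, e.g. "school_id"
  dimension: string; // dimension name, e.g. "school"
  column: string; // key column in the dimension table, e.g. "school_id"
}

// Returns one error message per reference that has no matching dimension row.
export function findMissingDimensionValues(
  event: Row,
  refs: DimensionReference[],
  dimensions: Map<string, Row[]>,
): string[] {
  const errors: string[] = [];
  for (const ref of refs) {
    const rows = dimensions.get(ref.dimension) ?? [];
    const value = event[ref.field];
    // Linear scan for clarity; a database would use an index or foreign key instead.
    if (!rows.some((row) => row[ref.column] === value)) {
      errors.push(`${ref.field}=${String(value)} not found in dimension ${ref.dimension}`);
    }
  }
  return errors;
}
```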
Notes:
Updating a schema is not supported yet. To update, create a new schema and tie it to the Dataset Model. Note that this is a slower process and leads to downtime of the cQube instance until the migration is complete. Currently, it can only be done through private APIs exposed to developers.
If this absolutely needs to happen, create a new dimension, insert the dimension data, and run the entire event source through the transformers to update the datasets.
A relational database is used to store the data in a flat manner. All fields are flattened and used as column names when creating a dimension table.
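Flattening can be sketched as a recursive walk that joins nested keys into column names. The `_` separator is an assumption, since the spec only says fields are flattened and not how the names are joined. Arrays are kept as leaf values.

```typescript
// Hypothetical flattening: nested keys become column names joined with "_" (assumed separator).
export function flatten(obj: Record<string, unknown>, prefix = ""): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(obj)) {
    const column = prefix ? `${prefix}_${key}` : key;
    if (value !== null && typeof value === "object" && !Array.isArray(value)) {
      // Recurse into nested objects, carrying the column prefix.
      Object.assign(out, flatten(value as Record<string, unknown>, column));
    } else {
      out[column] = value; // primitives, null, and arrays are stored as-is
    }
  }
  return out;
}
```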
The schema provided to insert data is used to validate the data before inserting it into the database. This is to ensure that the data is in the correct format and that the data types are correct. AJV is used to validate the data.
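The sketch below is not AJV. It is a deliberately tiny stand-in that checks only the `required` and primitive `type` keywords, to show the kind of errors validation reports. cQube itself uses AJV, which supports the full JSON Schema vocabulary.

```typescript
// A tiny stand-in for AJV, checking only "required" and primitive "type" keywords.
interface PropSchema {
  type: "string" | "integer" | "number" | "boolean";
}

interface MiniSchema {
  required?: string[];
  properties?: Record<string, PropSchema>;
}

export function validate(schema: MiniSchema, data: Record<string, unknown>): string[] {
  const errors: string[] = [];
  for (const key of schema.required ?? []) {
    if (!(key in data)) errors.push(`missing required field "${key}"`);
  }
  const props: Record<string, PropSchema> = schema.properties ?? {};
  for (const [key, { type }] of Object.entries(props)) {
    if (!(key in data)) continue; // absence is handled by "required"
    const value = data[key];
    const ok =
      type === "integer" ? typeof value === "number" && Number.isInteger(value) :
      type === "number" ? typeof value === "number" :
      typeof value === type;
    if (!ok) errors.push(`field "${key}" should be ${type}`);
  }
  return errors;
}
```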
4.
...
...
Datasets
A dataset insert requires an input that looks like the following:
```jsonc
{
  "name": "attendance_school",
  "data": {
    "date": "2019-01-01", // This is a dimension
    "school_id": 901, // This is a dimension
    "grade": 1
  },
  "counter": 190
}
```
Note: `id`, `created_at`, `updated_at`, `count`, `sum`, and `percentage` are all added to the dataset automatically. A dataset is mapped to dimensions through the DatasetGrammar, which is defined ahead of time.
Dataset Grammar
The dataset grammar defines:
How the dataset is defined
How it is linked to dimensions
Which fields need to be cached/indexed to be queried faster
```jsonc
{
  "indexes": [["date"], ["school_id"]], // For faster queries
  "title": "attendance_school_grade", // Name of the dataset
  "schema": {
    // Will be a JSON Schema. Right now just sharing an example here
    "date": "2019-01-01", // Dimension
    "grade": 1, // Additional filterable data => indexed
    "school_id": 901 // Dimension
  },
  "dimensionMappings": [
    {
      "key": "school_id",
      "dimension": {
        "name": "Dimension1",
        "mapped_to": "Dimension1.school_id" // Optional in case of time
      }
    },
    {
      "key": "date",
      "dimension": {
        "name": "Last 7 Days"
      }
    }
  ]
}
```
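The grammar above can be given a TypeScript shape, plus a consistency check that every index column and mapping key exists in the schema. The interface names and `checkGrammar` are hypothetical. The check treats the schema's top-level keys as field names, which matches the example form rather than a full JSON Schema.

```typescript
// Hypothetical TypeScript shape of the DatasetGrammar example above.
export interface DimensionMapping {
  key: string;
  dimension: { name: string; mapped_to?: string }; // mapped_to is optional for time dimensions
}

export interface DatasetGrammar {
  title: string;
  indexes: string[][];
  schema: Record<string, unknown>; // example form; a real grammar would hold a JSON Schema
  dimensionMappings: DimensionMapping[];
}

// Reports index columns and mapping keys that are not fields of the schema.
export function checkGrammar(g: DatasetGrammar): string[] {
  const fields = new Set(Object.keys(g.schema));
  const errors: string[] = [];
  for (const index of g.indexes) {
    for (const col of index) {
      if (!fields.has(col)) errors.push(`index column "${col}" is not in the schema`);
    }
  }
  for (const m of g.dimensionMappings) {
    if (!fields.has(m.key)) errors.push(`mapping key "${m.key}" is not in the schema`);
  }
  return errors;
}
```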
5. Instruments
cQube implements OpenTelemetry instruments, which can be of the following types. A single instrument enables a lot of downstream functionality, so please ensure all your use cases are covered before adding a new instrument.
counter (default): allows for aggregations such as sum, count, percentage, min, max, avg, and percentile
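The counter aggregations can be sketched over raw counter values. Two readings here are assumptions: `percentage` is taken as each value's share of the total, and `percentile` uses the nearest-rank method. The function name `aggregate` is hypothetical.

```typescript
// Hypothetical counter aggregations over raw values.
// Assumed readings: percentage = share of total; percentile = nearest-rank method.
export function aggregate(values: number[]) {
  if (values.length === 0) throw new Error("no values to aggregate");
  const sum = values.reduce((a, b) => a + b, 0);
  const sorted = [...values].sort((a, b) => a - b);
  // Nearest rank: the smallest value with at least p% of values at or below it (p=0 gives the min).
  const percentile = (p: number): number =>
    sorted[Math.max(0, Math.ceil((p / 100) * sorted.length) - 1)];
  return {
    sum,
    count: values.length,
    min: sorted[0],
    max: sorted[sorted.length - 1],
    avg: sum / values.length,
    percentage: values.map((v) => (v * 100) / sum),
    percentile,
  };
}
```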
Future instruments to be implemented: meter, gauge, histogram.
6. Event Management
All events are stored as CSV files in S3. They can be rerun to repopulate Datasets and Dimensions. Datasets based on time dimensions are rolled up and bucketed using an OLAP extension in PostgreSQL. This provides the best of both worlds: faster inserts (for data with an older timestamp, compared to OLAP databases) and nearly comparable OLAP rolling aggregations on time dimensions.
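A monthly rollup of daily counter rows can be sketched in memory to show what the bucketing computes. In cQube, the OLAP extension does this inside PostgreSQL. The `DailyRow` shape, the `rollupByMonth` name, and the `YYYY-MM|school_id` bucket key are illustrative assumptions.

```typescript
// Hypothetical in-memory rollup: sums daily counter rows per month and school.
// cQube does this bucketing with an OLAP extension in PostgreSQL; this only shows the arithmetic.
interface DailyRow {
  date: string; // "YYYY-MM-DD"
  school_id: number;
  count: number;
}

export function rollupByMonth(rows: DailyRow[]): Map<string, number> {
  const buckets = new Map<string, number>();
  for (const row of rows) {
    // Bucket key: month ("YYYY-MM") plus the dimension value.
    const key = `${row.date.slice(0, 7)}|${row.school_id}`;
    buckets.set(key, (buckets.get(key) ?? 0) + row.count);
  }
  return buckets;
}
```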
Assumptions
Since it’s not a frequent use case, dimension updates and deletions are not supported. If this is needed, create a new dimension, update the mapping to datasets, and refresh the datasets.
Event Sourcing is used throughout the design.
Next Steps
SDKs can be added to push events to OpenTelemetry receivers.
The current collection service can be deprecated in favor of OpenTelemetry receivers.
Transformers can be moved to an OpenTelemetry transformer.
The collector can send data to any destination using official plugins.