1. Ingestion Specification

Ingestion is a simple API that allows you to ingest your data as events. There are two steps.

  1. Define an Event Grammar

{
  "instrument_details": {
    "type": "COUNTER",
    "key": "count"
  },
  "name": "attendance_by_school_grade_for_a_day",
  "is_active": true,
  "event_schema": {
    "$schema": "https://json-schema.org/draft/2019-09/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "default": {},
    "title": "attendance_by_school_grade_for_a_day",
    "required": ["date", "grade", "school_id", "count"],
    "properties": {
      "date": {
        "type": "string",
        "default": "",
        "title": "The date Schema",
        "examples": ["2019-01-01"]
      },
      "grade": {
        "type": "integer",
        "default": 0,
        "title": "The grade Schema",
        "examples": [1]
      },
      "school_id": {
        "type": "integer",
        "default": 0,
        "title": "The school_id Schema",
        "examples": [901]
      },
      "count": {
        "type": "integer",
        "default": 0,
        "title": "The count Schema",
        "examples": [23]
      }
    },
    "examples": [
      {
        "date": "2019-01-01",
        "grade": 1,
        "school_id": 901,
        "count": 23
      }
    ]
  }
}
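
To make the role of the grammar concrete, here is a minimal sketch of checking an event against it before accepting it. This is not the ingestion API itself: the names EventSchema, attendanceSchema and validateEvent are hypothetical, only the "required" and "type" keywords are checked, and the integer type for count is an assumption taken from the example event. A real deployment would more likely rely on a full JSON Schema validator.

```typescript
// Hypothetical types and names; only the schema fields this sketch reads are kept.
type FieldType = "string" | "integer";

interface EventSchema {
  required: string[];
  properties: Record<string, { type: FieldType }>;
}

const attendanceSchema: EventSchema = {
  required: ["date", "grade", "school_id", "count"],
  properties: {
    date: { type: "string" },
    grade: { type: "integer" },
    school_id: { type: "integer" },
    count: { type: "integer" }, // assumed from the example event
  },
};

function isInteger(value: unknown): boolean {
  return typeof value === "number" && isFinite(value) && Math.floor(value) === value;
}

// Returns a list of human-readable problems; an empty list means the event passes.
function validateEvent(schema: EventSchema, event: Record<string, unknown>): string[] {
  const errors: string[] = [];
  for (const key of schema.required) {
    if (!(key in event)) errors.push("missing required field: " + key);
  }
  for (const key of Object.keys(schema.properties)) {
    const spec = schema.properties[key];
    if (!spec || !(key in event)) continue;
    const value = event[key];
    const ok = spec.type === "string" ? typeof value === "string" : isInteger(value);
    if (!ok) errors.push("field " + key + " should be of type " + spec.type);
  }
  return errors;
}
```

Returning a list of problems rather than throwing on the first one lets a publisher see every issue with an event in a single round trip.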

Instruments: Currently only a Counter instrument is supported.

  2. Publish an event that satisfies the grammar

{
    "date": "2019-01-01",
    "grade": 1,
    "school_id": 901,
    "count": 23
}

If the data is not aggregated at a daily level, an event can also be shared at a weekly, monthly, or yearly level by defining a startDate and an endDate.

{
    "startDate": "2019-01-01",
    "endDate": "2019-02-01",
    "grade": 1,
    "school_id": 901,
    "count": 23
}
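
One way a consumer could handle both event shapes uniformly is to normalise them into a single period, as sketched below. The type names and the toPeriod helper are hypothetical, and the sketch does not decide whether endDate is inclusive, since the specification does not say.

```typescript
// Hypothetical types; field names follow the example events above.
interface BaseEvent { grade: number; school_id: number; count: number }
type DailyEvent = BaseEvent & { date: string };
type RangeEvent = BaseEvent & { startDate: string; endDate: string };
type AttendanceEvent = DailyEvent | RangeEvent;

interface Period { start: string; end: string }

// A daily event becomes a one-day period; a range event is passed through as-is.
function toPeriod(event: AttendanceEvent): Period {
  if ("date" in event) {
    return { start: event.date, end: event.date };
  }
  return { start: event.startDate, end: event.endDate };
}

const daily = toPeriod({ date: "2019-01-01", grade: 1, school_id: 901, count: 23 });
const monthly = toPeriod({
  startDate: "2019-01-01",
  endDate: "2019-02-01",
  grade: 1,
  school_id: 901,
  count: 23,
});
```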

2. Transformer Specification

The transformer is defined as an actor and can be synchronous or asynchronous. This allows you to write a transformer in any language, as long as it gets the job done. In the current cQube implementation, we create a custom processor in NiFi for every transformer. Events are transformed in bulk every few minutes (a config option needs to be exposed to define how long to wait between bulk transformations).
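
The bulk behaviour described above can be sketched as a small buffer that collects events and hands them over as one batch once a configurable wait has elapsed. This is an illustration only, not how the NiFi processors work: BulkBuffer, windowMs and onFlush are hypothetical names, and the current time is passed in by the caller so the sketch stays independent of any timer API.

```typescript
// Hypothetical buffer: collects events and flushes them as one batch per window.
class BulkBuffer<T> {
  private pending: T[] = [];
  private windowStart: number | null = null;
  private readonly windowMs: number;
  private readonly onFlush: (batch: T[]) => void;

  // windowMs stands in for the "how long do we wait" config mentioned above.
  constructor(windowMs: number, onFlush: (batch: T[]) => void) {
    this.windowMs = windowMs;
    this.onFlush = onFlush;
  }

  // Adds an event; flushes when the window that began with the first event has elapsed.
  add(event: T, nowMs: number): void {
    if (this.windowStart === null) this.windowStart = nowMs;
    this.pending.push(event);
    if (nowMs - this.windowStart >= this.windowMs) this.flush();
  }

  // Hands the collected events to the bulk transformer and starts a fresh window.
  flush(): void {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.windowStart = null;
    this.onFlush(batch);
  }
}
```

Because this sketch only flushes when a new event arrives, a real implementation would also need a periodic trigger so that a quiet stream does not hold events indefinitely.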

const transformer = {
    name: "transformer_name", 
    eventGrammar: "es11", // Event grammar on which the transformer can act.
    datasetGrammar: "ds23", // Dataset that this transformer will modify.
    config: {
        actor: (callback, receive) => {

            // manage side effects here
            // Examples - logging, pushing to an external event bus, etc.
            callback('SOME_EVENT');

            // receive from parent
            transform((event: Event) => {
              // transform event to dataset; has access to event and dataset grammars for validation
            });

            // for speedups
            transform((events: Event[]) => {
              // transform events to dataset; has access to event and dataset grammars for validation
            });

            // disposal
            return () => {
              /* do cleanup here */
              /* push this to dataset */
            };
        },
    }
}

How this event gets processed later is defined by a pipe. A pipe connects the event to a transformer and a dataset. An example of a pipe is shown below.

{
  "event": "es11",
  "transformer": "tr33",
  "dataset": "ds23"
}

You can also set the transformer to null to bypass everything and not store the data.
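
As a rough illustration of how a pipe could be resolved at run time, including the null-transformer bypass, consider the sketch below. Everything in it is an assumption made for the example: the Pipe shape mirrors the JSON above, while runPipe, the transformer registry and the store callback are hypothetical and not part of the specification.

```typescript
// Hypothetical shapes; the Pipe fields mirror the JSON example above.
interface Pipe { event: string; transformer: string | null; dataset: string }
type Row = Record<string, unknown>;
type TransformFn = (events: Row[]) => Row[];

// Runs a batch of events through the pipe and returns the number of rows stored.
function runPipe(
  pipe: Pipe,
  events: Row[],
  transformers: Record<string, TransformFn>,
  store: (dataset: string, rows: Row[]) => void
): number {
  // A null transformer bypasses everything: nothing is transformed or stored.
  if (pipe.transformer === null) return 0;

  const transform = transformers[pipe.transformer];
  if (!transform) throw new Error("unknown transformer: " + pipe.transformer);

  const rows = transform(events);
  store(pipe.dataset, rows);
  return rows.length;
}
```

Looking transformers up by id keeps the pipe definition purely declarative, so the same JSON can be stored and edited without touching transformer code.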

3. Dimensions

4. Instruments