1. Ingestion Specification
Ingestion is a simple API that allows you to ingest your data as events. There are two steps.
1. Define an Event Grammar
{
  "instrument_details": {
    "type": "COUNTER",
    "key": "count"
  },
  "name": "attendance_by_school_grade_for_a_day",
  "is_active": true,
  "event_schema": {
    "$schema": "https://json-schema.org/draft/2019-09/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "default": {},
    "title": "attendance_by_school_grade_for_a_day",
    "required": ["date", "grade", "school_id", "count"],
    "properties": {
      "date": {
        "type": "string",
        "default": "",
        "title": "The date Schema",
        "examples": ["2019-01-01"]
      },
      "grade": {
        "type": "integer",
        "default": 0,
        "title": "The grade Schema",
        "examples": [1]
      },
      "school_id": {
        "type": "integer",
        "default": 0,
        "title": "The school_id Schema",
        "examples": [901]
      }
    },
    "examples": [
      {
        "date": "2019-01-01",
        "grade": 1,
        "school_id": 901,
        "count": 23
      }
    ]
  }
}
Instruments: Currently only a Counter instrument is supported.
2. Publish an event that satisfies the grammar
{ "date": "2019-01-01", "grade": 1, "school_id": 901, "count": 23 }
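As a rough illustration of what "satisfies the grammar" could mean, the sketch below checks an event against a simplified, hypothetical in-memory form of the grammar above. The `EventGrammar` interface and `validateEvent` function are assumptions made for this example and are not part of the cQube API; a real deployment would more likely run a full JSON Schema validator against `event_schema`.

```typescript
// Hypothetical, simplified shape of an event grammar (illustrative only).
type JsonType = "string" | "integer";

interface EventGrammar {
  name: string;
  instrument: { type: "COUNTER"; key: string };
  required: string[];
  properties: Record<string, { type: JsonType }>;
}

// Mirrors the grammar shown above, reduced to the parts this sketch checks.
const attendanceGrammar: EventGrammar = {
  name: "attendance_by_school_grade_for_a_day",
  instrument: { type: "COUNTER", key: "count" },
  required: ["date", "grade", "school_id", "count"],
  properties: {
    date: { type: "string" },
    grade: { type: "integer" },
    school_id: { type: "integer" },
  },
};

// Returns a list of problems; an empty list means the event satisfies the grammar.
function validateEvent(
  grammar: EventGrammar,
  event: Record<string, unknown>
): string[] {
  const errors: string[] = [];

  for (const field of grammar.required) {
    if (!(field in event)) errors.push(`missing required field: ${field}`);
  }

  for (const [field, spec] of Object.entries(grammar.properties)) {
    if (!(field in event)) continue; // absence is reported by the required check
    const value = event[field];
    const ok =
      spec.type === "string"
        ? typeof value === "string"
        : typeof value === "number" && Number.isInteger(value);
    if (!ok) errors.push(`field ${field} should be of type ${spec.type}`);
  }

  // The COUNTER instrument reads its value from the configured key.
  const counter = event[grammar.instrument.key];
  if (counter !== undefined && typeof counter !== "number") {
    errors.push(`counter field ${grammar.instrument.key} should be a number`);
  }

  return errors;
}
```

Calling `validateEvent(attendanceGrammar, event)` with the example event above returns an empty list, while an event without `count` yields a single "missing required field" entry.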
If the data is not aggregated at a daily level, an event can also be shared at a week, month, or year level by defining a startDate and endDate.
{ "startDate": "2019-01-01", "endDate": "2019-02-01", "grade": 1, "school_id": 901, "count": 23 }
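One way a consumer might treat daily and ranged events uniformly is to normalize both into a single period, as in the sketch below. The `Period` type and `resolvePeriod` helper are hypothetical names introduced for this example. The sketch also assumes dates are ISO `YYYY-MM-DD` strings, so plain string comparison orders them correctly. Whether `endDate` is inclusive is not stated in this specification.

```typescript
// Hypothetical normalized time span for an event.
interface Period {
  start: string;
  end: string;
}

interface TimedEvent {
  date?: string;
  startDate?: string;
  endDate?: string;
}

// Maps either a daily event (date) or a ranged event (startDate/endDate)
// to one Period. Assumes ISO YYYY-MM-DD strings, which sort lexicographically.
function resolvePeriod(event: TimedEvent): Period {
  if (event.date !== undefined) {
    return { start: event.date, end: event.date };
  }
  if (event.startDate !== undefined && event.endDate !== undefined) {
    if (event.startDate > event.endDate) {
      throw new Error("startDate must not be after endDate");
    }
    return { start: event.startDate, end: event.endDate };
  }
  throw new Error("event needs either date or both startDate and endDate");
}
```

With this shape, downstream code only ever deals with a `start` and an `end`, regardless of the level at which the data was aggregated.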
2. Transformer Specification
The transformer is defined as an actor and can be synchronous or asynchronous. This allows you to write a transformer in any language, as long as it gets the job done. In the current cQube implementation, we create a custom processor in NiFi for every transformer. Events are transformed in bulk every few minutes (a config needs to be exposed to define how long to wait between bulk transformations).
const transformer = {
  name: "transformer_name",
  eventGrammar: "es11", // EventGrammar on which the transformer can act.
  datasetGrammar: "ds23", // Dataset that this transformer will modify.
  config: {
    actor: (callback, receive) => {
      // Manage side effects here.
      // Examples: logging, pushing to an external event bus, etc.
      callback('SOME_EVENT');

      // Receive from parent.
      transform((event: Event) => {
        // Transform event to dataset; has access to event and dataset grammars for validation.
      });

      // For speedups.
      transform((events: Event[]) => {
        // Transform events to dataset; has access to event and dataset grammars for validation.
      });

      // Disposal.
      return () => {
        /* Do cleanup here. */
        /* Push this to dataset. */
      };
    },
  },
};
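The bulk behaviour described for transformers could be sketched as a buffer that collects events and hands them to a bulk transform on each flush. Everything here is an assumption for illustration: `BulkBuffer`, `AttendanceEvent`, and `makeCounterTransform` are hypothetical names, and the per-school, per-grade summing is only one plausible reading of how a Counter instrument might update a dataset. The actual cQube processors run in NiFi.

```typescript
// Hypothetical event shape, matching the attendance example.
interface AttendanceEvent {
  date: string;
  grade: number;
  school_id: number;
  count: number;
}

// Collects events and passes them to a bulk transform when flushed.
class BulkBuffer<E> {
  private pending: E[] = [];

  constructor(private readonly transformBulk: (events: E[]) => void) {}

  add(event: E): void {
    this.pending.push(event);
  }

  // In a real deployment a timer would call this; the interval is the
  // wait-time config mentioned in the text. Returns the batch size.
  flush(): number {
    if (this.pending.length === 0) return 0;
    const batch = this.pending;
    this.pending = [];
    this.transformBulk(batch);
    return batch.length;
  }
}

// Builds a bulk transform that adds each event's count to a dataset
// keyed by "school_id:grade" (an illustrative key format).
function makeCounterTransform(
  dataset: Map<string, number>
): (events: AttendanceEvent[]) => void {
  return (events) => {
    for (const e of events) {
      const key = `${e.school_id}:${e.grade}`;
      dataset.set(key, (dataset.get(key) ?? 0) + e.count);
    }
  };
}
```

A caller would create one buffer per transformer, `add` events as they arrive, and schedule `flush` at the configured interval, for example with `setInterval`.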
How this event gets processed later is defined by a pipe.
A pipe connects an event to a transformer and a target dataset. An example pipe is shown below.
{ "event": "es11", "transformer": "tr33", "dataset": "ds23" }
You can also set the transformer to null to bypass everything and not store data.
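To make the pipe semantics concrete, the sketch below routes one event through a pipe, including the null-transformer bypass. The `Pipe` interface mirrors the JSON above, while `runPipe`, the transformer registry, and the in-memory dataset store are hypothetical constructs for this example and do not describe how cQube resolves pipes internally.

```typescript
// Mirrors the pipe JSON; transformer may be null to bypass processing.
interface Pipe {
  event: string;
  transformer: string | null;
  dataset: string;
}

type Row = Record<string, unknown>;
type Transform = (event: Row) => Row;

// Routes one event through a pipe. Returns true if a row was stored,
// false if the pipe bypassed processing (null transformer).
function runPipe(
  pipe: Pipe,
  transformers: Map<string, Transform>, // hypothetical registry keyed by id
  datasets: Map<string, Row[]>,         // hypothetical in-memory store
  event: Row
): boolean {
  if (pipe.transformer === null) {
    return false; // bypass: nothing is transformed or stored
  }
  const transform = transformers.get(pipe.transformer);
  if (transform === undefined) {
    throw new Error(`unknown transformer: ${pipe.transformer}`);
  }
  const rows = datasets.get(pipe.dataset) ?? [];
  rows.push(transform(event));
  datasets.set(pipe.dataset, rows);
  return true;
}
```

Returning a boolean rather than throwing on a null transformer keeps the bypass an ordinary, expected outcome that callers can log or ignore.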