Ingestion Specification
1. Define an Event Grammar
{
  "instrument_details": { "type": "COUNTER", "key": "count" },
  "name": "attendance_by_school_grade_for_a_day",
  "is_active": true,
  "event_schema": {
    "$schema": "https://json-schema.org/draft/2019-09/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "default": {},
    "title": "attendance_by_school_grade_for_a_day",
    "required": ["date", "grade", "school_id", "count"],
    "properties": {
      "date": { "type": "string", "default": "", "title": "The date Schema", "examples": ["2019-01-01"] },
      "grade": { "type": "integer", "default": 0, "title": "The grade Schema", "examples": [1] },
      "school_id": { "type": "integer", "default": 0, "title": "The school_id Schema", "examples": [901] }
    },
    "examples": [
      { "date": "2019-01-01", "grade": 1, "school_id": 901, "count": 23 }
    ]
  }
}
2. Publish an event that satisfies the grammar
{ "date": "2019-01-01", "grade": 1, "school_id": 901, "count": 23 }
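To make "satisfies the grammar" concrete, here is a minimal sketch of the check an ingestion endpoint might run before accepting an event. It is not a full JSON Schema validator: it only tests the `required` list and the primitive `type` of each declared property. The names `EventSchema`, `validateEvent`, `attendanceSchema`, `validEvent` and `invalidEvent` are hypothetical and do not come from the specification. Note that the grammar above lists `count` under `required` without declaring it under `properties`, so the sketch can only check that `count` is present.

```typescript
// Hypothetical, simplified view of the event grammar: only the parts used here.
type PrimitiveType = "string" | "integer" | "number" | "boolean";

interface EventSchema {
  required: string[];
  properties: Record<string, { type: PrimitiveType }>;
}

// Returns a list of problems; an empty list means the event passed these checks.
function validateEvent(schema: EventSchema, event: Record<string, unknown>): string[] {
  const errors: string[] = [];

  // Every required field must be present.
  for (const key of schema.required) {
    if (!(key in event)) errors.push(`missing required field: ${key}`);
  }

  // Every declared property that is present must have the declared type.
  for (const key of Object.keys(schema.properties)) {
    if (!(key in event)) continue; // absence is handled by the required check
    const expected = schema.properties[key].type;
    const value = event[key];
    const matches =
      expected === "integer" ? Number.isInteger(value)
      : expected === "number" ? typeof value === "number"
      : typeof value === expected;
    if (!matches) errors.push(`field ${key} should be of type ${expected}`);
  }
  return errors;
}

// The relevant parts of the grammar defined in step 1.
const attendanceSchema: EventSchema = {
  required: ["date", "grade", "school_id", "count"],
  properties: {
    date: { type: "string" },
    grade: { type: "integer" },
    school_id: { type: "integer" },
  },
};

const validEvent = { date: "2019-01-01", grade: 1, school_id: 901, count: 23 };
const invalidEvent = { date: "2019-01-01", grade: "1", school_id: 901 };

console.log(validateEvent(attendanceSchema, validEvent)); // []
console.log(validateEvent(attendanceSchema, invalidEvent));
// [ 'missing required field: count', 'field grade should be of type integer' ]
```

A real deployment would more likely hand `event_schema` to a complete JSON Schema validator; the hand-written check is only meant to show which parts of the grammar an event is measured against.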
Transformer Specification
The transformer is defined as an actor and can be synchronous as well as asynchronous.
const transformer = {
  name: "transformer_name",
  eventGrammar: "es11", // EventGrammar on which the transformer can act.
  datasetGrammar: "ds23", // Dataset that this transformer will modify.
  config: {
    spawn((callback, receive) => {
      // manage side effects here
      // Examples - logging, pushing to an external event bus, etc.
      callback('SOME_EVENT');

      // receive from parent
      transform((event: Event) => {
        // transform event to dataset; has access to event and dataset grammars for validation
      });

      // for speedups
      transform((events: Event[]) => {
        // transform events to dataset; has access to event and dataset grammars for validation
      });

      // disposal
      return () => {
        /* do cleanup here */
        /* push this to dataset */
      };
    }),
  }
}
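The specification leaves the body of `transform` open. As an illustration only, the sketch below shows what the single-event and batched variants might do for the attendance grammar, treating the COUNTER instrument as a running sum of `count`. The dataset shape (one total per date, school and grade), the key format, and the names `AttendanceEvent`, `AttendanceDataset`, `rowKey`, `transformOne` and `transformMany` are all assumptions, not part of the specification. An asynchronous transformer could keep the same shape and return a Promise of the dataset.

```typescript
// Hypothetical event shape, taken from the example event in the ingestion section.
interface AttendanceEvent {
  date: string;
  grade: number;
  school_id: number;
  count: number;
}

// Assumed dataset shape: one running total per (date, school_id, grade).
type AttendanceDataset = Record<string, number>;

// Assumed key format; any stable composite key would work.
const rowKey = (e: AttendanceEvent): string => `${e.date}|${e.school_id}|${e.grade}`;

// Single-event transform: adds the event's count to the matching row.
function transformOne(dataset: AttendanceDataset, event: AttendanceEvent): AttendanceDataset {
  const key = rowKey(event);
  dataset[key] = (dataset[key] ?? 0) + event.count;
  return dataset;
}

// Batched transform ("for speedups"): same result as applying transformOne per event.
function transformMany(dataset: AttendanceDataset, events: AttendanceEvent[]): AttendanceDataset {
  for (const event of events) {
    transformOne(dataset, event);
  }
  return dataset;
}

const attendance: AttendanceDataset = {};
transformOne(attendance, { date: "2019-01-01", grade: 1, school_id: 901, count: 23 });
transformMany(attendance, [
  { date: "2019-01-01", grade: 1, school_id: 901, count: 2 },
  { date: "2019-01-01", grade: 2, school_id: 901, count: 30 },
]);

console.log(attendance["2019-01-01|901|1"]); // 25
console.log(attendance["2019-01-01|901|2"]); // 30
```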
How this event is processed is defined by a pipe.
A pipe connects an event to a transformer and the dataset that transformer updates. An example of a pipe is shown below.
{ "event": "es11", "transformer": "tr33", "dataset": "ds23" }
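One way to read the pipe is as a routing record: when an event of grammar `es11` arrives, look up transformer `tr33`, apply it, and write the result to dataset `ds23`. The sketch below illustrates that lookup with in-memory registries. The registries, the `runPipe` function, the `Row` and `TransformFn` types, and the placeholder transformer body are hypothetical; the specification does not say how pipes are resolved.

```typescript
// Shape of a pipe, matching the JSON example above.
interface Pipe {
  event: string;
  transformer: string;
  dataset: string;
}

type Row = Record<string, unknown>;
type TransformFn = (event: Row) => Row;

// Hypothetical in-memory registries keyed by the ids used in the pipe.
const transformers: Record<string, TransformFn> = {
  tr33: (event) => ({ ...event, processed: true }), // placeholder transformation
};
const datasets: Record<string, Row[]> = {
  ds23: [],
};

// Applies the pipe to an event. Returns false when the pipe is not meant
// for this event grammar, and throws when the pipe cannot be resolved.
function runPipe(pipe: Pipe, eventGrammar: string, event: Row): boolean {
  if (pipe.event !== eventGrammar) return false;

  const transform = transformers[pipe.transformer];
  const target = datasets[pipe.dataset];
  if (!transform || !target) {
    throw new Error(`unresolved pipe: ${JSON.stringify(pipe)}`);
  }

  target.push(transform(event));
  return true;
}

const pipe: Pipe = { event: "es11", transformer: "tr33", dataset: "ds23" };
const applied = runPipe(pipe, "es11", { date: "2019-01-01", grade: 1, school_id: 901, count: 23 });

console.log(applied); // true
console.log(datasets.ds23.length); // 1
```

Keeping the pipe as plain data means the same transformer can be reused across several event and dataset pairs without code changes.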