1. Ingestion Specification
Ingestion is a simple API that lets you ingest your data as events. There are two steps:
Define an Event Grammar
```json
{
  "instrument_details": {
    "type": "COUNTER",
    "key": "count"
  },
  "name": "attendance_by_school_grade_for_a_day",
  "is_active": true,
  "event_schema": {
    "$schema": "https://json-schema.org/draft/2019-09/schema",
    "$id": "http://example.com/example.json",
    "type": "object",
    "default": {},
    "title": "attendance_by_school_grade_for_a_day",
    "required": ["date", "grade", "school_id", "count"],
    "properties": {
      "date": {
        "type": "string",
        "default": "",
        "title": "The date Schema",
        "examples": ["2019-01-01"]
      },
      "grade": {
        "type": "integer",
        "default": 0,
        "title": "The grade Schema",
        "examples": [1]
      },
      "school_id": {
        "type": "integer",
        "default": 0,
        "title": "The school_id Schema",
        "examples": [901]
      },
      "count": {
        "type": "integer",
        "default": 0,
        "title": "The count Schema",
        "examples": [23]
      }
    },
    "examples": [
      {
        "date": "2019-01-01",
        "grade": 1,
        "school_id": 901,
        "count": 23
      }
    ]
  }
}
```
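To make the grammar concrete, the sketch below checks an event against the parts of `event_schema` used here: the `required` list and the `string`/`integer` property types. It is a minimal hand-written illustration, not how cQube validates events; `validateEvent`, `EventSchema`, and `IngestEvent` are hypothetical names, and a real setup would more likely use a full JSON Schema (draft 2019-09) validator. It treats `count` as an integer, matching the example event.

```typescript
// Minimal sketch: checks an event against the subset of JSON Schema used by
// the grammar above ("required" plus "string"/"integer" property types).
// All names here are hypothetical; this is not a cQube API.
type PropertySchema = { type: "string" | "integer" };
type EventSchema = { required: string[]; properties: Record<string, PropertySchema> };
type IngestEvent = Record<string, unknown>;

function validateEvent(schema: EventSchema, event: IngestEvent): string[] {
  const errors: string[] = [];
  // Every required field must be present.
  for (const field of schema.required) {
    if (!(field in event)) errors.push(`missing required field "${field}"`);
  }
  // Fields that are present must have the declared type.
  for (const [field, prop] of Object.entries(schema.properties)) {
    if (!(field in event)) continue;
    const value = event[field];
    const ok =
      prop.type === "string"
        ? typeof value === "string"
        : typeof value === "number" && Number.isInteger(value);
    if (!ok) errors.push(`field "${field}" should be of type ${prop.type}`);
  }
  return errors;
}

// The grammar's schema, reduced to the parts this sketch checks.
const attendanceSchema: EventSchema = {
  required: ["date", "grade", "school_id", "count"],
  properties: {
    date: { type: "string" },
    grade: { type: "integer" },
    school_id: { type: "integer" },
    count: { type: "integer" },
  },
};

console.log(validateEvent(attendanceSchema, { date: "2019-01-01", grade: 1, school_id: 901, count: 23 }));
console.log(validateEvent(attendanceSchema, { date: "2019-01-01", grade: "1", school_id: 901 }));
```

An empty array means the event satisfies the grammar; otherwise each string describes one violation.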
Instruments: Currently, only the Counter instrument is supported.
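The spec does not spell out what a Counter does with `instrument_details.key`. One plausible reading, assumed here and not confirmed by the source, is that it sums the value of the `key` field (`count`) across events that share the same remaining fields. `counterAggregate` is a hypothetical helper illustrating that reading.

```typescript
// Assumed Counter semantics (not confirmed by the spec): events that share the
// same non-key fields form one group, and the numeric `key` field is summed.
type IngestEvent = Record<string, string | number>;

function counterAggregate(events: IngestEvent[], key: string): IngestEvent[] {
  const totals = new Map<string, IngestEvent>();
  for (const event of events) {
    // Build a stable group key from every field except the counted one.
    const dims = Object.keys(event).filter((f) => f !== key).sort();
    const groupKey = JSON.stringify(dims.map((f) => [f, event[f]]));
    const value = Number(event[key]);
    const current = totals.get(groupKey);
    if (current === undefined) {
      // First event in this group: store a copy so the input is not mutated.
      totals.set(groupKey, { ...event, [key]: value });
    } else {
      current[key] = Number(current[key]) + value;
    }
  }
  return Array.from(totals.values());
}

const rows = counterAggregate(
  [
    { date: "2019-01-01", grade: 1, school_id: 901, count: 23 },
    { date: "2019-01-01", grade: 1, school_id: 901, count: 4 },
    { date: "2019-01-01", grade: 2, school_id: 901, count: 30 },
  ],
  "count",
);
console.log(rows);
```

Under this reading, the two grade 1 events collapse into a single row whose `count` is 27.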
2. Publish an event that satisfies the grammar
```json
{
  "date": "2019-01-01",
  "grade": 1,
  "school_id": 901,
  "count": 23
}
```
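Publishing can be pictured as follows: look up the grammar by its `name`, refuse the event if the grammar is inactive (`is_active: false`) or a required field is missing, and otherwise append it to that grammar's queue. The in-memory `IngestionStore` below is a hypothetical stand-in for the real ingestion API, whose endpoint and transport this section does not specify.

```typescript
// Hypothetical in-memory stand-in for the ingestion API; the real transport
// (HTTP endpoint, message queue, etc.) is not specified in this section.
type IngestEvent = Record<string, unknown>;

interface EventGrammar {
  name: string;
  is_active: boolean;
  event_schema: { required: string[] };
}

class IngestionStore {
  private grammars = new Map<string, EventGrammar>();
  private queues = new Map<string, IngestEvent[]>();

  register(grammar: EventGrammar): void {
    this.grammars.set(grammar.name, grammar);
    this.queues.set(grammar.name, []);
  }

  // Returns true if the event was accepted, false if it was rejected.
  publish(grammarName: string, event: IngestEvent): boolean {
    const grammar = this.grammars.get(grammarName);
    if (grammar === undefined || !grammar.is_active) return false;
    const missing = grammar.event_schema.required.filter((f) => !(f in event));
    if (missing.length > 0) return false;
    this.queues.get(grammarName)!.push(event);
    return true;
  }

  // Events accepted for a grammar and not yet transformed.
  pending(grammarName: string): IngestEvent[] {
    return this.queues.get(grammarName) ?? [];
  }
}

const store = new IngestionStore();
store.register({
  name: "attendance_by_school_grade_for_a_day",
  is_active: true,
  event_schema: { required: ["date", "grade", "school_id", "count"] },
});

const accepted = store.publish("attendance_by_school_grade_for_a_day", {
  date: "2019-01-01",
  grade: 1,
  school_id: 901,
  count: 23,
});
console.log(accepted, store.pending("attendance_by_school_grade_for_a_day").length);
```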
If the data is not aggregated at a daily level, an event can also be published at a week, month, or year level by defining a startDate and endDate instead of a single date.
```json
{
  "startDate": "2019-01-01",
  "endDate": "2019-02-01",
  "grade": 1,
  "school_id": 901,
  "count": 23
}
```
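A consumer can treat both event shapes uniformly by reading each event's covered period: a daily event covers its single `date`, and an aggregated event covers `startDate` to `endDate`. `eventPeriod` is a hypothetical helper; the spec does not say whether `endDate` is inclusive, so this sketch returns both dates exactly as given.

```typescript
// Hypothetical helper: normalizes daily and ranged events to one period shape.
// Whether endDate is inclusive is not specified, so it is returned unchanged.
interface Period {
  start: string;
  end: string;
}

type IngestEvent = Record<string, unknown>;

function eventPeriod(event: IngestEvent): Period {
  const { date, startDate, endDate } = event;
  if (typeof date === "string") {
    // Daily event: the period starts and ends on the same day.
    return { start: date, end: date };
  }
  if (typeof startDate === "string" && typeof endDate === "string") {
    // Week/month/year event: the period is given explicitly.
    return { start: startDate, end: endDate };
  }
  throw new Error("event has neither date nor startDate/endDate");
}

const daily = eventPeriod({ date: "2019-01-01", grade: 1, school_id: 901, count: 23 });
const monthly = eventPeriod({
  startDate: "2019-01-01",
  endDate: "2019-02-01",
  grade: 1,
  school_id: 901,
  count: 23,
});
console.log(daily, monthly);
```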
A transformer is defined as an actor and can be either synchronous or asynchronous. This lets you write a transformer in any language, as long as it gets the job done. In the current cQube implementation, every transformer is built as a custom processor in NiFi. Events are transformed in bulk every few minutes (a config option still needs to be exposed to define how long to wait between bulk transformations).
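```typescript
// Sketch only: the types below are illustrative, not a published cQube API.
type Event = Record<string, unknown>;
type SendToParent = (message: string) => void;
type OnReceive = (listener: (input: Event | Event[]) => void) => void;

export const transformer = {
  name: "transformer_name",
  eventGrammar: "es11", // EventGrammar on which the transformer can act.
  datasetGrammar: "ds23", // Dataset that this transformer will modify.
  config: {
    // Callback-style actor: `callback` sends messages to the parent and
    // `receive` registers a listener for events sent by the parent.
    spawnActor: (callback: SendToParent, receive: OnReceive) => {
      // Manage side effects here,
      // e.g. logging or pushing to an external event bus.
      callback("SOME_EVENT");
      // Receive from parent: a single event, or an array of events for speedups.
      receive((input) => {
        const events = Array.isArray(input) ? input : [input];
        for (const event of events) {
          // Transform each event into the dataset; the event and dataset
          // grammars are available here for validation.
        }
      });
      // Disposal
      return () => {
        /* do cleanup here */
        /* push the transformed data to the dataset */
      };
    },
  },
};
```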
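Because events are transformed in bulk every few minutes and the wait time is meant to become configurable, the batching step might look like the sketch below: events accumulate in a buffer and are handed to a batch `transform` either when a timer fires or when `flush()` is called. `BatchBuffer` and `flushIntervalMs` are hypothetical names standing in for the config option the spec says still needs to be exposed.

```typescript
// Hypothetical batching buffer; flushIntervalMs stands in for the
// not-yet-exposed config that controls how long to wait between bulk runs.
type IngestEvent = Record<string, unknown>;

class BatchBuffer {
  private buffer: IngestEvent[] = [];
  private timer: ReturnType<typeof setInterval> | undefined;

  constructor(private readonly transform: (events: IngestEvent[]) => void) {}

  add(event: IngestEvent): void {
    this.buffer.push(event);
  }

  // Hand everything buffered so far to the batch transform, then reset.
  flush(): void {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.transform(batch);
  }

  // (Re)start periodic flushing without flushing immediately.
  start(flushIntervalMs: number): void {
    if (this.timer !== undefined) clearInterval(this.timer);
    this.timer = setInterval(() => this.flush(), flushIntervalMs);
  }

  // Stop periodic flushing and flush whatever is left.
  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer);
    this.timer = undefined;
    this.flush();
  }
}

const batchSizes: number[] = [];
const buffer = new BatchBuffer((events) => batchSizes.push(events.length));
buffer.add({ date: "2019-01-01", grade: 1, school_id: 901, count: 23 });
buffer.add({ date: "2019-01-01", grade: 2, school_id: 901, count: 30 });
buffer.flush();
console.log(batchSizes);
```

In a long-running process, `start(flushIntervalMs)` would drive the periodic runs; the manual `flush()` above keeps the example deterministic.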
How an event is processed afterwards is defined by a pipe.
A pipe connects an event, through a transformer, to a dataset. An example pipe is shown below.
```json
{
  "event": "es11",
  "transformer": "tr33",
  "dataset": "ds23"
}
```
You can also set the transformer to null to bypass processing entirely; in that case the data is not stored.
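Routing through pipes can be sketched as a lookup from event grammar to pipe: if the pipe names a transformer, the event is handed to it along with the target dataset; if `transformer` is `null`, the event is dropped without being stored. Only the pipe shape (`event`, `transformer`, `dataset`) comes from this spec; the `route` function, the transformer registry, and the `es12`/`ds24` ids are hypothetical.

```typescript
// Pipe shape as in the spec; transformer may be null to bypass storage.
interface Pipe {
  event: string;
  transformer: string | null;
  dataset: string;
}

type IngestEvent = Record<string, unknown>;
// Hypothetical registry entry: transformer id -> function producing dataset rows.
type TransformFn = (event: IngestEvent) => IngestEvent[];

function route(
  pipes: Pipe[],
  transformers: Map<string, TransformFn>,
  eventGrammar: string,
  event: IngestEvent,
): { dataset: string; rows: IngestEvent[] } | null {
  const pipe = pipes.find((p) => p.event === eventGrammar);
  // No pipe, or a null transformer: nothing is stored.
  if (pipe === undefined || pipe.transformer === null) return null;
  const transform = transformers.get(pipe.transformer);
  if (transform === undefined) throw new Error(`unknown transformer ${pipe.transformer}`);
  return { dataset: pipe.dataset, rows: transform(event) };
}

const pipes: Pipe[] = [
  { event: "es11", transformer: "tr33", dataset: "ds23" },
  { event: "es12", transformer: null, dataset: "ds24" }, // hypothetical bypass pipe
];
// Hypothetical identity transformer registered as "tr33".
const transformers = new Map<string, TransformFn>([["tr33", (e) => [e]]]);
const sampleEvent = { date: "2019-01-01", grade: 1, school_id: 901, count: 23 };

console.log(route(pipes, transformers, "es11", sampleEvent));
console.log(route(pipes, transformers, "es12", sampleEvent));
```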
3. Dimensions
4. Instruments