...
This document describes how the product workflow has been designed around the use cases of end users. It explains the flow of the application, starting from ingesting the events, dimensions and datasets through to how the data is uploaded to the cloud storage.
Purpose: The purpose of this document is to explain how to upload data to cloud storage using the ingestion APIs, so that it can be processed and ingested into the end datasets.
Step-wise Ingestion process
...
API Endpoint: <domain_name>/ingestion/new_programs
HTTP Method: POST
...
This API will import the dimension csv and upload it to the combined_input folder in the cloud if there are no errors. Later the same file will be used by the Nifi processor to process and ingest the data into the database.
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml
...
API Endpoint: <domain_name>/ingestion/new_programs
HTTP Method: POST
...
This API will import the event csv and upload it to the combined_input folder in the cloud if there are no errors. The adapter will then use the same file to break down the combined input into multiple input files.
...
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml
...
After the csv import API executes successfully, we get a response, and we can check whether the file has been uploaded using the GET file status API. If the file is uploaded successfully, the response status is uploaded; if there are any errors, the response indicates that there was an error in the file.
Ingestion of Dataset data using CSV
Select the csv import folder from the postman collection.
Step 1: Open the specified request & add the details
API Endpoint: <domain_name>/ingestion/csv
HTTP Method: POST
...
This API will import the event csv and store it in the inputs folder if there are no errors. Later the same file will be used by the Nifi processor to process and ingest the data into the database.
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml
Provide valid input details for the parameters shown below.
file : Attach the csv file to be imported
ingestion_type : Specify the type of ingestion
ingestion_name : Name of the dataset (the dataset name should be present in the database)
Step 3: Click on the send button. If the request is successful, the user should see a response message. Please refer to the below screenshot.
After the csv import API executes successfully, we get a response, and we can check whether the file has been uploaded using the GET file status API. If the file is uploaded successfully, the response status is uploaded; if there are any errors, the response indicates that there was an error in the file.
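As a rough sketch of what the Postman request does, the snippet below assembles the same three parameters (file, ingestion_type, ingestion_name) into a request for <domain_name>/ingestion/csv using only the Python standard library. It assumes the endpoint accepts multipart/form-data, as a Postman file attachment usually implies; the domain, file name and the ingestion_type value used in any call are placeholders, not values confirmed by the spec.

```python
import uuid


def build_csv_upload(domain_name, csv_bytes, file_name, ingestion_type, ingestion_name):
    """Return (url, headers, body) for a multipart POST to /ingestion/csv.

    Assumption: the API expects multipart/form-data with the three
    parameters listed in Step 2. Nothing is sent over the network here.
    """
    boundary = uuid.uuid4().hex
    parts = []
    # Plain text form fields.
    for name, value in (("ingestion_type", ingestion_type),
                        ("ingestion_name", ingestion_name)):
        parts.append(
            (f"--{boundary}\r\n"
             f'Content-Disposition: form-data; name="{name}"\r\n\r\n'
             f"{value}\r\n").encode("utf-8")
        )
    # The csv file itself goes in the "file" field.
    parts.append(
        (f"--{boundary}\r\n"
         f'Content-Disposition: form-data; name="file"; filename="{file_name}"\r\n'
         "Content-Type: text/csv\r\n\r\n").encode("utf-8")
        + csv_bytes + b"\r\n"
    )
    # Closing boundary marks the end of the form.
    parts.append(f"--{boundary}--\r\n".encode("utf-8"))
    body = b"".join(parts)
    headers = {
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        "Content-Length": str(len(body)),
    }
    url = f"{domain_name.rstrip('/')}/ingestion/csv"
    return url, headers, body
```

The returned values can be passed to any HTTP client, for example urllib.request.Request(url, data=body, headers=headers, method="POST").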
Ingestion of data using API
The data can also be ingested using the APIs developed for events, dimensions and datasets.
Ingestion of Events using API
Step 1: Open the specified request & add the details
API Endpoint: <domain_name>/ingestion/event
HTTP Method: POST
...
This API will be used to write events into the csv file which will be stored in the input folder. The csv will then be used by the Nifi processor to ingest the data into the database. The API can be used to add individual events into csv.
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml
Provide valid input details for the parameters shown below. The request body should conform to the schema stored in the database for the particular event name.
...
Step 3: Click on the send button. If the request is successful, the user should see a response message. Please refer to the below screenshot.
After the event API executes successfully, we get a response and the data sent in the request body is written to the csv file. Records with errors are written to a separate csv file, and valid data is written to the other csv file.
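The valid/error split described above can be mimicked on the client side before sending data, to catch problems early. The sketch below is only an illustration: the schema format (a mapping of field name to Python type) and the field names in any call are hypothetical, and the real schema is the one stored in the database for the event name.

```python
import csv
import io


def split_records(records, schema):
    """Separate records into (valid, errors) against a {field: type} schema.

    Hypothetical schema format, used only for this illustration.
    """
    valid, errors = [], []
    for record in records:
        # A field is a problem if it is missing or has the wrong type.
        problems = [
            field for field, expected in schema.items()
            if not isinstance(record.get(field), expected)
        ]
        if problems:
            errors.append({**record, "error": "invalid or missing: " + ", ".join(problems)})
        else:
            valid.append(record)
    return valid, errors


def to_csv(rows, fieldnames):
    """Render rows as csv text, ignoring keys that are not in fieldnames."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```

Writing to_csv(valid, ...) and to_csv(errors, ...) to two files gives a local counterpart of the two csv files the API produces.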
...
Ingestion of Dimensions using API
Step 1: Open the specified request & add the details
API Endpoint: <domain_name>/ingestion/dimension
HTTP Method: POST
...
This API will be used to write dimensions into the csv file which will be stored in the input folder. The csv will then be used by the Nifi processor to ingest the data into the database. This API can be used to add individual dimensions into csv.
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml . Provide valid input details for the parameters shown below. The request body should conform to the schema stored in the database for the particular dimension name.
...
Step 3: Click on the send button. If the request is successful, the user should see a response message. Please refer to the below screenshot.
After the dimension API executes successfully, we get a response and the data sent in the request body is written to the csv file. Records with errors are written to a separate csv file, and valid data is written to the other csv file.
...
Ingestion of Datasets using API
Step 1: Open the specified request & add the details
API Endpoint: <domain_name>/ingestion/dataset
HTTP Method: POST
...
This API will be used to write datasets into the csv file which will be stored in the input folder. The csv will then be used by the Nifi processor to ingest the data into the database. This API can be used to add individual datasets into csv.
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml
Provide valid input details for the parameters shown below. The request body should conform to the schema stored in the database for the particular dataset name.
...
Step 3: Click on the send button for the request and if the request is successful the user should see a response message. Please refer to the below screenshot.
...
After the dataset API executes successfully, we get a response and the data sent in the request body is written to the csv file. Records with errors are written to a separate csv file, and valid data is written to the other csv file.
File Status API
There are two file status API’s:
...
GET file status API
...
Things to take care of while ingesting Data / Debugging:
The date format should be correct. The accepted date format is DD/MM/YY.
The file name and the schema name stored in the table should be the same.
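Both checks can be run locally before uploading a file. The sketch below assumes that "file name" means the name without the .csv extension, which the text does not state explicitly; the function names are illustrative only.

```python
from datetime import datetime
from pathlib import Path


def is_valid_date(value):
    """Return True if value parses as DD/MM/YY (two-digit year).

    Note: strptime also accepts non-padded day/month such as 1/2/23;
    whether the pipeline accepts those is not confirmed here.
    """
    try:
        datetime.strptime(value, "%d/%m/%y")
        return True
    except ValueError:
        return False


def file_matches_schema(file_path, schema_name):
    """Return True if the file name (without extension) equals the schema name."""
    return Path(file_path).stem == schema_name
```

For example, is_valid_date("01/02/2023") returns False because of the four-digit year, and file_matches_schema("uploads/student_attendance.csv", "student_attendance") returns True.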
Error Monitoring
The error file stores all the error records found during the ingestion process and is uploaded to the appropriate cloud storage. The user can log in to the respective cloud storage and download the file to look at the error records present in the csv. The steps below describe how to access the error file.
The cloud storage bucket/container is named cQube-edu.
Inside the bucket there are multiple folders for the different processing stages; the ingestion error files are stored in the ingestion_error folder. This folder in turn contains a folder for each program, named after the <program_name>.
The <program_name> folder contains a folder named with the current date.
If the current date folder is present, the user can open it, see the error files in it, and download them to view the errors present in the csv.
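The folder layout described in the steps above can be expressed as a small helper that builds the path to look in. This is only a sketch: the format used for the date folder name is not given in the text, so the default below (YYYY-MM-DD) is an assumption and is left configurable.

```python
from datetime import date

BUCKET = "cQube-edu"  # bucket/container name given in the text


def error_folder(program_name, day=None, date_format="%Y-%m-%d"):
    """Return the folder path for a program's ingestion error files.

    Assumption: date_format is a guess at how the date folder is named;
    adjust it to match what the storage actually shows.
    """
    day = day or date.today()
    return f"ingestion_error/{program_name}/{day.strftime(date_format)}/"
```

The returned path can be used as the prefix when browsing or listing objects in the bucket with the Minio, Azure or AWS tooling.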
Please refer to the below screenshots for the different storage types (Minio, Azure and AWS) to see how to access the error files.
Accessing error files in Minio Storage
...
Accessing error files in Azure
...
Accessing error files in AWS
...
To get the count of processed records and error records, the GET /ingestion/file-status API can be used.
File Status API
GET file status API
...
Provide the valid input details for the parameters shown below.
...
Step 3: Click on the send button for the request and if the request is successful the user should see a response message which contains the status of the file.
...
If the csv files are very large, uploading them takes longer, and users would have to wait a long time for the response from the API. For this reason the upload runs asynchronously, and this API was developed so that the status of a file can be checked at any time.
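Because the upload runs asynchronously, a client typically polls the file status until it reaches a final state. The sketch below shows one way to do that. The fetch_status callable is supplied by the caller and is expected to call the GET file status API and return the status string; the set of final statuses is an assumption ("uploaded" is mentioned in this document, the exact error status string is not).

```python
import time


def wait_for_file_status(fetch_status, terminal=("uploaded", "error"),
                         interval=5.0, max_attempts=60, sleep=time.sleep):
    """Poll fetch_status() until it returns a terminal status.

    fetch_status: caller-supplied function returning the current status string.
    terminal: assumed final statuses, compared case-insensitively.
    Raises TimeoutError if no terminal status is seen in max_attempts polls.
    """
    for attempt in range(max_attempts):
        status = fetch_status()
        if status.lower() in terminal:
            return status
        # Wait before the next poll, but not after the final attempt.
        if attempt < max_attempts - 1:
            sleep(interval)
    raise TimeoutError(f"file status not final after {max_attempts} attempts")
```

Passing the sleep function as a parameter keeps the loop easy to test without real delays.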
...
Schedule API
This API helps to schedule the processor group at any particular time.
Step 1: Open the specified request & add the details
API Endpoint: <domain_name>/spec/schedule
HTTP Method: POST
...
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml . Provide valid input details for the parameters shown below.
...
Step 3: Click on the send button. If the request is successful, the user should see a response message. The schedule time will be updated in the processor group.
...
The schedule API helps to run the processor group in Nifi at a scheduled time. The schedule time can be updated by changing the cron expression for the scheduled_at property in the request body.
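A minimal sketch of building such a schedule request is shown below. It assumes the endpoint is <domain_name>/spec/schedule with the POST method, that the body is JSON, and that the pipeline is identified by a pipeline_name key; only scheduled_at is named in this document, so check the YAML spec for the real body. The field-count check assumes a NiFi-style cron expression with six or seven fields, starting with seconds.

```python
import json
import urllib.request


def build_schedule_request(domain_name, pipeline_name, scheduled_at):
    """Build (but do not send) a POST request for the schedule API.

    Assumptions: JSON body, a hypothetical "pipeline_name" key, and a
    cron expression of 6 or 7 space-separated fields.
    """
    fields = scheduled_at.split()
    if len(fields) not in (6, 7):
        raise ValueError(f"expected 6 or 7 cron fields, got {len(fields)}")
    body = json.dumps(
        {"pipeline_name": pipeline_name, "scheduled_at": scheduled_at}
    ).encode("utf-8")
    return urllib.request.Request(
        f"{domain_name.rstrip('/')}/spec/schedule",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

For example, the cron expression "0 0 2 * * ?" would run the processor group every day at 02:00. The request can be sent with urllib.request.urlopen(request) once the domain and body are confirmed.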
V4-Data Emission API
This API is used for data migration. It helps to migrate the data into cQube version 4.0.
Step 1: Open the specified request and add the details.
API Endpoint: <domain_name>/ingestion/v4-data-emission
HTTP Method: GET
...
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml . Provide valid input details for the parameters shown below. The pipeline name passed in the request body should be present in the database.
Step 3: Click on the send button. If the request is successful, the user should see the response message “Files uploaded successfully”. The files will be uploaded to the emission folder created in the respective cloud storage.
...
The schedule api helps to run the processor group in Nifi at a scheduled time. The schedule time can be updated by changing the cron expression for scheduled_at property in the request body. The processor group processes the csv file at a scheduled time and ingests the data into the database.
Upload to S3 API
The API helps to upload the processed files to the archive bucket and the error files to the error bucket in s3.
...
National Programs API
This API accepts the event data in the zip file format and adds it to the emission folder in the respective cloud storage.
Step 1: Open the specified request and add the details.
API Endpoint: <domain_name>/ingestion/national_programs
HTTP Method: POST
...
Step 2: Build the request body with reference to the YAML file. The request body for the above API is described in the YAML spec: https://github.com/Sunbird-cQube/spec-ms/blob/dev/spec.yaml . Provide valid input details for the parameters shown below.
The scheduled_type key can take two values: archive and error.
If the value is archive, a processor group is created to upload all the archived files present in the archived folder on the server to the s3 archive bucket.
If the value is error, a processor group is created to upload the error files present in the error folder on the server to the s3 error bucket.
...
...
This API also schedules the processor group to run at a particular time. The processor group automatically picks up all the files in the archived folder or the error folder and uploads them to the s3 archive bucket or the s3 error bucket respectively.
...
Step 3: Click on the send button. If the request is successful, the user should see the response message “File uploaded successfully”. The files will be uploaded to the emission folder created in the respective cloud storage.
The zip file will be extracted and read by the adapters, and its contents will then be moved into the input folders.
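Since this API expects the event data as a zip file, the archive can be prepared programmatically before attaching it to the request. The sketch below builds a zip in memory from csv text; the member file names and the csv contents in any call are placeholders, and the expected layout inside the zip should be confirmed against the spec.

```python
import io
import zipfile


def build_events_zip(files):
    """Return zip archive bytes for a mapping of member name -> csv text.

    Assumption: the API accepts a flat zip of csv files; the text only
    says that event data is sent in zip format.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for name, text in files.items():
            # writestr stores the text under the given member name.
            archive.writestr(name, text)
    return buffer.getvalue()
```

The returned bytes can be written to disk and attached in Postman, or sent directly as the file part of the request.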
...