Collection and Management of Public Policy Datasets

The Data Management component is in charge of collecting and managing the datasets provided by the different stakeholders in the AI4PublicPolicy project. It loads and stores the data to be analysed by the Virtualized Policy Management Environment (VPME). Data can be collected from diverse data source types and in different formats, including data-at-rest and streaming data.

To be useful for a wide range of applications, the Data Management component must handle data sources that provide data in different formats and with different schemas. It also supports both data-at-rest (data stored in files) and streaming data (i.e., data that is not stored but arrives continuously from the source). The collected data is stored in a SQL database for further processing; this type of datastore can be integrated with semantic processing tools such as Apache Jena. An initial data model has been developed based on the datasets provided by the pilots (more than 30 different datasets).

Data Management Architecture

The Data Management component consists of a set of sub-components, shown in the following figure, and exposes a REST API through which it is accessed. The Data Collector sub-component handles each request and starts the data collection process: the Data Retrieval sub-component downloads the data, which may come in different formats (CSV, JSON and GeoJSON, among others), and the Data Injector stores it in the datastore. The data can later be analysed by the VPME.

Data Collector

The Data Management component is accessed through a web service. The following figure shows the Swagger UI with the API of the Data Collector.

The Data Collector request takes as a parameter a JSON object with three fields: endpoint, sourceType, and dataSetSchemaJSON. The endpoint field indicates the location from which the dataset can be downloaded. The sourceType field describes the format of the data in the dataset (JSON and CSV, among others) and whether it is static or streaming data. The dataSetSchemaJSON field provides the endpoint from which the JSON schema of the dataset can be retrieved.
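
For illustration, a collection request could be issued as follows. The three field names match the API described above, while the service URL, path, and dataset locations are placeholders rather than values from the actual deployment:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class TriggerDataCollection {
      public static void main(String[] args) throws Exception {
        // The three fields follow the Data Collector API; all URLs below are placeholders.
        String body = """
            {
              "endpoint": "https://data.example.org/datasets/parking.csv",
              "sourceType": "CSV",
              "dataSetSchemaJSON": "https://data.example.org/datasets/parking-schema.json"
            }""";

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://data-management.example.org/datacollector")) // placeholder service URL
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
      }
    }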

When the Data Collector receives a request, it starts the data retrieval and data injection flow. First, it identifies the table of the AI4PublicPolicy datastore in which the data will be stored, by matching the provided dataset schema against the schemas of the tables in the datastore. Then, the Data Retrieval component downloads the dataset (unless the source type is streaming) and invokes the Data Injector component to store the data.
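
The exact matching logic is internal to the Data Collector; a minimal sketch of the idea, assuming (purely for illustration) that the datastore's table schemas are known as sets of column names, could look as follows:

    import java.util.Map;
    import java.util.Set;

    public class TableMatcher {
      // Hypothetical sketch: pick the first datastore table whose columns cover all
      // fields declared in the dataset schema provided with the request.
      public static String matchTable(Set<String> schemaFields, Map<String, Set<String>> tableColumns) {
        return tableColumns.entrySet().stream()
            .filter(entry -> entry.getValue().containsAll(schemaFields))
            .map(Map.Entry::getKey)
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("No table matches schema " + schemaFields));
      }
    }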

Data Retrieval

The Data Retrieval component obtains the actual dataset and its schema from data sources available as files, databases, or data streams, and in different formats. If the dataset is a file (i.e., not streaming data), the Data Retrieval component downloads the corresponding file and stores it temporarily until it is loaded into the datastore. If the source is streaming, the schema includes the endpoint from which the data will be read, which is used by the Data Injector component. The Data Retrieval component is implemented in Python.
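
Although the component itself is written in Python, the download-and-stage step can be sketched as follows (shown in Java for consistency with the other listings in this section; the endpoint handling and file naming are illustrative):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class DatasetDownloader {
      // Illustrative sketch: download a data-at-rest dataset and stage it in a
      // temporary file until the Data Injector loads it into the datastore.
      public static Path download(String datasetEndpoint) throws Exception {
        Path staged = Files.createTempFile("ai4pp-dataset-", ".tmp");
        HttpRequest request = HttpRequest.newBuilder(URI.create(datasetEndpoint)).GET().build();
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofFile(staged));
        return staged;
      }
    }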

Data Injector

The Data Injector component reads the dataset file and its schema, as downloaded by the Data Retrieval component, and stores the dataset in the AI4PublicPolicy datastore. The component is implemented in the Java programming language using the Apache Beam framework (shown in the following figure), with Apache Flink as the runner. Apache Beam provides a unified model for defining batch and streaming processing pipelines. Pipelines can be written in Java, Python, or Go and are executed by a runner such as Spark, Flink, Samza, or Google Cloud Dataflow.

The Beam model is based on the principle of pipelines. A pipeline encapsulates a data processing task: reading input data, transforming the data, and writing the output data. The input data schema can be defined through a schema JSON object whose fields follow the BigQuery format. Data can be consumed from and written to different source types: file-based and file-system connectors (e.g., Hadoop Distributed File System, Google Cloud Storage, the local file system, and Amazon S3), messaging systems (e.g., Kinesis, Apache Kafka, MQTT, and RabbitMQ, among others), and databases (Apache Cassandra, Google Cloud Bigtable, MongoDB, JDBC, and Redis, among others); further I/O connectors are under development.
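
As an illustration, a minimal batch pipeline of this kind could read a staged CSV file and write its rows into the datastore through Beam's JDBC connector. The file path, table, columns, and connection details below are placeholders, and the runner (e.g., Flink) is selected through the pipeline options:

    import java.sql.PreparedStatement;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class InjectCsvDataset {
      public static void main(String[] args) {
        // Pass, e.g., --runner=FlinkRunner to execute the pipeline on Apache Flink.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            .apply("ReadStagedCsv", TextIO.read().from("/tmp/ai4pp-dataset.csv")) // placeholder path
            .apply("WriteToDatastore", JdbcIO.<String>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver",
                        "jdbc:postgresql://datastore:5432/ai4publicpolicy")       // placeholder connection
                    .withUsername("user")
                    .withPassword("secret"))
                .withStatement("INSERT INTO parking_occupancy (sensor_id, occupied) VALUES (?, ?)") // placeholder table
                .withPreparedStatementSetter((String line, PreparedStatement stmt) -> {
                  // Naive CSV handling for illustration only; real datasets need proper parsing.
                  String[] fields = line.split(",");
                  stmt.setString(1, fields[0]);
                  stmt.setString(2, fields[1]);
                }));

        pipeline.run().waitUntilFinish();
      }
    }

In the actual Data Injector, the target table, the INSERT statement, and the parsing of each record would be derived from the dataset's JSON schema rather than hard-coded as above.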

The following figure shows the I/O connectors available in Apache Beam.

The pipeline's data source is configured with the location of the dataset in storage and the dataset's JSON schema object. Once the data source is configured, the data is read into the pipeline and transformed so that it can be stored in the AI4PublicPolicy datastore.
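
For example, the column list of the target table can be derived from the BigQuery-style schema object mentioned above. The helper below is a hypothetical sketch; the field names and table are invented for illustration:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SchemaToInsert {
      // Hypothetical helper: build a parameterised INSERT statement from a
      // BigQuery-style schema object ({"fields": [{"name": ..., "type": ...}, ...]}).
      public static String buildInsert(String table, String schemaJson) throws Exception {
        JsonNode fields = new ObjectMapper().readTree(schemaJson).get("fields");
        List<String> columns = new ArrayList<>();
        fields.forEach(field -> columns.add(field.get("name").asText()));
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table
            + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
      }

      public static void main(String[] args) throws Exception {
        String schema = """
            {"fields": [
              {"name": "sensor_id", "type": "STRING"},
              {"name": "timestamp", "type": "TIMESTAMP"},
              {"name": "occupied", "type": "BOOLEAN"}
            ]}""";
        System.out.println(buildInsert("parking_occupancy", schema));
        // Prints: INSERT INTO parking_occupancy (sensor_id, timestamp, occupied) VALUES (?, ?, ?)
      }
    }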

The AI4PublicPolicy database has been designed and created taking into account the datasets identified by the pilots. The corresponding data model is shown in the following figure.

The progress of the data injection can be monitored through the Flink GUI (as shown in the following figure). Once the Flink job completes, the data is available in the database.