OZ-402: Jobs refactored to support configuring source databases + Switch to using MiniCluster. (#13)

* OZ-402: Refactor Streaming, Batch, and Export jobs to support using arbitrary databases, and switch to using MiniCluster

* Rename BatchParquetExport class

* Fix Connect Mysql boolean handling

* README enhanced with ChatGPT.

* Add slf4j for logging

* Update README

* Update README

* Update README

* Remove log pushed in error

---------

Co-authored-by: Dimitri R <dimitri@mekomsolutions.com>
enyachoke and mks-d authored Nov 20, 2023
1 parent 52c3737 commit edb7052
Showing 20 changed files with 782 additions and 240 deletions.
65 changes: 34 additions & 31 deletions README.md
@@ -1,20 +1,16 @@
# Ozone Analytics ETL Pipelines

# Ozone ETL pipelines
## Overview

## Flink



This repository contains ETL [Flink](https://ci.apache.org/projects/flink/flink-docs-master/) [jobs](https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/job_scheduling/#:~:text=A%20Flink%20job%20is%20first,it%20cancels%20all%20running%20tasks) for flattening [Ozone HIS](https://github.com/ozone-his) data.
This repository contains the ETL pipelines that are used to transform data from all Ozone components into a format that is easy to query and analyze. The pipelines are written in [Apache Flink](https://ci.apache.org/projects/flink/flink-docs-master/), a powerful framework that supports both batch and real-time data processing.

## Features
The project provides the following features:



- Provides both [batch]() and [streaming]() modes

- Currently flattens OpenMRS to output reporting friendly tables for:
- Support for [**Batch Analytics**](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/) and [**Streaming Analytics**](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/) ETL

- Flattening of data from Ozone HIS components into a format that is easy to query and analyze.
  The data that is flattened depends on project needs. For example, our Reference Distro provides flattening queries that produce the following tables:
- patients

- observations
@@ -37,33 +33,31 @@ This repository contains ETL [Flink](https://ci.apache.org/projects/flink



## Tech

- [Flink](https://ci.apache.org/projects/flink/flink-docs-master/) - For ETL
- [Kafka connect](https://docs.confluent.io/platform/current/connect/index.html) - For CDC
- [Kafka](https://kafka.apache.org/) - For streaming data
## Technologies
We utilize the following technologies to power our ETL pipelines:
- [Apache Flink](https://ci.apache.org/projects/flink/flink-docs-master/) - For orchestrating the ETL jobs.
- [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) - For Change Data Capture (CDC).
- [Apache Kafka](https://kafka.apache.org/) - For managing data streams.

### Development

#### DSL
#### Domain-Specific Languages (DSLs)

The project provides for defining ETL jobs for reporting. The underlying DSLs usable for the jobs are categorised as:
- [Flattening DSLs](https://github.com/ozone-his/ozonepro-distro/analytics_config/dsl/flattening/README.md) - For flattening data from OpenMRS. Note that these are related to the liquibase migration scripts that are used to create destination tables found [here](https://github.com/ozone-his/ozonepro-distro/analytics_config/liquibase/analytics/).
The project generates Flink jobs based on SQL DSLs; a minimal sketch is shown after the list below. The DSLs are categorized into two:
- [Flattening DSLs](https://github.com/ozone-his/ozonepro-distro/analytics_config/dsl/flattening/README.md) - For flattening data from OpenMRS. Note that these are related to the Liquibase migration scripts that are used to create destination tables found [here](https://github.com/ozone-his/ozonepro-distro/analytics_config/liquibase/analytics/).
- [Parquet Export DSLs](https://github.com/ozone-his/ozonepro-distro/analytics_config/dsl/export/README.md) - For exporting data to Parquet files.
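
For orientation, a flattening DSL is plain Flink SQL that reads from the replicated source tables and writes into a destination table created by the Liquibase migrations. The following is a minimal, hypothetical sketch only; the real queries ship with the distro configuration, and the table and column names below are illustrative OpenMRS fields rather than the actual DSL content:

```sql
-- Hypothetical flattening query (illustrative only; not the shipped DSL).
-- Assumes `patient`, `person`, and `person_name` source tables plus a `patients`
-- destination table have already been registered by the job.
INSERT INTO patients
SELECT p.patient_id,
       pn.given_name,
       pn.family_name,
       per.gender,
       per.birthdate
FROM patient AS p
JOIN person AS per ON per.person_id = p.patient_id
JOIN person_name AS pn ON pn.person_id = p.patient_id;
```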


#### Step1: startup backing services
#### Step 1: Start Required Services
The project assumes you already have an Ozone HIS instance running. If not, please follow the instructions [here](https://github.com/ozone-his/ozone-docker) or [here](https://github.com/ozone-his/ozonepro-docker) to get one up and running.

The project also assumes you have the required migration scripts and destination table creation sripts with their queries scripts located somewhere you know. They can be downloaded as part of the project [here](https://github.com/ozone-his/ozonepro-distro) in the `analytics_config` directory, for example, the following `env` variable would be exported as below;
The project also assumes you have the required migration scripts and destination table creation scripts, together with their query scripts, available at a known location. They can be downloaded as part of the project [here](https://github.com/ozone-his/ozonepro-distro) in the `analytics_config` directory. For example, the following environment variables would be exported as below:

```bash
export ANALYTICS_SOURCE_TABLES_PATH=~/ozonepro-distro/analytics_config/dsl/flattening/tables/;
export ANALYTICS_QUERIES_PATH=~/ozonepro-distro/analytics_config/dsl/flattening/queries/;
export ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH=~/ozonepro-distro/analytics_config/liquibase/analytics/;
export EXPORT_DESTINATION_TABLES_PATH=~/ozonepro-distro/analytics_config/dsl/export/tables/;
export EXPORT_SOURCE_QUERIES_PATH=~/ozonepro-distro/analytics_config/dsl/export/queries;

```

```cd development```
@@ -76,7 +70,7 @@ export ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH= path_to_folder_containing_l
export ANALYTICS_DB_HOST=gateway.docker.internal; \
export ANALYTICS_DB_PORT=5432; \
export CONNECT_MYSQL_HOSTNAME=gateway.docker.internal; \
export CONNECT_MYSQL_PORT=3306; \
export CONNECT_MYSQL_PORT=3307; \
export CONNECT_MYSQL_USER=root; \
export CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey; \
export CONNECT_ODOO_DB_HOSTNAME=gateway.docker.internal; \
@@ -94,7 +88,10 @@ export CONNECT_ODOO_DB_PASSWORD=password
```mvn clean install compile```

#### Step 3:
##### Run Streaming job
***Note***: The `ANALYTICS_CONFIG_FILE_PATH` env var provides the location of the configuration file required by all jobs. An example file is provided at `development/data/config.yaml`.


##### Running in Streaming mode

```bash
export ANALYTICS_SOURCE_TABLES_PATH=path_to_folder_containing_source_tables_to_query_from;\
Expand All @@ -111,17 +108,21 @@ export OPENMRS_DB_NAME=openmrs;\
export OPENMRS_DB_USER=root;\
export OPENMRS_DB_PASSWORD=3cY8Kve4lGey;\
export OPENMRS_DB_HOST=localhost;\
export OPENMRS_DB_PORT=3306;\
export OPENMRS_DB_PORT=3307;\
export ODOO_DB_NAME=odoo;\
export ODOO_DB_USER=postgres;\
export ODOO_DB_PASSWORD=password;\
export ODOO_DB_HOST=localhost;\
export ODOO_DB_PORT=5432;
export ODOO_DB_PORT=5432;\
export ZOOKEEPER_URL=localhost:2181;\
export ANALYTICS_CONFIG_FILE_PATH=$(pwd)/development/data/config.yaml;\
export ANALYTICS_KAFKA_URL=localhost:29092
```

```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.streaming.StreamingETLJob" -Dexec.classpathScope="compile"```

##### Run Batch job
##### Running in Batch mode

```bash
export ANALYTICS_DB_USER=analytics;\
export ANALYTICS_DB_PASSWORD=password;\
@@ -138,10 +139,11 @@ export ODOO_DB_USER=postgres;\
export ODOO_DB_PASSWORD=password;\
export ODOO_DB_HOST=localhost;\
export ODOO_DB_PORT=5432;\
export ANALYTICS_CONFIG_FILE_PATH=$(pwd)/development/data/config.yaml
```
```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.batch.BatchETLJob" -Dexec.classpathScope="compile"```

##### Run Parquet Export job
##### Run Export job
```mkdir -p development/data/parquet/```

```bash
@@ -157,10 +159,11 @@ export ANALYTICS_DB_PORT=5432;\
export ANALYTICS_DB_NAME=analytics;\
export EXPORT_OUTPUT_PATH=$(pwd)/development/data/parquet/;\
export EXPORT_OUTPUT_TAG=h1;\
export ANALYTICS_CONFIG_FILE_PATH=$(pwd)/development/data/config.yaml
```
```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.export.BatchParquetExport" -Dexec.classpathScope="compile"```
```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.export.BatchExport" -Dexec.classpathScope="compile"```
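
The export job follows the same pattern: the destination-table definitions declare file-backed tables and the export queries populate them. A minimal, hypothetical sketch, assuming a `patients` table already flattened into the analytics database and illustrative output paths:

```sql
-- Hypothetical export definition and query (illustrative only; not the shipped DSL).
CREATE TABLE patients_export (
  patient_id  INT,
  given_name  STRING,
  family_name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/parquet/h1/patients',
  'format' = 'parquet'
);

INSERT INTO patients_export
SELECT patient_id, given_name, family_name
FROM patients;
```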


## Gotchas
When streaming data from Postgres See
When streaming data from PostgreSQL, see
[consuming-data-produced-by-debezium-postgres-connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/#consuming-data-produced-by-debezium-postgres-connector)
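
In practice this means the streaming job's temporary source tables for PostgreSQL-backed components (such as Odoo) read Debezium change events from Kafka, and the upstream tables generally need full row images for update and delete events to be interpreted correctly. A hedged sketch with illustrative column names:

```sql
-- Illustrative only: a temporary Flink table over a Debezium topic produced by Kafka Connect.
CREATE TABLE sale_order (
  id    INT,
  name  STRING,
  state STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'odoo.public.sale_order',
  'properties.bootstrap.servers' = 'localhost:29092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- On the PostgreSQL side, the source table typically needs full row images, otherwise
-- update/delete events may lack the "before" state described in the linked documentation:
-- ALTER TABLE sale_order REPLICA IDENTITY FULL;
```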
2 changes: 1 addition & 1 deletion development/.env
@@ -19,7 +19,7 @@ CHANGELOG_FILE=db.changelog-master.xml
ODOO_ANALYTICS_TABLES='databasechangelog,account_account,product_category,sale_order'

#Kafka
CREATE_TOPICS=appointment_service:1:1,appointment_service_type:1:1,care_setting:1:1,concept:1:1,concept_name:1:1,concept_reference_map:1:1,concept_reference_source:1:1,concept_reference_term:1:1,conditions:1:1,encounter:1:1,encounter_diagnosis:1:1,encounter_type:1:1,location:1:1,form:1:1,obs:1:1,order_type:1:1,orders:1:1,patient:1:1,patient_appointment:1:1,patient_appointment_provider:1:1,patient_identifier:1:1,patient_identifier_type:1:1,patient_program:1:1,program:1:1,person:1:1,person_name:1:1,person_address:1:1,visit_type:1:1,visit:1:1
CREATE_TOPICS=openmrs.openmrs.appointment_service,openmrs.openmrs.appointment_service_type,openmrs.openmrs.care_setting,openmrs.openmrs.concept,openmrs.openmrs.concept_name,openmrs.openmrs.concept_reference_map,openmrs.openmrs.concept_reference_source,openmrs.openmrs.concept_reference_term,openmrs.openmrs.conditions,openmrs.openmrs.encounter,openmrs.openmrs.encounter_diagnosis,openmrs.openmrs.encounter_type,openmrs.openmrs.location,openmrs.openmrs.form,openmrs.openmrs.obs,openmrs.openmrs.order_type,openmrs.openmrs.orders,openmrs.openmrs.patient,openmrs.openmrs.patient_appointment,openmrs.openmrs.patient_appointment_provider,openmrs.openmrs.patient_identifier,openmrs.openmrs.patient_identifier_type,openmrs.openmrs.patient_program,openmrs.openmrs.program,openmrs.openmrs.person,openmrs.openmrs.person_name,openmrs.openmrs.person_address,openmrs.openmrs.visit_type,openmrs.openmrs.visit,odoo.public.sale_order

# Postgresql
POSTGRES_USER=postgres
3 changes: 2 additions & 1 deletion development/data/.gitignore
@@ -1 +1,2 @@
parquet
parquet
*.ser
57 changes: 57 additions & 0 deletions development/data/config.yaml
@@ -0,0 +1,57 @@
# Configuration of Jdbc catalogs, which map to actual databases, allowing direct reads and writes without temporary tables
jdbcCatalogs:
# Name of the catalog
- name: analytics
# Name of the default database in the catalog
defaultDatabase: '${ANALYTICS_DB_NAME:-analytics}'
# Database username
username: '${ANALYTICS_DB_USER:-analytics}'
# Database password
password: '${ANALYTICS_DB_PASSWORD:-analytics}'
# Jdbc Database Url
baseUrl: 'jdbc:postgresql://${ANALYTICS_DB_HOST:-localhost}:${ANALYTICS_DB_PORT:-5432}'
driver: postgresql
# Configuration for Kafka data streams used for streaming analytics
kafkaStreams:
# Topic prefix generated by Kafka Connect
- topicPrefix: openmrs.openmrs
# Path to the table definitions for temporary source tables
tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/openmrs'
# Kafka bootstrap servers
bootstrapServers: '${ANALYTICS_KAFKA_URL:-localhost:9092}'
- topicPrefix: odoo.public
tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/odoo'
bootstrapServers: '${ANALYTICS_KAFKA_URL:-localhost:9092}'
# Configuration for Jdbc data sources used for batch analytics
jdbcSources:
# Database url for the source database
- databaseUrl: jdbc:mysql://${OPENMRS_DB_HOST:-localhost}:${OPENMRS_DB_PORT:-3306}/${OPENMRS_DB_NAME:-openmrs}?sslmode=disable
# Username for the source database
username: '${OPENMRS_DB_USER:-openmrs}'
# Password for the source database
password: '${OPENMRS_DB_PASSWORD:-openmrs}'
tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/openmrs'
- databaseUrl: jdbc:postgresql://${ODOO_DB_HOST:-localhost}:${ODOO_DB_PORT:-5432}/${ODOO_DB_NAME:-odoo}?sslmode=disable
username: '${ODOO_DB_USER:-odoo}'
password: '${ODOO_DB_PASSWORD:-odoo}'
tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/odoo'
# Configuration for Jdbc data sinks
jdbcSinks:
# Name of the jdbc catalog to write into. The catalog must be defined in the jdbcCatalogs section
- jdbcCatalog: analytics
# Name of the database. The database must be defined in the jdbcCatalog above
databaseName: analytics
# The path to the queries to use for flattening data
queryPath: '${ANALYTICS_QUERIES_PATH:-/analytics/queries}'
# Configuration for File based data sinks
fileSinks:
# Path to definitions for temporary destination tables
- destinationTableDefinitionsPath: '${EXPORT_DESTINATION_TABLES_PATH:-/export/destination-tables}'
# The path for the queries to use for export
queryPath: '${EXPORT_SOURCE_QUERIES_PATH:-/export/queries}'
# The path to use for the exported output
exportOutputPath: '${EXPORT_OUTPUT_PATH:-/tmp/parquet}'
# The tag to use for the exported output. This is used to create a subdirectory in the exportOutputPath
exportOutPutTag: '${EXPORT_OUTPUT_TAG:-location1}'
# The format for export files. Currently only parquet and csv are supported
format: parquet
37 changes: 37 additions & 0 deletions development/docker-compose.yaml
@@ -1,5 +1,22 @@
version: '3.8'
services:
zookeeper:
restart: on-failure
image: debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
volumes:
- zookeeper-data:/zookeeper/data
- zookeeper-txns:/zookeeper/txns
zoonavigator:
image: elkozmon/zoonavigator
ports:
- "8000:8000"
environment:
HTTP_PORT: 8000
AUTO_CONNECT_CONNECTION_STRING: zookeeper:2181
kafka:
restart: on-failure
image: debezium/kafka:${DEBEZIUM_VERSION}
@@ -24,6 +41,24 @@ services:
"-c",
"./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"
]
kafka-setup:
restart: on-failure
image: debezium/kafka:${DEBEZIUM_VERSION}
command:
- /bin/bash
- -c
- |
IFS=',' read -ra topics <<< "$$TOPICS"
for topic in $${topics[@]}
do
echo "Creating topic $$topic..."
./bin/kafka-topics.sh --create --topic $$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name $$topic --alter --add-config retention.ms=31556736000
done
environment:
- TOPICS=${CREATE_TOPICS}
depends_on:
- kafka
connect:
restart: on-failure
image: debezium/connect:${DEBEZIUM_VERSION}
@@ -111,3 +146,5 @@ services:

volumes:
kafka-data: ~
zookeeper-data: ~
zookeeper-txns: ~
3 changes: 2 additions & 1 deletion development/setup-connect/setup-connect.sh
@@ -19,7 +19,8 @@ curl --fail -i -X PUT -H "Accept:application/json" -H "Content-Type:application/
"table.exclude.list": "${file:/kafka/config/connect-distributed.properties:table.exclude.list}",
"database.history.kafka.bootstrap.servers": "${file:/kafka/config/connect-distributed.properties:mysql.kafka.bootstrap.servers}",
"database.history.kafka.topic": "${file:/kafka/config/connect-distributed.properties:mysql.histroy.topic}",
"converters": "timestampConverter",
"converters": "timestampConverter,boolean",
"boolean.type": "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter",
"timestampConverter.format.time": "HH:mm:ss",
"timestampConverter.format.date": "YYYY-MM-dd",
Expand Down