diff --git a/README.md b/README.md
index f25b243..be5a1ef 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,16 @@
+# Ozone Analytics ETL Pipelines
-# Ozone ETL pipelines

## Overview
-## Flink
-
-
-
-This repository contains ETL [Flink](hhttps://ci.apache.org/projects/flink/flink-docs-master/) [jobs](https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/job_scheduling/#:~:text=A%20Flink%20job%20is%20first,it%20cancels%20all%20running%20tasks) for flattening [Ozone HIS](https://github.com/ozone-his) data.
+This repository contains the ETL pipelines used to transform data from all Ozone components into a format that is easy to query and analyze. The pipelines are written in [Apache Flink](https://ci.apache.org/projects/flink/flink-docs-master/), a framework that supports both batch and real-time data processing.

## Features
+The project provides the following features:
-
-
-- Provides both [batch]() and [streaming]() modes
-
-- Currently flattens OpenMRS to output reporting friendly tables for:
+- Support for [**Batch Analytics**](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/batch_shuffle/) and [**Streaming Analytics**](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/) ETL
+- Flattening of data from Ozone HIS components into a format that is easy to query and analyze.
+The data that is flattened depends on project needs. For example, our Reference Distro provides flattening queries that produce the following tables:
- patients
- observations
@@ -37,25 +33,24 @@ This repository contains ETL [Flink](hhttps://ci.apache.org/projects/flink/flink
-## Tech
-
-- [Flink](hhttps://ci.apache.org/projects/flink/flink-docs-master/) - For ETL
-- [Kafka connect](https://docs.confluent.io/platform/current/connect/index.html) - For CDC
-- [Kafka](https://kafka.apache.org/) - For streaming data
+## Technologies
+We utilize the following technologies to power our ETL pipelines:
+- [Apache Flink](https://ci.apache.org/projects/flink/flink-docs-master/) - For orchestrating the ETL jobs.
+- [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) - For Change Data Capture (CDC).
+- [Apache Kafka](https://kafka.apache.org/) - For managing data streams.

### Development

-#### DSL
+#### Domain Specific Languages (DSLs)

-The project provides for defining ETL jobs for reporting. The underlying DSLs usable for the jobs are categorised as:
- [Flattening DSLs](https://github.com/ozone-his/ozonepro-distro/analytics_config/dsl/flattening/README.md) - For flattening data from OpenMRS. Note that these are related to the liquibase migration scripts that are used to create destination tables found [here](https://github.com/ozone-his/ozonepro-distro/analytics_config/liquibase/analytics/).
+The project generates Flink jobs based on SQL DSLs. The DSLs fall into two categories:
+- [Flattening DSLs](https://github.com/ozone-his/ozonepro-distro/analytics_config/dsl/flattening/README.md) - For flattening data from OpenMRS. Note that these are related to the Liquibase migration scripts that are used to create destination tables found [here](https://github.com/ozone-his/ozonepro-distro/analytics_config/liquibase/analytics/).
- [Parquet Export DSLs](https://github.com/ozone-his/ozonepro-distro/analytics_config/dsl/export/README.md) - For exporting data to parquet files
-
-#### Step1: startup backing services
+#### Step 1: Start Required Services

The project assumes you already have an Ozone HIS instance running.
If not please follow the instructions [here](https://github.com/ozone-his/ozone-docker) or [here](https://github.com/ozone-his/ozonepro-docker) to get one up and running. -The project also assumes you have the required migration scripts and destination table creation sripts with their queries scripts located somewhere you know. They can be downloaded as part of the project [here](https://github.com/ozone-his/ozonepro-distro) in the `analytics_config` directory, for example, the following `env` variable would be exported as below; +The project also assumes you have the required migration scripts and destination table creation scripts with their query scripts located somewhere you know. They can be downloaded as part of the project [here](https://github.com/ozone-his/ozonepro-distro) in the `analytics_config` directory, for example, the following `env` variable would be exported as below; ```bash export ANALYTICS_SOURCE_TABLES_PATH=~/ozonepro-distro/analytics_config/dsl/flattening/tables/; @@ -63,7 +58,6 @@ export ANALYTICS_QUERIES_PATH=~/ozonepro-distro/analytics_config/dsl/flattening/ export ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH=~/ozonepro-distro/analytics_config/liquibase/analytics/; export EXPORT_DESTINATION_TABLES_PATH=~/ozonepro-distro/analytics_config/dsl/export/tables/; export EXPORT_SOURCE_QUERIES_PATH=~/ozonepro-distro/analytics_config/dsl/export/queries; - ``` ```cd development``` @@ -76,7 +70,7 @@ export ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH= path_to_folder_containing_l export ANALYTICS_DB_HOST=gateway.docker.internal; \ export ANALYTICS_DB_PORT=5432; \ export CONNECT_MYSQL_HOSTNAME=gateway.docker.internal; \ -export CONNECT_MYSQL_PORT=3306; \ +export CONNECT_MYSQL_PORT=3307; \ export CONNECT_MYSQL_USER=root; \ export CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey; \ export CONNECT_ODOO_DB_HOSTNAME=gateway.docker.internal; \ @@ -94,7 +88,10 @@ export CONNECT_ODOO_DB_PASSWORD=password ```mvn clean install compile``` #### Step 3: -##### Run Streaming job +***Note***: The `ANALYTICS_CONFIG_FILE_PATH` env var provides the location of the configuration file required by all jobs. 
An example file is provided at `development/data/config.yaml` + + +##### Running in Streaming mode ```bash export ANALYTICS_SOURCE_TABLES_PATH=path_to_folder_containing_source_tables_to_query_from;\ @@ -111,17 +108,21 @@ export OPENMRS_DB_NAME=openmrs;\ export OPENMRS_DB_USER=root;\ export OPENMRS_DB_PASSWORD=3cY8Kve4lGey;\ export OPENMRS_DB_HOST=localhost;\ -export OPENMRS_DB_PORT=3306;\ +export OPENMRS_DB_PORT=3307;\ export ODOO_DB_NAME=odoo;\ export ODOO_DB_USER=postgres;\ export ODOO_DB_PASSWORD=password;\ export ODOO_DB_HOST=localhost;\ -export ODOO_DB_PORT=5432; +export ODOO_DB_PORT=5432;\ +export ZOOKEEPER_URL=localhost:2181;\ +export ANALYTICS_CONFIG_FILE_PATH=$(pwd)/development/data/config.yaml;\ +export ANALYTICS_KAFKA_URL=localhost:29092 ``` ```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.streaming.StreamingETLJob" -Dexec.classpathScope="compile"``` -##### Run Batch job +##### Runin Batch mode + ```bash export ANALYTICS_DB_USER=analytics;\ export ANALYTICS_DB_PASSWORD=password;\ @@ -138,10 +139,11 @@ export ODOO_DB_USER=postgres;\ export ODOO_DB_PASSWORD=password;\ export ODOO_DB_HOST=localhost;\ export ODOO_DB_PORT=5432; +export ANALYTICS_CONFIG_FILE_PATH=$(pwd)/development/data/config.yaml;\ ``` ```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.batch.BatchETLJob" -Dexec.classpathScope="compile"``` -##### Run Parquet Export job +##### Run Export job ```mkdir -p development/data/parquet/``` ```bash @@ -157,10 +159,11 @@ export ANALYTICS_DB_PORT=5432;\ export ANALYTICS_DB_NAME=analytics;\ export EXPORT_OUTPUT_PATH=$(pwd)/development/data/parquet/;\ export EXPORT_OUTPUT_TAG=h1; +export ANALYTICS_CONFIG_FILE_PATH=$(pwd)/development/data/config.yaml;\ ``` -```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.export.BatchParquetExport" -Dexec.classpathScope="compile"``` +```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.export.BatchExport" -Dexec.classpathScope="compile"``` ## Gotchas -When streaming data from Postgres See +When streaming data from PostgreSQL See [consuming-data-produced-by-debezium-postgres-connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/#consuming-data-produced-by-debezium-postgres-connector) diff --git a/development/.env b/development/.env index c04c030..196d175 100644 --- a/development/.env +++ b/development/.env @@ -19,7 +19,7 @@ CHANGELOG_FILE=db.changelog-master.xml ODOO_ANALYTICS_TABLES='databasechangelog,account_account,product_category,sale_order' #Kafka -CREATE_TOPICS=appointment_service:1:1,appointment_service_type:1:1,care_setting:1:1,concept:1:1,concept_name:1:1,concept_reference_map:1:1,concept_reference_source:1:1,concept_reference_term:1:1,conditions:1:1,encounter:1:1,encounter_diagnosis:1:1,encounter_type:1:1,location:1:1,form:1:1,obs:1:1,order_type:1:1,orders:1:1,patient:1:1,patient_appointment:1:1,patient_appointment_provider:1:1,patient_identifier:1:1,patient_identifier_type:1:1,patient_program:1:1,program:1:1,person:1:1,person_name:1:1,person_address:1:1,visit_type:1:1,visit:1:1 
+CREATE_TOPICS=openmrs.openmrs.appointment_service,openmrs.openmrs.appointment_service_type,openmrs.openmrs.care_setting,openmrs.openmrs.concept,openmrs.openmrs.concept_name,openmrs.openmrs.concept_reference_map,openmrs.openmrs.concept_reference_source,openmrs.openmrs.concept_reference_term,openmrs.openmrs.conditions,openmrs.openmrs.encounter,openmrs.openmrs.encounter_diagnosis,openmrs.openmrs.encounter_type,openmrs.openmrs.location,openmrs.openmrs.form,openmrs.openmrs.obs,openmrs.openmrs.order_type,openmrs.openmrs.orders,openmrs.openmrs.patient,openmrs.openmrs.patient_appointment,openmrs.openmrs.patient_appointment_provider,openmrs.openmrs.patient_identifier,openmrs.openmrs.patient_identifier_type,openmrs.openmrs.patient_program,openmrs.openmrs.program,openmrs.openmrs.person,openmrs.openmrs.person_name,openmrs.openmrs.person_address,openmrs.openmrs.visit_type,openmrs.openmrs.visit,odoo.public.sale_order # Postgresql POSTGRES_USER=postgres diff --git a/development/data/.gitignore b/development/data/.gitignore index 484d707..65f5bea 100644 --- a/development/data/.gitignore +++ b/development/data/.gitignore @@ -1 +1,2 @@ -parquet \ No newline at end of file +parquet +*.ser \ No newline at end of file diff --git a/development/data/config.yaml b/development/data/config.yaml new file mode 100644 index 0000000..868ba40 --- /dev/null +++ b/development/data/config.yaml @@ -0,0 +1,57 @@ +# Configuration Jdbc catalogs which Map to actual databases allowing direct read and write without temporary tables +jdbcCatalogs: + # Name of the catalog + - name: analytics + # Name of the default database in the catalog + defaultDatabase: '${ANALYTICS_DB_NAME:-analytics}' + # Database username + username: '${ANALYTICS_DB_USER:-analytics}' + # Databse password + password: '${ANALYTICS_DB_PASSWORD:-analytics}' + # Jdbc Database Url + baseUrl: 'jdbc:postgresql://${ANALYTICS_DB_HOST:-localhost}:${ANALYTICS_DB_PORT:-5432}' + driver: postgresql +# Configuration for Kafka data streams used for streaming analytics +kafkaStreams: + # Topic prefix generated by Kafka Connect + - topicPrefix: openmrs.openmrs + # Path to the table definitions for temporary source tables + tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/openmrs' + # Kafka bootstrap servers + bootstrapServers: '${ANALYTICS_KAFKA_URL:-localhost:9092}' + - topicPrefix: odoo.public + tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/odoo' + bootstrapServers: '${ANALYTICS_KAFKA_URL:-localhost:9092}' +# Configuration for Jdbc data sources used for batch analytics +jdbcSources: + # Database url for the source database + - databaseUrl: jdbc:mysql://${OPENMRS_DB_HOST:-localhost}:${OPENMRS_DB_PORT:-3306}/${OPENMRS_DB_NAME:-openmrs}?sslmode=disable + # Username for the source database + username: '${OPENMRS_DB_USER:-openmrs}' + # Password for the source database + password: '${OPENMRS_DB_PASSWORD:-openmrs}' + tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/openmrs' + - databaseUrl: jdbc:postgresql://${ODOO_DB_HOST:-localhost}:${ODOO_DB_PORT:-5432}/${ODOO_DB_NAME:-odoo}?sslmode=disable + username: '${ODOO_DB_USER:-odoo}' + password: '${ODOO_DB_PASSWORD:-odoo}' + tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/odoo' +# Configuration for Jdbc data sinks +jdbcSinks: + # Name of the jdbc catalog to write into. The catalog must be defined in the jdbcCatalogs section + - jdbcCatalog: analytics + # Name of the databse. 
The database must be defined in the jdbcCatalog above + databaseName: analytics + # The path to the queries to use for flattening data + queryPath: '${ANALYTICS_QUERIES_PATH:-/analytics/queries}' +# Configuration for File based data sinks +fileSinks: + # Path to definations for temporary destination tables + - destinationTableDefinitionsPath: '${EXPORT_DESTINATION_TABLES_PATH:-/export/destination-tables}' + # The path for the queries to use for export + queryPath: '${EXPORT_SOURCE_QUERIES_PATH:-/export/queries}' + # The path to use for the exported output + exportOutputPath: '${EXPORT_OUTPUT_PATH:-/tmp/parquet}' + # The tag to use for the exported output. This is used to create a subdirectory in the exportOutputPath + exportOutPutTag: '${EXPORT_OUTPUT_TAG:-location1}' + # The format for export files. Currently only parquet and csv are supported + format: parquet \ No newline at end of file diff --git a/development/docker-compose.yaml b/development/docker-compose.yaml index 2ba0590..09d682a 100644 --- a/development/docker-compose.yaml +++ b/development/docker-compose.yaml @@ -1,5 +1,22 @@ version: '3.8' services: + zookeeper: + restart: on-failure + image: debezium/zookeeper:${DEBEZIUM_VERSION} + ports: + - 2181:2181 + - 2888:2888 + - 3888:3888 + volumes: + - zookeeper-data:/zookeeper/data + - zookeeper-txns:/zookeeper/txns + zoonavigator: + image: elkozmon/zoonavigator + ports: + - "8000:8000" + environment: + HTTP_PORT: 8000 + AUTO_CONNECT_CONNECTION_STRING: zookeeper:2181 kafka: restart: on-failure image: debezium/kafka:${DEBEZIUM_VERSION} @@ -24,6 +41,24 @@ services: "-c", "./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list" ] + kafka-setup: + restart: on-failure + image: debezium/kafka:${DEBEZIUM_VERSION} + command: + - /bin/bash + - -c + - | + IFS=',' read -ra topics <<< "$$TOPICS" + for topic in $${topics[@]} + do + echo "Creating topic $$topic..." 
+ ./bin/kafka-topics.sh --create --topic $$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092 + ./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name $$topic --alter --add-config retention.ms=31556736000 + done + environment: + - TOPICS=${CREATE_TOPICS} + depends_on: + - kafka connect: restart: on-failure image: debezium/connect:${DEBEZIUM_VERSION} @@ -111,3 +146,5 @@ services: volumes: kafka-data: ~ + zookeeper-data: ~ + zookeeper-txns: ~ diff --git a/development/setup-connect/setup-connect.sh b/development/setup-connect/setup-connect.sh index 400b40c..bf75bd1 100644 --- a/development/setup-connect/setup-connect.sh +++ b/development/setup-connect/setup-connect.sh @@ -19,7 +19,8 @@ curl --fail -i -X PUT -H "Accept:application/json" -H "Content-Type:application/ "table.exclude.list": "${file:/kafka/config/connect-distributed.properties:table.exclude.list}", "database.history.kafka.bootstrap.servers": "${file:/kafka/config/connect-distributed.properties:mysql.kafka.bootstrap.servers}", "database.history.kafka.topic": "${file:/kafka/config/connect-distributed.properties:mysql.histroy.topic}", - "converters": "timestampConverter", + "converters": "timestampConverter,boolean", + "boolean.type": "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter", "timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter", "timestampConverter.format.time": "HH:mm:ss", "timestampConverter.format.date": "YYYY-MM-dd", diff --git a/pom.xml b/pom.xml index 33deb07..16bb72d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,6 @@ - 4.0.0 @@ -17,7 +16,7 @@ - Mekom Solutions + Mekom Solutions https://www.mekomsolutions.com @@ -33,7 +32,7 @@ ${target.java.version} ${target.java.version} 2.15.0-rc1 - provided + compile @@ -104,6 +103,12 @@ flink-parquet ${flink.version} + + org.apache.flink + flink-csv + ${flink.version} + + org.apache.flink @@ -125,6 +130,11 @@ jackson-databind ${jackson.version} + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + ${jackson.version} + org.apache.parquet parquet-hadoop-bundle @@ -142,23 +152,12 @@ 2.10.2 ${scope} - - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - - org.apache.logging.log4j - log4j-core - ${log4j.version} + org.slf4j + slf4j-simple + 1.7.5 + commons-cli commons-cli @@ -178,7 +177,7 @@ true - provided + compile @@ -197,10 +196,10 @@ - org.apache.flink:force-shading + @@ -220,7 +219,7 @@ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> com.ozonehis.data.pipelines.streaming.StreamingETLJob - + @@ -280,7 +279,7 @@ - com.ozonehis.data.pipelines.export.BatchParquetExport + com.ozonehis.data.pipelines.export.BatchExport ${project.artifactId}-${project.version}-etl-export diff --git a/src/main/java/com/ozonehis/data/pipelines/batch/BatchETLJob.java b/src/main/java/com/ozonehis/data/pipelines/batch/BatchETLJob.java index 70fb4f8..8130615 100644 --- a/src/main/java/com/ozonehis/data/pipelines/batch/BatchETLJob.java +++ b/src/main/java/com/ozonehis/data/pipelines/batch/BatchETLJob.java @@ -18,24 +18,25 @@ package com.ozonehis.data.pipelines.batch; -import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import com.ozonehis.data.pipelines.config.JdbcCatalogConfig; +import com.ozonehis.data.pipelines.config.JdbcSinkConfig; +import com.ozonehis.data.pipelines.config.JdbcSourceConfig; import com.ozonehis.data.pipelines.utils.CommonUtils; import com.ozonehis.data.pipelines.utils.QueryFile; import com.ozonehis.data.pipelines.utils.ConnectorUtils; import com.ozonehis.data.pipelines.utils.Environment; -import java.io.File; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.Arrays; +import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -51,61 +52,68 @@ * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). */ public class BatchETLJob { - // private static final Logger LOG = new Log4jLoggerFactory().getLogger(StreamingETLJob.class.getName()); + + // private static final Logger LOG = new + // Log4jLoggerFactory().getLogger(BatchETLJob.class.getName()); + private static String configFilePath = Environment.getEnv("ANALYTICS_CONFIG_FILE_PATH", "/etc/analytics/config.yaml"); + + private static StreamTableEnvironment tableEnv = null; + + private static MiniCluster cluster = null; public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = Environment.getExecutionEnvironment(); + cluster = Environment.initMiniClusterWithEnv(false); + cluster.start(); + StreamExecutionEnvironment env = new RemoteStreamEnvironment(cluster.getRestAddress().get().getHost(), + cluster.getRestAddress().get().getPort(), cluster.getConfiguration()); EnvironmentSettings envSettings = EnvironmentSettings.newInstance().inBatchMode().build(); - String name = "analytics"; - String defaultDatabase = Environment.getEnv("ANALYTICS_DB_NAME", "analytics"); - String username = Environment.getEnv("ANALYTICS_DB_USER", "analytics"); - String password = Environment.getEnv("ANALYTICS_DB_PASSWORD", "analytics"); - String baseUrl = String.format("jdbc:postgresql://%s:%s", Environment.getEnv("ANALYTICS_DB_HOST", "localhost"), - Environment.getEnv("ANALYTICS_DB_PORT", "5432")); - - String openmrsDBName = Environment.getEnv("OPENMRS_DB_NAME", "openmrs"); - String openmrsDBuser = Environment.getEnv("OPENMRS_DB_USER", "openmrs"); - String openmrsDBpassword = Environment.getEnv("OPENMRS_DB_PASSWORD", "openmrs"); - String openmrsDBhost = Environment.getEnv("OPENMRS_DB_HOST", "localhost"); - String openmrsDBport = Environment.getEnv("OPENMRS_DB_PORT", "3306"); - String openmrsDBurl = String.format("jdbc:mysql://%s:%s/%s?sslmode=disable", openmrsDBhost, openmrsDBport, - openmrsDBName); - - String odooDBName = Environment.getEnv("ODOO_DB_NAME", "odoo"); - String odooDBuser = Environment.getEnv("ODOO_DB_USER", "odoo"); - String odooDBpassword = Environment.getEnv("ODOO_DB_PASSWORD", "odoo"); - String odooDBhost = Environment.getEnv("ODOO_DB_HOST", "localhost"); - String odooDBport = Environment.getEnv("ODOO_DB_PORT", "5432"); - String odooDBurl = String.format("jdbc:postgresql://%s:%s/%s?sslmode=disable", odooDBhost, odooDBport, odooDBName); - - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings); - JdbcCatalog catalog = new 
JdbcCatalog(ClassLoader.getSystemClassLoader(),name, defaultDatabase, username, password, baseUrl); - tableEnv.registerCatalog("analytics", catalog); - Stream tables = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_SOURCE_TABLES_PATH", "/analytics/source_tables")).stream(); - tables.forEach(s -> { - Map connectorOptions = null; - if (s.parent.equals("openmrs")) { - connectorOptions = Stream - .of(new String[][] { { "connector", "jdbc" }, { "url", openmrsDBurl }, { "username", openmrsDBuser }, - { "password", openmrsDBpassword }, { "table-name", s.fileName }, }) - .collect(Collectors.toMap(data -> data[0], data -> data[1])); - } else if (s.parent.equals("odoo")) { - connectorOptions = Stream - .of(new String[][] { { "connector", "jdbc" }, { "url", odooDBurl }, { "username", odooDBuser }, - { "password", odooDBpassword }, { "table-name", s.fileName }, }) + tableEnv = StreamTableEnvironment.create(env, envSettings); + registerCatalogs(); + registerJdbcTables(); + executeFlattening(); + Environment.exitOnComplete(cluster); + } + + private static void registerCatalogs() { + for (JdbcCatalogConfig catalogConfig : CommonUtils.getConfig(configFilePath).getJdbcCatalogs()) { + JdbcCatalog catalog = new JdbcCatalog(BatchETLJob.class.getClassLoader(), catalogConfig.getName(), + catalogConfig.getDefaultDatabase(), catalogConfig.getUsername(), catalogConfig.getPassword(), + catalogConfig.getBaseUrl()); + tableEnv.registerCatalog(catalogConfig.getName(), catalog); + } + } + + private static void registerJdbcTables() { + for (JdbcSourceConfig jdbcSourceConfig : CommonUtils.getConfig(configFilePath).getJdbcSources()) { + Stream tables = CommonUtils.getSQL(jdbcSourceConfig.getTableDefinitionsPath()).stream(); + + tables.forEach(s -> { + Map connectorOptions = Stream + .of(new String[][] { { "connector", "jdbc" }, { "url", jdbcSourceConfig.getDatabaseUrl() }, + { "username", jdbcSourceConfig.getUsername() }, + { "password", jdbcSourceConfig.getPassword() }, { "table-name", s.fileName }, }) .collect(Collectors.toMap(data -> data[0], data -> data[1])); - } - String queryDSL = s.content + "\n" + " WITH (\n" - + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")"; - tableEnv.executeSql(queryDSL); - }); - List queries = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_QUERIES_PATH", "/analytics/queries")); - StatementSet stmtSet = tableEnv.createStatementSet(); - for (QueryFile query : queries) { - String queryDSL = "INSERT INTO `analytics`.`analytics`.`" + query.fileName + "`\n" + query.content; - stmtSet.addInsertSql(queryDSL); + + String queryDSL = s.content + "\n" + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")"; + tableEnv.executeSql(queryDSL); + }); } - stmtSet.execute(); } + private static void executeFlattening() + throws IOException, ClassNotFoundException, InterruptedException, ExecutionException { + String[] jobNames = cluster.listJobs().get().stream().map(job -> job.getJobName()).toArray(String[]::new); + for (JdbcSinkConfig jdbcSinkConfig : CommonUtils.getConfig(configFilePath).getJdbcSinks()) { + List queries = CommonUtils.getSQL(jdbcSinkConfig.getQueryPath()); + for (QueryFile query : queries) { + String queryDSL = "INSERT INTO `" + jdbcSinkConfig.getJdbcCatalog() + "`.`" + + jdbcSinkConfig.getDatabaseName() + "`.`" + query.fileName + "`\n" + query.content; + if (Stream.of(jobNames).noneMatch(jobName -> jobName.equals("insert-into_" + jdbcSinkConfig.getJdbcCatalog() + + "." + jdbcSinkConfig.getDatabaseName() + "." 
+ query.fileName))) { + tableEnv.executeSql(queryDSL); + } + } + } + } } diff --git a/src/main/java/com/ozonehis/data/pipelines/config/AppConfiguration.java b/src/main/java/com/ozonehis/data/pipelines/config/AppConfiguration.java new file mode 100644 index 0000000..7c744f8 --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/config/AppConfiguration.java @@ -0,0 +1,56 @@ +package com.ozonehis.data.pipelines.config; + +import java.util.List; + +public class AppConfiguration { + + private List jdbcCatalogs; + + private List kafkaStreams; + + private List jdbcSinks; + + private List fileSinks; + + public List getFileSinks() { + return fileSinks; + } + + public void setFileSinks(List fileSinks) { + this.fileSinks = fileSinks; + } + + private List jdbcSources; + + public List getJdbcSources() { + return jdbcSources; + } + + public void setJdbcSources(List jdbcSources) { + this.jdbcSources = jdbcSources; + } + + public List getJdbcSinks() { + return jdbcSinks; + } + + public void setJdbcSinks(List jdbcSinks) { + this.jdbcSinks = jdbcSinks; + } + + public List getJdbcCatalogs() { + return jdbcCatalogs; + } + + public void setJdbcCatalogs(List jdbcCatalogs) { + this.jdbcCatalogs = jdbcCatalogs; + } + + public List getKafkaStreams() { + return kafkaStreams; + } + + public void setKafkaStreams(List kafkaStreams) { + this.kafkaStreams = kafkaStreams; + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/config/ConfigurationLoader.java b/src/main/java/com/ozonehis/data/pipelines/config/ConfigurationLoader.java new file mode 100644 index 0000000..09f8d0a --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/config/ConfigurationLoader.java @@ -0,0 +1,35 @@ +package com.ozonehis.data.pipelines.config; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; + +import org.apache.commons.text.StringSubstitutor; +import org.apache.commons.text.lookup.StringLookupFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +public class ConfigurationLoader { + + private final ObjectMapper objectMapper; + + private final StringSubstitutor stringSubstitutor; + + public ConfigurationLoader() { + this.objectMapper = new ObjectMapper(new YAMLFactory()); + this.stringSubstitutor = new StringSubstitutor(StringLookupFactory.INSTANCE.environmentVariableStringLookup()); + } + + public T loadConfiguration(File config, Class cls) { + try { + String contents = this.stringSubstitutor.replace(new String(Files.readAllBytes(config.toPath()))); + + return this.objectMapper.readValue(contents, cls); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/config/FileSinkConfig.java b/src/main/java/com/ozonehis/data/pipelines/config/FileSinkConfig.java new file mode 100644 index 0000000..60a0f2e --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/config/FileSinkConfig.java @@ -0,0 +1,54 @@ +package com.ozonehis.data.pipelines.config; + +public class FileSinkConfig { + + private String destinationTableDefinitionsPath; + + private String queryPath; + + private String exportOutputPath; + + private String format = "parquet"; + + private String exportOutPutTag; + + public String getExportOutPutTag() { + return exportOutPutTag; + } + + public void setExportOutPutTag(String exportOutPutTag) { + this.exportOutPutTag = exportOutPutTag; + } + + public String getFormat() { + return format; + } + + public 
void setFormat(String format) { + this.format = format; + } + + public String getExportOutputPath() { + return exportOutputPath; + } + + public void setExportOutputPath(String exportOutputPath) { + this.exportOutputPath = exportOutputPath; + } + + public String getQueryPath() { + return queryPath; + } + + public void setQueryPath(String queryPath) { + this.queryPath = queryPath; + } + + public String getDestinationTableDefinitionsPath() { + return destinationTableDefinitionsPath; + } + + public void setDestinationTableDefinitionsPath(String destinationTableDefinitionsPath) { + this.destinationTableDefinitionsPath = destinationTableDefinitionsPath; + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/config/JdbcCatalogConfig.java b/src/main/java/com/ozonehis/data/pipelines/config/JdbcCatalogConfig.java new file mode 100644 index 0000000..164dcdd --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/config/JdbcCatalogConfig.java @@ -0,0 +1,64 @@ +package com.ozonehis.data.pipelines.config; + +public class JdbcCatalogConfig { + + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + private String driver; + + public String getDriver() { + return driver; + } + + public void setDriver(String driver) { + this.driver = driver; + } + + private String defaultDatabase; + + public String getDefaultDatabase() { + return defaultDatabase; + } + + public void setDefaultDatabase(String defaultDatabase) { + this.defaultDatabase = defaultDatabase; + } + + private String baseUrl; + + public String getBaseUrl() { + return baseUrl; + } + + public void setBaseUrl(String baseUrl) { + this.baseUrl = baseUrl; + } + + private String username; + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + private String password; + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/config/JdbcSinkConfig.java b/src/main/java/com/ozonehis/data/pipelines/config/JdbcSinkConfig.java new file mode 100644 index 0000000..8efd776 --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/config/JdbcSinkConfig.java @@ -0,0 +1,34 @@ +package com.ozonehis.data.pipelines.config; + +public class JdbcSinkConfig { + + private String jdbcCatalog; + + public String getJdbcCatalog() { + return jdbcCatalog; + } + + public void setJdbcCatalog(String jdbcCatalog) { + this.jdbcCatalog = jdbcCatalog; + } + + private String databaseName; + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + private String queryPath; + + public String getQueryPath() { + return queryPath; + } + + public void setQueryPath(String queryPath) { + this.queryPath = queryPath; + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/config/JdbcSourceConfig.java b/src/main/java/com/ozonehis/data/pipelines/config/JdbcSourceConfig.java new file mode 100644 index 0000000..df9f241 --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/config/JdbcSourceConfig.java @@ -0,0 +1,44 @@ +package com.ozonehis.data.pipelines.config; + +public class JdbcSourceConfig { + + private String databaseUrl; + + private String username; + + private String password; + + private String tableDefinitionsPath; + + public String getTableDefinitionsPath() { + 
return tableDefinitionsPath; + } + + public void setTableDefinitionsPath(String tableDefinitionsPath) { + this.tableDefinitionsPath = tableDefinitionsPath; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getDatabaseUrl() { + return databaseUrl; + } + + public void setDatabaseUrl(String databaseUrl) { + this.databaseUrl = databaseUrl; + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/config/KafkaStreamConfig.java b/src/main/java/com/ozonehis/data/pipelines/config/KafkaStreamConfig.java new file mode 100644 index 0000000..99dd5ca --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/config/KafkaStreamConfig.java @@ -0,0 +1,34 @@ +package com.ozonehis.data.pipelines.config; + +public class KafkaStreamConfig { + + private String topicPrefix; + + public String getTopicPrefix() { + return topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) { + this.topicPrefix = topicPrefix; + } + + private String bootstrapServers; + + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + private String tableDefinitionsPath; + + public String getTableDefinitionsPath() { + return tableDefinitionsPath; + } + + public void setTableDefinitionsPath(String tableDefinitionsPath) { + this.tableDefinitionsPath = tableDefinitionsPath; + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/export/BatchExport.java b/src/main/java/com/ozonehis/data/pipelines/export/BatchExport.java new file mode 100644 index 0000000..7348be8 --- /dev/null +++ b/src/main/java/com/ozonehis/data/pipelines/export/BatchExport.java @@ -0,0 +1,86 @@ +package com.ozonehis.data.pipelines.export; + +import com.ozonehis.data.pipelines.utils.ConnectorUtils; +import com.ozonehis.data.pipelines.config.FileSinkConfig; +import com.ozonehis.data.pipelines.config.JdbcCatalogConfig; +import com.ozonehis.data.pipelines.utils.CommonUtils; +import com.ozonehis.data.pipelines.utils.QueryFile; +import com.ozonehis.data.pipelines.utils.Environment; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +public class BatchExport { + + private static String configFilePath = Environment.getEnv("ANALYTICS_CONFIG_FILE_PATH", "/etc/analytics/config.yaml"); + + private static StreamTableEnvironment tableEnv = null; + + private static MiniCluster cluster = null; + + public static void main(String[] args) throws Exception { + cluster = Environment.initMiniClusterWithEnv(false); + cluster.start(); + StreamExecutionEnvironment env = new RemoteStreamEnvironment(cluster.getRestAddress().get().getHost(), + cluster.getRestAddress().get().getPort(), 
cluster.getConfiguration()); + EnvironmentSettings envSettings = EnvironmentSettings.newInstance().inBatchMode().build(); + tableEnv = StreamTableEnvironment.create(env, envSettings); + registerCatalogs(); + registerDestinationTables(); + executeExport(); + Environment.exitOnComplete(cluster); + } + + private static void registerCatalogs() { + for (JdbcCatalogConfig catalogConfig : CommonUtils.getConfig(configFilePath).getJdbcCatalogs()) { + JdbcCatalog catalog = new JdbcCatalog(BatchExport.class.getClassLoader(), catalogConfig.getName(), + catalogConfig.getDefaultDatabase(), catalogConfig.getUsername(), catalogConfig.getPassword(), + catalogConfig.getBaseUrl()); + tableEnv.registerCatalog(catalogConfig.getName(), catalog); + } + } + + private static void registerDestinationTables() { + for (FileSinkConfig fileSinkConfig : CommonUtils.getConfig(configFilePath).getFileSinks()) { + Stream tables = CommonUtils.getSQL(fileSinkConfig.getDestinationTableDefinitionsPath()).stream(); + + tables.forEach(s -> { + Map connectorOptions = Stream + .of(new String[][] { { "connector", "filesystem" }, { "format", fileSinkConfig.getFormat() }, + { "sink.rolling-policy.file-size", "10MB" }, + { "path", + fileSinkConfig.getExportOutputPath() + "/" + s.fileName + "/" + + fileSinkConfig.getExportOutPutTag() + "/" + + DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) }, }) + .collect(Collectors.toMap(data -> data[0], data -> data[1])); + + String queryDSL = s.content + "\n" + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")"; + tableEnv.executeSql(queryDSL); + }); + } + } + + private static void executeExport() + throws IOException, ClassNotFoundException, InterruptedException, ExecutionException { + for (FileSinkConfig fileSinkConfig : CommonUtils.getConfig(configFilePath).getFileSinks()) { + List queries = CommonUtils.getSQL(fileSinkConfig.getQueryPath()); + for (QueryFile query : queries) { + tableEnv.executeSql(query.content); + } + } + } +} diff --git a/src/main/java/com/ozonehis/data/pipelines/export/BatchParquetExport.java b/src/main/java/com/ozonehis/data/pipelines/export/BatchParquetExport.java deleted file mode 100644 index 4ff62eb..0000000 --- a/src/main/java/com/ozonehis/data/pipelines/export/BatchParquetExport.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.ozonehis.data.pipelines.export; - -import com.ozonehis.data.pipelines.utils.ConnectorUtils; -import com.ozonehis.data.pipelines.utils.CommonUtils; -import com.ozonehis.data.pipelines.utils.QueryFile; -import com.ozonehis.data.pipelines.utils.Environment; - -import java.io.File; -import java.net.URL; -import java.net.URLClassLoader; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.StatementSet; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; - -public class BatchParquetExport { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = Environment.getExecutionEnvironment(); - EnvironmentSettings envSettings = EnvironmentSettings.newInstance().inBatchMode().build(); - String name = "analytics"; - String defaultDatabase = 
Environment.getEnv("ANALYTICS_DB_NAME", "analytics"); - String username = Environment.getEnv("ANALYTICS_DB_USER", "analytics"); - String password = Environment.getEnv("ANALYTICS_DB_PASSWORD", "analytics"); - String baseUrl = String.format("jdbc:postgresql://%s:%s", Environment.getEnv("ANALYTICS_DB_HOST", "localhost"), - Environment.getEnv("ANALYTICS_DB_PORT", "5432")); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings); - JdbcCatalog catalog = new JdbcCatalog(ClassLoader.getSystemClassLoader(),name, defaultDatabase, username, password, baseUrl); - tableEnv.registerCatalog("analytics", catalog); - Stream tables = CommonUtils.getSQL(Environment.getEnv("EXPORT_DESTINATION_TABLES_PATH", "/export/destination-tables")).stream(); - tables.forEach(s -> { - Map connectorOptions = Stream - .of(new String[][] { { "connector", "filesystem" }, { "format", "parquet" }, - { "sink.rolling-policy.file-size", "10MB" }, - { "path", - Environment.getEnv("EXPORT_OUTPUT_PATH", "/parquet") + "/" + s.fileName + "/" - + Environment.getEnv("EXPORT_OUTPUT_TAG", "location1") + "/" - + DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) }, }) - .collect(Collectors.toMap(data -> data[0], data -> data[1])); - String queryDSL = s.content + "\n" + " WITH (\n" - + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")"; - tableEnv.executeSql(queryDSL); - }); - List queries = CommonUtils.getSQL(Environment.getEnv("EXPORT_SOURCE_QUERIES_PATH", "/export/queries")); - StatementSet stmtSet = tableEnv.createStatementSet(); - for (QueryFile query : queries) { - // String queryDSL = "INSERT INTO `analytics`.`analytics`.`" + query.fileName + - // "`\n" + query.content; - stmtSet.addInsertSql(query.content); - } - stmtSet.execute(); - } - -} diff --git a/src/main/java/com/ozonehis/data/pipelines/streaming/StreamingETLJob.java b/src/main/java/com/ozonehis/data/pipelines/streaming/StreamingETLJob.java index a5dffbc..1bd2a8a 100644 --- a/src/main/java/com/ozonehis/data/pipelines/streaming/StreamingETLJob.java +++ b/src/main/java/com/ozonehis/data/pipelines/streaming/StreamingETLJob.java @@ -18,17 +18,28 @@ package com.ozonehis.data.pipelines.streaming; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; + +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import com.ozonehis.data.pipelines.config.JdbcCatalogConfig; +import com.ozonehis.data.pipelines.config.JdbcSinkConfig; +import com.ozonehis.data.pipelines.config.KafkaStreamConfig; import com.ozonehis.data.pipelines.utils.CommonUtils; import com.ozonehis.data.pipelines.utils.QueryFile; import com.ozonehis.data.pipelines.utils.ConnectorUtils; @@ -46,49 +57,68 @@ * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). 
*/ public class StreamingETLJob { - // private static final Logger LOG = new - // Log4jLoggerFactory().getLogger(StreamingETLJob.class.getName()); - public static void main(String[] args) { - StreamExecutionEnvironment env = Environment.getExecutionEnvironment(); + private static String configFilePath = Environment.getEnv("ANALYTICS_CONFIG_FILE_PATH", "/etc/analytics/config.yaml"); + + private static StreamTableEnvironment tableEnv = null; + + private static MiniCluster cluster = null; + + public static void main(String[] args) throws Exception { + cluster = Environment.initMiniClusterWithEnv(true); + cluster.start(); + StreamExecutionEnvironment env = new RemoteStreamEnvironment(cluster.getRestAddress().get().getHost(), + cluster.getRestAddress().get().getPort(), cluster.getConfiguration()); + EnvironmentSettings envSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); - String name = "analytics"; - String defaultDatabase = Environment.getEnv("ANALYTICS_DB_NAME", "analytics"); - String username = Environment.getEnv("ANALYTICS_DB_USER", "analytics"); - String password = Environment.getEnv("ANALYTICS_DB_PASSWORD", "analytics"); - String baseUrl = String.format("jdbc:postgresql://%s:%s", Environment.getEnv("ANALYTICS_DB_HOST", "localhost"), - Environment.getEnv("ANALYTICS_DB_PORT", "5432")); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings); - JdbcCatalog catalog = new JdbcCatalog(ClassLoader.getSystemClassLoader(),name, defaultDatabase, username, password, baseUrl); - tableEnv.registerCatalog("analytics", catalog); - Stream tables = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_SOURCE_TABLES_PATH", "/analytics/source-tables")).stream(); - tables.forEach(s -> { - Map connectorOptions = null; - if (s.parent.equals("openmrs")) { - connectorOptions = Stream.of( - new String[][] { { "connector", "kafka" }, { "properties.bootstrap.servers", Environment.getEnv("ANALYTICS_KAFKA_URL", "localhost:29092") }, - { "properties.group.id", "flink" }, { "topic", String.format("openmrs.openmrs.%s", s.fileName) }, - { "scan.startup.mode", "earliest-offset" }, - { "value.debezium-json.ignore-parse-errors", "true" }, { "value.format", "debezium-json" }, }) - .collect(Collectors.toMap(data -> data[0], data -> data[1])); - } else if (s.parent.equals("odoo")) { - connectorOptions = Stream.of(new String[][] { { "connector", "kafka" }, - { "properties.bootstrap.servers", Environment.getEnv("ANALYTICS_KAFKA_URL", "localhost:29092") }, { "properties.group.id", "flink" }, - { "topic", String.format("odoo.public.%s", s.fileName) }, { "scan.startup.mode", "earliest-offset" }, - { "value.debezium-json.ignore-parse-errors", "true" }, { "value.format", "debezium-json" }, }) - .collect(Collectors.toMap(data -> data[0], data -> data[1])); + tableEnv = StreamTableEnvironment.create(env, envSettings); + registerCatalogs(); + registerDataStreams(); + executeFlattening(); + } + + private static void registerCatalogs() { + for (JdbcCatalogConfig catalogConfig : CommonUtils.getConfig(configFilePath).getJdbcCatalogs()) { + JdbcCatalog catalog = new JdbcCatalog(StreamingETLJob.class.getClassLoader(), catalogConfig.getName(), + catalogConfig.getDefaultDatabase(), catalogConfig.getUsername(), catalogConfig.getPassword(), + catalogConfig.getBaseUrl()); + tableEnv.registerCatalog(catalogConfig.getName(), catalog); + } + } + + private static void registerDataStreams() { + for (KafkaStreamConfig kafkaStreamConfig : CommonUtils.getConfig(configFilePath).getKafkaStreams()) { + Stream 
tables = CommonUtils.getSQL(kafkaStreamConfig.getTableDefinitionsPath()).stream(); + + tables.forEach(s -> { + Map connectorOptions = Stream.of(new String[][] { { "connector", "kafka" }, + { "properties.bootstrap.servers", kafkaStreamConfig.getBootstrapServers() }, + { "properties.group.id", "flink" }, + { "topic", kafkaStreamConfig.getTopicPrefix() + String.format(".%s", s.fileName) }, + { "scan.startup.mode", "earliest-offset" }, { "value.debezium-json.ignore-parse-errors", "true" }, + { "value.format", "debezium-json" }, }).collect(Collectors.toMap(data -> data[0], data -> data[1])); + + String queryDSL = s.content + "\n" + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")"; + tableEnv.executeSql(queryDSL); + }); + } + } + + private static void executeFlattening() + throws IOException, ClassNotFoundException, InterruptedException, ExecutionException { + String[] jobNames = cluster.listJobs().get().stream().map(job -> job.getJobName()).toArray(String[]::new); + for (JdbcSinkConfig jdbcSinkConfig : CommonUtils.getConfig(configFilePath).getJdbcSinks()) { + List queries = CommonUtils.getSQL(jdbcSinkConfig.getQueryPath()); + for (QueryFile query : queries) { + String queryDSL = "INSERT INTO `" + jdbcSinkConfig.getJdbcCatalog() + "`.`" + + jdbcSinkConfig.getDatabaseName() + "`.`" + query.fileName + "`\n" + query.content; + if (Stream.of(jobNames).noneMatch(jobName -> jobName.equals("insert-into_" + jdbcSinkConfig.getJdbcCatalog() + + "." + jdbcSinkConfig.getDatabaseName() + "." + query.fileName))) { + tableEnv.executeSql(queryDSL); + } } - String queryDSL = s.content + "\n" + " WITH (\n" - + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + ")"; - tableEnv.executeSql(queryDSL); - }); - List queries = CommonUtils.getSQL(Environment.getEnv("ANALYTICS_QUERIES_PATH", "/analytics/queries")); - StatementSet stmtSet = tableEnv.createStatementSet(); - for (QueryFile query : queries) { - String queryDSL = "INSERT INTO `analytics`.`analytics`.`" + query.fileName + "`\n" + query.content; - stmtSet.addInsertSql(queryDSL); } - stmtSet.execute(); } } diff --git a/src/main/java/com/ozonehis/data/pipelines/utils/CommonUtils.java b/src/main/java/com/ozonehis/data/pipelines/utils/CommonUtils.java index 3b355d0..89699ec 100644 --- a/src/main/java/com/ozonehis/data/pipelines/utils/CommonUtils.java +++ b/src/main/java/com/ozonehis/data/pipelines/utils/CommonUtils.java @@ -1,5 +1,6 @@ package com.ozonehis.data.pipelines.utils; +import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -8,6 +9,9 @@ import java.util.List; import java.util.stream.Collectors; +import com.ozonehis.data.pipelines.config.AppConfiguration; +import com.ozonehis.data.pipelines.config.ConfigurationLoader; + public class CommonUtils { public static String getBaseName(String fileName) { @@ -43,4 +47,10 @@ public static List getSQL(String directory) { } } + + public static AppConfiguration getConfig(String configPath) { + ConfigurationLoader configLoader = new ConfigurationLoader(); + AppConfiguration config = configLoader.loadConfiguration(new File(configPath), AppConfiguration.class); + return config; + } } diff --git a/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java b/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java index 2c98304..a30495e 100644 --- a/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java +++ b/src/main/java/com/ozonehis/data/pipelines/utils/Environment.java @@ -1,10 
+1,27 @@ package com.ozonehis.data.pipelines.utils; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.ozonehis.data.pipelines.streaming.StreamingETLJob; public class Environment { + private static Logger logger = LoggerFactory.getLogger(StreamingETLJob.class); + public static String getEnv(String key, String defaultValue) { String value = System.getenv(key); if (value == null) { @@ -13,28 +30,63 @@ public static String getEnv(String key, String defaultValue) { return value; } - public static StreamExecutionEnvironment getExecutionEnvironment() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - if (System.console() != null) { - Configuration config = new Configuration(); - config.setString("restart-strategy.type", "exponential-delay"); - // set the checkpoint mode to EXACTLY_ONCE - config.setString("execution.checkpointing.mode", "EXACTLY_ONCE"); - config.setString("execution.checkpointing.interval", "10min"); - config.setString("execution.checkpointing.timeout", "10min"); - config.setString("execution.checkpointing.unaligned.enabled", "true"); - config.setString("execution.checkpointing.tolerable-failed-checkpoints", "400"); - config.setString("table.dynamic-table-options.enabled", "true"); - config.setString("state.backend.type", "hashmap"); - config.setString("state.backend.incremental", "true"); - config.setString("state.checkpoints.dir", "file:///tmp/checkpoints/"); - config.setString("state.savepoints.dir", "file:///tmp/savepoints/"); - config.setString("taskmanager.network.numberOfBuffers", "20"); - config.setString("table.exec.resource.default-parallelism", "3"); - //config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); - env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); - env.setParallelism(3); + public static MiniCluster initMiniClusterWithEnv() throws Exception { + return initMiniClusterWithEnv(true); + } + + public static MiniCluster initMiniClusterWithEnv(Boolean isStreaming) throws Exception { + Configuration flinkConfig = new Configuration(); + flinkConfig.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("64m")); + flinkConfig.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("64m")); + + flinkConfig.setString("restart-strategy.type", "exponential-delay"); + flinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE"); + flinkConfig.setString("execution.checkpointing.interval", "10min"); + flinkConfig.setString("execution.checkpointing.timeout", "10min"); + flinkConfig.setString("execution.checkpointing.unaligned.enabled", "true"); + flinkConfig.setString("execution.checkpointing.tolerable-failed-checkpoints", "50"); + flinkConfig.setString("table.dynamic-table-options.enabled", "true"); + flinkConfig.setString("table.exec.resource.default-parallelism", "1"); + flinkConfig.setString("state.backend.type", "rocksdb"); + 
flinkConfig.setString("state.backend.incremental", "true"); + flinkConfig.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoints/"); + flinkConfig.setString("state.savepoints.dir", "file:///tmp/flink/savepoints/"); + flinkConfig.setInteger("state.checkpoints.num-retained", 4); + flinkConfig.setString("taskmanager.network.numberOfBuffers", "20"); + flinkConfig.setString("io.tmp.dirs", "/tmp/temp"); + if (isStreaming) { + flinkConfig.setString("high-availability", "ZOOKEEPER"); + flinkConfig.setString("high-availability.storageDir", "/tmp/flink/ha"); + flinkConfig.setString("high-availability.zookeeper.quorum", getEnv("ZOOKEEPER_URL", "zookeeper:2181")); } - return env; + MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder().setNumTaskManagers(1) + .setNumSlotsPerTaskManager(20).setConfiguration(flinkConfig).build(); + MiniCluster cluster = new MiniCluster(clusterConfig); + return cluster; + } + + public static void exitOnComplete(MiniCluster cluster) { + Runnable exitOnCompleteRunnable = new Runnable() { + + public void run() { + try { + Collection jobs = cluster.listJobs().get(); + if (jobs.size() == 0) { + System.exit(0); + } + Boolean[] jobStatuses = jobs.stream().map(job -> job.getJobState().isGloballyTerminalState()) + .toArray(Boolean[]::new); + if (Stream.of(jobStatuses).allMatch(Boolean::valueOf)) { + logger.info("All jobs are completed. Exiting..."); + System.exit(0); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + exec.scheduleAtFixedRate(exitOnCompleteRunnable, 0, 1, TimeUnit.MINUTES); } }