feat: v2 refactoring. The following changes were made:
1. Removed the separation logic between master datasets and regular datasets
2. Merged both pipelines into one
3. Created a cache indexer to index master-dataset data into Redis, similar to the Hudi (lakehouse) indexing
4. Upgraded the dataset config to the newer version
5. Moved entry_topic out of the dataset config into a separate field, to enable creation of multiple pipelines in the future (see the sketch below)
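
For example (a minimal sketch, not part of this diff, based on the updated Dataset case class shown further down), a consumer now reads the entry topic from the Dataset itself rather than from its DatasetConfig:

    // Illustrative helper only; Dataset.entryTopic is the new top-level field.
    // v1: dataset.datasetConfig.entryTopic
    // v2: dataset.entryTopic
    def entryTopicOf(dataset: Dataset): String = dataset.entryTopic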
SanthoshVasabhaktula committed Jun 27, 2024
1 parent ded2fde commit 49043d7
Showing 57 changed files with 886 additions and 302 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build_and_deploy.yaml
@@ -33,6 +33,8 @@ jobs:
target: "master-data-processor-image"
- image: "lakehouse-connector"
target: "lakehouse-connector-image"
- image: "cache-indexer"
target: "cache-indexer-image"
steps:
- uses: actions/checkout@v4
with:
4 changes: 4 additions & 0 deletions Dockerfile
@@ -41,3 +41,7 @@ FROM --platform=linux/x86_64 sanketikahub/flink:1.15.0-scala_2.12-lakehouse as l
USER flink
RUN mkdir $FLINK_HOME/custom-lib
COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custom-lib

FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as cache-indexer-image
USER flink
COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/lib
@@ -69,7 +69,8 @@ object MasterDataProcessorIndexer {
logger.info(s"createDataFile() | START | dataset=${dataset.id} ")
import spark.implicits._
val readWriteConf = ReadWriteConfig(scanCount = config.getInt("redis.scan.count"), maxPipelineSize = config.getInt("redis.max.pipeline.size"))
val redisConfig = new RedisConfig(initialHost = RedisEndpoint(host = dataset.datasetConfig.redisDBHost.get, port = dataset.datasetConfig.redisDBPort.get, dbNum = dataset.datasetConfig.redisDB.get))
val cacheConfig = dataset.datasetConfig.cacheConfig.get
val redisConfig = new RedisConfig(initialHost = RedisEndpoint(host = cacheConfig.redisDBHost.get, port = cacheConfig.redisDBPort.get, dbNum = cacheConfig.redisDB.get))
val ts: Long = new DateTime(DateTimeZone.UTC).withTimeAtStartOfDay().getMillis
val rdd = spark.sparkContext.fromRedisKV("*")(redisConfig = redisConfig, readWriteConfig = readWriteConf).map(
f => CommonUtil.processEvent(f._2, ts)
@@ -83,9 +84,9 @@ object MasterDataProcessorIndexer {
}

private def getDatasets(): List[Dataset] = {
val datasets: List[Dataset] = DatasetRegistry.getAllDatasets("master-dataset")
val datasets: List[Dataset] = DatasetRegistry.getAllDatasets(Some("master"))
datasets.filter(dataset => {
dataset.datasetConfig.indexData.nonEmpty && dataset.datasetConfig.indexData.get && dataset.status == DatasetStatus.Live
dataset.datasetConfig.indexingConfig.olapStoreEnabled && dataset.status == DatasetStatus.Live
})
}

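The Redis connection details now live under the dataset's cache_config instead of flat fields on dataset_config. A minimal sketch of building the spark-redis connection from the new CacheConfig (illustrative helper, not part of this commit; package paths are assumed):

    import com.redislabs.provider.redis.{RedisConfig, RedisEndpoint} // spark-redis, path assumed
    import org.sunbird.obsrv.model.DatasetModels.CacheConfig         // path assumed

    // Build a RedisConfig from the nested CacheConfig, falling back to illustrative defaults.
    def redisConfigFor(cacheConfig: CacheConfig): RedisConfig = {
      new RedisConfig(initialHost = RedisEndpoint(
        host = cacheConfig.redisDBHost.getOrElse("localhost"), // defaults are illustrative only
        port = cacheConfig.redisDBPort.getOrElse(6379),
        dbNum = cacheConfig.redisDB.getOrElse(0)
      ))
    }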

Large diffs are not rendered by default.

@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
import org.sunbird.obsrv.core.model.SystemConfig
import org.sunbird.obsrv.model.DatasetStatus.DatasetStatus
import org.sunbird.obsrv.model.DatasetStatus.{DatasetStatus, Value}
import org.sunbird.obsrv.model.TransformMode.TransformMode
import org.sunbird.obsrv.model.ValidationMode.ValidationMode

@@ -33,25 +33,42 @@ object DatasetModels {

case class RouterConfig(@JsonProperty("topic") topic: String)

case class DatasetConfig(@JsonProperty("data_key") key: String, @JsonProperty("timestamp_key") tsKey: String, @JsonProperty("entry_topic") entryTopic: String,
@JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None, @JsonProperty("redis_db_host") redisDBHost: Option[String] = None,
@JsonProperty("redis_db_port") redisDBPort: Option[Int] = None, @JsonProperty("redis_db") redisDB: Option[Int] = None,
@JsonProperty("index_data") indexData: Option[Boolean] = None, @JsonProperty("timestamp_format") tsFormat: Option[String] = None,
@JsonProperty("dataset_tz") datasetTimezone: Option[String] = None)
case class IndexingConfig(@JsonProperty("olap_store_enabled") olapStoreEnabled: Boolean, @JsonProperty("lakehouse_enabled") lakehouseEnabled: Boolean,
@JsonProperty("cache_enabled") cacheEnabled: Boolean)

case class KeysConfig(@JsonProperty("data_key") dataKey: Option[String], @JsonProperty("partition_key") partitionKey: Option[String],
@JsonProperty("timestamp_key") tsKey: Option[String], @JsonProperty("timestamp_format") tsFormat: Option[String])

case class CacheConfig(@JsonProperty("redis_db_host") redisDBHost: Option[String], @JsonProperty("redis_db_port") redisDBPort: Option[Int],
@JsonProperty("redis_db") redisDB: Option[Int])

case class DatasetConfigV1(@JsonProperty("data_key") key: String, @JsonProperty("timestamp_key") tsKey: String, @JsonProperty("entry_topic") entryTopic: String,
@JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None, @JsonProperty("redis_db_host") redisDBHost: Option[String] = None,
@JsonProperty("redis_db_port") redisDBPort: Option[Int] = None, @JsonProperty("redis_db") redisDB: Option[Int] = None,
@JsonProperty("index_data") indexData: Option[Boolean] = None, @JsonProperty("timestamp_format") tsFormat: Option[String] = None,
@JsonProperty("dataset_tz") datasetTimezone: Option[String] = None)

case class DatasetConfig(@JsonProperty("indexing_config") indexingConfig: IndexingConfig,
@JsonProperty("keys_config") keysConfig: KeysConfig,
@JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None,
@JsonProperty("dataset_tz") datasetTimezone: Option[String] = None,
@JsonProperty("cache_config") cacheConfig: Option[CacheConfig] = None)

case class Dataset(@JsonProperty("id") id: String, @JsonProperty("type") datasetType: String, @JsonProperty("extraction_config") extractionConfig: Option[ExtractionConfig],
@JsonProperty("dedup_config") dedupConfig: Option[DedupConfig], @JsonProperty("validation_config") validationConfig: Option[ValidationConfig],
@JsonProperty("data_schema") jsonSchema: Option[String], @JsonProperty("denorm_config") denormConfig: Option[DenormConfig],
@JsonProperty("router_config") routerConfig: RouterConfig, datasetConfig: DatasetConfig, @JsonProperty("status") @JsonScalaEnumeration(classOf[DatasetStatusType]) status: DatasetStatus,
@JsonProperty("tags") tags: Option[Array[String]] = None, @JsonProperty("data_version") dataVersion: Option[Int] = None)
@JsonProperty("router_config") routerConfig: RouterConfig, datasetConfig: DatasetConfig,
@JsonProperty("status") @JsonScalaEnumeration(classOf[DatasetStatusType]) status: DatasetStatus,
@JsonProperty("entry_topic") entryTopic: String, @JsonProperty("tags") tags: Option[Array[String]] = None,
@JsonProperty("data_version") dataVersion: Option[Int] = None, @JsonProperty("api_version") apiVersion: Option[String] = None)

case class Condition(@JsonProperty("type") `type`: String, @JsonProperty("expr") expr: String)

case class TransformationFunction(@JsonProperty("type") `type`: String, @JsonProperty("condition") condition: Option[Condition], @JsonProperty("expr") expr: String)

case class DatasetTransformation(@JsonProperty("id") id: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("field_key") fieldKey: String, @JsonProperty("transformation_function") transformationFunction: TransformationFunction,
@JsonProperty("status") status: String, @JsonProperty("mode") @JsonScalaEnumeration(classOf[TransformModeType]) mode: Option[TransformMode] = Some(TransformMode.Strict))
@JsonProperty("mode") @JsonScalaEnumeration(classOf[TransformModeType]) mode: Option[TransformMode] = Some(TransformMode.Strict))

case class ConnectorConfig(@JsonProperty("kafkaBrokers") kafkaBrokers: String, @JsonProperty("topic") topic: String, @JsonProperty("type") databaseType: String,
@JsonProperty("connection") connection: Connection, @JsonProperty("tableName") tableName: String, @JsonProperty("databaseName") databaseName: String,
@@ -94,4 +111,10 @@ class DatasetStatusType extends TypeReference[DatasetStatus.type]
object DatasetStatus extends Enumeration {
type DatasetStatus = Value
val Draft, Publish, Live, Retired, Purged = Value
}

class DatasetTypeType extends TypeReference[DatasetType.type]
object DatasetType extends Enumeration {
type DatasetType = Value
val event, transaction, master = Value
}
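
For a v2 dataset (api_version = "v2"), the stored dataset_config deserializes directly into this new model. A hedged example of what such a payload might look like (values are made up; JSONUtil is the project's Jackson helper used elsewhere in this commit, import paths assumed):

    import org.sunbird.obsrv.core.util.JSONUtil                 // path assumed
    import org.sunbird.obsrv.model.DatasetModels.DatasetConfig  // path assumed

    // Keys follow the @JsonProperty names declared above; values are illustrative.
    val v2ConfigJson =
      """{
        |  "indexing_config": {"olap_store_enabled": true, "lakehouse_enabled": false, "cache_enabled": true},
        |  "keys_config": {"data_key": "id", "partition_key": null, "timestamp_key": "date", "timestamp_format": null},
        |  "cache_config": {"redis_db_host": "localhost", "redis_db_port": 6379, "redis_db": 3},
        |  "exclude_fields": null,
        |  "dataset_tz": "UTC"
        |}""".stripMargin
    val datasetConfig: DatasetConfig = JSONUtil.deserialize[DatasetConfig](v2ConfigJson)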
@@ -12,9 +12,14 @@ object DatasetRegistry {
datasets ++= DatasetRegistryService.readAllDatasets()
lazy private val datasetTransformations: Map[String, List[DatasetTransformation]] = DatasetRegistryService.readAllDatasetTransformations()

def getAllDatasets(datasetType: String): List[Dataset] = {
def getAllDatasets(datasetType: Option[String]): List[Dataset] = {
val datasetList = DatasetRegistryService.readAllDatasets()
datasetList.filter(f => f._2.datasetType.equals(datasetType)).values.toList
if(datasetType.isDefined) {
datasetList.filter(f => f._2.datasetType.equals(datasetType.get)).values.toList
} else {
datasetList.values.toList
}

}

def getDataset(id: String): Option[Dataset] = {
@@ -47,8 +52,8 @@ object DatasetRegistry {
datasourceList.getOrElse(List())
}

def getDataSetIds(datasetType: String): List[String] = {
datasets.filter(f => f._2.datasetType.equals(datasetType)).keySet.toList
def getDataSetIds(): List[String] = {
datasets.keySet.toList
}

def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Int = {
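With the type filter now optional, callers choose between a typed and an untyped listing. A short usage sketch (illustrative calls only):

    // "master" matches the new DatasetType enum introduced in this commit.
    val masterDatasets = DatasetRegistry.getAllDatasets(Some("master")) // only master datasets
    val allDatasets = DatasetRegistry.getAllDatasets(None)              // every dataset, regardless of type
    val allDatasetIds = DatasetRegistry.getDataSetIds()                 // ids are no longer filtered by type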
@@ -119,6 +119,8 @@ object DatasetRegistryService {
Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => {
parseDatasource(result)
}).toList)
} finally {
postgresConnect.closeConnection()
}
}

@@ -130,7 +132,7 @@ object DatasetRegistryService {
def updateConnectorStats(id: String, lastFetchTimestamp: Timestamp, records: Long): Int = {
val query = s"UPDATE dataset_source_config SET connector_stats = coalesce(connector_stats, '{}')::jsonb || " +
s"jsonb_build_object('records', COALESCE(connector_stats->>'records', '0')::int + '$records'::int) || " +
s"jsonb_build_object('last_fetch_timestamp', '${lastFetchTimestamp}'::timestamp) || " +
s"jsonb_build_object('last_fetch_timestamp', '$lastFetchTimestamp'::timestamp) || " +
s"jsonb_build_object('last_run_timestamp', '${new Timestamp(System.currentTimeMillis())}'::timestamp) WHERE id = '$id';"
updateRegistry(query)
}
@@ -163,11 +165,25 @@ object DatasetRegistryService {
val jsonSchema = rs.getString("data_schema")
val denormConfig = rs.getString("denorm_config")
val routerConfig = rs.getString("router_config")
val datasetConfig = rs.getString("dataset_config")
val datasetConfigStr = rs.getString("dataset_config")
val status = rs.getString("status")
val tagArray = rs.getArray("tags")
val tags = if (tagArray != null) tagArray.getArray.asInstanceOf[Array[String]] else null
val dataVersion = rs.getInt("data_version")
val apiVersion = rs.getString("api_version")
val entryTopic = rs.getString("entry_topic")

val datasetConfig: DatasetConfig = if ("v2".equalsIgnoreCase(apiVersion)) {
JSONUtil.deserialize[DatasetConfig](datasetConfigStr)
} else {
val v1Config = JSONUtil.deserialize[DatasetConfigV1](datasetConfigStr)
DatasetConfig(
indexingConfig = IndexingConfig(olapStoreEnabled = true, lakehouseEnabled = false, cacheEnabled = if ("master".equalsIgnoreCase(datasetType)) true else false),
keysConfig = KeysConfig(dataKey = Some(v1Config.key), None, tsKey = Some(v1Config.tsKey), None),
excludeFields = v1Config.excludeFields, datasetTimezone = v1Config.datasetTimezone,
cacheConfig = Some(CacheConfig(redisDBHost = v1Config.redisDBHost, redisDBPort = v1Config.redisDBPort, redisDB = v1Config.redisDB))
)
}

Dataset(datasetId, datasetType,
if (extractionConfig == null) None else Some(JSONUtil.deserialize[ExtractionConfig](extractionConfig)),
Expand All @@ -176,10 +192,12 @@ object DatasetRegistryService {
Option(jsonSchema),
if (denormConfig == null) None else Some(JSONUtil.deserialize[DenormConfig](denormConfig)),
JSONUtil.deserialize[RouterConfig](routerConfig),
JSONUtil.deserialize[DatasetConfig](datasetConfig),
datasetConfig,
DatasetStatus.withName(status),
entryTopic,
Option(tags),
Option(dataVersion)
Option(dataVersion),
Option(apiVersion)
)
}

@@ -214,10 +232,9 @@ object DatasetRegistryService {
val datasetId = rs.getString("dataset_id")
val fieldKey = rs.getString("field_key")
val transformationFunction = rs.getString("transformation_function")
val status = rs.getString("status")
val mode = rs.getString("mode")

DatasetTransformation(id, datasetId, fieldKey, JSONUtil.deserialize[TransformationFunction](transformationFunction), status, Some(if (mode != null) TransformMode.withName(mode) else TransformMode.Strict))
DatasetTransformation(id, datasetId, fieldKey, JSONUtil.deserialize[TransformationFunction](transformationFunction), Some(if (mode != null) TransformMode.withName(mode) else TransformMode.Strict))
}

}
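
For illustration, the v1 fallback branch in parseDataset maps a legacy config onto the new model roughly as follows (a sketch with made-up values; import path assumed):

    import org.sunbird.obsrv.model.DatasetModels._ // path assumed

    // A legacy (v1) config as it would be parsed from the registry (values are illustrative).
    val v1 = DatasetConfigV1(key = "id", tsKey = "date", entryTopic = "ingest",
      redisDBHost = Some("localhost"), redisDBPort = Some(6379), redisDB = Some(3), indexData = Some(true))

    // The equivalent v2 config produced for a master dataset: OLAP indexing stays on,
    // lakehouse stays off, and the Redis details move under cache_config.
    val v2 = DatasetConfig(
      indexingConfig = IndexingConfig(olapStoreEnabled = true, lakehouseEnabled = false, cacheEnabled = true),
      keysConfig = KeysConfig(dataKey = Some(v1.key), partitionKey = None, tsKey = Some(v1.tsKey), tsFormat = None),
      excludeFields = v1.excludeFields,
      datasetTimezone = v1.datasetTimezone,
      cacheConfig = Some(CacheConfig(redisDBHost = v1.redisDBHost, redisDBPort = v1.redisDBPort, redisDB = v1.redisDB))
    )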
@@ -74,7 +74,7 @@ abstract class BaseDatasetProcessFunction(config: BaseJobConfig[mutable.Map[Stri

override def getMetricsList(): MetricsList = {
val metrics = getMetrics() ++ List(config.eventFailedMetricsCount)
MetricsList(DatasetRegistry.getDataSetIds(config.datasetType()), metrics)
MetricsList(DatasetRegistry.getDataSetIds(), metrics)
}

private def initMetrics(datasetId: String): Unit = {
@@ -138,7 +138,7 @@ abstract class BaseDatasetWindowProcessFunction(config: BaseJobConfig[mutable.Ma

override def getMetricsList(): MetricsList = {
val metrics = getMetrics() ++ List(config.eventFailedMetricsCount)
MetricsList(DatasetRegistry.getDataSetIds(config.datasetType()), metrics)
MetricsList(DatasetRegistry.getDataSetIds(), metrics)
}

private def initMetrics(datasetId: String): Unit = {

