Skip to content

Commit

Permalink
#OBS-I380 - add connector information to the system events (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
SanthoshVasabhaktula authored Dec 5, 2024
1 parent 435610c commit 46256e8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ trait SystemEventHandler {
obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Number].longValue())
}

private def getSource(obsrvMeta: Map[String, AnyRef]): Option[Source] = {
if(obsrvMeta.get("source").isDefined) {
val source = obsrvMeta("source").asInstanceOf[Map[String, String]]
Some(Source(source.get("connector").getOrElse("api"), source.get("connectorInstance").getOrElse("api")))
} else None
}

def getError(error: ErrorConstants.Error, producer: Producer, functionalError: FunctionalError): Option[ErrorLog] = {
Some(ErrorLog(pdata_id = producer, pdata_status = StatusCode.failed, error_type = functionalError, error_code = error.errorCode, error_message = error.errorMsg, error_level = ErrorLevel.critical, error_count = Some(1)))
}
Expand All @@ -46,7 +53,7 @@ trait SystemEventHandler {
val timespans = obsrvMeta("timespans").asInstanceOf[Map[String, AnyRef]]

val systemEvent: SystemEvent = SystemEvent(
EventID.METRIC, ctx = ContextData(module = ModuleID.processing, pdata = PData(config.jobName, PDataType.flink, Some(producer)), dataset = dataset, dataset_type = dataset_type),
EventID.METRIC, ctx = ContextData(module = ModuleID.processing, pdata = PData(config.jobName, PDataType.flink, Some(producer)), dataset = dataset, dataset_type = dataset_type, eid = None, source = getSource(obsrvMeta)),
data = EData(error = error, pipeline_stats = Some(PipelineStats(extractor_events = None,
extractor_status = getStatus(flags, Producer.extractor), extractor_time = getTime(timespans, Producer.extractor),
validator_status = getStatus(flags, Producer.validator), validator_time = getTime(timespans, Producer.validator),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import org.sunbird.obsrv.core.exception.ObsrvException

object Models {

case class Source(connector: String, connectorInstance: String)
case class PData(id: String,@JsonScalaEnumeration(classOf[PDataTypeType]) `type`: PDataType,@JsonScalaEnumeration(classOf[ProducerType]) pid: Option[Producer])

case class ContextData(@JsonScalaEnumeration(classOf[ModuleIDType]) module: ModuleID, pdata: PData, dataset: Option[String] = None, dataset_type: Option[String] = None, eid: Option[String] = None)
case class ContextData(@JsonScalaEnumeration(classOf[ModuleIDType]) module: ModuleID, pdata: PData, dataset: Option[String] = None, dataset_type: Option[String] = None, eid: Option[String] = None, source: Option[Source] = None)

case class ErrorLog(@JsonScalaEnumeration(classOf[ProducerType]) pdata_id: Producer, @JsonScalaEnumeration(classOf[StatusCodeType]) pdata_status: StatusCode, @JsonScalaEnumeration(classOf[FunctionalErrorType]) error_type: FunctionalError, error_code: String, error_message: String,@JsonScalaEnumeration(classOf[ErrorLevelType]) error_level: ErrorLevel, error_count:Option[Int] = None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.
"processingStartTime" -> System.currentTimeMillis(),
"flags" -> Map(),
"timespans" -> Map(),
"error" -> Map()
"error" -> Map(),
"source" -> Map(
"connector" -> "api",
"connectorInstance" -> "api",
)
))
}
}
Expand Down

0 comments on commit 46256e8

Please sign in to comment.