- Kafka API
Format | Support | Differentiating Property |
---|---|---|
AVRO | Supports READ + WRITE | 'gimel.kafka.avro.schema.string'='' |
STRING | Supports READ + WRITE | 'gimel.kafka.message.value.type'='string' |
JSON | Supports READ + WRITE | 'gimel.kafka.message.value.type'='json' |
BINARY | Supports READ + WRITE | 'gimel.kafka.message.value.type'='binary' |
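Whichever format a topic carries, the read API is the same; only the dataset's table properties control how the payload is deserialized. A minimal sketch, assuming a string-typed dataset such as `default.hiveTable` (defined further below) with 'gimel.kafka.message.value.type'='string' and 'gimel.kafka.message.column.alias'='value':

```scala
// Reading is format-agnostic: the dataset's properties decide how the payload is deserialized.
// For a string-typed dataset, the raw message appears under the configured column alias ("value").
val dataSet = com.paypal.gimel.DataSet(sparkSession)
val df = dataSet.read("default.hiveTable", "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500")
df.select("value").show(5)
```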
- The current implementation does not support batch reads or streaming from multiple topics.
- Until SPARK-23636 is resolved, we may have to use num-cores=1 when doing a batch fetch from Kafka. Proposed solution --> apache/spark#20767
- Save the checkpoint state in a supplied or a default ZooKeeper.
- Clear the checkpoint and start over from the beginning via function calls (see the sketch after this list).
- Advanced parameters to consume data from Kafka with parallelism (batch only):
    - Horizontal scaling of tasks/executors
    - Control over the number of messages per executor (batch only)
    - Ability to control the number of rows/messages fetched from Kafka in each run (batch only)
- Express your entire logic in SQL, combining datasets from storages such as Hive, HDFS, ES and Kafka:
    - Explore data with "select" statements.
    - Insert data into targets with "insert" statements --> on a successful insert, the consumer's Kafka offsets are saved implicitly in the checkpoint nodes in ZooKeeper.
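A minimal sketch of the checkpoint lifecycle referenced above, using the batch read API that is covered in detail later on this page (`default.user` is the sample dataset used throughout these examples):

```scala
import com.paypal.gimel.DataSet

val dataSet: DataSet = DataSet(sparkSession)

// Read the Kafka-backed dataset; the consumer resumes from the last checkpoint saved in ZooKeeper
val options = "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500"
val df = dataSet.read("default.user", options)

// Optional: clear the checkpoint so the next read starts over from the beginning of the topic
// dataSet.latestKafkaDataSetReader.get.clearCheckPoint()

df.show()

// Persist the consumed offsets to ZooKeeper once the batch has been processed successfully
dataSet.latestKafkaDataSetReader.get.saveCheckPoint()
```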
Property | Mandatory? | Description | Example | Default |
---|---|---|---|---|
gimel.kafka.bootstrap.servers | Y | The broker list for Kafka | host1:9092,host2:9092 | |
gimel.kafka.whitelist.topics | Y | The topic name in Kafka | flights | |
key.serializer | Y | The Kafka key serializer | org.apache.kafka.common.serialization.StringSerializer | |
value.serializer | Y | The Kafka message (value) serializer | org.apache.kafka.common.serialization.StringSerializer | org.apache.kafka.common.serialization.ByteArraySerializer |
key.deserializer | Y | The Kafka key deserializer | org.apache.kafka.common.serialization.StringDeserializer | |
value.deserializer | Y | The Kafka message (value) deserializer | org.apache.kafka.common.serialization.StringDeserializer | org.apache.kafka.common.serialization.ByteArrayDeserializer |
zookeeper.connection.timeout.ms | Y | ZooKeeper connection timeout in milliseconds | 10000 | |
group.id | N | If specified, all readers of this topic fall under the same consumer group; thus only as many consumers as there are partitions can read from this dataset concurrently | | Random number assigned at runtime |
gimel.kafka.avro.schema.source | Y | INLINE - the Avro schema is supplied inline via gimel.kafka.avro.schema.string. CDH - the schema is picked up from the Confluent Schema Registry based on gimel.kafka.avro.schema.source.url | | |
gimel.kafka.avro.schema.string | N | Required if gimel.kafka.avro.schema.source=INLINE | Refer to the Avro examples below | |
gimel.kafka.avro.schema.source.url | N | Required if gimel.kafka.avro.schema.source=CDH | Refer to the CDH examples below | |
gimel.kafka.avro.schema.source.key | N | Required if gimel.kafka.avro.schema.source=CDH; the key used to look up the Confluent Schema Registry | Refer to the CDH examples below | |
gimel.kafka.avro.schema.source.wrapper.key | N | Required if gimel.kafka.avro.schema.source=CDH. CDH uses two levels of Avro serialization; this is the Confluent Schema Registry lookup key for the outer schema | Refer to the CDH examples below | flights |
gimel.kafka.checkpoint.zookeeper.host | Y | ZooKeeper host where the Kafka offsets are checkpointed for each application | localhost:2181 | |
gimel.kafka.checkpoint.zookeeper.path | Y | The root node in ZooKeeper for checkpointing; child paths are appended based on spark.app.name, spark.user and the dataset name so that each application gets a unique ZooKeeper node | /gimel/kafka/checkpoints | |
gimel.kafka.throttle.batch.fetchRowsOnFirstRun | N | The number of latest messages to fetch when the consumer pulls data for the very first time (no checkpoint exists yet) | 25000000 | 25000000 |
gimel.kafka.throttle.batch.maxRecordsPerPartition | N | Maximum number of records fetched from each partition of the Kafka topic | 10000000 | 10000000 |
gimel.kafka.throttle.batch.parallelsPerPartition | N | Very advanced option to parallelize the number of connections per partition; best left at the default. Introduced during troubleshooting and performance optimization | 250 | 250 |
gimel.kafka.throttle.batch.minRowsPerParallel | N | Ensures parallelism is not over-subscribed, which would leave very few records per executor; e.g. with 100000, at least 100000 messages are read by each executor | 100000 | 100000 |
gimel.kafka.throttle.batch.targetCoalesceFactor | N | When writing to a sink such as HDFS where small files are an issue, set this to 1 so the DataFrame is repartitioned to one executor and written as fewer files | 1 | Default parallelism of the application |
gimel.kafka.throttle.streaming.isParallel | N | Streaming mode only: parallelizes the deserialization step. Recommended when preserving ordering is not a necessity in the sink (e.g. HDFS). With this flag turned on, messages fetched from Kafka are repartitioned across executors and processed in parallel via the properties below | false | true |
gimel.kafka.throttle.streaming.maxRatePerPartition | N | Maximum records to process per partition | 1000 | 3600 |
gimel.kafka.throttle.stream.parallelism.factor | N | The number of executors/repartitions to use while deserializing | 10 | 10 |
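The batch throttle properties above are passed at read time as a colon-separated options string, as in the batch examples later on this page; a short sketch (the values are illustrative only):

```scala
// Build the options string from the throttle properties described in the table above
val options = Seq(
  "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500",        // first-ever run: fetch only the latest 2500 messages
  "gimel.kafka.throttle.batch.maxRecordsPerPartition=25000000", // cap on records fetched per topic partition
  "gimel.kafka.throttle.batch.parallelsPerPartition=250"        // advanced: connections per partition, best left defaulted
).mkString(":")

val dataSet = com.paypal.gimel.DataSet(sparkSession)
val recsDF = dataSet.read("default.user", options)
```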
CREATE EXTERNAL TABLE `default.user`(
`name` string,
`age` int,
`rev` bigint
)
LOCATION 'hdfs:///tmp/user'
TBLPROPERTIES (
'gimel.storage.type' = 'KAFKA',
'gimel.kafka.bootstrap.servers'='localhost:9092',
'gimel.kafka.whitelist.topics'='user',
'gimel.kafka.checkpoint.zookeeper.host'='zk_host:2181',
'gimel.kafka.checkpoint.zookeeper.path'='/pcatalog/kafka_consumer/checkpoint',
'key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'value.serializer'='org.apache.kafka.common.serialization.ByteArraySerializer',
'zookeeper.connection.timeout.ms'='10000',
'auto.offset.reset'='earliest',
'gimel.kafka.avro.schema.source'='INLINE',
'gimel.kafka.avro.schema.string'=' {
"type" : "record",
"namespace" : "default",
"name" : "user",
"fields" : [
{ "name" : "name" , "type" : "string" },
{ "name" : "age" , "type" : "int" },
{ "name" : "rev" , "type" : "long" }
]}'
)
CREATE EXTERNAL TABLE `default.user`(
`name` string,
`age` int,
`rev` bigint
)
LOCATION 'hdfs:///tmp/user'
TBLPROPERTIES (
'gimel.storage.type' = 'KAFKA',
'gimel.kafka.bootstrap.servers'='localhost:9092',
'gimel.kafka.whitelist.topics'='kafka_topic',
'gimel.kafka.checkpoint.zookeeper.host'='zkhost:2181',
'gimel.kafka.checkpoint.zookeeper.path'='/pcatalog/kafka_consumer/checkpoint',
'gimel.kafka.avro.schema.source'='CSR',
'gimel.kafka.avro.schema.source.url'='http://schemaregistry:8081',
'gimel.kafka.avro.schema.source.wrapper.key'='flights', -- This is the Schema Lookup Key for the Schema Registry
'key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'value.serializer'='org.apache.kafka.common.serialization.ByteArraySerializer',
'zookeeper.connection.timeout.ms'='10000',
'auto.offset.reset'='earliest'
)
import org.apache.avro.generic.GenericRecord
import com.paypal.gimel.logger.Logger
import com.paypal.gimel.datastreamfactory.{StreamingResult, WrappedData}
import com.paypal.gimel._
import org.apache.spark.rdd._
import org.apache.spark.streaming._
import org.apache.spark._
import org.apache.spark.sql._
// Prepare Test Data
def stringed(n: Int) = s"""{"name": "MAC-${n}", "age": ${n}, "rev": ${n * 10000}}""" // fields match the dataset's Avro schema (name, age, rev)
val texts: Seq[String] = (1 to 100).map { x => stringed(x) }.toSeq
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(texts)
val df: DataFrame = sparkSession.read.json(rdd)
//Initiate DataSet
val dataset = com.paypal.gimel.DataSet(sparkSession)
//DataSet Name
val datasetName = "default.user"
//write some data
dataset.write(datasetName,df)
//Initiate DataSet
val dataSet: DataSet = DataSet(sparkSession)
// Optional: options can be used to pick a smaller subset of rows
val options = "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500:gimel.kafka.throttle.batch.parallelsPerPartition=250:gimel.kafka.throttle.batch.maxRecordsPerPartition=25000000"
//read API
val recsDF = dataSet.read("default.user",options)
// Get Kafka Operator for CheckPoint Operations
val kafkaOperator = dataSet.latestKafkaDataSetReader.get
// If required, clear checkpoint to begin reading from kafka from beginning
kafkaOperator.clearCheckPoint()
// Do some usecase
recsDF.show()
// Save CheckPoint at the end of each batch
kafkaOperator.saveCheckPoint()
//Specify Batch Interval for Streaming
val batchInterval = 15
//Context
val ssc = new StreamingContext(sc, Seconds(batchInterval.toInt))
//Initiate DStream
val dataStream = DataStream(ssc)
// Assign your Dataset Name
val datasetName = "default.user"
//Get Reference to Stream
val streamingResult: StreamingResult = dataStream.read(datasetName)
//Clear CheckPoint if necessary
streamingResult.clearCheckPoint("clear")
//Helper for Clients
streamingResult.dStream.foreachRDD { rdd =>
val k: RDD[WrappedData] = rdd
val count = rdd.count()
if (count > 0) {
/**
* Mandatory | Get Offset for Current Window, so we can checkpoint at the end of this window's operation
*/
streamingResult.getCurrentCheckPoint(rdd)
/**
* Begin | User's Usecases
*/
//Sample UseCase | Display Count
println("count is -->")
println(count)
//Sample UseCase | Get Avro Generic Record
val rddAvro: RDD[GenericRecord] = streamingResult.convertBytesToAvro(rdd)
rddAvro.map(x => x.toString)
println("sample records from Avro-->")
rddAvro.map(x => x.toString).take(10).foreach(x => println(x))
//Sample UseCase | Convert to DataFrame
val df: DataFrame = streamingResult.convertAvroToDF(sqlContext, rddAvro)
println("sample records -->")
df.show(5)
/**
* End | User's Usecases
*/
/**
* Mandatory | Save Current Window - CheckPoint
*/
streamingResult.saveCurrentCheckPoint()
}
}
//Streaming will start after these steps
dataStream.streamingContext.start()
dataStream.streamingContext.awaitTermination()
%%pcatalog-batch select * from default.flights;
%%pcatalog-stream select * from default.flights;
CREATE EXTERNAL TABLE `default.hiveTable`(
`payload` string
)
TBLPROPERTIES (
'gimel.storage.type' = 'KAFKA',
'gimel.kafka.bootstrap.servers'='kafka_host:9092',
'gimel.kafka.whitelist.topics'='your_topic',
'key.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'value.serializer'='org.apache.kafka.common.serialization.StringSerializer',
'zookeeper.connection.timeout.ms'='10000',
'auto.offset.reset'='earliest',
'gimel.kafka.checkpoint.zookeeper.host'='zk_host:2181',
'gimel.kafka.checkpoint.zookeeper.path'='/pcatalog/kafka_consumer/checkpoint',
'gimel.kafka.message.column.alias'='value',
'gimel.kafka.message.value.type'='string' -- can be json or binary depending on the messaging format requirement
)
// Prepare Test Data
def stringed(n: Int) = s"""{"id": ${n}, "name": "MAC-${n}", "rev": ${n * 10000}}"""
val texts: Seq[String] = (1 to 100).map { x => stringed(x) }.toSeq
val rdd: RDD[String] = sparkSession.sparkContext.parallelize(texts)
//Initiate DataSet
val dataset = com.paypal.gimel.DataSet(sparkSession)
//write some data
dataset.write("default.hiveTable",rdd)
//Initiate DataSet
val dataSet: DataSet = DataSet(sparkSession)
// Optional: options can be used to pick a smaller subset of rows
val options = "gimel.kafka.throttle.batch.fetchRowsOnFirstRun=2500:gimel.kafka.throttle.batch.parallelsPerPartition=250:gimel.kafka.throttle.batch.maxRecordsPerPartition=25000000"
//read API
val recsDF = dataSet.read("default.hiveTable",options)
// Get Kafka Operator for CheckPoint Operations
val kafkaOperator = dataSet.latestKafkaDataSetReader.get
// If required, clear checkpoint to begin reading from kafka from beginning
kafkaOperator.clearCheckPoint()
// Do some usecase
recsDF.show()
// Save CheckPoint at the end of each batch
kafkaOperator.saveCheckPoint()
//Specify Batch Interval for Streaming
val batchInterval = 15
//Context
val ssc = new StreamingContext(sc, Seconds(batchInterval.toInt))
//Initiate DStream
val dataStream = DataStream(ssc)
// Assign your Dataset Name
val datasetName = "default.hiveTable"
//Get Reference to Stream
val streamingResult: StreamingResult = dataStream.read(datasetName)
//Clear CheckPoint if necessary
streamingResult.clearCheckPoint("clear")
//Helper for Clients
streamingResult.dStream.foreachRDD { rdd =>
val k: RDD[WrappedData] = rdd
val count = rdd.count()
if (count > 0) {
/**
* Mandatory | Get Offset for Current Window, so we can checkpoint at the end of this window's operation
*/
streamingResult.getCurrentCheckPoint(rdd)
/**
* Begin | User's Usecases
*/
//Sample UseCase | Display Count
println("count is -->")
println(count)
//Sample UseCase | If Kafka topic is a Simple String Message, use below function to get DF
streamingResult.convertStringMessageToDF(sqlContext,rdd).show
/**
* End | User's Usecases
*/
/**
* Mandatory | Save Current Window - CheckPoint
*/
streamingResult.saveCurrentCheckPoint()
}
}
//Streaming will start after these steps
dataStream.streamingContext.start()
dataStream.streamingContext.awaitTermination()
%%pcatalog-batch select value from default.hiveTable;
%%pcatalog-stream select value from default.hiveTable;