Avro Serialisation is not working #98

Open
ibalachandar86 opened this issue Jul 16, 2022 · 1 comment

Comments

@ibalachandar86

Hi,

I am using the FsSourceConnector Kafka connector to ingest CSV files into a Kafka topic.
I am using confluentinc/cp-helm-charts with a custom-built Docker image for Kafka Connect (the FsSourceConnector jar is added to the image).
The prerequisites, Kafka Connect, and Kafka connector details are given below.

Problem Statement:
The Kafka connector below is working and I am able to ingest the CSV into a Kafka topic as a string. My goal is to Avro-serialise the CSV data and store it in the topic. I am not sure which serialisation configuration is missing in my Connect/connector properties.

Prerequisites:
I have placed the CSV file in a directory inside the Kafka Connect pod and created a schema for the CSV in the Confluent Schema Registry.
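
As a sanity check, the registered schema can be fetched from the Schema Registry REST API (using the same service name as in the Helm values below; adjust if it differs):

# Fetch the latest registered value schema for the topic
curl -s http://test-cp-schema-registry:8081/subjects/sampledata-value/versions/latest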

Below are the Kafka Connect details (Helm values):
cp-control-center:
  enabled: false

cp-kafka:
  enabled: true

cp-kafka-rest:
  enabled: false

cp-ksql-server:
  enabled: false

cp-schema-registry:
  enabled: true

cp-zookeeper:
  enabled: true

cp-kafka-connect:
  replicaCount: 1

  image: localhost:5000/kc
  imageTag: v1
  imagePullPolicy: Always

  servicePort: 8083

  configurationOverrides:
    "key.converter": "io.confluent.connect.avro.AvroConverter"
    "key.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "value.converter": "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "use.latest.version": "true"
    "auto.register.schemas": "false"
    "auto.create.topics": "false"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars"

  heapOptions: "-Xms5g -Xmx10g"

  customEnv:
    KAFKA_JMX_HOSTNAME: "127.0.0.1"

  kafka:
    bootstrapServers: "test-cp-kafka-headless:9092"

  cp-schema-registry:
    url: "test-cp-schema-registry:8081"

fullnameOverride: test
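
If it helps to narrow things down: as far as I understand, the cp-kafka-connect chart turns each configurationOverrides entry into a CONNECT_* environment variable on the worker container, so whether the Avro converter overrides actually reached the worker can be checked inside the pod (pod name below is a placeholder):

# List the effective converter settings on the running worker
kubectl exec -it <connect-pod> -- env | grep CONNECT_ | grep -i converter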

Below are the Kafka connector details:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '
{ "name": "sample",
"config": {
"connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
"tasks.max": "1",
"fs.uris": "/home/appuser/csv",
"topic": "sampledata",
"use.latest.version": "true",
"auto.register.schemas": "false",
"poll.interval.ms": "10000",
"auto.create.topics": "false",
"policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
"policy.batch_size": "0",
"policy.recursive": "true",
"policy.regexp": "^*.csv$",
"policy.resume.on.error": "false",
"key.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
"key.enhanced.avro.schema.support": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
"value.enhanced.avro.schema.support": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"file_reader.delimited.settings.format.quote": "\"",
"file_reader.delimited.settings.escape_unquoted": "false",
"file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
"file_reader.delimited.compression.type": "none",
"file_reader.delimited.settings.schema.avro": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}",
"file_reader.delimited.settings.delimiter_detection": "false",
"file_reader.delimited.compression.concatenated": "true",
"file_reader.delimited.settings.format.comment": "#",
"file_reader.delimited.settings.format.quote_escape": "\"",
"file_reader.delimited.settings.format.delimiter": ",",
"file_reader.encryption.passphrase": "",
"file_reader.delimited.settings.max_chars_per_column": "4096",
"file_reader.delimited.settings.line_separator_detection": "false",
"file_reader.delimited.settings.format.line_separator": "\n",
"file_reader.delimited.settings.max_columns": "512",
"file_reader.encryption.type": "NONE",
"file_reader.delimited.settings.header": "true",
"file_reader.delimited.settings.ignore_leading_whitespaces": "true",
"file_reader.delimited.settings.rows_to_skip": "0",
"file_reader.batch_size": "0",
"file_reader.encryption.secret": ""
} }'
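
For completeness, the connector and task state can be checked through the Connect REST API; a converter or Schema Registry problem usually shows up as a FAILED task with a stack trace here:

# Show connector state and any task-level error trace
curl -s http://localhost:8083/connectors/sample/status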

CSV file:
c1,c2,c3
abc,def,ghi
jkl,mno,pqr
stu,wvy,xyz
x1,x2,x3

Schema in Schema Registry:
{"subject":"sampledata-value","version":1,"id":1,"schema":"{"type":"record","name":"sampledata","namespace":"default","fields":[{"name":"c1","type":"string"},{"name":"c2","type":"string"},{"name":"c3","type":"string"}]}"}

Data in Topic:
/bin/kafka-console-consumer --topic sampledata --from-beginning --bootstrap-server cef-cp-kafka-headless:9092
abcdefghi
jklmnopqr
stuwvyxyz
x1x2x3
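
The plain console consumer only prints raw bytes, so it is hard to tell from this output alone how the records were serialised. Assuming the Confluent Avro console consumer is available in the image, it will either decode the records against the registry or fail with a deserialisation error, which makes the check unambiguous:

# Decode records via the Schema Registry; fails if the data is not Avro-encoded
/bin/kafka-avro-console-consumer --topic sampledata --from-beginning \
  --bootstrap-server cef-cp-kafka-headless:9092 \
  --property schema.registry.url=http://test-cp-schema-registry:8081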

@baratamavinash225

Any update on this? We are facing a similar issue with Parquet as the source.
