-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublish.py
executable file
·56 lines (47 loc) · 1.69 KB
/
publish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#! /usr/bin/env python
from kafka import KafkaProducer
import json
def _as_bytes(obj, coerce=False):
    """Return *obj* as UTF-8 encoded ``bytes``.

    Bytes-like input (``bytes``, ``bytearray``, ``memoryview``) is passed
    through without re-encoding. A ``str`` is encoded as UTF-8. Any other
    type is stringified first when *coerce* is true; otherwise ``bytes()``
    rejects it with ``TypeError``.
    """
    if isinstance(obj, (bytes, bytearray, memoryview)):
        return bytes(obj)
    if coerce:
        obj = str(obj)
    return bytes(obj, encoding="utf-8")


def publish_message(kf_producer, topic_name, key, value):
    """Publish one key/value record to *topic_name* and report the outcome.

    Args:
        kf_producer: Producer object exposing ``send(topic, key=..., value=...)``
            and ``flush()`` (a ``kafka.KafkaProducer`` in this script).
        topic_name: Name of the topic to publish to.
        key: Record key. Bytes-like keys are sent as-is; anything else
            (e.g. an ``int`` id) is converted with ``str()`` and UTF-8 encoded.
        value: Record value, either ``str`` (UTF-8 encoded here) or bytes-like.

    Returns:
        None. This is a best-effort call: any failure is printed (with a
        traceback on stderr) instead of being raised to the caller.
    """
    import traceback  # local import: only needed on the error path

    try:
        # Diagnostic output kept from the original script.
        print(f"{type(key)} : {type(value)}")
        print(f"{key} : {value}")

        # bytes(obj, encoding=...) only accepts str, so a non-str key such as
        # an int must be stringified first, and bytes must not be re-encoded.
        key_bytes = _as_bytes(key, coerce=True)
        value_bytes = _as_bytes(value)

        future = kf_producer.send(topic_name, key=key_bytes, value=value_bytes)
        kf_producer.flush()

        # send() is asynchronous; asking the returned future for its result
        # surfaces a delivery failure here instead of reporting false success.
        if future is not None:
            future.get(timeout=10)

        print("Message published successfully.")
    except Exception as ex:
        # Include the exception type: a bare message such as
        # "encoding without a string argument" is hard to diagnose alone.
        print(f"{type(ex).__name__}: {ex}")
        traceback.print_exc()
if __name__ == "__main__":
    # Demo entry point: publish a few sample employee records, keyed by
    # employee id, to the "employees" topic on a local broker.
    kafka_producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        api_version=(0, 10),
    )

    employees = [
        {"name": "John Smith", "id": 1},
        {"name": "Susan Doe", "id": 2},
        {"name": "Karen Rock", "id": 3},
    ]

    try:
        for employee in employees:
            publish_message(
                kf_producer=kafka_producer,
                topic_name="employees",
                key=employee["id"],
                value=json.dumps(employee),
            )
    finally:
        # Always release the producer, even if publishing raised part-way
        # through. The constructor either returns a producer or raises, so
        # no None check is needed here.
        kafka_producer.close()