Skip to content

Commit

Permalink
fix: handle fatal producer errors
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlh committed Jan 21, 2024
1 parent 4665883 commit f5db629
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dafka-producer",
"version": "7.2.3",
"version": "7.2.4",
"description": "Dockerized kafka producer",
"private": true,
"repository": {
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/adapters/kafka/ProducerImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ class ProducerImpl(
}).map(seq => seq.asJava).orNull
)
)).map(_ => ())
.handleErrorWith(e => {
if (e.getMessage =="Cannot execute transactional method because we are in an error state") {
logger.error(e)("Unrecoverable Kafka error, aborting")
System.exit(-5)
}

return IO.raiseError(e)
})

override def healthy(): IO[Boolean] = {
readinessTopic.map { topic =>
Expand Down

0 comments on commit f5db629

Please sign in to comment.