From 7de73c64fdd825fa419bf1d6317de1c40025b4ba Mon Sep 17 00:00:00 2001 From: Omer Levi Hevroni Date: Sun, 21 Jan 2024 14:17:10 +0200 Subject: [PATCH] fix: handle fatal producer errors (#34) * fix: handle fatal producer errors * . --- package.json | 2 +- src/main/scala/adapters/kafka/ProducerImpl.scala | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 0ccd2e4..265d7f0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dafka-producer", - "version": "7.2.3", + "version": "7.2.4", "description": "Dockerized kafka producer", "private": true, "repository": { diff --git a/src/main/scala/adapters/kafka/ProducerImpl.scala b/src/main/scala/adapters/kafka/ProducerImpl.scala index 2467f7e..f83152e 100644 --- a/src/main/scala/adapters/kafka/ProducerImpl.scala +++ b/src/main/scala/adapters/kafka/ProducerImpl.scala @@ -42,6 +42,14 @@ class ProducerImpl( }).map(seq => seq.asJava).orNull ) )).map(_ => ()) + .handleErrorWith(e => { + if (e.getMessage =="Cannot execute transactional method because we are in an error state") { + logger.error(e)("Unrecoverable Kafka error, aborting") + System.exit(-5) + } + + IO.raiseError(e) + }) override def healthy(): IO[Boolean] = { readinessTopic.map { topic =>