From 48d650b548a7246767ad81940c08c3621f063681 Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Thu, 25 Aug 2022 19:18:19 -0400 Subject: [PATCH] problem: reconnects WS too often when the only response is an error --- .../upstream/ethereum/WsConnection.kt | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnection.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnection.kt index 46743d221..e7972bd4f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnection.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnection.kt @@ -96,7 +96,7 @@ open class WsConnection( private var reconnectBackoff: BackOff = ExponentialBackOff().also { it.initialInterval = Duration.ofMillis(100).toMillis() - it.maxInterval = Duration.ofMinutes(1).toMillis() + it.maxInterval = Duration.ofMinutes(5).toMillis() } private var currentBackOff = reconnectBackoff.start() @@ -225,18 +225,20 @@ open class WsConnection( val consumer = inbound .aggregateFrames(msgSizeLimit) .receiveFrames() - .doOnNext { - if (!read) { - // restart backoff only after a successful read from the connection, - // otherwise it may restart it even if the connection is faulty - currentBackOff = reconnectBackoff.start() - read = true - } - } .map { ByteBufInputStream(it.content()).readAllBytes() } .flatMap { try { val msg = parser.parse(it) + if (!read) { + if (msg.error != null) { + log.warn("Received error ${msg.error.code} from $uri: ${msg.error.message}") + } else { + // restart backoff only after a successful read from the connection, + // otherwise it may restart it even if the connection is faulty or responds only with error + currentBackOff = reconnectBackoff.start() + read = true + } + } if (msg.type == ResponseWSParser.Type.SUBSCRIPTION) { onSubscription(msg) } else {