Skip to content

Commit

Permalink
problem: Proxy fails to serialize JSON
Browse files Browse the repository at this point in the history
reason: expects full JSON response from upstream, which is replaces with result field only
  • Loading branch information
splix committed Jun 30, 2020
1 parent e26b92b commit 6e4e92c
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 52 deletions.
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Global.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.Version
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import java.text.SimpleDateFormat
import java.util.*

Expand All @@ -31,6 +32,7 @@ class Global {

private fun createObjectMapper(): ObjectMapper {
val module = SimpleModule("EmeraldDshackle", Version(1, 0, 0, null, null, null))
module.addSerializer(JsonRpcResponse::class.java, JsonRpcResponse.ResponseJsonSerializer())

val objectMapper = ObjectMapper()
objectMapper.registerModule(module)
Expand Down
31 changes: 15 additions & 16 deletions src/main/kotlin/io/emeraldpay/dshackle/proxy/WriteRpcJson.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@ package io.emeraldpay.dshackle.proxy
import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.Global
import io.infinitape.etherjar.rpc.RpcResponseError
import io.infinitape.etherjar.rpc.json.ResponseJson
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.lang.StringBuilder
import java.time.Duration
import java.util.function.Function

/**
Expand All @@ -48,29 +44,32 @@ open class WriteRpcJson() {
open fun toJsons(call: ProxyCall): Function<Flux<BlockchainOuterClass.NativeCallReplyItem>, Flux<String>> {
return Function { flux ->
flux.flatMap { response ->
val json = ResponseJson<Any, Any>()
if (!call.ids.containsKey(response.id)) {
log.warn("ID wasn't requested: ${response.id}")
return@flatMap Flux.empty<String>()
}
json.id = call.ids[response.id]
if (response.succeed) {
val payload = objectMapper.readValue(response.payload.toByteArray(), ResponseJson::class.java)
if (payload.error != null) {
json.error = payload.error
} else {
json.result = payload.result
}
val json = toJson(call, response)
if (json == null) {
Flux.empty<String>()
} else {
json.error = RpcResponseError(-32002, response.errorMessage)
Flux.just(json)
}
Flux.just(objectMapper.writeValueAsString(json))
}.onErrorContinue { t, u ->
log.warn("Failed to convert to JSON", t)
}
}
}

open fun toJson(call: ProxyCall, response: BlockchainOuterClass.NativeCallReplyItem): String? {
val id = call.ids[response.id] ?: return null;
val json = if (response.succeed) {
JsonRpcResponse.ok(response.payload.toByteArray(), JsonRpcResponse.Id.from(id))
} else {
JsonRpcResponse.error(-32002, response.errorMessage, JsonRpcResponse.Id.from(id))
}
return objectMapper.writeValueAsString(json)
}

/**
* Format response as JSON Array, for Batch requests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import reactor.core.publisher.Mono

class JsonRpcResponse(
private val result: ByteArray?,
val error: ResponseError?
val error: ResponseError?,
val id: Id
) {

constructor(result: ByteArray?, error: ResponseError?) : this(result, error, IntId(0))

companion object {
private val NULL_VALUE = "null".toByteArray()

Expand All @@ -34,6 +37,11 @@ class JsonRpcResponse(
return JsonRpcResponse(value, null)
}

@JvmStatic
fun ok(value: ByteArray, id: Id): JsonRpcResponse {
return JsonRpcResponse(value, null, id);
}

@JvmStatic
fun ok(value: String): JsonRpcResponse {
return JsonRpcResponse(value.toByteArray(), null)
Expand All @@ -43,6 +51,11 @@ class JsonRpcResponse(
fun error(code: Int, msg: String): JsonRpcResponse {
return JsonRpcResponse(null, ResponseError(code, msg))
}

@JvmStatic
fun error(code: Int, msg: String, id: Id): JsonRpcResponse {
return JsonRpcResponse(null, ResponseError(code, msg), id)
}
}

fun hasResult(): Boolean {
Expand Down Expand Up @@ -114,11 +127,67 @@ class JsonRpcResponse(
}
}

/**
* JSON RPC wrapper. Makes sure that the id is either Int or String
*/
interface Id {
fun asInt(): Int
fun asString(): String
fun isInt(): Boolean

companion object {
fun from(id: Any): Id {
if (id is Int) {
return IntId(id)
}
if (id is Number) {
return IntId(id.toInt())
}
if (id is String) {
return StringId(id)
}
throw IllegalArgumentException("Id must be Int or String")
}
}
}

class IntId(val id: Int) : Id {
override fun asInt(): Int {
return id
}

override fun asString(): String {
throw IllegalStateException("Not string")
}

override fun isInt(): Boolean {
return true
}
}

class StringId(val id: String) : Id {
override fun asInt(): Int {
throw IllegalStateException("Not int")
}

override fun asString(): String {
return id
}

override fun isInt(): Boolean {
return false
}
}

class ResponseJsonSerializer : JsonSerializer<JsonRpcResponse>() {
override fun serialize(value: JsonRpcResponse, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeStartObject()
gen.writeStringField("jsonrpc", "2.0")
gen.writeNumberField("id", 0)
if (value.id.isInt()) {
gen.writeNumberField("id", value.id.asInt())
} else {
gen.writeStringField("id", value.id.asString())
}
if (value.error != null) {
gen.writeObjectFieldStart("error")
gen.writeNumberField("code", value.error.code)
Expand All @@ -128,7 +197,9 @@ class JsonRpcResponse(
if (value.result == null) {
throw IllegalStateException("No result set")
}
gen.writeRawUTF8String(value.result, 0, value.result.size)
gen.writeFieldName("result")
gen.writeRaw(":")
gen.writeRaw(String(value.result))
}
gen.writeEndObject()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,61 +84,52 @@ class WriteRpcJsonSpec extends Specification {
def "Convert basic to JSON"() {
setup:
def call = new ProxyCall(ProxyCall.RpcType.SINGLE)
call.ids[1] = "aaa"
call.ids[1] = 105
def data = [
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(1)
.setSucceed(true)
.setPayload(ByteString.copyFrom('{"jsonrpc": "2.0", "id": 1, "result": "0x98dbb1"}', 'UTF-8'))
.setPayload(ByteString.copyFrom('"0x98dbb1"', 'UTF-8'))
.build()
]
when:
def act = Flux.fromIterable(data)
.transform(writer.toJsons(call))
.collectList()
.block(Duration.ofSeconds(1))
def act = writer.toJson(call, data[0])
then:
act == ['{"jsonrpc":"2.0","id":"aaa","result":"0x98dbb1"}']
act == '{"jsonrpc":"2.0","id":105,"result":"0x98dbb1"}'
}

def "Convert error to JSON"() {
def "Convert gRPC error to JSON"() {
setup:
def call = new ProxyCall(ProxyCall.RpcType.SINGLE)
call.ids[1] = 1
def data = [
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(1)
.setSucceed(true)
.setPayload(ByteString.copyFrom('{"jsonrpc": "2.0", "id": 1, "error": {"code": -32001, "message": "oops"}}', 'UTF-8'))
.setSucceed(false)
.setErrorMessage("Internal Error")
.build()
]
when:
def act = Flux.fromIterable(data)
.transform(writer.toJsons(call))
.collectList()
.block(Duration.ofSeconds(1))
def act = writer.toJson(call, data[0])
then:
act == ['{"jsonrpc":"2.0","id":1,"error":{"code":-32001,"message":"oops"}}']
act == '{"jsonrpc":"2.0","id":1,"error":{"code":-32002,"message":"Internal Error"}}'
}

def "Convert gRPC error to JSON"() {
def "Convert basic to JSON with string id"() {
setup:
def call = new ProxyCall(ProxyCall.RpcType.SINGLE)
call.ids[1] = 1
call.ids[1] = "aaa"
def data = [
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(1)
.setSucceed(false)
.setErrorMessage("Internal Error")
.setSucceed(true)
.setPayload(ByteString.copyFrom('"0x98dbb1"', 'UTF-8'))
.build()
]
when:
def act = Flux.fromIterable(data)
.transform(writer.toJsons(call))
.collectList()
.block(Duration.ofSeconds(1))
def act = writer.toJson(call, data[0])
then:
act == ['{"jsonrpc":"2.0","id":1,"error":{"code":-32002,"message":"Internal Error"}}']
act == '{"jsonrpc":"2.0","id":"aaa","result":"0x98dbb1"}'
}

def "Convert few items to JSON"() {
Expand All @@ -151,17 +142,17 @@ class WriteRpcJsonSpec extends Specification {
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(1)
.setSucceed(true)
.setPayload(ByteString.copyFrom('{"jsonrpc": "2.0", "id": 1, "result": "0x98dbb1"}', 'UTF-8'))
.setPayload(ByteString.copyFrom('"0x98dbb1"', 'UTF-8'))
.build(),
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(2)
.setSucceed(true)
.setPayload(ByteString.copyFrom('{"jsonrpc": "2.0", "id": 2, "error": {"code": -32001, "message": "oops"}}', 'UTF-8'))
.setSucceed(false)
.setErrorMessage("oops")
.build(),
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(3)
.setSucceed(true)
.setPayload(ByteString.copyFrom('{"jsonrpc": "2.0", "id": 3, "result": {"hash": "0x2484f459dc"}}', 'UTF-8'))
.setPayload(ByteString.copyFrom('{"hash": "0x2484f459dc"}', 'UTF-8'))
.build(),
]
when:
Expand All @@ -170,10 +161,9 @@ class WriteRpcJsonSpec extends Specification {
.collectList()
.block(Duration.ofSeconds(1))
then:
act == [
'{"jsonrpc":"2.0","id":10,"result":"0x98dbb1"}',
'{"jsonrpc":"2.0","id":11,"error":{"code":-32001,"message":"oops"}}',
'{"jsonrpc":"2.0","id":15,"result":{"hash":"0x2484f459dc"}}'
]
act.size() == 3
act[0] == '{"jsonrpc":"2.0","id":10,"result":"0x98dbb1"}'
act[1] == '{"jsonrpc":"2.0","id":11,"error":{"code":-32002,"message":"oops"}}'
act[2] == '{"jsonrpc":"2.0","id":15,"result":{"hash": "0x2484f459dc"}}'
}
}
Loading

0 comments on commit 6e4e92c

Please sign in to comment.