Skip to content

Commit

Permalink
Change default zio-http configuration so that ws close frames are for…
Browse files Browse the repository at this point in the history
…warded to Tapir's code; also fix vertx (#4242)
  • Loading branch information
adamw authored Jan 10, 2025
1 parent e15d8dc commit 82d17d9
Show file tree
Hide file tree
Showing 18 changed files with 210 additions and 37 deletions.
14 changes: 5 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,15 @@ jobs:
if: matrix.target-platform == 'JVM' && matrix.java == '21' && matrix.scala-version == '3'
run: sbt $SBT_JAVA_OPTS -v "project examples3" verifyExamplesCompileUsingScalaCli
- name: Test
if: matrix.target-platform == 'JVM' && matrix.scala-version == '2.12'
uses: nick-fields/retry@v3
with:
timeout_minutes: 15
max_attempts: 4
command: sbt $SBT_JAVA_OPTS -v "testScoped ${{ matrix.scala-version }} ${{ matrix.target-platform }}; openapiCodegenSbt2_12/scripted"
- name: Test
if: matrix.target-platform != 'JS' && !(matrix.target-platform == 'JVM' && matrix.scala-version == '2.12')
if: matrix.target-platform != 'JS'
uses: nick-fields/retry@v3
with:
timeout_minutes: 15
timeout_minutes: 8
max_attempts: 4
command: sbt $SBT_JAVA_OPTS -v "testScoped ${{ matrix.scala-version }} ${{ matrix.target-platform }}"
- name: Test OpenAPI codegen
if: matrix.target-platform == 'JVM' && matrix.scala-version == '2.12'
run: sbt $SBT_JAVA_OPTS -v "openapiCodegenSbt2_12/scripted"
# The finatra tests take a really long time (1/3 of the build duration); hence, they are disabled and need to be run separately
#- name: Test finatra
# if: matrix.target-platform != 'JS'
Expand Down
26 changes: 21 additions & 5 deletions doc/endpoint/websockets.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ When creating a `webSocketBody`, we need to provide the following parameters:
* the `Streams` implementation, which determines the pipe type

By default, ping-pong frames are handled automatically, fragmented frames are combined, and close frames aren't
decoded, but this can be customised through methods on `webSocketBody`.
decoded, but this can be customized through methods on `webSocketBody`.

## Close frames

Expand All @@ -50,9 +50,25 @@ webSocketBody[Option[String], CodecFormat.TextPlain, Option[Response], CodecForm
the websocket-processing pipe will receive a `None: Option[String]` when the client closes the web socket. Moreover,
if the pipe emits a `None: Option[Response]`, the web socket will be closed by the server.

Alternatively, if the codec for your high-level type already handles close frames (but its schema is not derived as
optional), you can request that the close frames are decoded by the codec as well. Here's an example which does this
on the server side:

```scala
webSocketBody[...](...).decodeCloseRequests(true)
```

If you'd like to decode close frames when the endpoint is interpreted as a client, you should use the
`decodeCloseResponses` method.

```{note}
Not all server interpreters expose control frames (such as close frames) to user (and Tapir) code. Refer to the
documentation of individual interpreters for more details.
```

## Raw web sockets

Alternatively, it's possible to obtain a raw pipe transforming `WebSocketFrame`s:
The second web socket handling variant is to obtain a raw pipe transforming `WebSocketFrame`s:

```scala mdoc:silent
import org.apache.pekko.stream.scaladsl.Flow
Expand All @@ -69,11 +85,11 @@ endpoint.out(webSocketBodyRaw(PekkoStreams)): PublicEndpoint[
```

Such a pipe by default doesn't handle ping-pong frames automatically, doesn't concatenate fragmented flames, and
passes close frames to the pipe as well. As before, this can be customised by methods on the returned output.
passes close frames to the pipe as well. As before, this can be customized by methods on the returned output.

Request/response schemas can be customised through `.requestsSchema` and `.responsesSchema`.
Request/response schemas can be customized through `.requestsSchema` and `.responsesSchema`.

## Interpreting as a sever
## Interpreting as a server

When interpreting a web socket endpoint as a server, the [server logic](../server/logic.md) needs to provide a
streaming-specific pipe from requests to responses. E.g. in Pekko's case, this will be `Flow[REQ, RESP, Any]`.
Expand Down
7 changes: 7 additions & 0 deletions doc/server/http4s.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ BlazeServerBuilder[IO]
.withHttpWebSocketApp(wsb => Router("/" -> wsRoutes(wsb)).orNotFound)
```

```{note}
When a close frame is received by http4s, the server seems to cancel the stream that is processing the web socket frames.
This means that the `.decodeCloseRequests(true)` setting (also effective when the decoded type is optional, e.g. `Option`)
is not reliable: values corresponding to close frames will not always be processed by the stream. Hence, it's recommended
to avoid using this option with the http4s interpreter.
```

## Server Sent Events

The interpreter supports [SSE (Server Sent Events)](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// {cat=WebSocket; effects=ZIO; server=zio-http}: Describe and implement a WebSocket endpoint, being notified on the server-side that a client closed the socket

//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11
//> using dep com.softwaremill.sttp.tapir::tapir-zio-http-server:1.11.11
//> using dep com.softwaremill.sttp.tapir::tapir-zio:1.11.11

package sttp.tapir.examples.websocket

import sttp.capabilities.WebSockets
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.generic.auto.*
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import sttp.tapir.ztapir.*
import sttp.tapir.{Codec, CodecFormat, DecodeResult, PublicEndpoint}
import sttp.ws.WebSocketFrame
import zio.http.{Response as ZioHttpResponse, Routes, Server}
import zio.stream.Stream
import zio.{Console, ExitCode, URIO, ZIO, ZIOAppDefault, ZLayer}

// After running, try opening a ws connection to ws://localhost:8080/ws, sending some text messages, and then closing
// from the client-side.

object WebSocketZioHttpServer extends ZIOAppDefault:
enum ClientMessage:
case Leave
case Message(text: String)

val wsEndpoint: PublicEndpoint[Unit, Unit, ZioStreams.Pipe[ClientMessage, Option[String]], ZioStreams & WebSockets] =
endpoint.get
.in("ws")
.out(
webSocketBody[ClientMessage, CodecFormat.TextPlain, Option[String], CodecFormat.TextPlain](ZioStreams)
// the schema for the `ClientMessage` type is derived as non-optional, that's why we need to explicitly
// request that close frames should be passed for decoding as well
// the response type is optional, so we don't need to do that, and a `None` will be encoded as a close
// frame using the default codec
.decodeCloseRequests(true)
)

val wsServerEndpoint = wsEndpoint.zServerLogic[Any](_ =>
ZIO.succeed { (clientMessageStream: Stream[Throwable, ClientMessage]) =>
clientMessageStream.mapZIO {
case ClientMessage.Leave => Console.printLine("Received a 'leave' message, the socket was closed by the client").map(_ => None)
case ClientMessage.Message(text) => Console.printLine(s"Received: '$text' message").map(_ => Some("OK"))
}
}
)

// as we are using a custom high-level type for representing incoming requests from the client, we need to provide
// a codec which knows how to translate WebSocket frames to instances of ClientMessage
given wsFrameCodec: Codec[WebSocketFrame, ClientMessage, CodecFormat.TextPlain] =
val baseStringCodec = Codec.textWebSocketFrame[String, CodecFormat.TextPlain]

Codec.fromDecodeAndMeta[WebSocketFrame, ClientMessage, CodecFormat.TextPlain](CodecFormat.TextPlain()) {
case WebSocketFrame.Close(code, reason) => DecodeResult.Value(ClientMessage.Leave)
case frame => baseStringCodec.decode(frame).map(ClientMessage.Message.apply(_))
} {
case ClientMessage.Leave => WebSocketFrame.close
case ClientMessage.Message(msg) => baseStringCodec.encode(msg)
}

// interpreting the endpoints & exposing them
val routes: Routes[Any, ZioHttpResponse] = ZioHttpInterpreter().toHttp(List(wsServerEndpoint))

override def run: URIO[Any, ExitCode] =
Server
.serve(routes)
.provide(ZLayer.succeed(Server.Config.default.port(8080)), Server.live)
.exitCode
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ class AkkaHttpServerTest extends TestSuite with EitherValues {
createServerTest,
AkkaStreams,
autoPing = false,
handlePong = false
handlePong = false,
decodeCloseRequests = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi
createServerTest,
Fs2Streams[IO],
autoPing = true,
handlePong = false
handlePong = false,
decodeCloseRequests =
false // when a close frame is received, http4s cancels the stream, so sometimes the close frames are never processed
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: Pipe[IO, A, B] = _ => fs2.Stream.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class ZHttp4sServerTest extends TestSuite with OptionValues {
createServerTest,
ZioStreams,
autoPing = true,
handlePong = false
handlePong = false,
decodeCloseRequests =
false // when a close frame is received, http4s cancels the stream, so sometimes the close frames are never processed
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: streams.Pipe[A, B] = _ => ZStream.empty
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,33 @@
package sttp.tapir.server.jdkhttp

import cats.effect.{IO, Resource}
import org.scalatest.EitherValues
import org.scalatest.{EitherValues, Exceptional, FutureOutcome}
import sttp.tapir.server.jdkhttp.internal.idMonad
import sttp.tapir.server.tests._
import sttp.tapir.tests.{Test, TestSuite}

import scala.concurrent.Future

class JdkHttpServerTest extends TestSuite with EitherValues {
// these tests often fail on CI with:
// "Cause: java.io.IOException: HTTP/1.1 header parser received no bytes"
// "Cause: java.io.EOFException: EOF reached while reading"
// for an unknown reason; adding retries to avoid flaky tests
val retries = 5

override def withFixture(test: NoArgAsyncTest): FutureOutcome = withFixture(test, retries)

def withFixture(test: NoArgAsyncTest, count: Int): FutureOutcome = {
val outcome = super.withFixture(test)
new FutureOutcome(outcome.toFuture.flatMap {
case Exceptional(e) =>
println(s"Test ${test.name} failed, retrying.")
e.printStackTrace()
(if (count == 1) super.withFixture(test) else withFixture(test, count - 1)).toFuture
case other => Future.successful(other)
})
}

override def tests: Resource[IO, List[Test]] =
backendResource.flatMap { backend =>
Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class WebSocketPingPongFrameHandler(ignorePong: Boolean, autoPongOnPing: Boolean
if (autoPongOnPing) {
val _ = ctx.writeAndFlush(new PongWebSocketFrame(ping.content().retain()))
} else {
val _ = ping.content().release()
val _ = ctx.fireChannelRead(ping)
}
case pong: PongWebSocketFrame =>
if (!ignorePong) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ class PekkoHttpServerTest extends TestSuite with EitherValues {
createServerTest,
PekkoStreams,
autoPing = false,
handlePong = false
handlePong = false,
decodeCloseRequests = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class PlayServerTest extends TestSuite {
createServerTest,
PekkoStreams,
autoPing = false,
handlePong = false
handlePong = false,
decodeCloseRequests = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class PlayServerTest extends TestSuite {
createServerTest,
AkkaStreams,
autoPing = false,
handlePong = false
handlePong = false,
decodeCloseRequests = false
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f)
override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ import sttp.tapir.tests.data.Fruit
import sttp.ws.{WebSocket, WebSocketFrame}

import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicReference

abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
createServerTest: CreateServerTest[F, S with WebSockets, OPTIONS, ROUTE],
val streams: S,
autoPing: Boolean,
handlePong: Boolean,
// Disabled for eaxmple for vert.x, which sometimes drops connection without returning Close
// Disabled for example for vert.x, which sometimes drops connection without returning Close
expectCloseResponse: Boolean = true,
frameConcatenation: Boolean = true
frameConcatenation: Boolean = true,
decodeCloseRequests: Boolean = true
)(implicit
m: MonadError[F]
) extends EitherValues {
Expand Down Expand Up @@ -246,7 +248,7 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
response2.body shouldBe Right("echo: testOk")
}
}
) ++ autoPingTests ++ failingPipeTests ++ handlePongTests ++ frameConcatenationTests
) ++ autoPingTests ++ failingPipeTests ++ handlePongTests ++ frameConcatenationTests ++ decodeCloseRequestsTests

val autoPingTests =
if (autoPing)
Expand Down Expand Up @@ -294,10 +296,10 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- ws.sendText("test2")
_ <- ws.sendText("error-trigger")
m1 <- ws.eitherClose(ws.receiveText())
_ <- ws.sendText("test2")
m2 <- ws.eitherClose(ws.receiveText())
_ <- ws.sendText("error-trigger")
m3 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m2, m3)
})
Expand Down Expand Up @@ -370,26 +372,33 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
List(
testServer(
endpoint.out(
webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](streams)
webSocketBodyRaw(streams)
.autoPing(None)
.autoPongOnPing(false)
),
"not pong on ping if disabled"
)((_: Unit) => pureResult(stringEcho.asRight[Unit])) { (backend, baseUri) =>
)((_: Unit) =>
pureResult(functionToPipe[WebSocketFrame, WebSocketFrame] {
case WebSocketFrame.Ping(payload) => WebSocketFrame.text(s"ping: ${new String(payload)}")
case WebSocketFrame.Text(payload, _, _) => WebSocketFrame.text(s"text: ${payload}")
case f => WebSocketFrame.text(s"other: $f")
}.asRight[Unit])
) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
_ <- ws.send(WebSocketFrame.Ping("test-ping-text".getBytes()))
m1 <- ws.receiveText()
_ <- ws.sendText("test2")
_ <- ws.send(WebSocketFrame.Ping("test-ping-text".getBytes()))
m2 <- ws.receiveText()
} yield List(m1, m2)
_ <- ws.sendText("test2")
m3 <- ws.receiveText()
} yield List(m1, m2, m3)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map(
_.body shouldBe Right(List("echo: test1", "echo: test2"))
_.body shouldBe Right(List("text: test1", "ping: test-ping-text", "text: test2"))
)
},
testServer(
Expand Down Expand Up @@ -431,4 +440,41 @@ abstract class ServerWebSocketTests[F[_], S <: Streams[S], OPTIONS, ROUTE](
}
)
else List.empty

val decodeCloseRequestsTests =
if (decodeCloseRequests)
List(
{
val serverTrail = new AtomicReference[Vector[Option[String]]](Vector.empty)
testServer(
// using an optional request type causes `decodeCloseRequests` to become `true` (because the schema is optional)
endpoint.out(webSocketBody[Option[String], CodecFormat.TextPlain, Option[String], CodecFormat.TextPlain](streams)),
"receive a client-sent close frame as a None"
)((_: Unit) =>
pureResult(functionToPipe[Option[String], Option[String]] { msg =>
serverTrail.updateAndGet(v => v :+ msg)
msg
}.asRight[Unit])
) { (backend, baseUri) =>
basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
m1 <- ws.eitherClose(ws.receiveText())
_ <- ws.close()
m2 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m2)
})
.get(baseUri.scheme("ws"))
.send(backend)
.map { response =>
response.body.map(_.map(_.toOption)) shouldBe Right(List(Some("test1"), None))

// verifying what happened on the server; clearing the trail if there are retries
serverTrail.getAndSet(Vector.empty) shouldBe Vector(Some("test1"), None)
}
}
}
)
else List.empty
}
Loading

0 comments on commit 82d17d9

Please sign in to comment.