Skip to content

Adding your own streams

Pieter Bonte edited this page Jan 4, 2021 · 3 revisions

In this section, you'll learn how to define your own streams. As an example, we provide a WebSocket client implementation.

In Defining the streams we showed how to define a WebDataStream and manually add content to the stream. In this section, we will explain how you can consume data from a websocket.

Flow

We will maintain to following flow:

  1. Creating the WebSocketStream
  2. Open a connection to a WebSocket server
  3. Parse the received data (from Strings to graphs)
  4. Add the parsed data to the stream

Creating a new WebSocketStream

We will create a WebSocketStream as an extension of the RDFStream, because we imagine consuming RDF Graph directly from the websocket server.

Open a connection to a WebSocket server

In order to connect to a WebSocket server, we will utilize Jetty library. To do so, add the following dependency to the pom file:

<dependency>
   <groupId>org.eclipse.jetty.websocket</groupId>
   <artifactId>websocket-client</artifactId>
   <version>9.4.35.v20201120</version>
</dependency>

Through the following snippet, we can connect to a WebSocket server. Note that WebSocketInputStream defines the logic what happens when we receive a message from the server. This discussed in more detail below.

 WebSocketClient client = new WebSocketClient();

 WebSocketInputStream socket = new WebSocketInputStream(this); // connection logic happens here, see below
 try {
    client.start();

    URI echoUri = new URI(wsUrl); // the websocket url
    ClientUpgradeRequest request = new ClientUpgradeRequest();
    client.connect(socket, echoUri, request);
    System.out.printf("Connecting to : %s%n", echoUri);

} catch (Throwable t) {
   t.printStackTrace();
} 

Parse the received data

The websocket class defines what happens when we connect, receive data or close the connection with the WebSocket server. In our case, the message method is the most important one, as it defines how to handle the incoming messages from the WebSocket server. In this example, we use the parsing functionality of Apache Jena directly. Parsing is necessary as the WebSocket produces Strings instead of Graph objects.

@WebSocket
    public class WebSocketInputStream {
        private WebDataStream<Graph> stream;
        
        ...

        @OnWebSocketMessage
        public void message(Session session, String message) throws IOException {
            // code for parsing using Jena
            Model dataModel = ModelFactory.createDefaultModel();
            try {
                InputStream targetStream = new ByteArrayInputStream(message.getBytes());
                dataModel.read(targetStream, null, "TTL");
                //converting jena Model to generic Graph
                JenaRDF jena = new JenaRDF();
                Graph g1 = jena.asGraph(dataModel);
                // publishing the parsed data to the stream
                stream.put(g1,System.currentTimeMillis());
            }catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
        }

        }
        

    }

Wrap up

In this section we have shown how to implement your own stream by extending the existing RDFStream class. We have connected to a websocket, parsed the received data and published the graph objects on the stream. Below you can find the whole example:

import it.polimi.deib.sr.rsp.api.stream.data.WebDataStream;
import org.apache.commons.rdf.api.Graph;
import org.apache.commons.rdf.jena.JenaRDF;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.*;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;


public class WebsocketStream extends RDFStream {

    protected String wsUrl;
    protected String stream_uri;
    public WebsocketStream(String stream_uri,String wsUrl){
        super(stream_uri);
        this.stream_uri = stream_uri;
        this.wsUrl = wsUrl;
    }

    public void stream() {
        WebSocketClient client = new WebSocketClient();

        WebSocketInputStream socket = new WebSocketInputStream(this);
        try {
            client.start();

            URI echoUri = new URI(this.wsUrl);
            ClientUpgradeRequest request = new ClientUpgradeRequest();
            client.connect(socket, echoUri, request);
            System.out.printf("Connecting to : %s%n", echoUri);

        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
    @WebSocket
    public class WebSocketInputStream {
        private WebDataStream<Graph> stream;
        public WebSocketInputStream(WebDataStream<Graph> stream) {
            this.stream=stream;
        }
        @OnWebSocketConnect
        public void connected(Session session) {
            System.out.println("connecting");
        }

        @OnWebSocketClose
        public void closed(Session session, int statusCode, String reason) {
        }

        @OnWebSocketMessage
        public void message(Session session, String message) throws IOException {
            Model dataModel = ModelFactory.createDefaultModel();
            try {
                InputStream targetStream = new ByteArrayInputStream(message.getBytes());
                dataModel.read(targetStream, null, "TTL");
                JenaRDF jena = new JenaRDF();
                Graph g1 = jena.asGraph(dataModel);
                stream.put(g1,System.currentTimeMillis());
            }catch (Exception e) {
                e.printStackTrace();
        }

        }

    }
}