The Time Varying Graph and Streaming Data Set

Riccardo Tommasini edited this page Jan 3, 2021 · 2 revisions

The core abstractions of RSP-QL are the Time-Varying Graph and the Streaming Data Set (SDS).

Once applied to an RDF Stream, an RSP-QL time-based window operator (see Stream to Relation Operators) produces a Time-Varying Graph. A set of Time-Varying Graphs constitutes a Streaming Data Set (SDS).

Time-Varying Graph

A Time-Varying Graph is a function that takes a time instant as input and produces an RDF Graph as output. More formally,

$TVG : T \rightarrow \{ G \mid G \text{ is an RDF graph} \}$

The domain of the time-varying graph is the set of time instants t at which the time-based window operator is defined. The definition of the co-domain is more subtle: the window operator chunks the RDF Stream into finite portions, each containing a number of RDF Graphs.

Thus, as per RSP-QL semantics, the co-domain of the time-varying graph consists of all the RDF graphs obtained as the UNION of the RDF Graphs inside a window.

RSP4J implements a generalized version of the Time-Varying Graph, allowing its users to use the graph representation of their choice. For instance, in YASPER we use org.apache.commons.rdf.api.Graph.

Moreover, the coalesce method allows a custom implementation of the functional nature of the class. For instance, one could implement FIRST/LAST rather than UNION as the coalesce logic.
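To make the difference between coalesce strategies concrete, here is a minimal sketch. It is not RSP4J code: it assumes a graph can be modelled as a Set<String> of serialized triples and a window's content as an ordered list of such graphs, and the names Coalesce, union, first, and last are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of RSP4J. A "graph" is a set of serialized
// triples; a window's content is a list of graphs ordered by arrival.
final class Coalesce {

    private Coalesce() { }

    // UNION: merge every graph in the window into a single graph.
    static Set<String> union(List<Set<String>> window) {
        Set<String> result = new LinkedHashSet<>();
        for (Set<String> graph : window) {
            result.addAll(graph);
        }
        return result;
    }

    // FIRST: keep only the oldest graph in the window.
    static Set<String> first(List<Set<String>> window) {
        Set<String> result = new LinkedHashSet<>();
        if (!window.isEmpty()) {
            result.addAll(window.get(0));
        }
        return result;
    }

    // LAST: keep only the most recent graph in the window.
    static Set<String> last(List<Set<String>> window) {
        Set<String> result = new LinkedHashSet<>();
        if (!window.isEmpty()) {
            result.addAll(window.get(window.size() - 1));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Set<String>> window = new ArrayList<>();
        window.add(Set.of(":s1 :p :o1"));
        window.add(Set.of(":s2 :p :o2"));
        System.out.println("UNION: " + union(window));
        System.out.println("FIRST: " + first(window));
        System.out.println("LAST:  " + last(window));
    }
}
```

Whichever strategy is chosen, the result is a single graph, which is what lets the time-varying graph behave as a function of time.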

public interface TimeVarying<E> {

    void materialize(long ts);

    E get();

    String iri();

    default boolean named() {
        return iri() != null;
    }

}
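The interplay between materialize and get can be illustrated with a toy implementation. This is a sketch under assumptions, not YASPER's class: FunctionalTimeVarying and TimeVaryingDemo are hypothetical names, and the interface is repeated only so the example compiles on its own.

```java
import java.util.function.LongFunction;

// Repeated from the interface above so that the sketch is self-contained.
interface TimeVarying<E> {
    void materialize(long ts);
    E get();
    String iri();
    default boolean named() {
        return iri() != null;
    }
}

// Hypothetical implementation: wraps any function from time instants to values.
final class FunctionalTimeVarying<E> implements TimeVarying<E> {
    private final LongFunction<E> function;
    private final String iri; // null means the element is unnamed
    private E current;        // value at the last materialized instant

    FunctionalTimeVarying(LongFunction<E> function, String iri) {
        this.function = function;
        this.iri = iri;
    }

    @Override
    public void materialize(long ts) {
        // Evaluate the function at ts and cache the result.
        current = function.apply(ts);
    }

    @Override
    public E get() {
        // Returns null until materialize has been called at least once.
        return current;
    }

    @Override
    public String iri() {
        return iri;
    }
}

final class TimeVaryingDemo {
    public static void main(String[] args) {
        TimeVarying<String> tv = new FunctionalTimeVarying<>(ts -> "graph@" + ts, null);
        tv.materialize(42L);
        System.out.println(tv.get() + " named=" + tv.named());
    }
}
```

Splitting evaluation (materialize) from access (get) means every consumer that reads the element after one materialization sees the same value for that time instant.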

Streaming Data Set

An RSP-QL dataset SDS consists of an (optional) default graph $G_{def}$ and $n$ ($n \geq 0$) named Time-Varying Graphs resulting from applying Time-Based Sliding Window operators over $m \leq n$ RDF streams.

public interface SDS<E> {

    Collection<TimeVarying<E>> asTimeVaryingEs();

    void add(IRI iri, TimeVarying<E> tvg);

    void add(TimeVarying<E> tvg);

    default void materialize(long ts) {
        asTimeVaryingEs().forEach(eTimeVarying -> eTimeVarying.materialize(ts));
    }
}

In YASPER

YASPER implements the TimeVarying interface with org.apache.commons.rdf.api.Graph as its type parameter. The materialize method delegates to the underlying StreamToRelationOp operator.

@AllArgsConstructor
@RequiredArgsConstructor
public class TimeVaryingGraph implements TimeVarying<Graph> {

    private final StreamToRelationOp<Graph, Graph> op;
    private IRI name;
    private Graph graph;

    @Override
    public void materialize(long ts) {
        graph = op.getContent(ts).coalesce();
    }
[...]

}

To understand how the retrieval of the RDF data works in practice, it helps to understand the related Content class. For a complete overview, see Execution Semantics#content.

The ContentGraph implements the content function of SECRET. The class is parametric in both the ingestion and state types, allowing RSP4J users to decouple what data arrives from the stream from the way data is managed in memory.

YASPER's implementation of the content method is simple and closely follows the RSP-QL specification, i.e., the graphs assigned to a given window instance are retrieved.

public class ContentGraph implements Content<Graph, Graph> {
    private List<Graph> elements;
    private long last_timestamp_changed;

[...]

    @Override
    public Content<Graph, Graph> content(long t_e) {
        Optional<Window> max = windows.keySet().stream()
                .filter(w -> w.getO() < t_e && w.getC() <= t_e)
                .max(Comparator.comparingLong(Window::getC));

        if (max.isPresent())
            return windows.get(max.get());

        return new EmptyGraphContent();
    }
}
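The window selection performed by the content method above can be tried in isolation. The following is a sketch with stand-in types: Window here is a hypothetical minimal class exposing only getO and getC (opening and closing time), and WindowLookup is a made-up name; the filter and comparator mirror the snippet above.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a window instance with opening time o and closing time c.
final class Window {
    private final long o;
    private final long c;

    Window(long o, long c) {
        this.o = o;
        this.c = c;
    }

    long getO() { return o; }
    long getC() { return c; }
}

final class WindowLookup {

    private WindowLookup() { }

    // Among the windows already closed at tE, pick the one with the largest
    // closing time and return its content; return `empty` if there is none.
    static <T> T content(Map<Window, T> windows, long tE, T empty) {
        Optional<Window> max = windows.keySet().stream()
                .filter(w -> w.getO() < tE && w.getC() <= tE)
                .max(Comparator.comparingLong(Window::getC));
        return max.map(windows::get).orElse(empty);
    }

    public static void main(String[] args) {
        Map<Window, String> windows = new LinkedHashMap<>();
        windows.put(new Window(0, 10), "content of [0,10)");
        windows.put(new Window(5, 15), "content of [5,15)");
        windows.put(new Window(10, 20), "content of [10,20)");
        System.out.println(content(windows, 17, "empty"));
        System.out.println(content(windows, 9, "empty"));
    }
}
```

With overlapping (sliding) windows, several instances may already be closed at a given time, so taking the maximum closing time selects the most recent one.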

For the SDS, let's focus on the materialize method.

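@Override
public SDS<Graph> materialize(final long ts) {
    // Drop the quads produced by the previous materialization.
    this.clear();

    // Materialize each default time-varying graph and copy its triples into the default graph.
    defs.stream().map(g -> {
        g.materialize(ts);
        return (Graph) g.get();
    }).flatMap(Graph::stream)
            .forEach(t -> this.add(def, t.getSubject(), t.getPredicate(), t.getObject()));

    // Materialize each named time-varying graph and copy its triples under the graph's name.
    tvgs.entrySet().stream()
            .map(e -> {
                e.getValue().materialize(ts);
                return new NamedGraph(e.getKey(), e.getValue().get());
            }).forEach(n -> n.g.stream()
            .forEach(o -> this.add(n.name, o.getSubject(), o.getPredicate(), o.getObject())));

    // The declared return type requires a value; returning this SDS is assumed here.
    return this;
}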

The default SDS coalesces all the content of the time-varying graphs and produces the SDS for whoever executes the query.

  • First, the engine clears the old collection of quads (a bias of the Commons RDF API).
  • Second, the SDS propagates the materialization to the Time-Varying Graphs (named and default), which in turn ask the StreamToRelation operator for the content at time t.
    • In general, the content of the operator is a set of graphs identified by a certain window instance (see Stream to Relation Operators).
    • However, the time-varying graph is a function and must return a single element of its co-domain. Hence the coalesce method.
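The steps above can be sketched end to end with plain collections. This is an illustration under assumptions, not YASPER's SDS: ToySds and its methods are hypothetical, a graph is modelled as a Set<String> of serialized triples, a window operator as a function from a time instant to the list of graphs in the active window, and a quad as the string "graph | triple".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongFunction;

// Hypothetical toy SDS; none of these names come from RSP4J or YASPER.
final class ToySds {
    static final String DEFAULT_GRAPH = "default";

    // Each operator maps a time instant to the graphs inside the active window.
    private final List<LongFunction<List<Set<String>>>> defaults = new ArrayList<>();
    private final Map<String, LongFunction<List<Set<String>>>> named = new LinkedHashMap<>();
    private final Set<String> quads = new LinkedHashSet<>();

    void addDefault(LongFunction<List<Set<String>>> op) {
        defaults.add(op);
    }

    void addNamed(String iri, LongFunction<List<Set<String>>> op) {
        named.put(iri, op);
    }

    Set<String> materialize(long ts) {
        // Step 1: clear the quads left over from the previous materialization.
        quads.clear();
        // Step 2: ask each operator for its content at ts, coalesce it into a
        // single graph, and store its triples under the right graph name.
        for (LongFunction<List<Set<String>>> op : defaults) {
            addAll(DEFAULT_GRAPH, coalesce(op.apply(ts)));
        }
        for (Map.Entry<String, LongFunction<List<Set<String>>>> e : named.entrySet()) {
            addAll(e.getKey(), coalesce(e.getValue().apply(ts)));
        }
        // Return a copy so callers are not affected by the next clear().
        return new LinkedHashSet<>(quads);
    }

    // UNION coalesce: many graphs in, one graph out.
    private static Set<String> coalesce(List<Set<String>> graphs) {
        Set<String> union = new LinkedHashSet<>();
        for (Set<String> graph : graphs) {
            union.addAll(graph);
        }
        return union;
    }

    private void addAll(String graphName, Set<String> triples) {
        for (String triple : triples) {
            quads.add(graphName + " | " + triple);
        }
    }
}
```

A caller would register one operator per window and then invoke materialize(ts) before each query evaluation, so that the query always runs over the quads valid at ts.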