diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/CSpriteExample.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/chapter3/CSpriteExample.java similarity index 98% rename from examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/CSpriteExample.java rename to examples/src/main/java/org/streamreasoning/rsp4j/wspbook/chapter3/CSpriteExample.java index 5f88c2ac..18ee2cc3 100644 --- a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/CSpriteExample.java +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/chapter3/CSpriteExample.java @@ -1,4 +1,4 @@ -package org.streamreasoning.rsp4j.wspbook.locationscenario; +package org.streamreasoning.rsp4j.wspbook.chapter3; import org.apache.commons.rdf.api.Graph; import org.streamreasoning.rsp4j.operatorapi.ContinuousProgram; diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/assignment/CustomR2RAssignment.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/assignment/CustomR2RAssignment.java new file mode 100644 index 00000000..71b9b762 --- /dev/null +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/assignment/CustomR2RAssignment.java @@ -0,0 +1,189 @@ +package org.streamreasoning.rsp4j.wspbook.locationscenario.assignment; + +import org.apache.commons.rdf.api.Graph; +import org.streamreasoning.rsp4j.abstraction.RSPEngine; +import org.streamreasoning.rsp4j.api.RDFUtils; +import org.streamreasoning.rsp4j.api.enums.ReportGrain; +import org.streamreasoning.rsp4j.api.enums.Tick; +import org.streamreasoning.rsp4j.api.operators.r2r.utils.R2RPipe; +import org.streamreasoning.rsp4j.api.operators.s2r.execution.assigner.StreamToRelationOp; +import org.streamreasoning.rsp4j.api.secret.report.Report; +import org.streamreasoning.rsp4j.api.secret.report.ReportImpl; +import org.streamreasoning.rsp4j.api.secret.report.strategies.OnWindowClose; +import org.streamreasoning.rsp4j.api.secret.time.Time; +import org.streamreasoning.rsp4j.api.secret.time.TimeImpl; +import org.streamreasoning.rsp4j.api.stream.data.DataStream; +import org.streamreasoning.rsp4j.bigdata2021.utils.StreamGenerator; +import org.streamreasoning.rsp4j.operatorapi.ContinuousProgram; +import org.streamreasoning.rsp4j.operatorapi.TaskOperatorAPIImpl; +import org.streamreasoning.rsp4j.operatorapi.table.BindingStream; +import org.streamreasoning.rsp4j.reasoning.datalog.DatalogR2R; +import org.streamreasoning.rsp4j.reasoning.datalog.ReasonerTriple; +import org.streamreasoning.rsp4j.reasoning.datalog.Rule; +import org.streamreasoning.rsp4j.reasoning.datalog.TripleGenerator; +import org.streamreasoning.rsp4j.yasper.querying.PrefixMap; +import org.streamreasoning.rsp4j.yasper.querying.operators.Rstream; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.BGP; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.Binding; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.joins.HashJoinAlgorithm; + +import java.util.List; + +/*** + * In this exercise, we will show how to create custom R2R operators, by combining two existing R2R operators. + * We will pipeline the BGP pattern with a reasoning component that infers the location of individuals reported throug + * the contact tracing stream. + * + * When we know that (:bob :isWith :elena) and (:elena :isIn :redRoom) we can infer that :bob is also in the :redRoom + * Graph representation: + * :bob -isWith-> :elena -isIn-> :redRoom + * \ ^ + * \-----inferred :isIn--------/ + * + * By inferring this relation, we can simplify the query definitions when we want to fetch who is in each room. + * This comes in handy when want to query who has been infected because they were in a room with a person who got + * a positive test result. + * + * This means that you will need to check: + * 1) All infected individuals + * 2) The location of these infected individuals + * 3) All individuals that were in the same room as the infected ones + * + * TIP: + * To easily fetch the location of all individuals, you can define a rule that infers the locations of the individuals + * reported through the contact tracing stream. Note that the rule needs to be execute over both the contact tracing + * and observation stream. + * + * Used prefixes: + * PREFIX : + */ +public class CustomR2RAssignment { + + public static void main(String[] args) throws InterruptedException { + // Setup the stream generator + StreamGenerator generator = new StreamGenerator(); + /* Creates the observation stream + * Contains both the RFIDObservations and FacebookPosts + * IRI: http://rsp4j.io/covid/observations + * + * Example RFID observation: + * :observationX a :RFIDObservation . + * :observationX :who :Alice . + * :observationX :where :RedRoom . + * :Alice :isIn :RedRoom . + * + * Example Facebook Post checkin: + * :postY a :FacebookPost . + * :postY :who :Bob . + * :postY :where :BlueRoom . + * :Bob :isIn :BlueRoom . + */ + DataStream observationStream = generator.getObservationStream(); + /* Creates the contact tracing stream + * Describes who was with whom + * IRI: http://rsp4j.io/covid/tracing + * + * Example contact post: + * :postZ a :ContactTracingPost . + * :postZ :who :Carl. + * :Carl :isWith :Bob . + */ + DataStream tracingStream = generator.getContactStream(); + /* Creates the covid results stream + * Contains the test results + * IRI: http://rsp4j.io/covid/testResults + * + * Example covid result: + * :postQ a :TestResultPost. + * :postQ :who :Carl . + * :postQ :hasResult :positive + */ + + DataStream covidStream = generator.getCovidStream(); + + // define output stream + BindingStream outStream = new BindingStream("out"); + + // Engine properties + + Report report = new ReportImpl(); + report.add(new OnWindowClose()); + Tick tick = Tick.TIME_DRIVEN; + ReportGrain report_grain = ReportGrain.SINGLE; + Time instance = new TimeImpl(0); + + RSPEngine engine = new RSPEngine(instance, tick, report_grain, report); + // Window (S2R) declaration + StreamToRelationOp w1 = engine.createCSparqlWindow( + RDFUtils.createIRI("w1"), + 600_000, + 60_000); + + StreamToRelationOp w2 = engine.createCSparqlWindow( + RDFUtils.createIRI("w2"), + 600_000, + 60_000); + StreamToRelationOp w3 = engine.createCSparqlWindow( + RDFUtils.createIRI("w3"), + 60*60_000, + 60_000); + + PrefixMap prefixes = new PrefixMap(); + prefixes.addPrefix("","http://rsp4j.io/covid/"); + // R2R + //TODO: define the first BGP that fetches the locations of the individuals + // this BGP will be executed over the results of the reasoner + BGP bgp = BGP.createWithPrefixes(prefixes) + .addTP("?s", "?p", "?o") + .build(); + + // TODO: define the second BGP that fetches the infected individuals + BGP bgp2 = BGP.createWithPrefixes(prefixes) + .addTP("?s", "?p", "?o") + .build(); + + //TODO: Define a rule that infers the location of individuals reported through the contact tracing stream. + // TIP: Each Rule object takes a number of ReasonerTriples, the first one being the head of the rule, + // while the remainder triples are the body of the rule. + // Adding multiple triples as body translates to a conjunction (AND) between the body triples. + // Example: the rule below can be interpreted as ?x ?p ?o1 AND ?o1 ?p ?o -> ?x ?p ?o + DatalogR2R datalogR2R = new DatalogR2R(); + TripleGenerator tripleGenerator = new TripleGenerator(prefixes); + + ReasonerTriple head = tripleGenerator.createReasonerTriple("?x", "?p", "?o"); + ReasonerTriple body1 = tripleGenerator.createReasonerTriple("?x", "?p", "?o1"); + ReasonerTriple body2 = tripleGenerator.createReasonerTriple("?o1", "?p", "?o"); + + Rule r = new Rule(head,body1,body2); + + datalogR2R.addRule(r); + + // Create a pipe of two r2r operators, datalog reasoner and BGP + R2RPipe r2r = new R2RPipe<>(datalogR2R,bgp); + + TaskOperatorAPIImpl t = + new TaskOperatorAPIImpl.TaskBuilder(prefixes) + .addS2R(":observations", w1, "w1") + .addS2R(":tracing", w2, "w2") + .addS2R(":testResults", w3, "w3") + .addR2R(List.of("w1", "w2"), r2r) + .addR2R("w3", bgp2) + .addR2S("out", new Rstream()) + //.addProjection(List.of(new VarImpl("?o"))) + .build(); + ContinuousProgram cp = + new ContinuousProgram.ContinuousProgramBuilder() + .in(observationStream) + .in(tracingStream) + .in(covidStream) + .addTask(t) + .out(outStream) + .addJoinAlgorithm(new HashJoinAlgorithm()) + .build(); + + outStream.addConsumer((el, ts) -> System.out.println(el + " @ " + ts)); + generator.startStreaming(); + Thread.sleep(20_000); + generator.stopStreaming(); + } +} diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/example/CustomR2RExample.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/example/CustomR2RExample.java new file mode 100644 index 00000000..9541780e --- /dev/null +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/example/CustomR2RExample.java @@ -0,0 +1,181 @@ +package org.streamreasoning.rsp4j.wspbook.locationscenario.example; + +import org.apache.commons.rdf.api.Graph; +import org.streamreasoning.rsp4j.abstraction.RSPEngine; +import org.streamreasoning.rsp4j.api.RDFUtils; +import org.streamreasoning.rsp4j.api.enums.ReportGrain; +import org.streamreasoning.rsp4j.api.enums.Tick; +import org.streamreasoning.rsp4j.api.operators.r2r.utils.R2RPipe; +import org.streamreasoning.rsp4j.api.operators.s2r.execution.assigner.StreamToRelationOp; +import org.streamreasoning.rsp4j.api.secret.report.Report; +import org.streamreasoning.rsp4j.api.secret.report.ReportImpl; +import org.streamreasoning.rsp4j.api.secret.report.strategies.OnWindowClose; +import org.streamreasoning.rsp4j.api.secret.time.Time; +import org.streamreasoning.rsp4j.api.secret.time.TimeImpl; +import org.streamreasoning.rsp4j.api.stream.data.DataStream; +import org.streamreasoning.rsp4j.bigdata2021.utils.StreamGenerator; +import org.streamreasoning.rsp4j.operatorapi.ContinuousProgram; +import org.streamreasoning.rsp4j.operatorapi.TaskOperatorAPIImpl; +import org.streamreasoning.rsp4j.operatorapi.table.BindingStream; +import org.streamreasoning.rsp4j.reasoning.datalog.DatalogR2R; +import org.streamreasoning.rsp4j.reasoning.datalog.ReasonerTriple; +import org.streamreasoning.rsp4j.reasoning.datalog.Rule; +import org.streamreasoning.rsp4j.reasoning.datalog.TripleGenerator; +import org.streamreasoning.rsp4j.yasper.querying.PrefixMap; +import org.streamreasoning.rsp4j.yasper.querying.operators.Istream; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.BGP; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.Binding; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.joins.HashJoinAlgorithm; + +import java.util.List; + +/*** + * In this example, we will show how to create custom R2R operators, by combining two existing R2R operators. + * We will pipeline the BGP pattern with a reasoning component that adds the parent type of the different Observations + * and Posts. + * + * The hierarchy of posts looks as follows: + * + * :Observation + * ^ + * | + * ------------------------------------------------------ + * | | | | + * :RFIDObservation :FacebookPost :ContactTracingPost :TestResultPost + * + * + * By taking this hierarchy into account, we can query for all :Observations at once, without the need to differentiate + * between different types of observations/posts. + * + * Used prefixes: + * PREFIX : + */ +public class CustomR2RExample { + + public static void main(String[] args) throws InterruptedException { + // Setup the stream generator + StreamGenerator generator = new StreamGenerator(); + /* Creates the observation stream + * Contains both the RFIDObservations and FacebookPosts + * IRI: http://rsp4j.io/covid/observations + * + * Example RFID observation: + * :observationX a :RFIDObservation . + * :observationX :who :Alice . + * :observationX :where :RedRoom . + * :Alice :isIn :RedRoom . + * + * Example Facebook Post checkin: + * :postY a :FacebookPost . + * :postY :who :Bob . + * :postY :where :BlueRoom . + * :Bob :isIn :BlueRoom . + */ + DataStream observationStream = generator.getObservationStream(); + /* Creates the contact tracing stream + * Describes who was with whom + * IRI: http://rsp4j.io/covid/tracing + * + * Example contact post: + * :postZ a :ContactTracingPost . + * :postZ :who :Carl. + * :Carl :isWith :Bob . + */ + DataStream tracingStream = generator.getContactStream(); + /* Creates the covid results stream + * Contains the test results + * IRI: http://rsp4j.io/covid/testResults + * + * Example covid result: + * :postQ a :TestResultPost. + * :postQ :who :Carl . + * :postQ :hasResult :positive + */ + DataStream covidStream = generator.getCovidStream(); + + // define output stream + BindingStream outStream = new BindingStream("out"); + + // Engine properties + Report report = new ReportImpl(); + report.add(new OnWindowClose()); + Tick tick = Tick.TIME_DRIVEN; + ReportGrain report_grain = ReportGrain.SINGLE; + Time instance = new TimeImpl(0); + + RSPEngine engine = new RSPEngine(instance, tick, report_grain, report); + // Window (S2R) declaration + StreamToRelationOp w1 = engine.createCSparqlWindow( + RDFUtils.createIRI("w1"), + 600_000, + 60_000); + + StreamToRelationOp w2 = engine.createCSparqlWindow( + RDFUtils.createIRI("w2"), + 600_000, + 60_000); + StreamToRelationOp w3 = engine.createCSparqlWindow( + RDFUtils.createIRI("w3"), + 60*60_000, + 60_000); + + PrefixMap prefixes = new PrefixMap(); + prefixes.addPrefix("","http://rsp4j.io/covid/"); + // R2R + BGP bgp = BGP.createWithPrefixes(prefixes) + .addTP("?obs", "a", ":Observation") + .build(); + + + // Define a simple Reasoning component that + DatalogR2R datalogR2R = new DatalogR2R(); + TripleGenerator tripleGenerator = new TripleGenerator(prefixes); + // Each part of the hierarchy is defined as a Rule + // :RFIDObservation -> :Observation + ReasonerTriple body = tripleGenerator.createReasonerTriple("?x", "a", ":RFIDObservation"); + ReasonerTriple head = tripleGenerator.createReasonerTriple("?x", "a", ":Observation"); + datalogR2R.addRule(new Rule(head,body)); + + // :FacebookPost -> :Observation + ReasonerTriple body2 = tripleGenerator.createReasonerTriple("?x", "a", ":FacebookPost"); + ReasonerTriple head2 = tripleGenerator.createReasonerTriple("?x", "a", ":Observation"); + datalogR2R.addRule(new Rule(head2,body2)); + + // :TestResultPost -> :Observation + ReasonerTriple body3 = tripleGenerator.createReasonerTriple("?x", "a", ":TestResultPost"); + ReasonerTriple head3 = tripleGenerator.createReasonerTriple("?x", "a", ":Observation"); + datalogR2R.addRule(new Rule(head3,body3)); + + // :ContactTracingPost -> :Observation + ReasonerTriple body4 = tripleGenerator.createReasonerTriple("?x", "a", ":ContactTracingPost"); + ReasonerTriple head4 = tripleGenerator.createReasonerTriple("?x", "a", ":Observation"); + datalogR2R.addRule(new Rule(head4,body4)); + + // Create a pipe of two r2r operators, datalog reasoner and BGP + R2RPipe r2r = new R2RPipe<>(datalogR2R,bgp); + + TaskOperatorAPIImpl t = + new TaskOperatorAPIImpl.TaskBuilder(prefixes) + .addS2R(":observations", w1, "w1") + .addS2R(":tracing", w2, "w2") + .addS2R(":testResults", w3, "w3") + .addR2R(List.of("w1", "w2","w3"), r2r) + .addR2S("out", new Istream()) + //.addProjection(List.of(new VarImpl("?o"))) + .build(); + ContinuousProgram cp = + new ContinuousProgram.ContinuousProgramBuilder() + .in(observationStream) + .in(tracingStream) + .in(covidStream) + .addTask(t) + .out(outStream) + .addJoinAlgorithm(new HashJoinAlgorithm()) + .build(); + + outStream.addConsumer((el, ts) -> System.out.println(el + " @ " + ts)); + generator.startStreaming(); + Thread.sleep(20_000); + generator.stopStreaming(); + } +} diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/solution/CustomR2RSolution.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/solution/CustomR2RSolution.java new file mode 100644 index 00000000..e54c932e --- /dev/null +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/locationscenario/solution/CustomR2RSolution.java @@ -0,0 +1,185 @@ +package org.streamreasoning.rsp4j.wspbook.locationscenario.solution; + +import org.apache.commons.rdf.api.Graph; +import org.streamreasoning.rsp4j.abstraction.RSPEngine; +import org.streamreasoning.rsp4j.api.RDFUtils; +import org.streamreasoning.rsp4j.api.enums.ReportGrain; +import org.streamreasoning.rsp4j.api.enums.Tick; +import org.streamreasoning.rsp4j.api.operators.r2r.utils.R2RPipe; +import org.streamreasoning.rsp4j.api.operators.s2r.execution.assigner.StreamToRelationOp; +import org.streamreasoning.rsp4j.api.secret.report.Report; +import org.streamreasoning.rsp4j.api.secret.report.ReportImpl; +import org.streamreasoning.rsp4j.api.secret.report.strategies.OnWindowClose; +import org.streamreasoning.rsp4j.api.secret.time.Time; +import org.streamreasoning.rsp4j.api.secret.time.TimeImpl; +import org.streamreasoning.rsp4j.api.stream.data.DataStream; +import org.streamreasoning.rsp4j.bigdata2021.utils.StreamGenerator; +import org.streamreasoning.rsp4j.operatorapi.ContinuousProgram; +import org.streamreasoning.rsp4j.operatorapi.TaskOperatorAPIImpl; +import org.streamreasoning.rsp4j.operatorapi.table.BindingStream; +import org.streamreasoning.rsp4j.reasoning.datalog.DatalogR2R; +import org.streamreasoning.rsp4j.reasoning.datalog.ReasonerTriple; +import org.streamreasoning.rsp4j.reasoning.datalog.Rule; +import org.streamreasoning.rsp4j.reasoning.datalog.TripleGenerator; +import org.streamreasoning.rsp4j.yasper.querying.PrefixMap; +import org.streamreasoning.rsp4j.yasper.querying.operators.Rstream; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.BGP; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.Binding; +import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.joins.HashJoinAlgorithm; + +import java.util.List; + +/*** + * In this exercise, we will show how to create custom R2R operators, by combining two existing R2R operators. + * We will pipeline the BGP pattern with a reasoning component that infers the location of individuals reported throug + * the contact tracing stream. + * + * When we know that (:bob :isWith :elena) and (:elena :isIn :redRoom) we can infer that :bob is also in the :redRoom + * Graph representation: + * :bob -isWith-> :elena -isIn-> :redRoom + * \ ^ + * \----inferred :isIn---------/ + * + * By inferring this relation, we can simplify the query definitions when we want to fetch who is in each room. + * This comes in handy when want to query who has been infected because they were in a room with a person who got + * a positive test result. + * + * This means that you will need to check: + * 1) All infected individuals + * 2) The location of these infected individuals + * 3) All individuals that were in the same room as the infected ones + * + * TIP: + * To easily fetch the location of all individuals, you can define a rule that infers the locations of the individuals + * reported through the contact tracing stream. Note that the rule needs to be execute over both the contact tracing + * and observation stream. + * + * Used prefixes: + * PREFIX : + */ +public class CustomR2RSolution { + + public static void main(String[] args) throws InterruptedException { + // Setup the stream generator + StreamGenerator generator = new StreamGenerator(); + /* Creates the observation stream + * Contains both the RFIDObservations and FacebookPosts + * IRI: http://rsp4j.io/covid/observations + * + * Example RFID observation: + * :observationX a :RFIDObservation . + * :observationX :who :Alice . + * :observationX :where :RedRoom . + * :Alice :isIn :RedRoom . + * + * Example Facebook Post checkin: + * :postY a :FacebookPost . + * :postY :who :Bob . + * :postY :where :BlueRoom . + * :Bob :isIn :BlueRoom . + */ + DataStream observationStream = generator.getObservationStream(); + /* Creates the contact tracing stream + * Describes who was with whom + * IRI: http://rsp4j.io/covid/tracing + * + * Example contact post: + * :postZ a :ContactTracingPost . + * :postZ :who :Carl. + * :Carl :isWith :Bob . + */ + DataStream tracingStream = generator.getContactStream(); + /* Creates the covid results stream + * Contains the test results + * IRI: http://rsp4j.io/covid/testResults + * + * Example covid result: + * :postQ a :TestResultPost. + * :postQ :who :Carl . + * :postQ :hasResult :positive + */ + + DataStream covidStream = generator.getCovidStream(); + + // define output stream + BindingStream outStream = new BindingStream("out"); + + // Engine properties + + Report report = new ReportImpl(); + report.add(new OnWindowClose()); + Tick tick = Tick.TIME_DRIVEN; + ReportGrain report_grain = ReportGrain.SINGLE; + Time instance = new TimeImpl(0); + + RSPEngine engine = new RSPEngine(instance, tick, report_grain, report); + // Window (S2R) declaration + StreamToRelationOp w1 = engine.createCSparqlWindow( + RDFUtils.createIRI("w1"), + 600_000, + 60_000); + + StreamToRelationOp w2 = engine.createCSparqlWindow( + RDFUtils.createIRI("w2"), + 600_000, + 60_000); + StreamToRelationOp w3 = engine.createCSparqlWindow( + RDFUtils.createIRI("w3"), + 60*60_000, + 60_000); + + PrefixMap prefixes = new PrefixMap(); + prefixes.addPrefix("","http://rsp4j.io/covid/"); + // R2R + BGP bgp = BGP.createWithPrefixes(prefixes) + .addTP("?s", ":isIn", "?o") + .addTP("?s2",":isIn", "?o") + .build(); + + BGP bgp2 = BGP.createWithPrefixes(prefixes) + .addTP("?testResult", "a", ":TestResultPost") + .addTP("?testResult",":who", "?s") + .addTP("?testResult",":hasResult",":positive") + .build(); + + // Define a rule that infers the location of individuals reported through the contact tracing stream + DatalogR2R datalogR2R = new DatalogR2R(); + TripleGenerator tripleGenerator = new TripleGenerator(prefixes); + // ?x :isIn ?room <- ?x :isWith ?y. ?y :isIn ?room + ReasonerTriple head = tripleGenerator.createReasonerTriple("?x", ":isIn", "?room"); + ReasonerTriple body1 = tripleGenerator.createReasonerTriple("?x", ":isWith", "?y"); + ReasonerTriple body2 = tripleGenerator.createReasonerTriple("?y", ":isIn", "?room"); + + Rule r = new Rule(head,body1,body2); + + datalogR2R.addRule(r); + + // Create a pipe of two r2r operators, datalog reasoner and BGP + R2RPipe r2r = new R2RPipe<>(datalogR2R,bgp); + + TaskOperatorAPIImpl t = + new TaskOperatorAPIImpl.TaskBuilder(prefixes) + .addS2R(":observations", w1, "w1") + .addS2R(":tracing", w2, "w2") + .addS2R(":testResults", w3, "w3") + .addR2R(List.of("w1", "w2"), r2r) + .addR2R("w3", bgp2) + .addR2S("out", new Rstream()) + //.addProjection(List.of(new VarImpl("?o"))) + .build(); + ContinuousProgram cp = + new ContinuousProgram.ContinuousProgramBuilder() + .in(observationStream) + .in(tracingStream) + .in(covidStream) + .addTask(t) + .out(outStream) + .addJoinAlgorithm(new HashJoinAlgorithm()) + .build(); + + outStream.addConsumer((el, ts) -> System.out.println(el + " @ " + ts)); + generator.startStreaming(); + Thread.sleep(20_000); + generator.stopStreaming(); + } +} diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingDBPediaLive.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingDBPediaLive.java index cc59f463..a0bfa2c6 100644 --- a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingDBPediaLive.java +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingDBPediaLive.java @@ -1,57 +1,94 @@ package org.streamreasoning.rsp4j.wspbook.wildstreams; -import org.apache.commons.rdf.api.Graph; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.jena.graph.Graph; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.streamreasoning.rsp4j.api.engine.config.EngineConfiguration; import org.streamreasoning.rsp4j.api.querying.ContinuousQuery; +import org.streamreasoning.rsp4j.api.sds.SDSConfiguration; +import org.streamreasoning.rsp4j.api.stream.data.DataStream; +import org.streamreasoning.rsp4j.csparql2.engine.CSPARQLEngine; +import org.streamreasoning.rsp4j.csparql2.engine.JenaContinuousQueryExecution; +import org.streamreasoning.rsp4j.csparql2.sysout.ResponseFormatterFactory; +import org.streamreasoning.rsp4j.io.DataStreamImpl; import org.streamreasoning.rsp4j.io.sources.FileSource; import org.streamreasoning.rsp4j.io.utils.RDFBase; import org.streamreasoning.rsp4j.io.utils.parsing.JenaRDFCommonsParsingStrategy; import org.streamreasoning.rsp4j.operatorapi.ContinuousProgram; import org.streamreasoning.rsp4j.operatorapi.QueryTaskOperatorAPIImpl; import org.streamreasoning.rsp4j.operatorapi.TaskOperatorAPIImpl; +import org.streamreasoning.rsp4j.wspbook.wildstreams.dbplutils.DBPLDataFetcher; import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.Binding; import org.streamreasoning.rsp4j.yasper.querying.operators.r2r.joins.HashJoinAlgorithm; import org.streamreasoning.rsp4j.yasper.querying.syntax.TPQueryFactory; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.net.URL; + /*** * This example shows how to analyse data from DBPedia Live. - * To simplify analysis, a snippit of the live stream is stored in the file datasets/dbpedialive_snippit.nt. + * It will download the reinserted triples from DBPedia Live and analyse them in a streaming fashion. */ public class ProcessingDBPediaLive { - public static void main(String[] args){ - JenaRDFCommonsParsingStrategy parsingStrategy = new JenaRDFCommonsParsingStrategy(RDFBase.NT); - // create file source to read the newly created file - String filePath = ProcessingDBPediaLive.class.getResource("/datasets/dbpedialive_snippit.nt").getPath(); - FileSource fileSource = new FileSource(filePath, 10, parsingStrategy); - fileSource.stream(); - - // Define the query that checks the different types of data in the DBPedia Live Stream - ContinuousQuery query = - TPQueryFactory.parse( - String.format("PREFIX : " + public static void main(String[] args) throws ConfigurationException { + // Creates a DataStream that fetches the updates from DBPedia Live + DBPLDataFetcher dataFetcher = new DBPLDataFetcher(DBPLDataFetcher.BDPLStreamType.REINSTERT, 3000,2019); + DataStreamImpl dbplStream = dataFetcher.getStream(); + // Defines the RSP-QL query that counts all the different types of updates in the stream. + String rspqlQuery = + "PREFIX : " + " PREFIX rsp4j: " + "REGISTER RSTREAM AS " - + "SELECT ?type " + + "SELECT ?type (COUNT(?s) AS ?count) " + " " - + " FROM NAMED WINDOW rsp4j:window ON <%s> [RANGE PT1S STEP PT1S] " + + " FROM NAMED WINDOW rsp4j:window ON [RANGE PT1S STEP PT1S] " + "WHERE {" + " WINDOW rsp4j:window { ?s a ?type .}" - + "} ",filePath)); - - // Create the RSP4J Task and Continuous Program - TaskOperatorAPIImpl t = - new QueryTaskOperatorAPIImpl.QueryTaskBuilder().fromQuery(query).build(); - - ContinuousProgram cp = - new ContinuousProgram.ContinuousProgramBuilder() - .in(fileSource) - .addTask(t) - .out(query.getOutputStream()) - .addJoinAlgorithm(new HashJoinAlgorithm()) - .build(); - // Add the Consumer to the stream - query.getOutputStream().addConsumer((el, ts) -> System.out.println(el + " @ " + ts)); + + "} GROUP BY ?type"; + // Configures the C-SPARQL 2.0 engine + URL resource = ProcessingGDELT.class.getResource("/csparql.properties"); + SDSConfiguration config = new SDSConfiguration(resource.getPath()); + EngineConfiguration ec = EngineConfiguration.loadConfig("/csparql.properties"); + CSPARQLEngine sr = new CSPARQLEngine(0, ec); + + DataStream registered = sr.register(dbplStream); + dbplStream.addConsumer((e,t)->registered.put(e,t)); + + JenaContinuousQueryExecution cqe = (JenaContinuousQueryExecution)sr.register(rspqlQuery, config); + + ContinuousQuery query = cqe.query(); + + System.out.println(query.toString()); + + System.out.println("<<------>>"); + + if (query.isConstructType()) { + cqe.addQueryFormatter(ResponseFormatterFactory.getConstructResponseSysOutFormatter("JSON-LD", true)); + } else if (query.isSelectType()) { + cqe.addQueryFormatter(ResponseFormatterFactory.getSelectResponseSysOutFormatter("TABLE", true)); //or "CSV" or "JSON" or "JSON-LD" + } + + DataStream outputStream = cqe.outstream(); + outputStream.addConsumer((o, l) -> System.out.println(o)); + dataFetcher.start(); + + } + public static org.apache.jena.graph.Graph parse(String parseString) { + Model dataModel = ModelFactory.createDefaultModel(); + try { + InputStream targetStream = new ByteArrayInputStream(parseString.getBytes()); + dataModel.read(targetStream, null, RDFBase.NT.name()); + return dataModel.getGraph(); + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return dataModel.getGraph(); } } diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingGDELT.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingGDELT.java index 5ad1209f..8fd526b4 100644 --- a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingGDELT.java +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingGDELT.java @@ -31,10 +31,13 @@ import java.nio.file.Files; import java.nio.file.Path; - +/*** + * This example shows how to process the live data from the GDELT project. + */ public class ProcessingGDELT { private static String prefix = "http://gdelt.org/gkg/"; private static String semicolon = ";"; + // Defines some additional functions to enrich the raw CSV data private static Object[] functions = new Object[]{new DBPediaPeopleLookup(), new DBPediaPeopleLookup("http://xmlns.com/foaf/0.1/Person,Wikidata:Q5,Wikidata:Q24229398,Wikidata:Q215627,DUL:NaturalPerson,DUL:Agent,Schema:Person,DBpedia:Person,DBpedia:Agent".split(",")), new URISplitFunction(semicolon), @@ -44,19 +47,18 @@ public class ProcessingGDELT { public static void main(String[] args) throws IOException, URISyntaxException, ConfigurationException { + // First a stream is created that fetches the GDELT stream DataStreamImpl gdeltStream = new DataStreamImpl<>("GDELTStream"); GDELTDataFetcher fetcher = new GDELTDataFetcher("export", gdeltStream, 1000); - + // A mapping is define the map the raw CSV files to RDF String rmlMapping = Files.readString(Path.of(ProcessingGDELT.class.getResource("/mapping/gdelt_export.ttl").toURI())); - - CARMLCSVMapper mapper = new CARMLCSVMapper(rmlMapping,"GDELTStream", functions); - + // The raw data is converted to RDF in string format and then to internal RDF graph objects DataStreamImpl mappedStream = gdeltStream.map(mapper::apply, "http://example.org/test/mapped"); JenaRDFCommonsParsingStrategy jenaParser = new JenaRDFCommonsParsingStrategy(RDFBase.NT); DataStreamImpl rdfStream = mappedStream.map(e->parse(e), "http://example.org/test/rdf"); - - String rspqlQuery = + // Defines the RSPQL query that processes the GDELT data + String rspqlQuery = "PREFIX gdelt: \n" + "PREFIX xsd: \n" + "PREFIX : \n" @@ -68,6 +70,7 @@ public static void main(String[] args) throws IOException, URISyntaxException, C + " ?actor1 gdelt:actorName \"CHINA\"^^xsd:string \n" + " }\n" + "}"; + // Configures the CSPARQL2.0 engine URL resource = ProcessingGDELT.class.getResource("/csparql.properties"); SDSConfiguration config = new SDSConfiguration(resource.getPath()); EngineConfiguration ec = EngineConfiguration.loadConfig("/csparql.properties"); diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingWikimedia.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingWikimedia.java index 6340e42f..52ff17ca 100644 --- a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingWikimedia.java +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/ProcessingWikimedia.java @@ -28,13 +28,17 @@ import java.io.InputStream; import java.net.URL; +/*** + * This example will show you how to process live data from Wikimedia streams. + */ public class ProcessingWikimedia { public static void main(String[] args) throws IOException, ConfigurationException { - + // First a new stream is opened that fetches the data from wikimedia SSESource sseSource = new SSESource("https://stream.wikimedia.org/v2/stream/recentchange",1000); sseSource.addRequestOptions("Accept","application/json"); sseSource.stream(); + // Next the RML mapping is defined that converts the raw JSON data to RDF triples String rmlMapping = "" + "@prefix : .\n" @@ -114,52 +118,27 @@ public static void main(String[] args) throws IOException, ConfigurationExceptio // Map the Stream with CSV Strings to NT Strings using the RML Mapper // We need to to define the URL of the result stream DataStreamImpl mappedStream = sseSource.map(mapper::apply, "http://example.org/test/mapped"); - - // JenaRDFCommonsParsingStrategy jenaParser = new JenaRDFCommonsParsingStrategy(RDFBase.NT); -// DataStreamImpl rdfStream = mappedStream.map(jenaParser::parse, "http://example.org/test/rdf"); -// rdfStream.addConsumer((e, t) -> System.out.println(e)); -// -// // Define the query that checks the different types of data in the DBPedia Live Stream -// ContinuousQuery query = -// TPQueryFactory.parse( -// "PREFIX : " -// + " PREFIX rsp4j: " -// + "REGISTER RSTREAM AS " -// + "SELECT ?user " -// + " " -// + " FROM NAMED WINDOW rsp4j:window ON [RANGE PT1S STEP PT1S] " -// -// + "WHERE {" -// + " WINDOW rsp4j:window { ?s ?user .}" -// -// + "} "); -// -// // Create the RSP4J Task and Continuous Program -// TaskOperatorAPIImpl t = -// new QueryTaskOperatorAPIImpl.QueryTaskBuilder().fromQuery(query).build(); -// -// ContinuousProgram cp = -// new ContinuousProgram.ContinuousProgramBuilder() -// .in(rdfStream) -// .addTask(t) -// .out(query.getOutputStream()) -// .addJoinAlgorithm(new HashJoinAlgorithm()) -// .build(); -// // Add the Consumer to the stream -// query.getOutputStream().addConsumer((el, ts) -> System.out.println(el + " @ " + ts)); + // We map the RDF stream to a stream of internal graph objects DataStreamImpl rdfStream = mappedStream.map(e->parse(e), "http://example.org/test/rdf"); - mappedStream.addConsumer((e, t) -> System.out.println(e)); + + mappedStream.addConsumer((e, t) -> System.out.println(e)); + // we define the RSP-QL query to process the data stream String rspqlQuery = "PREFIX gdelt: \n" + "PREFIX xsd: \n" + "PREFIX : \n" + "REGISTER RSTREAM AS\n" - + "SELECT ?user (COUNT(?event) AS ?count) \n" - + "FROM NAMED WINDOW ON [RANGE PT10S STEP PT1S]\t\n" + + + "Select ?wiki ?count " + + "FROM NAMED WINDOW ON [RANGE PT60S STEP PT10S]\t\n" + + " WHERE { {" + + "SELECT ?wiki (COUNT(?event) AS ?count) \n" + "WHERE{\n" - + " WINDOW { ?event ?user" + + " WINDOW { ?event ?wiki" + " } \n" - + "} GROUP BY ?user "; + + "} GROUP BY ?wiki} " + + " FILTER(?count >2) } Order By DESC(?count) "; + // We define the CSPARQL 2.0 engine URL resource = ProcessingGDELT.class.getResource("/csparql.properties"); SDSConfiguration config = new SDSConfiguration(resource.getPath()); EngineConfiguration ec = EngineConfiguration.loadConfig("/csparql.properties"); @@ -197,6 +176,6 @@ public static org.apache.jena.graph.Graph parse(String parseString) { // TODO Auto-generated catch block e.printStackTrace(); } - return null; + return dataModel.getGraph(); } } diff --git a/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/dbplutils/DBPLDataFetcher.java b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/dbplutils/DBPLDataFetcher.java new file mode 100644 index 00000000..16c78aa8 --- /dev/null +++ b/examples/src/main/java/org/streamreasoning/rsp4j/wspbook/wildstreams/dbplutils/DBPLDataFetcher.java @@ -0,0 +1,158 @@ +package org.streamreasoning.rsp4j.wspbook.wildstreams.dbplutils; + +import org.apache.jena.graph.Graph; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.streamreasoning.rsp4j.api.stream.data.DataStream; +import org.streamreasoning.rsp4j.io.DataStreamImpl; + +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; + +public class DBPLDataFetcher { + + private final BDPLStreamType streamType; + private final long pollingDelay; + private static final String changesURL = + "https://downloads.dbpedia.org/live/changesets/"; + private final int startingYear; + private final DataStreamImpl stream; + + public enum BDPLStreamType { + ADDED(".added.nt.gz"), + REINSTERT(".reinserted.nt.gz"), + REMOVE(".removed.nt.gz"); + private String streamType; + + BDPLStreamType(String t) { + this.streamType = t; + } + + public String getType() { + return streamType; + } + } + + public DBPLDataFetcher(BDPLStreamType streamType, long pollingDelay, int startingYear) { + this.streamType = streamType; + this.pollingDelay = pollingDelay; + this.startingYear = startingYear; + this.stream = new DataStreamImpl<>("https://live.dbpedia.org/live/sync/changes"); + + } + public DBPLDataFetcher(BDPLStreamType streamType, long pollingDelay) { + this(streamType,pollingDelay,2013); + } + public DataStreamImpl getStream(){ + return stream; + } + public void start(){ + Runnable runnable = () -> { + try { + this.replay(stream); + } catch (IOException e) { + e.printStackTrace(); + } + }; + Thread thread = new Thread(runnable); + thread.start(); + } + + private void replay(DataStreamImpl stream) throws IOException { + for(int year = startingYear; year<=2021; year++){ + List months = + fetchDataFromURL(changesURL + year).stream() + .map(e->extractSubPages(e)) + .flatMap(List::stream) + .collect(Collectors.toList()); + for(String month : months){ + List days = fetchDataFromURL(changesURL + year+"/"+month).stream() + .map(e->extractSubPages(e)) + .flatMap(List::stream) + .collect(Collectors.toList()); + for(String day : days){ + List hours = fetchDataFromURL(changesURL + year+"/"+month+day).stream() + .map(e->extractSubPages(e)) + .flatMap(List::stream) + .collect(Collectors.toList()); + for(String hour: hours){ + List datasets = fetchDataFromURL(changesURL + year+"/"+month+day+hour).stream() + .map(e->extractSubPages(e)) + .flatMap(List::stream) + .filter(e->e.endsWith(streamType.getType())) + .collect(Collectors.toList()); + for(String dataset : datasets){ + Model download = download(changesURL + year+"/"+month+day+hour+dataset); + stream.put(download.getGraph(),System.currentTimeMillis()); + try { + Thread.sleep(pollingDelay); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + } + } + } + private List fetchDataFromURL(String fetchURL) throws IOException { + List lines = new ArrayList<>(); + URL url = new URL(fetchURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + BufferedReader br; + if (200 <= connection.getResponseCode() && connection.getResponseCode() <= 299) { + br = new BufferedReader(new InputStreamReader(connection.getInputStream())); + } else { + br = new BufferedReader(new InputStreamReader(connection.getErrorStream())); + } + String line; + while ((line = br.readLine()) != null) { + lines.add(line); + } + return lines; + } + + private Model download(String fileURL) { + try { + + URL file = new URL(fileURL); + + System.out.println("[" + System.currentTimeMillis() + "]" + fileURL); + + URLConnection urlConnection = file.openConnection(); + InputStream inputStream = urlConnection.getInputStream(); + BufferedInputStream tosave = new BufferedInputStream(inputStream); + // save(finalLine, new GZIPInputStream(tosave)); + + Model nt = ModelFactory.createDefaultModel().read(new GZIPInputStream(tosave), "", "NT"); + return nt; + } catch (FileNotFoundException e) { + } catch (Exception e) { + e.printStackTrace(); + } + return ModelFactory.createDefaultModel(); + } + private List extractSubPages(String pageData){ + List pages = new ArrayList<>(); + Pattern pattern = Pattern.compile("\">(.+?).*"); + Matcher matcher = pattern.matcher(pageData); + while (matcher.find()) + { + if (!matcher.group(1).equals("../")) { + pages.add(matcher.group(1)); + } + } + return pages; + } + +}