Train of Thought

Named after the brilliant Lumosity game.

Stay away from negative people - They have a problem for every solution.

In this line of reasoning we have recognized a problem. Technology ought to solve problems. If it does so without creating another, bigger one, it has earned the right to exist. Have you ever seen people running for the train, in uncomfortable shoes, working clothes or unsuitable bodies, only to arrive exhausted at the station and find out that the train arrives a bit later today? I see it all the time, and actually, I am one of them. Someone should take care of this, right? It is a daily little dose of human suffering faced by contemporary man on a global scale. Many small struggles add up to a large one. Why can we not sync our own movement (changing geo-location) with public transportation movement (also changing geo-location)? Well, in fact we can, and this repo solves the first half of the puzzle. The requirement is the amendment of equal rights; we do so without complicated system interfacing in order to create a maximally generic system that can run in any place at any time (it works for buses too).

It is also a simple programming experiment whose underlying purpose is to show (to anyone who'd like to see) how we can transform and manipulate data (streams) into useful and thus valuable information. For this reason, and to allow it to coordinate with any transportation monitoring system without complex interfacing, only a minimum (too little, really) of input data is required for it to function: trains and stations; even the train routes are deduced! Its architecture is highly scalable, and with the right ops people it could monitor the entire public transportation system of any country.

The unconventional and unprofessional names (Kafka topics, mvn modules, and classes) were chosen to make it easier to explain the app's inner workings to my wife :)

Inner Workings

A picture says more than...

flow diagram

The basic idea is explained by the thick arrowed flow. Each train regularly sends out a message with its location (source). Once we split such a message into a similar one for each station on its route (exploder), we are ready to add travel time predictions (predictor). After dropping the predictions with negative arrival times, which belong to stations the train has already passed, we end up with each train telling us in how much time it will presumably arrive at which station (station-sink).
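The exploder step can be pictured with a few lines of plain Java. This is only a sketch: `TrainPosition`, `TrainAtStation`, and the `stationsByRoute` lookup are hypothetical names invented for the illustration, not the classes used in this repo, and it assumes Java 16+ for records. In a Kafka Streams topology the same one-to-many split would typically sit inside a `flatMapValues` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ExploderSketch {

    // Hypothetical message shapes; the repo's real classes may look different.
    record TrainPosition(String trainId, int route, double lat, double lon) {}
    record TrainAtStation(String trainId, int station, double lat, double lon) {}

    // Turns one position message into one message per station on the train's route.
    // An unknown route yields an empty list instead of failing.
    static List<TrainAtStation> explode(TrainPosition pos,
                                        Map<Integer, List<Integer>> stationsByRoute) {
        List<TrainAtStation> out = new ArrayList<>();
        for (int station : stationsByRoute.getOrDefault(pos.route(), List.of())) {
            out.add(new TrainAtStation(pos.trainId(), station, pos.lat(), pos.lon()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed sample data: route 7 visits stations 1, 2 and 3.
        Map<Integer, List<Integer>> routes = Map.of(7, List.of(1, 2, 3));
        TrainPosition pos = new TrainPosition("train-42", 7, -41.28, 174.78);
        explode(pos, routes).forEach(System.out::println);
    }
}
```

Each exploded message still carries the train's current location, so the predictor can later look up an expected travel time per (location, station) pair.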

Question: how do we know the travel time from a location to a station? Answer: by experience. Tile38 does the hard work and 'detects' each time any train comes within range of any station. Knowing this, and with the help of some KSQL glue, we combine a train-event with an arrival-event in the arrival-processor service. Kafka Streams windowed joins do an excellent job here. The time difference between the two events is how long it actually took the train to get from its location to the station.
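The arithmetic behind that join is small enough to show in isolation. The sketch below is plain Java rather than Kafka Streams code, and `TrainEvent`, `ArrivalEvent`, and the one-hour window are assumptions made for the example (Java 16+ for records). It pairs a position event with an arrival event of the same train and keeps the time difference only when the arrival falls inside the window.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

final class TravelTimeSketch {

    // Hypothetical event shapes, not the repo's actual classes.
    record TrainEvent(String trainId, Instant moment) {}
    record ArrivalEvent(String trainId, int station, Instant moment) {}

    // Returns the travel time when both events belong to the same train and the
    // arrival happened after the position report but within the join window.
    static Optional<Duration> travelTime(TrainEvent position, ArrivalEvent arrival, Duration window) {
        if (!position.trainId().equals(arrival.trainId())) {
            return Optional.empty();
        }
        Duration elapsed = Duration.between(position.moment(), arrival.moment());
        if (elapsed.isNegative() || elapsed.compareTo(window) > 0) {
            return Optional.empty();
        }
        return Optional.of(elapsed);
    }

    public static void main(String[] args) {
        TrainEvent position = new TrainEvent("train-42", Instant.parse("2020-01-01T08:00:00Z"));
        ArrivalEvent arrival = new ArrivalEvent("train-42", 3, Instant.parse("2020-01-01T08:04:30Z"));
        System.out.println(travelTime(position, arrival, Duration.ofHours(1)));
    }
}
```

The window keeps a position report from being matched with an arrival on some much later trip of the same train.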

The last step is to make predictions. For demo purposes we simply take the average arrival time from a location to a station (averager). It should not be hard to come up with something more intelligent here. We do need to do a bit of work though. There are far too many lat-lon combinations for an average per exact location to be meaningful. Therefore, we normalize the location with a GeoHash so that nearby positions fall into the same cell and the approximations become usable.
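To make the GeoHash idea concrete, here is a minimal sketch of an averager keyed by GeoHash cell and station. It uses the standard GeoHash bit-interleaving with the usual base-32 alphabet; the class name `AveragerSketch`, the precision of 6 characters, and the in-memory map are assumptions for the illustration and do not reflect how the repo's averager stores its state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

final class AveragerSketch {

    private static final String BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz";
    private static final int PRECISION = 6; // assumed cell size, tune as needed

    // key "geohash|station" -> {sample count, total seconds}
    private final Map<String, long[]> stats = new HashMap<>();

    // Standard GeoHash encoding: alternate longitude and latitude bits,
    // starting with longitude, and emit one base-32 character per 5 bits.
    static String geohash(double lat, double lon, int precision) {
        double latMin = -90, latMax = 90, lonMin = -180, lonMax = 180;
        StringBuilder hash = new StringBuilder();
        boolean lonTurn = true;
        int bits = 0, value = 0;
        while (hash.length() < precision) {
            if (lonTurn) {
                double mid = (lonMin + lonMax) / 2;
                if (lon >= mid) { value = (value << 1) | 1; lonMin = mid; }
                else { value <<= 1; lonMax = mid; }
            } else {
                double mid = (latMin + latMax) / 2;
                if (lat >= mid) { value = (value << 1) | 1; latMin = mid; }
                else { value <<= 1; latMax = mid; }
            }
            lonTurn = !lonTurn;
            if (++bits == 5) {
                hash.append(BASE32.charAt(value));
                bits = 0;
                value = 0;
            }
        }
        return hash.toString();
    }

    private static String key(double lat, double lon, int station) {
        return geohash(lat, lon, PRECISION) + "|" + station;
    }

    // Adds one observed travel time from a location to a station.
    void record(double lat, double lon, int station, long seconds) {
        long[] s = stats.computeIfAbsent(key(lat, lon, station), k -> new long[2]);
        s[0]++;
        s[1] += seconds;
    }

    // Average observed travel time for the cell containing (lat, lon), if any.
    OptionalDouble average(double lat, double lon, int station) {
        long[] s = stats.get(key(lat, lon, station));
        return s == null ? OptionalDouble.empty() : OptionalDouble.of((double) s[1] / s[0]);
    }

    public static void main(String[] args) {
        AveragerSketch averager = new AveragerSketch();
        averager.record(57.64911, 10.40744, 1, 100);
        averager.record(57.64911, 10.40744, 1, 200);
        System.out.println(averager.average(57.64911, 10.40744, 1));
    }
}
```

A shorter hash means larger cells and more samples per average at the cost of spatial accuracy; a longer hash does the opposite.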

And voilà, we have a streaming processor topology!!

Custom Kafka Connector

Use these configurations:

  • Go to localhost:3030 - choose connectors
  • Create two Tile38SinkConnectors with the following configuration properties:
name=Tile38SinkTrains
topics=I_AM_HERE
tasks.max=1
connector.class=guru.bonacci.kafka.connect.Tile38SinkConnector
tile38.url=tile38
tile38.port=9851
key.name=trains
object.type=POINT
optional.field.name=route
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter

and

name=Tile38SinkStations
topics=STATIONS
tasks.max=1
connector.class=guru.bonacci.kafka.connect.Tile38SinkConnector
tile38.url=tile38
tile38.port=9851
key.name=stations
object.type=POINT
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
  • Sometimes it's necessary to restart 'source' after you've first set up the connectors. This is needed to read the stations topic from the beginning.
  • docker-compose stop source ; docker-compose start source

Run Me

  • docker-compose exec ksql-cli ksql http://ksql-server:8088
  • SET 'auto.offset.reset'='earliest';
  • SET 'ksql.sink.partitions'='1';
  • PRINT i_have_arrived FROM BEGINNING;

CREATE STREAM i_have_arrived_src (id STRING, time STRING, fields STRUCT, nearby STRUCT< key STRING, id STRING, object STRING, meters INT>) WITH (KAFKA_TOPIC='I_HAVE_ARRIVED', VALUE_FORMAT='JSON');

CREATE STREAM i_am_home AS SELECT id, time as moment, mystringtoint(nearby->id) as station FROM i_have_arrived_src WHERE nearby IS NOT NULL PARTITION BY id;

CREATE STREAM on_route AS SELECT fields->route AS route, mystringtoint(nearby->id) AS station FROM i_have_arrived_src WHERE nearby IS NOT NULL PARTITION BY route;

  • If all services are running and your laptop can handle a bit more, launch extra sink containers to check out the load balancing
  • docker-compose stop sink
  • docker-compose up -d --scale sink=3
  • Now query the REST endpoint, first to find the machine name, then to query the data:
  • docker run --tty --rm -i --network ks debezium/tooling:1.0
  • http sink:8080/train-stations/meta-data
  • http --follow 96b52c724a8d:8080/train-stations/data/1
  • or, forget about the scaling and just run http sink:8080/train-stations/data/1

TODO

  • simplify setup by adding docker-compose instructions
  • make train simulator of wellington
  • tune partitions
  • compile services to executables and 'sync' with partitions
  • tests

Useful Resources

Disclaimer

This repo is honoured to be forked. If you encounter any difficulties running it, and believe me you will (at this stage), let me know and I'll try to help you out.