This case study draws from the mobile network operator (MNO) domain. The goal of the project is to track subscribers' mobile data usage live, on an ongoing basis. As soon as the data used within a billing period exceeds the maximum defined in the data plan, the system should generate an appropriate notification. In this case study, the notifications are messages published to the data-records-aggregates Kafka topic. These messages could later be used for different purposes, for example to inform the subscriber that they have exceeded the data plan, or to lower the transfer speed for data used outside of the plan.
The source data for the project are data record files (tracking the data used by mobile phone users) and subscribers' agreements defined by the MNO.
The project uses Flink both to import the files into the system and as the streaming platform that detects the moment a data plan is exceeded.
An auxiliary Spring backend project helps in generating test data:
- 1️⃣ subscribers' agreements that define max data in the data plan, and
- 2️⃣ data record files, which would come from the MNO in a real project.
The project uses Kafka as the messaging platform and a Cassandra database to store ingested data records. The Spring backend publishes generated agreements to the agreements Kafka topic and stores generated data record files in /mobilecs/incoming-data-records.
There are two main processes in the Case Study.
- Implemented by 3️⃣ Incoming Data Records Importer Flink job
  - Reads raw data record files received from the MNO (from the incoming data records folder)
  - Enriches them with an identifier that uniquely identifies each record globally in the system (regardless of the identification provided by the MNO)
  - Publishes them to 4️⃣ incoming-data-records Kafka topic
- Implemented by 5️⃣ Incoming Data Records Ingester Flink job
  - Reads imported data records from the incoming-data-records Kafka topic
  - Reads subscribers' agreements from the agreements Kafka topic
  - Matches incoming data records with agreements (using Flink RichCoFlatMapFunction), creating ingested data records, from now on referred to simply as data records
  - Stores resulting data records in 6️⃣ Cassandra database, in mobilecs.data_record table
  - Defines 7️⃣ a tumbling window that corresponds to the billing period (with a custom assigner, a custom trigger, and a process window function)
  - Aggregates data records within the window and publishes an appropriate message to 8️⃣ data-records-aggregates topic twice in the window lifetime:
    - as soon as the data used in the period exceeds the data plan (DATA_PLAN_EXCEEDED)
    - at the end of the billing period (BILLING_PERIOD_CLOSED)
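The billing-period window from the list above can be pictured as a calendar-month window in the agreement's time zone. The following is a minimal plain-Java sketch (no Flink dependency) of how such window bounds might be computed. The class and method names are hypothetical, and treating the billing period as a calendar month in the billing time zone is an assumption based on the year, month and billingPeriodTimeZone fields of the sample output later in this document. It is not the project's actual window assigner.

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneId;

// Hypothetical helper: computes the bounds of the billing period (assumed to be
// a calendar month in the agreement's time zone) that contains a given timestamp.
final class BillingPeriodWindows {

    // Start of the billing period (inclusive), in epoch milliseconds.
    static long windowStart(long timestampMillis, ZoneId zone) {
        YearMonth month = YearMonth.from(Instant.ofEpochMilli(timestampMillis).atZone(zone));
        return month.atDay(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    // End of the billing period (exclusive), in epoch milliseconds.
    static long windowEnd(long timestampMillis, ZoneId zone) {
        YearMonth month = YearMonth.from(Instant.ofEpochMilli(timestampMillis).atZone(zone));
        return month.plusMonths(1).atDay(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    // Last millisecond that still belongs to the window (end - 1), which is
    // also how Flink's TimeWindow defines its maximum timestamp.
    static long maxTimestamp(long timestampMillis, ZoneId zone) {
        return windowEnd(timestampMillis, zone) - 1;
    }

    public static void main(String[] args) {
        ZoneId warsaw = ZoneId.of("Europe/Warsaw");
        long recordedAt = Instant.parse("2020-01-10T23:00:00Z").toEpochMilli();
        System.out.println("window start:  " + windowStart(recordedAt, warsaw));
        System.out.println("max timestamp: " + maxTimestamp(recordedAt, warsaw));
    }
}
```

For a record at 2020-01-10T23:00:00Z and the Europe/Warsaw zone, the last millisecond of the window is 1580511599999, which is the same value as the CreateTime shown in the sample consumer output.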
The project can be executed with both docker-compose and kubernetes. Docker compose makes it possible to quickly start single containers, which is useful for local development.
Build maven projects
mvn clean package
Start docker compose
docker-compose up -d --build
ℹ️ Due to the nature of docker-compose (no retry policy), services with dependencies might occasionally fail to start (e.g. kafka when zookeeper does not start in time, backend when cassandra does not start in time). You may need to start them manually with another call to docker-compose up. A solution could be to switch to docker swarm (which supports a retry policy) or to use Kubernetes, which auto-restarts pods if they fail to start (see below).
Run docker ps -a; the output should look like below, with 6 containers in Up status.
CONTAINER ID   IMAGE                             COMMAND                  CREATED              STATUS              PORTS                                         NAMES
2d06adc47024   mobilecs-backend                  "/docker-entrypoint.…"   About a minute ago   Up About a minute   >8080/tcp                                     mobilecs-backend
59b8931c7760   flink:1.11.2-scala_2.11-java11    "/docker-entrypoint.…"   About a minute ago   Up About a minute   6123/tcp, 8081/tcp                            mobilecs-taskmanager
bfca0eeae3b8   confluentinc/cp-kafka:5.4.3       "/etc/confluent/dock…"   About a minute ago   Up About a minute   >9092/tcp,>29092/tcp                          mobilecs-kafka
f3a29089bce0   confluentinc/cp-zookeeper:5.4.3   "/etc/confluent/dock…"   About a minute ago   Up About a minute   2888/tcp,>2181/tcp, 3888/tcp                  mobilecs-zookeeper
311f4b8249d6   cassandra:3.11.8                  "docker-entrypoint.s…"   About a minute ago   Up About a minute   7000-7001/tcp, 7199/tcp, 9160/tcp,>9042/tcp   mobilecs-cassandra
e57f76f6517b   mobilecs-flink                    "/docker-entrypoint-…"   About a minute ago   Up About a minute   6123/tcp,>8081/tcp                            mobilecs-jobmanager
Open Flink Dashboard (http://localhost:8081/#/job/running) and make sure both jobs are running: Incoming Data Records Importer and Incoming Data Records Ingester
In case of restarting the project and running it again from scratch (e.g. after docker-compose down), make sure to also clean the volume in which incoming data files are generated (docker-compose down does not clean volumes).
- Run
docker exec -it mobilecs-backend bash
- Delete all the generated data record files (the backend stores them in /mobilecs/incoming-data-records)
Or delete the volume before restarting docker compose:
docker volume rm mobilecs_incoming-data-records
Start Kafka consumer for expected data records aggregated in Flink
docker exec mobilecs-kafka bash -c "kafka-console-consumer --topic data-records-aggregates --from-beginning --bootstrap-server mobilecs-kafka:29092 --property print.timestamp=true"
Generate test agreement and CDR Data Records
curl http://localhost:8080/api/agreements/generate
curl http://localhost:8080/api/incoming-data-records/generate
Go back to the console in which the Kafka consumer is running for the data-records-aggregates topic and check the results; they should look like below. When this output appears, the case study has finished successfully.
CreateTime:1580511599999 {
  "agreementId" : "0b12c601-9287-3f5c-a78c-df508fe0f889",
  "year" : 2020,
  "month" : 1,
  "latestRecordedAt" : "Fri Jan 10 23:00:00 UTC 2020",
  "latestInternalRecordId" : "523d8cc7-e611-3508-90bc-fab260b2973e",
  "totalRecordedBytes" : 5697948758,
  "billingPeriodTimeZone" : "Europe/Warsaw",
  "maxBytesInBillingPeriod" : 5368709120,
  "type" : "DATA_PLAN_EXCEEDED"
}
CreateTime:1580511599999 {
  "agreementId" : "0b12c601-9287-3f5c-a78c-df508fe0f889",
  "year" : 2020,
  "month" : 1,
  "latestRecordedAt" : "Thu Jan 30 23:00:00 UTC 2020",
  "latestInternalRecordId" : "b58a3008-23a9-384c-b4e6-2cc837d1ef07",
  "totalRecordedBytes" : 13559994361,
  "billingPeriodTimeZone" : "Europe/Warsaw",
  "maxBytesInBillingPeriod" : 5368709120,
  "type" : "BILLING_PERIOD_CLOSED"
}
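The two messages above follow one rule: DATA_PLAN_EXCEEDED is published once, when totalRecordedBytes first rises above maxBytesInBillingPeriod (5368709120 bytes, i.e. 5 × 1024³), and BILLING_PERIOD_CLOSED is published when the period ends. The plain-Java sketch below illustrates that rule in memory. The class and method names are hypothetical; the project itself implements this with a Flink custom trigger and a process window function, so this is only an illustration of the logic, not the actual code.

```java
import java.util.Optional;

// Hypothetical in-memory stand-in for one billing-period window of one agreement.
final class UsageAggregator {
    private final long maxBytesInBillingPeriod;
    private long totalRecordedBytes;
    private boolean exceededReported;

    UsageAggregator(long maxBytesInBillingPeriod) {
        this.maxBytesInBillingPeriod = maxBytesInBillingPeriod;
    }

    // Adds one data record. Returns DATA_PLAN_EXCEEDED only for the record that
    // first pushes the running total above the plan, and empty otherwise.
    Optional<String> add(long recordedBytes) {
        totalRecordedBytes += recordedBytes;
        if (!exceededReported && totalRecordedBytes > maxBytesInBillingPeriod) {
            exceededReported = true;
            return Optional.of("DATA_PLAN_EXCEEDED");
        }
        return Optional.empty();
    }

    // Called once, when the billing period ends.
    String close() {
        return "BILLING_PERIOD_CLOSED";
    }

    long total() {
        return totalRecordedBytes;
    }

    public static void main(String[] args) {
        UsageAggregator window = new UsageAggregator(5L * 1024 * 1024 * 1024);
        // Made-up record sizes, chosen only to cross the 5 GiB limit on the second record.
        long[] recordedBytes = {5_000_000_000L, 697_948_758L, 1_000_000L};
        for (long bytes : recordedBytes) {
            window.add(bytes).ifPresent(type -> System.out.println(type + " at " + window.total()));
        }
        System.out.println(window.close() + " at " + window.total());
    }
}
```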
Other commands for further insight
Kafka consumer for agreements generated in backend service
docker exec mobilecs-kafka bash -c "kafka-console-consumer --topic agreements --from-beginning --bootstrap-server mobilecs-kafka:29092 --property print.timestamp=true"
Kafka consumer for data records imported by Flink
docker exec mobilecs-kafka bash -c "kafka-console-consumer --topic incoming-data-records --from-beginning --bootstrap-server mobilecs-kafka:29092 --property print.timestamp=true"
Bash for backend service
docker exec -it mobilecs-backend bash
Useful docker and docker compose commands
docker volume ls
docker-compose logs -f --tail=all
docker-compose up -d kafka
docker-compose down
docker system prune -a --volumes
⚠️ docker system prune -a --volumes cleans up docker COMPLETELY, so use with caution!
Install and start minikube, e.g.:
minikube start --profile mobilecs --driver=hyperkit --cpus=3 --memory=8g
Build maven projects and deploy docker images
ℹ️ The script configures the docker daemon to use minikube, so that kubernetes can access mobilecs project images (this also requires imagePullPolicy: IfNotPresent, see kubernetes.yaml).
Apply all kubernetes objects
kubectl apply -f kubernetes.yaml
Run kubectl get pods; the output should look like below, with 6 pods in READY status 1/1.
NAME                                     READY   STATUS    RESTARTS   AGE
backend-deployment-6bb6846cbd-77fgv      1/1     Running   5          9m47s
cassandra-statefulset-0                  1/1     Running   0          9m47s
jobmanager-deployment-69f64fcf6b-v45dk   1/1     Running   0          9m46s
kafka-statefulset-0                      1/1     Running   0          9m46s
taskmanager-deployment-677b6f454-92p9f   1/1     Running   0          9m46s
zookeeper-statefulset-0                  1/1     Running   0          9m46s
Open Flink Dashboard and make sure both jobs are running: Incoming Data Records Importer and Incoming Data Records Ingester
ℹ️ To get the URL of the Flink Dashboard, run
minikube -p mobilecs service jobmanager-rest-service --url
In case of restarting the project and running it again from scratch make sure to also clean all persistent volumes.
Start Kafka consumer for expected data records aggregated in Flink
kubectl exec `kubectl get pods -l app=kafka -o name` -- bash -c "kafka-console-consumer --topic data-records-aggregates --from-beginning --bootstrap-server kafka-service:29092 --property print.timestamp=true"
Generate test CDR Data Records and agreements
curl `minikube -p mobilecs service backend-service --url`/api/agreements/generate
curl `minikube -p mobilecs service backend-service --url`/api/incoming-data-records/generate
Go back to the console in which the Kafka consumer is running for the data-records-aggregates topic and check the results; they should look like below. When this output appears, the case study has finished successfully.
CreateTime:1580511599999 {
  "agreementId" : "0b12c601-9287-3f5c-a78c-df508fe0f889",
  "year" : 2020,
  "month" : 1,
  "latestRecordedAt" : "Fri Jan 10 23:00:00 UTC 2020",
  "latestInternalRecordId" : "523d8cc7-e611-3508-90bc-fab260b2973e",
  "totalRecordedBytes" : 5697948758,
  "billingPeriodTimeZone" : "Europe/Warsaw",
  "maxBytesInBillingPeriod" : 5368709120,
  "type" : "DATA_PLAN_EXCEEDED"
}
CreateTime:1580511599999 {
  "agreementId" : "0b12c601-9287-3f5c-a78c-df508fe0f889",
  "year" : 2020,
  "month" : 1,
  "latestRecordedAt" : "Thu Jan 30 23:00:00 UTC 2020",
  "latestInternalRecordId" : "b58a3008-23a9-384c-b4e6-2cc837d1ef07",
  "totalRecordedBytes" : 13559994361,
  "billingPeriodTimeZone" : "Europe/Warsaw",
  "maxBytesInBillingPeriod" : 5368709120,
  "type" : "BILLING_PERIOD_CLOSED"
}
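The agreementId and latestInternalRecordId values above have 3 as the first digit of their third group, which marks them as version-3 (name-based) UUIDs. In Java, identifiers of that kind can be produced with UUID.nameUUIDFromBytes, which always returns the same UUID for the same input bytes. The sketch below shows how a globally unique, reproducible record identifier might be derived from a record's content. Whether the project derives its identifiers this way, and from which fields, is an assumption; the class name and the sample input line are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper for deriving internal record identifiers.
final class InternalRecordIds {

    // Derives a deterministic, name-based (version 3) UUID from arbitrary text.
    static UUID fromContent(String content) {
        return UUID.nameUUIDFromBytes(content.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Made-up raw record line; the real file format is not shown in this document.
        String rawRecord = "file-0001;line-42;2020-01-10T23:00:00Z;123456";
        UUID id = fromContent(rawRecord);
        System.out.println(id + " (version " + id.version() + ")");
    }
}
```

A deterministic identifier has the practical benefit that re-importing the same file yields the same IDs, so a repeated record overwrites its earlier copy instead of being counted twice.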
Other commands for further insight
Kafka consumer for agreements generated in backend service
kubectl exec `kubectl get pods -l app=kafka -o name` -- bash -c "kafka-console-consumer --topic agreements --from-beginning --bootstrap-server kafka-service:29092 --property print.timestamp=true"
Kafka consumer for data records imported in Flink
kubectl exec `kubectl get pods -l app=kafka -o name` -- bash -c "kafka-console-consumer --topic incoming-data-records --from-beginning --bootstrap-server kafka-service:29092 --property print.timestamp=true"
Open bash for backend service
kubectl exec -it `kubectl get pods -l app=backend -o name` -- bash
Open bash for cassandra service
kubectl exec -it `kubectl get pods -l app=cassandra -o name` -- bash
Useful kubernetes commands
The last command below uses stern, which has to be installed separately.
Install jq with
brew install jq
minikube dashboard
minikube profile list
minikube profile list -o json | jq .
minikube stop -p mobilecs
minikube delete -p mobilecs
kubectl get pods --show-labels -w -o wide
watch -n 0.1 kubectl get pods --show-labels -o wide
stern backend
One of the results of running the complete case study as described above is that incoming data records matched with agreements are stored in the Cassandra database, in the mobilecs.data_record table. Instead of running the complete case study, one can generate data records directly using a dedicated generator.
Start only cassandra database via docker compose
docker-compose up -d cassandra
Start backend project locally (e.g. in IntelliJ)
Directly generate data records
curl http://localhost:8080/api/data-records/generate
Open cqlsh in the cassandra container
docker exec -it mobilecs-cassandra bash -c "cqlsh"
Useful cassandra queries
use mobilecs;
select * from data_record;
select * from data_record where agreement_id = 0b12c601-9287-3f5c-a78c-df508fe0f889 and year = 2020 and month = 01;
select * from data_record where agreement_id = 0b12c601-9287-3f5c-a78c-df508fe0f889 and year = 2020 and month = 02;
truncate data_record;
Getting and upserting data records via REST API
curl http://localhost:8080/api/data-records/0b12c601-9287-3f5c-a78c-df508fe0f889/2020/01 | jq .
curl http://localhost:8080/api/data-records/0b12c601-9287-3f5c-a78c-df508fe0f889/2020/02 | jq .
curl --header "Content-Type: application/json" --request PUT --data '{"recordedAt": "2020-01-01T23:00:00.000+00:00", "internalRecordId": "82b6a30a-b659-3a4d-85f7-2771b6f69f56", "recordedBytes": 123456}' http://localhost:8080/api/data-records/0b12c601-9287-3f5c-a78c-df508fe0f889 | jq .
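The PUT request above can also be issued from Java. The following is a minimal sketch using the java.net.http client that ships with Java 11 and later; the class and method names are hypothetical, and the URL, header and JSON body are taken from the curl command. Sending the request only works while the backend is running on localhost:8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client-side helper mirroring the curl PUT command.
final class DataRecordUpsertClient {

    // Builds the upsert request without sending it.
    static HttpRequest upsertRequest(String baseUrl, String agreementId, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/data-records/" + agreementId))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String body = "{\"recordedAt\": \"2020-01-01T23:00:00.000+00:00\", "
                + "\"internalRecordId\": \"82b6a30a-b659-3a4d-85f7-2771b6f69f56\", "
                + "\"recordedBytes\": 123456}";
        HttpRequest request = upsertRequest(
                "http://localhost:8080", "0b12c601-9287-3f5c-a78c-df508fe0f889", body);
        // Requires the backend to be reachable; otherwise send() throws an exception.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```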
Getting data records in React app (work in progress)
cd frontend
yarn start
and navigate to http://localhost:3000
Getting and upserting data records via GraphQL (http://localhost:8080/graphql and http://localhost:8080/graphiql)
Finding data records
{
  findDataRecords(agreementId: "0b12c601-9287-3f5c-a78c-df508fe0f889", year: 2020, month: 1) {
    key {
      recordedAt
      internalRecordId
    }
    recordedBytes
  }
}
Getting total data usage
{
  getTotalDataUsage(agreementId: "0b12c601-9287-3f5c-a78c-df508fe0f889", year: 2020, month: 1)
}
Upserting data record mutation
mutation upsertDataRecord($agreementId: String!, $upsertRequest: DataRecordUpsertRequest!) {
  upsertDataRecord(agreementId: $agreementId, upsertRequest: $upsertRequest) {
    key {
      agreementId
      year
      month
      recordedAt
      internalRecordId
    }
    recordedBytes
  }
}
Query variables:
{
  "agreementId": "0b12c601-9287-3f5c-a78c-df508fe0f889",
  "upsertRequest": {
    "recordedAt": "2020-01-01T23:00:00.000+00:00",
    "internalRecordId": "82b6a30a-b659-3a4d-85f7-2771b6f69f56",
    "recordedBytes": 123456
  }
}
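Outside of the GraphiQL page, the same queries can be sent programmatically. The sketch below wraps a query and its variables in the JSON envelope commonly used for GraphQL over HTTP (a POST with query and variables fields). It assumes that the /graphql endpoint accepts such POST requests, which this document does not state; the class and method names are hypothetical, and the string escaping is deliberately minimal.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for sending GraphQL queries over HTTP (Java 11+).
final class GraphQlRequests {

    // Minimal JSON string escaping: handles backslashes, double quotes and
    // newlines only, which is enough for the queries shown in this document.
    static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\"";
    }

    // Wraps the query text and an already serialized variables object.
    static String body(String query, String variablesJson) {
        return "{\"query\": " + jsonString(query) + ", \"variables\": " + variablesJson + "}";
    }

    // Builds the POST request without sending it.
    static HttpRequest post(String url, String query, String variablesJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(query, variablesJson)))
                .build();
    }

    public static void main(String[] args) {
        String query = "{ getTotalDataUsage(agreementId: \"0b12c601-9287-3f5c-a78c-df508fe0f889\", "
                + "year: 2020, month: 1) }";
        System.out.println(body(query, "{}"));
        System.out.println(post("http://localhost:8080/graphql", query, "{}").method());
    }
}
```

The built request can be sent with java.net.http.HttpClient while the backend is running.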