-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
102 lines (79 loc) · 3.37 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from flask import Flask, render_template, request, redirect, url_for
import threading
from producer import fetch_stock_price, toggle_producer, historical_data
from consumer import consume_stock_prices, toggle_consumer, kafka_data_queue
import logging
import plotly.express as px
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Initialize Kafka data list
kafka_data = []
kafka_data_event = threading.Event()
def fetch_data_from_queue():
global kafka_data
while True:
data = kafka_data_queue.get()
kafka_data.append(data)
kafka_data_event.set() # Signal that new data is available
# Start a thread to fetch data from the queue
data_fetch_thread = threading.Thread(target=fetch_data_from_queue)
data_fetch_thread.start()
@app.route('/', methods=['GET', 'POST'])
def index():
global kafka_data, historical_data
if request.method == 'POST':
if 'start' in request.form:
toggle_producer(True) # Start data production
toggle_consumer(True) # Start data consumption
elif 'stop' in request.form:
toggle_producer(False) # Stop data production
toggle_consumer(False) # Stop data consumption
# Create historical chart
if historical_data:
historical_data_chart = {
'Symbol': [item['symbol'] for item in historical_data],
'Price': [item['price'] for item in historical_data],
}
historical_fig = px.line(historical_data_chart, x='Symbol', y='Price', title='Historical Stock Prices')
historical_chart_div = historical_fig.to_html(full_html=False)
else:
historical_chart_div = "<p>No historical data available.</p>"
return render_template('index.html', historical_chart=historical_chart_div)
@app.route('/real-time')
def real_time():
global kafka_data, kafka_data_event
# Wait for the event to be set, but with a timeout to avoid blocking indefinitely
if not kafka_data_event.wait(timeout=30):
# No new data within the timeout
real_time_chart_div = "<p>No real-time data available.</p>"
# Clear the event flag for the next iteration
kafka_data_event.clear()
# Create a Plotly bar chart using the fetched real-time data
if kafka_data:
real_time_data = {
'Symbol': [item['symbol'] for item in kafka_data],
'Price': [item['price'] for item in kafka_data],
}
# Create a Plotly bar chart for real-time data
real_time_fig = px.bar(real_time_data, x='Symbol', y='Price', title='Real-Time Stock Prices')
# Convert the real-time chart to HTML
real_time_chart_div = real_time_fig.to_html(full_html=False)
else:
real_time_chart_div = "<p>No real-time data available.</p>"
return real_time_chart_div
if __name__ == "__main__":
symbols = ['AAPL', 'GOOGL', 'MSFT']
threads = []
for symbol in symbols:
try:
t = threading.Thread(target=fetch_stock_price, args=(symbol,), name=symbol)
t.start()
threads.append(t)
logger.info(f'Started thread for {symbol}')
except Exception as e:
logger.error(f'Error starting thread for {symbol}: {e}')
consumer_thread = threading.Thread(target=consume_stock_prices)
consumer_thread.start()
app.run(host='0.0.0.0', port=5000, debug=True)