-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtemp.py
60 lines (40 loc) · 1.31 KB
/
temp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import redis
import base64
from finalcheck import predict_output
import os
import pandas as pd
# Module-level Redis client used by read_redis_stream for XREAD/XDEL.
# NOTE(review): the host is a hard-coded private-network address (192.168.x.x)
# and the port is the literal 6379 -- confirm this is intended outside the
# original development network.
conn = redis.Redis(host='192.168.43.23', port=6379)
def get_id(xread_object):
    """Return the ID of the first entry of the first stream in an XREAD reply.

    Args:
        xread_object: Reply object indexed as ``reply[0][1][0][0]``
            (first stream -> its entries -> first entry -> entry ID).

    Returns:
        The entry ID, or an empty list when any level of the reply is
        missing. Callers compare the result against ``[]`` to detect
        "nothing left to read".
    """
    try:
        first_stream = xread_object[0]
        first_entry = first_stream[1][0]
        return first_entry[0]
    except IndexError:
        return []
def get_headers(stream_name, xread_object):
    """Return the CSV header names from the first entry of an XREAD reply.

    The entry's ``b'csv'`` field holds CSV text whose first line is the
    header row. The payload is decoded as text instead of being derived from
    the ``repr`` of the bytes object, so quotes and non-ASCII characters in
    the payload survive unchanged.

    Args:
        stream_name: Unused; kept so existing callers keep working.
        xread_object: Reply object indexed as ``reply[0][1][0][1][b'csv']``.

    Returns:
        The header fields with the last comma-separated field dropped.
        NOTE(review): the last column is presumably a label or the result of
        a trailing comma -- confirm against the producer.

    Raises:
        IndexError: If the reply has no first stream or first entry.
        KeyError: If the entry has no ``b'csv'`` field.
    """
    payload = xread_object[0][1][0][1][b'csv']
    if isinstance(payload, (bytes, bytearray)):
        # backslashreplace keeps undecodable bytes visible instead of raising.
        payload = payload.decode("utf-8", errors="backslashreplace")
    # Tolerate CRLF line endings by trimming the carriage return.
    header_line = str(payload).split("\n")[0].rstrip("\r")
    return header_line.split(",")[:-1]
def parse_stream(stream_name, xread_object):
    """Return the fields of the first data row from an XREAD reply entry.

    The entry's ``b'csv'`` field holds CSV text; the second line (the one
    after the header row) is split on commas. The payload is decoded as text
    instead of being derived from the ``repr`` of the bytes object, so quotes
    and non-ASCII characters in field values are preserved.

    Args:
        stream_name: Unused; kept so existing callers keep working.
        xread_object: Reply object indexed as ``reply[0][1][0][1][b'csv']``.

    Returns:
        A list of strings, one per comma-separated field of the second line.

    Raises:
        IndexError: If the reply is empty or the payload has no second line.
        KeyError: If the entry has no ``b'csv'`` field.
    """
    payload = xread_object[0][1][0][1][b'csv']
    if isinstance(payload, (bytes, bytearray)):
        # backslashreplace keeps undecodable bytes visible instead of raising.
        payload = payload.decode("utf-8", errors="backslashreplace")
    lines = str(payload).split("\n")
    # lines[1] raises IndexError when only a header line is present.
    # The carriage return is trimmed so CRLF payloads parse cleanly.
    return lines[1].rstrip("\r").split(",")
def read_redis_stream(stream_name, data):
    """Drain a Redis stream, appending one parsed row per entry to ``data``.

    Each pass reads a single entry from the start of the stream (ID
    ``0-0``), parses its CSV payload with ``parse_stream``, appends the row
    to ``data`` and deletes the entry with XDEL. Reading is destructive:
    consumed entries are removed from the stream.

    This is a loop rather than self-recursion so that a stream holding more
    entries than the interpreter's recursion limit no longer raises
    ``RecursionError``.

    Args:
        stream_name: Name of the stream to read and delete from.
        data: List that parsed rows are appended to (mutated in place).

    Returns:
        The same ``data`` list, once the stream yields no more entries.
    """
    while True:
        xread_object = conn.xread({stream_name: b"0-0"}, count=1)
        xid = get_id(xread_object)
        # Diagnostic output: rows accumulated so far.
        print(data)
        if xid == []:
            # get_id returns [] when the reply holds no entry: stream drained.
            return data
        parsed_data = parse_stream(stream_name, xread_object)
        print(parsed_data)
        data.append(parsed_data)
        # Delete the consumed entry so the next read from 0-0 returns the
        # following one.
        conn.xdel(stream_name, xid)
def make_df(data, headers):
    """Build a pandas DataFrame from parsed rows and their column names.

    Args:
        data: Row data accepted by ``pandas.DataFrame`` (for example a list
            of equal-length lists).
        headers: Column labels, one per field in each row.

    Returns:
        A ``pandas.DataFrame`` with ``headers`` as its columns.
    """
    final_df = pd.DataFrame(data, columns=headers)
    return final_df
def flow_classify(filename):
    """Print whatever ``predict_output`` returns for ``filename``.

    Args:
        filename: Value forwarded unchanged to ``predict_output``.
            NOTE(review): the script passes the list returned by
            read_redis_stream here, not a path -- confirm what
            ``predict_output`` expects.
    """
    prediction = predict_output(filename)
    print(prediction)
# Script body (runs when the module is executed or imported): drain the
# "SAURAV" stream into a fresh list of parsed rows, then print the
# prediction produced for those rows.
df_data_list = read_redis_stream("SAURAV", [])
flow_classify(df_data_list)