-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiog_test.py
executable file
·163 lines (145 loc) · 4.95 KB
/
iog_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/python3
import requests
import json
import re
from requests.models import HTTPError
url = "https://api.octopus.energy/v1/graphql/"
APIKeyPattern = re.compile('sk_live_[A-Za-z0-9]{24}?$')
accountNumberPattern = re.compile('A-[0-9A-F]{8}?$')
def validate_input(prompt, pattern):
while True:
user_input = input(prompt)
if pattern.match(user_input):
return user_input
else:
print("Invalid input. Please try again.")
accountNumber = validate_input("Enter account number: ", accountNumberPattern)
apikey = validate_input("Enter API key: ", APIKeyPattern)
def refreshToken(apiKey, accountNumber):
try:
query = """
mutation krakenTokenAuthentication($api: String!) {
obtainKrakenToken(input: {APIKey: $api}) {
token
}
}
"""
variables = {'api': apiKey}
r = requests.post(url, json={'query': query, 'variables': variables})
except HTTPError as http_err:
print(f'HTTP Error {http_err}')
except Exception as err:
print(f'Another error occurred: {err}')
jsonResponse = json.loads(r.text)
if r.text.find("Authentication failed.") > -1:
raise SystemExit("Authentication failed - check API key")
else:
return jsonResponse['data']['obtainKrakenToken']['token']
def print_json(json_data):
for key, value in json_data.items():
print(f"{key}:")
if isinstance(value, list):
for item in value:
print(" -")
for sub_key, sub_value in item.items():
print(f" {sub_key}: {sub_value}")
else:
for sub_key, sub_value in value.items():
print(f" {sub_key}: {sub_value}")
print()
def getObject():
global authToken
try:
query = """
query getData($input: String!) {
plannedDispatches(accountNumber: $input) {
startDt
endDt
delta
}
}
"""
variables = {'input': accountNumber}
headers = {"Authorization": authToken}
r = requests.post(url, json={'query': query, 'variables': variables, 'operationName': 'getData'}, headers=headers)
formatted_json = json.loads(r.text)['data']
print_json(formatted_json)
return formatted_json
except HTTPError as http_err:
print(f'HTTP Error {http_err}')
except Exception as err:
print(f'Another error occurred: {err}')
pass
def getObjectCompleted():
global authToken
try:
query = """
query getData($input: String!) {
completedDispatches(accountNumber: $input) {
startDt
endDt
delta
}
}
"""
variables = {'input': accountNumber}
headers = {"Authorization": authToken}
r = requests.post(url, json={'query': query, 'variables': variables, 'operationName': 'getData'}, headers=headers)
formatted_json = json.loads(r.text)['data']
print_json(formatted_json)
return formatted_json
except HTTPError as http_err:
print(f'HTTP Error {http_err}')
except Exception as err:
print(f'Another error occurred: {err}')
pass
def getObjectAccount():
global authToken
try:
query = """
query getData($input: String!) {
registeredKrakenflexDevice(accountNumber: $input) {
krakenflexDeviceId
provider
vehicleMake
vehicleModel
vehicleBatterySizeInKwh
chargePointMake
chargePointModel
chargePointPowerInKw
status
suspended
hasToken
createdAt
}
}
"""
variables = {'input': accountNumber}
headers = {"Authorization": authToken}
r = requests.post(url, json={'query': query, 'variables': variables, 'operationName': 'getData'}, headers=headers)
if r.text.find('"registeredKrakenflexDevice":null') > -1:
raise SystemExit("Error fetching device data - check account number")
else:
formatted_json = json.loads(r.text)['data']
if formatted_json['registeredKrakenflexDevice']['krakenflexDeviceId'] is None:
raise SystemExit("No KrakenflexDevice found - account not on IOG tariff?")
print_json(formatted_json)
return formatted_json
except HTTPError as http_err:
print(f'HTTP Error {http_err}')
except Exception as err:
print(f'Another error occurred: {err}')
pass
def getTimesPlanned():
graphQL = getObject()
print()
def getTimesCompleted():
graphQL = getObjectCompleted()
print()
def getKraken():
graphQL = getObjectAccount()
print()
authToken = refreshToken(apikey, accountNumber)
getKraken()
getTimesPlanned()
getTimesCompleted()