forked from zkemail/relayer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcoordinator.py
247 lines (200 loc) · 9.44 KB
/
coordinator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import modal
import sys
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
import os
from dotenv import load_dotenv
import boto3
import requests
from urllib.parse import urlencode, quote
from enum import Enum
from common import stub, image
# Prover type
class Prover(Enum):
    """Backends that can generate the proof for a newly received email.

    The member values are the strings accepted by the ``PROVER_LOCATION``
    environment variable.
    """

    # Run the proof-generation script on this machine.
    LOCAL = "local"
    # Both of the following are served by posting to the Modal web endpoint.
    MODAL_CLOUD = "cloud"
    MODAL_ENDPOINT = "modal_endpoint"
    # Deprecated: run the Modal function through a temporary stub run.
    MODAL_STUB = "modal_stub_deprecated"
# Load environment variables from the .env file before any setting is read.
load_dotenv()

# Prover that handles new emails; the Modal web endpoint is used when
# PROVER_LOCATION is not set.
PROVER_LOCATION = Prover(os.environ.get('PROVER_LOCATION', Prover.MODAL_ENDPOINT.value))

# S3 bucket that stores the received emails (replace with your own bucket).
bucket_name = "relayer-emails"
# Object key (name) pattern in S3; "[nonce]" is substituted for each email.
object_key_template = "emls/wallet_[nonce].txt"
# URL pattern of an uploaded email, still containing the "[nonce]" placeholder.
s3_url = f"https://{bucket_name}.s3.amazonaws.com/{object_key_template}"
# ----------------- LOCAL ENV -----------------
# Define the paths to the AWS config and credentials files
aws_config_path = os.path.expanduser('~/.aws/config')
aws_credentials_path = os.path.expanduser('~/.aws/credentials')

# Both dictionaries start empty so they can always be merged later, even when
# nothing is configured on this machine.
aws_credentials = {}
env_credentials = {}

# Check if both the config and credentials files exist
if os.path.isfile(aws_config_path) and os.path.isfile(aws_credentials_path):
    # Read AWS credentials from the environment variables or configuration files
    session = boto3.Session()
    # Resolve the credentials once: get_credentials() returns None when no
    # usable keys are found, which must not crash the module at import time.
    resolved_credentials = session.get_credentials()
    if resolved_credentials is None:
        print("AWS has not been configured. Please run 'aws configure' to set up your AWS credentials.")
    else:
        print("AWS has been configured.")
        # A frozen snapshot guarantees the key id and the secret belong together
        # even if the underlying credentials are refreshable.
        frozen_credentials = resolved_credentials.get_frozen_credentials()
        aws_credentials = {
            'AWS_ACCESS_KEY_ID': frozen_credentials.access_key,
            'AWS_SECRET_ACCESS_KEY': frozen_credentials.secret_key,
        }
else:
    print("AWS has not been configured. Please run 'aws configure' to set up your AWS credentials.")
# ----------- ENV VARIABLES ------------
env = "./.env"


def get_variable_names_from_env_file(file_path=env):
    """Return the variable names declared in a dotenv-style file.

    Args:
        file_path: Path of the file to scan; defaults to ``./.env``.

    Returns:
        A list with the name found before the first '=' of every declaration,
        in file order. Blank lines, comment lines and lines without '=' are
        skipped.
    """
    variable_names = []
    # python-dotenv reads .env files as UTF-8 by default; read them the same way.
    with open(file_path, encoding="utf-8") as file:
        for line in file:
            declaration = line.strip()
            # Ignore empty lines, comments (indented or not) and anything that
            # is not a NAME=value declaration.
            if not declaration or declaration.startswith('#') or '=' not in declaration:
                continue
            # Extract variable name (part before the first '=' character)
            variable_name = declaration.split('=', 1)[0].strip()
            # dotenv files may prefix a declaration with "export ".
            if variable_name.startswith("export "):
                variable_name = variable_name[len("export "):].strip()
            variable_names.append(variable_name)
    return variable_names


if os.path.isfile(env):
    # Load additional environment variables from the .env file
    additional_vars = get_variable_names_from_env_file()
    env_credentials = {}
    load_dotenv()  # Load environment variables from .env file

    # TODO: Make all env vars work more robustly. Make specific to just the relevant path variables
    for var_name in additional_vars:
        # LOCAL_* variables are not forwarded.
        if var_name.startswith("LOCAL_"):
            continue
        var_value = os.getenv(var_name)
        if var_value is None:
            continue
        env_credentials[var_name] = var_value
        # Also expose MODAL_NAME as NAME. Only a leading "MODAL_" is removed,
        # so names that merely contain "MODAL_" are no longer altered.
        if var_name.startswith("MODAL_"):
            env_credentials[var_name[len("MODAL_"):]] = var_value

# Merge the credentials and aws_credentials dictionaries (AWS keys win on clashes)
merged_credentials = {**env_credentials, **aws_credentials}
# --------- AWS HELPER FUNCTIONS ------------
def send_to_modal(url, nonce):
    """Ask the Modal web endpoint to pull an email from S3 and prove it.

    Args:
        url: URL of the uploaded email, sent as the ``aws_url`` query parameter.
        nonce: Nonce of the email, sent as the ``nonce`` query parameter.

    Returns:
        None. The outcome is reported on stdout only; network failures are
        printed instead of raised so a caller running in a background thread
        is not terminated by a transient error.
    """
    # Path 2: Send to modal
    # Construct the URL with percent-encoded query parameters
    query_params = {
        'aws_url': url,
        'nonce': nonce
    }
    encoded_params = urlencode(query_params, quote_via=quote)
    webhook_url = f"https://ziztuww--aayush-pull-and-prove-email.modal.run?{encoded_params}"

    # Send the POST request. Only the connection attempt is bounded (10 s);
    # the read side waits indefinitely because the endpoint answers only after
    # the proof-generation command has finished.
    try:
        response = requests.post(
            webhook_url,
            headers={'Content-Type': 'application/octet-stream'},
            timeout=(10, None),
        )
    except requests.RequestException as error:
        print(f"Could not reach Modal: {error}")
        return

    # Check the status code of the response
    if response.status_code == 200:
        # Handle the successful response by printing the response body
        print(f"Modal response: {response.text}")
    elif response.status_code == 400:
        # Handle the bad request error
        print("Bad request to Modal")
    else:
        # Handle other status codes with a generic error message
        print("An error occurred on Modal...")
def download_and_write_file(s3_url, nonce):
    """Download an email from S3 and store it under ./relayer/received_eml/.

    Args:
        s3_url: ``https://BUCKET.s3.amazonaws.com/KEY`` URL of the object; a
            remaining "[nonce]" placeholder is replaced by ``nonce``.
        nonce: Nonce of the email; also used in the local file name.

    The object is written byte-for-byte as stored in S3. It is decoded only
    for logging, so an email that is not valid UTF-8 neither raises
    UnicodeDecodeError nor gets altered on its way to disk.
    """
    # Extract the bucket name and object key from the S3 URL
    s3_url_parts = s3_url.replace("https://", "").replace("[nonce]", nonce).split("/")
    bucket_name = s3_url_parts[0].split(".")[0]
    object_key = "/".join(s3_url_parts[1:])

    # Create an S3 client using boto3
    s3_client = boto3.client("s3")

    # Download the object from S3
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    raw_contents = response['Body'].read()
    # Lenient decode, used for the log lines only
    file_contents = raw_contents.decode('utf-8', errors='replace')
    print("File pulled from AWS: ", file_contents)

    # Write the downloaded bytes to the file named after the nonce
    file_name = f"./relayer/received_eml/wallet_{nonce}.eml"
    with open(file_name, 'wb') as file:
        file.write(raw_contents)
    print("Email file contents: ", file_contents)
def upload_file_to_s3(local_file_path, bucket_name, nonce):
    """Upload a local file to S3 as a private object and return its URL.

    Args:
        local_file_path: Path of the file to upload.
        bucket_name: Name of the destination bucket.
        nonce: Value substituted for "[nonce]" in ``object_key_template``.

    Returns:
        The ``https://BUCKET.s3.amazonaws.com/KEY`` URL of the uploaded object,
        built from the bucket that was actually used for the upload.
    """
    object_key = object_key_template.replace("[nonce]", nonce)

    # Create an S3 client using boto3
    s3_client = boto3.client('s3')

    # Upload the file to S3 with private access (default)
    s3_client.upload_file(local_file_path, bucket_name, object_key, ExtraArgs={'ACL': 'private'})

    # Print a success message
    print(f"File '{local_file_path}' successfully uploaded to {bucket_name}/{object_key} as a private object.")

    # Derive the URL from the arguments rather than from the module-level
    # template, which is tied to the default bucket.
    return f"https://{bucket_name}.s3.amazonaws.com/{object_key}"
# # --------- MODAL CLOUD COORDINATOR (MOVED TO COMMON.PY) ------------
# image = modal.Image.from_registry(
# "aayushg0/zkemail-image-updated:modal",
# setup_dockerfile_commands=["RUN apt-get install -y python3 python-is-python3 python3-pip", "RUN cp -r /rapidsnark /root/rapidsnark",
# "RUN cd /relayer && git pull && cargo build --target x86_64-unknown-linux-gnu",
# "RUN cp -r /relayer /root/relayer",
# "RUN cp -r /zk-email-verify /root/zk-email-verify"], force_build=True).pip_install_from_requirements("requirements.txt")
# stub = modal.Stub(image=image)
@stub.function(cpu=4, image=image)
def prove_email(file_contents: str, nonce: str):
    """Store an email inside the container and run proof generation on it.

    Args:
        file_contents: Text written to the ``wallet_{nonce}.eml`` file.
        nonce: Nonce naming the file; also passed to the proofgen script.

    Returns:
        The length of ``file_contents``.
    """
    # Save the email under the name the proofgen script is given via the nonce
    eml_path = f"/root/relayer/received_eml/wallet_{nonce}.eml"
    with open(eml_path, 'w') as eml_file:
        eml_file.write(file_contents)
    print("file_contents: ", file_contents)

    # The script's output goes straight to this process's stdout/stderr;
    # check=False means a failing script does not raise here.
    print("proofgen modal python output; ")
    proofgen_command = ["/root/relayer/src/circom_proofgen.sh", nonce]
    subprocess.run(proofgen_command, check=False, text=True)

    return len(file_contents)
# Create and deploy the secret containing AWS credentials and additional environment variables
stub['credentials_secret'] = modal.Secret.from_dict(merged_credentials)


@stub.function(cpu=14, memory=6000, secret=stub['credentials_secret'])
@modal.web_endpoint(method="POST")
def pull_and_prove_email(aws_url: str, nonce: str):
    """Web endpoint: download an email from S3 and run proof generation.

    Args:
        aws_url: URL of the email object in S3.
        nonce: Nonce of the email, forwarded to the proofgen script.

    Returns:
        "Execution successful" when the script exits with code 0, otherwise
        "Execution failed".
    """
    download_and_write_file(aws_url, nonce)

    # Run the proofgen script with a copy of the current environment; its
    # output is not captured, so it appears in the function's own output.
    proofgen = subprocess.run(
        ["sh", "/relayer/src/circom_proofgen.sh", nonce],
        text=True,
        env=os.environ.copy(),
        cwd='/root/zk-email-verify',
    )
    return "Execution successful" if proofgen.returncode == 0 else "Execution failed"
# --------- LOCAL COORDINATOR ------------
def is_eml_file(file_name):
    """Return True when *file_name* has an ``.eml`` extension, ignoring case."""
    extension = os.path.splitext(file_name)[1]
    return extension.lower() == '.eml'
class DirectoryChangeHandler(FileSystemEventHandler):
    """Watchdog handler that uploads each new wallet email and starts a proof."""

    @staticmethod
    def _extract_nonce(file_name):
        """Return the text between 'wallet_' and the last '.', or None.

        None is returned when the name has no 'wallet_' marker or when the
        nonce would be empty, so callers never act on a garbage nonce.
        """
        marker = 'wallet_'
        marker_start = file_name.find(marker)
        if marker_start == -1:
            return None
        nonce = file_name[marker_start + len(marker):file_name.rfind('.')]
        return nonce or None

    def on_created(self, event):
        """Upload a newly created .eml file to S3 and dispatch it to the prover.

        Directories, files without an .eml extension and .eml files whose name
        carries no nonce are ignored.
        """
        if event.is_directory:
            return
        print(f"New file {event.src_path} has been added.")

        file_name = os.path.basename(event.src_path)
        if not is_eml_file(file_name):
            return

        nonce = self._extract_nonce(file_name)
        if nonce is None:
            print(f"Skipping {file_name}: expected a name like wallet_NONCE.eml")
            return

        aws_url = upload_file_to_s3(event.src_path, bucket_name, nonce)

        if PROVER_LOCATION == Prover.LOCAL:
            subprocess.run(["./src/circom_proofgen.sh", nonce])
        elif PROVER_LOCATION in (Prover.MODAL_ENDPOINT, Prover.MODAL_CLOUD):
            send_to_modal(aws_url, nonce)
        elif PROVER_LOCATION == Prover.MODAL_STUB:
            # Note that this is deprecated and will be removed in the future.
            # prove_email expects the email text itself (it writes its first
            # argument into the .eml file), not the S3 URL.
            with open(event.src_path, 'r') as file:
                email_content = file.read()
            with stub.run():
                prove_email.call(email_content, nonce)
@stub.local_entrypoint()
def prove_on_email(path: str):
    """Watch *path* (non-recursively) and handle every file created in it.

    Blocks until a KeyboardInterrupt arrives, then stops the observer and
    waits for its thread to finish.
    """
    handler = DirectoryChangeHandler()
    watcher = Observer()
    watcher.schedule(handler, path, recursive=False)
    watcher.start()

    # The observer works in its own thread; this loop only keeps the main
    # thread alive until the user interrupts it.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()

    watcher.join()
if __name__ == "__main__":
    # Load environment variables from the .env file before reading the watch directory
    load_dotenv()
    path = os.getenv("LOCAL_INCOMING_EML_PATH")

    # Without a directory to watch there is nothing to do.
    if path is None:
        print("Error: LOCAL_INCOMING_EML_PATH is not set in the .env file")
        sys.exit(1)

    # sys.exit() does not return, so path is set from here on.
    print("Monitoring directory: ", path)
    prove_on_email(path)