From 89f9734247ca109b6bffa03509460b7215c7848b Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 13 Nov 2024 14:03:50 -0800 Subject: [PATCH 01/11] Initial dump of pickle --- tools/pickles_to_json.py | 17 +++++++++++++++++ tron/core/job_scheduler.py | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 tools/pickles_to_json.py diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py new file mode 100644 index 000000000..01fc331e4 --- /dev/null +++ b/tools/pickles_to_json.py @@ -0,0 +1,17 @@ +import pickle + +import boto3 + +dev_session = boto3.Session(profile_name="dev") + +source_table = dev_session.resource("dynamodb", region_name="us-west-1").Table("infrastage-tron-state") + +primary_key_value = "job_run_state compute-infra-test-service.test_load_foo1.5696" + +index = 0 +response = source_table.get_item(Key={"key": primary_key_value, "index": index}) + +if "Item" in response: + item = response["Item"] + data = pickle.loads(item["val"].value) + print(data) diff --git a/tron/core/job_scheduler.py b/tron/core/job_scheduler.py index 3b8262c1c..00abc44c7 100644 --- a/tron/core/job_scheduler.py +++ b/tron/core/job_scheduler.py @@ -197,7 +197,7 @@ def run_queue_schedule(self): if queued_run: reactor.callLater(0, self.run_job, queued_run, run_queued=True) - # Attempt to schedule a new run. This will only schedule a run if the + # Attempt to schedule a new run. This will only schedule a run if the # previous run was cancelled from a scheduled state, or if the job # scheduler is `schedule_on_complete`. self.schedule() From 4cb9c8b46b1493119ac29ff4716b7d2c1885a0e3 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 13 Dec 2024 10:04:31 -0800 Subject: [PATCH 02/11] Add functions to scan table, get pickles, test read, and convert single and all pickles --- tools/pickles_to_json.py | 212 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 203 insertions(+), 9 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index 01fc331e4..14d4edb43 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -1,17 +1,211 @@ +import logging +import math import pickle +from typing import List import boto3 +from boto3.resources.base import ServiceResource -dev_session = boto3.Session(profile_name="dev") +from tron.core.job import Job +from tron.core.jobrun import JobRun +from tron.serialize import runstate -source_table = dev_session.resource("dynamodb", region_name="us-west-1").Table("infrastage-tron-state") +# TODO: partitioned pickles! -primary_key_value = "job_run_state compute-infra-test-service.test_load_foo1.5696" -index = 0 -response = source_table.get_item(Key={"key": primary_key_value, "index": index}) +# Max DynamoDB object size is 400KB. Since we save two copies of the object (pickled and JSON), +# we need to consider this max size applies to the entire item, so we use a max size of 200KB +# for each version. +# +# In testing I could get away with 201_000 for both partitions so this should be enough overhead +# to contain other attributes like object name and number of partitions. +OBJECT_SIZE = 200_000 -if "Item" in response: - item = response["Item"] - data = pickle.loads(item["val"].value) - print(data) +# TODO: use logging for all the prints +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def get_dynamodb_table( + aws_profile: str = "dev", table: str = "infrastage-tron-state", region: str = "us-west-1" +) -> ServiceResource: + """ + Get the DynamoDB table resource. 
+ + :param aws_profile: The name of the AWS profile to use (default is "dev"). + :param table: The name of the table to get (default is "infrastage-tron-state"). + :param region: The region of the table (default is "us-west-1"). + :return: The DynamoDB table resource. + """ + session = boto3.Session(profile_name=aws_profile) + return session.resource("dynamodb", region_name=region).Table(table) + + +def summarize_table(source_table: ServiceResource) -> None: + """ + Summarize the DynamoDB table and output basic info about each key. + + :param source_table: The DynamoDB table resource to scan. + """ + response = source_table.scan() + items = response.get("Items", []) + + print(f"{'Key':<120} {'Has json_val':<15} {'Num JSON Partitions':<20} {'Num Pickle Partitions':<20}") + + for item in items: + key = item.get("key", "Unknown Key") + has_json_val = "json_val" in item + num_json_partitions = int(item.get("num_json_val_partitions", 0)) + num_pickle_partitions = int(item.get("num_partitions", 0)) + + print(f"{key:<120} {str(has_json_val):<15} {num_json_partitions:<20} {num_pickle_partitions:<20}") + + +def get_jobs_without_json_val(source_table: ServiceResource) -> List[str]: + """ + Scan the DynamoDB table and return a list of jobs that don't have a json_val. + Also log the job names. + + :param source_table: The DynamoDB table resource to scan. + :return: A list of job keys without json_val. + """ + response = source_table.scan() + items = response.get("Items", []) + + jobs_without_json_val = [] + + for item in items: + key = item.get("key", "Unknown Key") + has_json_val = "json_val" not in item + + if has_json_val: + jobs_without_json_val.append(key) + logger.info(f"Job without json_val: {key}") + + return jobs_without_json_val + + +def load_and_print_pickle(source_table: ServiceResource, key: str, index: int = 0) -> None: + """ + Load the pickled data from DynamoDB for a given key and log a success message. + + :param source_table: The DynamoDB table resource. + :param key: The primary key of the item to retrieve. + :param index: The index of the partition to retrieve (default is 0). + """ + try: + response = source_table.get_item(Key={"key": key, "index": index}) + + if "Item" in response: + item = response["Item"] + pickle.loads(item["val"].value) # TODO: use in conversion to JSON + print(f"Key: {key:<120} - Pickle successfully loaded") + else: + print(f"Key: {key:<120} - Item not found") + + except Exception as e: + logger.error(f"Key: {key} - Failed to load pickle: {e}") + + +def load_and_print_all_pickles(source_table: ServiceResource) -> None: + """ + Scan the entire DynamoDB table, attempt to load the pickled data for each item, + and log a success message for each key. + + :param source_table: The DynamoDB table resource. + """ + response = source_table.scan() + items = response.get("Items", []) + + for item in items: + key = item.get("key", "Unknown Key") + index = int(item.get("index", 0)) + + try: + pickle.loads(item["val"].value) # TODO: use data in conversion to JSON + print(f"Key: {key:<120} Index: {index:<10} - Pickle successfully loaded") + except Exception as e: + logger.error(f"Key: {key}, Index: {index} - Failed to load pickle: {e}") + + +def convert_pickle_to_json(source_table: ServiceResource, key: str, index: int = 0) -> None: + """ + Convert a single pickled item to JSON and update the DynamoDB entry. + + :param source_table: The DynamoDB table resource. + :param key: The primary key of the item to retrieve. 
+ :param index: The index of the partition to retrieve (default is 0). + """ + try: + response = source_table.get_item(Key={"key": key, "index": index}) + + if "Item" in response: + item = response["Item"] + pickled_data = pickle.loads(item["val"].value) + + state_type = key.split()[0] + if state_type == runstate.JOB_STATE: + json_data = Job.to_json(pickled_data) + elif state_type == runstate.JOB_RUN_STATE: + json_data = JobRun.to_json(pickled_data) + else: + raise ValueError(f"Unknown type: {state_type}") + + num_json_partitions = math.ceil(len(json_data) / OBJECT_SIZE) + for partition_index in range(num_json_partitions): + json_partition = json_data[ + partition_index * OBJECT_SIZE : min((partition_index + 1) * OBJECT_SIZE, len(json_data)) + ] + + source_table.update_item( + Key={"key": key, "index": partition_index}, + UpdateExpression="SET json_val = :json, num_json_val_partitions = :num_partitions", + ExpressionAttributeValues={":json": json_partition, ":num_partitions": num_json_partitions}, + ) + + logger.info(f"Key: {key} - Pickle converted to JSON and updated") + else: + logger.warning(f"Key: {key} - Item not found") + + except Exception as e: + logger.error(f"Key: {key} - Failed to convert pickle to JSON: {e}") + + +def convert_all_pickles_to_json(source_table: ServiceResource) -> None: + """ + Convert all pickled items in the DynamoDB table to JSON and update the entries. + + :param source_table: The DynamoDB table resource. + """ + response = source_table.scan() + items = response.get("Items", []) + + for item in items: + key = item.get("key", "Unknown Key") + index = int(item.get("index", 0)) + + convert_pickle_to_json(source_table, key, index) + + +aws_profile = "dev" +table_name = "infrastage-tron-state" +table_region = "us-west-1" +source_table = get_dynamodb_table(aws_profile, table_name, table_region) + +# # 1. Scan table +# summarize_table(source_table) + +# # 2. Load and print a single job_state pickle +# load_and_print_pickle(source_table, "job_state compute-infra-test-service.test_load_foo1") + +# # 2.1 Load and print a single job_run_state pickle +# load_and_print_pickle(source_table, "job_run_state katamari_test_service.test_load_foo2.255") + +# 3. Load and print all pickles +load_and_print_all_pickles(source_table) + +# # 4. Convert a single pickle to JSON +# convert_pickle_to_json(source_table, "job_state compute-infra-test-service.test_load_foo1") + +# # 5. Convert all pickles to JSON +# convert_all_pickles_to_json(source_table) From bf6c5f9bcb7c595656a6d596d27f187c37402789 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 20 Dec 2024 18:18:10 -0800 Subject: [PATCH 03/11] Add more functions for getting jobs and handle partitions --- tools/pickles_to_json.py | 209 +++++++++++++++++++++++++++------------ 1 file changed, 145 insertions(+), 64 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index 14d4edb43..719ca0123 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -2,6 +2,7 @@ import math import pickle from typing import List +from typing import Optional import boto3 from boto3.resources.base import ServiceResource @@ -12,7 +13,6 @@ # TODO: partitioned pickles! - # Max DynamoDB object size is 400KB. Since we save two copies of the object (pickled and JSON), # we need to consider this max size applies to the entire item, so we use a max size of 200KB # for each version. 
@@ -26,6 +26,7 @@ logger = logging.getLogger(__name__) +# Get Table def get_dynamodb_table( aws_profile: str = "dev", table: str = "infrastage-tron-state", region: str = "us-west-1" ) -> ServiceResource: @@ -41,6 +42,7 @@ def get_dynamodb_table( return session.resource("dynamodb", region_name=region).Table(table) +# Get Jobs def summarize_table(source_table: ServiceResource) -> None: """ Summarize the DynamoDB table and output basic info about each key. @@ -61,151 +63,230 @@ def summarize_table(source_table: ServiceResource) -> None: print(f"{key:<120} {str(has_json_val):<15} {num_json_partitions:<20} {num_pickle_partitions:<20}") -def get_jobs_without_json_val(source_table: ServiceResource) -> List[str]: +def get_all_jobs(source_table: ServiceResource) -> List[str]: + """ + Scan the DynamoDB table and return a list of all job keys. + + :param source_table: The DynamoDB table resource to scan. + :return: A list of all job keys. + """ + response = source_table.scan() + items = response.get("Items", []) + + all_job_keys = [item.get("key", "Unknown Key") for item in items] + return all_job_keys + + +def get_jobs_without_json_val(source_table: ServiceResource, partitioned: Optional[bool] = None) -> List[str]: """ Scan the DynamoDB table and return a list of jobs that don't have a json_val. - Also log the job names. + Optionally filter by whether the jobs are partitioned. :param source_table: The DynamoDB table resource to scan. - :return: A list of job keys without json_val. + :param partitioned: If specified, filter jobs by partitioned status. + True for partitioned, False for non-partitioned, None for no filter. + :return: A list of job keys without json_val, filtered by partitioned status if specified. """ response = source_table.scan() items = response.get("Items", []) - jobs_without_json_val = [] + jobs_without_json_val = set() for item in items: key = item.get("key", "Unknown Key") has_json_val = "json_val" not in item + num_partitions = int(item.get("num_partitions", 0)) if has_json_val: - jobs_without_json_val.append(key) - logger.info(f"Job without json_val: {key}") + if partitioned is None: + jobs_without_json_val.add(key) + elif partitioned and num_partitions > 1: + jobs_without_json_val.add(key) + elif not partitioned and num_partitions <= 1: + jobs_without_json_val.add(key) - return jobs_without_json_val + return list(jobs_without_json_val) -def load_and_print_pickle(source_table: ServiceResource, key: str, index: int = 0) -> None: +# Load and Print +def load_and_combine_partitions(source_table: ServiceResource, key: str) -> bytes: """ - Load the pickled data from DynamoDB for a given key and log a success message. + Load and combine all partitions of a pickled item from DynamoDB. :param source_table: The DynamoDB table resource. :param key: The primary key of the item to retrieve. - :param index: The index of the partition to retrieve (default is 0). + :return: The combined pickled data as bytes. 
""" - try: + combined_data = bytearray() + index = 0 + + while True: response = source_table.get_item(Key={"key": key, "index": index}) + if "Item" not in response: + break - if "Item" in response: - item = response["Item"] - pickle.loads(item["val"].value) # TODO: use in conversion to JSON - print(f"Key: {key:<120} - Pickle successfully loaded") + item = response["Item"] + combined_data.extend(item["val"].value) + index += 1 + + return bytes(combined_data) + + +def load_and_print_single_pickle(source_table: ServiceResource, key: str, print_full: bool = False) -> None: + """ + Load the pickled data from DynamoDB for a given key, handle partitioned items, + and print the loaded pickle. Optionally print the entire pickle based on a flag. + + :param source_table: The DynamoDB table resource. + :param key: The primary key of the item to retrieve. + :param print_full: Flag to indicate whether to print the entire pickle (default is False). + """ + try: + pickled_data = load_and_combine_partitions(source_table, key) + loaded_pickle = pickle.loads(pickled_data) + + if print_full: + print(f"Key: {key:<100}\nFull Pickle:") + print(loaded_pickle) else: - print(f"Key: {key:<120} - Item not found") + print(f"Key: {key:<100} Pickle successfully loaded") except Exception as e: logger.error(f"Key: {key} - Failed to load pickle: {e}") -def load_and_print_all_pickles(source_table: ServiceResource) -> None: +def load_and_print_pickles(source_table: ServiceResource, keys: List[str]) -> None: """ - Scan the entire DynamoDB table, attempt to load the pickled data for each item, - and log a success message for each key. + Load and print pickles for the given list of keys. :param source_table: The DynamoDB table resource. + :param keys: A list of keys for which to load and print pickles. """ - response = source_table.scan() - items = response.get("Items", []) + for key in keys: + load_and_print_single_pickle(source_table, key) - for item in items: - key = item.get("key", "Unknown Key") - index = int(item.get("index", 0)) - try: - pickle.loads(item["val"].value) # TODO: use data in conversion to JSON - print(f"Key: {key:<120} Index: {index:<10} - Pickle successfully loaded") - except Exception as e: - logger.error(f"Key: {key}, Index: {index} - Failed to load pickle: {e}") +def load_and_print_json(source_table: ServiceResource, key: str) -> None: + """ + Load the JSON data from DynamoDB for a given key and print it. + + :param source_table: The DynamoDB table resource. + :param key: The primary key of the item to retrieve. + """ + try: + response = source_table.get_item(Key={"key": key, "index": 0}) + item = response.get("Item", {}) + + if "json_val" in item: + json_data = item["json_val"] + print(f"Key: {key:<120} - JSON successfully loaded and printed") + print(json_data) + else: + print(f"Key: {key:<120} - No JSON value found") + + except Exception as e: + logger.error(f"Key: {key} - Failed to load JSON: {e}") -def convert_pickle_to_json(source_table: ServiceResource, key: str, index: int = 0) -> None: +# Convert +# KKASP: REVIEW +def convert_pickle_to_json(source_table: ServiceResource, key: str, dry_run: bool = True) -> None: """ Convert a single pickled item to JSON and update the DynamoDB entry. :param source_table: The DynamoDB table resource. :param key: The primary key of the item to retrieve. - :param index: The index of the partition to retrieve (default is 0). + :param dry_run: If True, simulate the conversion without updating the table. 
""" try: - response = source_table.get_item(Key={"key": key, "index": index}) - - if "Item" in response: - item = response["Item"] - pickled_data = pickle.loads(item["val"].value) - - state_type = key.split()[0] - if state_type == runstate.JOB_STATE: - json_data = Job.to_json(pickled_data) - elif state_type == runstate.JOB_RUN_STATE: - json_data = JobRun.to_json(pickled_data) - else: - raise ValueError(f"Unknown type: {state_type}") + # Skip conversion for job_state MASTER and job_run_state MASTER jobs + if key.startswith("job_state MASTER") or key.startswith("job_run_state MASTER"): + # logger.info(f"Skipping conversion for key: {key}") + return + + pickled_data = load_and_combine_partitions(source_table, key) + state_data = pickle.loads(pickled_data) + + state_type = key.split()[0] + if state_type == runstate.JOB_STATE: + json_data = Job.to_json(state_data) + elif state_type == runstate.JOB_RUN_STATE: + json_data = JobRun.to_json(state_data) + else: + raise ValueError(f"Unknown type: {state_type}") - num_json_partitions = math.ceil(len(json_data) / OBJECT_SIZE) - for partition_index in range(num_json_partitions): - json_partition = json_data[ - partition_index * OBJECT_SIZE : min((partition_index + 1) * OBJECT_SIZE, len(json_data)) - ] + num_json_partitions = math.ceil(len(json_data) / OBJECT_SIZE) + for partition_index in range(num_json_partitions): + json_partition = json_data[ + partition_index * OBJECT_SIZE : min((partition_index + 1) * OBJECT_SIZE, len(json_data)) + ] + if not dry_run: source_table.update_item( Key={"key": key, "index": partition_index}, UpdateExpression="SET json_val = :json, num_json_val_partitions = :num_partitions", - ExpressionAttributeValues={":json": json_partition, ":num_partitions": num_json_partitions}, + ExpressionAttributeValues={ + ":json": json_partition, + ":num_partitions": num_json_partitions, # KKASP: is this correct? + }, ) - logger.info(f"Key: {key} - Pickle converted to JSON and updated") + if dry_run: + logger.info(f"DRY RUN: Key: {key} - Pickle would have been converted to JSON and updated") else: - logger.warning(f"Key: {key} - Item not found") - + logger.info(f"Key: {key} - Pickle converted to JSON and updated") except Exception as e: logger.error(f"Key: {key} - Failed to convert pickle to JSON: {e}") -def convert_all_pickles_to_json(source_table: ServiceResource) -> None: +def convert_all_pickles_to_json(source_table: ServiceResource, dry_run: bool = True) -> None: """ Convert all pickled items in the DynamoDB table to JSON and update the entries. :param source_table: The DynamoDB table resource. + :param dry_run: If True, simulate the conversion without updating the table. """ response = source_table.scan() items = response.get("Items", []) for item in items: key = item.get("key", "Unknown Key") - index = int(item.get("index", 0)) - convert_pickle_to_json(source_table, key, index) + convert_pickle_to_json(source_table, key, dry_run) aws_profile = "dev" table_name = "infrastage-tron-state" +# table_name = "norcal-devc-tron-state" table_region = "us-west-1" source_table = get_dynamodb_table(aws_profile, table_name, table_region) -# # 1. Scan table +# 1. Scan table # summarize_table(source_table) +print(f"All jobs:\n{get_all_jobs(source_table)}\n") +print(f"Jobs without json_val:\n{get_jobs_without_json_val(source_table, False)}\n") # # 2. 
Load and print a single job_state pickle -# load_and_print_pickle(source_table, "job_state compute-infra-test-service.test_load_foo1") +# load_and_print_single_pickle(source_table, "job_state compute-infra-test-service.test_load_foo1") # # 2.1 Load and print a single job_run_state pickle -# load_and_print_pickle(source_table, "job_run_state katamari_test_service.test_load_foo2.255") +# load_and_print_single_pickle(source_table, "job_run_state katamari_test_service.test_load_foo2.255") # 3. Load and print all pickles -load_and_print_all_pickles(source_table) +# load_and_print_pickles(source_table, get_all_jobs(source_table)) # # 4. Convert a single pickle to JSON -# convert_pickle_to_json(source_table, "job_state compute-infra-test-service.test_load_foo1") +# convert_pickle_to_json(source_table, "job_state compute-infra-test-service.test_load_foo1", dry_run=True) + +# 5. Convert all pickles to JSON +convert_all_pickles_to_json(source_table, dry_run=True) + -# # 5. Convert all pickles to JSON -# convert_all_pickles_to_json(source_table) +# KKASP: +# 1. Test getting job keys +# DONE: infrastage-tron-state +# TODO: norcal-devc-tron-state +# 2. Test loading pickles +# DONE: infrastrage-tron-state +# 3. Test converting pickles to JSON +# 4. Add dry run flag that doesn't update the table From 0b93d04a11cf51b78dec1e4244ceb94754adc731 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 27 Dec 2024 11:20:08 -0800 Subject: [PATCH 04/11] Add counter for keys updated. Update func names --- tools/pickles_to_json.py | 45 +++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index 719ca0123..a1b937952 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -190,7 +190,7 @@ def load_and_print_json(source_table: ServiceResource, key: str) -> None: # Convert # KKASP: REVIEW -def convert_pickle_to_json(source_table: ServiceResource, key: str, dry_run: bool = True) -> None: +def convert_pickle_to_json_and_update_table(source_table: ServiceResource, key: str, dry_run: bool = True) -> None: """ Convert a single pickled item to JSON and update the DynamoDB entry. @@ -239,20 +239,45 @@ def convert_pickle_to_json(source_table: ServiceResource, key: str, dry_run: boo logger.error(f"Key: {key} - Failed to convert pickle to JSON: {e}") -def convert_all_pickles_to_json(source_table: ServiceResource, dry_run: bool = True) -> None: +def convert_all_pickles_to_json_and_update_table(source_table: ServiceResource, dry_run: bool = True) -> None: """ Convert all pickled items in the DynamoDB table to JSON and update the entries. :param source_table: The DynamoDB table resource. :param dry_run: If True, simulate the conversion without updating the table. """ - response = source_table.scan() - items = response.get("Items", []) + items = scan_table(source_table) + total_keys = len(items) + converted_keys = 0 for item in items: key = item.get("key", "Unknown Key") + try: + convert_pickle_to_json_and_update_table(source_table, key, dry_run) + converted_keys += 1 + except Exception as e: + logger.error(f"Key: {key} - Failed to convert pickle to JSON: {e}") + + print(f"Total keys in the table: {total_keys}") + print(f"Number of keys converted: {converted_keys}") + + +def scan_table(source_table: ServiceResource) -> List[dict]: + """ + Scan the DynamoDB table and return all items, handling pagination. - convert_pickle_to_json(source_table, key, dry_run) + :param source_table: The DynamoDB table resource to scan. 
+ :return: A list of all items in the table. + """ + items = [] + response = source_table.scan() + items.extend(response.get("Items", [])) + + while "LastEvaluatedKey" in response: + response = source_table.scan(ExclusiveStartKey=response["LastEvaluatedKey"]) + items.extend(response.get("Items", [])) + + return items aws_profile = "dev" @@ -261,10 +286,13 @@ def convert_all_pickles_to_json(source_table: ServiceResource, dry_run: bool = T table_region = "us-west-1" source_table = get_dynamodb_table(aws_profile, table_name, table_region) +convert_all_pickles_to_json_and_update_table(source_table, dry_run=False) +# load_and_print_json(source_table, "job_run_state paasta-contract-monitor.k8s.1573") + # 1. Scan table # summarize_table(source_table) -print(f"All jobs:\n{get_all_jobs(source_table)}\n") -print(f"Jobs without json_val:\n{get_jobs_without_json_val(source_table, False)}\n") +# print(f"All jobs:\n{get_all_jobs(source_table)}\n") +# print(f"Jobs without json_val:\n{get_jobs_without_json_val(source_table, False)}\n") # # 2. Load and print a single job_state pickle # load_and_print_single_pickle(source_table, "job_state compute-infra-test-service.test_load_foo1") @@ -279,7 +307,7 @@ def convert_all_pickles_to_json(source_table: ServiceResource, dry_run: bool = T # convert_pickle_to_json(source_table, "job_state compute-infra-test-service.test_load_foo1", dry_run=True) # 5. Convert all pickles to JSON -convert_all_pickles_to_json(source_table, dry_run=True) +# convert_all_pickles_to_json(source_table, dry_run=True) # KKASP: @@ -289,4 +317,5 @@ def convert_all_pickles_to_json(source_table: ServiceResource, dry_run: bool = T # 2. Test loading pickles # DONE: infrastrage-tron-state # 3. Test converting pickles to JSON +# TODO: convert and print a single pickle and compare to a printed JSON? # 4. Add dry run flag that doesn't update the table From 4eebe896ff1201acb87d0eaff520cc4dc65493e4 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Mon, 6 Jan 2025 06:59:04 -0800 Subject: [PATCH 05/11] Add arg parse and support for reading partitions --- tools/pickles_to_json.py | 396 ++++++++++++++++++++------------------- 1 file changed, 207 insertions(+), 189 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index a1b937952..b7a1c2570 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -1,5 +1,6 @@ -import logging +import argparse import math +import os import pickle from typing import List from typing import Optional @@ -11,311 +12,328 @@ from tron.core.jobrun import JobRun from tron.serialize import runstate -# TODO: partitioned pickles! - # Max DynamoDB object size is 400KB. Since we save two copies of the object (pickled and JSON), # we need to consider this max size applies to the entire item, so we use a max size of 200KB # for each version. -# -# In testing I could get away with 201_000 for both partitions so this should be enough overhead -# to contain other attributes like object name and number of partitions. OBJECT_SIZE = 200_000 -# TODO: use logging for all the prints -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Get Table def get_dynamodb_table( - aws_profile: str = "dev", table: str = "infrastage-tron-state", region: str = "us-west-1" + aws_profile: Optional[str] = None, table: str = "infrastage-tron-state", region: str = "us-west-1" ) -> ServiceResource: """ Get the DynamoDB table resource. - - :param aws_profile: The name of the AWS profile to use (default is "dev"). 
+ :param aws_profile: The name of the AWS profile to use (default is None for default profile). :param table: The name of the table to get (default is "infrastage-tron-state"). :param region: The region of the table (default is "us-west-1"). :return: The DynamoDB table resource. """ - session = boto3.Session(profile_name=aws_profile) + session = boto3.Session(profile_name=aws_profile) if aws_profile else boto3.Session() return session.resource("dynamodb", region_name=region).Table(table) -# Get Jobs -def summarize_table(source_table: ServiceResource) -> None: - """ - Summarize the DynamoDB table and output basic info about each key. - - :param source_table: The DynamoDB table resource to scan. - """ - response = source_table.scan() - items = response.get("Items", []) - - print(f"{'Key':<120} {'Has json_val':<15} {'Num JSON Partitions':<20} {'Num Pickle Partitions':<20}") - - for item in items: - key = item.get("key", "Unknown Key") - has_json_val = "json_val" in item - num_json_partitions = int(item.get("num_json_val_partitions", 0)) - num_pickle_partitions = int(item.get("num_partitions", 0)) - - print(f"{key:<120} {str(has_json_val):<15} {num_json_partitions:<20} {num_pickle_partitions:<20}") - - def get_all_jobs(source_table: ServiceResource) -> List[str]: """ - Scan the DynamoDB table and return a list of all job keys. - + Scan the DynamoDB table and return a list of unique job keys. :param source_table: The DynamoDB table resource to scan. :return: A list of all job keys. """ - response = source_table.scan() - items = response.get("Items", []) - - all_job_keys = [item.get("key", "Unknown Key") for item in items] - return all_job_keys - - -def get_jobs_without_json_val(source_table: ServiceResource, partitioned: Optional[bool] = None) -> List[str]: - """ - Scan the DynamoDB table and return a list of jobs that don't have a json_val. - Optionally filter by whether the jobs are partitioned. - - :param source_table: The DynamoDB table resource to scan. - :param partitioned: If specified, filter jobs by partitioned status. - True for partitioned, False for non-partitioned, None for no filter. - :return: A list of job keys without json_val, filtered by partitioned status if specified. - """ - response = source_table.scan() - items = response.get("Items", []) - - jobs_without_json_val = set() - - for item in items: - key = item.get("key", "Unknown Key") - has_json_val = "json_val" not in item - num_partitions = int(item.get("num_partitions", 0)) - - if has_json_val: - if partitioned is None: - jobs_without_json_val.add(key) - elif partitioned and num_partitions > 1: - jobs_without_json_val.add(key) - elif not partitioned and num_partitions <= 1: - jobs_without_json_val.add(key) - - return list(jobs_without_json_val) + items = scan_table(source_table) + unique_keys = {item.get("key", "Unknown Key") for item in items} + return list(unique_keys) -# Load and Print -def load_and_combine_partitions(source_table: ServiceResource, key: str) -> bytes: +def combine_pickle_partitions(source_table: ServiceResource, key: str) -> bytes: """ Load and combine all partitions of a pickled item from DynamoDB. - :param source_table: The DynamoDB table resource. :param key: The primary key of the item to retrieve. :return: The combined pickled data as bytes. 
""" + response = source_table.get_item(Key={"key": key, "index": 0}, ConsistentRead=True) + if "Item" not in response: + raise Exception(f"No item found for key {key} at index 0") + item = response["Item"] + num_partitions = int(item.get("num_partitions", 1)) combined_data = bytearray() - index = 0 - - while True: - response = source_table.get_item(Key={"key": key, "index": index}) + for index in range(num_partitions): + response = source_table.get_item(Key={"key": key, "index": index}, ConsistentRead=True) if "Item" not in response: - break - + raise Exception(f"Missing partition {index} for key {key}") item = response["Item"] combined_data.extend(item["val"].value) - index += 1 - return bytes(combined_data) -def load_and_print_single_pickle(source_table: ServiceResource, key: str, print_full: bool = False) -> None: +def dump_pickle_key(source_table: ServiceResource, key: str) -> None: """ - Load the pickled data from DynamoDB for a given key, handle partitioned items, - and print the loaded pickle. Optionally print the entire pickle based on a flag. - + Load the pickled data from DynamoDB for a given key, handling partitioned + items, and print the full pickle data. :param source_table: The DynamoDB table resource. :param key: The primary key of the item to retrieve. - :param print_full: Flag to indicate whether to print the entire pickle (default is False). """ try: - pickled_data = load_and_combine_partitions(source_table, key) + pickled_data = combine_pickle_partitions(source_table, key) loaded_pickle = pickle.loads(pickled_data) - - if print_full: - print(f"Key: {key:<100}\nFull Pickle:") - print(loaded_pickle) - else: - print(f"Key: {key:<100} Pickle successfully loaded") - + print(f"Key: {key} - Pickle data:") + print(loaded_pickle) except Exception as e: - logger.error(f"Key: {key} - Failed to load pickle: {e}") + print(f"Key: {key} - Failed to load pickle: {e}") + raise -def load_and_print_pickles(source_table: ServiceResource, keys: List[str]) -> None: +def dump_pickle_keys(source_table: ServiceResource, keys: List[str]) -> None: """ Load and print pickles for the given list of keys. - :param source_table: The DynamoDB table resource. :param keys: A list of keys for which to load and print pickles. """ for key in keys: - load_and_print_single_pickle(source_table, key) + dump_pickle_key(source_table, key) -def load_and_print_json(source_table: ServiceResource, key: str) -> None: +def validate_pickles(source_table: ServiceResource, keys: List[str]) -> None: """ - Load the JSON data from DynamoDB for a given key and print it. + Validate pickles for the given list of keys without printing the data. + :param source_table: The DynamoDB table resource. + :param keys: A list of keys for which to validate pickles. + """ + total_keys = len(keys) + success_count = 0 + failure_count = 0 + for key in keys: + try: + pickled_data = combine_pickle_partitions(source_table, key) + pickle.loads(pickled_data) + print(f"Key: {key} - Pickle successfully loaded") + success_count += 1 + except Exception as e: + print(f"Key: {key} - Failed to load pickle: {e}") + failure_count += 1 + print(f"Total keys: {total_keys}") + print(f"Successful validations: {success_count}") + print(f"Failures: {failure_count}") + +def dump_json_key(source_table: ServiceResource, key: str) -> None: + """ + Load the JSON data from DynamoDB for a given key and print it. :param source_table: The DynamoDB table resource. :param key: The primary key of the item to retrieve. 
""" try: - response = source_table.get_item(Key={"key": key, "index": 0}) - item = response.get("Item", {}) - - if "json_val" in item: - json_data = item["json_val"] - print(f"Key: {key:<120} - JSON successfully loaded and printed") + json_data = combine_json_partitions(source_table, key) + if json_data is not None: + print(f"Key: {key} - JSON data:") print(json_data) else: - print(f"Key: {key:<120} - No JSON value found") - + print(f"Key: {key} - No JSON value found") except Exception as e: - logger.error(f"Key: {key} - Failed to load JSON: {e}") + print(f"Key: {key} - Failed to load JSON: {e}") -# Convert -# KKASP: REVIEW -def convert_pickle_to_json_and_update_table(source_table: ServiceResource, key: str, dry_run: bool = True) -> None: +def dump_json_keys(source_table: ServiceResource, keys: List[str]) -> None: """ - Convert a single pickled item to JSON and update the DynamoDB entry. + Load and print JSON data for the given list of keys. + :param source_table: The DynamoDB table resource. + :param keys: A list of keys for which to load and print JSON data. + """ + for key in keys: + dump_json_key(source_table, key) + +def combine_json_partitions(source_table: ServiceResource, key: str) -> Optional[str]: + """ + Combine all partitions of a JSON item from DynamoDB. :param source_table: The DynamoDB table resource. :param key: The primary key of the item to retrieve. - :param dry_run: If True, simulate the conversion without updating the table. + :return: The combined JSON data as a string, or None if not found. + """ + response = source_table.get_item(Key={"key": key, "index": 0}, ConsistentRead=True) + if "Item" not in response: + return None + item = response["Item"] + num_json_partitions = int(item.get("num_json_val_partitions", 0)) + if num_json_partitions == 0: + return None + combined_json = "" + for index in range(num_json_partitions): + response = source_table.get_item(Key={"key": key, "index": index}, ConsistentRead=True) + if "Item" not in response: + raise Exception(f"Missing JSON partition {index} for key {key}") + item = response["Item"] + if "json_val" in item: + combined_json += item["json_val"] + else: + raise Exception(f"No 'json_val' in partition {index} for key {key}") + return combined_json + + +def convert_pickle_to_json_and_update_table(source_table: ServiceResource, key: str, dry_run: bool = True) -> bool: + """ + Convert a single pickled item to JSON and update the DynamoDB entry. + Returns True if the conversion was successful, False if skipped. + Raises an exception if conversion fails. """ try: - # Skip conversion for job_state MASTER and job_run_state MASTER jobs + # Skip conversion for job_state MASTER and job_run_state MASTER jobs that are from infrastage testing (i.e., not real jobs) if key.startswith("job_state MASTER") or key.startswith("job_run_state MASTER"): - # logger.info(f"Skipping conversion for key: {key}") - return - - pickled_data = load_and_combine_partitions(source_table, key) + print(f"Skipping conversion for key: {key}") + return False + pickled_data = combine_pickle_partitions(source_table, key) state_data = pickle.loads(pickled_data) - state_type = key.split()[0] if state_type == runstate.JOB_STATE: json_data = Job.to_json(state_data) elif state_type == runstate.JOB_RUN_STATE: json_data = JobRun.to_json(state_data) else: - raise ValueError(f"Unknown type: {state_type}") - + # This will skip the state metadata and any other non-standard keys we have in the table + print(f"Key: {key} - Unknown state type: {state_type}. 
Skipping.") + return False num_json_partitions = math.ceil(len(json_data) / OBJECT_SIZE) for partition_index in range(num_json_partitions): json_partition = json_data[ partition_index * OBJECT_SIZE : min((partition_index + 1) * OBJECT_SIZE, len(json_data)) ] - if not dry_run: source_table.update_item( Key={"key": key, "index": partition_index}, UpdateExpression="SET json_val = :json, num_json_val_partitions = :num_partitions", ExpressionAttributeValues={ ":json": json_partition, - ":num_partitions": num_json_partitions, # KKASP: is this correct? + ":num_partitions": num_json_partitions, }, ) - if dry_run: - logger.info(f"DRY RUN: Key: {key} - Pickle would have been converted to JSON and updated") + print(f"DRY RUN: Key: {key} - Pickle would have been converted to JSON and updated") else: - logger.info(f"Key: {key} - Pickle converted to JSON and updated") + print(f"Key: {key} - Pickle converted to JSON and updated") + return True except Exception as e: - logger.error(f"Key: {key} - Failed to convert pickle to JSON: {e}") + print(f"Key: {key} - Failed to convert pickle to JSON: {e}") + raise -def convert_all_pickles_to_json_and_update_table(source_table: ServiceResource, dry_run: bool = True) -> None: +def convert_pickles_to_json_and_update_table( + source_table: ServiceResource, keys: List[str], dry_run: bool = True +) -> None: """ - Convert all pickled items in the DynamoDB table to JSON and update the entries. - + Convert pickled items in the DynamoDB table to JSON and update the entries. :param source_table: The DynamoDB table resource. + :param keys: List of keys to convert. :param dry_run: If True, simulate the conversion without updating the table. """ - items = scan_table(source_table) - total_keys = len(items) + total_keys = len(keys) converted_keys = 0 - - for item in items: - key = item.get("key", "Unknown Key") + skipped_keys = 0 + failed_keys = 0 + for key in keys: try: - convert_pickle_to_json_and_update_table(source_table, key, dry_run) - converted_keys += 1 - except Exception as e: - logger.error(f"Key: {key} - Failed to convert pickle to JSON: {e}") - - print(f"Total keys in the table: {total_keys}") - print(f"Number of keys converted: {converted_keys}") + result = convert_pickle_to_json_and_update_table(source_table, key, dry_run) + if result: + converted_keys += 1 + else: + skipped_keys += 1 + except Exception: + failed_keys += 1 + print(f"Total keys processed: {total_keys}") + print(f"Conversions attempted: {total_keys - skipped_keys}") + print(f"Conversions succeeded: {converted_keys}") + print(f"Conversions skipped: {skipped_keys}") + print(f"Conversions failed: {failed_keys}") + if dry_run: + print("Dry run complete. No changes were made to the DynamoDB table.") def scan_table(source_table: ServiceResource) -> List[dict]: """ Scan the DynamoDB table and return all items, handling pagination. - :param source_table: The DynamoDB table resource to scan. :return: A list of all items in the table. 
""" items = [] response = source_table.scan() items.extend(response.get("Items", [])) - while "LastEvaluatedKey" in response: response = source_table.scan(ExclusiveStartKey=response["LastEvaluatedKey"]) items.extend(response.get("Items", [])) - return items -aws_profile = "dev" -table_name = "infrastage-tron-state" -# table_name = "norcal-devc-tron-state" -table_region = "us-west-1" -source_table = get_dynamodb_table(aws_profile, table_name, table_region) - -convert_all_pickles_to_json_and_update_table(source_table, dry_run=False) -# load_and_print_json(source_table, "job_run_state paasta-contract-monitor.k8s.1573") - -# 1. Scan table -# summarize_table(source_table) -# print(f"All jobs:\n{get_all_jobs(source_table)}\n") -# print(f"Jobs without json_val:\n{get_jobs_without_json_val(source_table, False)}\n") - -# # 2. Load and print a single job_state pickle -# load_and_print_single_pickle(source_table, "job_state compute-infra-test-service.test_load_foo1") - -# # 2.1 Load and print a single job_run_state pickle -# load_and_print_single_pickle(source_table, "job_run_state katamari_test_service.test_load_foo2.255") - -# 3. Load and print all pickles -# load_and_print_pickles(source_table, get_all_jobs(source_table)) - -# # 4. Convert a single pickle to JSON -# convert_pickle_to_json(source_table, "job_state compute-infra-test-service.test_load_foo1", dry_run=True) - -# 5. Convert all pickles to JSON -# convert_all_pickles_to_json(source_table, dry_run=True) - - -# KKASP: -# 1. Test getting job keys -# DONE: infrastage-tron-state -# TODO: norcal-devc-tron-state -# 2. Test loading pickles -# DONE: infrastrage-tron-state -# 3. Test converting pickles to JSON -# TODO: convert and print a single pickle and compare to a printed JSON? -# 4. Add dry run flag that doesn't update the table +def main(): + parser = argparse.ArgumentParser( + description="Utilities for working with pickles and JSON items in Tron's DynamoDB state store.", + epilog=""" +Actions: + convert Convert pickled state data to JSON format and update the DynamoDB table. + dump-pickle Load and print the pickles for specified keys. + validate-pickle Validate pickle loads for specified keys. + dump-json Load and print JSON data for specified keys. 
+Examples: + Convert all pickles to JSON (dry run): + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run + Convert specific pickles to JSON: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" + Validate specific pickle keys: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action validate-pickle --keys "key1" "key2" + Load and print specific JSON keys: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action dump-json --keys "key1" "key2" +""", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--aws-profile", + default=os.environ.get("AWS_PROFILE", None), + help="AWS profile to use (default: taken from AWS_PROFILE environment variable)", + ) + parser.add_argument("--table-name", required=True, help="Name of the DynamoDB table") + parser.add_argument("--table-region", required=True, help="AWS region of the DynamoDB table") + parser.add_argument( + "--dry-run", + action="store_true", + help="Simulate the action without making any changes to the DynamoDB table", + ) + parser.add_argument( + "--action", + choices=["convert", "dump-pickle", "validate-pickle", "dump-json"], + required=True, + help="Action to perform", + ) + parser.add_argument( + "--keys", + nargs="+", + required=False, + help="Specific key(s) to perform the action on.", + ) + parser.add_argument( + "--all", + action="store_true", + help="Apply the action to all keys in the table.", + ) + args = parser.parse_args() + source_table = get_dynamodb_table(args.aws_profile, args.table_name, args.table_region) + + if not args.keys and not args.all: + parser.error("You must provide either --keys or --all.") + + if args.all: + print("Processing all keys in the table...") + keys = get_all_jobs(source_table) + else: + keys = args.keys + + if args.action == "convert": + convert_pickles_to_json_and_update_table(source_table, keys=keys, dry_run=args.dry_run) + elif args.action == "dump-pickle": + dump_pickle_keys(source_table, keys) + elif args.action == "validate-pickle": + validate_pickles(source_table, keys) + elif args.action == "dump-json": + dump_json_keys(source_table, keys) + else: + print(f"Unknown action: {args.action}") + + +if __name__ == "__main__": + main() From c7d720a8eef68fef53a50272306a2a107783976d Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Mon, 6 Jan 2025 07:56:41 -0800 Subject: [PATCH 06/11] Remove comment --- tools/pickles_to_json.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index b7a1c2570..cc920c8fa 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -23,9 +23,9 @@ def get_dynamodb_table( ) -> ServiceResource: """ Get the DynamoDB table resource. - :param aws_profile: The name of the AWS profile to use (default is None for default profile). - :param table: The name of the table to get (default is "infrastage-tron-state"). - :param region: The region of the table (default is "us-west-1"). + :param aws_profile: The name of the AWS profile to use. + :param table: The name of the table to get. + :param region: The region of the table. :return: The DynamoDB table resource. 
""" session = boto3.Session(profile_name=aws_profile) if aws_profile else boto3.Session() From c0fe87496bc3d3d7c0b757a176169e520ffb5746 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Tue, 7 Jan 2025 12:42:04 -0800 Subject: [PATCH 07/11] Add delete keys flag. Replace validate flag with convert dry-run and output failures to file --- tools/pickles_to_json.py | 161 ++++++++++++++++++++++++++++----------- 1 file changed, 115 insertions(+), 46 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index cc920c8fa..fa779aa52 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -92,29 +92,6 @@ def dump_pickle_keys(source_table: ServiceResource, keys: List[str]) -> None: dump_pickle_key(source_table, key) -def validate_pickles(source_table: ServiceResource, keys: List[str]) -> None: - """ - Validate pickles for the given list of keys without printing the data. - :param source_table: The DynamoDB table resource. - :param keys: A list of keys for which to validate pickles. - """ - total_keys = len(keys) - success_count = 0 - failure_count = 0 - for key in keys: - try: - pickled_data = combine_pickle_partitions(source_table, key) - pickle.loads(pickled_data) - print(f"Key: {key} - Pickle successfully loaded") - success_count += 1 - except Exception as e: - print(f"Key: {key} - Failed to load pickle: {e}") - failure_count += 1 - print(f"Total keys: {total_keys}") - print(f"Successful validations: {success_count}") - print(f"Failures: {failure_count}") - - def dump_json_key(source_table: ServiceResource, key: str) -> None: """ Load the JSON data from DynamoDB for a given key and print it. @@ -142,6 +119,46 @@ def dump_json_keys(source_table: ServiceResource, keys: List[str]) -> None: dump_json_key(source_table, key) +def delete_keys(source_table: ServiceResource, keys: List[str]) -> None: + """ + Delete items with the given list of keys from the DynamoDB table. + :param source_table: The DynamoDB table resource. + :param keys: A list of keys to delete. + """ + total_keys = len(keys) + deleted_count = 0 + failure_count = 0 + for key in keys: + try: + num_partitions = get_num_partitions(source_table, key) + for index in range(num_partitions): + source_table.delete_item(Key={"key": key, "index": index}) + print(f"Key: {key} - Successfully deleted") + deleted_count += 1 + except Exception as e: + print(f"Key: {key} - Failed to delete: {e}") + failure_count += 1 + print(f"Total keys: {total_keys}") + print(f"Successfully deleted: {deleted_count}") + print(f"Failures: {failure_count}") + + +def get_num_partitions(source_table: ServiceResource, key: str) -> int: + """ + Get the number of partitions for a given key in the DynamoDB table. + :param source_table: The DynamoDB table resource. + :param key: The primary key of the item to retrieve. + :return: The number of partitions for the key. + """ + response = source_table.get_item(Key={"key": key, "index": 0}, ConsistentRead=True) + if "Item" not in response: + return 0 + item = response["Item"] + num_partitions = int(item.get("num_partitions", 1)) + num_json_val_partitions = int(item.get("num_json_val_partitions", 0)) + return max(num_partitions, num_json_val_partitions) + + def combine_json_partitions(source_table: ServiceResource, key: str) -> Optional[str]: """ Combine all partitions of a JSON item from DynamoDB. 
@@ -189,6 +206,7 @@ def convert_pickle_to_json_and_update_table(source_table: ServiceResource, key: json_data = JobRun.to_json(state_data) else: # This will skip the state metadata and any other non-standard keys we have in the table + # TODO: how does this impact delete? print(f"Key: {key} - Unknown state type: {state_type}. Skipping.") return False num_json_partitions = math.ceil(len(json_data) / OBJECT_SIZE) @@ -216,18 +234,22 @@ def convert_pickle_to_json_and_update_table(source_table: ServiceResource, key: def convert_pickles_to_json_and_update_table( - source_table: ServiceResource, keys: List[str], dry_run: bool = True + source_table: ServiceResource, + keys: List[str], + dry_run: bool = True, + keys_file: Optional[str] = None, ) -> None: """ Convert pickled items in the DynamoDB table to JSON and update the entries. :param source_table: The DynamoDB table resource. :param keys: List of keys to convert. :param dry_run: If True, simulate the conversion without updating the table. + :param keys_file: File to write failed keys to in dry run. """ total_keys = len(keys) converted_keys = 0 skipped_keys = 0 - failed_keys = 0 + failed_keys = [] for key in keys: try: result = convert_pickle_to_json_and_update_table(source_table, key, dry_run) @@ -235,13 +257,19 @@ def convert_pickles_to_json_and_update_table( converted_keys += 1 else: skipped_keys += 1 - except Exception: - failed_keys += 1 + except Exception as e: + print(f"Key: {key} - Failed to convert pickle to JSON: {e}") + failed_keys.append(key) print(f"Total keys processed: {total_keys}") print(f"Conversions attempted: {total_keys - skipped_keys}") print(f"Conversions succeeded: {converted_keys}") print(f"Conversions skipped: {skipped_keys}") - print(f"Conversions failed: {failed_keys}") + print(f"Conversions failed: {len(failed_keys)}") + if dry_run and keys_file and failed_keys: + with open(keys_file, "w") as f: + for key in failed_keys: + f.write(f"{key}\n") + print(f"Failed keys have been written to {keys_file}") if dry_run: print("Dry run complete. No changes were made to the DynamoDB table.") @@ -266,19 +294,24 @@ def main(): description="Utilities for working with pickles and JSON items in Tron's DynamoDB state store.", epilog=""" Actions: - convert Convert pickled state data to JSON format and update the DynamoDB table. - dump-pickle Load and print the pickles for specified keys. - validate-pickle Validate pickle loads for specified keys. - dump-json Load and print JSON data for specified keys. + convert Convert pickled state data to JSON format and update the DynamoDB table. + dump-pickle Load and print the pickles for specified keys. + dump-json Load and print JSON data for specified keys. + delete-keys Delete the specified keys from the DynamoDB table. 
+ Examples: + Validate pickles (dry run, write failed keys to keys.txt): + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt Convert all pickles to JSON (dry run): pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run + Convert specific pickles to JSON using keys from a file: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt Convert specific pickles to JSON: pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" - Validate specific pickle keys: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action validate-pickle --keys "key1" "key2" - Load and print specific JSON keys: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action dump-json --keys "key1" "key2" + Load and print specific JSON keys using keys from a file: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action dump-json --keys-file keys.txt + Delete specific keys: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys "key1" "key2" """, formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -296,7 +329,7 @@ def main(): ) parser.add_argument( "--action", - choices=["convert", "dump-pickle", "validate-pickle", "dump-json"], + choices=["convert", "dump-pickle", "dump-json", "delete-keys"], required=True, help="Action to perform", ) @@ -306,34 +339,70 @@ def main(): required=False, help="Specific key(s) to perform the action on.", ) + parser.add_argument( + "--keys-file", + required=False, + help="File containing keys to perform the action on. One key per line. On dry run, failed keys will be written to this file.", + ) parser.add_argument( "--all", action="store_true", help="Apply the action to all keys in the table.", ) + args = parser.parse_args() source_table = get_dynamodb_table(args.aws_profile, args.table_name, args.table_region) - - if not args.keys and not args.all: - parser.error("You must provide either --keys or --all.") - + if not args.keys and not args.keys_file and not args.all: + parser.error("You must provide either --keys, --keys-file, or --all.") if args.all: print("Processing all keys in the table...") keys = get_all_jobs(source_table) else: - keys = args.keys - + keys = [] + # TODO: either or? + if args.keys: + keys.extend(args.keys) + if args.keys_file: + try: + with open(args.keys_file) as f: + keys_from_file = [line.strip() for line in f if line.strip()] + keys.extend(keys_from_file) + except Exception as e: + parser.error(f"Error reading keys from file {args.keys_file}: {e}") + if not keys: + parser.error("No keys provided. 
Please provide keys via --keys or --keys-file.") + keys = list(set(keys)) if args.action == "convert": - convert_pickles_to_json_and_update_table(source_table, keys=keys, dry_run=args.dry_run) + convert_pickles_to_json_and_update_table( + source_table, + keys=keys, + dry_run=args.dry_run, + keys_file=args.keys_file, + ) elif args.action == "dump-pickle": dump_pickle_keys(source_table, keys) - elif args.action == "validate-pickle": - validate_pickles(source_table, keys) elif args.action == "dump-json": dump_json_keys(source_table, keys) + elif args.action == "delete-keys": + confirm = ( + input(f"Are you sure you want to delete {len(keys)} keys from the table '{args.table_name}'? [y/N]: ") + .strip() + .lower() + ) + if confirm in ("y", "yes"): + delete_keys(source_table, keys) + else: + print("Deletion cancelled.") else: print(f"Unknown action: {args.action}") if __name__ == "__main__": main() + + +# KKASP: How to identify keys that can be deleted +# 1. Update validate_pickles to catch these keys (or maybe just convert dry run?) +# - Write failed keys to file? +# 2. Anything that doesn't have json_val after conversion can be deleted +# - TODO: failed keys file? From ddd75a853a2512df42e0c165470370a7adccf11c Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 8 Jan 2025 07:24:29 -0800 Subject: [PATCH 08/11] Remove comment --- tools/pickles_to_json.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index fa779aa52..7a105af3d 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -206,7 +206,6 @@ def convert_pickle_to_json_and_update_table(source_table: ServiceResource, key: json_data = JobRun.to_json(state_data) else: # This will skip the state metadata and any other non-standard keys we have in the table - # TODO: how does this impact delete? print(f"Key: {key} - Unknown state type: {state_type}. Skipping.") return False num_json_partitions = math.ceil(len(json_data) / OBJECT_SIZE) @@ -359,7 +358,6 @@ def main(): keys = get_all_jobs(source_table) else: keys = [] - # TODO: either or? if args.keys: keys.extend(args.keys) if args.keys_file: @@ -399,10 +397,3 @@ def main(): if __name__ == "__main__": main() - - -# KKASP: How to identify keys that can be deleted -# 1. Update validate_pickles to catch these keys (or maybe just convert dry run?) -# - Write failed keys to file? -# 2. Anything that doesn't have json_val after conversion can be deleted -# - TODO: failed keys file? From 5a60f18085370b3b59b3bd164124bfad3393859c Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Thu, 9 Jan 2025 11:50:09 -0800 Subject: [PATCH 09/11] Update deletion logic to call the Tron API for valid jobs, and discard the rest --- tools/pickles_to_json.py | 74 +++++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 12 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index 7a105af3d..6333211ed 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -6,6 +6,7 @@ from typing import Optional import boto3 +import requests from boto3.resources.base import ServiceResource from tron.core.job import Job @@ -43,6 +44,24 @@ def get_all_jobs(source_table: ServiceResource) -> List[str]: return list(unique_keys) +def get_job_names(base_url: str) -> List[str]: + """ + Get the list of job names from the Tron API. + :param base_url: The base URL of the Tron API. + :return: A list of job names. 
+ """ + try: + full_url = f"http://{base_url}.yelpcorp.com:8089/api/jobs?include_job_runs=0" + response = requests.get(full_url) + response.raise_for_status() + data = response.json() + job_names = [job["name"] for job in data.get("jobs", [])] + return job_names + except requests.exceptions.RequestException as e: + print(f"An error occurred: {e}") + return [] + + def combine_pickle_partitions(source_table: ServiceResource, key: str) -> bytes: """ Load and combine all partitions of a pickled item from DynamoDB. @@ -119,6 +138,7 @@ def dump_json_keys(source_table: ServiceResource, keys: List[str]) -> None: dump_json_key(source_table, key) +# TODO: clean up old run history for valid jobs? something something look at job_state, then whitelist those runs instead of whitelisting entire jobs def delete_keys(source_table: ServiceResource, keys: List[str]) -> None: """ Delete items with the given list of keys from the DynamoDB table. @@ -237,6 +257,7 @@ def convert_pickles_to_json_and_update_table( keys: List[str], dry_run: bool = True, keys_file: Optional[str] = None, + job_names: List[str] = [], ) -> None: """ Convert pickled items in the DynamoDB table to JSON and update the entries. @@ -249,7 +270,26 @@ def convert_pickles_to_json_and_update_table( converted_keys = 0 skipped_keys = 0 failed_keys = [] + delete_keys = [] + for key in keys: + # Extract the job name from the key + parts = key.split() + if len(parts) < 2: + continue + + state_type, job_info = parts[0], parts[1] + + # Ignore run_num for job_run_state keys + if state_type == "job_run_state": + job_name = ".".join(job_info.split(".")[:-1]) + else: + job_name = job_info + + if job_name not in job_names: + delete_keys.append(key) + continue + try: result = convert_pickle_to_json_and_update_table(source_table, key, dry_run) if result: @@ -259,16 +299,19 @@ def convert_pickles_to_json_and_update_table( except Exception as e: print(f"Key: {key} - Failed to convert pickle to JSON: {e}") failed_keys.append(key) + print(f"Total keys processed: {total_keys}") print(f"Conversions attempted: {total_keys - skipped_keys}") print(f"Conversions succeeded: {converted_keys}") print(f"Conversions skipped: {skipped_keys}") print(f"Conversions failed: {len(failed_keys)}") - if dry_run and keys_file and failed_keys: + print(f"Keys to be deleted: {len(delete_keys)}") + + if keys_file: with open(keys_file, "w") as f: - for key in failed_keys: + for key in failed_keys + delete_keys: # TODO: failed keys to separate file? f.write(f"{key}\n") - print(f"Failed keys have been written to {keys_file}") + print(f"Failed and delete keys have been written to {keys_file}") if dry_run: print("Dry run complete. 
No changes were made to the DynamoDB table.") @@ -300,17 +343,19 @@ def main(): Examples: Validate pickles (dry run, write failed keys to keys.txt): - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt --tron-api-url tron-infrastage Convert all pickles to JSON (dry run): - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --tron-api-url tron-infrastage Convert specific pickles to JSON using keys from a file: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt --tron-api-url tron-infrastage Convert specific pickles to JSON: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" --tron-api-url tron-infrastage Load and print specific JSON keys using keys from a file: pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action dump-json --keys-file keys.txt Delete specific keys: pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys "key1" "key2" + Delete keys from a file: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys-file keys.txt """, formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -348,7 +393,11 @@ def main(): action="store_true", help="Apply the action to all keys in the table.", ) - + parser.add_argument( + "--tron-api-url", + required=True, + help="URL of the Tron API to fetch job names from.", + ) args = parser.parse_args() source_table = get_dynamodb_table(args.aws_profile, args.table_name, args.table_region) if not args.keys and not args.keys_file and not args.all: @@ -370,12 +419,13 @@ def main(): if not keys: parser.error("No keys provided. 
Please provide keys via --keys or --keys-file.") keys = list(set(keys)) + + # Get job names from the Tron API using the provided URL + job_names = get_job_names(args.tron_api_url) + if args.action == "convert": convert_pickles_to_json_and_update_table( - source_table, - keys=keys, - dry_run=args.dry_run, - keys_file=args.keys_file, + source_table, keys=keys, dry_run=args.dry_run, keys_file=args.keys_file, job_names=job_names ) elif args.action == "dump-pickle": dump_pickle_keys(source_table, keys) From 56490b07f1a5ac10634906084cf900e4eccefd01 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Thu, 9 Jan 2025 12:20:43 -0800 Subject: [PATCH 10/11] Add a separation for failed-keys so that we can investigate failures outside of deprecated keys --- tools/pickles_to_json.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index 6333211ed..4a27f72e4 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -257,6 +257,7 @@ def convert_pickles_to_json_and_update_table( keys: List[str], dry_run: bool = True, keys_file: Optional[str] = None, + failed_keys_file: Optional[str] = None, job_names: List[str] = [], ) -> None: """ @@ -309,9 +310,14 @@ def convert_pickles_to_json_and_update_table( if keys_file: with open(keys_file, "w") as f: - for key in failed_keys + delete_keys: # TODO: failed keys to separate file? + for key in delete_keys: f.write(f"{key}\n") - print(f"Failed and delete keys have been written to {keys_file}") + print(f"Deprecated keys have been written to {keys_file}") + if failed_keys_file: + with open(failed_keys_file, "w") as f: + for key in failed_keys: + f.write(f"{key}\n") + print(f"Failed have been written to {failed_keys_file}") if dry_run: print("Dry run complete. No changes were made to the DynamoDB table.") @@ -388,6 +394,11 @@ def main(): required=False, help="File containing keys to perform the action on. One key per line. 
On dry run, failed keys will be written to this file.", ) + parser.add_argument( + "--failed-keys-file", + required=False, + help="File to write failed keys to in dry run.", + ) parser.add_argument( "--all", action="store_true", @@ -425,7 +436,12 @@ def main(): if args.action == "convert": convert_pickles_to_json_and_update_table( - source_table, keys=keys, dry_run=args.dry_run, keys_file=args.keys_file, job_names=job_names + source_table, + keys=keys, + dry_run=args.dry_run, + keys_file=args.keys_file, + failed_keys_file=args.failed_keys_file, + job_names=job_names, ) elif args.action == "dump-pickle": dump_pickle_keys(source_table, keys) From d9e8c3a3126aa00dd7f246a0a1175fc05cb4c095 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Mon, 13 Jan 2025 08:26:14 -0800 Subject: [PATCH 11/11] Add sub_parsers and restructure args a bit --- tools/pickles_to_json.py | 179 +++++++++++++++++++++++++++------------ 1 file changed, 124 insertions(+), 55 deletions(-) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index 4a27f72e4..53a3fd3e7 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -256,8 +256,8 @@ def convert_pickles_to_json_and_update_table( source_table: ServiceResource, keys: List[str], dry_run: bool = True, - keys_file: Optional[str] = None, - failed_keys_file: Optional[str] = None, + deprecated_keys_output: Optional[str] = None, + failed_keys_output: Optional[str] = None, job_names: List[str] = [], ) -> None: """ @@ -265,7 +265,9 @@ def convert_pickles_to_json_and_update_table( :param source_table: The DynamoDB table resource. :param keys: List of keys to convert. :param dry_run: If True, simulate the conversion without updating the table. - :param keys_file: File to write failed keys to in dry run. + :param deprecated_keys_output: Output file to write deprecated keys to. + :param failed_keys_output: Output file to write keys that failed to convert to. + :param job_names: List of job names to use for filtering keys. """ total_keys = len(keys) converted_keys = 0 @@ -308,16 +310,16 @@ def convert_pickles_to_json_and_update_table( print(f"Conversions failed: {len(failed_keys)}") print(f"Keys to be deleted: {len(delete_keys)}") - if keys_file: - with open(keys_file, "w") as f: + if deprecated_keys_output: + with open(deprecated_keys_output, "w") as f: for key in delete_keys: f.write(f"{key}\n") - print(f"Deprecated keys have been written to {keys_file}") - if failed_keys_file: - with open(failed_keys_file, "w") as f: + print(f"Deprecated keys have been written to {deprecated_keys_output}") + if failed_keys_output: + with open(failed_keys_output, "w") as f: for key in failed_keys: f.write(f"{key}\n") - print(f"Failed have been written to {failed_keys_file}") + print(f"Failed have been written to {failed_keys_output}") if dry_run: print("Dry run complete. No changes were made to the DynamoDB table.") @@ -346,22 +348,19 @@ def main(): dump-pickle Load and print the pickles for specified keys. dump-json Load and print JSON data for specified keys. delete-keys Delete the specified keys from the DynamoDB table. 
- Examples: - Validate pickles (dry run, write failed keys to keys.txt): - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt --tron-api-url tron-infrastage - Convert all pickles to JSON (dry run): - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --tron-api-url tron-infrastage - Convert specific pickles to JSON using keys from a file: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt --tron-api-url tron-infrastage - Convert specific pickles to JSON: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" --tron-api-url tron-infrastage - Load and print specific JSON keys using keys from a file: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action dump-json --keys-file keys.txt - Delete specific keys: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys "key1" "key2" - Delete keys from a file: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys-file keys.txt + Validate pickles (dry run, write deprecated keys to deprecated_keys.txt): + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 convert --all --dry-run --deprecated-keys-output deprecated_keys.txt --tron-api tron-infrastage + Convert all pickles to JSON (without dry run): + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 convert --all --tron-api tron-infrastage + Convert specific pickles to JSON using keys from an input file: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 convert --keys-file input_keys.txt --tron-api tron-infrastage + Load and print specific JSON keys using keys from an input file: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 dump-json --keys-file input_keys.txt + Delete specific keys (dry run): + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 delete-keys --keys "key1" "key2" --dry-run + Delete keys from an input file (without dry run): + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 delete-keys --keys-file input_keys.txt """, formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -372,47 +371,114 @@ def main(): ) parser.add_argument("--table-name", required=True, help="Name of the DynamoDB table") parser.add_argument("--table-region", required=True, help="AWS region of the DynamoDB table") - parser.add_argument( + + subparsers = parser.add_subparsers(dest="action", required=True, help="Action to perform") + + convert_parser = subparsers.add_parser( + "convert", help="Convert pickled state data to JSON format and update the DynamoDB table." + ) + convert_parser.add_argument( + "--tron-api", + required=True, + help="Base URL of the Tron API to fetch job names from.", + ) + convert_parser.add_argument( + "--keys", + nargs="+", + required=False, + help="Specific key(s) to perform the action on.", + ) + convert_parser.add_argument( + "--keys-file", + required=False, + help="Input file containing keys to perform the action on. One key per line.", + ) + convert_parser.add_argument( + "--deprecated-keys-output", + required=False, + help="Output file to write deprecated keys to. 
These are keys associated with jobs not present in Tron. One key per line.", + ) + convert_parser.add_argument( + "--failed-keys-output", + required=False, + help="Output file to write keys that failed to convert. One key per line.", + ) + convert_parser.add_argument( + "--all", + action="store_true", + help="Apply the action to all keys in the table.", + ) + convert_parser.add_argument( "--dry-run", action="store_true", - help="Simulate the action without making any changes to the DynamoDB table", + help="Simulate the conversion without making any changes to the DynamoDB table", ) - parser.add_argument( - "--action", - choices=["convert", "dump-pickle", "dump-json", "delete-keys"], - required=True, - help="Action to perform", + + dump_pickle_parser = subparsers.add_parser("dump-pickle", help="Load and print the pickles for specified keys.") + dump_pickle_parser.add_argument( + "--keys", + nargs="+", + required=False, + help="Specific key(s) to perform the action on.", ) - parser.add_argument( + dump_pickle_parser.add_argument( + "--keys-file", + required=False, + help="Input file containing keys to perform the action on. One key per line.", + ) + dump_pickle_parser.add_argument( + "--all", + action="store_true", + help="Apply the action to all keys in the table.", + ) + + dump_json_parser = subparsers.add_parser("dump-json", help="Load and print JSON data for specified keys.") + dump_json_parser.add_argument( "--keys", nargs="+", required=False, help="Specific key(s) to perform the action on.", ) - parser.add_argument( + dump_json_parser.add_argument( "--keys-file", required=False, - help="File containing keys to perform the action on. One key per line. On dry run, failed keys will be written to this file.", + help="Input file containing keys to perform the action on. One key per line.", ) - parser.add_argument( - "--failed-keys-file", + dump_json_parser.add_argument( + "--all", + action="store_true", + help="Apply the action to all keys in the table.", + ) + + delete_keys_parser = subparsers.add_parser("delete-keys", help="Delete the specified keys from the DynamoDB table.") + delete_keys_parser.add_argument( + "--keys", + nargs="+", required=False, - help="File to write failed keys to in dry run.", + help="Specific key(s) to perform the action on.", ) - parser.add_argument( + delete_keys_parser.add_argument( + "--keys-file", + required=False, + help="Input file containing keys to perform the action on. One key per line.", + ) + delete_keys_parser.add_argument( "--all", action="store_true", help="Apply the action to all keys in the table.", ) - parser.add_argument( - "--tron-api-url", - required=True, - help="URL of the Tron API to fetch job names from.", + delete_keys_parser.add_argument( + "--dry-run", + action="store_true", + help="Simulate the deletion without making any changes to the DynamoDB table", ) + args = parser.parse_args() source_table = get_dynamodb_table(args.aws_profile, args.table_name, args.table_region) + if not args.keys and not args.keys_file and not args.all: parser.error("You must provide either --keys, --keys-file, or --all.") + if args.all: print("Processing all keys in the table...") keys = get_all_jobs(source_table) @@ -431,16 +497,14 @@ def main(): parser.error("No keys provided. 
Please provide keys via --keys or --keys-file.")
     keys = list(set(keys))
 
-    # Get job names from the Tron API using the provided URL
-    job_names = get_job_names(args.tron_api_url)
-
     if args.action == "convert":
+        job_names = get_job_names(args.tron_api)
         convert_pickles_to_json_and_update_table(
             source_table,
             keys=keys,
             dry_run=args.dry_run,
-            keys_file=args.keys_file,
-            failed_keys_file=args.failed_keys_file,
+            deprecated_keys_output=args.deprecated_keys_output,
+            failed_keys_output=args.failed_keys_output,
             job_names=job_names,
         )
     elif args.action == "dump-pickle":
         dump_pickle_keys(source_table, keys)
     elif args.action == "dump-json":
         dump_json_keys(source_table, keys)
     elif args.action == "delete-keys":
-        confirm = (
-            input(f"Are you sure you want to delete {len(keys)} keys from the table '{args.table_name}'? [y/N]: ")
-            .strip()
-            .lower()
-        )
-        if confirm in ("y", "yes"):
-            delete_keys(source_table, keys)
+        if args.dry_run:
+            print(f"DRY RUN: Would delete {len(keys)} keys from the table '{args.table_name}'.")
+            for key in keys:
+                print(f"Would delete key: {key}")
         else:
-            print("Deletion cancelled.")
+            confirm = (
+                input(f"Are you sure you want to delete {len(keys)} keys from the table '{args.table_name}'? [y/N]: ")
+                .strip()
+                .lower()
+            )
+            if confirm in ("y", "yes"):
+                delete_keys(source_table, keys)
+            else:
+                print("Deletion cancelled.")
     else:
         print(f"Unknown action: {args.action}")
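
Note on the convert-vs-delete decision: the convert path extracts a job name from each state key and only converts keys whose job is still returned by the Tron API; everything else goes to the deprecated-keys list. A minimal sketch of that rule follows. The helper name extract_job_name and the example keys are illustrative only (the tool does this inline inside convert_pickles_to_json_and_update_table):

from typing import Optional


def extract_job_name(key: str) -> Optional[str]:
    """Pull the job name out of a state key, or return None for malformed keys."""
    parts = key.split()
    if len(parts) < 2:
        return None
    state_type, job_info = parts[0], parts[1]
    if state_type == "job_run_state":
        # job_run_state keys end in a run number: "service.job.1234" -> "service.job"
        return ".".join(job_info.split(".")[:-1])
    return job_info


# Keys whose job is no longer known to Tron are routed to the deprecated-keys output
# instead of being converted.
known_jobs = {"some-service.some-job"}  # normally the result of get_job_names(...)
for key in ("job_run_state some-service.some-job.1234", "job_state retired-service.old-job"):
    job = extract_job_name(key)
    print(f"{key} -> {'convert' if job in known_jobs else 'delete'}")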
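
Note on partitioned pickles: combine_pickle_partitions is only described by its docstring here; the rough idea is to read partition 0, find the partition count, then concatenate the raw bytes of every partition before unpickling. A sketch under those assumptions is below. The function name is hypothetical, the attribute names (val, num_partitions) follow the ones used elsewhere in this table, and error handling is omitted:

import pickle


def load_partitioned_pickle(table, key: str):
    """Read every partition of a key, concatenate the raw bytes, and unpickle the result."""
    first = table.get_item(Key={"key": key, "index": 0})["Item"]
    num_partitions = int(first.get("num_partitions", 1))
    blob = bytes(first["val"].value)
    for index in range(1, num_partitions):
        item = table.get_item(Key={"key": key, "index": index})["Item"]
        blob += bytes(item["val"].value)
    return pickle.loads(blob)


# Example (assumes an existing table resource and key):
# state_data = load_partitioned_pickle(source_table, "job_state some-service.some-job")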