Skip to content

Commit

Permalink
Merge pull request #1018 from Yelp/u/kkasp/update-delete-item
Browse files Browse the repository at this point in the history
Update delete_item logic to handle json partitions
  • Loading branch information
KaspariK authored Jan 9, 2025
2 parents a480ef7 + 0934829 commit 875d617
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
29 changes: 27 additions & 2 deletions tests/serialize/runstate/dynamodb_state_store_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,37 @@ def test_delete_item(self, store, small_object, large_object):
value = large_object
pairs = list(zip(keys, (value for i in range(len(keys)))))
store.save(pairs)
store._consume_save_queue()

for key, value in pairs:
store._delete_item(key)

for key, value in pairs:
assert_equal(store._get_num_of_partitions(key), 0)
num_partitions, num_json_val_partitions = store._get_num_of_partitions(key)
assert_equal(num_partitions, 0)
assert_equal(num_json_val_partitions, 0)

def test_delete_item_with_json_partitions(self, store, small_object, large_object):
    """Deleting an item must remove its pickled AND JSON partitions."""
    key = store.build_key("job_state", "test_job")

    # A large object forces the value to be split across multiple partitions.
    store.save([(key, large_object)])
    store._consume_save_queue()

    # Sanity check: both representations were actually partitioned.
    pickled_parts, json_parts = store._get_num_of_partitions(key)
    assert pickled_parts > 0
    assert json_parts > 0

    store._delete_item(key)

    # Every partition of both kinds should now be gone.
    pickled_parts, json_parts = store._get_num_of_partitions(key)
    assert_equal(pickled_parts, 0)
    assert_equal(json_parts, 0)

    # A restore after deletion must not resurrect the key.
    with mock.patch("tron.config.static_config.load_yaml_file", autospec=True), mock.patch(
        "tron.config.static_config.build_configuration_watcher", autospec=True
    ):
        restored = store.restore([key])
    assert key not in restored

def test_retry_saving(self, store, small_object, large_object):
with mock.patch(
Expand Down
19 changes: 10 additions & 9 deletions tron/serialize/runstate/dynamodb_state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ def _get_first_partitions(self, keys: list):
new_keys = [{"key": {"S": key}, "index": {"N": "0"}} for key in keys]
return self._get_items(new_keys)

# TODO: Check max partitions as JSON is larger
def _get_remaining_partitions(self, items: list, read_json: bool):
"""Get items in the remaining partitions: N = 1 and beyond"""
keys_for_remaining_items = []
Expand Down Expand Up @@ -363,12 +362,13 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None:
delta=time.time() - start,
)

# TODO: TRON-2238 - Is this ok if we just use the max number of partitions?
def _delete_item(self, key: str) -> None:
start = time.time()
try:
num_partitions, num_json_val_partitions = self._get_num_of_partitions(key)
max_partitions = max(num_partitions, num_json_val_partitions)
with self.table.batch_writer() as batch:
for index in range(self._get_num_of_partitions(key)):
for index in range(max_partitions):
batch.delete_item(
Key={
"key": key,
Expand All @@ -381,23 +381,24 @@ def _delete_item(self, key: str) -> None:
delta=time.time() - start,
)

# TODO: TRON-2238 - Get max partitions between pickle and json
def _get_num_of_partitions(self, key: str) -> int:
def _get_num_of_partitions(self, key: str) -> Tuple[int, int]:
    """
    Return how many partitions the item stored under ``key`` occupies,
    as a ``(num_pickled_partitions, num_json_partitions)`` pair.

    The partition counts are stored on the first partition (index 0);
    an absent item or an absent table yields ``(0, 0)``.
    """
    try:
        response = self.table.get_item(
            Key={"key": key, "index": 0},
            # Fetch only the two counters we need from partition 0.
            ProjectionExpression="num_partitions, num_json_val_partitions",
            ConsistentRead=True,
        )
    except self.client.exceptions.ResourceNotFoundException:
        return 0, 0
    item = response.get("Item", {})
    return (
        int(item.get("num_partitions", 0)),
        int(item.get("num_json_val_partitions", 0)),
    )

def cleanup(self) -> None:
self.stopping = True
Expand Down

0 comments on commit 875d617

Please sign in to comment.