Skip to content

Commit

Permalink
Remove deprecated itest. Rename some variables to make things a bit c…
Browse files Browse the repository at this point in the history
…learer.
  • Loading branch information
KaspariK committed Sep 18, 2024
1 parent 84f4148 commit e65aef5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 21 deletions.
11 changes: 0 additions & 11 deletions itest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,3 @@ fi

kill -SIGTERM $TRON_PID
wait $TRON_PID || true

# /opt/venvs/tron/bin/python - <<EOF
# import os
# from tron.serialize.runstate.shelvestore import ShelveStateStore, ShelveKey
# db = ShelveStateStore('$TRON_WORKDIR/tron_state')
# key = ShelveKey('mcp_state', 'StateMetadata') "mcp_state___StateMetadata"
# res = db.restore([key])
# ts = res[key][u'create_time']
# print("assert db time {} > start time {}".format(ts, int(os.environ['TRON_START_TIME'])))
# assert ts > int(os.environ['TRON_START_TIME'])
# EOF
6 changes: 3 additions & 3 deletions tests/serialize/runstate/dynamodb_state_store_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def large_object():
"runs": [],
"cleanup_run": None,
"manual": False,
"large_data": [i for i in range(10_000)], # TODO: is this enough to get partitioned?
"large_data": [i for i in range(1_000_000)],
}


Expand Down Expand Up @@ -154,8 +154,8 @@ def test_save(self, store, small_object, large_object):
for key in keys:
item = store.table.get_item(Key={"key": key, "index": 0})
assert "Item" in item
assert "raw_val" in item["Item"]
assert_equal(json.loads(item["Item"]["raw_val"]), small_object)
assert "json_val" in item["Item"]
assert_equal(json.loads(item["Item"]["json_val"]), small_object)

def test_delete_if_val_is_none(self, store, small_object, large_object):
key_value_pairs = [
Expand Down
14 changes: 7 additions & 7 deletions tron/serialize/runstate/dynamodb_state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ def save(self, key_value_pairs) -> None:
self.save_queue[key] = (val, None)
else:
state_type = self.get_type_from_key(key)
serialized_val = self._serialize_item(state_type, val)
json_val = self._serialize_item(state_type, val)
self.save_queue[key] = (
val,
serialized_val,
json_val,
)
break

Expand All @@ -181,20 +181,20 @@ def _consume_save_queue(self):
for _ in range(qlen):
try:
with self.save_lock:
key, (original_val, serialized_val) = self.save_queue.popitem(last=False)
log.debug(f"Processing save for {key} with a value of {original_val}")
key, (pickled_val, json_val) = self.save_queue.popitem(last=False)
log.debug(f"Processing save for {key} with a value of {pickled_val}")
# Remove all previous data with the same partition key
# TODO: only remove excess partitions if new data has fewer
self._delete_item(key)
if original_val is not None:
self.__setitem__(key, pickle.dumps(original_val), serialized_val)
if pickled_val is not None:
self.__setitem__(key, pickle.dumps(pickled_val), json_val)
# reset errors count if we can successfully save
saved += 1
except Exception as e:
error = "tron_dynamodb_save_failure: failed to save key " f'"{key}" to dynamodb:\n{repr(e)}'
log.error(error)
with self.save_lock:
self.save_queue[key] = (original_val, serialized_val)
self.save_queue[key] = (pickled_val, json_val)
duration = time.time() - start
log.info(f"saved {saved} items in {duration}s")

Expand Down
1 change: 1 addition & 0 deletions tron/serialize/runstate/shelvestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
log = logging.getLogger(__name__)


# TODO: TRON-2293 This class does some Python 2 and Python 3 handling shenanigans. It should be cleaned up.
class Py2Shelf(shelve.Shelf):
def __init__(self, filename, flag="c", protocol=2, writeback=False):
db = bsddb3.hashopen(filename, flag)
Expand Down

0 comments on commit e65aef5

Please sign in to comment.