From 3c404084a57a4f94b20fb01c06511bd2aae77118 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Wed, 29 May 2024 15:25:27 +1200 Subject: [PATCH] uis subscription data sync info (UIS-597) --- cylc/flow/network/resolvers.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 8bd9c2347ac..07b6121805e 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -569,6 +569,8 @@ async def subscribe_delta( delta_queues = self.data_store_mgr.delta_queues deltas_queue: DeltaQueue = queue.Queue() + self.data_store_mgr.graphql_sub_interrogate(sub_id, info) + counters: Dict[str, int] = {} delta_yield_queue: DeltaQueue = queue.Queue() flow_delta_queues: Dict[str, queue.Queue[Tuple[str, dict]]] = {} @@ -591,6 +593,9 @@ async def subscribe_delta( if w_id in self.data_store_mgr.data: if sub_id not in delta_queues[w_id]: delta_queues[w_id][sub_id] = deltas_queue + await self.data_store_mgr.graphql_sub_data_match( + w_id, sub_id + ) # On new yield workflow data-store as added delta if args.get('initial_burst'): delta_store = create_delta_store( @@ -658,6 +663,7 @@ async def subscribe_delta( import traceback logger.warning(traceback.format_exc()) finally: + self.data_store_mgr.graphql_sub_discard(sub_id) for w_id in w_ids: if delta_queues.get(w_id, {}).get(sub_id): del delta_queues[w_id][sub_id]