Skip to content

Commit

Permalink
Fix event buffer exit blocking (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpapillon authored Jun 17, 2024
1 parent 9451018 commit 652a735
Showing 1 changed file with 13 additions and 22 deletions.
35 changes: 13 additions & 22 deletions src/schematic/event_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(

# Start periodic flushing thread
self.flush_thread = threading.Thread(target=self._periodic_flush)
self.flush_thread.daemon = True
self.flush_thread.start()

def _flush(self):
Expand All @@ -47,17 +48,9 @@ def _flush(self):
self.current_size = 0

def _periodic_flush(self):
while True:
if self.shutdown.wait(timeout=self.interval):
# Stop accepting new events
self.stopped = True

# Flush any remaining events
self._flush()

break
else:
self._flush()
while not self.shutdown.is_set():
self._flush()
self.shutdown.wait(timeout=self.interval)

def push(self, event: CreateEventRequestBody):
if self.stopped:
Expand All @@ -74,8 +67,9 @@ def push(self, event: CreateEventRequestBody):

def stop(self):
try:
self.stopped = True
self.shutdown.set()
self.flush_thread.join()
self.flush_thread.join(timeout=5)
except Exception as e:
self.logger.error(f"Panic occurred while closing client: {e}")

Expand Down Expand Up @@ -116,18 +110,14 @@ async def _flush(self):
self.current_size = 0

async def _periodic_flush(self):
while True:
while not self.shutdown_event.is_set():
await self._flush()
try:
await asyncio.wait_for(self.shutdown_event.wait(), timeout=self.interval)
# Stop accepting new events
self.stopped = True

# Flush any remaining events
await self._flush()

break
await asyncio.wait_for(
self.shutdown_event.wait(), timeout=self.interval
)
except asyncio.TimeoutError:
await self._flush()
pass

async def push(self, event: CreateEventRequestBody):
if self.stopped:
Expand All @@ -144,6 +134,7 @@ async def push(self, event: CreateEventRequestBody):

async def stop(self):
try:
self.stopped = True
self.shutdown_event.set()
await self.flush_task
except Exception as e:
Expand Down

0 comments on commit 652a735

Please sign in to comment.