Skip to content

Commit

Permalink
polling logic is added to watchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
sametd committed Dec 14, 2023
1 parent 44b435e commit 347e2af
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions pyaviso/engine/file_based_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import _thread
import os
import threading
import time
from datetime import datetime
from queue import Queue
Expand Down Expand Up @@ -192,7 +193,7 @@ def _polling(
to_date: datetime = None,
):
"""
This method implements the active polling
This method implements the active polling using watchdog
:param key: key to watch as a prefix
:param callback: function to call if any change happen
:param channel: global communication channel among threads
Expand All @@ -204,8 +205,9 @@ def _polling(
logger.warning("from_date option is disabled in TestMode")
if to_date:
logger.warning("to_date option is disabled in TestMode")

try:
# first create the directory to watch
# Create the directory to watch if it doesn't exist
if not os.path.exists(key):
try:
os.makedirs(key, exist_ok=True)
Expand All @@ -214,50 +216,52 @@ def _polling(
logger.debug("", exc_info=True)
return False

# define a class to handle the new events
class WatchdogHandler(FileSystemEventHandler):
def __init__(self, engine, key, callback):
# Define a class to handle the new events using watchdog
class EventHandler(FileSystemEventHandler):
def __init__(self, engine, debounce_time=0.5, polling_interval=1):
super().__init__()
self._engine = engine
self._key = key
self._callback = callback
self._last_processed_time = 0
self._debounce_time = debounce_time
self._polling_interval = polling_interval

def on_modified(self, event):
if not event.is_directory and event.src_path.endswith(self._key):
if not event.is_directory:
kvs = self._engine.pull(key=event.src_path)
for kv in kvs:
k = kv["key"]
v = kv["value"].decode()
# skip the status
if kv["key"].endswith("status"):
continue
logger.debug(f"Notification received for key {k}")
try:
# execute the trigger
self._callback(k, v)
callback(k, v)
except Exception as ee:
logger.error(f"Error with notification trigger, exception: {type(ee)} {ee}")
logger.debug("", exc_info=True)
time.sleep(self._polling_interval)

# define the event handler
event_handler = WatchdogHandler(engine=self, key=key, callback=callback)

# create an observer and schedule the event handler
# Set up the observer and event handler
event_handler = EventHandler(engine=self)
observer = Observer()
observer.schedule(event_handler, path=key, recursive=True)

# start the observer in a daemon thread so we can stop it
observer.schedule(event_handler, key, recursive=True)
observer.start()

# this is the stop condition
# Daemon thread to allow stopping
def t_run():
observer.join()

t = threading.Thread(target=t_run)
t.setDaemon(True)
t.start()

# Stop condition
while key in self._listeners:
time.sleep(0.1)

# stop the observer
observer.stop()
observer.join()

except Exception as e:
logger.error(f"Error occurred during polling: {e}")
logger.error(f"Error while listening to key {key}, {e}")
logger.debug("", exc_info=True)
_thread.interrupt_main()
_thread.interrupt_main()

0 comments on commit 347e2af

Please sign in to comment.