Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/fix patching #11

Merged
merged 10 commits into from
Dec 15, 2023
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ aviso-server/rest/aviso_rest.egg-info/*
aviso-server/auth/aviso_auth.egg-info/*
aviso-server/monitoring/aviso_monitoring.egg-info/*
docs/build/*
venv
.venv
build
default.etcd
2 changes: 1 addition & 1 deletion pyaviso/engine/etcd_grpc_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self, config: EngineConfig, auth: Auth):
self._base_url = f"http://{self._host}:{self._port}/v3/"

def _initialise_server(self):
if isinstance(self.auth, EtcdAuth):
if type(self.auth) == EtcdAuth: # noqa: E721
self._server = Etcd3Client(
self.host, self.port, user=self.auth.username, password=self.auth.password, timeout=self.timeout
)
Expand Down
2 changes: 1 addition & 1 deletion pyaviso/engine/etcd_rest_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def _authenticate(self) -> bool:
This method authenticates the user and set the internal token, this is only done for Etcd authentication
:return: True if successfully authenticated
"""
if isinstance(self.auth, EtcdAuth):
if type(self.auth) == EtcdAuth: # noqa: E721
logger.debug(f"Authenticating user {self.auth.username}...")

url = self._base_url + "auth/authenticate"
Expand Down
48 changes: 26 additions & 22 deletions pyaviso/engine/file_based_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import _thread
import os
import threading
import time
from datetime import datetime
from queue import Queue
Expand Down Expand Up @@ -192,7 +193,7 @@ def _polling(
to_date: datetime = None,
):
"""
This method implements the active polling
This method implements the active polling using watchdog
:param key: key to watch as a prefix
:param callback: function to call if any change happen
:param channel: global communication channel among threads
Expand All @@ -204,8 +205,9 @@ def _polling(
logger.warning("from_date option is disabled in TestMode")
if to_date:
logger.warning("to_date option is disabled in TestMode")

try:
# first create the directory to watch
# Create the directory to watch if it doesn't exist
if not os.path.exists(key):
try:
os.makedirs(key, exist_ok=True)
Expand All @@ -214,50 +216,52 @@ def _polling(
logger.debug("", exc_info=True)
return False

# define a class to handle the new events
class WatchdogHandler(FileSystemEventHandler):
def __init__(self, engine, key, callback):
# Define a class to handle the new events using watchdog
class EventHandler(FileSystemEventHandler):
def __init__(self, engine, debounce_time=0.5, polling_interval=1):
super().__init__()
self._engine = engine
self._key = key
self._callback = callback
self._last_processed_time = 0
self._debounce_time = debounce_time
self._polling_interval = polling_interval

def on_modified(self, event):
if not event.is_directory and event.src_path.endswith(self._key):
if not event.is_directory:
kvs = self._engine.pull(key=event.src_path)
for kv in kvs:
k = kv["key"]
v = kv["value"].decode()
# skip the status
if kv["key"].endswith("status"):
continue
logger.debug(f"Notification received for key {k}")
try:
# execute the trigger
self._callback(k, v)
callback(k, v)
except Exception as ee:
logger.error(f"Error with notification trigger, exception: {type(ee)} {ee}")
logger.debug("", exc_info=True)
time.sleep(self._polling_interval)

# define the event handler
event_handler = WatchdogHandler(engine=self, key=key, callback=callback)

# create an observer and schedule the event handler
# Set up the observer and event handler
event_handler = EventHandler(engine=self)
observer = Observer()
observer.schedule(event_handler, path=key, recursive=True)

# start the observer in a daemon thread so we can stop it
observer.schedule(event_handler, key, recursive=True)
observer.start()

# this is the stop condition
# Daemon thread to allow stopping
def t_run():
observer.join()

t = threading.Thread(target=t_run)
t.setDaemon(True)
t.start()

# Stop condition
while key in self._listeners:
time.sleep(0.1)

# stop the observer
observer.stop()
observer.join()

except Exception as e:
logger.error(f"Error occurred during polling: {e}")
logger.error(f"Error while listening to key {key}, {e}")
logger.debug("", exc_info=True)
_thread.interrupt_main()
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
[tool.black]
line-length = 120
line-length = 120
exclude = '''
/(
\. # Ignore directories starting with .
| venv # Ignore venv directory
)/
'''
3 changes: 1 addition & 2 deletions tests/system/test_aviso.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ def caplog_for_logger(caplog): # this is needed to assert over the logging outp
lo.removeHandler(caplog.handler)


def reset_previous_run(monkeypatch: pytest.MonkeyPatch):
monkeypatch.chdir(base_path())
def reset_previous_run():
file_path = "tests/system/fixtures/received.txt"
full_path = os.path.join(os.getcwd(), file_path)
if os.path.exists(full_path):
Expand Down
26 changes: 12 additions & 14 deletions tests/unit/test_file_based_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def pre_post_test(test_engine):
pass
yield
# delete all the keys at the end of the test
test_engine.delete("/tmp/aviso/test")
# test_engine.delete("/tmp/aviso/test")
test_engine.stop()


Expand Down Expand Up @@ -74,35 +74,33 @@ def test_listen(test_engine):
callback_list = []

def callback(key, value):
logger.debug(f"Callback triggered for key: {key}")
callback_list.append(1)

# listen to a test key
# Listen to a test key
assert test_engine.listen(["/tmp/aviso/test"], callback)
# wait a fraction and check the function has been triggered
time.sleep(0.5)
time.sleep(0.5) # Increased wait time

# create independent change to the test key to trigger the notification
# Create independent change to the test key to trigger the notification
kvs = [{"key": "/tmp/aviso/test/test1", "value": "1"}]
assert test_engine.push(kvs)
# wait a fraction and check the function has been triggered
time.sleep(1)
time.sleep(1) # Increased wait time
assert len(callback_list) == 1

# repeat the push operation
# Repeat the push operation
kvs = [{"key": "/tmp/aviso/test/test1", "value": "2"}]
assert test_engine.push(kvs)
# wait a fraction and check the function has been triggered
time.sleep(1)
time.sleep(1) # Increased wait time
assert len(callback_list) == 2

# stop listening
# Stop listening
resp = test_engine.stop()
assert resp

# repeat the push operation
kvs = [{"key": "/tmp/aviso/test/test1", "value": "2"}]
# Repeat the push operation
kvs = [{"key": "/tmp/aviso/test/test1", "value": "3"}]
assert test_engine.push(kvs)

# wait a fraction and check the function has NOT been triggered
# Wait and check that the function has NOT been triggered
time.sleep(1)
assert len(callback_list) == 2
12 changes: 6 additions & 6 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
[flake8]
max-line-length = 120
show-source = true
exclude = .*
exclude = .*,venv
extend-ignore = E203
[isort]
profile=black
skip_glob=.*
skip = venv
skip_glob = .*
[tox]
envlist = py36, quality
[testenv]
deps =
pytest
passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
deps = pytest
passenv = http_proxy,HTTP_PROXY,https_proxy,HTTPS_PROXY,no_proxy,NO_PROXY
commands =
pip install -e aviso-server/monitoring
pip install -e aviso-server/rest
Expand All @@ -26,5 +26,5 @@ deps =
flake8
commands =
isort --check .
black --check .
black --check --exclude='/(\..*|venv)/' .
flake8 .
Loading