Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][Gitlab] Handle cases where handling event takes more than the allowed timeout #1326

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions integrations/gitlab/gitlab_integration/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
event_context,
EventContext,
)

import time
Observer = Callable[[str, dict[str, Any]], Awaitable[Any]]


Expand Down Expand Up @@ -85,21 +85,31 @@ async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
)
return
for observer in observers_list:
try:
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
logger.debug(
f"Notifying observer: {handler}, for event: {event_id}",
event_id=event_id,
handler=handler,
)
await observer(event_id, body) # Sequentially call each observer
except Exception as e:
logger.error(
f"Error processing event {event_id} with observer {observer}: {str(e)}"
)

retries_left = 3
timeout = 90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets store it in consts

observer_time = time.time()
while retries_left > 0:
try:
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
logger.debug(
f"Notifying observer: {handler}, for event: {event_id} at {observer_time}",
event_id=event_id,
handler=handler,
)
await asyncio.wait_for(observer(event_id, body), timeout) # Sequentially call each observer
break
except asyncio.TimeoutError:
logger.error(
f"{handler} started work at {observer_time}, did not complete handling event {event_id} within {timeout} seconds, retrying"
)
retries_left -= 1
except Exception as e:
logger.error(
f"Error processing event {event_id} with observer {observer}: {e}",exc_info=True,
)
break

class SystemEventHandler(BaseEventHandler):
def __init__(self) -> None:
Expand Down
Loading