Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise RuntimeError if watcher.cancel() is called after client.close() #17

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aetcd/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,12 +644,16 @@ async def response_callback(response):
canceled = asyncio.Event()

async def cancel():
if not self._watcher:
raise RuntimeError('Calling watcher.cancel() on a closed client')
canceled.set()
await events.put(None)
await self._watcher.cancel(watcher_callback.watch_id)

@_handle_errors
async def iterator():
if not self._watcher:
raise RuntimeError('Using watcher as iterator on a closed client')
while not canceled.is_set():
event = await events.get()
if event is None:
Expand Down
11 changes: 8 additions & 3 deletions aetcd/rtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,15 @@ def __init__(self, kind, kv, prev_kv=None):


class Watch:
"""Reperesents the result of a watch operation.
"""Represents the result of a watch operation.

To get emitted events use as an asynchronous iterator, emitted events are
instances of an :class:`~aetcd.rtypes.Event`.
Use as an asynchronous iterator to get emitted events.
Events are instances of an :class:`~aetcd.rtypes.Event`.
Such iterator is exhausted either when undelying :class:`~aetcd.client.Client`
is closed or cancel method is called on this instance.

Instance must not be used after underlying :class:`~aetcd.client.Client`
is closed, and doing so will raise RuntimeError.

Usage example:

Expand Down
43 changes: 43 additions & 0 deletions tests/integration/test_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,46 @@ async def test_watch_key_ignores_global_timeout(client, etcdctl_put):
break

await w.cancel()


@pytest.mark.asyncio
async def test_watch_event_before_iterator(etcd, etcdctl_put):
w = await etcd.watch(b'key')

etcdctl_put('key', '1')
await asyncio.sleep(1)
w_iter = aiter(w)

event = await anext(w_iter)
assert event.kv.value == b'1'

await w.cancel()

with pytest.raises(StopAsyncIteration):
await anext(w_iter)


@pytest.mark.asyncio
async def test_watch_for_closed_client(etcd, etcdctl_put):
w = await etcd.watch(b'key')

etcdctl_put('key', '1')
w_iter = aiter(w)
event = await anext(w_iter)
assert event.kv.value == b'1'

etcdctl_put('key', '2')
await etcd.close()
# key=2 event will be lost on close(), if not consumed already

etcdctl_put('key', '3')
async for kv in w_iter:
raise AssertionError('non-empty watcher iterator with closed client')

etcdctl_put('key', '4')
with pytest.raises(RuntimeError):
async for kv in w:
raise AssertionError('non-empty watcher iterator with closed client')

with pytest.raises(RuntimeError):
await w.cancel()