Skip to content

Commit

Permalink
USE_ASYNC now observes all indices
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Aug 12, 2024
1 parent bb64021 commit d8855a5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
12 changes: 9 additions & 3 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(
self._plugins: Plugins = Plugins("plugins", self.plugins)
self.query_builder: QueryBuilder = QueryBuilder(verbose=verbose)
self.count: dict = dict(xlog=0, db=0, redis=0)
self.tasks: t.List[asyncio.Task] = []

def validate(self, repl_slots: bool = True) -> None:
"""Perform all validation right away."""
Expand Down Expand Up @@ -1295,13 +1296,11 @@ def receive(self) -> None:
cursor.execute(f'LISTEN "{self.database}"')
event_loop = asyncio.get_event_loop()
event_loop.add_reader(self.conn, self.async_poll_db)
tasks: list = [
self.tasks: t.List[asyncio.Task] = [
event_loop.create_task(self.async_poll_redis()),
event_loop.create_task(self.async_truncate_slots()),
event_loop.create_task(self.async_status()),
]
event_loop.run_until_complete(asyncio.wait(tasks))
event_loop.close()

else:
# sync up to and produce items in the Redis cache
Expand Down Expand Up @@ -1477,6 +1476,7 @@ def main(
time.sleep(settings.POLL_INTERVAL)

else:
tasks: t.List[asyncio.Task] = []
for doc in config_loader(config):
sync: Sync = Sync(
doc,
Expand All @@ -1489,6 +1489,12 @@ def main(
sync.pull()
if daemon:
sync.receive()
tasks.extend(sync.tasks)

if settings.USE_ASYNC:
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ charset-normalizer==3.3.2
# via requests
click==8.1.7
# via -r requirements/base.in
elastic-transport==8.13.1
elastic-transport==8.15.0
# via elasticsearch
elasticsearch==8.14.0
# via
# -r requirements/base.in
# elasticsearch-dsl
elasticsearch-dsl==8.14.0
elasticsearch-dsl==8.15.0
# via -r requirements/base.in
environs==11.0.0
# via -r requirements/base.in
Expand Down
4 changes: 2 additions & 2 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ coverage[toml]==7.6.1
# pytest-cov
distlib==0.3.8
# via virtualenv
elastic-transport==8.13.1
elastic-transport==8.15.0
# via elasticsearch
elasticsearch==8.14.0
# via
# -r requirements/base.in
# elasticsearch-dsl
elasticsearch-dsl==8.14.0
elasticsearch-dsl==8.15.0
# via -r requirements/base.in
environs==11.0.0
# via -r requirements/base.in
Expand Down

0 comments on commit d8855a5

Please sign in to comment.