Add bg-service ex. to readme (#8)
janbjorge authored Feb 22, 2024
1 parent 0175610 commit bcd9d48
Showing 3 changed files with 121 additions and 15 deletions.
131 changes: 118 additions & 13 deletions README.md
@@ -25,8 +25,93 @@ Initialize PostgreSQL triggers to emit NOTIFY events on data changes. PGCacheWatch
pgcachewatch install <tables-to-cache>
```

### FastAPI Example
Example showing how to use PGCacheWatch for cache invalidation in a FastAPI app
## Automating User Data Enrichment with PGCacheWatch and Asyncio

Applications often need to enrich user profiles with information fetched from external sources, especially when new users register. Automating this step ensures that every new user's data is enriched without manual intervention.

The following Python example uses `PGCacheWatch` together with `asyncio` and `asyncpg` to automate the enrichment of new user data in a PostgreSQL database. The application listens for new user events, fetches additional information asynchronously from simulated external REST APIs, and updates the user's record. Because it reacts to events, it does not need to poll the database constantly.

### What This Example Covers

- **Listening for New User Registrations**: Utilizing `PGCacheWatch` to listen for new user events in a PostgreSQL database, triggering data enrichment processes.
- **Fetching Additional Information**: Simulating asynchronous calls to external REST APIs to fetch additional information for newly registered users.
- **Updating User Profiles**: Demonstrating how to update user records in the database with the fetched information, completing the data enrichment cycle.

This guide is intended for developers seeking to automate data enrichment processes in their applications, particularly those using PostgreSQL for data management. The example provides a practical approach to integrating real-time event handling with asynchronous programming for efficient data updates.

```python
import asyncio
import asyncpg
from pgcachewatch import listeners, models

async def fetch_users_without_additional_user_info() -> list:
    """
    Fetches a list of users who do not yet have additional user information associated.
    """
    ...

async def update_users_without_additional_user_info(
    user_id: int,
    additional_user_info: dict,
) -> None:
    """
    Updates users with the additional information fetched from an external source.
    """
    ...

async def fetch_additional_user_info(user_id: int) -> dict:
    """
    Simulates fetching additional user information via REST APIs.
    Note: This is a mock function. In a real application, this would make an asynchronous
    API call to fetch information from an external service.
    """
    await asyncio.sleep(1)  # Simulate API call delay
    return {"info": "Additional info for user"}  # Example return value

async def process_new_user_event() -> None:
    """
    Processes new user events by fetching additional information for new users
    and updating their records.
    """
    new_users = await fetch_users_without_additional_user_info()
    for user_id in new_users:
        user_info = await fetch_additional_user_info(user_id)
        await update_users_without_additional_user_info(user_id, user_info)

async def listen_for_new_users() -> None:
    """
    Listens for new user events and processes each event as it arrives.
    This function establishes a connection to the database and listens on a specified
    channel for new user events. When a new user is added (detected via an "insert" operation),
    it triggers the processing of new user events to fetch and update additional information.
    """
    conn = await asyncpg.connect()  # Connect to your PostgreSQL database
    listener = listeners.PGEventQueue()
    await listener.connect(conn)

    try:
        print("Listening for new user events...")
        async for event in listener.aiter():
            if event.operation == "insert":
                await process_new_user_event()
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(listen_for_new_users())
```
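
As a rough sketch of how the stubbed helpers above might fit together, the following replaces the database with an in-memory dictionary and runs the enrichment calls concurrently with `asyncio.gather` instead of one at a time. `USERS` and `enrich_user` are hypothetical names introduced for illustration; they are not part of PGCacheWatch, and a real application would query PostgreSQL instead.

```python
import asyncio
from typing import Optional

# Hypothetical in-memory stand-in for a users table:
# user_id -> additional info, where None means "not enriched yet".
USERS: dict[int, Optional[dict]] = {1: None, 2: {"info": "already enriched"}, 3: None}


async def fetch_users_without_additional_user_info() -> list[int]:
    """Return the ids of users that have no additional info yet."""
    return [user_id for user_id, info in USERS.items() if info is None]


async def update_users_without_additional_user_info(
    user_id: int,
    additional_user_info: dict,
) -> None:
    """Store the fetched info on the user's record."""
    USERS[user_id] = additional_user_info


async def fetch_additional_user_info(user_id: int) -> dict:
    """Mock external API call with a short artificial delay."""
    await asyncio.sleep(0.01)
    return {"info": f"Additional info for user {user_id}"}


async def enrich_user(user_id: int) -> None:
    """Fetch and store additional info for a single user (assumed helper)."""
    info = await fetch_additional_user_info(user_id)
    await update_users_without_additional_user_info(user_id, info)


async def process_new_user_event() -> None:
    """Enrich all pending users concurrently rather than sequentially."""
    pending = await fetch_users_without_additional_user_info()
    await asyncio.gather(*(enrich_user(user_id) for user_id in pending))
```

Calling `asyncio.run(process_new_user_event())` enriches every pending user in one pass. Running the calls concurrently keeps the total wait close to the slowest single API call, at the cost of sending more simultaneous requests to the external service.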

## Integrating PGCacheWatch with FastAPI for Dynamic Cache Invalidation
Caching is a common way to improve the performance of web applications, but it adds complexity: cached entries must be invalidated when the underlying data changes. `PGCacheWatch` addresses this by using PostgreSQL's NOTIFY/LISTEN mechanism to invalidate cache entries when the database reports a change, so cached data stays consistent with the database.

This example shows how to integrate `PGCacheWatch` with FastAPI, a popular asynchronous web framework. The cached function is refreshed automatically after relevant changes in the database, so the application does not serve stale data.

### What You'll Learn

- **Setting Up `PGCacheWatch` with FastAPI**: How to configure `PGCacheWatch` to work within the FastAPI application lifecycle, including database connection setup and teardown.
- **Implementing Cache Invalidation Strategies**: Utilizing `PGCacheWatch`'s decorators and strategies to invalidate cached data based on database events, specifically focusing on updates.
- **Creating Responsive Endpoints**: Building FastAPI routes that serve dynamically updated data, ensuring that the information presented to the user is always up-to-date.

```python
import contextlib
@@ -36,33 +121,53 @@ import asyncpg
from fastapi import FastAPI
from pgcachewatch import decorators, listeners, models, strategies

# Initialize a PGEventQueue listener to listen for database events.
listener = listeners.PGEventQueue()


@contextlib.asynccontextmanager
async def app_setup_teardown(_: FastAPI) -> typing.AsyncGenerator[None, None]:
    conn = await asyncpg.connect()
    await listener.connect(conn, models.PGChannel("ch_pgcachewatch_table_change"))
    yield
    await conn.close()
    """
    Asynchronous context manager for FastAPI app setup and teardown.
    This context manager is used to establish and close the database connection
    at the start and end of the FastAPI application lifecycle, respectively.
    """
    # Establish a database connection using asyncpg.
    conn = await asyncpg.connect()
    # Connect the listener to the database using the specified channel.
    await listener.connect(conn)
    yield  # Yield control back to the event loop.
    await conn.close()  # Ensure the database connection is closed on app teardown.

# Create an instance of FastAPI, specifying the app setup and teardown actions.
APP = FastAPI(lifespan=app_setup_teardown)


# Only allow for cache refresh after an update
# Decorate the cached_query function with cache invalidation logic.
@decorators.cache(
    strategy=strategies.Gready(
    strategy=strategies.Greedy(  # Note: Assuming 'Gready' is a typo, corrected to 'Greedy'.
        listener=listener,
        # Invalidate the cache only for 'update' operations on the database.
        predicate=lambda x: x.operation == "update",
    )
)
async def cached_query() -> dict[str, str]:
    # Simulate a database query
    """
    Simulates a database query that benefits from cache invalidation.
    This function is decorated to use PGCacheWatch's cache invalidation, ensuring
    that the data returned is up-to-date following any relevant 'update' operations
    on the database.
    """
    # Return a mock data response.
    return {"data": "query result"}


# Define a FastAPI route to fetch data, utilizing the cached_query function.
@APP.get("/data")
async def get_data() -> dict:
    """
    This endpoint uses the cached_query function to return data, demonstrating
    how cache invalidation can be integrated into a web application route.
    """
    # Fetch and return the data using the cached query function.
    return await cached_query()
```
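
To illustrate the idea behind predicate-based invalidation, here is a minimal, self-contained sketch that uses neither PGCacheWatch nor PostgreSQL. `Event`, `EventQueue`, and this `cache` decorator are hypothetical stand-ins written for this example; the library's actual internals may differ.

```python
import asyncio
import dataclasses
import functools
import typing


@dataclasses.dataclass
class Event:
    """Hypothetical stand-in for a database change event."""

    operation: str  # "insert", "update" or "delete"


class EventQueue:
    """Hypothetical in-memory queue standing in for a NOTIFY listener."""

    def __init__(self) -> None:
        self._events: list[Event] = []

    def put(self, event: Event) -> None:
        self._events.append(event)

    def drain(self) -> list[Event]:
        """Return all pending events and clear the queue."""
        events, self._events = self._events, []
        return events


def cache(queue: EventQueue, predicate: typing.Callable[[Event], bool]):
    """Cache an async function's result until a matching event arrives."""

    def decorator(func):
        sentinel = object()
        cached = sentinel

        @functools.wraps(func)
        async def wrapper():
            nonlocal cached
            # Drop the cached value if any pending event satisfies the predicate.
            if any(predicate(event) for event in queue.drain()):
                cached = sentinel
            if cached is sentinel:
                cached = await func()
            return cached

        return wrapper

    return decorator


queue = EventQueue()
calls = 0


@cache(queue, predicate=lambda event: event.operation == "update")
async def cached_query() -> dict[str, int]:
    """Pretend query that records how many times it actually ran."""
    global calls
    calls += 1
    return {"calls": calls}


async def demo() -> list[dict[str, int]]:
    results = [await cached_query()]  # miss: runs the query
    results.append(await cached_query())  # hit: served from cache
    queue.put(Event("insert"))
    results.append(await cached_query())  # hit: inserts do not match the predicate
    queue.put(Event("update"))
    results.append(await cached_query())  # miss: the update invalidated the cache
    return results
```

Running `asyncio.run(demo())` returns four results in which only the last differs from the first, because only the `update` event satisfies the predicate. Checking for events lazily on each call keeps the sketch simple; a production implementation would more likely react to events as they arrive.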
4 changes: 2 additions & 2 deletions src/pgcachewatch/listeners.py
@@ -93,11 +93,11 @@ def parse_and_put(
            parsed = models.Event.model_validate(
                json.loads(payload) | {"channel": channel}
            )
            if parsed.latency > self._max_latency:
                logging.warning("Latency for %s above %s.", parsed, self._max_latency)
        except Exception:
            logging.exception("Unable to parse `%s`.", payload)
        else:
            if parsed.latency > self._max_latency:
                logging.warning("Latency for %s above %s.", parsed, self._max_latency)
            logging.info("Received event: %s on %s", parsed, channel)
            try:
                self.put_nowait(parsed)
1 change: 1 addition & 0 deletions src/pgcachewatch/models.py
@@ -8,6 +8,7 @@
    "update",
    "delete",
]

PGChannel = typing.NewType(
    "PGChannel",
    str,