Skip to content

Commit

Permalink
fix: notify users status change in bg
Browse files Browse the repository at this point in the history
  • Loading branch information
SaintShit committed Dec 30, 2023
1 parent bf58b1c commit cd36ace
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 21 deletions.
31 changes: 10 additions & 21 deletions app/jobs/review_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from typing import TYPE_CHECKING

from sqlalchemy.orm import Session
from fastapi import BackgroundTasks

from app import logger, scheduler, xray
from app.db import (GetDB, get_notification_reminder, get_users,
update_user_status, start_user_expire)
start_user_expire, update_user_status)
from app.models.user import ReminderType, UserResponse, UserStatus
from app.utils import report
from app.utils.concurrency import GetBG
from app.utils.helpers import (calculate_expiration_days,
calculate_usage_percent)
from config import (NOTIFY_DAYS_LEFT, NOTIFY_REACHED_USAGE_PERCENT,
Expand All @@ -35,9 +35,9 @@ def add_notification_reminders(db: Session, user: "User", now: datetime = dateti
user.id, user.expire)


def review(bg: BackgroundTasks):
def review():
now = datetime.utcnow()
with GetDB() as db:
with GetDB() as db, GetBG() as bg:
for user in get_users(db, status=UserStatus.active):

limited = user.data_limit and user.used_traffic >= user.data_limit
Expand All @@ -53,8 +53,10 @@ def review(bg: BackgroundTasks):

xray.operations.remove_user(user)
update_user_status(db, user, status)
bg.add_task(report.status_change, user.username, status,
UserResponse.from_orm(user))

bg.add_task(
report.status_change, user.username, status, UserResponse.from_orm(user)
)

logger.info(f"User \"{user.username}\" status changed to {status}")

Expand All @@ -79,22 +81,9 @@ def review(bg: BackgroundTasks):
update_user_status(db, user, status)
start_user_expire(db, user)
bg.add_task(report.status_change, user.username, status,
UserResponse.from_orm(user))
UserResponse.from_orm(user))

logger.info(f"User \"{user.username}\" status changed to {status}")

for user in get_users(db, status=UserStatus.expired):

active = user.expire and datetime.timestamp(user.expire) >= now.timestamp()
if active:
status = UserStatus.active
else:
continue

update_user_status(db, user, status)
xray.operations.add_user(user)

logger.info(f"User \"{user.username}\" status fixed.")


scheduler.add_job(review, 'interval', seconds=5, coalesce=True, max_instances=1, args=[BackgroundTasks()])
scheduler.add_job(review, 'interval', seconds=5, coalesce=True, max_instances=1)
24 changes: 24 additions & 0 deletions app/utils/concurrency.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
from threading import Thread

import anyio
from fastapi import BackgroundTasks


def threaded_function(func):
def wrapper(*args, **kwargs):
thread = Thread(target=func, args=args, daemon=True, kwargs=kwargs)
thread.start()
return wrapper


class GetBG:
"""
context manager for fastapi.BackgroundTasks
"""

def __init__(self):
self.bg = BackgroundTasks()

def __enter__(self):
return self.bg

def __exit__(self, exc_type, exc_value, traceback):
anyio.run(self.bg)

async def __aenter__(self):
return self.bg

async def __aexit__(self, exc_type, exc_value, traceback):
await self.bg()

0 comments on commit cd36ace

Please sign in to comment.