Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: only send one digest email per org with all team info #27851

Merged
merged 3 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 118 additions & 96 deletions posthog/tasks/periodic_digest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
from posthog.session_recordings.models.session_recording_playlist import (
SessionRecordingPlaylist,
)
from posthog.tasks.email import (
NotificationSetting,
NotificationSettingType,
should_send_notification,
from posthog.tasks.email import NotificationSetting, NotificationSettingType
from posthog.tasks.report_utils import (
OrgDigestReport,
TeamDigestReport,
capture_event,
get_user_team_lookup,
)
from posthog.tasks.report_utils import capture_event
from posthog.tasks.usage_report import USAGE_REPORT_TASK_KWARGS, get_instance_metadata
from posthog.tasks.utils import CeleryQueue
from posthog.warehouse.models.external_data_source import ExternalDataSource

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -212,51 +212,48 @@
)


@shared_task(queue=CeleryQueue.USAGE_REPORTS.value, ignore_result=True, max_retries=3)
def send_periodic_digest_report(
*,
team_id: int,
team_name: str,
periodic_digest_report: dict[str, Any],
instance_metadata: dict[str, Any],
period_end: datetime,
period_start: datetime,
digest_items_with_data: int,
) -> None:
period_str = period_end.strftime("%Y-%m-%d")
days = (period_end - period_start).days
campaign_key = f"periodic_digest_{period_str}_{days}d"

# Use a consistent identifier for the team
team_identifier = f"team_{team_id}"

# Check if we've already sent this digest using get_or_create
record, created = MessagingRecord.objects.get_or_create(raw_email=team_identifier, campaign_key=campaign_key)

if not created and record.sent_at:
logger.info(f"Skipping duplicate periodic digest for team {team_id} for period ending {period_str}")
return

full_report_dict = {
"team_id": team_id,
"team_name": team_name,
"template_name": "periodic_digest_report",
"digest_items_with_data": digest_items_with_data,
**periodic_digest_report,
**instance_metadata,
}
def _get_all_org_digest_reports(period_start: datetime, period_end: datetime) -> dict[str, OrgDigestReport]:
"""
Gets all digest data and organizes it by organization
"""
logger.info("Getting all digest data...")
time_now = datetime.now()
all_digest_data = _get_all_digest_data_as_team_rows(period_start, period_end)
logger.debug(f"Getting all digest data took {(datetime.now() - time_now).total_seconds()} seconds.")

logger.info("Getting teams for digest reports...")
time_now = datetime.now()
teams = get_teams_for_digest()
logger.debug(f"Getting teams for digest reports took {(datetime.now() - time_now).total_seconds()} seconds.")

org_reports: dict[str, OrgDigestReport] = {}

logger.info("Generating reports for organizations...")
time_now = datetime.now()

for team in teams:
org_id = str(team.organization_id)
if org_id not in org_reports:
org_reports[org_id] = OrgDigestReport(
organization_id=org_id,
organization_name=team.organization.name,
organization_created_at=team.organization.created_at.isoformat(),
teams={},
total_digest_items_with_data=0,
)

send_digest_notifications(
team_id=team_id,
organization_id=None, # Will be derived from team
event_name="transactional email",
properties=full_report_dict,
notification_type=NotificationSetting.WEEKLY_PROJECT_DIGEST.value,
)
team_report = get_periodic_digest_report(all_digest_data, team)
if count_non_zero_digest_items(team_report) > 0: # Only include teams with data
org_reports[org_id].teams[str(team.id)] = TeamDigestReport(
team_id=team.id,
team_name=team.name,
report=dataclasses.asdict(team_report),
digest_items_with_data=count_non_zero_digest_items(team_report),
)

# Mark as sent
record.sent_at = timezone.now()
record.save()
time_since = datetime.now() - time_now
logger.debug(f"Generating reports for organizations took {time_since.total_seconds()} seconds.")
return org_reports


@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0)
Expand All @@ -273,28 +270,70 @@
period_start = parser.parse(begin_date) if begin_date else period_end - timedelta(days=7)

try:
all_digest_data = _get_all_digest_data_as_team_rows(period_start, period_end)
teams = get_teams_for_digest()
org_reports = _get_all_org_digest_reports(period_start, period_end)
instance_metadata = dataclasses.asdict(get_instance_metadata((period_start, period_end)))

logger.info("Sending digest reports...")
time_now = datetime.now()
for team in teams:
report = get_periodic_digest_report(all_digest_data, team)
full_report_dict = dataclasses.asdict(report)
instance_metadata = dataclasses.asdict(get_instance_metadata((period_start, period_end)))
digest_items_with_data = count_non_zero_digest_items(report)

# Then capture as events to PostHog, so they can be sent via email
if digest_items_with_data > 0 and not dry_run:
send_periodic_digest_report.delay(
team_id=team.id,
team_name=team.name,
periodic_digest_report=full_report_dict,
instance_metadata=instance_metadata,
period_end=period_end,
period_start=period_start,
digest_items_with_data=digest_items_with_data,

for org_id, org_report in org_reports.items():
if not org_report.teams: # Skip if no teams have data
continue

if dry_run:
continue

# Get user access and notification preferences
user_teams, user_notifications = get_user_team_lookup(org_id)

# Check if we've already sent this digest
period_str = period_end.strftime("%Y-%m-%d")
days = (period_end - period_start).days
campaign_key = f"periodic_digest_{period_str}_{days}d"
raquelmsmith marked this conversation as resolved.
Show resolved Hide resolved

record, created = MessagingRecord.objects.get_or_create(
raw_email=f"org_{org_id}", campaign_key=campaign_key
)

if not created and record.sent_at:
logger.info(f"Skipping duplicate periodic digest for org {org_id}")
continue

# Get all org members
org_members = OrganizationMembership.objects.filter(organization_id=org_id).select_related("user")
# Send customized report to each user
for membership in org_members:
user = membership.user
user_team_ids = user_teams.get(user.id, set())
user_notif_team_ids = user_notifications.get(user.id, set())

# Filter report to only include teams the user has access to
user_report = org_report.filter_for_user(user_team_ids, user_notif_team_ids)

if not user_report.teams: # Skip if user has no accessible teams with data
continue

report_dict = dataclasses.asdict(user_report)
send_digest_notifications(
team_id=int(next(iter(user_report.teams.keys()))), # Use first team for compatibility
organization_id=org_id,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: using first team arbitrarily could cause issues if that team has different settings or configuration than other teams in the org - consider using a more deterministic selection method

event_name="transactional email",
properties={
**report_dict,
**instance_metadata,
"template_name": "periodic_digest_report",
},
notification_type=NotificationSetting.WEEKLY_PROJECT_DIGEST.value,
distinct_id=user.distinct_id,

Check failure on line 327 in posthog/tasks/periodic_digest.py

View workflow job for this annotation

GitHub Actions / Python code quality checks

Argument "distinct_id" to "send_digest_notifications" has incompatible type "str | None"; expected "str"
)

# Mark as sent
record.sent_at = timezone.now()
record.save()

time_since = datetime.now() - time_now
logger.debug(f"Sending usage reports to PostHog and Billing took {time_since.total_seconds()} seconds.") # noqa T201
logger.debug(f"Sending digest reports took {time_since.total_seconds()} seconds.")

except Exception as err:
capture_exception(err)
raise
Expand All @@ -303,42 +342,25 @@
def send_digest_notifications(
*,
team_id: int,
organization_id: Optional[str],
organization_id: str,
event_name: str,
properties: dict[str, Any],
notification_type: NotificationSettingType,
timestamp: Optional[datetime] = None,
distinct_id: str,
) -> None:
"""
Determines eligible recipients and sends individual notifications for digest reports.
Sends a single notification for digest reports.
"""
pha_client = Client("sTMFPsFhdP1Ssg")

team = Team.objects.get(id=team_id) if not organization_id else None
organization_id = organization_id or str(team.organization_id)

users = (
[
membership.user
for membership in OrganizationMembership.objects.filter(organization_id=organization_id).select_related(
"user"
)
]
if organization_id
else team.all_users_with_access()
capture_event(
pha_client=pha_client,
name=event_name,
organization_id=organization_id,
team_id=team_id,
properties=properties,
timestamp=timestamp,
distinct_id=distinct_id,
)

eligible_users = [user for user in users if should_send_notification(user, notification_type, team_id)]
# Send individual events for each eligible user
for user in eligible_users:
capture_event(
pha_client=pha_client,
name=event_name,
organization_id=organization_id,
team_id=team_id,
properties=properties,
timestamp=timestamp,
distinct_id=user.distinct_id,
)

pha_client.group_identify("organization", organization_id, properties)
65 changes: 65 additions & 0 deletions posthog/tasks/report_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
from datetime import datetime
from typing import Any, Optional, Union, cast

Expand Down Expand Up @@ -81,3 +82,67 @@ def capture_event(
groups={"instance": settings.SITE_URL},
timestamp=timestamp,
)


@dataclasses.dataclass
class TeamDigestReport:
team_id: int
team_name: str
report: dict[str, Any]
digest_items_with_data: int


@dataclasses.dataclass
class OrgDigestReport:
organization_id: str
organization_name: str
organization_created_at: str
teams: dict[str, TeamDigestReport] # team_id -> TeamDigestReport
total_digest_items_with_data: int
raquelmsmith marked this conversation as resolved.
Show resolved Hide resolved

def filter_for_user(self, user_teams: set[int], user_notification_teams: set[int]) -> "OrgDigestReport":
"""Returns a new OrgDigestReport with only the teams the user has access to and notifications enabled for"""
filtered_teams = {
team_id: team_report
for team_id, team_report in self.teams.items()
if int(team_id) in user_teams and int(team_id) in user_notification_teams
}
return OrgDigestReport(
organization_id=self.organization_id,
organization_name=self.organization_name,
organization_created_at=self.organization_created_at,
teams=filtered_teams,
total_digest_items_with_data=sum(
team_report.digest_items_with_data for team_report in filtered_teams.values()
),
)


def get_user_team_lookup(organization_id: str) -> tuple[dict[int, set[int]], dict[int, set[int]]]:
"""
Returns (user_team_access, user_notification_prefs) where:
- user_team_access maps user_id -> set of team_ids they have access to
- user_notification_prefs maps user_id -> set of team_ids where notifications are enabled
"""
from posthog.models.organization import Organization
from posthog.tasks.email import NotificationSetting, should_send_notification

org = Organization.objects.prefetch_related(
"teams", "teams__explicit_memberships__parent_membership__user", "memberships__user"
).get(id=organization_id)

user_teams: dict[int, set[int]] = {}
user_notifications: dict[int, set[int]] = {}

# Build lookup of team access
for team in org.teams.all():
for user in team.all_users_with_access():
if user.id not in user_teams:
user_teams[user.id] = set()
user_notifications[user.id] = set()
raquelmsmith marked this conversation as resolved.
Show resolved Hide resolved
user_teams[user.id].add(team.id)
# Check notification preferences
if should_send_notification(user, NotificationSetting.WEEKLY_PROJECT_DIGEST.value, team.id):
user_notifications[user.id].add(team.id)

return user_teams, user_notifications
Loading
Loading