diff --git a/posthog/tasks/periodic_digest.py b/posthog/tasks/periodic_digest.py index 5825260283423..d25054e9fabff 100644 --- a/posthog/tasks/periodic_digest.py +++ b/posthog/tasks/periodic_digest.py @@ -22,14 +22,14 @@ from posthog.session_recordings.models.session_recording_playlist import ( SessionRecordingPlaylist, ) -from posthog.tasks.email import ( - NotificationSetting, - NotificationSettingType, - should_send_notification, +from posthog.tasks.email import NotificationSetting, NotificationSettingType +from posthog.tasks.report_utils import ( + OrgDigestReport, + TeamDigestReport, + capture_event, + get_user_team_lookup, ) -from posthog.tasks.report_utils import capture_event from posthog.tasks.usage_report import USAGE_REPORT_TASK_KWARGS, get_instance_metadata -from posthog.tasks.utils import CeleryQueue from posthog.warehouse.models.external_data_source import ExternalDataSource logger = structlog.get_logger(__name__) @@ -212,51 +212,50 @@ def get_periodic_digest_report(all_digest_data: dict[str, Any], team: Team) -> p ) -@shared_task(queue=CeleryQueue.USAGE_REPORTS.value, ignore_result=True, max_retries=3) -def send_periodic_digest_report( - *, - team_id: int, - team_name: str, - periodic_digest_report: dict[str, Any], - instance_metadata: dict[str, Any], - period_end: datetime, - period_start: datetime, - digest_items_with_data: int, -) -> None: - period_str = period_end.strftime("%Y-%m-%d") - days = (period_end - period_start).days - campaign_key = f"periodic_digest_{period_str}_{days}d" - - # Use a consistent identifier for the team - team_identifier = f"team_{team_id}" - - # Check if we've already sent this digest using get_or_create - record, created = MessagingRecord.objects.get_or_create(raw_email=team_identifier, campaign_key=campaign_key) - - if not created and record.sent_at: - logger.info(f"Skipping duplicate periodic digest for team {team_id} for period ending {period_str}") - return - - full_report_dict = { - "team_id": team_id, - "team_name": team_name, - "template_name": "periodic_digest_report", - "digest_items_with_data": digest_items_with_data, - **periodic_digest_report, - **instance_metadata, - } +def _get_all_org_digest_reports(period_start: datetime, period_end: datetime) -> dict[str, OrgDigestReport]: + """ + Gets all digest data and organizes it by organization + """ + logger.info("Getting all digest data...") + time_now = datetime.now() + all_digest_data = _get_all_digest_data_as_team_rows(period_start, period_end) + logger.debug(f"Getting all digest data took {(datetime.now() - time_now).total_seconds()} seconds.") + + logger.info("Getting teams for digest reports...") + time_now = datetime.now() + teams = get_teams_for_digest() + logger.debug(f"Getting teams for digest reports took {(datetime.now() - time_now).total_seconds()} seconds.") + + org_reports: dict[str, OrgDigestReport] = {} + + logger.info("Generating reports for organizations...") + time_now = datetime.now() + + for team in teams: + org_id = str(team.organization_id) + if org_id not in org_reports: + org_reports[org_id] = OrgDigestReport( + organization_id=org_id, + organization_name=team.organization.name, + organization_created_at=team.organization.created_at.isoformat(), + teams=[], + total_digest_items_with_data=0, + ) - send_digest_notifications( - team_id=team_id, - organization_id=None, # Will be derived from team - event_name="transactional email", - properties=full_report_dict, - notification_type=NotificationSetting.WEEKLY_PROJECT_DIGEST.value, - ) + team_report = get_periodic_digest_report(all_digest_data, team) + if count_non_zero_digest_items(team_report) > 0: # Only include teams with data + org_reports[org_id].teams.append( + TeamDigestReport( + team_id=team.id, + team_name=team.name, + report=dataclasses.asdict(team_report), + digest_items_with_data=count_non_zero_digest_items(team_report), + ) + ) - # Mark as sent - record.sent_at = timezone.now() - record.save() + time_since = datetime.now() - time_now + logger.debug(f"Generating reports for organizations took {time_since.total_seconds()} seconds.") + return org_reports @shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) @@ -273,28 +272,69 @@ def send_all_periodic_digest_reports( period_start = parser.parse(begin_date) if begin_date else period_end - timedelta(days=7) try: - all_digest_data = _get_all_digest_data_as_team_rows(period_start, period_end) - teams = get_teams_for_digest() + org_reports = _get_all_org_digest_reports(period_start, period_end) + instance_metadata = dataclasses.asdict(get_instance_metadata((period_start, period_end))) + + logger.info("Sending digest reports...") time_now = datetime.now() - for team in teams: - report = get_periodic_digest_report(all_digest_data, team) - full_report_dict = dataclasses.asdict(report) - instance_metadata = dataclasses.asdict(get_instance_metadata((period_start, period_end))) - digest_items_with_data = count_non_zero_digest_items(report) - - # Then capture as events to PostHog, so they can be sent via email - if digest_items_with_data > 0 and not dry_run: - send_periodic_digest_report.delay( - team_id=team.id, - team_name=team.name, - periodic_digest_report=full_report_dict, - instance_metadata=instance_metadata, - period_end=period_end, - period_start=period_start, - digest_items_with_data=digest_items_with_data, + + for org_id, org_report in org_reports.items(): + if not org_report.teams: # Skip if no teams have data + continue + + if dry_run: + continue + + # Get user access and notification preferences + user_teams, user_notifications = get_user_team_lookup(org_id) + + # Check if we've already sent this digest + period_str = period_end.strftime("%Y-%m-%d") + days = (period_end - period_start).days + campaign_key = f"periodic_digest_{period_str}_{days}d" + + record, created = MessagingRecord.objects.get_or_create( + raw_email=f"org_{org_id}", campaign_key=campaign_key + ) + + if not created and record.sent_at: + logger.info(f"Skipping duplicate periodic digest for org {org_id}") + continue + + # Get all org members + org_members = OrganizationMembership.objects.filter(organization_id=org_id).select_related("user") + # Send customized report to each user + for membership in org_members: + user = membership.user + user_team_ids = user_teams.get(user.id, set()) + user_notif_team_ids = user_notifications.get(user.id, set()) + + # Filter report to only include teams the user has access to + user_report = org_report.filter_for_user(user_team_ids, user_notif_team_ids) + + if not user_report.teams or not user.distinct_id: + continue + + report_dict = dataclasses.asdict(user_report) + send_digest_notifications( + organization_id=org_id, + event_name="transactional email", + properties={ + **report_dict, + **instance_metadata, + "template_name": "periodic_digest_report", + }, + notification_type=NotificationSetting.WEEKLY_PROJECT_DIGEST.value, + distinct_id=user.distinct_id, ) + + # Mark as sent + record.sent_at = timezone.now() + record.save() + time_since = datetime.now() - time_now - logger.debug(f"Sending usage reports to PostHog and Billing took {time_since.total_seconds()} seconds.") # noqa T201 + logger.debug(f"Sending digest reports took {time_since.total_seconds()} seconds.") + except Exception as err: capture_exception(err) raise @@ -302,43 +342,25 @@ def send_all_periodic_digest_reports( def send_digest_notifications( *, - team_id: int, - organization_id: Optional[str], + organization_id: str, event_name: str, properties: dict[str, Any], notification_type: NotificationSettingType, timestamp: Optional[datetime] = None, + distinct_id: str, ) -> None: """ - Determines eligible recipients and sends individual notifications for digest reports. + Sends a single notification for digest reports. """ pha_client = Client("sTMFPsFhdP1Ssg") - team = Team.objects.get(id=team_id) if not organization_id else None - organization_id = organization_id or str(team.organization_id) - - users = ( - [ - membership.user - for membership in OrganizationMembership.objects.filter(organization_id=organization_id).select_related( - "user" - ) - ] - if organization_id - else team.all_users_with_access() + capture_event( + pha_client=pha_client, + name=event_name, + organization_id=organization_id, + team_id=None, + properties=properties, + timestamp=timestamp, + distinct_id=distinct_id, ) - - eligible_users = [user for user in users if should_send_notification(user, notification_type, team_id)] - # Send individual events for each eligible user - for user in eligible_users: - capture_event( - pha_client=pha_client, - name=event_name, - organization_id=organization_id, - team_id=team_id, - properties=properties, - timestamp=timestamp, - distinct_id=user.distinct_id, - ) - pha_client.group_identify("organization", organization_id, properties) diff --git a/posthog/tasks/report_utils.py b/posthog/tasks/report_utils.py index 75cd8a3fdc99b..4957104b370b7 100644 --- a/posthog/tasks/report_utils.py +++ b/posthog/tasks/report_utils.py @@ -1,3 +1,4 @@ +import dataclasses from datetime import datetime from typing import Any, Optional, Union, cast @@ -81,3 +82,65 @@ def capture_event( groups={"instance": settings.SITE_URL}, timestamp=timestamp, ) + + +@dataclasses.dataclass +class TeamDigestReport: + team_id: int + team_name: str + report: dict[str, Any] + digest_items_with_data: int + + +@dataclasses.dataclass +class OrgDigestReport: + organization_id: str + organization_name: str + organization_created_at: str + teams: list[TeamDigestReport] + total_digest_items_with_data: int + + def filter_for_user(self, user_teams: set[int], user_notification_teams: set[int]) -> "OrgDigestReport": + """Returns a new OrgDigestReport with only the teams the user has access to and notifications enabled for""" + filtered_teams = [ + team_report + for team_report in self.teams + if team_report.team_id in user_teams and team_report.team_id in user_notification_teams + ] + return OrgDigestReport( + organization_id=self.organization_id, + organization_name=self.organization_name, + organization_created_at=self.organization_created_at, + teams=filtered_teams, + total_digest_items_with_data=sum(team_report.digest_items_with_data for team_report in filtered_teams), + ) + + +def get_user_team_lookup(organization_id: str) -> tuple[dict[int, set[int]], dict[int, set[int]]]: + """ + Returns (user_team_access, user_notification_prefs) where: + - user_team_access maps user_id -> set of team_ids they have access to + - user_notification_prefs maps user_id -> set of team_ids where notifications are enabled + """ + from posthog.models.organization import Organization + from posthog.tasks.email import NotificationSetting, should_send_notification + + org = Organization.objects.prefetch_related( + "teams", "teams__explicit_memberships__parent_membership__user", "memberships__user" + ).get(id=organization_id) + + user_teams: dict[int, set[int]] = {} + user_notifications: dict[int, set[int]] = {} + + # Build lookup of team access + for team in org.teams.all(): + for user in team.all_users_with_access(): + if user.id not in user_teams: + user_teams[user.id] = set() + user_notifications[user.id] = set() + user_teams[user.id].add(team.id) + # Check notification preferences + if should_send_notification(user, NotificationSetting.WEEKLY_PROJECT_DIGEST.value, team.id): + user_notifications[user.id].add(team.id) + + return user_teams, user_notifications diff --git a/posthog/tasks/test/test_periodic_digest.py b/posthog/tasks/test/test_periodic_digest.py index 40ed11f154423..1320f0afebf9b 100644 --- a/posthog/tasks/test/test_periodic_digest.py +++ b/posthog/tasks/test/test_periodic_digest.py @@ -5,8 +5,16 @@ from django.utils.timezone import now from freezegun import freeze_time -from posthog.models import Dashboard, EventDefinition, Experiment, FeatureFlag, Survey +from posthog.models import ( + Dashboard, + EventDefinition, + Experiment, + FeatureFlag, + Survey, + Team, +) from posthog.models.messaging import MessagingRecord +from posthog.models.organization import OrganizationMembership from posthog.session_recordings.models.session_recording_playlist import ( SessionRecordingPlaylist, ) @@ -129,8 +137,76 @@ def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: # Check that the capture event was called with the correct data expected_properties = { - "team_id": self.team.id, - "team_name": self.team.name, + "organization_id": str(self.team.organization_id), + "organization_name": self.organization.name, + "organization_created_at": self.organization.created_at.isoformat(), + "teams": [ + { + "team_id": self.team.id, + "team_name": self.team.name, + "report": { + "new_dashboards": [ + { + "name": "Test Dashboard", + "id": dashboard.id, + } + ], + "new_event_definitions": [ + { + "name": "Test Event", + "id": event_definition.id, + } + ], + "new_playlists": [ + { + "name": "Test Playlist", + "id": playlist.short_id, + }, + { + "name": "Derived Playlist", + "id": derived_playlist.short_id, + }, + ], + "new_experiments_launched": [ + { + "name": "Launched Experiment", + "id": launched_experiment.id, + "start_date": launched_experiment.start_date.isoformat(), # type: ignore + } + ], + "new_experiments_completed": [ + { + "name": "Completed Experiment", + "id": completed_experiment.id, + "start_date": completed_experiment.start_date.isoformat(), # type: ignore + "end_date": completed_experiment.end_date.isoformat(), # type: ignore + } + ], + "new_external_data_sources": [ + { + "source_type": "Stripe", + "id": external_data_source.id, + } + ], + "new_surveys_launched": [ + { + "name": "Test Survey", + "id": survey.id, + "start_date": survey.start_date.isoformat(), # type: ignore + "description": "Test Description", + } + ], + "new_feature_flags": [ + { + "name": "Test Flag", + "id": feature_flag.id, + "key": "test-flag", + } + ], + }, + "digest_items_with_data": 8, + } + ], "template_name": "periodic_digest_report", "users_who_logged_in": [], "users_who_logged_in_count": 0, @@ -150,65 +226,7 @@ def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: "deployment_infrastructure": "unknown", "helm": {}, "instance_tag": "none", - "new_dashboards": [ - { - "name": "Test Dashboard", - "id": dashboard.id, - } - ], - "new_event_definitions": [ - { - "name": "Test Event", - "id": event_definition.id, - } - ], - "new_playlists": [ - { - "name": "Test Playlist", - "id": playlist.short_id, - }, - { - "name": "Derived Playlist", - "id": derived_playlist.short_id, - }, - ], - "new_experiments_launched": [ - { - "name": "Launched Experiment", - "id": launched_experiment.id, - "start_date": launched_experiment.start_date.isoformat(), # type: ignore - } - ], - "new_experiments_completed": [ - { - "name": "Completed Experiment", - "id": completed_experiment.id, - "start_date": completed_experiment.start_date.isoformat(), # type: ignore - "end_date": completed_experiment.end_date.isoformat(), # type: ignore - } - ], - "new_external_data_sources": [ - { - "source_type": "Stripe", - "id": external_data_source.id, - } - ], - "new_surveys_launched": [ - { - "name": "Test Survey", - "id": survey.id, - "start_date": survey.start_date.isoformat(), # type: ignore - "description": "Test Description", - } - ], - "new_feature_flags": [ - { - "name": "Test Flag", - "id": feature_flag.id, - "key": "test-flag", - } - ], - "digest_items_with_data": 8, + "total_digest_items_with_data": 8, } mock_capture.assert_called_once_with( @@ -216,7 +234,7 @@ def test_periodic_digest_report(self, mock_capture: MagicMock) -> None: distinct_id=str(self.user.distinct_id), organization_id=str(self.team.organization_id), name="transactional email", - team_id=self.team.id, + team_id=None, properties=expected_properties, timestamp=None, ) @@ -250,9 +268,10 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N # Check that the capture event was called with the correct data expected_properties = { - "team_id": self.team.id, - "team_name": self.team.name, "template_name": "periodic_digest_report", + "organization_id": str(self.team.organization_id), + "organization_name": self.organization.name, + "organization_created_at": self.organization.created_at.isoformat(), "users_who_logged_in": [], "users_who_logged_in_count": 0, "users_who_signed_up": [], @@ -271,20 +290,29 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N "deployment_infrastructure": "unknown", "helm": {}, "instance_tag": "none", - "new_dashboards": [ + "teams": [ { - "name": "Test Dashboard", - "id": dashboard.id, + "report": { + "new_dashboards": [ + { + "name": "Test Dashboard", + "id": dashboard.id, + } + ], + "new_event_definitions": [], + "new_playlists": [], + "new_experiments_launched": [], + "new_experiments_completed": [], + "new_external_data_sources": [], + "new_surveys_launched": [], + "new_feature_flags": [], + }, + "team_id": self.team.id, + "team_name": self.team.name, + "digest_items_with_data": 1, } ], - "new_event_definitions": [], - "new_playlists": [], - "new_experiments_launched": [], - "new_experiments_completed": [], - "new_external_data_sources": [], - "new_surveys_launched": [], - "new_feature_flags": [], - "digest_items_with_data": 1, + "total_digest_items_with_data": 1, } mock_capture.assert_called_once_with( @@ -292,7 +320,7 @@ def test_periodic_digest_report_custom_dates(self, mock_capture: MagicMock) -> N distinct_id=str(self.user.distinct_id), organization_id=str(self.team.organization_id), name="transactional email", - team_id=self.team.id, + team_id=None, properties=expected_properties, timestamp=None, ) @@ -316,7 +344,7 @@ def test_periodic_digest_report_idempotency(self, mock_capture: MagicMock) -> No # Check that messaging record was created record = MessagingRecord.objects.get( # type: ignore - raw_email=f"team_{self.team.id}", campaign_key="periodic_digest_2024-01-20_7d" + raw_email=f"org_{self.organization.id}", campaign_key="periodic_digest_2024-01-20_7d" ) self.assertIsNotNone(record.sent_at) @@ -349,7 +377,7 @@ def test_periodic_digest_different_periods(self, mock_capture: MagicMock) -> Non mock_capture.assert_called_once() # Verify two different records exist - records = MessagingRecord.objects.filter(raw_email=f"team_{self.team.id}") # type: ignore + records = MessagingRecord.objects.filter(raw_email=f"org_{self.organization.id}") # type: ignore self.assertEqual(records.count(), 2) campaign_keys = sorted([r.campaign_key for r in records]) self.assertEqual(campaign_keys, ["periodic_digest_2024-01-20_30d", "periodic_digest_2024-01-20_7d"]) @@ -409,12 +437,13 @@ def test_periodic_digest_excludes_playlists_without_names_and_derived_names(self call_args = mock_capture.call_args self.assertIsNotNone(call_args) properties = call_args[1]["properties"] - playlists = properties["new_playlists"] + team_data = next(team for team in properties["teams"] if team["team_id"] == self.team.id) + playlists = team_data["report"]["new_playlists"] # Verify only the valid playlist is included - self.assertEqual(len(playlists), 1) - self.assertEqual(playlists[0]["name"], "Valid Playlist") - self.assertEqual(playlists[0]["id"], valid_playlist.short_id) + assert len(playlists) == 1 + assert playlists[0]["name"] == "Valid Playlist" + assert playlists[0]["id"] == valid_playlist.short_id @freeze_time("2024-01-20T00:01:00Z") @patch("posthog.tasks.periodic_digest.capture_event") @@ -445,3 +474,107 @@ def test_periodic_digest_respects_team_notification_settings(self, mock_capture: # Verify the call was for the original user and not the one with disabled notifications call_args = mock_capture.call_args[1] self.assertEqual(call_args["distinct_id"], str(self.user.distinct_id)) + + @freeze_time("2024-01-20T00:01:00Z") + @patch("posthog.tasks.periodic_digest.capture_event") + def test_periodic_digest_report_multiple_teams(self, mock_capture: MagicMock) -> None: + # Create a second team in the same organization + team_2 = Team.objects.create(organization=self.organization, name="Second Team") + + # Create test data for both teams + with freeze_time("2024-01-15T00:01:00Z"): + # Data for first team + Dashboard.objects.create( + team=self.team, + name="Team 1 Dashboard", + ) + + # Data for second team + Dashboard.objects.create( + team=team_2, + name="Team 2 Dashboard", + ) + FeatureFlag.objects.create( + team=team_2, + name="Team 2 Flag", + key="team-2-flag", + ) + + send_all_periodic_digest_reports() + + # Should be called once with data for both teams + assert mock_capture.call_count == 1 + + call_args = mock_capture.call_args[1] + properties = call_args["properties"] + + # Verify organization-level properties + assert properties["organization_id"] == str(self.organization.id) + assert properties["organization_name"] == self.organization.name + + # Verify teams data + teams_data = properties["teams"] + assert len(teams_data) == 2 + + # Find teams by team_id in the array + team_1_data = next(team for team in teams_data if team["team_id"] == self.team.id) + team_2_data = next(team for team in teams_data if team["team_id"] == team_2.id) + + # Verify first team's data + assert team_1_data["team_name"] == self.team.name + assert len(team_1_data["report"]["new_dashboards"]) == 1 + assert team_1_data["report"]["new_dashboards"][0]["name"] == "Team 1 Dashboard" + assert len(team_1_data["report"]["new_feature_flags"]) == 0 + + # Verify second team's data + assert team_2_data["team_name"] == team_2.name + assert len(team_2_data["report"]["new_dashboards"]) == 1 + assert team_2_data["report"]["new_dashboards"][0]["name"] == "Team 2 Dashboard" + assert len(team_2_data["report"]["new_feature_flags"]) == 1 + assert team_2_data["report"]["new_feature_flags"][0]["name"] == "Team 2 Flag" + + @freeze_time("2024-01-20T00:01:00Z") + @patch("posthog.tasks.periodic_digest.capture_event") + def test_periodic_digest_report_respects_team_access(self, mock_capture: MagicMock) -> None: + # Create a second team in the same organization + team_2 = Team.objects.create(organization=self.organization, name="Second Team") + team_2.access_control = True + team_2.save() + + # Create test data for both teams + with freeze_time("2024-01-15T00:01:00Z"): + Dashboard.objects.create( + team=self.team, + name="Team 1 Dashboard", + ) + Dashboard.objects.create( + team=team_2, + name="Team 2 Dashboard", + ) + + # Create a second user with access only to team_2 + user_2 = self._create_user("test2@posthog.com") + self.organization.members.add(user_2) + org_membership = OrganizationMembership.objects.get(organization=self.organization, user=user_2) + team_2.explicit_memberships.create(parent_membership=org_membership) + + # Run the periodic digest report task + send_all_periodic_digest_reports() + + # Should be called twice - once for each user + assert mock_capture.call_count == 2 + + # Check calls to ensure each user only got their accessible teams + calls = mock_capture.call_args_list + for call in calls: + properties = call[1]["properties"] + distinct_id = call[1]["distinct_id"] + + if distinct_id == str(self.user.distinct_id): + # First user should only see team 1 because they were not added to team 2 + assert len(properties["teams"]) == 1 + assert any(team["team_id"] == self.team.id for team in properties["teams"]) + else: + # Second user should see team 1 and team 2 + assert len(properties["teams"]) == 2 + assert any(team["team_id"] == team_2.id for team in properties["teams"])