From d5189218db80a9bc6dd27f24a7448a1664ac3833 Mon Sep 17 00:00:00 2001 From: Suvodeep Pyne Date: Fri, 12 Jan 2024 13:21:21 -0800 Subject: [PATCH] [notification] Do not notify historical anomalies for subscription group. Any anomaly ending before the max of (alert creation time, subscription group creation time) will be skipped for notification. --- .../ai/startree/thirdeye/HappyPathTest.java | 24 ++++++ ...ubscriptionGroupFilterIntegrationTest.java | 78 +++++++++++++++++-- .../notification/SubscriptionGroupFilter.java | 63 ++++++++------- 3 files changed, 132 insertions(+), 33 deletions(-) diff --git a/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/HappyPathTest.java b/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/HappyPathTest.java index 06e7be0c6e..7b829a0693 100644 --- a/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/HappyPathTest.java +++ b/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/HappyPathTest.java @@ -638,6 +638,30 @@ public void testUpdateAlertAuth() throws InterruptedException { assertThat(investigationApi.getAuth().getNamespace()).isEqualTo("new-alert-namespace"); } + @Test + public void testCreateSubscriptionGroup() { + final var sg = new SubscriptionGroupApi() + .setName("test-subscription-group") + .setCron("0 0/5 0 ? * * *"); + + final var sgWithHistoricalAnomalies = new SubscriptionGroupApi() + .setName("test-subscription-group-with-historical-anomalies") + .setCron("0 0/5 0 ? * * *") + .setNotifyHistoricalAnomalies(true); + + final var response = request("api/subscription-groups").post( + Entity.json(List.of(sg, sgWithHistoricalAnomalies))); + assertThat(response.getStatus()).isEqualTo(200); + + final var response2 = request("api/subscription-groups").get(); + assertThat(response2.getStatus()).isEqualTo(200); + final var gotSgs = response2.readEntity(new GenericType>() {}); + assertThat(gotSgs).hasSize(2); + + gotSgs.stream().map(SubscriptionGroupApi::getName) + .forEach(name -> assertThat(name).isIn(sg.getName(), sgWithHistoricalAnomalies.getName())); + } + private Builder request(final String urlFragment) { return client.target(endPoint(urlFragment)).request(); } diff --git a/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/notification/SubscriptionGroupFilterIntegrationTest.java b/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/notification/SubscriptionGroupFilterIntegrationTest.java index 4bb2cff86a..0961fcbf60 100644 --- a/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/notification/SubscriptionGroupFilterIntegrationTest.java +++ b/thirdeye-integration-tests/src/test/java/ai/startree/thirdeye/notification/SubscriptionGroupFilterIntegrationTest.java @@ -30,7 +30,7 @@ import java.sql.Timestamp; import java.util.List; import java.util.Set; -import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -45,9 +45,9 @@ public class SubscriptionGroupFilterIntegrationTest { private AlertManager alertManager; private static AnomalyDTO anomalyWithCreateTime(final long createTime) { - final AnomalyDTO anomaly = new AnomalyDTO().setChild(false); - anomaly.setCreateTime(new Timestamp(createTime)); - return anomaly; + return new AnomalyDTO() + .setChild(false) + .setCreateTime(new Timestamp(createTime)); } private static long minutesAgo(final long nMinutes) { @@ -95,22 +95,86 @@ void beforeClass() { instance = injector.getInstance(SubscriptionGroupFilter.class); } - @AfterClass(alwaysRun = true) - void afterClass() { + @AfterMethod(alwaysRun = true) + void afterMethod() { alertManager.findAll().forEach(alertManager::delete); subscriptionGroupManager.findAll().forEach(subscriptionGroupManager::delete); anomalyManager.findAll().forEach(anomalyManager::delete); } + @Test public void testFilter() { final AlertDTO alert = persist(new AlertDTO() .setName("alert1") - .setActive(true)); + .setActive(true) + .setCreateTime(new Timestamp(minutesAgo(100))) + ); + final int sgCreationOffset = 80; final SubscriptionGroupDTO sg = persist(new SubscriptionGroupDTO() .setName("name1") .setCronExpression(CRON) + .setCreateTime(new Timestamp(minutesAgo(sgCreationOffset))) + ); + + // base case + assertThat(instance.filter(sg, POINT_IN_TIME).isEmpty()).isTrue(); + + final long superOldCreateTime = POINT_IN_TIME - NOTIFICATION_ANOMALY_MAX_LOOKBACK_MS - 100_000L; + persist(anomalyWithCreateTime(superOldCreateTime) + .setDetectionConfigId(alert.getId()) + .setStartTime(minutesAgo(1000)) + .setEndTime(minutesAgo(800)) + ); + + persist(anomalyWithCreateTime(minutesAgo(9)) // before alert was created + .setDetectionConfigId(alert.getId()) + .setStartTime(minutesAgo(100)) + .setEndTime(minutesAgo(sgCreationOffset + 1)) // before sg was created + ); + final AnomalyDTO anomaly1 = persist(anomalyWithCreateTime(minutesAgo(2)) + .setDetectionConfigId(alert.getId()) + .setStartTime(minutesAgo(100)) + .setEndTime(minutesAgo(sgCreationOffset - 5)) // after sg was created + ); + + persist(sg.setAlertAssociations(List.of(aaRef(alert.getId())))); + + assertThat(collectIds(instance.filter(sg, POINT_IN_TIME))) + .isEqualTo(collectIds(Set.of(anomaly1))); + + watermarkManager.updateWatermarks(sg, List.of(anomaly1)); + + persist(anomalyWithCreateTime(minutesAgo(3)) + .setDetectionConfigId(alert.getId()) + .setStartTime(minutesAgo(100)) + .setEndTime(minutesAgo(sgCreationOffset - 10)) + ); + // time in the future. Found an old anomaly. Should not be notified + persist(anomalyWithCreateTime(minutesAgo(-1)) + .setDetectionConfigId(alert.getId()) + .setStartTime(minutesAgo(100)) + .setEndTime(minutesAgo(sgCreationOffset - 10)) + ); + final AnomalyDTO anomaly2 = persist(anomalyWithCreateTime(minutesAgo(1)) + .setDetectionConfigId(alert.getId()) + .setStartTime(minutesAgo(100)) + .setEndTime(minutesAgo(sgCreationOffset - 20)) ); + assertThat(collectIds(instance.filter(sg, POINT_IN_TIME))) + .isEqualTo(collectIds(Set.of(anomaly2))); + } + + @Test + public void testFilterWithHistoricalAnomalies() { + final AlertDTO alert = persist(new AlertDTO() + .setName("alert1") + .setActive(true)); + + final SubscriptionGroupDTO sg = persist(new SubscriptionGroupDTO() + .setName("name1") + .setCronExpression(CRON) + .setNotifyHistoricalAnomalies(true)); // base case assertThat(instance.filter(sg, POINT_IN_TIME).isEmpty()).isTrue(); diff --git a/thirdeye-notification/src/main/java/ai/startree/thirdeye/notification/SubscriptionGroupFilter.java b/thirdeye-notification/src/main/java/ai/startree/thirdeye/notification/SubscriptionGroupFilter.java index 2ed40b84a7..e0eed88e11 100644 --- a/thirdeye-notification/src/main/java/ai/startree/thirdeye/notification/SubscriptionGroupFilter.java +++ b/thirdeye-notification/src/main/java/ai/startree/thirdeye/notification/SubscriptionGroupFilter.java @@ -14,6 +14,7 @@ package ai.startree.thirdeye.notification; import static ai.startree.thirdeye.spi.util.AnomalyUtils.isIgnore; +import static ai.startree.thirdeye.spi.util.SpiUtils.bool; import static ai.startree.thirdeye.spi.util.SpiUtils.optional; import static java.util.stream.Collectors.toSet; @@ -36,7 +37,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -98,30 +98,6 @@ private static String toFormattedDate(final long ts) { .format(Instant.ofEpochMilli(ts)); } - private static AnomalyFilter buildAnomalyFilter(final AlertAssociationDto aa, - final Map vectorClocks, - final long createTimeEnd) { - final long alertId = aa.getAlert().getId(); - long startTime = optional(vectorClocks) - .map(v -> v.get(alertId)) - .orElse(0L); - - // Do not notify anomalies older than MAX_ANOMALY_NOTIFICATION_LOOKBACK - final long minStartTime = createTimeEnd - Constants.NOTIFICATION_ANOMALY_MAX_LOOKBACK_MS; - final long createTimeStart = Math.max(startTime, minStartTime); - - final AnomalyFilter f = new AnomalyFilter() - .setCreateTimeWindow(new Interval(createTimeStart + 1, createTimeEnd)) - .setIsChild(false) // Notify only parent anomalies - .setAlertId(alertId); - - optional(aa.getEnumerationItem()) - .map(AbstractDTO::getId) - .ifPresent(f::setEnumerationItemId); - - return f; - } - /** * Find anomalies for the given subscription group given an end time. * @@ -136,7 +112,7 @@ public Set filter(final SubscriptionGroupDTO sg, final long endTime) // Fetch all the anomalies to be notified to the recipients return alertAssociations.stream() .filter(aa -> isAlertActive(aa.getAlert().getId())) - .map(aa -> buildAnomalyFilter(aa, sg.getVectorClocks(), endTime)) + .map(aa -> buildAnomalyFilter(aa, sg, endTime)) .map(f -> filterAnomalies(f, sg.getId())) .flatMap(Collection::stream) .collect(toSet()); @@ -166,6 +142,41 @@ private boolean isAlertActive(final long alertId) { return alert != null && alert.isActive(); } + private AnomalyFilter buildAnomalyFilter(final AlertAssociationDto aa, + final SubscriptionGroupDTO sg, + final long createTimeEnd) { + final long alertId = aa.getAlert().getId(); + final AlertDTO alert = alertManager.findById(alertId); + long startTime = optional(sg.getVectorClocks()) + .map(v -> v.get(alertId)) + .orElse(0L); + + // Do not notify anomalies older than MAX_ANOMALY_NOTIFICATION_LOOKBACK + final long minStartTime = createTimeEnd - Constants.NOTIFICATION_ANOMALY_MAX_LOOKBACK_MS; + final long createTimeStart = Math.max(startTime, minStartTime); + + final AnomalyFilter f = new AnomalyFilter() + .setCreateTimeWindow(new Interval(createTimeStart + 1, createTimeEnd)) + .setIsChild(false) // Notify only parent anomalies + .setAlertId(alertId); + + /* + * Do not notify historical anomalies if the end time of the anomaly is before the + * max of the alert create time and subscription group create time. + * + * TODO spyne This should also take the alert association tine when available. + */ + if (!bool(sg.getNotifyHistoricalAnomalies())) { + f.setEndTimeIsGte(Math.max(alert.getCreateTime().getTime(), sg.getCreateTime().getTime())); + } + + optional(aa.getEnumerationItem()) + .map(AbstractDTO::getId) + .ifPresent(f::setEnumerationItemId); + + return f; + } + @VisibleForTesting Set filterAnomalies(final AnomalyFilter f, final Long subscriptionGroupId) { final List candidates = anomalyManager.filter(f);