Skip to content

Commit

Permalink
[notification] Do not notify historical anomalies for subscription gr…
Browse files Browse the repository at this point in the history
…oup.

Any anomaly ending before the max of (alert creation time, subscription group creation time) will be skipped for notification.
  • Loading branch information
suvodeep-pyne committed Jan 12, 2024
1 parent ec8747b commit d518921
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,30 @@ public void testUpdateAlertAuth() throws InterruptedException {
assertThat(investigationApi.getAuth().getNamespace()).isEqualTo("new-alert-namespace");
}

@Test
public void testCreateSubscriptionGroup() {
final var sg = new SubscriptionGroupApi()
.setName("test-subscription-group")
.setCron("0 0/5 0 ? * * *");

final var sgWithHistoricalAnomalies = new SubscriptionGroupApi()
.setName("test-subscription-group-with-historical-anomalies")
.setCron("0 0/5 0 ? * * *")
.setNotifyHistoricalAnomalies(true);

final var response = request("api/subscription-groups").post(
Entity.json(List.of(sg, sgWithHistoricalAnomalies)));
assertThat(response.getStatus()).isEqualTo(200);

final var response2 = request("api/subscription-groups").get();
assertThat(response2.getStatus()).isEqualTo(200);
final var gotSgs = response2.readEntity(new GenericType<List<SubscriptionGroupApi>>() {});
assertThat(gotSgs).hasSize(2);

gotSgs.stream().map(SubscriptionGroupApi::getName)
.forEach(name -> assertThat(name).isIn(sg.getName(), sgWithHistoricalAnomalies.getName()));
}

private Builder request(final String urlFragment) {
return client.target(endPoint(urlFragment)).request();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.sql.Timestamp;
import java.util.List;
import java.util.Set;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand All @@ -45,9 +45,9 @@ public class SubscriptionGroupFilterIntegrationTest {
private AlertManager alertManager;

private static AnomalyDTO anomalyWithCreateTime(final long createTime) {
final AnomalyDTO anomaly = new AnomalyDTO().setChild(false);
anomaly.setCreateTime(new Timestamp(createTime));
return anomaly;
return new AnomalyDTO()
.setChild(false)
.setCreateTime(new Timestamp(createTime));
}

private static long minutesAgo(final long nMinutes) {
Expand Down Expand Up @@ -95,22 +95,86 @@ void beforeClass() {
instance = injector.getInstance(SubscriptionGroupFilter.class);
}

@AfterClass(alwaysRun = true)
void afterClass() {
@AfterMethod(alwaysRun = true)
void afterMethod() {
alertManager.findAll().forEach(alertManager::delete);
subscriptionGroupManager.findAll().forEach(subscriptionGroupManager::delete);
anomalyManager.findAll().forEach(anomalyManager::delete);
}

@Test
public void testFilter() {
final AlertDTO alert = persist(new AlertDTO()
.setName("alert1")
.setActive(true));
.setActive(true)
.setCreateTime(new Timestamp(minutesAgo(100)))
);

final int sgCreationOffset = 80;
final SubscriptionGroupDTO sg = persist(new SubscriptionGroupDTO()
.setName("name1")
.setCronExpression(CRON)
.setCreateTime(new Timestamp(minutesAgo(sgCreationOffset)))
);

// base case
assertThat(instance.filter(sg, POINT_IN_TIME).isEmpty()).isTrue();

final long superOldCreateTime = POINT_IN_TIME - NOTIFICATION_ANOMALY_MAX_LOOKBACK_MS - 100_000L;
persist(anomalyWithCreateTime(superOldCreateTime)
.setDetectionConfigId(alert.getId())
.setStartTime(minutesAgo(1000))
.setEndTime(minutesAgo(800))
);

persist(anomalyWithCreateTime(minutesAgo(9)) // before alert was created
.setDetectionConfigId(alert.getId())
.setStartTime(minutesAgo(100))
.setEndTime(minutesAgo(sgCreationOffset + 1)) // before sg was created
);
final AnomalyDTO anomaly1 = persist(anomalyWithCreateTime(minutesAgo(2))
.setDetectionConfigId(alert.getId())
.setStartTime(minutesAgo(100))
.setEndTime(minutesAgo(sgCreationOffset - 5)) // after sg was created
);

persist(sg.setAlertAssociations(List.of(aaRef(alert.getId()))));

assertThat(collectIds(instance.filter(sg, POINT_IN_TIME)))
.isEqualTo(collectIds(Set.of(anomaly1)));

watermarkManager.updateWatermarks(sg, List.of(anomaly1));

persist(anomalyWithCreateTime(minutesAgo(3))
.setDetectionConfigId(alert.getId())
.setStartTime(minutesAgo(100))
.setEndTime(minutesAgo(sgCreationOffset - 10))
);
// time in the future. Found an old anomaly. Should not be notified
persist(anomalyWithCreateTime(minutesAgo(-1))
.setDetectionConfigId(alert.getId())
.setStartTime(minutesAgo(100))
.setEndTime(minutesAgo(sgCreationOffset - 10))
);
final AnomalyDTO anomaly2 = persist(anomalyWithCreateTime(minutesAgo(1))
.setDetectionConfigId(alert.getId())
.setStartTime(minutesAgo(100))
.setEndTime(minutesAgo(sgCreationOffset - 20))
);
assertThat(collectIds(instance.filter(sg, POINT_IN_TIME)))
.isEqualTo(collectIds(Set.of(anomaly2)));
}

@Test
public void testFilterWithHistoricalAnomalies() {
final AlertDTO alert = persist(new AlertDTO()
.setName("alert1")
.setActive(true));

final SubscriptionGroupDTO sg = persist(new SubscriptionGroupDTO()
.setName("name1")
.setCronExpression(CRON)
.setNotifyHistoricalAnomalies(true));

// base case
assertThat(instance.filter(sg, POINT_IN_TIME).isEmpty()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ai.startree.thirdeye.notification;

import static ai.startree.thirdeye.spi.util.AnomalyUtils.isIgnore;
import static ai.startree.thirdeye.spi.util.SpiUtils.bool;
import static ai.startree.thirdeye.spi.util.SpiUtils.optional;
import static java.util.stream.Collectors.toSet;

Expand All @@ -36,7 +37,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -98,30 +98,6 @@ private static String toFormattedDate(final long ts) {
.format(Instant.ofEpochMilli(ts));
}

private static AnomalyFilter buildAnomalyFilter(final AlertAssociationDto aa,
final Map<Long, Long> vectorClocks,
final long createTimeEnd) {
final long alertId = aa.getAlert().getId();
long startTime = optional(vectorClocks)
.map(v -> v.get(alertId))
.orElse(0L);

// Do not notify anomalies older than MAX_ANOMALY_NOTIFICATION_LOOKBACK
final long minStartTime = createTimeEnd - Constants.NOTIFICATION_ANOMALY_MAX_LOOKBACK_MS;
final long createTimeStart = Math.max(startTime, minStartTime);

final AnomalyFilter f = new AnomalyFilter()
.setCreateTimeWindow(new Interval(createTimeStart + 1, createTimeEnd))
.setIsChild(false) // Notify only parent anomalies
.setAlertId(alertId);

optional(aa.getEnumerationItem())
.map(AbstractDTO::getId)
.ifPresent(f::setEnumerationItemId);

return f;
}

/**
* Find anomalies for the given subscription group given an end time.
*
Expand All @@ -136,7 +112,7 @@ public Set<AnomalyDTO> filter(final SubscriptionGroupDTO sg, final long endTime)
// Fetch all the anomalies to be notified to the recipients
return alertAssociations.stream()
.filter(aa -> isAlertActive(aa.getAlert().getId()))
.map(aa -> buildAnomalyFilter(aa, sg.getVectorClocks(), endTime))
.map(aa -> buildAnomalyFilter(aa, sg, endTime))
.map(f -> filterAnomalies(f, sg.getId()))
.flatMap(Collection::stream)
.collect(toSet());
Expand Down Expand Up @@ -166,6 +142,41 @@ private boolean isAlertActive(final long alertId) {
return alert != null && alert.isActive();
}

private AnomalyFilter buildAnomalyFilter(final AlertAssociationDto aa,
final SubscriptionGroupDTO sg,
final long createTimeEnd) {
final long alertId = aa.getAlert().getId();
final AlertDTO alert = alertManager.findById(alertId);
long startTime = optional(sg.getVectorClocks())
.map(v -> v.get(alertId))
.orElse(0L);

// Do not notify anomalies older than MAX_ANOMALY_NOTIFICATION_LOOKBACK
final long minStartTime = createTimeEnd - Constants.NOTIFICATION_ANOMALY_MAX_LOOKBACK_MS;
final long createTimeStart = Math.max(startTime, minStartTime);

final AnomalyFilter f = new AnomalyFilter()
.setCreateTimeWindow(new Interval(createTimeStart + 1, createTimeEnd))
.setIsChild(false) // Notify only parent anomalies
.setAlertId(alertId);

/*
* Do not notify historical anomalies if the end time of the anomaly is before the
* max of the alert create time and subscription group create time.
*
* TODO spyne This should also take the alert association tine when available.
*/
if (!bool(sg.getNotifyHistoricalAnomalies())) {
f.setEndTimeIsGte(Math.max(alert.getCreateTime().getTime(), sg.getCreateTime().getTime()));
}

optional(aa.getEnumerationItem())
.map(AbstractDTO::getId)
.ifPresent(f::setEnumerationItemId);

return f;
}

@VisibleForTesting
Set<AnomalyDTO> filterAnomalies(final AnomalyFilter f, final Long subscriptionGroupId) {
final List<AnomalyDTO> candidates = anomalyManager.filter(f);
Expand Down

0 comments on commit d518921

Please sign in to comment.