-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathanomalies.R
114 lines (98 loc) · 3.22 KB
/
anomalies.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
library(rmongodb)
library(AnomalyDetection)
library(zoo)
# Recompute anomalies for a set of apps and store them in MongoDB.
#
# For each app id, `find_anomalies()` is called and every named entry of its
# result becomes one BSON document with the fields `name`, `anomalies` and
# `app_id`. Afterwards all documents in "appstats.anomalies" whose `app_id`
# is in `app_ids` are removed and the freshly built documents (if any) are
# inserted in a single batch.
#
# Args:
#   mongo_host: host passed to `mongo.create()`.
#   mongo_db:   database name passed to `mongo.create()`.
#   app_ids:    list of app ids to process.
#
# Called for its side effects on the database. Stops with an error if the
# connection cannot be established.
save_anomalies <- function(mongo_host, mongo_db, app_ids) {
  mongo <- mongo.create(host = mongo_host, db = mongo_db)
  # Release the connection on every exit path, including errors.
  on.exit(mongo.destroy(mongo), add = TRUE)
  if (!mongo.is.connected(mongo)) {
    stop("Unable to connect to MongoDB at ", mongo_host, call. = FALSE)
  }

  docs <- list()
  for (app_id in app_ids) {
    anomalies <- find_anomalies(mongo, app_id)
    docs <- c(docs, Map(
      function(anns, name) {
        mongo.bson.from.list(list(
          name = name,
          # A single value is wrapped in a list; presumably so it is still
          # stored as an array rather than a scalar -- TODO confirm.
          anomalies = if (length(anns) > 1) anns else list(anns),
          app_id = app_id
        ))
      },
      anomalies,
      names(anomalies)
    ))
  }

  # Drop the previous results for these apps before writing the new ones.
  mongo.remove(
    mongo,
    "appstats.anomalies",
    mongo.bson.from.list(list(app_id = list("$in" = app_ids)))
  )
  if (length(docs) > 0) {
    mongo.insert.batch(mongo, "appstats.anomalies", docs)
  }
}
# Detect anomalies for the top names of one app.
#
# Loads up to 50 names via `load_top_names()`, fetches the data for each name
# with `load_counts_data()`, passes it through `na.approx(na.rm = FALSE)` and
# `data.frame()`, and runs `AnomalyDetectionTs()` on every resulting frame.
#
# Args:
#   mongo:  connection object forwarded to the loader functions.
#   app_id: id of the app to analyse.
#
# Returns a list named by series name; each element is the `timestamp`
# column of the detector's `anoms` result. Names whose result has an empty
# `anoms` (including those where the detector raised an error) are omitted.
find_anomalies <- function(mongo, app_id) {
  top_names <- load_top_names(mongo, app_id, 50)
  anomalies <- as.list(top_names)
  names(anomalies) <- top_names
  anomalies <- lapply(
    anomalies, load_counts_data,
    mongo = mongo, app_id = app_id
  )
  anomalies <- lapply(anomalies, na.approx, na.rm = FALSE)
  anomalies <- lapply(anomalies, data.frame)

  # Run the algorithm on one series. Any error is printed to stderr and
  # replaced by an empty result so one bad series does not abort the rest.
  detect <- function(series) {
    tryCatch(
      AnomalyDetectionTs(
        series,
        max_anoms = 0.02, direction = "pos", alpha = 0.01,
        only_last = "day", plot = FALSE
      ),
      error = function(e) {
        write(toString(e), stderr())
        list(anoms = c())
      }
    )
  }
  anomalies <- lapply(anomalies, detect)

  # Keep only the series for which something was reported.
  anomalies <- Filter(function(x) length(x$anoms) > 0, anomalies)
  lapply(anomalies, function(x) x$anoms$timestamp)
}
# Load the `name` field of the top documents of one app.
#
# Queries "appstats.appstats_docs" for documents with the given `app_id`,
# sorted by `NUMBER_day` with sort order -1 (descending in MongoDB), and
# returns at most `limit` names.
#
# Args:
#   mongo:  connection object passed to `mongo.find()`.
#   app_id: id of the app whose documents are queried.
#   limit:  maximum number of documents to fetch.
#
# Returns the collected `name` values combined with `c()`, or NULL when the
# cursor yields no rows.
load_top_names <- function(mongo, app_id, limit) {
  res <- mongo.find(
    mongo, "appstats.appstats_docs",
    query = list(app_id = app_id),
    sort = list(NUMBER_day = -1),
    fields = list(name = 1),
    limit = limit
  )
  # Free the cursor on every exit path.
  on.exit(mongo.cursor.destroy(res), add = TRUE)

  out <- NULL
  while (mongo.cursor.next(res)) {
    row <- mongo.bson.to.list(mongo.cursor.value(res))
    out <- c(out, row$name)
  }
  out
}
# Load the recent time series for one name of one app.
#
# Queries "appstats.appstats_apps_periodic-1" for documents matching
# `app_id` and `name` whose `date` is later than now minus three 2-week
# periods. For each row the value is `real_time / NUMBER` when both fields
# are doubles greater than zero, and NA otherwise.
#
# Args:
#   name:   series name to match.
#   mongo:  connection object passed to `mongo.find()`.
#   app_id: id of the app to match.
#
# Returns `data.frame()` applied to the accumulated `timestamp` and `count`
# vectors.
load_counts_data <- function(name, mongo, app_id) {
  # Three 2-week periods, in seconds.
  lookback_secs <- 3 * 14 * 24 * 60 * 60
  start_date <- Sys.time() - lookback_secs
  attr(start_date, "tzone") <- "UTC"
  query <- list(app_id = app_id, name = name, date = list("$gt" = start_date))
  res <- mongo.find(
    mongo, "appstats.appstats_apps_periodic-1",
    query = query,
    fields = list("date" = 1, "real_time" = 1, "NUMBER" = 1)
  )
  # Free the cursor on every exit path.
  on.exit(mongo.cursor.destroy(res), add = TRUE)

  # NOTE(review): when the cursor yields no rows both elements stay NULL.
  out <- list(timestamp = NULL, count = NULL)
  while (mongo.cursor.next(res)) {
    row <- mongo.bson.to.list(mongo.cursor.value(res))
    # NOTE(review): c() with a NULL first argument uses the default method,
    # so any class on `row$date` is presumably dropped here -- confirm the
    # downstream code expects that before changing how values are collected.
    out$timestamp <- c(out$timestamp, row$date)
    usable <- is.double(row$real_time) &&
      is.double(row$NUMBER) &&
      row$real_time > 0 &&
      row$NUMBER > 0
    out$count <- c(
      out$count,
      if (usable) row$real_time / row$NUMBER else NA
    )
  }
  data.frame(out)
}
# Command-line entry point.
# Usage: Rscript anomalies.R <mongo_host> <mongo_db> [app_id ...]
args <- commandArgs(trailingOnly = TRUE)
# Without this check a missing host or database would be passed on as NA.
if (length(args) < 2) {
  stop(
    "usage: Rscript anomalies.R <mongo_host> <mongo_db> [app_id ...]",
    call. = FALSE
  )
}
mongo_host <- args[1]
mongo_db <- args[2]
# Every argument after the first two is an app id.
app_ids <- as.list(args[-(1:2)])
save_anomalies(mongo_host, mongo_db, app_ids)