Skip to content

Commit

Permalink
Updated pipeline to support alarms.
Browse files Browse the repository at this point in the history
  • Loading branch information
ericstephan committed Nov 6, 2019
1 parent b75df79 commit 4ccc151
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,6 @@ public ProvenMessageResponse addBulkTimeSeries(String pm, @QueryParam("measureme
@QueryParam("instanceId") String instanceId) {

ProvenMessageResponse pmr = null;
String measurement_type = "input";
try {
cs.begin();

Expand All @@ -1052,16 +1051,8 @@ public ProvenMessageResponse addBulkTimeSeries(String pm, @QueryParam("measureme
throw new Exception("Measurement Name is missing.");
}

// Explicit
//
// Hack
//
if (pm.contains("measurement") || pm.contains("MEASUREMENT")) {

measurement_type = "output";
}

pmr = cs.influxWriteBulkMeasurement(pm, measurement_type, measurementName, instanceId);
pmr = cs.influxWriteMeasurements(pm, measurementName, instanceId);

// Invalid message content
if (null == pmr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ private String timeFilterStatement(String key, String val) {
// Seconds - 10 digits

// 16 digit nanosecond padding
// statement = padRightZeros(val, 7);
// statement = padRightZeros(val, 7);
statement = padRightZeros(val, 19);

} catch (Exception e) {
Expand All @@ -932,7 +932,7 @@ private String timeFilterStatement(String key, String val) {
return statement;

}

private Map<String, Integer> countFilterFields(List<ProvenQueryFilter> filters) {
Map<String, Integer> filterFieldCounter = new HashMap();
if (filters != null) {
Expand All @@ -957,7 +957,7 @@ private String formatFilterCriteria(ProvenQueryFilter filter) {
String space = " ";
String eq = "=";
String quote = "'";

if (filter.getDatatype() == null) {

statement = filter.getField() + space + eq + quote + filter.getValue() + quote;
Expand All @@ -978,32 +978,32 @@ private String formatFilterCriteria(ProvenQueryFilter filter) {
}

private String assembleUnionFilterStatement(List<ProvenQueryFilter> filters, String fieldName, Integer fieldCounter) {
String unionStatement = "";
int index = 0;
while (index < filters.size()) {

if (filters.get(index).getField().equalsIgnoreCase(fieldName)) {
if (unionStatement.length() == 0) {
unionStatement = " ( " + formatFilterCriteria(filters.get(index));
} else {
unionStatement = unionStatement + " or " + formatFilterCriteria(filters.get(index));
}
}
String unionStatement = "";
int index = 0;
while (index < filters.size()) {

index = index + 1;

if (filters.get(index).getField().equalsIgnoreCase(fieldName)) {
if (unionStatement.length() == 0) {
unionStatement = " ( " + formatFilterCriteria(filters.get(index));
} else {
unionStatement = unionStatement + " or " + formatFilterCriteria(filters.get(index));
}
}
if (unionStatement.length() != 0) {
unionStatement = unionStatement + " ) ";
}
return unionStatement;


index = index + 1;

}
if (unionStatement.length() != 0) {
unionStatement = unionStatement + " ) ";
}
return unionStatement;

}


private List<String> assembleFilterStatements(List<ProvenQueryFilter> filters,
Map<String, Integer> filterFieldCounter) {

List<String> assembledFilterStatements = new ArrayList<>();

Iterator iter = filterFieldCounter.entrySet().iterator();
Expand All @@ -1019,10 +1019,10 @@ private List<String> assembleFilterStatements(List<ProvenQueryFilter> filters,
if (filters.get(index).getField().equalsIgnoreCase((String)entry.getKey())) {
if (filters.get(index).getField().toLowerCase().contains("starttime")
|| filters.get(index).getField().toLowerCase().contains("endtime")) {
// assembledFilterStatements.add(timeFilterStatement(filters.get(index).getField(), filters.get(index).getValue()));
// assembledFilterStatements.add(timeFilterStatement(filters.get(index).getField(), filters.get(index).getValue()));
assembledFilterStatements.add(timeFilterStatement(filters.get(index).getField(), filters.get(index).getValue()));
} else {

assembledFilterStatements.add(formatFilterCriteria(filters.get(index)));
}

Expand All @@ -1033,30 +1033,30 @@ private List<String> assembleFilterStatements(List<ProvenQueryFilter> filters,
}
} else {
assembledFilterStatements.add(assembleUnionFilterStatement(filters, (String) entry.getKey(), (Integer) counterIndex));
//
// int index = 0;
// String unionStatement = "";
// while (index < filters.size()) {
//
// if (filters.get(index).getField().equalsIgnoreCase((String)entry.getKey())) {
// if (unionStatement.length() == 0) {
// unionStatement = " ( " + assembledFilterStatements.add(formatFilterCriteria(filters.get(index)));
// } else {
// unionStatement = unionStatement + " or " + formatFilterCriteria(filters.get(index));
// }
// }
// if (unionStatement.length() != 0) {
// assembledFilterStatements.add(unionStatement + " ) ");
// }
// index = index + 1;
//
}
//
// int index = 0;
// String unionStatement = "";
// while (index < filters.size()) {
//
// if (filters.get(index).getField().equalsIgnoreCase((String)entry.getKey())) {
// if (unionStatement.length() == 0) {
// unionStatement = " ( " + assembledFilterStatements.add(formatFilterCriteria(filters.get(index)));
// } else {
// unionStatement = unionStatement + " or " + formatFilterCriteria(filters.get(index));
// }
// }
// if (unionStatement.length() != 0) {
// assembledFilterStatements.add(unionStatement + " ) ");
// }
// index = index + 1;
//
}
}

return assembledFilterStatements;
}


public ProvenMessageResponse influxQuery(ProvenMessage query) throws InvalidProvenMessageException {

ProvenMessageResponse pmr = influxQuery(query, false);
Expand Down Expand Up @@ -1100,7 +1100,7 @@ public ProvenMessageResponse influxQuery(ProvenMessage query, boolean returnCsvF
if (index == 0) {
queryStatement = queryStatement + space + assembledFilterStatements.get(index) + space;
flag = false;

} else {

// If not a time filter, treat it as a field.
Expand All @@ -1126,7 +1126,7 @@ public ProvenMessageResponse influxQuery(ProvenMessage query, boolean returnCsvF
//queryStatement = queryStatement + " limit 10";
//*******DEBUG statement
//


Query influxQuery = new Query(queryStatement, dbName);
//
Expand Down Expand Up @@ -1273,26 +1273,32 @@ public ProvenMessageResponse influxWriteMeasurements(Collection<ProvenMeasuremen

}

public ProvenMessageResponse influxWriteBulkOutputMeasurement() {
ProvenMessageResponse pmr = null;

return pmr;
}

@SuppressWarnings("unchecked")
public char detectObjectType(JSONObject message) {
char type = 0;
JSONObject object = (JSONObject) message.get("message");
if (object != null) {
type = 'O';
} else {
object = (JSONObject) message.get("input");
public String detectObjectType(Object messageObject) {
String objectType = "";
if (messageObject instanceof JSONObject) {

JSONObject mObject = (JSONObject) messageObject;
JSONObject object = (JSONObject) (mObject.get("message"));
if (object != null) {
type = 'I';
objectType = "O";

} else {
object = (JSONObject) mObject.get("input");
if (object != null) {
objectType = "I";
}
}
}
//
// If it isn't a JSONObject, assume that it is a JSONArray representing Alarms
//
} else {
objectType = "A";
}

return type;

return objectType;

}

Expand Down Expand Up @@ -1330,7 +1336,7 @@ private Point.Builder processNestedObject (JSONObject iObject, String key, Point
}


private ProvenMessageResponse influxWriteBulkSimulationInput(JSONObject commandObject, InfluxDB influxDB,
private ProvenMessageResponse influxWriteSimulationInput(JSONObject commandObject, InfluxDB influxDB,
String measurementName, String instanceId) {
ProvenMessageResponse ret = null;
Long timestamp = (long) -1;
Expand Down Expand Up @@ -1471,7 +1477,7 @@ private ProvenMessageResponse influxWriteBulkSimulationInput(JSONObject commandO
return ret;
}

private ProvenMessageResponse influxWriteBulkSimulationOutput(JSONObject messageObject, InfluxDB influxDB,
private ProvenMessageResponse influxWriteSimulationOutput(JSONObject messageObject, InfluxDB influxDB,
String measurementName, String instanceId) {

ProvenMessageResponse ret = null;
Expand Down Expand Up @@ -1552,10 +1558,55 @@ private ProvenMessageResponse influxWriteBulkSimulationOutput(JSONObject message

}



private ProvenMessageResponse influxWriteAlarms(JSONArray messageObject, InfluxDB influxDB,
String measurementName, String instanceId) {

ProvenMessageResponse ret = null;
Long timestamp = System.currentTimeMillis() / 1000l;

Iterator<JSONObject> iterator = messageObject.iterator();
while (iterator.hasNext()) {
//System.out.println(iterator.next());
JSONObject record = (JSONObject) iterator.next();
Set<String> record_keys = record.keySet();
Iterator<String> record_it = record_keys.iterator();

Point.Builder builder = Point.measurement(measurementName).time(timestamp, TimeUnit.SECONDS);
timestamp = timestamp + 1;
while (record_it.hasNext()) {
String record_key = record_it.next();

if (record.get(record_key) instanceof String) {
builder.addField( record_key, String.valueOf((String) record.get(record_key)));
} else if (record.get(record_key) instanceof Integer) {
builder.addField( record_key, Integer.valueOf((Integer) record.get(record_key)));
} else if (record.get(record_key) instanceof Long) {
builder.addField(record_key, Long.valueOf((Long) record.get(record_key)));
} else if (record.get(record_key) instanceof Float) {
builder.addField(record_key, Float.valueOf((Float) record.get(record_key)));
} else if (record.get(record_key) instanceof Double) {
builder.addField(record_key, Double.valueOf((Double) record.get(record_key)));
}
}
influxDB.write(idbDB, idbRP, builder.build());
}

ret = new ProvenMessageResponse();
ret.setReason("success");
ret.setStatus(Status.CREATED);
ret.setCode(Status.CREATED.getStatusCode());
ret.setResponse("{ \"INFO\": \"Time-series measurements successfully created.\" }");

return ret;

}

//
// New write measurement routine
//
public ProvenMessageResponse influxWriteBulkMeasurement(String measurements, String measurement_type,
public ProvenMessageResponse influxWriteMeasurements(String measurements,
String measurementName, String instanceId) {

// DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
Expand All @@ -1576,20 +1627,26 @@ public ProvenMessageResponse influxWriteBulkMeasurement(String measurements, Str
influxDB.enableBatch(20000, 20, TimeUnit.SECONDS);
JSONParser parser = new JSONParser();

JSONObject messageObject = null;
Object messageObject = null;

String objectType = "" ;

try {
messageObject = (JSONObject) parser.parse(measurements);
messageObject = parser.parse(measurements);

} catch (org.json.simple.parser.ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

char type = detectObjectType(messageObject);
objectType = detectObjectType(messageObject);

if (type == 'O') {
ret = influxWriteBulkSimulationOutput(messageObject, influxDB, measurementName, instanceId);
} else if (type == 'I') {
ret = influxWriteBulkSimulationInput(messageObject, influxDB, measurementName, instanceId);
if (objectType.equalsIgnoreCase("O")) {
ret = influxWriteSimulationOutput((JSONObject)messageObject, influxDB, measurementName, instanceId);
} else if (objectType.equalsIgnoreCase("I")) {
ret = influxWriteSimulationInput((JSONObject)messageObject, influxDB, measurementName, instanceId);
} else if (objectType.equalsIgnoreCase("A")) {
ret = influxWriteAlarms((JSONArray)messageObject, influxDB, measurementName, instanceId);
} else {
ret = new ProvenMessageResponse();
ret.setStatus(Status.BAD_REQUEST);
Expand Down

0 comments on commit 4ccc151

Please sign in to comment.