Skip to content

Commit

Permalink
Merge pull request #38 from kkuzmin/single_batch
Browse files Browse the repository at this point in the history
Send notification data in one Ingest request.
  • Loading branch information
kkuzmin authored Mar 27, 2018
2 parents 4b5ac50 + b28325e commit 435c67b
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 51 deletions.
104 changes: 55 additions & 49 deletions O365WebHook/o365content.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,51 @@ const g_ingestc = new m_ingest.Ingest(
}
);

// One O365 content message is about 1KB.
var MAX_BATCH_MESSAGES = 1500;

module.exports.processNotifications = function(context, notifications, callback) {
// Call the function per each notification in parallel.
async.each(notifications,
function(notification, callback) {
processContent(context, notification, callback);
},
function(err) {
if (err) {
return callback(`${err}`);
}
else {
return callback(null);
}
async.map(notifications, function(notification, asyncCallback) {
return m_o365mgmnt.getContent(notification.contentUri, asyncCallback);
}, function(fetchErr, mapResult) {
if (fetchErr) {
return callback(fetchErr);
} else {
const flattenResult = [].concat.apply([], mapResult);
context.log.verbose('Messages fetched:', flattenResult.length);
return processContent(context, flattenResult, callback);
}
);
});
};

function processContent(context, notification, callback) {
m_o365mgmnt.getContent(notification.contentUri,
function(err, content) {
if (err) {
return callback(`Unable to fetch content: ${err}`);
}
else {
parseContent(context, content,
function(err, parsedContent) {
if (err) {
return callback(err);
}
else {
return sendToIngest(context,
parsedContent, callback);
}
}
);
}
}
);
function processContent(context, content, callback) {
const slices = getSliceIndexes(content.length);
return async.map(slices, function(slice, asyncCallback){
const contentSlice = content.slice(slice.start, slice.end);
parseContent(context, contentSlice,
function(err, parsedContent) {
if (err) {
return asyncCallback(err);
}
else {
return sendToIngest(context, parsedContent, asyncCallback);
}
});
}, callback);
}

function getSliceIndexes(contentLength) {
var sliceArray = [];
const batchesCount = Math.ceil(contentLength / MAX_BATCH_MESSAGES);
for (var i=0; i<batchesCount; ++i) {
const slice = {
start : i * MAX_BATCH_MESSAGES,
end : (i+1) * MAX_BATCH_MESSAGES
};
sliceArray.push(slice);
}

return sliceArray;
}

// Parse each message into:
Expand Down Expand Up @@ -105,7 +112,6 @@ function parseContent(context, parsedContent, callback) {
if (err) {
return callback(`Content parsing failure. ${err}`);
} else {
context.log.verbose('parsedData: ', result.length);
return callback(null, result);
}
}
Expand All @@ -114,34 +120,34 @@ function parseContent(context, parsedContent, callback) {

function sendToIngest(context, content, callback) {
async.waterfall([
function(callback) {
function(asyncCallback) {
m_ingestProto.load(context, function(err, root) {
callback(err, root);
asyncCallback(err, root);
});
},
function(root, callback) {
function(root, asyncCallback) {
m_ingestProto.setMessage(context, root, content, function(err, msg) {
callback(err, root, msg);
asyncCallback(err, root, msg);
});
},
function(root, msg, callback) {
function(root, msg, asyncCallback) {
m_ingestProto.setHostMetadata(context, root, content, function(err, meta) {
callback(err, root, meta, msg);
asyncCallback(err, root, meta, msg);
});
},
function(root, meta, msg, callback) {
function(root, meta, msg, asyncCallback) {
m_ingestProto.setBatch(context, root, meta, msg, function(err, batch) {
callback(err, root, batch);
asyncCallback(err, root, batch);
});
},
function(root, batchBuf, callback) {
function(root, batchBuf, asyncCallback) {
m_ingestProto.setBatchList(context, root, batchBuf,
function(err, batchList) {
callback(err, root, batchList);
asyncCallback(err, root, batchList);
});
},
function(root, batchList, callback) {
m_ingestProto.encode(context, root, batchList, callback);
function(root, batchList, asyncCallback) {
m_ingestProto.encode(context, root, batchList, asyncCallback);
}],
function(err, result) {
if (err) {
Expand All @@ -157,13 +163,13 @@ function sendToIngest(context, content, callback) {
`(${compressed.byteLength}) exceeds maximum allowed value.`);
return g_ingestc.sendO365Data(compressed)
.then(resp => {
context.log.verbose('Bytes sent to Ingest: ', compressed.byteLength);
return callback(null, resp);
})
.catch(function(exception){
return callback(`Unable to send to Ingest ${exception}`);
return callback(`Unable to send to Ingest. ${exception}`);
});
}
});
});
}

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
"local-ad": "node ./local_dev/ad_token_local_dev.js",
"local-version": "node ./local_dev/version_local_dev.js",
"lint": "jshint --exclude \"./node_modules/*\" **/*.js",
"test": "npm run lint && mocha"
"test": "JUNIT_REPORT_PATH=./test/report.xml nyc --reporter=cobertura mocha --colors --reporter mocha-jenkins-reporter"
},
"devDependencies": {
"jshint": "^2.9.5",
"mocha": "^3.5.3",
"mocha-jenkins-reporter": "^0.3.10",
"nyc": "^11.3.0",
"pre-commit": "^1.2.2",
"rewire": "^2.5.2",
"sinon": "^3.3.0"
Expand Down
157 changes: 156 additions & 1 deletion test/mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,166 @@ var updaterAuditLogs = {
]
};

const webhookNotifications = [
{
"contentType": "Audit.AzureActiveDirectory",
"contentId": "20180321173155808044366$20180321173155808044366$audit_azureactivedirectory$Audit_AzureActiveDirectory$IsFromNotification",
"contentUri": "https://manage.office.com/api/v1.0/bf8d32d3-1c13-4487-af02-80dba2236485/activity/feed/audit/20180321173155808044366$20180321173155808044366$audit_azureactivedirectory$Audit_AzureActiveDirectory$IsFromNotification",
"notificationStatus": "Succeeded",
"contentCreated": "2018-03-21T17:36:48.032Z",
"notificationSent": "2018-03-21T17:36:48.032Z",
"contentExpiration": "2018-03-28T17:31:55.808Z"
},
{
"contentType": "Audit.AzureActiveDirectory",
"contentId": "20180321173506988040854$20180321173506988040854$audit_azureactivedirectory$Audit_AzureActiveDirectory$IsFromNotification",
"contentUri": "https://manage.office.com/api/v1.0/bf8d32d3-1c13-4487-af02-80dba2236485/activity/feed/audit/20180321173506988040854$20180321173506988040854$audit_azureactivedirectory$Audit_AzureActiveDirectory$IsFromNotification",
"notificationStatus": "Succeeded",
"contentCreated": "2018-03-21T17:36:48.032Z",
"notificationSent": "2018-03-21T17:36:48.032Z",
"contentExpiration": "2018-03-28T17:35:06.988Z"
}
];

const o365Content = [
{
"ApplicationId": "c44b4083-3bb0-49c1-b47d-974e53cbdf3c",
"TargetContextId": "bf8d32d3-1c13-4487-af02-80dba2236485",
"Target": [
{
"Type": 0,
"ID": "797f4846-ba00-4fd7-ba43-dac1f8f63013"
}
],
"IntraSystemId": "c177a031-d063-4789-873d-87af94762900",
"InterSystemsId": "a9eccaf4-84f7-47c4-99f4-f3989bd1899a",
"ActorIpAddress": "87.113.76.58",
"ActorContextId": "bf8d32d3-1c13-4487-af02-80dba2236485",
"UserType": 0,
"UserKey": "10030000A19F1B13@alazurealertlogic.onmicrosoft.com",
"ResultStatus": "Succeeded",
"RecordType": 15,
"OrganizationId": "bf8d32d3-1c13-4487-af02-80dba2236485",
"Operation": "UserLoggedIn",
"Id": "425415ab-86e9-4ae1-b91f-61d748d2a812",
"CreationTime": "2018-03-21T17:00:32",
"Version": 1,
"Workload": "AzureActiveDirectory",
"ClientIP": "87.113.76.58",
"ObjectId": "797f4846-ba00-4fd7-ba43-dac1f8f63013",
"UserId": "kkuzmin@alazurealertlogic.onmicrosoft.com",
"AzureActiveDirectoryEventType": 1,
"ExtendedProperties": [
{
"Value": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36",
"Name": "UserAgent"
},
{
"Value": "1",
"Name": "UserAuthenticationMethod"
},
{
"Value": "OAuth2:Authorize",
"Name": "RequestType"
},
{
"Value": "Success",
"Name": "ResultStatusDetail"
},
{
"Value": "True",
"Name": "KeepMeSignedIn"
}
],
"Actor": [
{
"Type": 0,
"ID": "bea5cb4c-0348-49e4-b225-8acf2623d1ea"
},
{
"Type": 5,
"ID": "kkuzmin@alazurealertlogic.onmicrosoft.com"
},
{
"Type": 3,
"ID": "10030000A19F1B13"
}
]
},
{
"ApplicationId": "c44b4083-3bb0-49c1-b47d-974e53cbdf3c",
"TargetContextId": "bf8d32d3-1c13-4487-af02-80dba2236485",
"Target": [
{
"Type": 0,
"ID": "797f4846-ba00-4fd7-ba43-dac1f8f63013"
}
],
"IntraSystemId": "c177a031-d063-4789-873d-87af94762900",
"InterSystemsId": "a9eccaf4-84f7-47c4-99f4-f3989bd1899a",
"ActorIpAddress": "87.113.76.58",
"ActorContextId": "bf8d32d3-1c13-4487-af02-80dba2236485",
"UserType": 0,
"UserKey": "10030000A19F1B13@alazurealertlogic.onmicrosoft.com",
"ResultStatus": "Succeeded",
"RecordType": 15,
"OrganizationId": "bf8d32d3-1c13-4487-af02-80dba2236485",
"Operation": "UserLoggedIn",
"Id": "425415ab-86e9-4ae1-b91f-61d748d2a812",
"CreationTime": "2018-03-21T17:00:32",
"Version": 1,
"Workload": "AzureActiveDirectory",
"ClientIP": "87.113.76.58",
"ObjectId": "797f4846-ba00-4fd7-ba43-dac1f8f63013",
"UserId": "kkuzmin@alazurealertlogic.onmicrosoft.com",
"AzureActiveDirectoryEventType": 1,
"ExtendedProperties": [
{
"Value": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36",
"Name": "UserAgent"
},
{
"Value": "1",
"Name": "UserAuthenticationMethod"
},
{
"Value": "OAuth2:Authorize",
"Name": "RequestType"
},
{
"Value": "Success",
"Name": "ResultStatusDetail"
},
{
"Value": "True",
"Name": "KeepMeSignedIn"
}
],
"Actor": [
{
"Type": 0,
"ID": "bea5cb4c-0348-49e4-b225-8acf2623d1ea"
},
{
"Type": 5,
"ID": "kkuzmin@alazurealertlogic.onmicrosoft.com"
},
{
"Type": 3,
"ID": "10030000A19F1B13"
}
]
}
];


module.exports = {
allEnabledStreams : allEnabledStreams,
context : context,
timer : timer,
masterAuditLogs : masterAuditLogs,
updaterAuditLogs : updaterAuditLogs,
o365webhookAuditLogs : o365webhookAuditLogs
o365webhookAuditLogs : o365webhookAuditLogs,
webhookNotifications : webhookNotifications,
o365Content : o365Content
};
Loading

0 comments on commit 435c67b

Please sign in to comment.