Skip to content

Commit

Permalink
[PAWS][ENG-55567] Replace GCP collector npm library google-cloud-logg…
Browse files Browse the repository at this point in the history
…ing to googleapis to convert protopayload to json (#372)

* [PAWS][ENG-55567] Replace GCP collector npm library google-cloud-logging to googleapis to convert protopayload to json
  • Loading branch information
imranalisyed506 authored Jul 5, 2024
1 parent f6c9a7b commit 279bac0
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 248 deletions.
106 changes: 49 additions & 57 deletions collectors/googlestackdriver/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,36 @@ const PawsCollector = require('@alertlogic/paws-collector').PawsCollector;
const calcNextCollectionInterval = require('@alertlogic/paws-collector').calcNextCollectionInterval;
const parse = require('@alertlogic/al-collector-js').Parse;
const AlLogger = require('@alertlogic/al-aws-collector-js').Logger;
const logging = require('@google-cloud/logging');
const packageJson = require('./package.json');
const protoFiles = require('google-proto-files');
const { auth } = require("google-auth-library");
const { google } = require("googleapis");

const API_THROTTLING_ERROR = 8;
const API_THROTTLING_STATUS_CODE = 429;
const MAX_POLL_INTERVAL = 900;
const MAX_PAGE_SIZE = 1000;
const AUDIT_PAYLOAD_TYPE_URL = 'type.googleapis.com/google.cloud.audit.AuditLog';
const SCOPES = [
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/cloud-platform.read-only',
'https://www.googleapis.com/auth/logging.admin',
'https://www.googleapis.com/auth/logging.read',
'https://www.googleapis.com/auth/logging.write',
];

const typeIdPaths = [
{path: ['jsonPayload', 'fields', 'event_type', 'stringValue']},
{path: ['protoPayload', 'type_url']},
{path: ['jsonPayload']},
{path: ['protoPayload', '@type']},
{path: ['payload']}
];

const tsPaths = [{ path: ["timestamp"] }];

class GooglestackdriverCollector extends PawsCollector {

constructor(context, creds){
super(context, creds, packageJson.version);
this._initAuditLogDecoder();
}

_initAuditLogDecoder() {
const protoPath = protoFiles.getProtoPath('cloud', 'audit', 'audit_log.proto');
const root = protoFiles.loadSync(protoPath);
const auditLogDecoder = root.lookupType('google.cloud.audit.AuditLog');
this._auditLogDecoder = auditLogDecoder;
}

pawsInitCollectionState(event, callback) {
const startTs = process.env.paws_collection_start_ts ?
Expand Down Expand Up @@ -79,9 +80,18 @@ class GooglestackdriverCollector extends PawsCollector {
if (!state.stream) {
state = collector.setStreamToCollectionState(state);
}
const keysEnvVar = collector.secret;
if (!keysEnvVar) {
throw new Error("The $CREDS environment variable was not found!");
}
// Start API client
const client = new logging.v2.LoggingServiceV2Client({
credentials: JSON.parse(collector.secret)
const keys = JSON.parse(keysEnvVar);
const client = auth.fromJSON(keys);
client.subject = collector.clientId;
client.scopes = SCOPES;
const logging = google.logging({
version: 'v2',
auth: client,
});


Expand All @@ -92,49 +102,46 @@ class GooglestackdriverCollector extends PawsCollector {
timestamp < "${state.until}"`;

let pagesRetireved = 0;
const options = {autoPaginate: false};


const paginationCallback = (result, acc = []) => {
AlLogger.info(`Getting page: ${pagesRetireved + 1} Logs retrieved: ${result[0].length}`);
let logs = result.data.entries || [];
AlLogger.info(`Getting page: ${pagesRetireved + 1} Logs retrieved: ${logs.length}`);
pagesRetireved++;
//decode the protoPayload if it's an AuditLog message
let logs = result[0].map(function (logEntry) {
return collector.decodeProtoPayload(logEntry);
});

const nextPage = result[1];
const nextPage = { ...params, pageToken: result.data.nextPageToken };
const newAcc = [...acc, ...logs];
AlLogger.info(`Total Logs ${newAcc.length}`);

if(nextPage && pagesRetireved < process.env.paws_max_pages_per_invocation){
if (nextPage.pageToken && pagesRetireved < process.env.paws_max_pages_per_invocation) {

return client.listLogEntries(nextPage, options)
.then(res => paginationCallback(res, newAcc));
} else{
return {logs: newAcc, nextPage};
return logging.entries.list(params)
.then(res => {
return paginationCallback(res, newAcc)
});
} else {
return { logs: newAcc, nextPage };
}
};

const pageSize = state.pageSize > 0 ? state.pageSize : MAX_PAGE_SIZE;
let params = state.nextPage ?
state.nextPage:
state.nextPage :
{
filter,
pageSize: pageSize,
resourceNames:[state.stream]
resourceNames: [state.stream]
};

client.listLogEntries(params, options)
logging.entries.list(params)
.then(paginationCallback)
.then(({logs, nextPage}) => {
.then(({ logs, nextPage }) => {

const newState = collector._getNextCollectionState(state, nextPage);
AlLogger.info(`GSTA000002 Next collection in ${newState.poll_interval_sec} seconds`);

return callback(null, logs, newState, newState.poll_interval_sec);
})
.catch(err => {
AlLogger.error(`GSTA000003 err in collection ${JSON.stringify(err.details)}`);

AlLogger.error(`GSTA000003 err in collection ${JSON.stringify(err)}`);
// Stackdriver Logging api has some rate limits that we might run into.
// If we run inot a rate limit error, instead of returning the error,
// we return the state back to the queue with an additional second added, up to 15 min
Expand All @@ -147,54 +154,40 @@ timestamp < "${state.until}"`;
const interval = state.poll_interval_sec < 60 ? 60 : state.poll_interval_sec;
const nextPollInterval = state.poll_interval_sec < MAX_POLL_INTERVAL ?
interval + 60 : MAX_POLL_INTERVAL;

if (state.nextPage && state.nextPage.pageSize) {
if (state.nextPage && state.nextPage.pageToken && state.nextPage.pageSize) {
state.nextPage.pageSize = Math.ceil(state.nextPage.pageSize / 2);
AlLogger.debug(`Throttling error with nextPage: ${err.message}. Retrying with smaller pageSize.`);
} else {
if (currentInterval <= 15 && err.details.includes('Received message larger than max')) {
if (currentInterval <= 15 && err.message && err.message.indexOf('Received message larger than max') >= 0) {
state.pageSize = state.pageSize ? Math.ceil(state.pageSize / 2) : Math.ceil(params.pageSize / 2);
AlLogger.debug(`Throttling error with no nextPage and large message: ${err.message}. Reducing pageSize.`);
} else {
state.until = moment(state.since).add(Math.ceil(currentInterval / 2), 'seconds').toISOString();
AlLogger.debug(`Throttling error with no nextPage: ${err.message}. Reducing time range.`);
}
}
const backOffState = Object.assign({}, state, {poll_interval_sec:nextPollInterval});
const backOffState = Object.assign({}, state, { poll_interval_sec: nextPollInterval });
collector.reportApiThrottling(function () {
return callback(null, [], backOffState, nextPollInterval);
});
} else {
// set errorCode if not available in error object to showcase client error on DDMetrics
if (err.code) {
// set errorCode if not available in error object to showcase client error on DDMetrics
if (err.code) {
err.errorCode = err.code;
}
return callback(err);
}
});
}

decodeProtoPayload(logEntry) {
let collector = this;
if (logEntry.protoPayload && (logEntry.protoPayload.type_url === AUDIT_PAYLOAD_TYPE_URL)) {
try {
const buffer = Buffer.from(logEntry.protoPayload.value);
let decodedData = collector._auditLogDecoder.decode(buffer);
logEntry.protoPayload.value = decodedData.toJSON();
} catch(error) {
AlLogger.error(`Error decoding data ${error}`);
}
}
return logEntry;
}

_getNextCollectionState(curState, nextPage) {
// Reset the page size for the next collection if it's less than the maximum
const pageSize = Math.max(MAX_PAGE_SIZE, nextPage?.pageSize || curState.pageSize || MAX_PAGE_SIZE);

const { stream, since, until } = curState;

if (nextPage) {
if (nextPage && nextPage.pageToken) {
// Case: Continue with the next page
return {
since,
Expand Down Expand Up @@ -222,15 +215,14 @@ timestamp < "${state.until}"`;
// TODO: probably need to actually decode hte protobuf payload on these logs
pawsFormatLog(msg) {
let collector = this;

const ts = msg.timestamp ? msg.timestamp : {seconds: Date.now() / 1000};
const ts = parse.getMsgTs(msg, tsPaths);

const typeId = parse.getMsgTypeId(msg, typeIdPaths);

let formattedMsg = {
// TODO: figure out if this TS is always a string or if they API is goofy...
hostname: collector.collector_id,
messageTs: parseInt(ts.seconds),
messageTs: ts.sec,
priority: 11,
progName: 'GooglestackdriverCollector',
message: JSON.stringify(msg),
Expand Down
36 changes: 18 additions & 18 deletions collectors/googlestackdriver/local/events/event_poll.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
{
"Records": [
{
"messageId": "f67a45fc-ab43-4df1-9f2f-585bb2043122",
"receiptHandle": "AQEB78LmKaaZ+uj8DVnc/O2zQcAe+qKSi3ZGTSBFssIRSmpwwzUEi2vhPTaIBnhOoh0aoRxVjWdoXO3ZloINfMxmcjycP3KC0WXwcWokoOc3iMCdqhYg0NcOhQW1X0ixc79C9/5/XF1xGd79vLhFL7KvRjjiT4sOaSxlAv6v2fJ5eDETnp7CRa5pocCF4EO2su0M4/TnlLreGfsY+C+/tH+r19AM+d3Jt5dbNMrKMWRRZ7/PTjczkIM7U38AHPuusuBz9uzA5yMQGOMI8FPXfqgcafEN17JqKuNSd/l54v1+s9rBQSzL/MH9wZ0XpZditcpe5pTc+dGuRHOXbFK2A0YUQCvRq1Ed8tfr68uELTQK5jWHH/zPnEMWsTyco9VuNCKr",
"body": "{\n \"priv_collector_state\": {\n \"since\": \"2019-01-01T10:00:00Z\",\n \"until\": \"2020-01-01T10:30:00Z\"\n }\n}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1574159909907",
"SenderId": "some-id",
"ApproximateFirstReceiveTimestamp": "1574159909968"
},
"messageAttributes": {},
"md5OfBody": "1845296051d4cfe2e6175358a065cc38",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:352283894008:paws-state-queue",
"awsRegion": "us-east-1"
}
]
"Records": [
{
"messageId": "1b77ecca-5c1d-4c88-a6b5-f5be6d6e3947",
"receiptHandle": "AQEB78LmKaaZ+uj8DVnc/O2zQcAe+qKSi3ZGTSBFssIRSmpwwzUEi2vhPTaIBnhOoh0aoRxVjWdoXO3ZloINfMxmcjycP3KC0WXwcWokoOc3iMCdqhYg0NcOhQW1X0ixc79C9/5/XF1xGd79vLhFL7KvRjjiT4sOaSxlAv6v2fJ5eDETnp7CRa5pocCF4EO2su0M4/TnlLreGfsY+C+/tH+r19AM+d3Jt5dbNMrKMWRRZ7/PTjczkIM7U38AHPuusuBz9uzA5yMQGOMI8FPXfqgcafEN17JqKuNSd/l54v1+s9rBQSzL/MH9wZ0XpZditcpe5pTc+dGuRHOXbFK2A0YUQCvRq1Ed8tfr68uELTQK5jWHH/zPnEMWsTyco9VuNCKr",
"body": "{\n \"priv_collector_state\": {\n \"since\": \"2024-04-13T00:00:00Z\",\n \n \"stream\": \"projects/rcs-test-project-422212\",\n \"until\": \"2024-06-13T23:00:00Z\"\n }\n}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1574159909907",
"SenderId": "some-id",
"ApproximateFirstReceiveTimestamp": "1574159909968"
},
"messageAttributes": {},
"md5OfBody": "1845296051d4cfe2e6175358a065cc38",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:352283894008:paws-state-queue",
"awsRegion": "us-east-1"
}
]
}
14 changes: 14 additions & 0 deletions collectors/googlestackdriver/local/run-sam.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,20 @@ SRC_SAM_TEMPLATE="${SCRIPT_DIR}/sam-template.yaml"
SRC_ENV_FILE="${SCRIPT_DIR}/${ENV_FILE_NAME}"
SRC_EVENT_FILE="${SCRIPT_DIR}/events/${EVENT_FILE_NAME}"
RUN_DIR=${SCRIPT_DIR}/../
PROFILE_NAME=""

exists(){
command -v "$1" >/dev/null 2>&1
}

if exists jq; then
uid=`uuidgen`
LOWERUUID=$(echo "$uid" | tr '[:upper:]' '[:lower:]')
echo "generating messageId in event.json: ${LOWERUUID}"
jq --arg newRandomvalue $LOWERUUID '(.Records[].messageId) |= $newRandomvalue' ${SRC_EVENT_FILE} > tmp && mv tmp ${SRC_EVENT_FILE}
else
echo "jq does not exist please install jq to run command"
fi

command -v sam > /dev/null
if [ $? -ne 0 ]; then
Expand All @@ -26,6 +39,7 @@ ln -sf ${SRC_ENV_FILE} ${RUN_DIR}/${ENV_FILE_NAME}
ln -sf ${SRC_EVENT_FILE} ${RUN_DIR}/${EVENT_FILE_NAME}
cd ${RUN_DIR} && \
sam local invoke \
--profile ${PROFILE_NAME} \
--env-vars ${ENV_FILE_NAME} \
-t ${SAM_TEMPLATE_NAME} \
-e ${EVENT_FILE_NAME} \
Expand Down
6 changes: 4 additions & 2 deletions collectors/googlestackdriver/local/sam-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Resources:
LocalLambda:
Type: AWS::Serverless::Function
Properties:
KmsKeyArn: arn:aws:kms:us-east-1:352283894008:key/cdda86d5-615b-4dcc-9319-77ab34510473
KmsKeyArn:
Environment:
Variables:
AWS_LAMBDA_FUNCTION_NAME:
Expand All @@ -25,6 +25,7 @@ Resources:
paws_api_client_id:
paws_max_pages_per_invocation:
paws_collector_param_string_2:
paws_collector_param_string_1:
paws_endpoint:
paws_extension:
collector_id:
Expand All @@ -33,8 +34,9 @@ Resources:
customer_id:
paws_type_name:
ssm_direct:
collector_status_api: api.product.dev.alertlogic.com
CodeUri:
Runtime: nodejs20.x
Handler: index.handler
Timeout: 60
Timeout: 600
MemorySize: 1024
6 changes: 3 additions & 3 deletions collectors/googlestackdriver/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "googlestackdriver-collector",
"version": "1.2.11",
"version": "1.2.12",
"description": "Alert Logic AWS based Googlestackdriver Log Collector",
"repository": {},
"private": true,
Expand All @@ -27,10 +27,10 @@
"dependencies": {
"@alertlogic/al-collector-js": "3.0.11",
"@alertlogic/paws-collector": "2.2.3",
"@google-cloud/logging": "^11.0.0",
"google-proto-files": "^4.2.0",
"async": "^3.2.5",
"debug": "^4.3.5",
"google-auth-library": "^9.11.0",
"googleapis": "^126.0.0",
"moment": "2.30.1"
},
"author": "Alert Logic Inc."
Expand Down
Loading

0 comments on commit 279bac0

Please sign in to comment.