From 6e2f4e45a95aab1a5629da5bf362d04e5b5731b1 Mon Sep 17 00:00:00 2001 From: Alfred Gedeon Date: Tue, 28 Nov 2023 18:02:19 -0800 Subject: [PATCH] Update service tests --- .builder/actions/build_samples.py | 17 + .github/workflows/ci.yml | 9 + samples/utils/CommandLineUtils.cpp | 2 + samples/utils/CommandLineUtils.h | 2 + servicetests/test_cases/mqtt3_jobs_cfg.json | 32 ++ servicetests/test_cases/mqtt5_jobs_cfg.json | 32 ++ .../test_cases/test_jobs_execution.py | 103 ++++ .../tests/JobsExecution/CMakeLists.txt | 28 + servicetests/tests/JobsExecution/main.cpp | 529 ++++++++++++++++++ servicetests/tests/JobsExecution/main.mqtt5 | 182 ++++++ 10 files changed, 936 insertions(+) create mode 100644 servicetests/test_cases/mqtt3_jobs_cfg.json create mode 100644 servicetests/test_cases/mqtt5_jobs_cfg.json create mode 100644 servicetests/test_cases/test_jobs_execution.py create mode 100644 servicetests/tests/JobsExecution/CMakeLists.txt create mode 100644 servicetests/tests/JobsExecution/main.cpp create mode 100644 servicetests/tests/JobsExecution/main.mqtt5 diff --git a/.builder/actions/build_samples.py b/.builder/actions/build_samples.py index fd6a46a24..e8f481ab1 100644 --- a/.builder/actions/build_samples.py +++ b/.builder/actions/build_samples.py @@ -51,6 +51,10 @@ def run(self, env): 'deviceadvisor/tests/shadow_update' ] + servicetests = [ + 'servicetests/tests/JobsExecution/', + ] + for sample_path in samples: build_path = os.path.join('build', sample_path) steps.append(['cmake', @@ -64,6 +68,19 @@ def run(self, env): '--build', build_path, '--config', 'RelWithDebInfo']) + for sample_path in servicetests: + build_path = os.path.join('build', sample_path) + steps.append(['cmake', + f'-B{build_path}', + f'-H{sample_path}', + f'-DCMAKE_PREFIX_PATH={env.install_dir}', + '-DCMAKE_BUILD_TYPE=RelWithDebInfo']) + # append extra cmake configs + steps[-1].extend(cmd_args.cmake_extra) + steps.append(['cmake', + '--build', build_path, + '--config', 'RelWithDebInfo']) + for sample_path in da_samples: build_path = os.path.join('build', sample_path) steps.append(['cmake', diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eac8fe274..4b9788f61 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,6 +22,7 @@ env: CI_FOLDER: "D:/a/work" CI_UTILS_FOLDER: "./aws-iot-device-sdk-cpp-v2/utils" CI_SAMPLES_CFG_FOLDER: "./aws-iot-device-sdk-cpp-v2/.github/workflows" + CI_SERVICE_TESTS_CFG_FOLDER: "./aws-iot-device-sdk-cpp-v2/servicetests/test_cases" CI_IOT_CONTAINERS: arn:aws:iam::123124136734:role/CRT_IoT_Containers CI_PUBSUB_ROLE: arn:aws:iam::180635532705:role/CI_PubSub_Role CI_CYCLEPUBSUB_ROLE: arn:aws:iam::180635532705:role/CI_CyclePubSub_Role @@ -570,6 +571,14 @@ jobs: run: | python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_greengrass_discovery_cfg.json + - name: run mqtt3 Jobs sericetests + run: | + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SERVICE_TESTS_CFG_FOLDER}}/mqtt3_jobs_cfg.json + + - name: run mqtt5 Jobs sericetests + run: | + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SERVICE_TESTS_CFG_FOLDER}}/mqtt5_jobs_cfg.json + # check that docs can still build check-docs: runs-on: ubuntu-20.04 # latest diff --git a/samples/utils/CommandLineUtils.cpp b/samples/utils/CommandLineUtils.cpp index 40e08b79a..72a7c7387 100644 --- a/samples/utils/CommandLineUtils.cpp +++ b/samples/utils/CommandLineUtils.cpp @@ -14,6 +14,7 @@ namespace Utils { // The command names for the samples + static const char *m_cmd_mqtt_version = "mqtt_version"; static const char *m_cmd_endpoint = "endpoint"; static const char *m_cmd_ca_file = "ca_file"; static const char *m_cmd_cert_file = "cert"; @@ -418,6 +419,7 @@ namespace Utils cmdData->input_ca = cmdUtils->GetCommand(m_cmd_ca_file); } cmdData->input_isCI = cmdUtils->HasCommand(m_cmd_is_ci); + cmdData->input_mqtt_version = atoi(cmdUtils->GetCommandOrDefault(m_cmd_mqtt_version, "3").c_str()); } static void s_populateTopic(CommandLineUtils *cmdUtils, cmdData *cmdData) diff --git a/samples/utils/CommandLineUtils.h b/samples/utils/CommandLineUtils.h index 16c7b2db0..413550605 100644 --- a/samples/utils/CommandLineUtils.h +++ b/samples/utils/CommandLineUtils.h @@ -279,6 +279,8 @@ namespace Utils Aws::Crt::String input_pkcs12Password; // Greengrass Discovery bool input_PrintDiscoverRespOnly; + // MQTT protocol version + uint64_t input_mqtt_version; }; cmdData parseSampleInputDeviceDefender(int argc, char *argv[], Aws::Crt::ApiHandle *api_handle); diff --git a/servicetests/test_cases/mqtt3_jobs_cfg.json b/servicetests/test_cases/mqtt3_jobs_cfg.json new file mode 100644 index 000000000..0074b1748 --- /dev/null +++ b/servicetests/test_cases/mqtt3_jobs_cfg.json @@ -0,0 +1,32 @@ +{ + "language": "CPP", + "sample_file": "./aws-iot-device-sdk-cpp-v2/build/servicestests/tests/JobsExecution/describe-job-execution", + "sample_region": "us-east-1", + "sample_main_class": "", + "arguments": [ + { + "name": "--mqtt_version", + "data": "3" + }, + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "data": "certificate.pem.crt" + }, + { + "name": "--key", + "data": "private.pem.key" + }, + { + "name": "--thing_name", + "data": "ServiceTest_Jobs_$INPUT_UUID" + }, + { + "name": "--is_ci", + "data": "true" + } + ] +} diff --git a/servicetests/test_cases/mqtt5_jobs_cfg.json b/servicetests/test_cases/mqtt5_jobs_cfg.json new file mode 100644 index 000000000..68280c0fb --- /dev/null +++ b/servicetests/test_cases/mqtt5_jobs_cfg.json @@ -0,0 +1,32 @@ +{ + "language": "CPP", + "sample_file": "./aws-iot-device-sdk-cpp-v2/build/servicestests/tests/JobsExecution/describe-job-execution", + "sample_region": "us-east-1", + "sample_main_class": "", + "arguments": [ + { + "name": "--mqtt_version", + "data": "5" + }, + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "data": "certificate.pem.crt" + }, + { + "name": "--key", + "data": "private.pem.key" + }, + { + "name": "--thing_name", + "data": "ServiceTest_Jobs_$INPUT_UUID" + }, + { + "name": "--is_ci", + "data": "true" + } + ] +} diff --git a/servicetests/test_cases/test_jobs_execution.py b/servicetests/test_cases/test_jobs_execution.py new file mode 100644 index 000000000..2a8610c8d --- /dev/null +++ b/servicetests/test_cases/test_jobs_execution.py @@ -0,0 +1,103 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +import argparse +import json +import os +import sys +import uuid + +import boto3 + +import run_in_ci +import ci_iot_thing + + +def main(): + argument_parser = argparse.ArgumentParser( + description="Run Jobs test in CI") + argument_parser.add_argument( + "--config-file", required=True, help="JSON file providing command-line arguments for a test") + argument_parser.add_argument( + "--input-uuid", required=False, help="UUID for thing name. UUID will be generated if this option is omit") + argument_parser.add_argument( + "--region", required=False, default="us-east-1", help="The name of the region to use") + parsed_commands = argument_parser.parse_args() + + try: + iot_client = boto3.client('iot', region_name=parsed_commands.region) + secrets_client = boto3.client("secretsmanager", region_name=parsed_commands.region) + except Exception as e: + print(f"ERROR: Could not make Boto3 iot-data client. Credentials likely could not be sourced. Exception: {e}", + file=sys.stderr) + return -1 + + input_uuid = parsed_commands.input_uuid if parsed_commands.input_uuid else str(uuid.uuid4()) + + thing_name = "ServiceTest_Jobs_" + input_uuid + policy_name = secrets_client.get_secret_value( + SecretId="ci/JobsServiceClientTest/policy_name")["SecretString"] + + # Temporary certificate/key file path. + certificate_path = os.path.join(os.getcwd(), "tests/jobs_execution/certificate.pem.crt") + key_path = os.path.join(os.getcwd(), "tests/jobs_execution/private.pem.key") + + try: + ci_iot_thing.create_iot_thing( + thing_name=thing_name, + thing_group="CI_ServiceClient_Thing_Group", + region=parsed_commands.region, + policy_name=policy_name, + certificate_path=certificate_path, + key_path=key_path) + except Exception as e: + print(f"ERROR: Failed to create IoT thing: {e}") + sys.exit(-1) + + # Perform Jobs test. If it's successful, the Job execution should be marked as SUCCEEDED for the thing. + try: + test_result = run_in_ci.setup_and_launch(parsed_commands.config_file, input_uuid) + except Exception as e: + print(f"ERROR: Failed to execute Jobs test: {e}") + test_result = -1 + + # Test reported success, verify that Job was indeed executed by the thing. + if test_result == 0: + print("Verifying that Job was executed") + try: + job_id = secrets_client.get_secret_value(SecretId="ci/JobsServiceClientTest/job_id")["SecretString"] + thing_job = iot_client.describe_job_execution(jobId=job_id, thingName=thing_name) + job_status = thing_job.get('execution', {}).get('status', {}) + if job_status != 'SUCCEEDED': + print(f"ERROR: Could not verify Job execution; Job info: {thing_job}") + test_result = -1 + except Exception as e: + print(f"ERROR: Could not verify Job execution: {e}") + test_result = -1 + + if test_result == 0: + print("Test succeeded") + + # Delete a thing created for this test run. + # NOTE We want to try to delete thing even if test was unsuccessful. + try: + ci_iot_thing.delete_iot_thing(thing_name, parsed_commands.region) + except Exception as e: + print(f"ERROR: Failed to delete thing: {e}") + # Fail the test if unable to delete thing, so this won't remain unnoticed. + test_result = -1 + + try: + if os.path.isfile(certificate_path): + os.remove(certificate_path) + if os.path.isfile(key_path): + os.remove(key_path) + except Exception as e: + print(f"WARNING: Failed to delete local files: {e}") + + if test_result != 0: + sys.exit(-1) + + +if __name__ == "__main__": + main() diff --git a/servicetests/tests/JobsExecution/CMakeLists.txt b/servicetests/tests/JobsExecution/CMakeLists.txt new file mode 100644 index 000000000..f1be07fae --- /dev/null +++ b/servicetests/tests/JobsExecution/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 3.1) +# note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 +project(describe-job-execution CXX) + +file(GLOB SRC_FILES + "*.cpp" + "../../../samples/utils/CommandLineUtils.cpp" + "../../../samples/utils/CommandLineUtils.h" +) + +add_executable(${PROJECT_NAME} ${SRC_FILES}) + +set_target_properties(${PROJECT_NAME} PROPERTIES + CXX_STANDARD 14) + +#set warnings +if (MSVC) + target_compile_options(${PROJECT_NAME} PRIVATE /W4 /WX) +else () + target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wno-long-long -pedantic -Werror) +endif () + +find_package(aws-crt-cpp REQUIRED) +find_package(IotJobs-cpp REQUIRED) + +install(TARGETS ${PROJECT_NAME} DESTINATION bin) + +target_link_libraries(${PROJECT_NAME} PRIVATE AWS::aws-crt-cpp AWS::IotJobs-cpp) diff --git a/servicetests/tests/JobsExecution/main.cpp b/servicetests/tests/JobsExecution/main.cpp new file mode 100644 index 000000000..edaa5df69 --- /dev/null +++ b/servicetests/tests/JobsExecution/main.cpp @@ -0,0 +1,529 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../../../samples/utils/CommandLineUtils.h" + + +using namespace Aws::Crt; +using namespace Aws::Iotjobs; + +void getAvailableJobs(Aws::Crt::String thingName, IotJobsClient &jobsClient); + +std::vector availableJobs; + +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ + + // Do the global initialization for the API + ApiHandle apiHandle; + + /** + * cmdData is the arguments/input from the command line placed into a single struct for + * use in this sample. This handles all of the command line parsing, validating, etc. + * See the Utils/CommandLineUtils for more information. + */ + Utils::cmdData cmdData = Utils::parseSampleInputJobs(argc, argv, &apiHandle); + + /** + * In a real world application you probably don't want to enforce synchronous behavior + * but this is a sample console application, so we'll just do that with a condition variable. + */ + std::promise connectionCompletedPromise; + std::promise connectionClosedPromise; + std::promise connectionPromise; + std::promise stoppedPromise; + + Aws::Iot::MqttClientConnectionConfigBuilder clientConfigBuilder; + std::shared_ptr connection; + + Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + + // Check if the builder setup correctly. + if (builder == nullptr) + { + printf( + "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + return -1; + } + // Create Mqtt5Client + std::shared_ptr client = builder->Build(); + + if (cmdData.input_mqtt_version == 5UL) + { + // Create the MQTT5 builder and populate it with data from cmdData. + + // Setup connection options + std::shared_ptr connectOptions = std::make_shared(); + connectOptions->WithClientId(cmdData.input_clientId); + builder->WithConnectOptions(connectOptions); + if (cmdData.input_port != 0) + { + builder->WithPort(static_cast(cmdData.input_port)); + } + + // Setup lifecycle callbacks + builder->WithClientConnectionSuccessCallback( + [&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) { + fprintf( + stdout, + "Mqtt5 Client connection succeed, clientid: %s.\n", + eventData.negotiatedSettings->getClientId().c_str()); + connectionPromise.set_value(true); + }); + builder->WithClientConnectionFailureCallback([&connectionPromise]( + const Mqtt5::OnConnectionFailureEventData &eventData) { + fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); + connectionPromise.set_value(false); + }); + builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) { + fprintf(stdout, "Mqtt5 Client stopped.\n"); + stoppedPromise.set_value(); + }); + + + fprintf(stdout, "Connecting...\n"); + if (!client->Start()) + { + fprintf(stderr, "MQTT5 Connection failed to start"); + exit(-1); + } + } + else if (cmdData.input_mqtt_version == 3UL) + { + // Create the MQTT builder and populate it with data from cmdData. + clientConfigBuilder = + Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); + if (cmdData.input_ca != "") + { + clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); + } + + // Create the MQTT connection from the MQTT builder + auto clientConfig = clientConfigBuilder.Build(); + if (!clientConfig) + { + fprintf( + stderr, + "Client Configuration initialization failed with error %s\n", + Aws::Crt::ErrorDebugString(clientConfig.LastError())); + exit(-1); + } + + + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + //auto connection = client.NewConnection(clientConfig); + //std::shared_ptr connection = + connection = client.NewConnection(clientConfig); + if (!*connection) + { + fprintf( + stderr, + "MQTT Connection Creation failed with error %s\n", + Aws::Crt::ErrorDebugString(connection->LastError())); + exit(-1); + } + + + // Invoked when a MQTT connect has completed or failed + auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { + if (errorCode) + { + fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode)); + connectionCompletedPromise.set_value(false); + } + else + { + fprintf(stdout, "Connection completed with return code %d\n", returnCode); + connectionCompletedPromise.set_value(true); + } + }; + + // Invoked when a disconnect has been completed + auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) { + { + fprintf(stdout, "Disconnect completed\n"); + connectionClosedPromise.set_value(); + } + }; + + connection->OnConnectionCompleted = std::move(onConnectionCompleted); + connection->OnDisconnect = std::move(onDisconnect); + + fprintf(stdout, "Connecting...\n"); + if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) + { + fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); + exit(-1); + } + } + else + { + fprintf(stderr, "MQTT Version %lu not supported\n", cmdData.input_mqtt_version); + exit(-1); + } + + delete builder; + + + + + + /************************ Run the sample ****************************/ + + + if (connectionCompletedPromise.get_future().get()) + { + IotJobsClient jobsClient(connection); + + DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest; + describeJobExecutionSubscriptionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionSubscriptionRequest.JobId = cmdData.input_jobId; + + /** + * This isn't absolutely necessary but since we're doing a publish almost immediately afterwards, + * to be cautious make sure the subscribe has finished before doing the publish. + */ + std::promise subAckedPromise; + auto subAckHandler = [&](int) { + // if error code returns it will be recorded by the other callback + subAckedPromise.set_value(); + }; + auto subscriptionHandler = [&](DescribeJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + if (response) + { + fprintf(stdout, "Received Job:\n"); + fprintf(stdout, "Job Id: %s\n", response->Execution->JobId->c_str()); + fprintf(stdout, "ClientToken: %s\n", response->ClientToken->c_str()); + fprintf(stdout, "Execution Status: %s\n", JobStatusMarshaller::ToString(*response->Execution->Status)); + } + }; + + getAvailableJobs(cmdData.input_thingName, jobsClient); + for (auto jobid : availableJobs ) + { + jobsClient.SubscribeToDescribeJobExecutionAccepted( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscriptionHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + + auto failureHandler = [&](RejectedError *rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + if (rejectedError) + { + fprintf( + stderr, + "Service Error %d occurred. Message %s\n", + (int)rejectedError->Code.value(), + rejectedError->Message->c_str()); + return; + } + }; + + jobsClient.SubscribeToDescribeJobExecutionRejected( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, + failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + DescribeJobExecutionRequest describeJobExecutionRequest; + describeJobExecutionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionRequest.JobId = cmdData.input_jobId; + describeJobExecutionRequest.IncludeJobDocument = true; + Aws::Crt::UUID uuid; + describeJobExecutionRequest.ClientToken = uuid.ToString(); + std::promise publishDescribeJobExeCompletedPromise; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + + jobsClient.PublishDescribeJobExecution( + std::move(describeJobExecutionRequest), AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + Aws::Crt::String currentJobId; + int64_t currentExecutionNumber; + int32_t currentVersionNumber; + + std::promise pendingExecutionPromise; + + { + auto OnSubscribeToStartNextPendingJobExecutionAcceptedResponse = + [&](StartNextJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + if (response) + { + fprintf(stdout, "Start Job %s\n", response->Execution.value().JobId.value().c_str()); + currentJobId = response->Execution->JobId.value(); + currentExecutionNumber = response->Execution->ExecutionNumber.value(); + currentVersionNumber = response->Execution->VersionNumber.value(); + } + pendingExecutionPromise.set_value(); + }; + + StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = cmdData.input_thingName; + subAckedPromise = std::promise(); + jobsClient.SubscribeToStartNextPendingJobExecutionAccepted( + subscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + OnSubscribeToStartNextPendingJobExecutionAcceptedResponse, + subAckHandler); + + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient.SubscribeToStartNextPendingJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + + subAckedPromise.get_future().wait(); + + StartNextPendingJobExecutionRequest publishRequest; + publishRequest.ThingName = cmdData.input_thingName; + publishRequest.StepTimeoutInMinutes = 15L; + + publishDescribeJobExeCompletedPromise = std::promise(); + jobsClient.PublishStartNextPendingJobExecution( + publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); + } + + { + pendingExecutionPromise = std::promise(); + auto OnSubscribeToUpdateJobExecutionAcceptedResponse = [&](UpdateJobExecutionResponse *response, + int ioErr) { + (void)response; + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + fprintf(stdout, "Marked Job %s IN_PROGRESS", currentJobId.c_str()); + pendingExecutionPromise.set_value(); + }; + UpdateJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = cmdData.input_thingName; + subscriptionRequest.JobId = currentJobId; + + subAckedPromise = std::promise(); + jobsClient.SubscribeToUpdateJobExecutionAccepted( + subscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + OnSubscribeToUpdateJobExecutionAcceptedResponse, + subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient.SubscribeToUpdateJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + UpdateJobExecutionRequest publishRequest; + publishRequest.ThingName = cmdData.input_thingName; + publishRequest.JobId = currentJobId; + publishRequest.ExecutionNumber = currentExecutionNumber; + publishRequest.Status = JobStatus::IN_PROGRESS; + publishRequest.ExpectedVersion = currentVersionNumber++; + publishDescribeJobExeCompletedPromise = std::promise(); + jobsClient.PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); + } + + // Pretend doing some work + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + { + pendingExecutionPromise = std::promise(); + UpdateJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = cmdData.input_thingName; + subscriptionRequest.JobId = currentJobId; + + auto subscribeHandler = [&](UpdateJobExecutionResponse *response, int ioErr) { + (void)response; + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + fprintf(stdout, "Marked job %s currentJobId SUCCEEDED", currentJobId.c_str()); + pendingExecutionPromise.set_value(); + }; + subAckedPromise = std::promise(); + jobsClient.SubscribeToUpdateJobExecutionAccepted( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscribeHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient.SubscribeToUpdateJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + UpdateJobExecutionRequest publishRequest; + publishRequest.ThingName = cmdData.input_thingName; + publishRequest.JobId = currentJobId; + publishRequest.ExecutionNumber = currentExecutionNumber; + publishRequest.Status = JobStatus::SUCCEEDED; + publishRequest.ExpectedVersion = currentVersionNumber++; + + jobsClient.PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); + } + } + } + // Wait just a little bit to let the console print + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + if (cmdData.input_mqtt_version == 5UL) + { + // Disconnect + if (client->Stop() == true) + { + stoppedPromise.get_future().wait(); + } + } + else + { // mqtt3 + + // Disconnect + if (connection->Disconnect() == true) + { + connectionClosedPromise.get_future().wait(); + } + } + + return 0; +} + + +void getAvailableJobs(Aws::Crt::String thingName, IotJobsClient &jobsClient) +{ + std::promise getResponse; + std::promise publishDescribeJobExeCompletedPromise; + + GetPendingJobExecutionsSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = thingName; + + auto handler = [&](Aws::Iotjobs::GetPendingJobExecutionsResponse * response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + for (JobExecutionSummary job : response->InProgressJobs.value()) { + availableJobs.push_back(job.JobId.value()); + fprintf(stderr, "In Progress jobs %s\n", job.JobId->c_str()); + } + for (JobExecutionSummary job : response->QueuedJobs.value()) { + availableJobs.push_back(job.JobId.value()); + fprintf(stderr, "Queued jobs %s\n", job.JobId->c_str()); + } + getResponse.set_value(); + + }; + + auto err_handler = [&](Aws::Iotjobs::RejectedError * rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + if (rejectedError) + { + fprintf( + stderr, + "Service Error %d occurred. Message %s\n", + (int)rejectedError->Code.value(), + rejectedError->Message->c_str()); + } + exit(-1); + }; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + + jobsClient.SubscribeToGetPendingJobExecutionsAccepted(subscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + handler, + publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + jobsClient.SubscribeToGetPendingJobExecutionsRejected(subscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + err_handler, + publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + GetPendingJobExecutionsRequest publishRequest; + publishRequest.ThingName = thingName; + jobsClient.PublishGetPendingJobExecutions(publishRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + getResponse.get_future().wait(); +} + +/* +std::function; + +bool SubscribeToGetPendingJobExecutionsRejected( + const Aws::Iotjobs::GetPendingJobExecutionsSubscriptionRequest &request, + Aws::Crt::Mqtt::QOS qos, + const OnSubscribeToGetPendingJobExecutionsRejectedResponse &handler, + const OnSubscribeComplete &onSubAck); +*/ diff --git a/servicetests/tests/JobsExecution/main.mqtt5 b/servicetests/tests/JobsExecution/main.mqtt5 new file mode 100644 index 000000000..c89b27f17 --- /dev/null +++ b/servicetests/tests/JobsExecution/main.mqtt5 @@ -0,0 +1,182 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "../../utils/CommandLineUtils.h" + +using namespace Aws::Crt; +using namespace Aws::Iotjobs; + +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ + + // Do the global initialization for the API + ApiHandle apiHandle; + + /** + * cmdData is the arguments/input from the command line placed into a single struct for + * use in this sample. This handles all of the command line parsing, validating, etc. + * See the Utils/CommandLineUtils for more information. + */ + Utils::cmdData cmdData = Utils::parseSampleInputJobs(argc, argv, &apiHandle); + + // Create the MQTT5 builder and populate it with data from cmdData. + Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + + // Check if the builder setup correctly. + if (builder == nullptr) + { + printf( + "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + return -1; + } + + // Setup connection options + std::shared_ptr connectOptions = std::make_shared(); + connectOptions->WithClientId(cmdData.input_clientId); + builder->WithConnectOptions(connectOptions); + if (cmdData.input_port != 0) + { + builder->WithPort(static_cast(cmdData.input_port)); + } + + std::promise connectionPromise; + std::promise stoppedPromise; + + // Setup lifecycle callbacks + builder->WithClientConnectionSuccessCallback( + [&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) { + fprintf( + stdout, + "Mqtt5 Client connection succeed, clientid: %s.\n", + eventData.negotiatedSettings->getClientId().c_str()); + connectionPromise.set_value(true); + }); + builder->WithClientConnectionFailureCallback([&connectionPromise]( + const Mqtt5::OnConnectionFailureEventData &eventData) { + fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); + connectionPromise.set_value(false); + }); + builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) { + fprintf(stdout, "Mqtt5 Client stopped.\n"); + stoppedPromise.set_value(); + }); + + // Create Mqtt5Client + std::shared_ptr client = builder->Build(); + delete builder; + /************************ Run the sample ****************************/ + + fprintf(stdout, "Connecting...\n"); + if (!client->Start()) + { + fprintf(stderr, "MQTT5 Connection failed to start"); + exit(-1); + } + + if (connectionPromise.get_future().get()) + { + IotJobsClient jobsClient(client); + + DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest; + describeJobExecutionSubscriptionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionSubscriptionRequest.JobId = cmdData.input_jobId; + + /** + * This isn't absolutely necessary but since we're doing a publish almost immediately afterwards, + * to be cautious make sure the subscribe has finished before doing the publish. + */ + std::promise subAckedPromise; + auto subAckHandler = [&](int) { + // if error code returns it will be recorded by the other callback + subAckedPromise.set_value(); + }; + auto subscriptionHandler = [&](DescribeJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + fprintf(stdout, "Received Job:\n"); + fprintf(stdout, "Job Id: %s\n", response->Execution->JobId->c_str()); + fprintf(stdout, "ClientToken: %s\n", response->ClientToken->c_str()); + fprintf(stdout, "Execution Status: %s\n", JobStatusMarshaller::ToString(*response->Execution->Status)); + }; + + jobsClient.SubscribeToDescribeJobExecutionAccepted( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscriptionHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + auto failureHandler = [&](RejectedError *rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + if (rejectedError) + { + fprintf(stderr, "Service Error %d occurred\n", (int)rejectedError->Code.value()); + return; + } + }; + + jobsClient.SubscribeToDescribeJobExecutionRejected( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + DescribeJobExecutionRequest describeJobExecutionRequest; + describeJobExecutionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionRequest.JobId = cmdData.input_jobId; + describeJobExecutionRequest.IncludeJobDocument = true; + Aws::Crt::UUID uuid; + describeJobExecutionRequest.ClientToken = uuid.ToString(); + std::promise publishDescribeJobExeCompletedPromise; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + + jobsClient.PublishDescribeJobExecution( + std::move(describeJobExecutionRequest), AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + } + + // Wait just a little bit to let the console print + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Disconnect + if (client->Stop()) + { + stoppedPromise.get_future().wait(); + } + + return 0; +} \ No newline at end of file