From 97adeeb2f7212a8a0964cb016af031135305f800 Mon Sep 17 00:00:00 2001 From: Alfred Gedeon Date: Fri, 1 Dec 2023 12:23:14 -0800 Subject: [PATCH] Review comments --- .github/workflows/ci.yml | 11 + .github/workflows/ci_run_jobs_cfg.json | 2 +- .github/workflows/ci_run_jobs_mqtt5_cfg.json | 7 +- samples/jobs/job_execution/CMakeLists.txt | 28 ++ samples/jobs/job_execution/README.md | 82 ++++ samples/jobs/job_execution/main.cpp | 357 ++++++++++++++++++ .../jobs/mqtt5_job_execution/CMakeLists.txt | 28 ++ samples/jobs/mqtt5_job_execution/README.md | 123 ++++++ samples/jobs/mqtt5_job_execution/main.cpp | 182 +++++++++ 9 files changed, 818 insertions(+), 2 deletions(-) create mode 100644 samples/jobs/job_execution/CMakeLists.txt create mode 100644 samples/jobs/job_execution/README.md create mode 100644 samples/jobs/job_execution/main.cpp create mode 100644 samples/jobs/mqtt5_job_execution/CMakeLists.txt create mode 100644 samples/jobs/mqtt5_job_execution/README.md create mode 100644 samples/jobs/mqtt5_job_execution/main.cpp diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a0819fa65..dd8d28a2a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -508,6 +508,17 @@ jobs: - name: run MQTT5 Shared Subscription sample run: | python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_shared_subscription_cfg.json + - name: configure AWS credentials (Jobs) + uses: aws-actions/configure-aws-credentials@v1 + with: + role-to-assume: ${{ env.CI_JOBS_ROLE }} + aws-region: ${{ env.AWS_DEFAULT_REGION }} + - name: run Jobs sample + run: | + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_jobs_cfg.json + - name: run Mqtt5 Jobs sample + run: | + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_jobs_mqtt5_cfg.json - name: configure AWS credentials (Cognito) uses: aws-actions/configure-aws-credentials@v1 with: diff --git a/.github/workflows/ci_run_jobs_cfg.json b/.github/workflows/ci_run_jobs_cfg.json index 1a28f96e6..82a6ebac7 100644 --- a/.github/workflows/ci_run_jobs_cfg.json +++ b/.github/workflows/ci_run_jobs_cfg.json @@ -1,6 +1,6 @@ { "language": "CPP", - "sample_file": "./aws-iot-device-sdk-cpp-v2/build/samples/jobs/describe_job_execution/describe-job-execution", + "sample_file": "./aws-iot-device-sdk-cpp-v2/build/samples/jobs/job_execution/job-execution", "sample_region": "us-east-1", "sample_main_class": "", "arguments": [ diff --git a/.github/workflows/ci_run_jobs_mqtt5_cfg.json b/.github/workflows/ci_run_jobs_mqtt5_cfg.json index 23d48c2b1..91eeb6428 100644 --- a/.github/workflows/ci_run_jobs_mqtt5_cfg.json +++ b/.github/workflows/ci_run_jobs_mqtt5_cfg.json @@ -1,6 +1,6 @@ { "language": "CPP", - "sample_file": "./aws-iot-device-sdk-cpp-v2/build/samples/jobs/mqtt5_describe_job_execution/mqtt5-describe-job-execution", + "sample_file": "./aws-iot-device-sdk-cpp-v2/build/samples/jobs/mqtt5_job_execution/mqtt5-job-execution", "sample_region": "us-east-1", "sample_main_class": "", "arguments": [ @@ -25,6 +25,11 @@ { "name": "--job_id", "data": "CI_Jobs_Thing_Job_1" + }, + { + "name": "--is_ci", + "data": "true" } + ] } diff --git a/samples/jobs/job_execution/CMakeLists.txt b/samples/jobs/job_execution/CMakeLists.txt new file mode 100644 index 000000000..a0ce228b7 --- /dev/null +++ b/samples/jobs/job_execution/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 3.1) +# note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 +project(job-execution CXX) + +file(GLOB SRC_FILES + "*.cpp" + "../../utils/CommandLineUtils.cpp" + "../../utils/CommandLineUtils.h" +) + +add_executable(${PROJECT_NAME} ${SRC_FILES}) + +set_target_properties(${PROJECT_NAME} PROPERTIES + CXX_STANDARD 14) + +#set warnings +if (MSVC) + target_compile_options(${PROJECT_NAME} PRIVATE /W4 /WX) +else () + target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wno-long-long -pedantic -Werror) +endif () + +find_package(aws-crt-cpp REQUIRED) +find_package(IotJobs-cpp REQUIRED) + +install(TARGETS ${PROJECT_NAME} DESTINATION bin) + +target_link_libraries(${PROJECT_NAME} PRIVATE AWS::aws-crt-cpp AWS::IotJobs-cpp) diff --git a/samples/jobs/job_execution/README.md b/samples/jobs/job_execution/README.md new file mode 100644 index 000000000..9b32156c8 --- /dev/null +++ b/samples/jobs/job_execution/README.md @@ -0,0 +1,82 @@ +# Jobs + +[**Return to main sample list**](../../README.md) + +This sample uses the AWS IoT [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) Service to describe jobs to execute. [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) is a service that allows you to define and respond to remote operation requests defined through the AWS IoT Core website or via any other device (or CLI command) that can access the [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) service. + +Note: This sample requires you to create jobs for your device to execute. See +[instructions here](https://docs.aws.amazon.com/iot/latest/developerguide/create-manage-jobs.html) for how to make jobs. + +On startup, the sample describes the jobs that are pending execution. + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+Sample Policy +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": "iot:Publish",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/start-next",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/update",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/get",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/get"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Receive",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/notify-next",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/start-next/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/update/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/get/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/get/*"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Subscribe",
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/notify-next",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/start-next/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/*/update/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/get/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/*/get/*"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/test-*"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: The name of your AWS IoT Core thing you want the device connection to be associated with + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +Use the following command to run the Jobs sample: + +``` sh +./describe-job-execution --endpoint --cert --key --thing_name --job_id +``` + +You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: + +``` sh +./describe-job-execution --endpoint --cert --key --thing_name --job_id --ca_file +``` diff --git a/samples/jobs/job_execution/main.cpp b/samples/jobs/job_execution/main.cpp new file mode 100644 index 000000000..edb4b62f5 --- /dev/null +++ b/samples/jobs/job_execution/main.cpp @@ -0,0 +1,357 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "../../utils/CommandLineUtils.h" + +using namespace Aws::Crt; +using namespace Aws::Iotjobs; + +void updateJobExecution(enum JobStatus status, String thingName, + String currentJobId, IotJobsClient &jobsClient, + int32_t ¤tVersionNumber, int64_t ¤tExecutionNumber); + +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ + + // Do the global initialization for the API + ApiHandle apiHandle; + + /** + * cmdData is the arguments/input from the command line placed into a single struct for + * use in this sample. This handles all of the command line parsing, validating, etc. + * See the Utils/CommandLineUtils for more information. + */ + Utils::cmdData cmdData = Utils::parseSampleInputJobs(argc, argv, &apiHandle); + + // Create the MQTT builder and populate it with data from cmdData. + auto clientConfigBuilder = + Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); + if (cmdData.input_ca != "") + { + clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); + } + + // Create the MQTT connection from the MQTT builder + auto clientConfig = clientConfigBuilder.Build(); + if (!clientConfig) + { + fprintf( + stderr, + "Client Configuration initialization failed with error %s\n", + Aws::Crt::ErrorDebugString(clientConfig.LastError())); + exit(-1); + } + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + auto connection = client.NewConnection(clientConfig); + if (!*connection) + { + fprintf( + stderr, + "MQTT Connection Creation failed with error %s\n", + Aws::Crt::ErrorDebugString(connection->LastError())); + exit(-1); + } + + /** + * In a real world application you probably don't want to enforce synchronous behavior + * but this is a sample console application, so we'll just do that with a condition variable. + */ + std::promise connectionCompletedPromise; + std::promise connectionClosedPromise; + + // Invoked when a MQTT connect has completed or failed + auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { + if (errorCode) + { + fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode)); + connectionCompletedPromise.set_value(false); + } + else + { + fprintf(stdout, "Connection completed with return code %d\n", returnCode); + connectionCompletedPromise.set_value(true); + } + }; + + // Invoked when a disconnect has been completed + auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) { + { + fprintf(stdout, "Disconnect completed\n"); + connectionClosedPromise.set_value(); + } + }; + + connection->OnConnectionCompleted = std::move(onConnectionCompleted); + connection->OnDisconnect = std::move(onDisconnect); + + /************************ Run the sample ****************************/ + + fprintf(stdout, "Connecting with MQTT3...\n"); + if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) + { + fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); + exit(-1); + } + + if (connectionCompletedPromise.get_future().get()) + { + IotJobsClient jobsClient(connection); + + DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest; + describeJobExecutionSubscriptionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionSubscriptionRequest.JobId = cmdData.input_jobId; + + /** + * This isn't absolutely necessary but since we're doing a publish almost immediately afterwards, + * to be cautious make sure the subscribe has finished before doing the publish. + */ + std::promise subAckedPromise; + auto subAckHandler = [&](int) { + // if error code returns it will be recorded by the other callback + subAckedPromise.set_value(); + }; + auto subscriptionHandler = [&](DescribeJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(-1); + } + if (response) + { + fprintf(stdout, "Received Job:\n"); + fprintf(stdout, "Job Id: %s\n", response->Execution->JobId->c_str()); + fprintf(stdout, "ClientToken: %s\n", response->ClientToken->c_str()); + fprintf(stdout, "Execution Status: %s\n", JobStatusMarshaller::ToString(*response->Execution->Status)); + } + }; + + jobsClient.SubscribeToDescribeJobExecutionAccepted( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscriptionHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + + auto failureHandler = [&](RejectedError *rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(-1); + } + if (rejectedError) + { + fprintf( + stderr, + "Service Error %d occurred. Message %s\n", + (int)rejectedError->Code.value(), + rejectedError->Message->c_str()); + return; + } + }; + + jobsClient.SubscribeToDescribeJobExecutionRejected( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + DescribeJobExecutionRequest describeJobExecutionRequest; + describeJobExecutionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionRequest.JobId = cmdData.input_jobId; + describeJobExecutionRequest.IncludeJobDocument = true; + Aws::Crt::UUID uuid; + describeJobExecutionRequest.ClientToken = uuid.ToString(); + std::promise publishDescribeJobExeCompletedPromise; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(-1); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + + jobsClient.PublishDescribeJobExecution( + std::move(describeJobExecutionRequest), AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + if (cmdData.input_isCI == false) + { + Aws::Crt::String currentJobId; + int64_t currentExecutionNumber; + int32_t currentVersionNumber; + + std::promise pendingExecutionPromise; + + { + auto OnSubscribeToStartNextPendingJobExecutionAcceptedResponse = + [&](StartNextJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(-1); + } + if (response) + { + fprintf(stdout, "Start Job %s\n", response->Execution.value().JobId.value().c_str()); + currentJobId = response->Execution->JobId.value(); + currentExecutionNumber = response->Execution->ExecutionNumber.value(); + currentVersionNumber = response->Execution->VersionNumber.value(); + } else { + fprintf(stdout, "Could not get Job Id exiting\n"); + exit(-1); + } + pendingExecutionPromise.set_value(); + }; + + StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = cmdData.input_thingName; + subAckedPromise = std::promise(); + jobsClient.SubscribeToStartNextPendingJobExecutionAccepted( + subscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + OnSubscribeToStartNextPendingJobExecutionAcceptedResponse, + subAckHandler); + + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient.SubscribeToStartNextPendingJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + + subAckedPromise.get_future().wait(); + + StartNextPendingJobExecutionRequest publishRequest; + publishRequest.ThingName = cmdData.input_thingName; + publishRequest.StepTimeoutInMinutes = 15L; + + publishDescribeJobExeCompletedPromise = std::promise(); + jobsClient.PublishStartNextPendingJobExecution( + publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); + } + + updateJobExecution(JobStatus::IN_PROGRESS, cmdData.input_thingName, + currentJobId, jobsClient, currentVersionNumber, currentExecutionNumber); + // Pretend doing some work + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + updateJobExecution(JobStatus::SUCCEEDED, cmdData.input_thingName, + currentJobId, jobsClient, currentVersionNumber, currentExecutionNumber); + } + } + + // Wait just a little bit to let the console print + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Disconnect + if (connection->Disconnect()) + { + connectionClosedPromise.get_future().wait(); + } + + return 0; +} + +void updateJobExecution(enum JobStatus status, String thingName, + String currentJobId, IotJobsClient &jobsClient, + int32_t ¤tVersionNumber, int64_t ¤tExecutionNumber) +{ + std::promise publishDescribeJobExeCompletedPromise; + std::promise pendingExecutionPromise = std::promise(); + std::promise subAckedPromise; + + UpdateJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = thingName; + subscriptionRequest.JobId = currentJobId; + + auto subAckHandler = [&](int) { + // if error code returns it will be recorded by the other callback + subAckedPromise.set_value(); + }; + auto failureHandler = [&](RejectedError *rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(-1); + } + if (rejectedError) + { + fprintf( + stderr, + "Service Error %d occurred. Message %s\n", + (int)rejectedError->Code.value(), + rejectedError->Message->c_str()); + return; + } + }; + auto subscribeHandler = [&](UpdateJobExecutionResponse *response, int ioErr) { + (void)response; + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(-1); + } + fprintf(stdout, "Marked job %s currentJobId SUCCEEDED", currentJobId.c_str()); + pendingExecutionPromise.set_value(); + }; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(-1); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + subAckedPromise = std::promise(); + jobsClient.SubscribeToUpdateJobExecutionAccepted( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscribeHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient.SubscribeToUpdateJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + UpdateJobExecutionRequest publishRequest; + publishRequest.ThingName = thingName; + publishRequest.JobId = currentJobId; + publishRequest.ExecutionNumber = currentExecutionNumber; + publishRequest.Status = status; + publishRequest.ExpectedVersion = currentVersionNumber++; + + jobsClient.PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); +} diff --git a/samples/jobs/mqtt5_job_execution/CMakeLists.txt b/samples/jobs/mqtt5_job_execution/CMakeLists.txt new file mode 100644 index 000000000..0affcba5c --- /dev/null +++ b/samples/jobs/mqtt5_job_execution/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 3.1) +# note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 +project(mqtt5-job-execution CXX) + +file(GLOB SRC_FILES + "*.cpp" + "../../utils/CommandLineUtils.cpp" + "../../utils/CommandLineUtils.h" +) + +add_executable(${PROJECT_NAME} ${SRC_FILES}) + +set_target_properties(${PROJECT_NAME} PROPERTIES + CXX_STANDARD 14) + +#set warnings +if (MSVC) + target_compile_options(${PROJECT_NAME} PRIVATE /W4 /WX) +else () + target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wno-long-long -pedantic -Werror) +endif () + +find_package(aws-crt-cpp REQUIRED) +find_package(IotJobs-cpp REQUIRED) + +install(TARGETS ${PROJECT_NAME} DESTINATION bin) + +target_link_libraries(${PROJECT_NAME} PRIVATE AWS::aws-crt-cpp AWS::IotJobs-cpp) diff --git a/samples/jobs/mqtt5_job_execution/README.md b/samples/jobs/mqtt5_job_execution/README.md new file mode 100644 index 000000000..e97c806d6 --- /dev/null +++ b/samples/jobs/mqtt5_job_execution/README.md @@ -0,0 +1,123 @@ +# Jobs + +[**Return to main sample list**](../../README.md) + +This sample uses the AWS IoT [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) Service to describe jobs to execute. [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) is a service that allows you to define and respond to remote operation requests defined through the AWS IoT Core website or via any other device (or CLI command) that can access the [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) service. + +Note: This sample requires you to create jobs for your device to execute. See +[instructions here](https://docs.aws.amazon.com/iot/latest/developerguide/create-manage-jobs.html) for how to make jobs. + +On startup, the sample describes the jobs that are pending execution. + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+Sample Policy +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": "iot:Publish",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/start-next",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/update",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/get",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/get"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Receive",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/notify-next",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/start-next/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/update/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/get/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/get/*"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Subscribe",
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/notify-next",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/start-next/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/*/update/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/get/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/*/get/*"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/test-*"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: The name of your AWS IoT Core thing you want the device connection to be associated with + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +Use the following command to run the Jobs sample: + +``` sh +./mqtt5-describe-job-execution --endpoint --cert --key --thing_name --job_id +``` + +You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: + +``` sh +./mqtt5-describe-job-execution --endpoint --cert --key --thing_name --job_id --ca_file +``` + +## Service Client Notes +### Difference between MQTT5 and MQTT311 IotJobsClient +The IotJobsClient with Mqtt5 client is identical to Mqtt3 one. We wrapped the Mqtt5Client into MqttClientConnection so that we could keep the same interface for IotJobsClient. +The only difference is that you would need setup up a Mqtt5 Client for the IotJobsClient. For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../../../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](../../mqtt5/mqtt5_pubsub/) + + + + + + + + + + +
Create a IotJobsClient with Mqtt5Create a IotJobsClient with Mqtt311
+ +```Cpp + // Build Mqtt5Client + std::shared_ptr client = builder->Build(); + + // Create jobs client with mqtt5 client + IotJobsClient jobsClient(client); +``` + + + +```Cpp + // Create mqtt311 connection + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + auto connection = client.NewConnection(clientConfig); + + // Create jobs client with mqtt311 connection + IotJobsClient jobsClient(connection); + +``` + +
+ +### Mqtt::QOS v.s. Mqtt5::QOS +As the service client interface is unchanged for Mqtt3 Connection and Mqtt5 Client,the IotJobsClient will use Mqtt::QOS instead of Mqtt5::QOS even with a Mqtt5 Client. diff --git a/samples/jobs/mqtt5_job_execution/main.cpp b/samples/jobs/mqtt5_job_execution/main.cpp new file mode 100644 index 000000000..c89b27f17 --- /dev/null +++ b/samples/jobs/mqtt5_job_execution/main.cpp @@ -0,0 +1,182 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "../../utils/CommandLineUtils.h" + +using namespace Aws::Crt; +using namespace Aws::Iotjobs; + +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ + + // Do the global initialization for the API + ApiHandle apiHandle; + + /** + * cmdData is the arguments/input from the command line placed into a single struct for + * use in this sample. This handles all of the command line parsing, validating, etc. + * See the Utils/CommandLineUtils for more information. + */ + Utils::cmdData cmdData = Utils::parseSampleInputJobs(argc, argv, &apiHandle); + + // Create the MQTT5 builder and populate it with data from cmdData. + Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + + // Check if the builder setup correctly. + if (builder == nullptr) + { + printf( + "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + return -1; + } + + // Setup connection options + std::shared_ptr connectOptions = std::make_shared(); + connectOptions->WithClientId(cmdData.input_clientId); + builder->WithConnectOptions(connectOptions); + if (cmdData.input_port != 0) + { + builder->WithPort(static_cast(cmdData.input_port)); + } + + std::promise connectionPromise; + std::promise stoppedPromise; + + // Setup lifecycle callbacks + builder->WithClientConnectionSuccessCallback( + [&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) { + fprintf( + stdout, + "Mqtt5 Client connection succeed, clientid: %s.\n", + eventData.negotiatedSettings->getClientId().c_str()); + connectionPromise.set_value(true); + }); + builder->WithClientConnectionFailureCallback([&connectionPromise]( + const Mqtt5::OnConnectionFailureEventData &eventData) { + fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); + connectionPromise.set_value(false); + }); + builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) { + fprintf(stdout, "Mqtt5 Client stopped.\n"); + stoppedPromise.set_value(); + }); + + // Create Mqtt5Client + std::shared_ptr client = builder->Build(); + delete builder; + /************************ Run the sample ****************************/ + + fprintf(stdout, "Connecting...\n"); + if (!client->Start()) + { + fprintf(stderr, "MQTT5 Connection failed to start"); + exit(-1); + } + + if (connectionPromise.get_future().get()) + { + IotJobsClient jobsClient(client); + + DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest; + describeJobExecutionSubscriptionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionSubscriptionRequest.JobId = cmdData.input_jobId; + + /** + * This isn't absolutely necessary but since we're doing a publish almost immediately afterwards, + * to be cautious make sure the subscribe has finished before doing the publish. + */ + std::promise subAckedPromise; + auto subAckHandler = [&](int) { + // if error code returns it will be recorded by the other callback + subAckedPromise.set_value(); + }; + auto subscriptionHandler = [&](DescribeJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + fprintf(stdout, "Received Job:\n"); + fprintf(stdout, "Job Id: %s\n", response->Execution->JobId->c_str()); + fprintf(stdout, "ClientToken: %s\n", response->ClientToken->c_str()); + fprintf(stdout, "Execution Status: %s\n", JobStatusMarshaller::ToString(*response->Execution->Status)); + }; + + jobsClient.SubscribeToDescribeJobExecutionAccepted( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscriptionHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + auto failureHandler = [&](RejectedError *rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + if (rejectedError) + { + fprintf(stderr, "Service Error %d occurred\n", (int)rejectedError->Code.value()); + return; + } + }; + + jobsClient.SubscribeToDescribeJobExecutionRejected( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + DescribeJobExecutionRequest describeJobExecutionRequest; + describeJobExecutionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionRequest.JobId = cmdData.input_jobId; + describeJobExecutionRequest.IncludeJobDocument = true; + Aws::Crt::UUID uuid; + describeJobExecutionRequest.ClientToken = uuid.ToString(); + std::promise publishDescribeJobExeCompletedPromise; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + + jobsClient.PublishDescribeJobExecution( + std::move(describeJobExecutionRequest), AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + } + + // Wait just a little bit to let the console print + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Disconnect + if (client->Stop()) + { + stoppedPromise.get_future().wait(); + } + + return 0; +} \ No newline at end of file