From 510d644183ce2eda5f6c61c701fd9384941ba67f Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 3 May 2024 11:54:55 -0700 Subject: [PATCH] Revert jobs service test --- .../tests/JobsExecution/CMakeLists.txt | 17 - .../tests/JobsExecution/JobsExecution.cpp | 383 ------------------ .../tests/JobsExecution/JobsExecution.h | 39 -- servicetests/tests/JobsExecution/main.cpp | 357 +++++++++++++++- 4 files changed, 339 insertions(+), 457 deletions(-) delete mode 100644 servicetests/tests/JobsExecution/JobsExecution.cpp delete mode 100644 servicetests/tests/JobsExecution/JobsExecution.h diff --git a/servicetests/tests/JobsExecution/CMakeLists.txt b/servicetests/tests/JobsExecution/CMakeLists.txt index 834189e41..70577d337 100644 --- a/servicetests/tests/JobsExecution/CMakeLists.txt +++ b/servicetests/tests/JobsExecution/CMakeLists.txt @@ -4,7 +4,6 @@ project(job-execution CXX) file(GLOB SRC_FILES "*.cpp" - "*.h" "../../../samples/utils/CommandLineUtils.cpp" "../../../samples/utils/CommandLineUtils.h" ) @@ -26,20 +25,4 @@ find_package(IotJobs-cpp REQUIRED) install(TARGETS ${PROJECT_NAME} DESTINATION bin) -if (UNIX AND NOT APPLE) - include(GNUInstallDirs) -elseif(NOT DEFINED CMAKE_INSTALL_LIBDIR) - set(CMAKE_INSTALL_LIBDIR "lib") - - if (${CMAKE_INSTALL_LIBDIR} STREQUAL "lib64") - set(FIND_LIBRARY_USE_LIB64_PATHS true) - endif() -endif() - -list(APPEND CMAKE_MODULE_PATH "${CMAKE_PREFIX_PATH}/${CMAKE_INSTALL_LIBDIR}/cmake") - -include(AwsSanitizers) -enable_language(C) -aws_add_sanitizers(${PROJECT_NAME}) - target_link_libraries(${PROJECT_NAME} PRIVATE AWS::aws-crt-cpp AWS::IotJobs-cpp) diff --git a/servicetests/tests/JobsExecution/JobsExecution.cpp b/servicetests/tests/JobsExecution/JobsExecution.cpp deleted file mode 100644 index f79330892..000000000 --- a/servicetests/tests/JobsExecution/JobsExecution.cpp +++ /dev/null @@ -1,383 +0,0 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include "JobsExecution.h" - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace Aws::Crt; -using namespace Aws::Iotjobs; - -JobsExecution::JobsExecution(std::shared_ptr jobsClient, Aws::Crt::String thingName) - : m_jobsClient(std::move(jobsClient)), m_thingName(std::move(thingName)), m_currentExecutionNumber(), - m_currentVersionNumber() -{ -} - -Aws::Crt::Vector JobsExecution::getAvailableJobs() -{ - GetPendingJobExecutionsSubscriptionRequest subscriptionRequest; - subscriptionRequest.ThingName = m_thingName; - - auto handler = [this](Aws::Iotjobs::GetPendingJobExecutionsResponse *response, int ioErr) { - fprintf(stderr, "running the jobs handler\n"); - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - if (response) - { - if (response->InProgressJobs.has_value()) - { - for (const JobExecutionSummary &job : response->InProgressJobs.value()) - { - std::lock_guard lock(m_jobsMutex); - m_availableJobs.push_back(job.JobId.value()); - fprintf(stderr, "In Progress jobs %s\n", job.JobId->c_str()); - } - } - else - { - fprintf(stderr, "In Progress jobs: empty\n"); - } - if (response->QueuedJobs.has_value()) - { - for (const JobExecutionSummary &job : response->QueuedJobs.value()) - { - std::lock_guard lock(m_jobsMutex); - m_availableJobs.push_back(job.JobId.value()); - fprintf(stderr, "Queued jobs %s\n", job.JobId->c_str()); - } - } - else - { - fprintf(stderr, "Queued jobs: empty\n"); - } - } - m_getResponse.set_value(); - }; - - auto err_handler = [](Aws::Iotjobs::RejectedError *rejectedError, int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - if (rejectedError) - { - fprintf( - stderr, - "Service Error %d occurred. Message %s\n", - (int)rejectedError->Code.value(), - rejectedError->Message->c_str()); - } - fprintf(stderr, "Error handler\n"); - exit(-1); - }; - - std::promise publishDescribeJobExeCompletedPromise; - - auto publishHandler = [&publishDescribeJobExeCompletedPromise](int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - publishDescribeJobExeCompletedPromise.set_value(); - }; - - m_jobsClient->SubscribeToGetPendingJobExecutionsAccepted( - subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, handler, publishHandler); - publishDescribeJobExeCompletedPromise.get_future().wait(); - - publishDescribeJobExeCompletedPromise = std::promise(); - m_jobsClient->SubscribeToGetPendingJobExecutionsRejected( - subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, err_handler, publishHandler); - publishDescribeJobExeCompletedPromise.get_future().wait(); - - publishDescribeJobExeCompletedPromise = std::promise(); - GetPendingJobExecutionsRequest publishRequest; - publishRequest.ThingName = m_thingName; - m_jobsClient->PublishGetPendingJobExecutions(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); - publishDescribeJobExeCompletedPromise.get_future().wait(); - - if (m_getResponse.get_future().wait_for(std::chrono::seconds(10)) == std::future_status::timeout) - { - fprintf(stderr, "get available jobs error timedout\n"); - exit(-1); - } - - std::lock_guard lock(m_jobsMutex); - return m_availableJobs; -} - -void JobsExecution::describeJob(const String &jobId) -{ - DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest; - describeJobExecutionSubscriptionRequest.ThingName = m_thingName; - describeJobExecutionSubscriptionRequest.JobId = jobId; - - /** - * This isn't absolutely necessary but since we're doing a publish almost immediately afterwards, - * to be cautious make sure the subscribe has finished before doing the publish. - */ - std::promise subAckedPromise; - auto subAckHandler = [&subAckedPromise](int) { - // if error code returns it will be recorded by the other callback - subAckedPromise.set_value(); - }; - auto subscriptionHandler = [&](DescribeJobExecutionResponse *response, int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - return; - } - if (response) - { - fprintf(stderr, "Received Job:\n"); - fprintf(stderr, "Job Id: %s\n", response->Execution->JobId->c_str()); - fprintf(stderr, "ClientToken: %s\n", response->ClientToken->c_str()); - fprintf(stderr, "Execution Status: %s\n", JobStatusMarshaller::ToString(*response->Execution->Status)); - } - }; - - m_jobsClient->SubscribeToDescribeJobExecutionAccepted( - describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscriptionHandler, subAckHandler); - subAckedPromise.get_future().wait(); - - subAckedPromise = std::promise(); - - auto failureHandler = [](RejectedError *rejectedError, int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - return; - } - if (rejectedError) - { - fprintf( - stderr, - "Service Error %d occurred. Message %s\n", - (int)rejectedError->Code.value(), - rejectedError->Message->c_str()); - return; - } - }; - - m_jobsClient->SubscribeToDescribeJobExecutionRejected( - describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); - subAckedPromise.get_future().wait(); - - DescribeJobExecutionRequest describeJobExecutionRequest; - describeJobExecutionRequest.ThingName = m_thingName; - describeJobExecutionRequest.JobId = jobId; - describeJobExecutionRequest.IncludeJobDocument = true; - Aws::Crt::UUID uuid; - describeJobExecutionRequest.ClientToken = uuid.ToString(); - std::promise publishDescribeJobExeCompletedPromise; - - auto publishHandler = [&publishDescribeJobExeCompletedPromise](int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - publishDescribeJobExeCompletedPromise.set_value(); - }; - - m_jobsClient->PublishDescribeJobExecution(describeJobExecutionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); - publishDescribeJobExeCompletedPromise.get_future().wait(); -} - -void JobsExecution::startNextPendingJob() -{ - std::promise subAckedPromise; - auto subAckHandler = [&subAckedPromise](int) { - // if error code returns it will be recorded by the other callback - subAckedPromise.set_value(); - }; - - auto failureHandler = [](RejectedError *rejectedError, int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - return; - } - if (rejectedError) - { - fprintf( - stderr, - "Service Error %d occurred. Message %s\n", - (int)rejectedError->Code.value(), - rejectedError->Message->c_str()); - return; - } - }; - - Aws::Crt::String jobId; - - auto OnSubscribeToStartNextPendingJobExecutionAcceptedResponse = - [this](StartNextJobExecutionResponse *response, int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - if (response && response->Execution.has_value()) - { - fprintf(stderr, "Start Job %s\n", response->Execution.value().JobId.value().c_str()); - m_currentJobId = response->Execution->JobId.value(); - m_currentExecutionNumber = response->Execution->ExecutionNumber.value(); - m_currentVersionNumber = response->Execution->VersionNumber.value(); - } - else - { - fprintf(stderr, "Could not get Job Id, exiting\n"); - exit(-1); - } - - m_pendingExecutionPromise.set_value(); - }; - - StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest; - subscriptionRequest.ThingName = m_thingName; - subAckedPromise = std::promise(); - m_jobsClient->SubscribeToStartNextPendingJobExecutionAccepted( - subscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - OnSubscribeToStartNextPendingJobExecutionAcceptedResponse, - subAckHandler); - - subAckedPromise.get_future().wait(); - - subAckedPromise = std::promise(); - m_jobsClient->SubscribeToStartNextPendingJobExecutionRejected( - subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); - - subAckedPromise.get_future().wait(); - - StartNextPendingJobExecutionRequest publishRequest; - publishRequest.ThingName = m_thingName; - publishRequest.StepTimeoutInMinutes = 15L; - - std::promise publishDescribeJobExeCompletedPromise; - - auto publishHandler = [&publishDescribeJobExeCompletedPromise](int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - publishDescribeJobExeCompletedPromise.set_value(); - }; - m_jobsClient->PublishStartNextPendingJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); - - m_pendingExecutionPromise.get_future().wait(); -} - -void JobsExecution::updateCurrentJobStatus(Aws::Iotjobs::JobStatus jobStatus) -{ - Aws::Crt::String jobId; - int64_t currentExecutionNumber; - int32_t currentVersionNumber; - - { - jobId = m_currentJobId; - currentExecutionNumber = m_currentExecutionNumber; - currentVersionNumber = m_currentVersionNumber; - } - - std::promise subAckedPromise; - auto subAckHandler = [&subAckedPromise](int) { - // if error code returns it will be recorded by the other callback - subAckedPromise.set_value(); - }; - - auto failureHandler = [](RejectedError *rejectedError, int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - return; - } - if (rejectedError) - { - fprintf( - stderr, - "Service Error %d occurred. Message %s\n", - (int)rejectedError->Code.value(), - rejectedError->Message->c_str()); - return; - } - }; - - m_pendingExecutionPromise = std::promise(); - auto OnSubscribeToUpdateJobExecutionAcceptedResponse = [this, - jobId, jobStatus](UpdateJobExecutionResponse *response, int ioErr) { - (void)response; - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - fprintf(stderr, "Marked Job %s %s\n", jobId.c_str(), JobStatusMarshaller::ToString(jobStatus)); - m_pendingExecutionPromise.set_value(); - }; - UpdateJobExecutionSubscriptionRequest subscriptionRequest; - subscriptionRequest.ThingName = m_thingName; - subscriptionRequest.JobId = jobId; - - subAckedPromise = std::promise(); - m_jobsClient->SubscribeToUpdateJobExecutionAccepted( - subscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - OnSubscribeToUpdateJobExecutionAcceptedResponse, - subAckHandler); - subAckedPromise.get_future().wait(); - - subAckedPromise = std::promise(); - m_jobsClient->SubscribeToUpdateJobExecutionRejected( - subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); - subAckedPromise.get_future().wait(); - - std::promise publishPromise; - - auto publishHandler = [&publishPromise](int ioErr) { - if (ioErr) - { - fprintf(stderr, "Error %d occurred\n", ioErr); - exit(1); - } - else { - fprintf(stderr, "Publish handler done\n"); - } - publishPromise.set_value(); - }; - - UpdateJobExecutionRequest publishRequest; - publishRequest.ThingName = m_thingName; - publishRequest.JobId = jobId; - publishRequest.ExecutionNumber = currentExecutionNumber; - publishRequest.Status = jobStatus; - publishRequest.ExpectedVersion = currentVersionNumber++; - m_jobsClient->PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); - - publishPromise.get_future().wait(); - m_pendingExecutionPromise.get_future().wait(); -} diff --git a/servicetests/tests/JobsExecution/JobsExecution.h b/servicetests/tests/JobsExecution/JobsExecution.h deleted file mode 100644 index e39940f67..000000000 --- a/servicetests/tests/JobsExecution/JobsExecution.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -#include -#include - -#include - -class JobsExecution -{ - public: - JobsExecution(std::shared_ptr jobsClient, Aws::Crt::String thingName); - - Aws::Crt::Vector getAvailableJobs(); - - void describeJob(const Aws::Crt::String &jobId); - - void startNextPendingJob(); - - void updateCurrentJobStatus(Aws::Iotjobs::JobStatus jobStatus); - - private: - std::shared_ptr m_jobsClient; - const Aws::Crt::String m_thingName; - - Aws::Crt::Vector m_availableJobs; - std::mutex m_jobsMutex; - std::promise m_getResponse; - - Aws::Crt::String m_currentJobId; - int64_t m_currentExecutionNumber; - int32_t m_currentVersionNumber; - - std::promise m_pendingExecutionPromise; -}; diff --git a/servicetests/tests/JobsExecution/main.cpp b/servicetests/tests/JobsExecution/main.cpp index 175e55553..667f8e092 100644 --- a/servicetests/tests/JobsExecution/main.cpp +++ b/servicetests/tests/JobsExecution/main.cpp @@ -4,6 +4,7 @@ */ #include #include +#include #include #include @@ -25,15 +26,23 @@ #include #include +#include #include +#include +#include #include +#include #include "../../../samples/utils/CommandLineUtils.h" -#include "JobsExecution.h" using namespace Aws::Crt; using namespace Aws::Iotjobs; +void getAvailableJobs( + Aws::Crt::String thingName, + IotJobsClient &jobsClient, + std::vector &availableJobs); + std::shared_ptr build_mqtt3_client( Utils::cmdData &cmdData, std::shared_ptr &connection, @@ -76,12 +85,12 @@ std::shared_ptr build_mqtt3_client( auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { if (errorCode) { - fprintf(stderr, "Connection failed with error %s\n", ErrorDebugString(errorCode)); + fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode)); connectionCompletedPromise.set_value(false); } else { - fprintf(stderr, "Connection completed with return code %d\n", returnCode); + fprintf(stdout, "Connection completed with return code %d\n", returnCode); connectionCompletedPromise.set_value(true); } }; @@ -89,7 +98,7 @@ std::shared_ptr build_mqtt3_client( // Invoked when a disconnect has been completed auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) { { - fprintf(stderr, "Disconnect completed\n"); + fprintf(stdout, "Disconnect completed\n"); connectionClosedPromise.set_value(); } }; @@ -135,18 +144,18 @@ std::shared_ptr build_mqtt5_client( builder->WithClientConnectionSuccessCallback( [&connectionCompletedPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) { fprintf( - stderr, + stdout, "Mqtt5 Client connection succeed, clientid: %s.\n", eventData.negotiatedSettings->getClientId().c_str()); connectionCompletedPromise.set_value(true); }); builder->WithClientConnectionFailureCallback([&connectionCompletedPromise]( const Mqtt5::OnConnectionFailureEventData &eventData) { - fprintf(stderr, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); + fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); connectionCompletedPromise.set_value(false); }); builder->WithClientStoppedCallback([&connectionClosedPromise](const Mqtt5::OnStoppedEventData &) { - fprintf(stderr, "Mqtt5 Client stopped.\n"); + fprintf(stdout, "Mqtt5 Client stopped.\n"); connectionClosedPromise.set_value(); }); @@ -154,7 +163,7 @@ std::shared_ptr build_mqtt5_client( if (client5 == nullptr) { fprintf( - stderr, "Failed to Init Mqtt5Client with error code %d: %s.\n", LastError(), ErrorDebugString(LastError())); + stdout, "Failed to Init Mqtt5Client with error code %d: %s.\n", LastError(), ErrorDebugString(LastError())); exit(-1); } @@ -168,7 +177,8 @@ std::shared_ptr build_mqtt5_client( int main(int argc, char *argv[]) { - fprintf(stderr, "Starting the jobs execution program\n"); + + fprintf(stdout, "Starting the jobs execution programm\n"); /************************ Setup ****************************/ // Do the global initialization for the API @@ -207,17 +217,231 @@ int main(int argc, char *argv[]) /************************ Run the sample ****************************/ if (connectionCompletedPromise.get_future().get()) { - JobsExecution jobsExecution(jobsClient, cmdData.input_thingName); - auto availableJobs = jobsExecution.getAvailableJobs(); - for (const auto &jobId : availableJobs) + std::vector availableJobs; + getAvailableJobs(cmdData.input_thingName, *jobsClient, availableJobs); + for (auto jobid : availableJobs) { - jobsExecution.describeJob(jobId); - } + DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest; + describeJobExecutionSubscriptionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionSubscriptionRequest.JobId = jobid; - for (size_t idx = 0; idx < availableJobs.size(); ++idx) - { - jobsExecution.startNextPendingJob(); - jobsExecution.updateCurrentJobStatus(Aws::Iotjobs::JobStatus::SUCCEEDED); + /** + * This isn't absolutely necessary but since we're doing a publish almost immediately afterwards, + * to be cautious make sure the subscribe has finished before doing the publish. + */ + std::promise subAckedPromise; + auto subAckHandler = [&](int) { + // if error code returns it will be recorded by the other callback + subAckedPromise.set_value(); + }; + auto subscriptionHandler = [&](DescribeJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + if (response) + { + fprintf(stdout, "Received Job:\n"); + fprintf(stdout, "Job Id: %s\n", response->Execution->JobId->c_str()); + fprintf(stdout, "ClientToken: %s\n", response->ClientToken->c_str()); + fprintf( + stdout, "Execution Status: %s\n", JobStatusMarshaller::ToString(*response->Execution->Status)); + } + }; + + jobsClient->SubscribeToDescribeJobExecutionAccepted( + describeJobExecutionSubscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + subscriptionHandler, + subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + + auto failureHandler = [&](RejectedError *rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + return; + } + if (rejectedError) + { + fprintf( + stderr, + "Service Error %d occurred. Message %s\n", + (int)rejectedError->Code.value(), + rejectedError->Message->c_str()); + return; + } + }; + + jobsClient->SubscribeToDescribeJobExecutionRejected( + describeJobExecutionSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + DescribeJobExecutionRequest describeJobExecutionRequest; + describeJobExecutionRequest.ThingName = cmdData.input_thingName; + describeJobExecutionRequest.JobId = jobid; + describeJobExecutionRequest.IncludeJobDocument = true; + Aws::Crt::UUID uuid; + describeJobExecutionRequest.ClientToken = uuid.ToString(); + std::promise publishDescribeJobExeCompletedPromise; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(1); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + + jobsClient->PublishDescribeJobExecution( + std::move(describeJobExecutionRequest), AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + Aws::Crt::String currentJobId; + int64_t currentExecutionNumber; + int32_t currentVersionNumber; + + std::promise pendingExecutionPromise; + + { + auto OnSubscribeToStartNextPendingJobExecutionAcceptedResponse = + [&](StartNextJobExecutionResponse *response, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(1); + } + if (response) + { + fprintf(stdout, "Start Job %s\n", response->Execution.value().JobId.value().c_str()); + currentJobId = response->Execution->JobId.value(); + currentExecutionNumber = response->Execution->ExecutionNumber.value(); + currentVersionNumber = response->Execution->VersionNumber.value(); + } + else + { + fprintf(stdout, "Could not get Job Id exiting\n"); + exit(-1); + } + + pendingExecutionPromise.set_value(); + }; + + StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = cmdData.input_thingName; + subAckedPromise = std::promise(); + jobsClient->SubscribeToStartNextPendingJobExecutionAccepted( + subscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + OnSubscribeToStartNextPendingJobExecutionAcceptedResponse, + subAckHandler); + + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient->SubscribeToStartNextPendingJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + + subAckedPromise.get_future().wait(); + + StartNextPendingJobExecutionRequest publishRequest; + publishRequest.ThingName = cmdData.input_thingName; + publishRequest.StepTimeoutInMinutes = 15L; + + publishDescribeJobExeCompletedPromise = std::promise(); + jobsClient->PublishStartNextPendingJobExecution( + publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); + } + + { + pendingExecutionPromise = std::promise(); + auto OnSubscribeToUpdateJobExecutionAcceptedResponse = [&](UpdateJobExecutionResponse *response, + int ioErr) { + (void)response; + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(1); + } + fprintf(stdout, "Marked Job %s IN_PROGRESS", currentJobId.c_str()); + pendingExecutionPromise.set_value(); + }; + UpdateJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = cmdData.input_thingName; + subscriptionRequest.JobId = currentJobId; + + subAckedPromise = std::promise(); + jobsClient->SubscribeToUpdateJobExecutionAccepted( + subscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + OnSubscribeToUpdateJobExecutionAcceptedResponse, + subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient->SubscribeToUpdateJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + publishDescribeJobExeCompletedPromise = std::promise(); + UpdateJobExecutionRequest publishRequest; + publishRequest.ThingName = cmdData.input_thingName; + publishRequest.JobId = currentJobId; + publishRequest.ExecutionNumber = currentExecutionNumber; + publishRequest.Status = JobStatus::IN_PROGRESS; + publishRequest.ExpectedVersion = currentVersionNumber++; + jobsClient->PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); + } + + // Pretend doing some work + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + { + pendingExecutionPromise = std::promise(); + UpdateJobExecutionSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = cmdData.input_thingName; + subscriptionRequest.JobId = currentJobId; + + auto subscribeHandler = [&](UpdateJobExecutionResponse *response, int ioErr) { + (void)response; + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(1); + } + fprintf(stdout, "Marked job %s currentJobId SUCCEEDED", currentJobId.c_str()); + pendingExecutionPromise.set_value(); + }; + subAckedPromise = std::promise(); + jobsClient->SubscribeToUpdateJobExecutionAccepted( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, subscribeHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + subAckedPromise = std::promise(); + jobsClient->SubscribeToUpdateJobExecutionRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, failureHandler, subAckHandler); + subAckedPromise.get_future().wait(); + + UpdateJobExecutionRequest publishRequest; + publishRequest.ThingName = cmdData.input_thingName; + publishRequest.JobId = currentJobId; + publishRequest.ExecutionNumber = currentExecutionNumber; + publishRequest.Status = JobStatus::SUCCEEDED; + publishRequest.ExpectedVersion = currentVersionNumber++; + + publishDescribeJobExeCompletedPromise = std::promise(); + jobsClient->PublishUpdateJobExecution(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + + pendingExecutionPromise.get_future().wait(); + } } } // Wait just a little bit to let the console print @@ -242,3 +466,100 @@ int main(int argc, char *argv[]) } return 0; } + +void getAvailableJobs( + Aws::Crt::String thingName, + IotJobsClient &jobsClient, + std::vector &availableJobs) +{ + std::promise getResponse; + std::promise publishDescribeJobExeCompletedPromise; + + GetPendingJobExecutionsSubscriptionRequest subscriptionRequest; + subscriptionRequest.ThingName = thingName; + + auto handler = [&](Aws::Iotjobs::GetPendingJobExecutionsResponse *response, int ioErr) { + fprintf(stderr, "running the jobs handler\n"); + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(1); + } + if (response) + { + if (response->InProgressJobs.has_value()) + { + for (JobExecutionSummary job : response->InProgressJobs.value()) + { + availableJobs.push_back(job.JobId.value()); + fprintf(stderr, "In Progress jobs %s\n", job.JobId->c_str()); + } + } + else + { + fprintf(stderr, "In Progress jobs: empty\n"); + } + if (response->QueuedJobs.has_value()) + { + for (JobExecutionSummary job : response->QueuedJobs.value()) + { + availableJobs.push_back(job.JobId.value()); + fprintf(stderr, "Queued jobs %s\n", job.JobId->c_str()); + } + } + else + { + fprintf(stderr, "Queued jobs: empty\n"); + } + } + getResponse.set_value(); + }; + + auto err_handler = [&](Aws::Iotjobs::RejectedError *rejectedError, int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(1); + } + if (rejectedError) + { + fprintf( + stderr, + "Service Error %d occurred. Message %s\n", + (int)rejectedError->Code.value(), + rejectedError->Message->c_str()); + } + fprintf(stderr, "Error handler\n"); + exit(-1); + }; + + auto publishHandler = [&](int ioErr) { + if (ioErr) + { + fprintf(stderr, "Error %d occurred\n", ioErr); + exit(1); + } + publishDescribeJobExeCompletedPromise.set_value(); + }; + + jobsClient.SubscribeToGetPendingJobExecutionsAccepted( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, handler, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + publishDescribeJobExeCompletedPromise = std::promise(); + jobsClient.SubscribeToGetPendingJobExecutionsRejected( + subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, err_handler, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + publishDescribeJobExeCompletedPromise = std::promise(); + GetPendingJobExecutionsRequest publishRequest; + publishRequest.ThingName = thingName; + jobsClient.PublishGetPendingJobExecutions(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler); + publishDescribeJobExeCompletedPromise.get_future().wait(); + + if (getResponse.get_future().wait_for(std::chrono::seconds(10)) == std::future_status::timeout) + { + fprintf(stderr, "get available jobs error timedout\n"); + exit(-1); + } +}