Skip to content

Commit

Permalink
Initial backwards-incompatible refactor of GG IPC to support safer ob…
Browse files Browse the repository at this point in the history
…ject lifetimes (#377)
  • Loading branch information
bretambrose authored Apr 18, 2022
1 parent d99142e commit 3f57126
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 605 deletions.
10 changes: 7 additions & 3 deletions eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,12 +492,16 @@ namespace Aws
public:
ClientOperation(
ClientConnection &connection,
StreamResponseHandler *streamHandler,
std::shared_ptr<StreamResponseHandler> streamHandler,
const OperationModelContext &operationModelContext,
Crt::Allocator *allocator) noexcept;
~ClientOperation() noexcept;

ClientOperation(const ClientOperation &clientOperation) noexcept = delete;
ClientOperation(ClientOperation &&clientOperation) noexcept;
ClientOperation(ClientOperation &&clientOperation) noexcept = delete;
bool operator=(const ClientOperation &clientOperation) noexcept = delete;
bool operator=(ClientOperation &&clientOperation) noexcept = delete;

std::future<RpcError> Close(OnMessageFlushCallback onMessageFlushCallback = nullptr) noexcept;
std::future<TaggedResult> GetOperationResult() noexcept;

Expand Down Expand Up @@ -547,7 +551,7 @@ namespace Aws

uint32_t m_messageCount;
Crt::Allocator *m_allocator;
StreamResponseHandler *m_streamHandler;
std::shared_ptr<StreamResponseHandler> m_streamHandler;
ClientContinuation m_clientContinuation;
/* This mutex protects m_resultReceived & m_closeState. */
std::mutex m_continuationMutex;
Expand Down
10 changes: 1 addition & 9 deletions eventstream_rpc/source/EventStreamClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ namespace Aws

ClientOperation::ClientOperation(
ClientConnection &connection,
StreamResponseHandler *streamHandler,
std::shared_ptr<StreamResponseHandler> streamHandler,
const OperationModelContext &operationModelContext,
Crt::Allocator *allocator) noexcept
: m_operationModelContext(operationModelContext), m_messageCount(0), m_allocator(allocator),
Expand All @@ -1119,14 +1119,6 @@ namespace Aws
{
}

ClientOperation::ClientOperation(ClientOperation &&rhs) noexcept
: m_operationModelContext(rhs.m_operationModelContext), m_messageCount(std::move(rhs.m_messageCount)),
m_allocator(std::move(rhs.m_allocator)), m_streamHandler(rhs.m_streamHandler),
m_clientContinuation(std::move(rhs.m_clientContinuation)),
m_initialResponsePromise(std::move(rhs.m_initialResponsePromise))
{
}

ClientOperation::~ClientOperation() noexcept
{
Close().wait();
Expand Down
76 changes: 18 additions & 58 deletions eventstream_rpc/tests/EchoTestRpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,90 +24,50 @@ namespace Awstest

EchoTestRpcClient::~EchoTestRpcClient() noexcept { Close(); }

GetAllProductsOperation EchoTestRpcClient::NewGetAllProducts() noexcept
std::shared_ptr<GetAllProductsOperation> EchoTestRpcClient::NewGetAllProducts() noexcept
{
return GetAllProductsOperation(
m_connection, m_echoTestRpcServiceModel.m_getAllProductsOperationContext, m_allocator);
return Aws::Crt::MakeShared<GetAllProductsOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_getAllProductsOperationContext, m_allocator);
}

std::unique_ptr<GetAllProductsOperation> EchoTestRpcClient::NewPtrGetAllProducts() noexcept
std::shared_ptr<CauseServiceErrorOperation> EchoTestRpcClient::NewCauseServiceError() noexcept
{
return std::unique_ptr<GetAllProductsOperation>(Aws::Crt::New<GetAllProductsOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_getAllProductsOperationContext, m_allocator));
return Aws::Crt::MakeShared<CauseServiceErrorOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_causeServiceErrorOperationContext, m_allocator);
}

CauseServiceErrorOperation EchoTestRpcClient::NewCauseServiceError() noexcept
{
return CauseServiceErrorOperation(
m_connection, m_echoTestRpcServiceModel.m_causeServiceErrorOperationContext, m_allocator);
}

std::unique_ptr<CauseServiceErrorOperation> EchoTestRpcClient::NewPtrCauseServiceError() noexcept
{
return std::unique_ptr<CauseServiceErrorOperation>(Aws::Crt::New<CauseServiceErrorOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_causeServiceErrorOperationContext, m_allocator));
}

CauseStreamServiceToErrorOperation EchoTestRpcClient::NewCauseStreamServiceToError(
CauseStreamServiceToErrorStreamHandler &streamHandler) noexcept
{
return CauseStreamServiceToErrorOperation(
m_connection,
&streamHandler,
m_echoTestRpcServiceModel.m_causeStreamServiceToErrorOperationContext,
m_allocator);
}

std::unique_ptr<CauseStreamServiceToErrorOperation> EchoTestRpcClient::NewPtrCauseStreamServiceToError(
std::shared_ptr<CauseStreamServiceToErrorOperation> EchoTestRpcClient::NewCauseStreamServiceToError(
std::shared_ptr<CauseStreamServiceToErrorStreamHandler> streamHandler) noexcept
{
return std::unique_ptr<CauseStreamServiceToErrorOperation>(Aws::Crt::New<CauseStreamServiceToErrorOperation>(
return Aws::Crt::MakeShared<CauseStreamServiceToErrorOperation>(
m_allocator,
m_connection,
std::move(streamHandler),
m_echoTestRpcServiceModel.m_causeStreamServiceToErrorOperationContext,
m_allocator));
}

EchoStreamMessagesOperation EchoTestRpcClient::NewEchoStreamMessages(
EchoStreamMessagesStreamHandler &streamHandler) noexcept
{
return EchoStreamMessagesOperation(
m_connection, &streamHandler, m_echoTestRpcServiceModel.m_echoStreamMessagesOperationContext, m_allocator);
m_allocator);
}

std::unique_ptr<EchoStreamMessagesOperation> EchoTestRpcClient::NewPtrEchoStreamMessages(
std::shared_ptr<EchoStreamMessagesOperation> EchoTestRpcClient::NewEchoStreamMessages(
std::shared_ptr<EchoStreamMessagesStreamHandler> streamHandler) noexcept
{
return std::unique_ptr<EchoStreamMessagesOperation>(Aws::Crt::New<EchoStreamMessagesOperation>(
return Aws::Crt::MakeShared<EchoStreamMessagesOperation>(
m_allocator,
m_connection,
std::move(streamHandler),
m_echoTestRpcServiceModel.m_echoStreamMessagesOperationContext,
m_allocator));
}

EchoMessageOperation EchoTestRpcClient::NewEchoMessage() noexcept
{
return EchoMessageOperation(m_connection, m_echoTestRpcServiceModel.m_echoMessageOperationContext, m_allocator);
}

std::unique_ptr<EchoMessageOperation> EchoTestRpcClient::NewPtrEchoMessage() noexcept
{
return std::unique_ptr<EchoMessageOperation>(Aws::Crt::New<EchoMessageOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_echoMessageOperationContext, m_allocator));
m_allocator);
}

GetAllCustomersOperation EchoTestRpcClient::NewGetAllCustomers() noexcept
std::shared_ptr<EchoMessageOperation> EchoTestRpcClient::NewEchoMessage() noexcept
{
return GetAllCustomersOperation(
m_connection, m_echoTestRpcServiceModel.m_getAllCustomersOperationContext, m_allocator);
return Aws::Crt::MakeShared<EchoMessageOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_echoMessageOperationContext, m_allocator);
}

std::unique_ptr<GetAllCustomersOperation> EchoTestRpcClient::NewPtrGetAllCustomers() noexcept
std::shared_ptr<GetAllCustomersOperation> EchoTestRpcClient::NewGetAllCustomers() noexcept
{
return std::unique_ptr<GetAllCustomersOperation>(Aws::Crt::New<GetAllCustomersOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_getAllCustomersOperationContext, m_allocator));
return Aws::Crt::MakeShared<GetAllCustomersOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_getAllCustomersOperationContext, m_allocator);
}

} // namespace Awstest
24 changes: 2 additions & 22 deletions eventstream_rpc/tests/EchoTestRpcModel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1141,22 +1141,12 @@ namespace Awstest
std::launch::deferred, [this]() { return CauseStreamServiceToErrorResult(GetOperationResult().get()); });
}

CauseStreamServiceToErrorOperation::CauseStreamServiceToErrorOperation(
ClientConnection &connection,
CauseStreamServiceToErrorStreamHandler *streamHandler,
const CauseStreamServiceToErrorOperationContext &operationContext,
Aws::Crt::Allocator *allocator) noexcept
: ClientOperation(connection, streamHandler, operationContext, allocator)
{
}

CauseStreamServiceToErrorOperation::CauseStreamServiceToErrorOperation(
ClientConnection &connection,
std::shared_ptr<CauseStreamServiceToErrorStreamHandler> streamHandler,
const CauseStreamServiceToErrorOperationContext &operationContext,
Aws::Crt::Allocator *allocator) noexcept
: ClientOperation(connection, streamHandler.get(), operationContext, allocator),
pinnedHandler(std::move(streamHandler))
: ClientOperation(connection, streamHandler, operationContext, allocator)
{
}

Expand Down Expand Up @@ -1238,22 +1228,12 @@ namespace Awstest
std::launch::deferred, [this]() { return EchoStreamMessagesResult(GetOperationResult().get()); });
}

EchoStreamMessagesOperation::EchoStreamMessagesOperation(
ClientConnection &connection,
EchoStreamMessagesStreamHandler *streamHandler,
const EchoStreamMessagesOperationContext &operationContext,
Aws::Crt::Allocator *allocator) noexcept
: ClientOperation(connection, streamHandler, operationContext, allocator)
{
}

EchoStreamMessagesOperation::EchoStreamMessagesOperation(
ClientConnection &connection,
std::shared_ptr<EchoStreamMessagesStreamHandler> streamHandler,
const EchoStreamMessagesOperationContext &operationContext,
Aws::Crt::Allocator *allocator) noexcept
: ClientOperation(connection, streamHandler.get(), operationContext, allocator),
pinnedHandler(std::move(streamHandler))
: ClientOperation(connection, streamHandler, operationContext, allocator)
{
}

Expand Down
72 changes: 36 additions & 36 deletions eventstream_rpc/tests/EventStreamClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ static int s_TestOperationWhileDisconnected(struct aws_allocator *allocator, voi
Aws::Crt::String expectedMessage("l33t");
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
ASSERT_TRUE(requestFuture.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED);
auto result = echoMessage.GetOperationResult().get();
auto result = echoMessage->GetOperationResult().get();
ASSERT_FALSE(result);
auto error = result.GetRpcError();
ASSERT_TRUE(error.baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED);
Expand Down Expand Up @@ -218,9 +218,9 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
auto echoMessage = client.NewEchoMessage();
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture.wait();
auto result = echoMessage.GetResult().get();
auto result = echoMessage->GetResult().get();
ASSERT_TRUE(result);
auto response = result.GetOperationResponse();
ASSERT_NOT_NULL(response);
Expand All @@ -240,9 +240,9 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
Aws::Crt::String expectedMessage("l33t");
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
ASSERT_TRUE(requestFuture.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED);
auto result = echoMessage.GetOperationResult().get();
auto result = echoMessage->GetOperationResult().get();
ASSERT_FALSE(result);
auto error = result.GetRpcError();
ASSERT_TRUE(error.baseStatus == EVENT_STREAM_RPC_CONNECTION_CLOSED);
Expand All @@ -257,14 +257,14 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
auto echoMessage = client.NewEchoMessage();
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
MessageData differentMessage;
differentMessage.SetBooleanMessage(true);
echoMessageRequest.SetMessage(differentMessage);
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture.wait();
auto result = echoMessage.GetResult().get();
auto result = echoMessage->GetResult().get();
ASSERT_TRUE(result);
auto response = result.GetOperationResponse();
ASSERT_NOT_NULL(response);
Expand All @@ -280,17 +280,17 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
auto echoMessage = client.NewEchoMessage();
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
MessageData differentMessage;
differentMessage.SetBooleanMessage(true);
echoMessageRequest.SetMessage(differentMessage);
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture.wait();
echoMessage.Close().wait();
echoMessage.Close().wait();
echoMessage.Close().wait();
echoMessage.Close().wait();
echoMessage->Close().wait();
echoMessage->Close().wait();
echoMessage->Close().wait();
echoMessage->Close().wait();
}

/* Close without waiting on activation or close futures. */
Expand All @@ -302,16 +302,16 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
auto echoMessage = client.NewEchoMessage();
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
MessageData differentMessage;
differentMessage.SetBooleanMessage(true);
echoMessageRequest.SetMessage(differentMessage);
echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
echoMessage.Close();
echoMessage.Close();
echoMessage.Close();
echoMessage.Close();
echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
echoMessage->Close();
echoMessage->Close();
echoMessage->Close();
echoMessage->Close();
}

/* Close without waiting for TERMINATE_STREAM to flush then immediately trying to activate. */
Expand All @@ -323,15 +323,15 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
auto echoMessage = client.NewEchoMessage();
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
MessageData differentMessage;
differentMessage.SetBooleanMessage(true);
echoMessageRequest.SetMessage(differentMessage);
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture.wait();
auto closeFuture = echoMessage.Close();
requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto closeFuture = echoMessage->Close();
requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
closeFuture.wait();
requestFuture.wait();
}
Expand All @@ -349,9 +349,9 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
ASSERT_TRUE(failedStatus.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED);
auto echoMessage = client.NewEchoMessage();
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture.wait();
auto result = echoMessage.GetResult().get();
auto result = echoMessage->GetResult().get();
ASSERT_TRUE(result);
auto response = result.GetOperationResponse();
ASSERT_NOT_NULL(response);
Expand All @@ -368,9 +368,9 @@ static int s_TestEchoOperation(struct aws_allocator *allocator, void *ctx)
ASSERT_TRUE(connectedStatus.get().baseStatus == EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED);
auto echoMessage = client.NewEchoMessage();
echoMessageRequest.SetMessage(messageData);
auto requestFuture = echoMessage.Activate(echoMessageRequest, s_onMessageFlush);
auto requestFuture = echoMessage->Activate(echoMessageRequest, s_onMessageFlush);
requestFuture.wait();
auto result = echoMessage.GetResult().get();
auto result = echoMessage->GetResult().get();
ASSERT_TRUE(result);
auto response = result.GetOperationResponse();
ASSERT_NOT_NULL(response);
Expand Down Expand Up @@ -510,16 +510,16 @@ static int s_TestStressClient(struct aws_allocator *allocator, void *ctx)
auto echoMessage = client.NewEchoMessage();
messageData.SetStringMessage(expectedMessage);
echoMessageRequest.SetMessage(messageData);
auto requestStatus = echoMessage.Activate(echoMessageRequest, s_onMessageFlush).get();
auto resultFuture = echoMessage.GetResult();
auto requestStatus = echoMessage->Activate(echoMessageRequest, s_onMessageFlush).get();
auto resultFuture = echoMessage->GetResult();
/* The response may never arrive depending on how many ongoing requests are made
* so in case of timeout, assume success. */
std::future_status status = resultFuture.wait_for(std::chrono::seconds(5));
if (status != std::future_status::ready)
{
return AWS_OP_SUCCESS;
}
auto result = echoMessage.GetResult().get();
auto result = echoMessage->GetResult().get();
ASSERT_TRUE(result);
auto response = result.GetOperationResponse();
ASSERT_NOT_NULL(response);
Expand Down
Loading

0 comments on commit 3f57126

Please sign in to comment.