Skip to content

Commit

Permalink
Check write buffer on stream closed check (#1578)
Browse files Browse the repository at this point in the history
Helps to avoid "empty JSON" errors when stream is being reset

Relates-To: DATASDK-57

Signed-off-by: Andrey Kashcheev <ext-andrey.kashcheev@here.com>
  • Loading branch information
andrey-kashcheev authored Jan 9, 2025
1 parent c2e791c commit e3708f2
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ size_t RapidJsonByteStream::PutEnd(char*) { return 0; }
bool RapidJsonByteStream::ReadEmpty() const {
return count_ == read_buffer_.size();
}
bool RapidJsonByteStream::WriteEmpty() const { return write_buffer_.empty(); }
bool RapidJsonByteStream::WriteEmpty() const {
std::unique_lock<std::mutex> lock(mutex_);
return write_buffer_.empty();
}

void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
std::unique_lock<std::mutex> lock(mutex_);
Expand All @@ -64,7 +67,7 @@ void RapidJsonByteStream::AppendContent(const char* content, size_t length) {

void RapidJsonByteStream::SwapBuffers() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&]() { return !WriteEmpty(); });
cv_.wait(lock, [&]() { return !write_buffer_.empty(); });
std::swap(read_buffer_, write_buffer_);
write_buffer_.clear();
count_ = 0;
Expand Down Expand Up @@ -114,7 +117,7 @@ boost::optional<client::ApiError> AsyncJsonStream::GetError() const {

bool AsyncJsonStream::IsClosed() const {
std::unique_lock<std::mutex> lock(mutex_);
return closed_;
return closed_ && (error_ || current_stream_->WriteEmpty());
}

} // namespace repository
Expand Down
10 changes: 8 additions & 2 deletions olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2076,10 +2076,13 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) {

repository.StreamPartitions(async_stream, kVersion, additional_fields,
billing_tag, context);
EXPECT_TRUE(async_stream->IsClosed());
// Not closed since the stream is not empty
EXPECT_FALSE(async_stream->IsClosed());
EXPECT_FALSE(async_stream->GetError());
EXPECT_STREQ(ref_stream_data.c_str(),
get_stream_content(*async_stream).c_str());
// Now it's closed as we read all its content
EXPECT_TRUE(async_stream->IsClosed());

{
SCOPED_TRACE("Data with offset is in the stream");
Expand All @@ -2091,10 +2094,13 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) {
repository.StreamPartitions(second_stream, kVersion, additional_fields,
billing_tag, context);

EXPECT_TRUE(second_stream->IsClosed());
// Not closed since the stream is not empty
EXPECT_FALSE(second_stream->IsClosed());
EXPECT_FALSE(second_stream->GetError());
EXPECT_STREQ((initial_value + ref_stream_data).c_str(),
get_stream_content(*second_stream).c_str());
// Now it's closed as we read all its content
EXPECT_TRUE(second_stream->IsClosed());
}
}
}
Expand Down

0 comments on commit e3708f2

Please sign in to comment.