Skip to content

Commit

Permalink
Add Compressor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lucagiac81 committed Jun 26, 2023
1 parent 3ee5632 commit 241d920
Show file tree
Hide file tree
Showing 88 changed files with 3,274 additions and 1,953 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ set(SOURCES
util/comparator.cc
util/compression.cc
util/compression_context_cache.cc
util/compressor.cc
util/concurrent_task_limiter_impl.cc
util/crc32c.cc
util/data_structure.cc
Expand Down Expand Up @@ -1513,6 +1514,7 @@ if(WITH_TESTS)
util/autovector_test.cc
util/bloom_test.cc
util/coding_test.cc
util/compression_test.cc
util/crc32c_test.cc
util/defer_test.cc
util/dynamic_bloom_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,9 @@ cache_test: $(OBJ_DIR)/cache/cache_test.o $(TEST_LIBRARY) $(LIBRARY)
coding_test: $(OBJ_DIR)/util/coding_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

hash_test: $(OBJ_DIR)/util/hash_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/comparator.cc",
"util/compression.cc",
"util/compression_context_cache.cc",
"util/compressor.cc",
"util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc",
"util/crc32c_arm64.cc",
Expand Down Expand Up @@ -597,6 +598,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"util/comparator.cc",
"util/compression.cc",
"util/compression_context_cache.cc",
"util/compressor.cc",
"util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc",
"util/crc32c_arm64.cc",
Expand Down Expand Up @@ -5009,6 +5011,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="compression_test",
srcs=["util/compression_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="configurable_test",
srcs=["options/configurable_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
28 changes: 14 additions & 14 deletions cache/compressed_secondary_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
s = helper->create_cb(Slice(ptr->get(), handle_value_charge),
create_context, allocator, &value, &charge);
} else {
UncompressionContext uncompression_context(cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(),
cache_options_.compression_type);
auto compressor =
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(),
cache_options_.compress_format_version,
allocator);

size_t uncompressed_size{0};
CacheAllocationPtr uncompressed = UncompressData(
uncompression_info, (char*)ptr->get(), handle_value_charge,
&uncompressed_size, cache_options_.compress_format_version, allocator);
CacheAllocationPtr uncompressed = uncompression_info.UncompressData(
compressor.get(), (char*)ptr->get(), handle_value_charge,
&uncompressed_size);

if (!uncompressed) {
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
Expand Down Expand Up @@ -148,16 +149,15 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
if (cache_options_.compression_type != kNoCompression &&
!cache_options_.do_not_compress_roles.Contains(helper->role)) {
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
CompressionOptions compression_opts;
CompressionContext compression_context(cache_options_.compression_type);
auto compressor =
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
uint64_t sample_for_compression{0};
CompressionInfo compression_info(
compression_opts, compression_context, CompressionDict::GetEmptyDict(),
cache_options_.compression_type, sample_for_compression);
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
cache_options_.compress_format_version,
sample_for_compression);

bool success =
CompressData(val, compression_info,
cache_options_.compress_format_version, &compressed_val);
compression_info.CompressData(compressor.get(), val, &compressed_val);

if (!success) {
return Status::Corruption("Error compressing value.");
Expand Down
1 change: 1 addition & 0 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {
Expand Down
21 changes: 7 additions & 14 deletions db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
immutable_options_(immutable_options),
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compression_type_(mutable_cf_options->blob_compression_type),
blob_compressor_(mutable_cf_options->blob_compressor),
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
db_id_(std::move(db_id)),
Expand Down Expand Up @@ -150,7 +150,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
}

BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);
blob_compressor_->GetCompressionType());

return Status::OK();
}
Expand Down Expand Up @@ -227,7 +227,8 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;

BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
BlobLogHeader header(column_family_id_,
blob_compressor_->GetCompressionType(), has_ttl,
expiration_range);

{
Expand Down Expand Up @@ -255,26 +256,18 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
assert(compressed_blob->empty());
assert(immutable_options_);

if (blob_compression_type_ == kNoCompression) {
if (blob_compressor_->GetCompressionType() == kNoCompression) {
return Status::OK();
}

CompressionOptions opts;
CompressionContext context(blob_compression_type_);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
blob_compression_type_, sample_for_compression);

constexpr uint32_t compression_format_version = 2;
CompressionInfo info;

bool success = false;

{
StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
BLOB_DB_COMPRESSION_MICROS);
success =
CompressData(*blob, info, compression_format_version, compressed_blob);
success = info.CompressData(blob_compressor_.get(), *blob, compressed_blob);
}

if (!success) {
Expand Down
3 changes: 2 additions & 1 deletion db/blob/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -89,7 +90,7 @@ class BlobFileBuilder {
const ImmutableOptions* immutable_options_;
uint64_t min_blob_size_;
uint64_t blob_file_size_;
CompressionType blob_compression_type_;
std::shared_ptr<Compressor> blob_compressor_;
PrepopulateBlobCache prepopulate_blob_cache_;
const FileOptions* file_options_;
const std::string db_id_;
Expand Down
11 changes: 4 additions & 7 deletions db/blob/blob_file_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,12 @@ TEST_F(BlobFileBuilderTest, Compression) {
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);

CompressionOptions opts;
CompressionContext context(kSnappyCompression);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
kSnappyCompression, sample_for_compression);
auto compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
ASSERT_NE(compressor, nullptr);

std::string compressed_value;
ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(),
uncompressed_value.size(), &compressed_value));
ASSERT_OK(compressor->Compress(CompressionInfo(), uncompressed_value,
&compressed_value));

ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
BlobLogRecord::kHeaderSize + key_size + compressed_value.size());
Expand Down
42 changes: 20 additions & 22 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ Status BlobFileReader::Create(

Statistics* const statistics = immutable_options.stats;

CompressionType compression_type = kNoCompression;
std::shared_ptr<Compressor> compressor;

{
const Status s = ReadHeader(file_reader.get(), column_family_id, statistics,
&compression_type);
&compressor);
if (!s.ok()) {
return s;
}
Expand All @@ -67,7 +67,7 @@ Status BlobFileReader::Create(
}

blob_file_reader->reset(
new BlobFileReader(std::move(file_reader), file_size, compression_type,
new BlobFileReader(std::move(file_reader), file_size, compressor,
immutable_options.clock, statistics));

return Status::OK();
Expand Down Expand Up @@ -136,9 +136,9 @@ Status BlobFileReader::OpenFile(
Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
uint32_t column_family_id,
Statistics* statistics,
CompressionType* compression_type) {
std::shared_ptr<Compressor>* compressor) {
assert(file_reader);
assert(compression_type);
assert(compressor);

Slice header_slice;
Buffer buf;
Expand Down Expand Up @@ -181,7 +181,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
return Status::Corruption("Column family ID mismatch");
}

*compression_type = header.compression;
*compressor = BuiltinCompressor::GetCompressor(header.compression);

return Status::OK();
}
Expand Down Expand Up @@ -272,11 +272,11 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,

BlobFileReader::BlobFileReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
CompressionType compression_type, SystemClock* clock,
const std::shared_ptr<Compressor>& compressor, SystemClock* clock,
Statistics* statistics)
: file_reader_(std::move(file_reader)),
file_size_(file_size),
compression_type_(compression_type),
compressor_(compressor),
clock_(clock),
statistics_(statistics) {
assert(file_reader_);
Expand All @@ -286,7 +286,7 @@ BlobFileReader::~BlobFileReader() = default;

Status BlobFileReader::GetBlob(
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
uint64_t value_size, CompressionType compression_type,
uint64_t value_size, const std::shared_ptr<Compressor>& compressor,
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
assert(result);
Expand All @@ -297,7 +297,7 @@ Status BlobFileReader::GetBlob(
return Status::Corruption("Invalid blob offset");
}

if (compression_type != compression_type_) {
if (compressor->GetCompressionType() != compressor_->GetCompressionType()) {
return Status::Corruption("Compression type mismatch when reading blob");
}

Expand Down Expand Up @@ -361,7 +361,7 @@ Status BlobFileReader::GetBlob(

{
const Status s = UncompressBlobIfNeeded(
value_slice, compression_type, allocator, clock_, statistics_, result);
value_slice, compressor.get(), allocator, clock_, statistics_, result);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -407,7 +407,8 @@ void BlobFileReader::MultiGetBlob(
*req->status = Status::Corruption("Invalid blob offset");
continue;
}
if (req->compression != compression_type_) {
if (req->compressor->GetCompressionType() !=
compressor_->GetCompressionType()) {
*req->status =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
Expand Down Expand Up @@ -506,7 +507,7 @@ void BlobFileReader::MultiGetBlob(
// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], req->len);
*req->status =
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
UncompressBlobIfNeeded(value_slice, compressor_.get(), allocator,
clock_, statistics_, &blob_reqs[i].second);
if (req->status->ok()) {
total_bytes += record_slice.size();
Expand Down Expand Up @@ -563,31 +564,28 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
}

Status BlobFileReader::UncompressBlobIfNeeded(
const Slice& value_slice, CompressionType compression_type,
const Slice& value_slice, Compressor* compressor,
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
std::unique_ptr<BlobContents>* result) {
assert(compressor);
assert(result);

if (compression_type == kNoCompression) {
if (compressor->GetCompressionType() == kNoCompression) {
BlobContentsCreator::Create(result, nullptr, value_slice, allocator);
return Status::OK();
}

UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
UncompressionInfo info;

size_t uncompressed_size = 0;
constexpr uint32_t compression_format_version = 2;

CacheAllocationPtr output;

{
PERF_TIMER_GUARD(blob_decompress_time);
StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
output = UncompressData(info, value_slice.data(), value_slice.size(),
&uncompressed_size, compression_format_version,
allocator);
output = info.UncompressData(compressor, value_slice.data(),
value_slice.size(), &uncompressed_size);
}

TEST_SYNC_POINT_CALLBACK(
Expand Down
Loading

0 comments on commit 241d920

Please sign in to comment.