Skip to content

Commit

Permalink
RS-418: Implement record removing API (#74)
Browse files Browse the repository at this point in the history
* add Bucket.RemoveRecord

* add Bucket::RemoveBatch

* add Bucket::RemoveQuery

* update CHANGELOG
  • Loading branch information
atimin authored Sep 10, 2024
1 parent e8f0ac0 commit 6a498ba
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 93 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
- reductstore_version: "main"
exclude_api_version_tag: ""
- reductstore_version: "latest"
exclude_api_version_tag: "~[1_11]"
exclude_api_version_tag: "~[1_12]"
- license_file: ""
exclude_license_tag: "~[license]"

Expand Down
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- RS-418: `IBucket::RemoveRecord`, `IBucket::RemoveRecordBatch`, `IBucket::RemoveQuery` methods for removing records, [PR-74](https://github.com/reductstore/reduct-cpp/pull/74)

## [1.11.0] - 2022-08-19

### Added

- RS-31: `Bucket::Update` and `Bucket::UpdateBatch` methods for changing labels, [PR-72](https://github.com/reductstore/reduct-cpp/pull/72)
- RS-31: `IBucket::Update` and `IBucket::UpdateBatch` methods for changing labels, [PR-72](https://github.com/reductstore/reduct-cpp/pull/72)

### Infrastructure

Expand Down
174 changes: 111 additions & 63 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ class Bucket : public IBucket {
return client_->Delete(fmt::format("{}/{}", path_, entry_name));
}

Error RemoveRecord(std::string_view entry_name, Time timestamp) const noexcept override {
return client_->Delete(fmt::format("{}/{}?ts={}", path_, entry_name, ToMicroseconds(timestamp)));
}

Error Write(std::string_view entry_name, std::optional<Time> ts,
WriteRecordCallback callback) const noexcept override {
return Write(entry_name, {.timestamp = ts}, std::move(callback));
Expand All @@ -142,14 +146,16 @@ class Bucket : public IBucket {
std::move(headers), std::move(record.callback_));
}

Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
return WriteOrUpdateBatch(entry_name, std::move(callback), true);
Result<BatchErrors> WriteBatch(std::string_view entry_name, BatchCallback callback) const noexcept override {
return ProcessBatch(entry_name, std::move(callback), BatchType::kWrite);
}

Result<BatchErrors> UpdateBatch(std::string_view entry_name, BatchCallback callback) const noexcept override {
return ProcessBatch(entry_name, std::move(callback), BatchType::kUpdate);
}

Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
return WriteOrUpdateBatch(entry_name, std::move(callback), false);
Result<BatchErrors> RemoveBatch(std::string_view entry_name, BatchCallback callback) const noexcept override {
return ProcessBatch(entry_name, std::move(callback), BatchType::kRemove);
}

Error Update(std::string_view entry_name, const WriteOptions& options) const noexcept override {
Expand Down Expand Up @@ -184,44 +190,7 @@ class Bucket : public IBucket {

Error Query(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop, QueryOptions options,
ReadRecordCallback callback) const noexcept override {
auto url = fmt::format("{}/{}/q?", path_, entry_name);

if (start) {
url += fmt::format("start={}&", ToMicroseconds(*start));
}

if (stop) {
url += fmt::format("stop={}&", ToMicroseconds(*stop));
}

for (const auto& [key, value] : options.include) {
url += fmt::format("&include-{}={}", key, value);
}

for (const auto& [key, value] : options.exclude) {
url += fmt::format("&exclude-{}={}", key, value);
}

if (options.each_s) {
url += fmt::format("each_s={}&", *options.each_s);
}

if (options.each_n) {
url += fmt::format("each_n={}&", *options.each_n);
}

if (options.limit) {
url += fmt::format("limit={}&", *options.limit);
}

if (options.ttl) {
url += fmt::format("ttl={}&", options.ttl->count() / 1000);
}

if (options.continuous) {
url += "continuous=true&";
}

std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [body, err] = client_->Get(url);
if (err) {
return err;
Expand Down Expand Up @@ -261,7 +230,65 @@ class Bucket : public IBucket {
return Error::kOk;
}

Result<uint64_t> RemoveQuery(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options) const noexcept override {
std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [resp, err] = client_->Delete(url);
if (err) {
return {0, std::move(err)};
}

try {
auto data = nlohmann::json::parse(std::get<0>(resp));
return {data.at("removed_records"), Error::kOk};
} catch (const std::exception& ex) {
return {0, Error{.code = -1, .message = ex.what()}};
}
}

private:
std::string BuildQueryUrl(const std::optional<Time>& start, const std::optional<Time>& stop,
std::string_view entry_name, const QueryOptions& options) const {
auto url = fmt::v11::format("{}/{}/q?", path_, entry_name);
if (start) {
url += fmt::format("start={}&", ToMicroseconds(*start));
}

if (stop) {
url += fmt::format("stop={}&", ToMicroseconds(*stop));
}

for (const auto& [key, value] : options.include) {
url += fmt::format("&include-{}={}", key, value);
}

for (const auto& [key, value] : options.exclude) {
url += fmt::format("&exclude-{}={}", key, value);
}

if (options.each_s) {
url += fmt::format("each_s={}&", *options.each_s);
}

if (options.each_n) {
url += fmt::format("each_n={}&", *options.each_n);
}

if (options.limit) {
url += fmt::format("limit={}&", *options.limit);
}

if (options.ttl) {
url += fmt::format("ttl={}&", options.ttl->count() / 1000);
}

if (options.continuous) {
url += "continuous=true&";
}

return url;
}

Result<bool> ReadRecord(std::string&& path, bool batched, bool head,
const ReadRecordCallback& callback) const noexcept {
std::deque<std::optional<std::string>> data;
Expand Down Expand Up @@ -503,8 +530,10 @@ class Bucket : public IBucket {
return headers;
}

Result<WriteBatchErrors> WriteOrUpdateBatch(std::string_view entry_name, WriteBatchCallback callback,
bool write) const noexcept {
enum class BatchType { kWrite, kUpdate, kRemove };

Result<WriteBatchErrors> ProcessBatch(std::string_view entry_name, BatchCallback callback,
BatchType type) const noexcept {
Batch batch;
callback(&batch);

Expand All @@ -521,33 +550,52 @@ class Bucket : public IBucket {

const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time));

if (write) {
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);
} else {
const auto value = fmt::format("0,,{}", fmt::join(labels, ","));
headers.emplace(key, value);
switch (type) {
case BatchType::kWrite: {
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);

break;
}
case BatchType::kUpdate: {
const auto value = fmt::format("0,,{}", fmt::join(labels, ","));
headers.emplace(key, value);
break;
}
case BatchType::kRemove: {
headers.emplace(key, "0,");
break;
}
}
}

Result<IHttpClient::Headers> resp;
if (write) {
const auto content_length = batch.body().size();
resp = client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
} else {
resp = client_->Patch(fmt::format("{}/{}/batch", path_, entry_name), "", std::move(headers));
Result<std::tuple<std::string, IHttpClient::Headers>> resp_result;
switch (type) {
case BatchType::kWrite: {
const auto content_length = batch.body().size();
resp_result =
client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
break;
}
case BatchType::kUpdate:
resp_result = client_->Patch(fmt::format("{}/{}/batch", path_, entry_name), "", std::move(headers));
break;

case BatchType::kRemove:
resp_result = client_->Delete(fmt::format("{}/{}/batch", path_, entry_name), std::move(headers));
break;
}

auto [resp_headers, err] = resp;
auto [resp, err] = resp_result;
if (err) {
return {{}, err};
}

WriteBatchErrors errors;
for (const auto& [key, value] : resp_headers) {
for (const auto& [key, value] : std::get<1>(resp)) {
if (key.starts_with("x-reduct-error-")) {
auto pos = value.find(',');
if (pos == std::string::npos) {
Expand Down
66 changes: 52 additions & 14 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ class IBucket {
body_ += data;
}

/**
* Add an empty record to batch (use for removing)
* @param timestamp
*/
void AddRecord(Time timestamp) { records_[timestamp] = Record{timestamp, 0, "", {}}; }

void AddOnlyLabels(Time timestamp, LabelMap labels) {
records_[timestamp] = Record{timestamp, 0, "", std::move(labels)};
}
Expand All @@ -192,9 +198,17 @@ class IBucket {
std::string body_;
};

/**
* @deprecated Use BatchErrors instead
*/
using WriteBatchCallback = std::function<void(Batch*)>;
using WriteBatchErrors = std::map<Time, Error>;
using BatchCallback = std::function<void(Batch*)>;

/**
* @deprecated Use BatchErrors instead
*/
using WriteBatchErrors = std::map<Time, Error>;
using BatchErrors = std::map<Time, Error>;

/**
* Read a record in chunks
Expand Down Expand Up @@ -250,9 +264,8 @@ class IBucket {
* @param callback a callback to add records to batch
* @return HTTP error or map of errors for each record
*/
[[nodiscard]] virtual Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;

[[nodiscard]] virtual Result<BatchErrors> WriteBatch(std::string_view entry_name,
BatchCallback callback) const noexcept = 0;

/**
* Write labels of an existing record
Expand All @@ -263,8 +276,7 @@ class IBucket {
* @param options options with timestamp, labels (content type is ignored)
* @return HTTP or communication error
*/
virtual Error Update(std::string_view entry_name,
const WriteOptions& options) const noexcept = 0;
virtual Error Update(std::string_view entry_name, const WriteOptions& options) const noexcept = 0;

/**
* Update labels of an existing record in a batch
Expand All @@ -273,19 +285,18 @@ class IBucket {
* @param callback a callback to add records to batch
* @return HTTP error or map of errors for each record
*/
[[nodiscard]] virtual Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;

[[nodiscard]] virtual Result<BatchErrors> UpdateBatch(std::string_view entry_name,
BatchCallback callback) const noexcept = 0;

/**
* Query options
*/
struct QueryOptions {
LabelMap include; ///< include labels
LabelMap exclude; ///< exclude labels
std::optional<double> each_s; ///< return one record each S seconds
std::optional<size_t> each_n; ///< return each N-th record
std::optional<size_t> limit; ///< limit number of records
LabelMap include; ///< include labels
LabelMap exclude; ///< exclude labels
std::optional<double> each_s; ///< return one record each S seconds
std::optional<size_t> each_n; ///< return each N-th record
std::optional<size_t> limit; ///< limit number of records

std::optional<std::chrono::milliseconds> ttl; ///< time to live

Expand Down Expand Up @@ -344,6 +355,33 @@ class IBucket {
*/
virtual Error RemoveEntry(std::string_view entry_name) const noexcept = 0;

/**
* @brief Remove a record from the entry
* @param entry_name
* @param timestamp
* @return HTTP or communication error
*/
virtual Error RemoveRecord(std::string_view entry_name, Time timestamp) const noexcept = 0;

/**
* @brief Remove a batch of records from the entry
* @param entry_name
* @param callback a callback to add records to batch
* @return HTTP error or map of errors for each record
*/
virtual Result<BatchErrors> RemoveBatch(std::string_view entry_name, BatchCallback callback) const noexcept = 0;

/**
* @brief Remove revision of an entry by query
* @param entry_name
* @param start start time point
* @param stop stop time point
* @param options query options. TTL, continuous, poll_interval, head_only are ignored
* @return HTTP error or number of removed records
*/
virtual Result<uint64_t> RemoveQuery(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options) const noexcept = 0;

/**
* @brief Creates a new bucket
* @param server_url HTTP url
Expand Down
Loading

0 comments on commit 6a498ba

Please sign in to comment.