Skip to content

Commit

Permalink
add get_blocks_result_v1 to SHiP
Browse files Browse the repository at this point in the history
  • Loading branch information
linh2931 committed Mar 27, 2024
1 parent a2013f7 commit 80c3b22
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 125 deletions.
13 changes: 12 additions & 1 deletion libraries/state_history/abi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ extern const char* const state_history_plugin_abi = R"({
},
{
"name": "get_blocks_result_v0", "fields": [
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "this_block", "type": "block_position?" },
{ "name": "prev_block", "type": "block_position?" },
{ "name": "block", "type": "bytes?" },
{ "name": "traces", "type": "bytes?" },
{ "name": "deltas", "type": "bytes?" }
]
},
{
"name": "get_blocks_result_v1", "fields": [
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "this_block", "type": "block_position?" },
Expand Down Expand Up @@ -576,7 +587,7 @@ extern const char* const state_history_plugin_abi = R"({
],
"variants": [
{ "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_request_v1", "get_blocks_ack_request_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0", "get_blocks_result_v1"] },
{ "name": "action_receipt", "types": ["action_receipt_v0"] },
{ "name": "action_trace", "types": ["action_trace_v0", "action_trace_v1"] },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,18 @@ datastream<ST>& operator<<(datastream<ST>& ds, const history_context_wrapper_sta

template <typename ST>
datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_blocks_result_v0& obj) {
fc::raw::pack(ds, obj.head);
fc::raw::pack(ds, obj.last_irreversible);
fc::raw::pack(ds, obj.this_block);
fc::raw::pack(ds, obj.prev_block);
history_pack_big_bytes(ds, obj.block);
history_pack_big_bytes(ds, obj.traces);
history_pack_big_bytes(ds, obj.deltas);
return ds;
}

template <typename ST>
datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_blocks_result_v1& obj) {
fc::raw::pack(ds, obj.head);
fc::raw::pack(ds, obj.last_irreversible);
fc::raw::pack(ds, obj.this_block);
Expand Down
9 changes: 7 additions & 2 deletions libraries/state_history/include/eosio/state_history/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,16 @@ struct get_blocks_result_base {
struct get_blocks_result_v0 : get_blocks_result_base {
std::optional<bytes> traces;
std::optional<bytes> deltas;
};

struct get_blocks_result_v1 : get_blocks_result_v0 {
std::optional<bytes> finality_data;
};

using state_request = std::variant<get_status_request_v0, get_blocks_request_v0, get_blocks_request_v1, get_blocks_ack_request_v0>;
using state_result = std::variant<get_status_result_v0, get_blocks_result_v0, get_blocks_result_v1>;
using get_blocks_request = std::variant<get_blocks_request_v0, get_blocks_request_v1>;
using state_result = std::variant<get_status_result_v0, get_blocks_result_v0>;
using get_blocks_result = std::variant<get_blocks_result_v0, get_blocks_result_v1>;

} // namespace state_history
} // namespace eosio
Expand All @@ -142,5 +146,6 @@ FC_REFLECT(eosio::state_history::get_blocks_request_v0, (start_block_num)(end_bl
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_request_v1, (eosio::state_history::get_blocks_request_v0), (fetch_finality_data));
FC_REFLECT(eosio::state_history::get_blocks_ack_request_v0, (num_messages));
FC_REFLECT(eosio::state_history::get_blocks_result_base, (head)(last_irreversible)(this_block)(prev_block)(block));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v0, (eosio::state_history::get_blocks_result_base), (traces)(deltas)(finality_data));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v0, (eosio::state_history::get_blocks_result_base), (traces)(deltas));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v1, (eosio::state_history::get_blocks_result_v0), (finality_data));
// clang-format on
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class blocks_request_send_queue_entry : public send_queue_entry_base {
template <typename Session>
class blocks_result_send_queue_entry : public send_queue_entry_base, public std::enable_shared_from_this<blocks_result_send_queue_entry<Session>> {
std::shared_ptr<Session> session;
state_history::get_blocks_result_v0 r;
state_history::get_blocks_result result;
std::vector<char> data;
std::optional<locked_decompress_stream> stream;

Expand Down Expand Up @@ -264,46 +264,68 @@ class blocks_result_send_queue_entry : public send_queue_entry_base, public std:
});
}

// last to be sent
// last to be sent if result is get_blocks_result_v1
void send_finality_data() {
assert(std::holds_alternative<state_history::get_blocks_result_v1>(result));
stream.reset();
send_log(session->get_finality_data_log_entry(r, stream), true, [me=this->shared_from_this()]() {
send_log(session->get_finality_data_log_entry(std::get<state_history::get_blocks_result_v1>(result), stream), true, [me=this->shared_from_this()]() {
me->stream.reset();
me->session->session_mgr.pop_entry();
});
}

// second to be sent
// second to be sent if result is get_blocks_result_v1;
// last to be sent if result is get_blocks_result_v0
void send_deltas() {
stream.reset();
send_log(session->get_delta_log_entry(r, stream), false, [me=this->shared_from_this()]() {
me->send_finality_data();
});
std::visit(chain::overloaded{
[&](state_history::get_blocks_result_v0& r) {
send_log(session->get_delta_log_entry(r, stream), true, [me=this->shared_from_this()]() {
me->stream.reset();
me->session->session_mgr.pop_entry();}); },
[&](state_history::get_blocks_result_v1& r) {
send_log(session->get_delta_log_entry(r, stream), false, [me=this->shared_from_this()]() {
me->send_finality_data(); }); }},
result);
}

// first to be sent
void send_traces() {
stream.reset();
send_log(session->get_trace_log_entry(r, stream), false, [me=this->shared_from_this()]() {
send_log(session->get_trace_log_entry(result, stream), false, [me=this->shared_from_this()]() {
me->send_deltas();
});
}

public:
blocks_result_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_result_v0&& r)
blocks_result_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_result&& r)
: session(std::move(s)),
r(std::move(r)) {}
result(std::move(r)) {}

void send_entry() override {
assert(std::holds_alternative<state_history::get_blocks_result_v0>(result) ||
std::holds_alternative<state_history::get_blocks_result_v1>(result));

// pack the state_result{get_blocks_result} excluding the fields `traces` and `deltas`
fc::datastream<size_t> ss;
fc::raw::pack(ss, fc::unsigned_int(1)); // pack the variant index of state_result{r}
fc::raw::pack(ss, static_cast<const state_history::get_blocks_result_base&>(r));
if(std::holds_alternative<state_history::get_blocks_result_v0>(result)) {
fc::raw::pack(ss, fc::unsigned_int(1)); // pack the variant index of state_result{r}, 1 for get_blocks_result_v0
} else {
fc::raw::pack(ss, fc::unsigned_int(2)); // pack the variant index of state_result{r}, 2 for get_blocks_result_v1
}
std::visit([&](auto& r) {
fc::raw::pack(ss, static_cast<const state_history::get_blocks_result_base&>(r)); },
result);
data.resize(ss.tellp());
fc::datastream<char*> ds(data.data(), data.size());
fc::raw::pack(ds, fc::unsigned_int(1)); // pack the variant index of state_result{r}
fc::raw::pack(ds, static_cast<const state_history::get_blocks_result_base&>(r));

if(std::holds_alternative<state_history::get_blocks_result_v0>(result)) {
fc::raw::pack(ds, fc::unsigned_int(1)); // pack the variant index of state_result{r}, 1 for get_blocks_result_v0
} else {
fc::raw::pack(ds, fc::unsigned_int(2)); // pack the variant index of state_result{r}, 2 for get_blocks_result_v1
}
std::visit([&](auto& r) {
fc::raw::pack(ds, static_cast<const state_history::get_blocks_result_base&>(r)); },
result);
async_send(false, data, [me=this->shared_from_this()]() {
me->send_traces();
});
Expand Down Expand Up @@ -394,31 +416,31 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
}
}

uint64_t get_log_entry_impl(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_log_entry_impl(const eosio::state_history::get_blocks_result& result,
bool has_value,
std::optional<state_history_log>& optional_log,
std::optional<locked_decompress_stream>& buf) {
if (has_value) {
if( optional_log ) {
buf.emplace( optional_log->create_locked_decompress_stream() );
return optional_log->get_unpacked_entry( result.this_block->block_num, *buf );
return std::visit([&](auto& r) { return optional_log->get_unpacked_entry( r.this_block->block_num, *buf ); }, result);
}
}
return 0;
}

uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.traces.has_value(), plugin.get_trace_log(), buf);
return std::visit([&](auto& r) { return get_log_entry_impl(r, r.traces.has_value(), plugin.get_trace_log(), buf); }, result);
}

uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.deltas.has_value(), plugin.get_chain_state_log(), buf);
return std::visit([&](auto& r) { return get_log_entry_impl(r, r.deltas.has_value(), plugin.get_chain_state_log(), buf); }, result);
}

uint64_t get_finality_data_log_entry(const eosio::state_history::get_blocks_result_v0& result,
std::optional<locked_decompress_stream>& buf) {
uint64_t get_finality_data_log_entry(const eosio::state_history::get_blocks_result_v1& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.finality_data.has_value(), plugin.get_finality_data_log(), buf);
}

Expand Down Expand Up @@ -515,7 +537,8 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
current_request = std::move(req);
}

void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
template<typename T> // get_blocks_result_v0 or get_blocks_result_v1
void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, T result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
need_to_send_update = true;

result.last_irreversible = plugin.get_last_irreversible();
Expand Down Expand Up @@ -565,8 +588,10 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
result.traces.emplace();
if (request.fetch_deltas && plugin.get_chain_state_log())
result.deltas.emplace();
if (fetch_finality_data && plugin.get_finality_data_log()) {
result.finality_data.emplace(); // create finality_data (it's an optional field)
if constexpr (std::is_same_v<T, state_history::get_blocks_result_v1>) {
if (fetch_finality_data && plugin.get_finality_data_log()) {
result.finality_data.emplace();
}
}
}
++to_send_block_num;
Expand Down Expand Up @@ -601,7 +626,7 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
return !max_messages_in_flight;
}

void send_update(state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
void send_update(const state_history::block_position& head, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
if (no_request_or_not_max_messages_in_flight()) {
session_mgr.pop_entry(false);
return;
Expand All @@ -613,9 +638,13 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock

std::visit(eosio::chain::overloaded{
[&](eosio::state_history::get_blocks_request_v0& request) {
send_update(request, false, result, block, id); },
state_history::get_blocks_result_v0 result;
result.head = head;
send_update(request, false, std::move(result), block, id); },
[&](eosio::state_history::get_blocks_request_v1& request) {
send_update(request, request.fetch_finality_data, result, block, id); } },
state_history::get_blocks_result_v1 result;
result.head = head;
send_update(request, request.fetch_finality_data, std::move(result), block, id); } },
*current_request);
}

Expand All @@ -626,17 +655,13 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
}

auto block_num = block->block_num();
state_history::get_blocks_result_v0 result;
result.head = {block_num, id};
to_send_block_num = std::min(block_num, to_send_block_num);
send_update(std::move(result), block, id);
send_update(state_history::block_position{block_num, id}, block, id);
}

void send_update(bool changed) override {
if (changed || need_to_send_update) {
state_history::get_blocks_result_v0 result;
result.head = plugin.get_block_head();
send_update(std::move(result), nullptr, chain::block_id_type{});
send_update(plugin.get_block_head(), nullptr, chain::block_id_type{});
} else {
session_mgr.pop_entry(false);
}
Expand Down
Loading

0 comments on commit 80c3b22

Please sign in to comment.