Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
MovedBatchPtr
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Boldyrev <miboldyrev@gmail.com>
  • Loading branch information
MBoldyrev committed Apr 10, 2019
1 parent 3db6a12 commit 9b98ea7
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 142 deletions.
3 changes: 2 additions & 1 deletion irohad/multi_sig_transactions/impl/mst_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ namespace iroha {
return this->onStateUpdateImpl();
}

rxcpp::observable<DataType> MstProcessor::onPreparedBatches() const {
rxcpp::observable<std::shared_ptr<MovedBatchPtr>>
MstProcessor::onPreparedBatches() const {
return this->onPreparedBatchesImpl();
}

Expand Down
19 changes: 9 additions & 10 deletions irohad/multi_sig_transactions/impl/mst_processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace iroha {
auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
-> decltype(propagateBatch(batch)) {
auto state_update = storage_->updateOwnState(batch);
completedBatchesNotify(*state_update.completed_state_);
completedBatchesNotify(state_update.completed_state_);
updatedBatchesNotify(*state_update.updated_state_);
expiredBatchesNotify(
storage_->extractExpiredTransactions(time_provider_->getCurrentTime()));
Expand All @@ -57,14 +57,13 @@ namespace iroha {
}

// TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
if (not state.isEmpty()) {
auto completed_batches = state.getBatches();
std::for_each(completed_batches.begin(),
completed_batches.end(),
[this](const auto &batch) {
batches_subject_.get_subscriber().on_next(batch);
});
void FairMstProcessor::completedBatchesNotify(
std::vector<std::shared_ptr<MovedBatchPtr>> completed) const {
if (not completed.empty()) {
std::for_each(
completed.begin(), completed.end(), [this](const auto &batch) {
batches_subject_.get_subscriber().on_next(batch);
});
}
}

Expand Down Expand Up @@ -106,7 +105,7 @@ namespace iroha {
state_update.updated_state_->transactionsQuantity());

// completed batches
completedBatchesNotify(*state_update.completed_state_);
completedBatchesNotify(state_update.completed_state_);

// expired batches
expiredBatchesNotify(storage_->getDiffState(from, current_time));
Expand Down
11 changes: 6 additions & 5 deletions irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ MstToPcsPropagation::MstToPcsPropagation(
std::make_unique<InternalStorage>())),
propagation_available_subscription_(
propagation_available.subscribe([this, pcs](size_t available_txs) {
pending_batches_.extract(
pending_batches_.extractMultiple(
[pcs, &available_txs](InternalStorage &storage) {
std::vector<BatchPtr> extracted;
extracted.reserve(storage.pending_batches.size());
Expand All @@ -47,12 +47,13 @@ MstToPcsPropagation::~MstToPcsPropagation() {
propagation_available_subscription_.unsubscribe();
}

void MstToPcsPropagation::notifyCompletedBatch(BatchPtr batch) {
if (not pcs_->propagate_batch(batch)) {
if (not pending_batches_.insert(batch)) {
void MstToPcsPropagation::notifyCompletedBatch(
std::shared_ptr<MovedBatchPtr> moved_batch) {
if (not pcs_->propagate_batch(moved_batch->get())) {
if (not pending_batches_.insert(std::move(moved_batch))) {
log_->critical(
"Dropped a completed MST batch because no place left in storage: {}",
batch);
moved_batch->get());
assert(false);
}
}
Expand Down
2 changes: 1 addition & 1 deletion irohad/multi_sig_transactions/mst_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace iroha {
* Observable emit batches which are prepared for further processing in
* system
*/
rxcpp::observable<DataType> onPreparedBatches() const;
rxcpp::observable<std::shared_ptr<MovedBatchPtr>> onPreparedBatches() const;

/**
* Observable emit expired by time transactions
Expand Down
5 changes: 3 additions & 2 deletions irohad/multi_sig_transactions/mst_processor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ namespace iroha {
* signatures and ready to move forward
* @param state with those batches
*/
void completedBatchesNotify(ConstRefState state) const;
void completedBatchesNotify(
std::vector<std::shared_ptr<MovedBatchPtr>> completed) const;

/**
* Notify subscribers when some of the batches received new signatures, but
Expand All @@ -102,7 +103,7 @@ namespace iroha {
rxcpp::subjects::subject<std::shared_ptr<MstState>> state_subject_;

/// use for share completed batches
rxcpp::subjects::subject<DataType> batches_subject_;
rxcpp::subjects::subject<std::shared_ptr<MovedBatchPtr>> batches_subject_;

/// use for share expired batches
rxcpp::subjects::subject<DataType> expired_subject_;
Expand Down
14 changes: 0 additions & 14 deletions irohad/multi_sig_transactions/mst_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,6 @@ namespace iroha {
using ConstRefState = ConstRefT<MstState>;

using DataType = BatchPtr;

/**
* Contains result of updating local state:
* - state with completed batches
* - state with updated (still not enough signatures) batches
*/
struct StateUpdateResult {
StateUpdateResult(std::shared_ptr<MstState> completed_state,
std::shared_ptr<MstState> updated_state)
: completed_state_{std::move(completed_state)},
updated_state_{std::move(updated_state)} {}
std::shared_ptr<MstState> completed_state_;
std::shared_ptr<MstState> updated_state_;
};
} // namespace iroha

#endif // IROHA_MST_TYPES_HPP
2 changes: 1 addition & 1 deletion irohad/multi_sig_transactions/propagation_to_pcs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace iroha {

virtual ~MstToPcsPropagation();

void notifyCompletedBatch(BatchPtr batch);
void notifyCompletedBatch(std::shared_ptr<MovedBatchPtr> batch);

size_t pendingBatchesQuantity() const;

Expand Down
87 changes: 43 additions & 44 deletions irohad/multi_sig_transactions/state/impl/mst_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,13 @@ namespace iroha {
}

StateUpdateResult MstState::operator+=(const DataType &rhs) {
auto state_update = StateUpdateResult{
std::make_shared<MstState>(empty(
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_)),
std::make_shared<MstState>(empty(
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_))};
auto state_update = StateUpdateResult{completer_, log_};
insertOne(state_update, rhs);
return state_update;
}

StateUpdateResult MstState::operator+=(const MstState &rhs) {
auto state_update = StateUpdateResult{
std::make_shared<MstState>(empty(
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_)),
std::make_shared<MstState>(empty(
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_))};
auto state_update = StateUpdateResult{completer_, log_};
return rhs.batches_.access([this, &state_update](const auto &storage) {
for (auto &&rhs_tx : storage.batches.right | boost::adaptors::map_keys) {
this->insertOne(state_update, rhs_tx);
Expand Down Expand Up @@ -211,42 +203,48 @@ namespace iroha {
void MstState::insertOne(StateUpdateResult &state_update,
const DataType &rhs_batch) {
log_->info("batch: {}", *rhs_batch);
batches_.extract([this, &state_update, &rhs_batch](
auto &storage) -> std::vector<BatchPtr> {
auto corresponding = storage.batches.right.find(rhs_batch);
if (corresponding == storage.batches.right.end()) {
// when state does not contain transaction
if (this->rawInsert(rhs_batch)) {
// there is enough room for the new batch
BOOST_VERIFY_MSG(state_update.updated_state_->rawInsert(rhs_batch),
"Could not insert new MST batch to state update.");
} else {
// there is not enough room for the new batch
log_->info("Dropped a batch because it did not fit into storage: {}",
*rhs_batch);
}
return {};
}
if (auto opt_completed_batch =
batches_.move([this, &state_update, &rhs_batch](
auto &storage) -> boost::optional<BatchPtr> {
auto corresponding = storage.batches.right.find(rhs_batch);
if (corresponding == storage.batches.right.end()) {
// when state does not contain transaction
if (this->rawInsert(rhs_batch)) {
// there is enough room for the new batch
BOOST_VERIFY_MSG(
state_update.updated_state_->rawInsert(rhs_batch),
"Could not insert new MST batch to state update.");
} else {
// there is not enough room for the new batch
log_->info(
"Dropped a batch because it did not fit into storage: {}",
*rhs_batch);
}
return boost::none;
}

DataType found = corresponding->first;
// Append new signatures to the existing state
auto inserted_new_signatures = mergeSignaturesInBatch(found, rhs_batch);
DataType found = corresponding->first;
// Append new signatures to the existing state
auto inserted_new_signatures =
mergeSignaturesInBatch(found, rhs_batch);

if (completer_->isCompleted(found)) {
// state already has completed transaction,
// remove from state and return it
storage.batches.right.erase(found);
state_update.completed_state_->rawInsert(found);
return {found};
}
if (completer_->isCompleted(found)) {
// state already has completed transaction,
// remove from state and return it
storage.batches.right.erase(found);
return found;
}

// if batch still isn't completed, return it, if new signatures were
// inserted
if (inserted_new_signatures) {
state_update.updated_state_->rawInsert(found);
}
return {};
});
// if batch still isn't completed, return it, if new signatures
// were inserted
if (inserted_new_signatures) {
state_update.updated_state_->rawInsert(found);
}
return boost::none;
})) {
state_update.completed_state_.emplace_back(
std::move(*opt_completed_batch));
}
}

bool MstState::rawInsert(const DataType &rhs_batch) {
Expand All @@ -261,7 +259,8 @@ namespace iroha {

void MstState::extractExpiredImpl(const TimeType &current_time,
boost::optional<MstState &> opt_extracted) {
auto extracted = batches_.extract([this, &current_time](auto &storage) {
auto extracted = batches_.extractMultiple([this,
&current_time](auto &storage) {
std::vector<BatchPtr> extracted;
for (auto it = storage.batches.left.begin();
it != storage.batches.left.end()
Expand Down
18 changes: 18 additions & 0 deletions irohad/multi_sig_transactions/state/mst_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ namespace iroha {

using CompleterType = std::shared_ptr<const Completer>;

struct StateUpdateResult;

class MstState {
public:
// -----------------------------| public api |------------------------------
Expand Down Expand Up @@ -222,6 +224,22 @@ namespace iroha {
logger::LoggerPtr log_;
};

/**
* Contains result of updating local state:
* - state with completed batches
* - state with updated (still not enough signatures) batches
*/
struct StateUpdateResult {
StateUpdateResult(std::shared_ptr<const Completer> completer,
logger::LoggerPtr log)
: updated_state_(std::make_shared<MstState>(
MstState::empty(std::move(completer),
std::make_shared<iroha::StorageLimitDummy>(),
std::move(log)))) {}
std::vector<std::shared_ptr<MovedBatchPtr>> completed_state_;
std::shared_ptr<MstState> updated_state_;
};

} // namespace iroha

#endif // IROHA_MST_STATE_HPP
Loading

0 comments on commit 9b98ea7

Please sign in to comment.