From 9b98ea7b46fa0cf76ce04387da7eb253487cd01e Mon Sep 17 00:00:00 2001 From: Mikhail Boldyrev Date: Wed, 10 Apr 2019 14:09:29 +0300 Subject: [PATCH] MovedBatchPtr Signed-off-by: Mikhail Boldyrev --- .../impl/mst_processor.cpp | 3 +- .../impl/mst_processor_impl.cpp | 19 ++-- .../impl/propagation_to_pcs.cpp | 11 +-- .../multi_sig_transactions/mst_processor.hpp | 2 +- .../mst_processor_impl.hpp | 5 +- irohad/multi_sig_transactions/mst_types.hpp | 14 --- .../propagation_to_pcs.hpp | 2 +- .../state/impl/mst_state.cpp | 87 +++++++++---------- .../state/mst_state.hpp | 18 ++++ .../storage/storage_limit.hpp | 84 ++++++++++++++++-- .../impl/pending_txs_storage_impl.cpp | 6 +- .../impl/pending_txs_storage_impl.hpp | 8 +- .../impl/transaction_processor_impl.cpp | 2 +- .../integration_test_framework.cpp | 3 +- .../multi_sig_transactions/mst_mocks.hpp | 15 +++- .../mst_to_psc_propagation_test.cpp | 5 +- .../multi_sig_transactions/state_test.cpp | 13 +-- .../pending_txs_storage_test.cpp | 43 ++++----- .../processor/transaction_processor_test.cpp | 20 +---- 19 files changed, 218 insertions(+), 142 deletions(-) diff --git a/irohad/multi_sig_transactions/impl/mst_processor.cpp b/irohad/multi_sig_transactions/impl/mst_processor.cpp index 1e3e9ae647..ca7d614ba8 100644 --- a/irohad/multi_sig_transactions/impl/mst_processor.cpp +++ b/irohad/multi_sig_transactions/impl/mst_processor.cpp @@ -19,7 +19,8 @@ namespace iroha { return this->onStateUpdateImpl(); } - rxcpp::observable MstProcessor::onPreparedBatches() const { + rxcpp::observable> + MstProcessor::onPreparedBatches() const { return this->onPreparedBatchesImpl(); } diff --git a/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp b/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp index 468a32ce36..c63b737e59 100644 --- a/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp +++ b/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp @@ -35,7 +35,7 @@ namespace iroha { auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch) -> decltype(propagateBatch(batch)) { auto state_update = storage_->updateOwnState(batch); - completedBatchesNotify(*state_update.completed_state_); + completedBatchesNotify(state_update.completed_state_); updatedBatchesNotify(*state_update.updated_state_); expiredBatchesNotify( storage_->extractExpiredTransactions(time_provider_->getCurrentTime())); @@ -57,14 +57,13 @@ namespace iroha { } // TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one - void FairMstProcessor::completedBatchesNotify(ConstRefState state) const { - if (not state.isEmpty()) { - auto completed_batches = state.getBatches(); - std::for_each(completed_batches.begin(), - completed_batches.end(), - [this](const auto &batch) { - batches_subject_.get_subscriber().on_next(batch); - }); + void FairMstProcessor::completedBatchesNotify( + std::vector> completed) const { + if (not completed.empty()) { + std::for_each( + completed.begin(), completed.end(), [this](const auto &batch) { + batches_subject_.get_subscriber().on_next(batch); + }); } } @@ -106,7 +105,7 @@ namespace iroha { state_update.updated_state_->transactionsQuantity()); // completed batches - completedBatchesNotify(*state_update.completed_state_); + completedBatchesNotify(state_update.completed_state_); // expired batches expiredBatchesNotify(storage_->getDiffState(from, current_time)); diff --git a/irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp b/irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp index 53dbfb9a07..06d9cec77a 100644 --- a/irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp +++ b/irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp @@ -23,7 +23,7 @@ MstToPcsPropagation::MstToPcsPropagation( std::make_unique())), propagation_available_subscription_( propagation_available.subscribe([this, pcs](size_t available_txs) { - pending_batches_.extract( + pending_batches_.extractMultiple( [pcs, &available_txs](InternalStorage &storage) { std::vector extracted; extracted.reserve(storage.pending_batches.size()); @@ -47,12 +47,13 @@ MstToPcsPropagation::~MstToPcsPropagation() { propagation_available_subscription_.unsubscribe(); } -void MstToPcsPropagation::notifyCompletedBatch(BatchPtr batch) { - if (not pcs_->propagate_batch(batch)) { - if (not pending_batches_.insert(batch)) { +void MstToPcsPropagation::notifyCompletedBatch( + std::shared_ptr moved_batch) { + if (not pcs_->propagate_batch(moved_batch->get())) { + if (not pending_batches_.insert(std::move(moved_batch))) { log_->critical( "Dropped a completed MST batch because no place left in storage: {}", - batch); + moved_batch->get()); assert(false); } } diff --git a/irohad/multi_sig_transactions/mst_processor.hpp b/irohad/multi_sig_transactions/mst_processor.hpp index c77d5dafe8..37f203e91b 100644 --- a/irohad/multi_sig_transactions/mst_processor.hpp +++ b/irohad/multi_sig_transactions/mst_processor.hpp @@ -47,7 +47,7 @@ namespace iroha { * Observable emit batches which are prepared for further processing in * system */ - rxcpp::observable onPreparedBatches() const; + rxcpp::observable> onPreparedBatches() const; /** * Observable emit expired by time transactions diff --git a/irohad/multi_sig_transactions/mst_processor_impl.hpp b/irohad/multi_sig_transactions/mst_processor_impl.hpp index 4f10a05027..edb4221f02 100644 --- a/irohad/multi_sig_transactions/mst_processor_impl.hpp +++ b/irohad/multi_sig_transactions/mst_processor_impl.hpp @@ -75,7 +75,8 @@ namespace iroha { * signatures and ready to move forward * @param state with those batches */ - void completedBatchesNotify(ConstRefState state) const; + void completedBatchesNotify( + std::vector> completed) const; /** * Notify subscribers when some of the batches received new signatures, but @@ -102,7 +103,7 @@ namespace iroha { rxcpp::subjects::subject> state_subject_; /// use for share completed batches - rxcpp::subjects::subject batches_subject_; + rxcpp::subjects::subject> batches_subject_; /// use for share expired batches rxcpp::subjects::subject expired_subject_; diff --git a/irohad/multi_sig_transactions/mst_types.hpp b/irohad/multi_sig_transactions/mst_types.hpp index 4179323448..4d6d19c86e 100644 --- a/irohad/multi_sig_transactions/mst_types.hpp +++ b/irohad/multi_sig_transactions/mst_types.hpp @@ -38,20 +38,6 @@ namespace iroha { using ConstRefState = ConstRefT; using DataType = BatchPtr; - - /** - * Contains result of updating local state: - * - state with completed batches - * - state with updated (still not enough signatures) batches - */ - struct StateUpdateResult { - StateUpdateResult(std::shared_ptr completed_state, - std::shared_ptr updated_state) - : completed_state_{std::move(completed_state)}, - updated_state_{std::move(updated_state)} {} - std::shared_ptr completed_state_; - std::shared_ptr updated_state_; - }; } // namespace iroha #endif // IROHA_MST_TYPES_HPP diff --git a/irohad/multi_sig_transactions/propagation_to_pcs.hpp b/irohad/multi_sig_transactions/propagation_to_pcs.hpp index e7cbd325d3..4dba5be89f 100644 --- a/irohad/multi_sig_transactions/propagation_to_pcs.hpp +++ b/irohad/multi_sig_transactions/propagation_to_pcs.hpp @@ -26,7 +26,7 @@ namespace iroha { virtual ~MstToPcsPropagation(); - void notifyCompletedBatch(BatchPtr batch); + void notifyCompletedBatch(std::shared_ptr batch); size_t pendingBatchesQuantity() const; diff --git a/irohad/multi_sig_transactions/state/impl/mst_state.cpp b/irohad/multi_sig_transactions/state/impl/mst_state.cpp index 823d6996f3..8a7fb8c970 100644 --- a/irohad/multi_sig_transactions/state/impl/mst_state.cpp +++ b/irohad/multi_sig_transactions/state/impl/mst_state.cpp @@ -81,21 +81,13 @@ namespace iroha { } StateUpdateResult MstState::operator+=(const DataType &rhs) { - auto state_update = StateUpdateResult{ - std::make_shared(empty( - completer_, std::make_shared(), log_)), - std::make_shared(empty( - completer_, std::make_shared(), log_))}; + auto state_update = StateUpdateResult{completer_, log_}; insertOne(state_update, rhs); return state_update; } StateUpdateResult MstState::operator+=(const MstState &rhs) { - auto state_update = StateUpdateResult{ - std::make_shared(empty( - completer_, std::make_shared(), log_)), - std::make_shared(empty( - completer_, std::make_shared(), log_))}; + auto state_update = StateUpdateResult{completer_, log_}; return rhs.batches_.access([this, &state_update](const auto &storage) { for (auto &&rhs_tx : storage.batches.right | boost::adaptors::map_keys) { this->insertOne(state_update, rhs_tx); @@ -211,42 +203,48 @@ namespace iroha { void MstState::insertOne(StateUpdateResult &state_update, const DataType &rhs_batch) { log_->info("batch: {}", *rhs_batch); - batches_.extract([this, &state_update, &rhs_batch]( - auto &storage) -> std::vector { - auto corresponding = storage.batches.right.find(rhs_batch); - if (corresponding == storage.batches.right.end()) { - // when state does not contain transaction - if (this->rawInsert(rhs_batch)) { - // there is enough room for the new batch - BOOST_VERIFY_MSG(state_update.updated_state_->rawInsert(rhs_batch), - "Could not insert new MST batch to state update."); - } else { - // there is not enough room for the new batch - log_->info("Dropped a batch because it did not fit into storage: {}", - *rhs_batch); - } - return {}; - } + if (auto opt_completed_batch = + batches_.move([this, &state_update, &rhs_batch]( + auto &storage) -> boost::optional { + auto corresponding = storage.batches.right.find(rhs_batch); + if (corresponding == storage.batches.right.end()) { + // when state does not contain transaction + if (this->rawInsert(rhs_batch)) { + // there is enough room for the new batch + BOOST_VERIFY_MSG( + state_update.updated_state_->rawInsert(rhs_batch), + "Could not insert new MST batch to state update."); + } else { + // there is not enough room for the new batch + log_->info( + "Dropped a batch because it did not fit into storage: {}", + *rhs_batch); + } + return boost::none; + } - DataType found = corresponding->first; - // Append new signatures to the existing state - auto inserted_new_signatures = mergeSignaturesInBatch(found, rhs_batch); + DataType found = corresponding->first; + // Append new signatures to the existing state + auto inserted_new_signatures = + mergeSignaturesInBatch(found, rhs_batch); - if (completer_->isCompleted(found)) { - // state already has completed transaction, - // remove from state and return it - storage.batches.right.erase(found); - state_update.completed_state_->rawInsert(found); - return {found}; - } + if (completer_->isCompleted(found)) { + // state already has completed transaction, + // remove from state and return it + storage.batches.right.erase(found); + return found; + } - // if batch still isn't completed, return it, if new signatures were - // inserted - if (inserted_new_signatures) { - state_update.updated_state_->rawInsert(found); - } - return {}; - }); + // if batch still isn't completed, return it, if new signatures + // were inserted + if (inserted_new_signatures) { + state_update.updated_state_->rawInsert(found); + } + return boost::none; + })) { + state_update.completed_state_.emplace_back( + std::move(*opt_completed_batch)); + } } bool MstState::rawInsert(const DataType &rhs_batch) { @@ -261,7 +259,8 @@ namespace iroha { void MstState::extractExpiredImpl(const TimeType ¤t_time, boost::optional opt_extracted) { - auto extracted = batches_.extract([this, ¤t_time](auto &storage) { + auto extracted = batches_.extractMultiple([this, + ¤t_time](auto &storage) { std::vector extracted; for (auto it = storage.batches.left.begin(); it != storage.batches.left.end() diff --git a/irohad/multi_sig_transactions/state/mst_state.hpp b/irohad/multi_sig_transactions/state/mst_state.hpp index 30ac094618..892122f2b2 100644 --- a/irohad/multi_sig_transactions/state/mst_state.hpp +++ b/irohad/multi_sig_transactions/state/mst_state.hpp @@ -84,6 +84,8 @@ namespace iroha { using CompleterType = std::shared_ptr; + struct StateUpdateResult; + class MstState { public: // -----------------------------| public api |------------------------------ @@ -222,6 +224,22 @@ namespace iroha { logger::LoggerPtr log_; }; + /** + * Contains result of updating local state: + * - state with completed batches + * - state with updated (still not enough signatures) batches + */ + struct StateUpdateResult { + StateUpdateResult(std::shared_ptr completer, + logger::LoggerPtr log) + : updated_state_(std::make_shared( + MstState::empty(std::move(completer), + std::make_shared(), + std::move(log)))) {} + std::vector> completed_state_; + std::shared_ptr updated_state_; + }; + } // namespace iroha #endif // IROHA_MST_STATE_HPP diff --git a/irohad/multi_sig_transactions/storage/storage_limit.hpp b/irohad/multi_sig_transactions/storage/storage_limit.hpp index 3862aa9c96..d0d025631a 100644 --- a/irohad/multi_sig_transactions/storage/storage_limit.hpp +++ b/irohad/multi_sig_transactions/storage/storage_limit.hpp @@ -10,6 +10,8 @@ #include #include +#include +#include #include "interfaces/iroha_internal/transaction_batch.hpp" #include "multi_sig_transactions/mst_types.hpp" @@ -81,6 +83,39 @@ namespace iroha { std::atomic txs_quantity_{0}; }; + /// RAII batch wrapper for transfers between limited storages + class MovedBatchPtr : public boost::noncopyable { + public: + ~MovedBatchPtr() { + if (is_extracted_.test_and_set()) { + limit_->remove(batch_); + } + } + + BatchPtr get() const { + return batch_; + } + + BatchPtr extract() { + if (is_extracted_.test_and_set()) { + limit_->remove(batch_); + } + return batch_; + } + + protected: + template + friend class LimitedStorage; + + MovedBatchPtr(BatchPtr batch, std::shared_ptr limit) + : batch_(std::move(batch)), limit_(std::move(limit)) {} + + private: + std::atomic_flag is_extracted_ = ATOMIC_FLAG_INIT; + BatchPtr batch_; + std::shared_ptr limit_; + }; + template class LimitedStorage { static_assert( @@ -108,23 +143,17 @@ namespace iroha { if (not limit_->addIfAllowed(batch)) { return false; } - - txs_quantity_ += batch->transactions().size(); - ++batches_quantity_; + updateCountersOnInsertedBatch(batch); return storage_->insert(std::move(batch)); } template - auto extract(Lambda extractor) -> + auto extractMultiple(Lambda extractor) -> typename std::result_of::type { auto extracted = extractor(static_cast(*storage_)); for (const auto &batch : extracted) { limit_->remove(batch); - const size_t extracted_txs = batch->transactions().size(); - assert(txs_quantity_ >= extracted_txs); - txs_quantity_ -= extracted_txs; - assert(batches_quantity_ > 0); - --batches_quantity_; + updateCountersOnRemovedBatch(batch); } return extracted; } @@ -135,7 +164,44 @@ namespace iroha { return func(static_cast(*storage_)); } + template + boost::optional> move(Lambda extractor) { + if (auto opt_moved = extractor(static_cast(*storage_))) { + updateCountersOnRemovedBatch(*opt_moved); + return std::shared_ptr( + new MovedBatchPtr(std::move(*opt_moved), limit_)); + } + return boost::none; + } + + bool insert(std::shared_ptr moved) { + if (moved->limit_ == limit_) { + moved->is_extracted_.test_and_set(); + return insertUnsafe(moved->batch_); + } else { + return insert(moved->extract()); + } + } + private: + bool insertUnsafe(BatchPtr batch) { + updateCountersOnInsertedBatch(batch); + return storage_->insert(std::move(batch)); + } + + void updateCountersOnInsertedBatch(const BatchPtr &batch) { + txs_quantity_ += batch->transactions().size(); + ++batches_quantity_; + } + + void updateCountersOnRemovedBatch(const BatchPtr &batch) { + const size_t extracted_txs = batch->transactions().size(); + assert(txs_quantity_ >= extracted_txs); + txs_quantity_ -= extracted_txs; + assert(batches_quantity_ > 0); + --batches_quantity_; + } + const std::shared_ptr limit_; const std::shared_ptr storage_; size_t txs_quantity_{0}; diff --git a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp index eb9ba3c9ac..8c5005b844 100644 --- a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp +++ b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp @@ -12,15 +12,15 @@ namespace iroha { PendingTransactionStorageImpl::PendingTransactionStorageImpl( StateObservable updated_batches, - BatchObservable prepared_batch, + rxcpp::observable> prepared_batch, BatchObservable expired_batch) { updated_batches_subscription_ = updated_batches.subscribe([this](const SharedState &batches) { this->updatedBatchesHandler(batches); }); prepared_batch_subscription_ = - prepared_batch.subscribe([this](const SharedBatch &preparedBatch) { - this->removeBatch(preparedBatch); + prepared_batch.subscribe([this](const auto &preparedBatch) { + this->removeBatch(preparedBatch->get()); }); expired_batch_subscription_ = expired_batch.subscribe([this](const SharedBatch &expiredBatch) { diff --git a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp index d29ed0f5a7..64d7849fb7 100644 --- a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp +++ b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp @@ -17,6 +17,7 @@ namespace iroha { + class MovedBatchPtr; class MstState; class PendingTransactionStorageImpl : public PendingTransactionStorage { @@ -31,9 +32,10 @@ namespace iroha { using StateObservable = rxcpp::observable; using BatchObservable = rxcpp::observable; - PendingTransactionStorageImpl(StateObservable updated_batches, - BatchObservable prepared_batch, - BatchObservable expired_batch); + PendingTransactionStorageImpl( + StateObservable updated_batches, + rxcpp::observable> prepared_batch, + BatchObservable expired_batch); ~PendingTransactionStorageImpl() override; diff --git a/irohad/torii/processor/impl/transaction_processor_impl.cpp b/irohad/torii/processor/impl/transaction_processor_impl.cpp index 0ac1f27aca..10f5d1a26f 100644 --- a/irohad/torii/processor/impl/transaction_processor_impl.cpp +++ b/irohad/torii/processor/impl/transaction_processor_impl.cpp @@ -110,7 +110,7 @@ namespace iroha { }); mst_processor_->onPreparedBatches().subscribe([this](auto &&batch) { log_->info("MST batch prepared"); - this->publishEnoughSignaturesStatus(batch->transactions()); + this->publishEnoughSignaturesStatus(batch->get()->transactions()); }); mst_processor_->onExpiredBatches().subscribe([this](auto &&batch) { log_->info("MST batch {} is expired", batch->reducedHash()); diff --git a/test/framework/integration_framework/integration_test_framework.cpp b/test/framework/integration_framework/integration_test_framework.cpp index 32e3eedcd7..7d82225ee9 100644 --- a/test/framework/integration_framework/integration_test_framework.cpp +++ b/test/framework/integration_framework/integration_test_framework.cpp @@ -386,7 +386,8 @@ namespace integration_framework { IntegrationTestFramework::getMstPreparedBatchesObservable() { return iroha_instance_->getIrohaInstance() ->getMstProcessor() - ->onPreparedBatches(); + ->onPreparedBatches() + .map([](const auto &moved_batch) { return moved_batch->get(); }); } rxcpp::observable diff --git a/test/module/irohad/multi_sig_transactions/mst_mocks.hpp b/test/module/irohad/multi_sig_transactions/mst_mocks.hpp index fbdbfee445..a3574cc2e5 100644 --- a/test/module/irohad/multi_sig_transactions/mst_mocks.hpp +++ b/test/module/irohad/multi_sig_transactions/mst_mocks.hpp @@ -57,9 +57,22 @@ namespace iroha { MOCK_METHOD1(propagateBatchImpl, void(const DataType &)); MOCK_CONST_METHOD0(onStateUpdateImpl, rxcpp::observable>()); - MOCK_CONST_METHOD0(onPreparedBatchesImpl, rxcpp::observable()); + MOCK_CONST_METHOD0(onPreparedBatchesImpl, + rxcpp::observable>()); MOCK_CONST_METHOD0(onExpiredBatchesImpl, rxcpp::observable()); MOCK_CONST_METHOD1(batchInStorageImpl, bool(const DataType &)); }; + + struct MockMovedBatchPtr : public MovedBatchPtr { + MockMovedBatchPtr(BatchPtr batch) + : MovedBatchPtr(batch, std::make_shared()) { + EXPECT_CALL(*this, get()).WillRepeatedly(::testing::Return(batch)); + EXPECT_CALL(*this, extract()) + .Times(::testing::AtMost(1)) + .WillRepeatedly(::testing::Return(batch)); + } + MOCK_CONST_METHOD0(get, BatchPtr()); + MOCK_METHOD0(extract, BatchPtr()); + }; } // namespace iroha #endif // IROHA_MST_MOCKS_HPP diff --git a/test/module/irohad/multi_sig_transactions/mst_to_psc_propagation_test.cpp b/test/module/irohad/multi_sig_transactions/mst_to_psc_propagation_test.cpp index ed46607f0a..82d31735ea 100644 --- a/test/module/irohad/multi_sig_transactions/mst_to_psc_propagation_test.cpp +++ b/test/module/irohad/multi_sig_transactions/mst_to_psc_propagation_test.cpp @@ -8,6 +8,7 @@ #include #include #include "framework/test_logger.hpp" +#include "module/irohad/multi_sig_transactions/mst_mocks.hpp" #include "module/irohad/multi_sig_transactions/mst_test_helpers.hpp" #include "module/irohad/network/network_mocks.hpp" @@ -49,7 +50,7 @@ TEST_F(MstToPcsPropagationTest, PropagateImmediately) { const auto batch = make_batch(1); EXPECT_CALL(*pcs_, propagate_batch(batch)).WillOnce(Return(true)); - propagator.notifyCompletedBatch(batch); + propagator.notifyCompletedBatch(std::make_shared(batch)); EXPECT_EQ(propagator.pendingBatchesQuantity(), 0) << "Batch must have been propagated immediately."; } @@ -71,7 +72,7 @@ TEST_F(MstToPcsPropagationTest, PropagateWhenAvailabilityNotified) { // feed the batches to propagator for (auto batch : batches) { EXPECT_CALL(*pcs_, propagate_batch(batch)).WillOnce(Return(false)); - propagator.notifyCompletedBatch(batch); + propagator.notifyCompletedBatch(std::make_shared(batch)); } // notifies propagator of available transactions for propagation to PCS, sets diff --git a/test/module/irohad/multi_sig_transactions/state_test.cpp b/test/module/irohad/multi_sig_transactions/state_test.cpp index bd94c296a4..0a9fa00d66 100644 --- a/test/module/irohad/multi_sig_transactions/state_test.cpp +++ b/test/module/irohad/multi_sig_transactions/state_test.cpp @@ -259,18 +259,19 @@ TEST(StateTest, UpdateTxUntillQuorum) { auto state_after_one_tx = state += addSignatures( makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("1", "1")); ASSERT_EQ(1, state_after_one_tx.updated_state_->batchesQuantity()); - ASSERT_EQ(0, state_after_one_tx.completed_state_->batchesQuantity()); + ASSERT_EQ(0, state_after_one_tx.completed_state_.size()); auto state_after_two_txes = state += addSignatures( makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("2", "2")); ASSERT_EQ(1, state_after_two_txes.updated_state_->batchesQuantity()); - ASSERT_EQ(0, state_after_two_txes.completed_state_->batchesQuantity()); + ASSERT_EQ(0, state_after_two_txes.completed_state_.size()); auto state_after_three_txes = state += addSignatures( makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("3", "3")); ASSERT_EQ(0, state_after_three_txes.updated_state_->batchesQuantity()); - ASSERT_EQ(1, state_after_three_txes.completed_state_->getBatches().size()); - ASSERT_TRUE((*state_after_three_txes.completed_state_->getBatches().begin()) + ASSERT_EQ(1, state_after_three_txes.completed_state_.size()); + ASSERT_TRUE(state_after_three_txes.completed_state_.front() + ->get() ->hasAllSignatures()); ASSERT_EQ(0, state.batchesQuantity()); } @@ -307,7 +308,7 @@ TEST(StateTest, UpdateStateWithNewStateUntilQuorum) { ASSERT_EQ(1, state2.batchesQuantity()); auto final_state = state1 += state2; - ASSERT_EQ(1, final_state.completed_state_->batchesQuantity()); + ASSERT_EQ(1, final_state.completed_state_.size()); ASSERT_EQ(1, state1.batchesQuantity()); } @@ -362,7 +363,7 @@ TEST(StateTest, TimeIndexInsertionByAddState) { makeTestBatch(txBuilder(3, time)), 0, makeSignature("3", "3")); auto final_state = state1 += state2; - ASSERT_EQ(0, final_state.completed_state_->batchesQuantity()); + ASSERT_EQ(0, final_state.completed_state_.size()); ASSERT_EQ(2, final_state.updated_state_->batchesQuantity()); } diff --git a/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp b/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp index 57527cadeb..7128c20157 100644 --- a/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp +++ b/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp @@ -7,10 +7,13 @@ #include #include "datetime/time.hpp" #include "framework/test_logger.hpp" +#include "module/irohad/multi_sig_transactions/mst_mocks.hpp" #include "module/irohad/multi_sig_transactions/mst_test_helpers.hpp" #include "multi_sig_transactions/state/mst_state.hpp" #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp" +using namespace iroha; + class PendingTxsStorageFixture : public ::testing::Test { public: using Batch = shared_model::interface::TransactionBatch; @@ -34,6 +37,11 @@ class PendingTxsStorageFixture : public ::testing::Test { std::shared_ptr completer_ = std::make_shared(std::chrono::minutes(0)); + rxcpp::observable> dummy_completed = + rxcpp::observable<>::empty>(); + rxcpp::observable> dummy_expired = + rxcpp::observable<>::empty>(); + logger::LoggerPtr mst_state_log_{getTestLogger("MstState")}; logger::LoggerPtr log_{getTestLogger("PendingTxsStorageFixture")}; }; @@ -84,10 +92,10 @@ TEST_F(PendingTxsStorageFixture, InsertionTest) { s.on_next(state); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); + auto completed = rxcpp::observable<>::empty>(); + auto expired = rxcpp::observable<>::empty>(); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage(updates, completed, expired); for (const auto &creator : {"alice@iroha", "bob@iroha"}) { auto pending = storage.getPendingTransactions(creator); ASSERT_EQ(pending.size(), 2) @@ -133,10 +141,9 @@ TEST_F(PendingTxsStorageFixture, SignaturesUpdate) { s.on_next(state2); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage( + updates, dummy_completed, dummy_expired); auto pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(pending.size(), 1); ASSERT_EQ(boost::size(pending.front()->signatures()), 2); @@ -175,10 +182,9 @@ TEST_F(PendingTxsStorageFixture, SeveralBatches) { s.on_next(state); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage( + updates, dummy_completed, dummy_expired); auto alice_pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(alice_pending.size(), 4); @@ -220,10 +226,9 @@ TEST_F(PendingTxsStorageFixture, SeparateBatchesDoNotOverwriteStorage) { s.on_next(state2); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage( + updates, dummy_completed, dummy_expired); auto alice_pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(alice_pending.size(), 4); @@ -250,21 +255,21 @@ TEST_F(PendingTxsStorageFixture, PreparedBatch) { makeSignature("1", "pub_key_1")); *state += batch; - rxcpp::subjects::subject prepared_batches_subject; + rxcpp::subjects::subject> + prepared_batches_subject; auto updates = rxcpp::observable<>::create([&state](auto s) { s.on_next(state); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); iroha::PendingTransactionStorageImpl storage( - updates, prepared_batches_subject.get_observable(), dummy); + updates, prepared_batches_subject.get_observable(), dummy_expired); batch = addSignatures(batch, 0, makeSignature("2", "pub_key_2"), makeSignature("3", "pub_key_3")); - prepared_batches_subject.get_subscriber().on_next(batch); + prepared_batches_subject.get_subscriber().on_next( + std::make_shared(batch)); prepared_batches_subject.get_subscriber().on_completed(); auto pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(pending.size(), 0); @@ -293,10 +298,8 @@ TEST_F(PendingTxsStorageFixture, ExpiredBatch) { s.on_next(state); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); iroha::PendingTransactionStorageImpl storage( - updates, dummy, expired_batches_subject.get_observable()); + updates, dummy_completed, expired_batches_subject.get_observable()); expired_batches_subject.get_subscriber().on_next(batch); expired_batches_subject.get_subscriber().on_completed(); diff --git a/test/module/irohad/torii/processor/transaction_processor_test.cpp b/test/module/irohad/torii/processor/transaction_processor_test.cpp index 492af01ef7..4fe0c6a550 100644 --- a/test/module/irohad/torii/processor/transaction_processor_test.cpp +++ b/test/module/irohad/torii/processor/transaction_processor_test.cpp @@ -122,7 +122,8 @@ class TransactionProcessorTest : public ::testing::Test { rxcpp::subjects::subject> mst_update_notifier; - rxcpp::subjects::subject mst_prepared_notifier; + rxcpp::subjects::subject> + mst_prepared_notifier; rxcpp::subjects::subject mst_expired_notifier; rxcpp::subjects::subject< std::shared_ptr> @@ -431,23 +432,6 @@ TEST_F(TransactionProcessorTest, MultisigTransactionToMst) { tp->batchHandle(std::move(after_mst)); } -/** - * @given batch one transaction with quorum 2 - * AND one signature - * @when MST emits the batch - * @then checks that PCS is invoked. - * This happens because tx processor is subscribed for MST - */ -TEST_F(TransactionProcessorTest, MultisigTransactionFromMst) { - auto &&tx = addSignaturesFromKeyPairs(baseTestTx(2), makeKey(), makeKey()); - - auto &&after_mst = framework::batch::createBatchFromSingleTransaction( - std::shared_ptr(clone(tx))); - - EXPECT_CALL(*pcs, propagate_batch(_)).Times(1); - mst_prepared_notifier.get_subscriber().on_next(after_mst); -} - /** * @given valid multisig tx * @when transaction_processor handle it