diff --git a/src/Transfers/BlockchainSynchronizer.cpp b/src/Transfers/BlockchainSynchronizer.cpp index f9735d4d89..571a353084 100644 --- a/src/Transfers/BlockchainSynchronizer.cpp +++ b/src/Transfers/BlockchainSynchronizer.cpp @@ -250,7 +250,7 @@ bool BlockchainSynchronizer::setFutureStateIf(State s, std::function void BlockchainSynchronizer::actualizeFutureState() { std::unique_lock lk(m_stateMutex); - if (m_currentState == State::stopped && m_futureState == State::deleteOldTxs) { // start(), immideately attach observer + if (m_currentState == State::stopped && (m_futureState == State::deleteOldTxs || m_futureState == State::blockchainSync)) { // start(), immideately attach observer m_node.addObserver(this); } @@ -352,7 +352,15 @@ void BlockchainSynchronizer::start() { throw std::runtime_error(message); } - if (!setFutureStateIf(State::deleteOldTxs, [this] { return m_currentState == State::stopped && m_futureState == State::stopped; })) { + State nextState; + if (!wasStarted) { + nextState = State::deleteOldTxs; + wasStarted = true; + } else { + nextState = State::blockchainSync; + } + + if (!setFutureStateIf(nextState, [this] { return m_currentState == State::stopped && m_futureState == State::stopped; })) { auto message = "Failed to start: already started"; m_logger(ERROR, BRIGHT_RED) << message; throw std::runtime_error(message); @@ -499,7 +507,6 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { CompleteBlock completeBlock; completeBlock.blockHash = block.blockHash; - interval.blocks.push_back(completeBlock.blockHash); if (block.hasBlock) { completeBlock.block = std::move(block.block); completeBlock.transactions.push_back(createTransactionPrefix(completeBlock.block->baseTransaction)); @@ -516,6 +523,7 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { } } + interval.blocks.push_back(completeBlock.blockHash); blocks.push_back(std::move(completeBlock)); } @@ -534,7 +542,7 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { break; case UpdateConsumersResult::nothingChanged: - if (m_node.getLastKnownBlockHeight() != m_node.getLastLocalBlockHeight()) { + if (m_node.getKnownBlockCount() != m_node.getLocalBlockCount()) { m_logger(DEBUGGING) << "Blockchain updated, resume blockchain synchronization"; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } else { @@ -564,8 +572,12 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { /// \pre m_consumersMutex is locked BlockchainSynchronizer::UpdateConsumersResult BlockchainSynchronizer::updateConsumers(const BlockchainInterval& interval, const std::vector& blocks) { + assert(interval.blocks.size() == blocks.size()); + bool smthChanged = false; + bool hasErrors = false; + uint32_t lastBlockIndex = std::numeric_limits::max(); for (auto& kv : m_consumers) { auto result = kv.second->checkInterval(interval); @@ -577,21 +589,40 @@ BlockchainSynchronizer::UpdateConsumersResult BlockchainSynchronizer::updateCons if (result.hasNewBlocks) { uint32_t startOffset = result.newBlockHeight - interval.startHeight; - // update consumer uint32_t blockCount = static_cast(blocks.size()) - startOffset; + // update consumer m_logger(DEBUGGING) << "Adding blocks to consumer, consumer " << kv.first << ", start index " << result.newBlockHeight << ", count " << blockCount; - if (kv.first->onNewBlocks(blocks.data() + startOffset, result.newBlockHeight, blockCount)) { + uint32_t addedCount = kv.first->onNewBlocks(blocks.data() + startOffset, result.newBlockHeight, blockCount); + if (addedCount > 0) { + if (addedCount < blockCount) { + m_logger(ERROR, BRIGHT_RED) << "Failed to add " << (blockCount - addedCount) << " blocks of " << blockCount << " to consumer, consumer " << kv.first; + hasErrors = true; + } + // update state if consumer succeeded - kv.second->addBlocks(interval.blocks.data() + startOffset, result.newBlockHeight, static_cast(interval.blocks.size()) - startOffset); + kv.second->addBlocks(interval.blocks.data() + startOffset, result.newBlockHeight, addedCount); smthChanged = true; } else { m_logger(ERROR, BRIGHT_RED) << "Failed to add blocks to consumer, consumer " << kv.first; - return UpdateConsumersResult::errorOccurred; + hasErrors = true; + } + + if (addedCount > 0) { + lastBlockIndex = std::min(lastBlockIndex, startOffset + addedCount - 1); } } } - if (smthChanged) { + if (lastBlockIndex != std::numeric_limits::max()) { + assert(lastBlockIndex < blocks.size()); + lastBlockId = blocks[lastBlockIndex].blockHash; + m_logger(DEBUGGING) << "Last block hash " << lastBlockId << ", index " << (interval.startHeight + lastBlockIndex); + } + + if (hasErrors) { + m_logger(DEBUGGING) << "Not all blocks were added to consumers, there were errors"; + return UpdateConsumersResult::errorOccurred; + } else if (smthChanged) { m_logger(DEBUGGING) << "Blocks added to consumers"; return UpdateConsumersResult::addedNewBlocks; } else { diff --git a/src/Transfers/BlockchainSynchronizer.h b/src/Transfers/BlockchainSynchronizer.h index 1056f4ba37..353478936c 100644 --- a/src/Transfers/BlockchainSynchronizer.h +++ b/src/Transfers/BlockchainSynchronizer.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -38,7 +39,7 @@ class BlockchainSynchronizer : public: BlockchainSynchronizer(INode& node, Logging::ILogger& logger, const Crypto::Hash& genesisBlockHash); - ~BlockchainSynchronizer(); + virtual ~BlockchainSynchronizer() override; // IBlockchainSynchronizer virtual void addConsumer(IBlockchainConsumer* consumer) override; @@ -146,6 +147,8 @@ class BlockchainSynchronizer : mutable std::mutex m_consumersMutex; mutable std::mutex m_stateMutex; std::condition_variable m_hasWork; + + bool wasStarted = false; }; } diff --git a/src/Transfers/IBlockchainSynchronizer.h b/src/Transfers/IBlockchainSynchronizer.h index 235f5786e8..7023c64df0 100644 --- a/src/Transfers/IBlockchainSynchronizer.h +++ b/src/Transfers/IBlockchainSynchronizer.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -37,6 +38,7 @@ class IBlockchainSynchronizerObserver { public: virtual void synchronizationProgressUpdated(uint32_t processedBlockCount, uint32_t totalBlockCount) {} virtual void synchronizationCompleted(std::error_code result) {} + virtual ~IBlockchainSynchronizerObserver() {} }; class IBlockchainConsumerObserver; @@ -47,7 +49,7 @@ class IBlockchainConsumer : public IObservable { virtual SynchronizationStart getSyncStart() = 0; virtual const std::unordered_set& getKnownPoolTxIds() const = 0; virtual void onBlockchainDetach(uint32_t height) = 0; - virtual bool onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) = 0; + virtual uint32_t onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) = 0; virtual std::error_code onPoolUpdated(const std::vector>& addedTransactions, const std::vector& deletedTransactions) = 0; virtual std::error_code addUnconfirmedTransaction(const ITransactionReader& transaction) = 0; diff --git a/src/Transfers/TransfersConsumer.h b/src/Transfers/TransfersConsumer.h index 20f8b021fa..21733f6664 100755 --- a/src/Transfers/TransfersConsumer.h +++ b/src/Transfers/TransfersConsumer.h @@ -35,7 +35,7 @@ namespace CryptoNote { class INode; -class TransfersConsumer: public IObservableImpl { +class TransfersConsumer : public IObservableImpl { public: TransfersConsumer(const CryptoNote::Currency& currency, INode& node, Logging::ILogger& logger, const Crypto::SecretKey& viewSecret); @@ -48,11 +48,11 @@ class TransfersConsumer: public IObservableImpl& uncommitedTransactions); void addPublicKeysSeen(const Crypto::Hash& transactionHash, const Crypto::PublicKey& outputKey); - + // IBlockchainConsumer virtual SynchronizationStart getSyncStart() override; virtual void onBlockchainDetach(uint32_t height) override; - virtual bool onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) override; + virtual uint32_t onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) override; virtual std::error_code onPoolUpdated(const std::vector>& addedTransactions, const std::vector& deletedTransactions) override; virtual const std::unordered_set& getKnownPoolTxIds() const override; @@ -78,8 +78,7 @@ class TransfersConsumer: public IObservableImpl& outputs, const std::vector& globalIdxs, bool& contains, bool& updated); - std::error_code createTransfers(const AccountKeys& account, const TransactionBlockInfo& blockInfo, const ITransactionReader& tx, - const std::vector& outputs, const std::vector& globalIdxs, std::vector& transfers); + std::error_code getGlobalIndices(const Crypto::Hash& transactionHash, std::vector& outsGlobalIndices); void updateSyncStart();