diff --git a/.env-template b/.env-template index 108dda5d11..8d7f0792c2 100644 --- a/.env-template +++ b/.env-template @@ -53,15 +53,24 @@ YAGNA_DATADIR="." #POLYGON_GETH_ADDR=https://bor.golem.network,https://polygon-rpc.com #MUMBAI_GETH_ADDR=https://matic-mumbai.chainstacklabs.com -## ERC20 driver. -#ETH_FAUCET_ADDRESS=http://faucet.testnet.golem.network:4000/donate -#ERC20_RINKEBY_REQUIRED_CONFIRMATIONS=3 -#ERC20_MAINNET_REQUIRED_CONFIRMATIONS=5 +## T/GLM contract addresses #RINKEBY_TGLM_CONTRACT_ADDRESS=0xd94e3DC39d4Cad1DAd634e7eb585A57A19dC7EFE #RINKEBY_TGLM_FAUCET_ADDRESS=0x59259943616265A03d775145a2eC371732E2B06C #MAINNET_GLM_CONTRACT_ADDRESS=0x7DD9c5Cba05E151C895FDe1CF355C9A1D5DA6429 + +## Eth faucet address +#ETH_FAUCET_ADDRESS=http://faucet.testnet.golem.network:4000/donate + +## ERC20 driver. #ERC20_SENDOUT_INTERVAL_SECS=10 #ERC20_CONFIRMATION_INTERVAL_SECS=5 +#ERC20_RINKEBY_REQUIRED_CONFIRMATIONS=3 +#ERC20_MAINNET_REQUIRED_CONFIRMATIONS=5 + +## ERC20Next driver. +#ERC20NEXT_SENDOUT_INTERVAL_SECS=10 +#ERC20NEXT_RINKEBY_REQUIRED_CONFIRMATIONS=3 +#ERC20NEXT_MAINNET_REQUIRED_CONFIRMATIONS=5 ## Activity Service diff --git a/core/payment-driver/base/src/account.rs b/core/payment-driver/base/src/account.rs index 515a096f5a..66e7e8e185 100644 --- a/core/payment-driver/base/src/account.rs +++ b/core/payment-driver/base/src/account.rs @@ -3,10 +3,10 @@ */ // External crates -use std::cell::RefCell; use std::collections::HashMap; -use std::rc::Rc; +use std::sync::Arc; +use tokio::sync::Mutex; // Workspace uses use ya_core_model::identity::event::Event as IdentityEvent; @@ -14,15 +14,15 @@ use ya_core_model::identity::event::Event as IdentityEvent; use crate::driver::NodeId; // Public types -pub type AccountsRc = Rc>; +pub type AccountsArc = Arc>; pub struct Accounts { accounts: HashMap, } impl Accounts { - pub fn new_rc() -> AccountsRc { - Rc::new(RefCell::new(Self::new())) + pub fn new_rc() -> AccountsArc { + Arc::new(Mutex::new(Self::new())) } pub fn handle_event(&mut self, msg: IdentityEvent) { diff --git a/core/payment-driver/erc20/src/driver.rs b/core/payment-driver/erc20/src/driver.rs index f3aec71799..c32e5452ab 100644 --- a/core/payment-driver/erc20/src/driver.rs +++ b/core/payment-driver/erc20/src/driver.rs @@ -12,7 +12,7 @@ use ya_client_model::payment::DriverStatusProperty; // Workspace uses use ya_payment_driver::{ - account::{Accounts, AccountsRc}, + account::{Accounts, AccountsArc}, bus, cron::PaymentDriverCron, dao::DbExecutor, @@ -48,7 +48,7 @@ lazy_static::lazy_static! { } pub struct Erc20Driver { - active_accounts: AccountsRc, + active_accounts: AccountsArc, dao: Erc20Dao, sendout_lock: Mutex<()>, confirmation_lock: Mutex<()>, @@ -67,15 +67,21 @@ impl Erc20Driver { pub async fn load_active_accounts(&self) { log::debug!("load_active_accounts"); let unlocked_accounts = bus::list_unlocked_identities().await.unwrap(); - let mut accounts = self.active_accounts.borrow_mut(); + let mut accounts = self.active_accounts.lock().await; for account in unlocked_accounts { log::debug!("account={}", account); accounts.add_account(account) } } - fn is_account_active(&self, address: &str) -> Result<(), GenericError> { - match self.active_accounts.as_ref().borrow().get_node_id(address) { + async fn is_account_active(&self, address: &str) -> Result<(), GenericError> { + match self + .active_accounts + .as_ref() + .lock() + .await + .get_node_id(address) + { Some(_) => Ok(()), None => Err(GenericError::new(format!( "Account not active: {}", @@ -93,7 +99,7 @@ impl PaymentDriver for Erc20Driver { _caller: String, msg: IdentityEvent, ) -> Result<(), IdentityError> { - self.active_accounts.borrow_mut().handle_event(msg); + self.active_accounts.lock().await.handle_event(msg); Ok(()) } @@ -171,7 +177,7 @@ impl PaymentDriver for Erc20Driver { _caller: String, msg: Transfer, ) -> Result { - self.is_account_active(&msg.sender)?; + self.is_account_active(&msg.sender).await?; cli::transfer(&self.dao, msg).await } @@ -183,7 +189,7 @@ impl PaymentDriver for Erc20Driver { ) -> Result { log::debug!("schedule_payment: {:?}", msg); - self.is_account_active(&msg.sender())?; + self.is_account_active(&msg.sender()).await?; api::schedule_payment(&self.dao, msg).await } @@ -267,7 +273,7 @@ impl PaymentDriverCron for Erc20Driver { 'outer: for network_key in self.get_networks().keys() { let network = Network::from_str(network_key).unwrap(); // Process payment rows - let accounts = self.active_accounts.borrow().list_accounts(); + let accounts = self.active_accounts.lock().await.list_accounts(); for node_id in accounts { if let Err(e) = cron::process_payments_for_account(&self.dao, &node_id, network).await diff --git a/core/payment-driver/erc20/src/driver/cli.rs b/core/payment-driver/erc20/src/driver/cli.rs index 9453b0b38c..4b7bc453d8 100644 --- a/core/payment-driver/erc20/src/driver/cli.rs +++ b/core/payment-driver/erc20/src/driver/cli.rs @@ -30,7 +30,7 @@ pub async fn init(driver: &Erc20Driver, msg: Init) -> Result<(), GenericError> { // Ensure account is unlock before initialising send mode if mode.contains(AccountMode::SEND) { - driver.is_account_active(&address)? + driver.is_account_active(&address).await? } wallet::init_wallet( diff --git a/core/payment-driver/erc20next/src/driver.rs b/core/payment-driver/erc20next/src/driver.rs index 91db7f5552..c2ab5563ca 100644 --- a/core/payment-driver/erc20next/src/driver.rs +++ b/core/payment-driver/erc20next/src/driver.rs @@ -11,17 +11,17 @@ use ethereum_types::U256; use num_bigint::BigInt; use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use tokio::sync::mpsc::Receiver; -use tokio::sync::Mutex; +use tokio_util::task::LocalPoolHandle; use uuid::Uuid; use web3::types::H256; use ya_client_model::payment::DriverStatusProperty; // Workspace uses use ya_payment_driver::{ - account::{Accounts, AccountsRc}, + account::{Accounts, AccountsArc}, bus, - cron::PaymentDriverCron, dao::DbExecutor, driver::{ async_trait, BigDecimal, IdentityError, IdentityEvent, Network as NetworkConfig, @@ -38,49 +38,42 @@ use crate::{network::SUPPORTED_NETWORKS, DRIVER_NAME, RINKEBY_NETWORK}; mod cli; -lazy_static::lazy_static! { - static ref TX_SENDOUT_INTERVAL: std::time::Duration = std::time::Duration::from_secs( - std::env::var("ERC20NEXT_SENDOUT_INTERVAL_SECS") - .ok() - .and_then(|x| x.parse().ok()) - .unwrap_or(30), - ); - - static ref TX_CONFIRMATION_INTERVAL: std::time::Duration = std::time::Duration::from_secs( - std::env::var("ERC20NEXT_CONFIRMATION_INTERVAL_SECS") - .ok() - .and_then(|x| x.parse().ok()) - .unwrap_or(30), - ); -} - pub struct Erc20NextDriver { - active_accounts: AccountsRc, + active_accounts: AccountsArc, payment_runtime: PaymentRuntime, - events: Mutex>, } impl Erc20NextDriver { - pub fn new(payment_runtime: PaymentRuntime, events: Receiver) -> Self { - Self { + pub fn new(payment_runtime: PaymentRuntime, recv: Receiver) -> Arc { + let this = Arc::new(Self { active_accounts: Accounts::new_rc(), payment_runtime, - events: Mutex::new(events), - } + }); + + let this_ = Arc::clone(&this); + LocalPoolHandle::new(1).spawn_pinned(move || Self::payment_confirm_job(this_, recv)); + + this } pub async fn load_active_accounts(&self) { log::debug!("load_active_accounts"); let unlocked_accounts = bus::list_unlocked_identities().await.unwrap(); - let mut accounts = self.active_accounts.borrow_mut(); + let mut accounts = self.active_accounts.lock().await; for account in unlocked_accounts { log::debug!("account={}", account); accounts.add_account(account) } } - fn is_account_active(&self, address: &str) -> Result<(), GenericError> { - match self.active_accounts.as_ref().borrow().get_node_id(address) { + async fn is_account_active(&self, address: &str) -> Result<(), GenericError> { + match self + .active_accounts + .as_ref() + .lock() + .await + .get_node_id(address) + { Some(_) => Ok(()), None => Err(GenericError::new(format!( "Account not active: {}", @@ -96,7 +89,7 @@ impl Erc20NextDriver { amount: &BigDecimal, network: &str, ) -> Result { - self.is_account_active(sender)?; + self.is_account_active(sender).await?; let sender = H160::from_str(sender) .map_err(|err| GenericError::new(format!("Error when parsing sender {err:?}")))?; let receiver = H160::from_str(to) @@ -120,6 +113,38 @@ impl Erc20NextDriver { Ok(payment_id) } + async fn payment_confirm_job(this: Arc, mut events: Receiver) { + while let Some(event) = events.recv().await { + if let DriverEventContent::TransferFinished(transfer_finished) = &event.content { + match this + ._confirm_payments( + &transfer_finished.token_transfer_dao, + &transfer_finished.tx_dao, + ) + .await + { + Ok(_) => log::info!( + "Payment confirmed: {}", + transfer_finished + .token_transfer_dao + .payment_id + .clone() + .unwrap_or_default() + ), + Err(e) => log::error!( + "Error confirming payment: {}, error: {}", + transfer_finished + .token_transfer_dao + .payment_id + .clone() + .unwrap_or_default(), + e + ), + } + } + } + } + async fn _confirm_payments( &self, token_transfer: &TokenTransferDao, @@ -204,7 +229,7 @@ impl PaymentDriver for Erc20NextDriver { _caller: String, msg: IdentityEvent, ) -> Result<(), IdentityError> { - self.active_accounts.borrow_mut().handle_event(msg); + self.active_accounts.lock().await.handle_event(msg); Ok(()) } @@ -509,51 +534,3 @@ impl PaymentDriver for Erc20NextDriver { Ok(()) } } - -#[async_trait(?Send)] -impl PaymentDriverCron for Erc20NextDriver { - fn sendout_interval(&self) -> std::time::Duration { - *TX_SENDOUT_INTERVAL - } - - fn confirmation_interval(&self) -> std::time::Duration { - *TX_CONFIRMATION_INTERVAL - } - - async fn send_out_payments(&self) { - // no-op, handled by erc20_payment_lib internally - } - - async fn confirm_payments(&self) { - let mut events = self.events.lock().await; - while let Ok(event) = events.try_recv() { - if let DriverEventContent::TransferFinished(transfer_finished) = &event.content { - match self - ._confirm_payments( - &transfer_finished.token_transfer_dao, - &transfer_finished.tx_dao, - ) - .await - { - Ok(_) => log::info!( - "Payment confirmed: {}", - transfer_finished - .token_transfer_dao - .payment_id - .clone() - .unwrap_or_default() - ), - Err(e) => log::error!( - "Error confirming payment: {}, error: {}", - transfer_finished - .token_transfer_dao - .payment_id - .clone() - .unwrap_or_default(), - e - ), - } - } - } - } -} diff --git a/core/payment-driver/erc20next/src/driver/cli.rs b/core/payment-driver/erc20next/src/driver/cli.rs index e53c0bf3f8..c2ef6ebc5d 100644 --- a/core/payment-driver/erc20next/src/driver/cli.rs +++ b/core/payment-driver/erc20next/src/driver/cli.rs @@ -21,7 +21,7 @@ pub async fn init(driver: &Erc20NextDriver, msg: Init) -> Result<(), GenericErro // Ensure account is unlock before initialising send mode if mode.contains(AccountMode::SEND) { - driver.is_account_active(&address)? + driver.is_account_active(&address).await? } let network = network::network_like_to_network(msg.network()); diff --git a/core/payment-driver/erc20next/src/service.rs b/core/payment-driver/erc20next/src/service.rs index d5548454da..bf1cc1797b 100644 --- a/core/payment-driver/erc20next/src/service.rs +++ b/core/payment-driver/erc20next/src/service.rs @@ -9,10 +9,8 @@ use erc20_payment_lib::config::{AdditionalOptions, MultiContractSettings}; use erc20_payment_lib::misc::load_private_keys; use erc20_payment_lib::runtime::PaymentRuntime; use ethereum_types::H160; -use std::sync::Arc; // Workspace uses -use ya_payment_driver::cron::Cron; use ya_payment_driver::{ bus, dao::{init, DbExecutor}, @@ -63,13 +61,14 @@ impl Erc20NextService { log::warn!( "Format of the file may change in the future releases, use with caution!" ); + match config::Config::load(&path.join("config-payments.toml")).await { Ok(config_from_file) => { log::info!("Config file loaded successfully, overwriting default config"); config = config_from_file; } Err(err) => { - log::error!("Config file found but failed to load from file - using default config. Error: {}", err) + log::error!("Config file found but failed to load from file - using default config. Error: {err}") } } } else { @@ -79,17 +78,27 @@ impl Erc20NextService { ); } + let sendout_interval_env = "ERC20NEXT_SENDOUT_INTERVAL_SECS"; + if let Ok(sendout_interval) = env::var(sendout_interval_env) { + match sendout_interval.parse::() { + Ok(sendout_interval_secs) => { + log::info!("erc20next gather interval set to {sendout_interval_secs}s"); + config.engine.gather_interval = sendout_interval_secs; + }, + Err(e) => log::warn!("Value {sendout_interval} for {sendout_interval_env} is not a valid integer: {e}"), + } + } + for (network, chain) in &mut config.chain { let prefix = network.to_ascii_uppercase(); - let mut token = chain.token.clone(); - let symbol = token.symbol.to_ascii_uppercase(); + let symbol = chain.token.symbol.to_ascii_uppercase(); let rpc_env = format!("{prefix}_GETH_ADDR"); let priority_fee_env = format!("{prefix}_PRIORITY_FEE"); let max_fee_per_gas_env = format!("{prefix}_MAX_FEE_PER_GAS"); let token_addr_env = format!("{prefix}_{symbol}_CONTRACT_ADDRESS"); let multi_payment_addr_env = format!("{prefix}_MULTI_PAYMENT_CONTRACT_ADDRESS"); - let confirmations = format!("ERC20_{prefix}_REQUIRED_CONFIRMATIONS"); + let confirmations_env = format!("ERC20NEXT_{prefix}_REQUIRED_CONFIRMATIONS"); if let Ok(addr) = env::var(&rpc_env) { chain.rpc_endpoints = addr.split(',').map(ToOwned::to_owned).collect(); @@ -125,7 +134,7 @@ impl Erc20NextService { match H160::from_str(&addr) { Ok(parsed) => { log::info!("{network} token address set to {addr}"); - token.address = parsed; + chain.token.address = parsed; } Err(e) => { log::warn!( @@ -134,7 +143,7 @@ impl Erc20NextService { } }; } - if let Ok(confirmations) = env::var(&confirmations) { + if let Ok(confirmations) = env::var(&confirmations_env) { match confirmations.parse::() { Ok(parsed) => { log::info!("{network} required confirmations set to {parsed}"); @@ -167,7 +176,7 @@ impl Erc20NextService { } } - log::warn!("Starting payment engine: {:#?}", config); + log::debug!("Starting payment engine: {:#?}", config); let signer = IdentitySigner::new(); let (sender, recv) = tokio::sync::mpsc::channel(16); @@ -185,17 +194,10 @@ impl Erc20NextService { .await .unwrap(); - log::warn!("Payment engine started - outside task"); + log::debug!("Bind erc20next driver"); let driver = Erc20NextDriver::new(pr, recv); - driver.load_active_accounts().await; - let driver_rc = Arc::new(driver); - - bus::bind_service(db, driver_rc.clone()).await?; - - // Start cron - Cron::new(driver_rc); - log::debug!("Cron started"); + bus::bind_service(db, driver).await?; log::info!("Successfully connected Erc20NextService to gsb."); Ok(()) diff --git a/goth_tests/domain/payments/goth-config.yml b/goth_tests/domain/payments/goth-config.yml index 91536ef20d..2cd99a6097 100644 --- a/goth_tests/domain/payments/goth-config.yml +++ b/goth_tests/domain/payments/goth-config.yml @@ -4,7 +4,6 @@ # home directory. docker-compose: - # Path to compose file to be used, relative to `docker-dir` compose-file: "docker-compose.yml" docker-dir: "../../assets/docker/" @@ -26,13 +25,10 @@ docker-compose: ethereum-goerli: ".*Wallets supplied." ethereum-polygon: ".*Wallets supplied." - key-dir: "../../assets/keys" - web-root: "../../assets/web-root" - node-types: # Each node type is a collection of attributes common to a group of nodes. # Required attributes are "name" and "class". @@ -43,7 +39,6 @@ node-types: - "ERC20_SENDOUT_INTERVAL_SECS=1" - "ERC20_CONFIRMATION_INTERVAL_SECS=1" - "ERC20NEXT_SENDOUT_INTERVAL_SECS=1" - - "ERC20NEXT_CONFIRMATION_INTERVAL_SECS=1" - name: "Provider" class: "goth_tests.helpers.probe.ProviderProbe" @@ -59,10 +54,8 @@ node-types: - "DEBIT_NOTE_INTERVAL=3s" - "PAYMENT_TIMEOUT=5s" - "ERC20_CONFIRMATION_INTERVAL_SECS=1" - - "ERC20NEXT_CONFIRMATION_INTERVAL_SECS=1" nodes: - - name: "requestor" type: "Requestor" payment-config: "erc20" diff --git a/goth_tests/pyproject.toml b/goth_tests/pyproject.toml index 43c0adec4b..d1674945da 100644 --- a/goth_tests/pyproject.toml +++ b/goth_tests/pyproject.toml @@ -27,7 +27,7 @@ pytest-asyncio = "0.21" pytest-split = "^0.8.1" # goth = "0.15.8" # to use development goth version uncomment below -goth = { git = "https://github.com/golemfactory/goth.git", rev = "2c2c8a8b01324350eb5968171fe9e38d333b90f4" } +goth = { git = "https://github.com/golemfactory/goth.git", rev = "2d506620acfd843e60a0ba68dcfdfda966e4472a" } [tool.poetry.dev-dependencies] black = "21.7b0"