Skip to content

Commit

Permalink
Merge branch 'payments-dev' into kek/fix-overspend
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 authored Oct 31, 2023
2 parents b9faf42 + 6518402 commit 3dd7f40
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 124 deletions.
17 changes: 13 additions & 4 deletions .env-template
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,24 @@ YAGNA_DATADIR="."
#POLYGON_GETH_ADDR=https://bor.golem.network,https://polygon-rpc.com
#MUMBAI_GETH_ADDR=https://matic-mumbai.chainstacklabs.com

## ERC20 driver.
#ETH_FAUCET_ADDRESS=http://faucet.testnet.golem.network:4000/donate
#ERC20_RINKEBY_REQUIRED_CONFIRMATIONS=3
#ERC20_MAINNET_REQUIRED_CONFIRMATIONS=5
## T/GLM contract addresses
#RINKEBY_TGLM_CONTRACT_ADDRESS=0xd94e3DC39d4Cad1DAd634e7eb585A57A19dC7EFE
#RINKEBY_TGLM_FAUCET_ADDRESS=0x59259943616265A03d775145a2eC371732E2B06C
#MAINNET_GLM_CONTRACT_ADDRESS=0x7DD9c5Cba05E151C895FDe1CF355C9A1D5DA6429

## Eth faucet address
#ETH_FAUCET_ADDRESS=http://faucet.testnet.golem.network:4000/donate

## ERC20 driver.
#ERC20_SENDOUT_INTERVAL_SECS=10
#ERC20_CONFIRMATION_INTERVAL_SECS=5
#ERC20_RINKEBY_REQUIRED_CONFIRMATIONS=3
#ERC20_MAINNET_REQUIRED_CONFIRMATIONS=5

## ERC20Next driver.
#ERC20NEXT_SENDOUT_INTERVAL_SECS=10
#ERC20NEXT_RINKEBY_REQUIRED_CONFIRMATIONS=3
#ERC20NEXT_MAINNET_REQUIRED_CONFIRMATIONS=5

## Activity Service

Expand Down
10 changes: 5 additions & 5 deletions core/payment-driver/base/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@
*/

// External crates
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;

use tokio::sync::Mutex;
// Workspace uses
use ya_core_model::identity::event::Event as IdentityEvent;

// Local uses
use crate::driver::NodeId;

// Public types
pub type AccountsRc = Rc<RefCell<Accounts>>;
pub type AccountsArc = Arc<Mutex<Accounts>>;

pub struct Accounts {
accounts: HashMap<String, NodeId>,
}

impl Accounts {
pub fn new_rc() -> AccountsRc {
Rc::new(RefCell::new(Self::new()))
pub fn new_rc() -> AccountsArc {
Arc::new(Mutex::new(Self::new()))
}

pub fn handle_event(&mut self, msg: IdentityEvent) {
Expand Down
24 changes: 15 additions & 9 deletions core/payment-driver/erc20/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ya_client_model::payment::DriverStatusProperty;

// Workspace uses
use ya_payment_driver::{
account::{Accounts, AccountsRc},
account::{Accounts, AccountsArc},
bus,
cron::PaymentDriverCron,
dao::DbExecutor,
Expand Down Expand Up @@ -48,7 +48,7 @@ lazy_static::lazy_static! {
}

pub struct Erc20Driver {
active_accounts: AccountsRc,
active_accounts: AccountsArc,
dao: Erc20Dao,
sendout_lock: Mutex<()>,
confirmation_lock: Mutex<()>,
Expand All @@ -67,15 +67,21 @@ impl Erc20Driver {
pub async fn load_active_accounts(&self) {
log::debug!("load_active_accounts");
let unlocked_accounts = bus::list_unlocked_identities().await.unwrap();
let mut accounts = self.active_accounts.borrow_mut();
let mut accounts = self.active_accounts.lock().await;
for account in unlocked_accounts {
log::debug!("account={}", account);
accounts.add_account(account)
}
}

fn is_account_active(&self, address: &str) -> Result<(), GenericError> {
match self.active_accounts.as_ref().borrow().get_node_id(address) {
async fn is_account_active(&self, address: &str) -> Result<(), GenericError> {
match self
.active_accounts
.as_ref()
.lock()
.await
.get_node_id(address)
{
Some(_) => Ok(()),
None => Err(GenericError::new(format!(
"Account not active: {}",
Expand All @@ -93,7 +99,7 @@ impl PaymentDriver for Erc20Driver {
_caller: String,
msg: IdentityEvent,
) -> Result<(), IdentityError> {
self.active_accounts.borrow_mut().handle_event(msg);
self.active_accounts.lock().await.handle_event(msg);
Ok(())
}

Expand Down Expand Up @@ -171,7 +177,7 @@ impl PaymentDriver for Erc20Driver {
_caller: String,
msg: Transfer,
) -> Result<String, GenericError> {
self.is_account_active(&msg.sender)?;
self.is_account_active(&msg.sender).await?;
cli::transfer(&self.dao, msg).await
}

Expand All @@ -183,7 +189,7 @@ impl PaymentDriver for Erc20Driver {
) -> Result<String, GenericError> {
log::debug!("schedule_payment: {:?}", msg);

self.is_account_active(&msg.sender())?;
self.is_account_active(&msg.sender()).await?;
api::schedule_payment(&self.dao, msg).await
}

Expand Down Expand Up @@ -267,7 +273,7 @@ impl PaymentDriverCron for Erc20Driver {
'outer: for network_key in self.get_networks().keys() {
let network = Network::from_str(network_key).unwrap();
// Process payment rows
let accounts = self.active_accounts.borrow().list_accounts();
let accounts = self.active_accounts.lock().await.list_accounts();
for node_id in accounts {
if let Err(e) =
cron::process_payments_for_account(&self.dao, &node_id, network).await
Expand Down
2 changes: 1 addition & 1 deletion core/payment-driver/erc20/src/driver/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn init(driver: &Erc20Driver, msg: Init) -> Result<(), GenericError> {

// Ensure account is unlock before initialising send mode
if mode.contains(AccountMode::SEND) {
driver.is_account_active(&address)?
driver.is_account_active(&address).await?
}

wallet::init_wallet(
Expand Down
133 changes: 55 additions & 78 deletions core/payment-driver/erc20next/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ use ethereum_types::U256;
use num_bigint::BigInt;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
use tokio_util::task::LocalPoolHandle;
use uuid::Uuid;
use web3::types::H256;
use ya_client_model::payment::DriverStatusProperty;

// Workspace uses
use ya_payment_driver::{
account::{Accounts, AccountsRc},
account::{Accounts, AccountsArc},
bus,
cron::PaymentDriverCron,
dao::DbExecutor,
driver::{
async_trait, BigDecimal, IdentityError, IdentityEvent, Network as NetworkConfig,
Expand All @@ -38,49 +38,42 @@ use crate::{network::SUPPORTED_NETWORKS, DRIVER_NAME, RINKEBY_NETWORK};

mod cli;

lazy_static::lazy_static! {
static ref TX_SENDOUT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(
std::env::var("ERC20NEXT_SENDOUT_INTERVAL_SECS")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(30),
);

static ref TX_CONFIRMATION_INTERVAL: std::time::Duration = std::time::Duration::from_secs(
std::env::var("ERC20NEXT_CONFIRMATION_INTERVAL_SECS")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(30),
);
}

pub struct Erc20NextDriver {
active_accounts: AccountsRc,
active_accounts: AccountsArc,
payment_runtime: PaymentRuntime,
events: Mutex<Receiver<DriverEvent>>,
}

impl Erc20NextDriver {
pub fn new(payment_runtime: PaymentRuntime, events: Receiver<DriverEvent>) -> Self {
Self {
pub fn new(payment_runtime: PaymentRuntime, recv: Receiver<DriverEvent>) -> Arc<Self> {
let this = Arc::new(Self {
active_accounts: Accounts::new_rc(),
payment_runtime,
events: Mutex::new(events),
}
});

let this_ = Arc::clone(&this);
LocalPoolHandle::new(1).spawn_pinned(move || Self::payment_confirm_job(this_, recv));

this
}

pub async fn load_active_accounts(&self) {
log::debug!("load_active_accounts");
let unlocked_accounts = bus::list_unlocked_identities().await.unwrap();
let mut accounts = self.active_accounts.borrow_mut();
let mut accounts = self.active_accounts.lock().await;
for account in unlocked_accounts {
log::debug!("account={}", account);
accounts.add_account(account)
}
}

fn is_account_active(&self, address: &str) -> Result<(), GenericError> {
match self.active_accounts.as_ref().borrow().get_node_id(address) {
async fn is_account_active(&self, address: &str) -> Result<(), GenericError> {
match self
.active_accounts
.as_ref()
.lock()
.await
.get_node_id(address)
{
Some(_) => Ok(()),
None => Err(GenericError::new(format!(
"Account not active: {}",
Expand All @@ -96,7 +89,7 @@ impl Erc20NextDriver {
amount: &BigDecimal,
network: &str,
) -> Result<String, GenericError> {
self.is_account_active(sender)?;
self.is_account_active(sender).await?;
let sender = H160::from_str(sender)
.map_err(|err| GenericError::new(format!("Error when parsing sender {err:?}")))?;
let receiver = H160::from_str(to)
Expand All @@ -120,6 +113,38 @@ impl Erc20NextDriver {
Ok(payment_id)
}

async fn payment_confirm_job(this: Arc<Self>, mut events: Receiver<DriverEvent>) {
while let Some(event) = events.recv().await {
if let DriverEventContent::TransferFinished(transfer_finished) = &event.content {
match this
._confirm_payments(
&transfer_finished.token_transfer_dao,
&transfer_finished.tx_dao,
)
.await
{
Ok(_) => log::info!(
"Payment confirmed: {}",
transfer_finished
.token_transfer_dao
.payment_id
.clone()
.unwrap_or_default()
),
Err(e) => log::error!(
"Error confirming payment: {}, error: {}",
transfer_finished
.token_transfer_dao
.payment_id
.clone()
.unwrap_or_default(),
e
),
}
}
}
}

async fn _confirm_payments(
&self,
token_transfer: &TokenTransferDao,
Expand Down Expand Up @@ -204,7 +229,7 @@ impl PaymentDriver for Erc20NextDriver {
_caller: String,
msg: IdentityEvent,
) -> Result<(), IdentityError> {
self.active_accounts.borrow_mut().handle_event(msg);
self.active_accounts.lock().await.handle_event(msg);
Ok(())
}

Expand Down Expand Up @@ -509,51 +534,3 @@ impl PaymentDriver for Erc20NextDriver {
Ok(())
}
}

#[async_trait(?Send)]
impl PaymentDriverCron for Erc20NextDriver {
fn sendout_interval(&self) -> std::time::Duration {
*TX_SENDOUT_INTERVAL
}

fn confirmation_interval(&self) -> std::time::Duration {
*TX_CONFIRMATION_INTERVAL
}

async fn send_out_payments(&self) {
// no-op, handled by erc20_payment_lib internally
}

async fn confirm_payments(&self) {
let mut events = self.events.lock().await;
while let Ok(event) = events.try_recv() {
if let DriverEventContent::TransferFinished(transfer_finished) = &event.content {
match self
._confirm_payments(
&transfer_finished.token_transfer_dao,
&transfer_finished.tx_dao,
)
.await
{
Ok(_) => log::info!(
"Payment confirmed: {}",
transfer_finished
.token_transfer_dao
.payment_id
.clone()
.unwrap_or_default()
),
Err(e) => log::error!(
"Error confirming payment: {}, error: {}",
transfer_finished
.token_transfer_dao
.payment_id
.clone()
.unwrap_or_default(),
e
),
}
}
}
}
}
2 changes: 1 addition & 1 deletion core/payment-driver/erc20next/src/driver/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub async fn init(driver: &Erc20NextDriver, msg: Init) -> Result<(), GenericErro

// Ensure account is unlock before initialising send mode
if mode.contains(AccountMode::SEND) {
driver.is_account_active(&address)?
driver.is_account_active(&address).await?
}

let network = network::network_like_to_network(msg.network());
Expand Down
Loading

0 comments on commit 3dd7f40

Please sign in to comment.