Skip to content

Commit

Permalink
working on better gather time handling
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Oct 24, 2023
1 parent 35648f5 commit be56954
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 139 deletions.
8 changes: 6 additions & 2 deletions config-payments.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
[engine]
service-sleep = 10
process-sleep = 10
# service sleep is for internal runtime checks
service-sleep = 5
# process sleep is to set how often payments are gathered
gather-sleep = 600
# gather payments on payment driver start (otherwise wait for first gather-sleep)
gather-at-start = true
automatic-recover = false

[chain.rinkeby]
Expand Down
3 changes: 2 additions & 1 deletion crates/erc20_payment_lib/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ impl AdditionalOptions {
#[serde(rename_all = "kebab-case")]
pub struct Engine {
pub service_sleep: u64,
pub process_sleep: u64,
pub gather_sleep: u64,
pub gather_at_start: bool,
pub automatic_recover: bool,
}

Expand Down
9 changes: 6 additions & 3 deletions crates/erc20_payment_lib/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ pub struct ValidatedOptions {
pub generate_tx_only: bool,
pub skip_multi_contract_check: bool,
pub service_sleep: u64,
pub process_sleep: u64,
pub gather_sleep: u64,
pub gather_at_start: bool,
pub http_threads: u64,
pub http_port: u16,
pub http_addr: String,
Expand All @@ -195,7 +196,8 @@ impl Default for ValidatedOptions {
generate_tx_only: false,
skip_multi_contract_check: false,
service_sleep: 10,
process_sleep: 10,
gather_sleep: 10,
gather_at_start: false,
http_threads: 2,
http_port: 8080,
http_addr: "127.0.0.1".to_string(),
Expand Down Expand Up @@ -398,7 +400,8 @@ impl PaymentRuntime {
options.generate_tx_only,
options.skip_multi_contract_check,
config.engine.service_sleep,
config.engine.process_sleep,
config.engine.gather_sleep,
config.engine.gather_at_start,
config.engine.automatic_recover,
)?;
payment_setup.use_transfer_for_single_payment = options.use_transfer_for_single_payment;
Expand Down
14 changes: 11 additions & 3 deletions crates/erc20_payment_lib/src/sender/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub async fn process_transaction(
) -> Result<(TxDao, ProcessTransactionResult), PaymentError> {
const CHECKS_UNTIL_NOT_FOUND: u64 = 5;

let wait_duration = Duration::from_secs(payment_setup.process_sleep);
let gather_sleep = Duration::from_secs(payment_setup.gather_sleep);

let chain_id = web3_tx_dao.chain_id;
let Ok(chain_setup) = payment_setup.get_chain_setup(chain_id) else {
Expand Down Expand Up @@ -754,7 +754,11 @@ pub async fn process_transaction(
send_transaction(event_sender.clone(), web3, web3_tx_dao).await?;
web3_tx_dao.broadcast_count += 1;
update_tx(conn, web3_tx_dao).await.map_err(err_from!())?;
tokio::time::sleep(wait_duration).await;
log::warn!(
"Sleeping for {} seconds (process sleep after transaction send)",
gather_sleep.as_secs()
);
tokio::time::sleep(gather_sleep).await;
continue;
} else {
//timeout transaction when it is not confirmed after transaction_timeout seconds
Expand Down Expand Up @@ -831,6 +835,10 @@ pub async fn process_transaction(
if !wait_for_confirmation {
return Ok((web3_tx_dao.clone(), ProcessTransactionResult::Unknown));
}
tokio::time::sleep(wait_duration).await;
log::warn!(
"Sleeping for {} seconds (process sleep at the end of the loop)",
gather_sleep.as_secs()
);
tokio::time::sleep(gather_sleep).await;
}
}
237 changes: 114 additions & 123 deletions crates/erc20_payment_lib/src/sender/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::db::model::*;
use crate::db::ops::*;
use crate::error::{ErrorBag, PaymentError};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use crate::sender::process::{process_transaction, ProcessTransactionResult};
Expand Down Expand Up @@ -160,7 +161,6 @@ pub async fn update_approve_result(
ProcessTransactionResult::Confirmed => {
tx.processing = 0;

log::error!("Updating approve result {tx:?}");
let mut db_transaction = conn.begin().await.map_err(err_from!())?;
let mut allowance = get_allowance_by_tx(&mut *db_transaction, tx.id)
.await
Expand Down Expand Up @@ -266,83 +266,83 @@ pub async fn process_transactions(
.await
.map_err(err_from!())?;

if let Some(tx) = transactions.get_mut(0) {
let (mut tx, process_t_res) = if shared_state.lock().await.is_skipped(tx.id) {
(
tx.clone(),
ProcessTransactionResult::InternalError("Transaction skipped by user".into()),
)
} else {
shared_state
.lock()
.await
.set_tx_message(tx.id, "Processing".to_string());
match process_transaction(
event_sender.clone(),
shared_state.clone(),
conn,
tx,
payment_setup,
signer,
false,
)
.await
{
Ok((tx_dao, process_result)) => (tx_dao, process_result),
Err(err) => match err.inner {
ErrorBag::TransactionFailedError(err2) => {
shared_state
.lock()
.await
.set_tx_error(tx.id, Some(err2.message.clone()));
let Some(tx) = transactions.get_mut(0) else {
break;
};

return Err(err_create!(err2));
}
_ => {
log::error!("Error in process transaction: {}", err.inner);
shared_state
.lock()
.await
.set_tx_error(tx.id, Some(format!("{}", err.inner)));
return Err(err);
}
},
}
};
if let ProcessTransactionResult::Replaced = process_t_res {
continue;
};
if tx.method.starts_with("MULTI.golemTransfer")
|| tx.method == "ERC20.transfer"
|| tx.method == "transfer"
let (mut tx, process_t_res) = if shared_state.lock().await.is_skipped(tx.id) {
(
tx.clone(),
ProcessTransactionResult::InternalError("Transaction skipped by user".into()),
)
} else {
shared_state
.lock()
.await
.set_tx_message(tx.id, "Processing".to_string());
match process_transaction(
event_sender.clone(),
shared_state.clone(),
conn,
tx,
payment_setup,
signer,
false,
)
.await
{
log::debug!("Updating token transfer result");
update_token_transfer_result(event_sender.clone(), conn, &mut tx, &process_t_res)
.await?;
} else if tx.method == "ERC20.approve" {
log::debug!("Updating token approve result");
update_approve_result(event_sender.clone(), conn, &mut tx, &process_t_res).await?;
} else {
log::debug!("Updating plain tx result");
update_tx_result(conn, &mut tx, &process_t_res).await?;
}
match process_t_res {
ProcessTransactionResult::Unknown => {}
ProcessTransactionResult::Confirmed => {
send_driver_event(
&event_sender,
DriverEventContent::TransactionConfirmed(tx.clone()),
)
.await;
}
_ => {
shared_state.lock().await.current_tx_info.remove(&tx.id);
}
Ok((tx_dao, process_result)) => (tx_dao, process_result),
Err(err) => match err.inner {
ErrorBag::TransactionFailedError(err2) => {
shared_state
.lock()
.await
.set_tx_error(tx.id, Some(err2.message.clone()));

return Err(err_create!(err2));
}
_ => {
log::error!("Error in process transaction: {}", err.inner);
shared_state
.lock()
.await
.set_tx_error(tx.id, Some(format!("{}", err.inner)));
return Err(err);
}
},
}
};
if let ProcessTransactionResult::Replaced = process_t_res {
continue;
};
if tx.method.starts_with("MULTI.golemTransfer")
|| tx.method == "ERC20.transfer"
|| tx.method == "transfer"
{
log::debug!("Updating token transfer result");
update_token_transfer_result(event_sender.clone(), conn, &mut tx, &process_t_res)
.await?;
} else if tx.method == "ERC20.approve" {
log::debug!("Updating token approve result");
update_approve_result(event_sender.clone(), conn, &mut tx, &process_t_res).await?;
} else {
log::debug!("Updating plain tx result");
update_tx_result(conn, &mut tx, &process_t_res).await?;
}
if transactions.is_empty() {
break;
match process_t_res {
ProcessTransactionResult::Unknown => {}
ProcessTransactionResult::Confirmed => {
send_driver_event(
&event_sender,
DriverEventContent::TransactionConfirmed(tx.clone()),
)
.await;
}
_ => {
shared_state.lock().await.current_tx_info.remove(&tx.id);
}
}

tokio::time::sleep(std::time::Duration::from_secs(payment_setup.service_sleep)).await;
}
Ok(())
Expand All @@ -355,58 +355,53 @@ pub async fn service_loop(
signer: impl Signer + Send + Sync + 'static,
event_sender: Option<tokio::sync::mpsc::Sender<DriverEvent>>,
) {
let process_transactions_interval = payment_setup.process_sleep as i64;
let gather_transactions_interval = payment_setup.process_sleep as i64;
let mut last_update_time1 =
chrono::Utc::now() - chrono::Duration::seconds(process_transactions_interval);
let mut last_update_time2 =
chrono::Utc::now() - chrono::Duration::seconds(gather_transactions_interval);
let gather_transactions_interval = payment_setup.gather_sleep as i64;
let mut last_gather_time = if payment_setup.gather_at_start {
chrono::Utc::now() - chrono::Duration::seconds(gather_transactions_interval)
} else {
chrono::Utc::now()
};

let mut process_tx_needed = true;
let mut process_tx_instantly = true;
let mut process_tx_needed;
loop {
log::debug!("Sender service loop - start loop");
let current_time = chrono::Utc::now();
if current_time < last_update_time1 {
//handle case when system time changed
last_update_time1 = current_time;
}

if process_tx_instantly
|| (process_tx_needed
&& current_time
> last_update_time1 + chrono::Duration::seconds(process_transactions_interval))
if payment_setup.generate_tx_only {
log::warn!("Skipping processing transactions...");
} else if let Err(e) = process_transactions(
event_sender.clone(),
shared_state.clone(),
conn,
payment_setup,
&signer,
)
.await
{
process_tx_instantly = false;
if payment_setup.generate_tx_only {
log::warn!("Skipping processing transactions...");
process_tx_needed = false;
} else {
match process_transactions(
event_sender.clone(),
shared_state.clone(),
conn,
payment_setup,
&signer,
)
.await
{
Ok(_) => {
//all pending transactions processed
process_tx_needed = false;
}
Err(e) => {
log::error!("Error in process transactions: {}", e);
}
};
}
last_update_time1 = current_time;
log::error!("Error in process transactions: {}", e);
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
continue;
}

if current_time
> last_update_time2 + chrono::Duration::seconds(gather_transactions_interval)
&& !process_tx_needed
{
process_tx_needed = false;

//we should be here only when all pending transactions are processed

let next_gather_time =
last_gather_time + chrono::Duration::seconds(gather_transactions_interval);
if current_time < next_gather_time {
log::info!(
"Transaction will be gathered in {} seconds",
humantime::format_duration(Duration::from_secs(
(next_gather_time - current_time).num_seconds() as u64
))
);
tokio::time::sleep(Duration::from_secs(std::cmp::min(
60,
(next_gather_time - current_time).num_seconds() as u64,
)))
.await;
} else {
log::info!("Gathering transfers...");
let mut token_transfer_map = match gather_transactions_pre(conn, payment_setup).await {
Ok(token_transfer_map) => token_transfer_map,
Expand All @@ -429,7 +424,6 @@ pub async fn service_loop(
Ok(count) => {
if count > 0 {
process_tx_needed = true;
process_tx_instantly = true;
} else {
log::info!("No new transfers to process");
}
Expand All @@ -443,8 +437,6 @@ pub async fn service_loop(
{
Ok(_) => {
//process transaction instantly
process_tx_needed = true;
process_tx_instantly = true;
shared_state.lock().await.idling = false;
continue;
}
Expand All @@ -462,7 +454,7 @@ pub async fn service_loop(
log::error!("Error in gather transactions: {}", e);
}
};
last_update_time2 = current_time;
last_gather_time = current_time;
if payment_setup.finish_when_done && !process_tx_needed {
log::info!("No more work to do, exiting...");
break;
Expand All @@ -474,7 +466,6 @@ pub async fn service_loop(
shared_state.lock().await.idling = false;
}
}

tokio::time::sleep(std::time::Duration::from_secs(payment_setup.service_sleep)).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
Loading

0 comments on commit be56954

Please sign in to comment.