diff --git a/.gitignore b/.gitignore index 98406f41a..e93842e28 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Generated by Cargo # will have compiled files and executables /target/ +.target/ # These are backup files generated by rustfmt **/*.rs.bk diff --git a/core/payment/src/processor.rs b/core/payment/src/processor.rs index 7b1ae35e8..e7d3f25ba 100644 --- a/core/payment/src/processor.rs +++ b/core/payment/src/processor.rs @@ -8,8 +8,9 @@ use crate::error::processor::{ }; use crate::models::order::ReadObj as DbOrder; use crate::payment_sync::SYNC_NOTIFS_NOTIFY; -use crate::timeout_lock::RwLockTimeoutExt; +use crate::timeout_lock::{RwLockTimeoutExt, TimedMutex}; use crate::utils::remove_allocation_ids_from_payment; + use actix_web::web::Data; use bigdecimal::{BigDecimal, Zero}; use chrono::{DateTime, Utc}; @@ -17,17 +18,12 @@ use futures::{FutureExt, TryFutureExt}; use metrics::counter; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::ops::Deref; -use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::{Mutex, RwLock}; -use tokio::task::JoinHandle; -use tokio::time::error::Elapsed; -use uuid::Uuid; +use tokio::sync::RwLock; + use ya_client_model::payment::allocation::Deposit; use ya_client_model::payment::{ Account, ActivityPayment, AgreementPayment, DriverDetails, Network, Payment, @@ -312,140 +308,6 @@ impl DriverRegistry { const DB_LOCK_TIMEOUT: Duration = Duration::from_secs(30); const REGISTRY_LOCK_TIMEOUT: Duration = Duration::from_secs(30); -struct TimedMutex { - mutex: Mutex, - sender: Option>, - counter_task: Option>, -} - -use tokio::sync::MutexGuard; - -enum TimedMutexTaskMessage { - Start(String), - Finish, -} - -struct TimedMutexGuard<'a> { - mutex_guard: MutexGuard<'a, DbExecutor>, - sender: &'a Option>, -} - -impl Drop for TimedMutexGuard<'_> { - fn drop(&mut self) { - if let Some(sender) = &self.sender { - if let Err(e) = sender.send(TimedMutexTaskMessage::Finish) { - log::error!("Cannot send fininsh to counter task {e}"); - } - } - } -} - -impl<'a> Deref for TimedMutexGuard<'a> { - type Target = MutexGuard<'a, DbExecutor>; - - fn deref(&self) -> &Self::Target { - &self.mutex_guard - } -} - -impl TimedMutex { - fn new(db: DbExecutor) -> Self { - let (sender, mut receiver) = - tokio::sync::mpsc::unbounded_channel::(); - - let counter_task = tokio::spawn(async move { - log::info!("[TimedMutex] Counter thread started"); - loop { - // wait for start or close without timeout - let task_name = match receiver.recv().await { - None => break, - Some(TimedMutexTaskMessage::Start(x)) => x, - Some(TimedMutexTaskMessage::Finish) => { - panic!("[TimedMutex] Unexpected finish") - } - }; - - log::info!("[TimedMutex] task {task_name} started..."); - let mut counter = 0; - loop { - match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await { - Err(_) => { - log::error!("[TimedMutex] Long running task: {task_name}!"); - counter += 1; - // five minutes - if counter > 30 { - exit(41); - } - } - Ok(None) => panic!("[TimedMutex] Unexpected mpsc close."), - Ok(Some(TimedMutexTaskMessage::Finish)) => break, - Ok(Some(TimedMutexTaskMessage::Start(_))) => { - panic!("[TimedMutex] Unexpected start") - } - } - } - - log::info!("[TimedMutex] Timed task {task_name} finished."); - } - log::info!("[TimedMutex] Counter thread finished"); - }); - - Self { - mutex: Mutex::new(db), - sender: Some(sender), - counter_task: Some(counter_task), - } - } - - async fn timeout_lock( - &self, - duration: Duration, - name: &str, - ) -> Result, Elapsed> { - let result = tokio::time::timeout(duration, self.mutex.lock()) - .await - .map_err(|e| { - log::info!("Failed to lock mutex in scenario {0}", name); - e - })?; - - if self - .counter_task - .as_ref() - .map_or(false, |v| v.is_finished()) - { - log::error!("counter task is dead! {name}"); - exit(42) - } - - let id = Uuid::new_v4().to_simple().to_string(); - let task_id = format!("{name}::{id}"); - - if let Some(sender) = &self.sender { - if let Err(e) = sender.send(TimedMutexTaskMessage::Start(task_id)) { - log::error!("Cannot send start to counter task {name}: {e}"); - } - } - - Ok(TimedMutexGuard { - mutex_guard: result, - sender: &self.sender, - }) - } -} - -impl Drop for TimedMutex { - fn drop(&mut self) { - self.sender.take().unwrap(); - let handle = self.counter_task.take().unwrap(); - tokio::spawn(async move { - if let Err(e) = handle.await { - log::error!("Cannot join counter thread {e}"); - } - }); - } -} - pub struct PaymentProcessor { db_executor: Arc, registry: RwLock, diff --git a/core/payment/src/timeout_lock.rs b/core/payment/src/timeout_lock.rs index 48f826a8a..8178e350b 100644 --- a/core/payment/src/timeout_lock.rs +++ b/core/payment/src/timeout_lock.rs @@ -1,10 +1,15 @@ -use std::time::Duration; - use futures::Future; +use std::ops::Deref; +use std::time::Duration; +use tokio::sync::mpsc::UnboundedSender; +use tokio::time::error::Elapsed; use tokio::{ sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}, time::Timeout, }; +use uuid::Uuid; + +use ya_persistence::executor::DbExecutor; pub trait MutexTimeoutExt { fn timeout_lock(&self, duration: Duration) -> Timeout>>; @@ -43,3 +48,119 @@ impl RwLockTimeoutExt for RwLock { tokio::time::timeout(duration, self.write()) } } + +pub struct TimedMutex { + mutex: Mutex, + sender: Option>, +} + +enum TimedMutexTaskMessage { + Start(String), + Finish, +} + +pub struct TimedMutexGuard<'a> { + mutex_guard: MutexGuard<'a, DbExecutor>, + sender: &'a Option>, +} + +impl Drop for TimedMutexGuard<'_> { + fn drop(&mut self) { + if let Some(sender) = &self.sender { + if let Err(e) = sender.send(TimedMutexTaskMessage::Finish) { + log::warn!("Cannot send finish to counter task {e}"); + } + } + } +} + +impl<'a> Deref for TimedMutexGuard<'a> { + type Target = MutexGuard<'a, DbExecutor>; + + fn deref(&self) -> &Self::Target { + &self.mutex_guard + } +} + +impl TimedMutex { + pub fn new(db: DbExecutor) -> Self { + let (sender, mut receiver) = + tokio::sync::mpsc::unbounded_channel::(); + + tokio::spawn(async move { + log::debug!("[TimedMutex] Counter thread started"); + loop { + // wait for start or close without timeout + let task_name = match receiver.recv().await { + None => break, + Some(TimedMutexTaskMessage::Start(x)) => x, + Some(TimedMutexTaskMessage::Finish) => { + log::warn!("[TimedMutex] Unexpected finish"); + return; + } + }; + + log::info!("[TimedMutex] task {task_name} started..."); + let mut counter = 0; + loop { + match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await { + Err(_) => { + log::warn!("[TimedMutex] Long running task: {task_name}!"); + counter += 1; + // five minutes + if counter > 30 { + return; + } + } + Ok(None) => log::warn!("[TimedMutex] Unexpected mpsc close."), + Ok(Some(TimedMutexTaskMessage::Finish)) => break, + Ok(Some(TimedMutexTaskMessage::Start(_))) => { + log::warn!("[TimedMutex] Unexpected start") + } + } + } + + log::debug!("[TimedMutex] Timed task {task_name} finished."); + } + log::debug!("[TimedMutex] Counter thread finished"); + }); + + Self { + mutex: Mutex::new(db), + sender: Some(sender), + } + } + + pub async fn timeout_lock( + &self, + duration: Duration, + name: &str, + ) -> Result, Elapsed> { + let result = tokio::time::timeout(duration, self.mutex.lock()) + .await + .map_err(|e| { + log::warn!("Failed to lock mutex in scenario {0}", name); + e + })?; + + let id = Uuid::new_v4().to_simple().to_string(); + let task_id = format!("{name}::{id}"); + + if let Some(sender) = &self.sender { + if let Err(e) = sender.send(TimedMutexTaskMessage::Start(task_id)) { + log::warn!("Cannot send start to counter task {name}: {e}"); + } + } + + Ok(TimedMutexGuard { + mutex_guard: result, + sender: &self.sender, + }) + } +} + +impl Drop for TimedMutex { + fn drop(&mut self) { + self.sender.take(); + } +}