Skip to content

Commit

Permalink
Fix panics in TimedMutex
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Sep 10, 2024
1 parent 24c705b commit 09d3a02
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 144 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
.target/

# These are backup files generated by rustfmt
**/*.rs.bk
Expand Down
146 changes: 4 additions & 142 deletions core/payment/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,22 @@ use crate::error::processor::{
};
use crate::models::order::ReadObj as DbOrder;
use crate::payment_sync::SYNC_NOTIFS_NOTIFY;
use crate::timeout_lock::RwLockTimeoutExt;
use crate::timeout_lock::{RwLockTimeoutExt, TimedMutex};
use crate::utils::remove_allocation_ids_from_payment;

use actix_web::web::Data;
use bigdecimal::{BigDecimal, Zero};
use chrono::{DateTime, Utc};
use futures::{FutureExt, TryFutureExt};
use metrics::counter;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ops::Deref;
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use uuid::Uuid;
use tokio::sync::RwLock;

use ya_client_model::payment::allocation::Deposit;
use ya_client_model::payment::{
Account, ActivityPayment, AgreementPayment, DriverDetails, Network, Payment,
Expand Down Expand Up @@ -312,140 +308,6 @@ impl DriverRegistry {
// Lock timeouts, 30 seconds each. Presumably passed to the `timeout_lock` calls on the
// database mutex and on the driver registry lock -- usages are outside this view, TODO confirm.
const DB_LOCK_TIMEOUT: Duration = Duration::from_secs(30);
const REGISTRY_LOCK_TIMEOUT: Duration = Duration::from_secs(30);

/// Async mutex around a `DbExecutor`, paired with a background "counter" task that
/// is told when the lock is taken and released so it can log long-held locks.
struct TimedMutex {
/// The lock protecting the database executor.
mutex: Mutex<DbExecutor>,
/// Channel to the counter task; wrapped in `Option` so `Drop` can take it and close the channel.
sender: Option<UnboundedSender<TimedMutexTaskMessage>>,
/// Handle of the spawned counter task; wrapped in `Option` so `Drop` can take and await it.
counter_task: Option<JoinHandle<()>>,
}

use tokio::sync::MutexGuard;

/// Messages sent from lock holders to the counter task.
enum TimedMutexTaskMessage {
/// The lock was acquired; carries the task id (`{name}::{uuid}`) used in log lines.
Start(String),
/// The guard of the current lock holder was dropped.
Finish,
}

/// Guard returned by `TimedMutex::timeout_lock`; reports `Finish` to the counter task on drop.
struct TimedMutexGuard<'a> {
/// The underlying Tokio guard that actually holds the lock.
mutex_guard: MutexGuard<'a, DbExecutor>,
/// Borrowed sender of the owning `TimedMutex`, used to send `Finish` in `Drop`.
sender: &'a Option<UnboundedSender<TimedMutexTaskMessage>>,
}

impl Drop for TimedMutexGuard<'_> {
fn drop(&mut self) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(TimedMutexTaskMessage::Finish) {
log::error!("Cannot send fininsh to counter task {e}");
}
}
}
}

impl<'a> Deref for TimedMutexGuard<'a> {
    /// Dereferences to the wrapped Tokio guard (which in turn dereferences to `DbExecutor`).
    type Target = MutexGuard<'a, DbExecutor>;

    /// Borrows the inner guard held by this wrapper.
    fn deref(&self) -> &Self::Target {
        let Self { mutex_guard, .. } = self;
        mutex_guard
    }
}

impl TimedMutex {
/// Wraps `db` in a mutex and spawns the counter task that watches lock hold times.
///
/// The counter task alternates between two states: waiting (without timeout) for a
/// `Start` message, and then waiting in 10-second slices for the matching `Finish`.
fn new(db: DbExecutor) -> Self {
let (sender, mut receiver) =
tokio::sync::mpsc::unbounded_channel::<TimedMutexTaskMessage>();

let counter_task = tokio::spawn(async move {
log::info!("[TimedMutex] Counter thread started");
loop {
// wait for start or close without timeout
let task_name = match receiver.recv().await {
// Channel closed (all senders dropped): leave the outer loop and finish the task.
None => break,
Some(TimedMutexTaskMessage::Start(x)) => x,
Some(TimedMutexTaskMessage::Finish) => {
panic!("[TimedMutex] Unexpected finish")
}
};

log::info!("[TimedMutex] task {task_name} started...");
// Number of 10-second timeouts elapsed while waiting for `Finish`.
let mut counter = 0;
loop {
match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await {
// No message within 10 seconds: the lock is still held.
Err(_) => {
log::error!("[TimedMutex] Long running task: {task_name}!");
counter += 1;
// five minutes
// After more than 30 timeouts the whole process is terminated with code 41.
if counter > 30 {
exit(41);
}
}
Ok(None) => panic!("[TimedMutex] Unexpected mpsc close."),
Ok(Some(TimedMutexTaskMessage::Finish)) => break,
Ok(Some(TimedMutexTaskMessage::Start(_))) => {
panic!("[TimedMutex] Unexpected start")
}
}
}

log::info!("[TimedMutex] Timed task {task_name} finished.");
}
log::info!("[TimedMutex] Counter thread finished");
});

Self {
mutex: Mutex::new(db),
sender: Some(sender),
counter_task: Some(counter_task),
}
}

/// Acquires the lock, waiting at most `duration`, and registers the holder with the counter task.
///
/// `name` labels the caller in log messages and in the generated task id.
/// Returns `Err(Elapsed)` when the lock could not be acquired in time.
/// Terminates the process with code 42 if the counter task has already finished.
async fn timeout_lock(
&self,
duration: Duration,
name: &str,
) -> Result<TimedMutexGuard<'_>, Elapsed> {
let result = tokio::time::timeout(duration, self.mutex.lock())
.await
.map_err(|e| {
log::info!("Failed to lock mutex in scenario {0}", name);
e
})?;

// Checked only after the lock has been acquired.
if self
.counter_task
.as_ref()
.map_or(false, |v| v.is_finished())
{
log::error!("counter task is dead! {name}");
exit(42)
}

// A random suffix distinguishes individual acquisitions that share the same `name`.
let id = Uuid::new_v4().to_simple().to_string();
let task_id = format!("{name}::{id}");

if let Some(sender) = &self.sender {
if let Err(e) = sender.send(TimedMutexTaskMessage::Start(task_id)) {
log::error!("Cannot send start to counter task {name}: {e}");
}
}

Ok(TimedMutexGuard {
mutex_guard: result,
sender: &self.sender,
})
}
}

impl Drop for TimedMutex {
    /// Closes the channel to the counter task and joins the task in the background.
    ///
    /// `tokio::spawn` panics when no runtime context is available, and a destructor may
    /// run in such a place, so the join task is only spawned when a runtime handle can
    /// be obtained. Otherwise the counter task is simply detached.
    fn drop(&mut self) {
        // Dropping the only sender closes the channel, which lets the counter task
        // leave its outer receive loop.
        drop(self.sender.take());

        let Some(handle) = self.counter_task.take() else {
            return;
        };

        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                runtime.spawn(async move {
                    if let Err(e) = handle.await {
                        log::error!("Cannot join counter thread {e}");
                    }
                });
            }
            Err(e) => {
                log::warn!("Cannot join counter thread, no runtime available: {e}");
            }
        }
    }
}

pub struct PaymentProcessor {
db_executor: Arc<TimedMutex>,
registry: RwLock<DriverRegistry>,
Expand Down
125 changes: 123 additions & 2 deletions core/payment/src/timeout_lock.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::time::Duration;

use futures::Future;
use std::ops::Deref;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::error::Elapsed;
use tokio::{
sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard},
time::Timeout,
};
use uuid::Uuid;

use ya_persistence::executor::DbExecutor;

pub trait MutexTimeoutExt<T: ?Sized + 'static> {
fn timeout_lock(&self, duration: Duration) -> Timeout<impl Future<Output = MutexGuard<'_, T>>>;
Expand Down Expand Up @@ -43,3 +48,119 @@ impl<T: ?Sized + 'static> RwLockTimeoutExt<T> for RwLock<T> {
tokio::time::timeout(duration, self.write())
}
}

/// Async mutex around a `DbExecutor`, paired with a background task that is told
/// when the lock is taken and released so it can log long-held locks.
pub struct TimedMutex {
/// The lock protecting the database executor.
mutex: Mutex<DbExecutor>,
/// Channel to the background task; wrapped in `Option` so `Drop` can take it and close the channel.
sender: Option<UnboundedSender<TimedMutexTaskMessage>>,
}

/// Messages sent from lock holders to the background counter task.
enum TimedMutexTaskMessage {
/// The lock was acquired; carries the task id (`{name}::{uuid}`) used in log lines.
Start(String),
/// The guard of the current lock holder was dropped.
Finish,
}

/// Guard returned by `TimedMutex::timeout_lock`; reports `Finish` to the counter task on drop.
pub struct TimedMutexGuard<'a> {
/// The underlying Tokio guard that actually holds the lock.
mutex_guard: MutexGuard<'a, DbExecutor>,
/// Borrowed sender of the owning `TimedMutex`, used to send `Finish` in `Drop`.
sender: &'a Option<UnboundedSender<TimedMutexTaskMessage>>,
}

impl Drop for TimedMutexGuard<'_> {
    /// Tells the counter task that the current lock holder is done.
    ///
    /// The message is sent from `drop` itself, i.e. before the inner `mutex_guard`
    /// field is dropped and the lock is actually released.
    fn drop(&mut self) {
        let Some(sender) = self.sender.as_ref() else {
            return;
        };
        // A failed send means the counter task has already stopped; only report it.
        if let Err(e) = sender.send(TimedMutexTaskMessage::Finish) {
            log::warn!("Cannot send finish to counter task {e}");
        }
    }
}

impl<'a> Deref for TimedMutexGuard<'a> {
    /// Dereferences to the wrapped Tokio guard (which in turn dereferences to `DbExecutor`).
    type Target = MutexGuard<'a, DbExecutor>;

    /// Borrows the inner guard held by this wrapper.
    fn deref(&self) -> &Self::Target {
        let Self { mutex_guard, .. } = self;
        mutex_guard
    }
}

impl TimedMutex {
    /// Wraps `db` in a mutex and spawns the background task that reports long lock holds.
    ///
    /// The task alternates between two states: waiting (without timeout) for a `Start`
    /// message, and then waiting in 10-second slices for the matching `Finish`, logging
    /// a warning for every slice that elapses. It stops watching after more than 30
    /// elapsed slices, on a protocol violation, or when the channel is closed.
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
    pub fn new(db: DbExecutor) -> Self {
        let (sender, mut receiver) =
            tokio::sync::mpsc::unbounded_channel::<TimedMutexTaskMessage>();

        tokio::spawn(async move {
            log::debug!("[TimedMutex] Counter thread started");
            loop {
                // wait for start or close without timeout
                let task_name = match receiver.recv().await {
                    // All senders dropped: regular shutdown.
                    None => break,
                    Some(TimedMutexTaskMessage::Start(x)) => x,
                    Some(TimedMutexTaskMessage::Finish) => {
                        log::warn!("[TimedMutex] Unexpected finish");
                        return;
                    }
                };

                log::info!("[TimedMutex] task {task_name} started...");
                // Number of 10-second slices elapsed while waiting for `Finish`.
                let mut counter = 0;
                loop {
                    match tokio::time::timeout(Duration::from_secs(10), receiver.recv()).await {
                        Err(_) => {
                            log::warn!("[TimedMutex] Long running task: {task_name}!");
                            counter += 1;
                            // roughly five minutes (more than 30 slices of 10 seconds)
                            if counter > 30 {
                                return;
                            }
                        }
                        Ok(None) => {
                            // A closed channel makes `recv()` return `None` immediately on
                            // every call, so staying in this loop would busy-spin and flood
                            // the log. Nothing more can arrive; stop the task.
                            log::warn!("[TimedMutex] Unexpected mpsc close.");
                            return;
                        }
                        Ok(Some(TimedMutexTaskMessage::Finish)) => break,
                        Ok(Some(TimedMutexTaskMessage::Start(_))) => {
                            // Keep timing the task that is already being tracked.
                            log::warn!("[TimedMutex] Unexpected start")
                        }
                    }
                }

                log::debug!("[TimedMutex] Timed task {task_name} finished.");
            }
            log::debug!("[TimedMutex] Counter thread finished");
        });

        Self {
            mutex: Mutex::new(db),
            sender: Some(sender),
        }
    }

    /// Acquires the lock, waiting at most `duration`, and registers the holder with the
    /// counter task.
    ///
    /// `name` labels the caller in log messages and in the generated task id.
    ///
    /// # Errors
    ///
    /// Returns `Elapsed` when the lock could not be acquired within `duration`.
    pub async fn timeout_lock(
        &self,
        duration: Duration,
        name: &str,
    ) -> Result<TimedMutexGuard<'_>, Elapsed> {
        let result = match tokio::time::timeout(duration, self.mutex.lock()).await {
            Ok(guard) => guard,
            Err(e) => {
                log::warn!("Failed to lock mutex in scenario {0}", name);
                return Err(e);
            }
        };

        // A random suffix distinguishes individual acquisitions that share the same `name`.
        let id = Uuid::new_v4().to_simple().to_string();
        let task_id = format!("{name}::{id}");

        if let Some(sender) = &self.sender {
            // A failed send means the counter task has stopped; the lock itself still works.
            if let Err(e) = sender.send(TimedMutexTaskMessage::Start(task_id)) {
                log::warn!("Cannot send start to counter task {name}: {e}");
            }
        }

        Ok(TimedMutexGuard {
            mutex_guard: result,
            sender: &self.sender,
        })
    }
}

impl Drop for TimedMutex {
    /// Drops the sender, closing the channel so the counter task can leave its
    /// outer receive loop and finish.
    fn drop(&mut self) {
        self.sender = None;
    }
}

0 comments on commit 09d3a02

Please sign in to comment.