From e218081d5df50cb5fa282726c8c320b0b3fe8a5e Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Fri, 3 Nov 2023 10:59:55 +0100 Subject: [PATCH 1/7] payment: add driver status events to debit notes & invoices - Fix details ser/de for events. - General refactor. --- Cargo.lock | 6 +- Cargo.toml | 4 +- core/model/src/payment.rs | 13 +- core/payment-driver/base/src/bus.rs | 13 +- core/payment-driver/erc20next/src/driver.rs | 204 ++++++++++--------- core/payment/src/dao/activity.rs | 3 +- core/payment/src/dao/agreement.rs | 3 +- core/payment/src/dao/debit_note.rs | 24 ++- core/payment/src/dao/debit_note_event.rs | 50 ++++- core/payment/src/dao/invoice.rs | 39 +++- core/payment/src/dao/invoice_event.rs | 50 ++++- core/payment/src/models/debit_note_event.rs | 35 ++-- core/payment/src/models/invoice_event.rs | 34 ++-- core/payment/src/service.rs | 207 ++++++++++++++++++++ 14 files changed, 521 insertions(+), 164 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4684ce228f..248e520f15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7123,7 +7123,7 @@ checksum = "e7141e445af09c8919f1d5f8a20dae0b20c3b57a45dee0d5823c6ed5d237f15a" dependencies = [ "bitflags 1.3.2", "chrono", - "rustc_version 0.2.3", + "rustc_version 0.4.0", ] [[package]] @@ -7691,7 +7691,7 @@ dependencies = [ [[package]] name = "ya-client" version = "0.7.0" -source = "git+https://github.com/golemfactory/ya-client.git?rev=2bb679e3bb1d61eddd713d7f19ee127595f27162#2bb679e3bb1d61eddd713d7f19ee127595f27162" +source = "git+https://github.com/golemfactory/ya-client.git?rev=ca30ace41d68ee2df0c3ce32fce89c45669453b9#ca30ace41d68ee2df0c3ce32fce89c45669453b9" dependencies = [ "actix-codec", "awc", @@ -7715,7 +7715,7 @@ dependencies = [ [[package]] name = "ya-client-model" version = "0.5.0" -source = "git+https://github.com/golemfactory/ya-client.git?rev=2bb679e3bb1d61eddd713d7f19ee127595f27162#2bb679e3bb1d61eddd713d7f19ee127595f27162" +source = "git+https://github.com/golemfactory/ya-client.git?rev=ca30ace41d68ee2df0c3ce32fce89c45669453b9#ca30ace41d68ee2df0c3ce32fce89c45669453b9" dependencies = [ "bigdecimal 0.2.2", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 1412878372..8680286580 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -270,8 +270,8 @@ ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = #ya-sb-util = { path = "../ya-service-bus/crates/util" } ## CLIENT -ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "2bb679e3bb1d61eddd713d7f19ee127595f27162" } -ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "2bb679e3bb1d61eddd713d7f19ee127595f27162" } +ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "ca30ace41d68ee2df0c3ce32fce89c45669453b9" } +ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "ca30ace41d68ee2df0c3ce32fce89c45669453b9" } ## RELAY and networking stack ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" } diff --git a/core/model/src/payment.rs b/core/model/src/payment.rs index d92e4a8fba..caf60c3a66 100644 --- a/core/model/src/payment.rs +++ b/core/model/src/payment.rs @@ -18,7 +18,7 @@ pub enum RpcMessageError { } pub mod local { - use super::*; + use super::{public::Ack, *}; use crate::driver::{AccountMode, GasDetails, PaymentConfirmation}; use bigdecimal::{BigDecimal, Zero}; use chrono::{DateTime, Utc}; @@ -424,6 +424,17 @@ pub mod local { Internal(String), } + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct PaymentDriverStatusChange { + pub properties: Vec, + } + + impl RpcMessage for PaymentDriverStatusChange { + const ID: &'static str = "PaymentDriverStatusChange"; + type Item = Ack; + type Error = GenericError; + } + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PaymentDriverStatus { pub driver: Option, diff --git a/core/payment-driver/base/src/bus.rs b/core/payment-driver/base/src/bus.rs index 256cc5a909..3edaff4214 100644 --- a/core/payment-driver/base/src/bus.rs +++ b/core/payment-driver/base/src/bus.rs @@ -7,6 +7,7 @@ // External crates use std::sync::Arc; +use ya_client_model::payment::DriverStatusProperty; // Workspace uses use ya_client_model::payment::driver_details::DriverDetails; use ya_client_model::NodeId; @@ -14,7 +15,7 @@ use ya_core_model::driver::{ driver_bus_id, AccountMode, GenericError, PaymentConfirmation, PaymentDetails, }; use ya_core_model::identity; -use ya_core_model::payment::local as payment_srv; +use ya_core_model::payment::local::{self as payment_srv, PaymentDriverStatusChange}; use ya_service_bus::{ typed::{service, ServiceBinder}, RpcEndpoint, @@ -191,3 +192,13 @@ pub async fn notify_payment( .map_err(GenericError::new)?; Ok(()) } + +pub async fn status_changed(properties: Vec) -> Result<(), GenericError> { + let msg = PaymentDriverStatusChange { properties }; + service(payment_srv::BUS_ID) + .send(msg) + .await + .map_err(GenericError::new)? + .map_err(GenericError::new)?; + Ok(()) +} diff --git a/core/payment-driver/erc20next/src/driver.rs b/core/payment-driver/erc20next/src/driver.rs index dd19b97f70..4ab70c9e87 100644 --- a/core/payment-driver/erc20next/src/driver.rs +++ b/core/payment-driver/erc20next/src/driver.rs @@ -120,34 +120,118 @@ impl Erc20NextDriver { async fn payment_confirm_job(this: Arc, mut events: Receiver) { while let Some(event) = events.recv().await { - if let DriverEventContent::TransferFinished(transfer_finished) = &event.content { - match this - ._confirm_payments( - &transfer_finished.token_transfer_dao, - &transfer_finished.tx_dao, - ) - .await - { - Ok(_) => log::info!( - "Payment confirmed: {}", - transfer_finished - .token_transfer_dao - .payment_id - .clone() - .unwrap_or_default() - ), - Err(e) => log::error!( - "Error confirming payment: {}, error: {}", - transfer_finished - .token_transfer_dao - .payment_id - .clone() - .unwrap_or_default(), - e - ), + match &event.content { + DriverEventContent::TransferFinished(transfer_finished) => { + match this + ._confirm_payments( + &transfer_finished.token_transfer_dao, + &transfer_finished.tx_dao, + ) + .await + { + Ok(_) => log::info!( + "Payment confirmed: {}", + transfer_finished + .token_transfer_dao + .payment_id + .clone() + .unwrap_or_default() + ), + Err(e) => log::error!( + "Error confirming payment: {}, error: {}", + transfer_finished + .token_transfer_dao + .payment_id + .clone() + .unwrap_or_default(), + e + ), + } } + DriverEventContent::StatusChanged(_) => { + if let Ok(status) = this._status(DriverStatus { network: None }).await { + log::info!("Payment driver [{DRIVER_NAME}] status changed: {status:#?}"); + bus::status_changed(status).await.ok(); + } + } + _ => {} + } + } + } + + async fn _status( + &self, + msg: DriverStatus, + ) -> Result, DriverStatusError> { + use erc20_payment_lib::runtime::StatusProperty as LibStatusProperty; + + // Map chain-id to network + let chain_id_to_net = |id: i64| self.payment_runtime.network_name(id).unwrap().to_string(); + + // check if network matches the filter + let network_filter = |net_candidate: &str| { + msg.network + .as_ref() + .map(|net| net == net_candidate) + .unwrap_or(true) + }; + + if let Some(network) = msg.network.as_ref() { + let found_net = self + .payment_runtime + .chains() + .into_iter() + .any(|id| &chain_id_to_net(id) == network); + + if !found_net { + return Err(DriverStatusError::NetworkNotFound(network.clone())); } } + + Ok(self + .payment_runtime + .get_status() + .await + .into_iter() + .flat_map(|prop| match prop { + LibStatusProperty::InvalidChainId { chain_id } => { + Some(DriverStatusProperty::InvalidChainId { + driver: DRIVER_NAME.into(), + chain_id, + }) + } + LibStatusProperty::CantSign { chain_id, address } => { + let network = chain_id_to_net(chain_id); + Some(DriverStatusProperty::CantSign { + driver: DRIVER_NAME.into(), + network, + address, + }) + } + LibStatusProperty::NoGas { + chain_id, + missing_gas, + } => { + let network = chain_id_to_net(chain_id); + network_filter(&network).then(|| DriverStatusProperty::InsufficientGas { + driver: DRIVER_NAME.into(), + network, + needed_gas_est: missing_gas.unwrap_or_default().to_string(), + }) + } + LibStatusProperty::NoToken { + chain_id, + missing_token, + } => { + let network = chain_id_to_net(chain_id); + network_filter(&network).then(|| DriverStatusProperty::InsufficientToken { + driver: DRIVER_NAME.into(), + network, + needed_token_est: missing_token.unwrap_or_default().to_string(), + }) + } + }) + .collect()) } async fn _confirm_payments( @@ -485,75 +569,7 @@ impl PaymentDriver for Erc20NextDriver { _caller: String, msg: DriverStatus, ) -> Result, DriverStatusError> { - use erc20_payment_lib::runtime::StatusProperty as LibStatusProperty; - - // Map chain-id to network - let chain_id_to_net = |id: i64| self.payment_runtime.network_name(id).unwrap().to_string(); - - // check if network matches the filter - let network_filter = |net_candidate: &str| { - msg.network - .as_ref() - .map(|net| net == net_candidate) - .unwrap_or(true) - }; - - if let Some(network) = msg.network.as_ref() { - let found_net = self - .payment_runtime - .chains() - .into_iter() - .any(|id| &chain_id_to_net(id) == network); - - if !found_net { - return Err(DriverStatusError::NetworkNotFound(network.clone())); - } - } - - Ok(self - .payment_runtime - .get_status() - .await - .into_iter() - .flat_map(|prop| match prop { - LibStatusProperty::InvalidChainId { chain_id } => { - Some(DriverStatusProperty::InvalidChainId { - driver: DRIVER_NAME.into(), - chain_id, - }) - } - LibStatusProperty::CantSign { chain_id, address } => { - let network = chain_id_to_net(chain_id); - Some(DriverStatusProperty::CantSign { - driver: DRIVER_NAME.into(), - network, - address, - }) - } - LibStatusProperty::NoGas { - chain_id, - missing_gas, - } => { - let network = chain_id_to_net(chain_id); - network_filter(&network).then(|| DriverStatusProperty::InsufficientGas { - driver: DRIVER_NAME.into(), - network, - needed_gas_est: missing_gas.unwrap_or_default().to_string(), - }) - } - LibStatusProperty::NoToken { - chain_id, - missing_token, - } => { - let network = chain_id_to_net(chain_id); - network_filter(&network).then(|| DriverStatusProperty::InsufficientToken { - driver: DRIVER_NAME.into(), - network, - needed_token_est: missing_token.unwrap_or_default().to_string(), - }) - } - }) - .collect()) + self._status(msg).await } async fn shut_down( diff --git a/core/payment/src/dao/activity.rs b/core/payment/src/dao/activity.rs index ccf27c4700..5d2134e4be 100644 --- a/core/payment/src/dao/activity.rs +++ b/core/payment/src/dao/activity.rs @@ -103,11 +103,10 @@ pub fn increase_amount_paid( debit_note::update_status(&debit_note_ids, owner_id, &DocumentStatus::Settled, conn)?; for debit_note_id in debit_note_ids { - debit_note_event::create::<()>( + debit_note_event::create( debit_note_id, *owner_id, DebitNoteEventType::DebitNoteSettledEvent, - None, conn, )?; } diff --git a/core/payment/src/dao/agreement.rs b/core/payment/src/dao/agreement.rs index bea9b1c50c..944c4f5687 100644 --- a/core/payment/src/dao/agreement.rs +++ b/core/payment/src/dao/agreement.rs @@ -148,11 +148,10 @@ pub fn increase_amount_paid( if let Some((invoice_id, role)) = invoice_query { invoice::update_status(&invoice_id, owner_id, &DocumentStatus::Settled, conn)?; - invoice_event::create::<()>( + invoice_event::create( invoice_id, *owner_id, InvoiceEventType::InvoiceSettledEvent, - None, conn, )?; } diff --git a/core/payment/src/dao/debit_note.rs b/core/payment/src/dao/debit_note.rs index 4543d0e992..75782c86fc 100644 --- a/core/payment/src/dao/debit_note.rs +++ b/core/payment/src/dao/debit_note.rs @@ -138,11 +138,10 @@ impl<'c> DebitNoteDao<'c> { diesel::insert_into(dsl::pay_debit_note) .values(debit_note) .execute(conn)?; - debit_note_event::create::<()>( + debit_note_event::create( debit_note_id.clone(), owner_id, DebitNoteEventType::DebitNoteReceivedEvent, - None, conn, )?; Ok(debit_note_id) @@ -170,11 +169,10 @@ impl<'c> DebitNoteDao<'c> { diesel::insert_into(dsl::pay_debit_note) .values(debit_note) .execute(conn)?; - debit_note_event::create::<()>( + debit_note_event::create( debit_note_id, owner_id, DebitNoteEventType::DebitNoteReceivedEvent, - None, conn, )?; Ok(()) @@ -201,9 +199,21 @@ impl<'c> DebitNoteDao<'c> { .await } - pub async fn get_all(&self) -> DbResult> { + pub async fn list( + &self, + role: Option, + status: Option, + ) -> DbResult> { readonly_transaction(self.pool, move |conn| { - let debit_notes: Vec = query!().order_by(dsl::timestamp.desc()).load(conn)?; + let mut query = query!().into_boxed(); + if let Some(role) = role { + query = query.filter(dsl::role.eq(role.to_string())); + } + if let Some(status) = status { + query = query.filter(dsl::status.eq(status.to_string())); + } + + let debit_notes: Vec = query.order_by(dsl::timestamp.desc()).load(conn)?; debit_notes.into_iter().map(TryInto::try_into).collect() }) .await @@ -272,7 +282,7 @@ impl<'c> DebitNoteDao<'c> { update_status(&vec![debit_note_id.clone()], &owner_id, &status, conn)?; activity::set_amount_accepted(&activity_id, &owner_id, &amount, conn)?; for event in events { - debit_note_event::create::<()>(debit_note_id.clone(), owner_id, event, None, conn)?; + debit_note_event::create(debit_note_id.clone(), owner_id, event, conn)?; } Ok(()) diff --git a/core/payment/src/dao/debit_note_event.rs b/core/payment/src/dao/debit_note_event.rs index c96f754ad7..4b0ddf6d19 100644 --- a/core/payment/src/dao/debit_note_event.rs +++ b/core/payment/src/dao/debit_note_event.rs @@ -4,7 +4,6 @@ use crate::schema::pay_debit_note_event::dsl as write_dsl; use crate::schema::pay_debit_note_event_read::dsl as read_dsl; use chrono::NaiveDateTime; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; -use serde::Serialize; use std::borrow::Cow; use std::collections::HashSet; use std::convert::TryInto; @@ -15,14 +14,13 @@ use ya_persistence::executor::{ }; use ya_persistence::types::{AdaptTimestamp, Role}; -pub fn create( +pub fn create( debit_note_id: String, owner_id: NodeId, event_type: DebitNoteEventType, - details: Option, conn: &ConnType, ) -> DbResult<()> { - let event = WriteObj::new(debit_note_id, owner_id, event_type, details)?; + let event = WriteObj::new(debit_note_id, owner_id, event_type)?; diesel::insert_into(write_dsl::pay_debit_note_event) .values(event) .execute(conn)?; @@ -40,15 +38,53 @@ impl<'c> AsDao<'c> for DebitNoteEventDao<'c> { } impl<'c> DebitNoteEventDao<'c> { - pub async fn create( + pub async fn create( &self, debit_note_id: String, owner_id: NodeId, event_type: DebitNoteEventType, - details: Option, ) -> DbResult<()> { do_with_transaction(self.pool, move |conn| { - create(debit_note_id, owner_id, event_type, details, conn) + create(debit_note_id, owner_id, event_type, conn) + }) + .await + } + + pub async fn get_for_debit_note_id( + &self, + debit_note_id: String, + after_timestamp: Option, + max_events: Option, + app_session_id: Option, + requestor_events: Vec>, + provider_events: Vec>, + ) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let mut query = read_dsl::pay_debit_note_event_read + .filter(read_dsl::debit_note_id.eq(debit_note_id)) + .order_by(read_dsl::timestamp.asc()) + .into_boxed(); + if let Some(timestamp) = after_timestamp { + query = query.filter(read_dsl::timestamp.gt(timestamp.adapt())); + } + if let Some(app_session_id) = app_session_id { + query = query.filter(read_dsl::app_session_id.eq(app_session_id)); + } + if let Some(limit) = max_events { + query = query.limit(limit.into()); + } + let events: Vec = query.load(conn)?; + let requestor_events: HashSet> = + requestor_events.into_iter().collect(); + let provider_events: HashSet> = provider_events.into_iter().collect(); + events + .into_iter() + .filter(|e| match e.role { + Role::Requestor => requestor_events.contains(e.event_type.as_str()), + Role::Provider => provider_events.contains(e.event_type.as_str()), + }) + .map(TryInto::try_into) + .collect() }) .await } diff --git a/core/payment/src/dao/invoice.rs b/core/payment/src/dao/invoice.rs index 8b1e4c13c1..e4463519d8 100644 --- a/core/payment/src/dao/invoice.rs +++ b/core/payment/src/dao/invoice.rs @@ -112,11 +112,10 @@ impl<'c> InvoiceDao<'c> { .map(|_| ()) })?; - invoice_event::create::<()>( + invoice_event::create( invoice_id, owner_id, InvoiceEventType::InvoiceReceivedEvent, - None, conn, )?; @@ -139,6 +138,37 @@ impl<'c> InvoiceDao<'c> { self.insert(invoice, activity_ids).await } + pub async fn list( + &self, + role: Option, + status: Option, + ) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let mut query = query!().into_boxed(); + if let Some(role) = role { + query = query.filter(dsl::role.eq(role.to_string())); + } + if let Some(status) = status { + query = query.filter(dsl::status.eq(status.to_string())); + } + + let read_objs: Vec = query.order_by(dsl::timestamp.desc()).load(conn)?; + let mut invoices = Vec::::new(); + + for read_obj in read_objs { + let activity_ids = activity_dsl::pay_invoice_x_activity + .select(activity_dsl::activity_id) + .filter(activity_dsl::invoice_id.eq(&read_obj.id)) + .filter(activity_dsl::owner_id.eq(read_obj.owner_id)) + .load(conn)?; + invoices.push(read_obj.into_api_model(activity_ids)?); + } + + Ok(invoices) + }) + .await + } + pub async fn get(&self, invoice_id: String, owner_id: NodeId) -> DbResult> { readonly_transaction(self.pool, move |conn| { let invoice: Option = query!() @@ -273,7 +303,7 @@ impl<'c> InvoiceDao<'c> { agreement::set_amount_accepted(&agreement_id, &owner_id, &amount, conn)?; for event in events { - invoice_event::create::<()>(invoice_id.clone(), owner_id, event, None, conn)?; + invoice_event::create(invoice_id.clone(), owner_id, event, conn)?; } Ok(()) @@ -376,11 +406,10 @@ impl<'c> InvoiceDao<'c> { agreement::compute_amount_due(&agreement_id, &owner_id, conn)?; update_status(&invoice_id, &owner_id, &DocumentStatus::Cancelled, conn)?; - invoice_event::create::<()>( + invoice_event::create( invoice_id, owner_id, InvoiceEventType::InvoiceCancelledEvent, - None, conn, )?; diff --git a/core/payment/src/dao/invoice_event.rs b/core/payment/src/dao/invoice_event.rs index 46e7b302ca..3e50fa2b50 100644 --- a/core/payment/src/dao/invoice_event.rs +++ b/core/payment/src/dao/invoice_event.rs @@ -4,7 +4,6 @@ use crate::schema::pay_invoice_event::dsl as write_dsl; use crate::schema::pay_invoice_event_read::dsl as read_dsl; use chrono::NaiveDateTime; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; -use serde::Serialize; use std::borrow::Cow; use std::collections::HashSet; use std::convert::TryInto; @@ -15,14 +14,13 @@ use ya_persistence::executor::{ }; use ya_persistence::types::{AdaptTimestamp, Role}; -pub fn create( +pub fn create( invoice_id: String, owner_id: NodeId, event_type: InvoiceEventType, - details: Option, conn: &ConnType, ) -> DbResult<()> { - let event = WriteObj::new(invoice_id, owner_id, event_type, details)?; + let event = WriteObj::new(invoice_id, owner_id, event_type)?; diesel::insert_into(write_dsl::pay_invoice_event) .values(event) .execute(conn)?; @@ -40,15 +38,53 @@ impl<'c> AsDao<'c> for InvoiceEventDao<'c> { } impl<'c> InvoiceEventDao<'c> { - pub async fn create( + pub async fn create( &self, invoice_id: String, owner_id: NodeId, event_type: InvoiceEventType, - details: Option, ) -> DbResult<()> { do_with_transaction(self.pool, move |conn| { - create(invoice_id, owner_id, event_type, details, conn) + create(invoice_id, owner_id, event_type, conn) + }) + .await + } + + pub async fn get_for_invoice_id( + &self, + invoice_id: String, + after_timestamp: Option, + max_events: Option, + app_session_id: Option, + requestor_events: Vec>, + provider_events: Vec>, + ) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let mut query = read_dsl::pay_invoice_event_read + .filter(read_dsl::invoice_id.eq(invoice_id)) + .order_by(read_dsl::timestamp.asc()) + .into_boxed(); + if let Some(timestamp) = after_timestamp { + query = query.filter(read_dsl::timestamp.gt(timestamp.adapt())); + } + if let Some(app_session_id) = app_session_id { + query = query.filter(read_dsl::app_session_id.eq(app_session_id)); + } + if let Some(limit) = max_events { + query = query.limit(limit.into()); + } + let events: Vec = query.load(conn)?; + let requestor_events: HashSet> = + requestor_events.into_iter().collect(); + let provider_events: HashSet> = provider_events.into_iter().collect(); + events + .into_iter() + .filter(|e| match e.role { + Role::Requestor => requestor_events.contains(e.event_type.as_str()), + Role::Provider => provider_events.contains(e.event_type.as_str()), + }) + .map(TryInto::try_into) + .collect() }) .await } diff --git a/core/payment/src/models/debit_note_event.rs b/core/payment/src/models/debit_note_event.rs index 7e03dd38bd..1f8704b044 100644 --- a/core/payment/src/models/debit_note_event.rs +++ b/core/payment/src/models/debit_note_event.rs @@ -1,8 +1,6 @@ use crate::error::{DbError, DbResult}; use crate::schema::{pay_debit_note_event, pay_debit_note_event_read}; -use crate::utils::{json_from_str, json_to_string}; use chrono::{NaiveDateTime, TimeZone, Utc}; -use serde::Serialize; use std::convert::TryFrom; use ya_client_model::payment::{DebitNoteEvent, DebitNoteEventType}; use ya_client_model::NodeId; @@ -20,20 +18,20 @@ pub struct WriteObj { } impl WriteObj { - pub fn new( + pub fn new( debit_note_id: String, owner_id: NodeId, event_type: DebitNoteEventType, - details: Option, ) -> DbResult { - let details = match details { - Some(details) => Some(json_to_string(&details)?), + let details = match event_type.details() { + Some(details) => Some(serde_json::to_string(&details)?), None => None, }; + Ok(Self { debit_note_id, owner_id, - event_type: event_type.to_string(), + event_type: event_type.discriminant().to_owned(), details, timestamp: Utc::now().adapt(), }) @@ -57,17 +55,22 @@ impl TryFrom for DebitNoteEvent { type Error = DbError; fn try_from(event: ReadObj) -> DbResult { - let event_type = event.event_type.parse().map_err(|e| { - DbError::Integrity(format!( - "DebitNoteEvent type `{}` parsing failed: {}", - &event.event_type, e - )) - })?; - // TODO Attach details when event_type=REJECTED - let _details = match event.details { - Some(s) => Some(json_from_str(&s)?), + let details = match &event.details { + Some(text) => Some( + serde_json::from_str::(text) + .map_err(|e| DbError::Integrity(e.to_string()))?, + ), None => None, }; + let event_type = + DebitNoteEventType::from_discriminant_and_details(&event.event_type, details.clone()) + .ok_or_else(|| { + DbError::Integrity(format!( + "event = {}, details = {:#?} is not valid DebitNoteEventType", + &event.event_type, details + )) + })?; + Ok(Self { debit_note_id: event.debit_note_id, event_date: Utc.from_utc_datetime(&event.timestamp), diff --git a/core/payment/src/models/invoice_event.rs b/core/payment/src/models/invoice_event.rs index 441dedc4d6..46cb6e35c1 100644 --- a/core/payment/src/models/invoice_event.rs +++ b/core/payment/src/models/invoice_event.rs @@ -1,8 +1,6 @@ use crate::error::{DbError, DbResult}; use crate::schema::{pay_invoice_event, pay_invoice_event_read}; -use crate::utils::{json_from_str, json_to_string}; use chrono::{NaiveDateTime, TimeZone, Utc}; -use serde::Serialize; use std::convert::TryFrom; use ya_client_model::payment::{InvoiceEvent, InvoiceEventType}; use ya_client_model::NodeId; @@ -20,21 +18,20 @@ pub struct WriteObj { } impl WriteObj { - pub fn new( + pub fn new( invoice_id: String, owner_id: NodeId, event_type: InvoiceEventType, - details: Option, ) -> DbResult { - let details = match details { - Some(details) => Some(json_to_string(&details)?), + let details = match event_type.details() { + Some(details) => Some(serde_json::to_string(&details)?), None => None, }; Ok(Self { invoice_id, owner_id, - event_type: event_type.to_string(), + event_type: event_type.discriminant().to_owned(), details, timestamp: Utc::now().adapt(), }) @@ -58,18 +55,21 @@ impl TryFrom for InvoiceEvent { type Error = DbError; fn try_from(event: ReadObj) -> DbResult { - let event_type = event.event_type.parse().map_err(|e| { - DbError::Integrity(format!( - "InvoiceEvent type `{}` parsing failed: {}", - event.event_type, e - )) - })?; - - // TODO Attach details when event_type=REJECTED - let _details = match event.details { - Some(s) => Some(json_from_str(&s)?), + let details = match &event.details { + Some(text) => Some( + serde_json::from_str::(text) + .map_err(|e| DbError::Integrity(e.to_string()))?, + ), None => None, }; + let event_type = + InvoiceEventType::from_discriminant_and_details(&event.event_type, details.clone()) + .ok_or_else(|| { + DbError::Integrity(format!( + "event = {}, details = {:#?} is not valid DebitNoteEventType", + &event.event_type, details + )) + })?; Ok(Self { invoice_id: event.invoice_id, diff --git a/core/payment/src/service.rs b/core/payment/src/service.rs index ea239d8765..36c0d261c7 100644 --- a/core/payment/src/service.rs +++ b/core/payment/src/service.rs @@ -396,6 +396,7 @@ mod local { } mod public { + use std::convert::TryInto; use std::str::FromStr; use super::*; @@ -408,6 +409,7 @@ mod public { // use crate::error::processor::VerifyPaymentError; use ya_client_model::{payment::*, NodeId}; + use ya_core_model::payment::local::PaymentDriverStatusChange; use ya_core_model::payment::public::*; use ya_persistence::types::Role; @@ -797,6 +799,211 @@ mod public { } // *************************** PAYMENT **************************** + async fn handle_status_change( + db: DbExecutor, + msg: PaymentDriverStatusChange, + ) -> Result { + /// Payment platform affected by status + /// + /// It doesn't contain the token because we don't actually + /// support multiple tokens on one chain. + /// + /// TODO: remove references to token stuff in yagna and ideally + /// make payment platforms properly typed along the way. + #[derive(Hash, PartialEq, Eq)] + struct Platform { + driver: String, + network: String, + } + + impl Platform { + fn new(driver: impl Into, network: impl Into) -> Self { + Platform { + driver: driver.into(), + network: network.into(), + } + } + } + + let platform_str_to_platform = |platform: &str| -> Result { + let parts = platform.split("-").collect::>(); + let [driver, network, _]: [_; 3] = parts.try_into().map_err(|_| { + GenericError::new("Payment platform must be of the form {driver}-{network}-{token}") + })?; + + Ok(Platform::new(driver, network)) + }; + + /// Event broadcast information + /// + /// Each status property shall be broadcasted to all debit notes + /// and invoices affected. + /// + /// If properties are empty, a PaymentOkEvent will be sent. + #[derive(Default)] + struct Broadcast { + debit_notes: Vec<(String, NodeId)>, + invoices: Vec<(String, NodeId)>, + properties: Vec, + } + + // Create a mapping between platforms and relevant properties. + // + // This relies on the fact that a given payment driver status property + // can only affect one platform. + let mut broadcast = HashMap::::default(); + for prop in msg.properties { + let Some(network) = prop.network() else { + continue; + }; + + let value = broadcast + .entry(Platform::new(prop.driver(), network)) + .or_default(); + value.properties.push(prop); + } + + // All DAOs + let debit_note_dao: DebitNoteDao = db.as_dao(); + let debit_note_ev_dao: DebitNoteEventDao = db.as_dao(); + let invoice_dao: InvoiceDao = db.as_dao(); + let invoice_ev_dao: InvoiceEventDao = db.as_dao(); + + let accepted_notes = debit_note_dao + .list(Some(Role::Requestor), Some(DocumentStatus::Accepted)) + .await + .map_err(GenericError::new)?; + + // Populate broadcasts with affected debit_notes + for debit_note in accepted_notes { + let platform = platform_str_to_platform(&debit_note.payment_platform)?; + + // checks if the last payment-status event was PAYMENT_OK or no such event was emitted + // + // If debit note has reported driver errors before, we *must* send a broadcast on status change. + // This will either be a new problem, or PaymentOkEvent if no errors are found. + let was_already_ok = debit_note_ev_dao + .get_for_debit_note_id( + debit_note.debit_note_id.clone(), + None, + None, + None, + vec!["PAYMENT_EVENT".into(), "PAYMENT_OK".into()], + vec![], + ) + .await + .map_err(GenericError::new)? + .last() + .map(|ev_type| { + matches!( + &ev_type.event_type, + DebitNoteEventType::DebitNotePaymentOkEvent + ) + }) + .unwrap_or(true); + + if !was_already_ok { + if let Some(affected) = broadcast.get_mut(&platform) { + affected + .debit_notes + .push((debit_note.debit_note_id, debit_note.issuer_id)); + } + } + } + + let accepted_invoices = invoice_dao + .list(Some(Role::Requestor), Some(DocumentStatus::Accepted)) + .await + .map_err(GenericError::new)?; + + // Populate broadcasts with affected invoices + for invoice in accepted_invoices { + let platform = platform_str_to_platform(&invoice.payment_platform)?; + + // checks if the last payment-status event was PAYMENT_OK or no such event was emitted + // + // If debit note has reported driver errors before, we *must* send a broadcast on status change. + // This will either be a new problem, or PaymentOkEvent if no errors are found. + let was_already_ok = invoice_ev_dao + .get_for_invoice_id( + invoice.invoice_id.clone(), + None, + None, + None, + vec!["PAYMENT_EVENT".into(), "PAYMENT_OK".into()], + vec![], + ) + .await + .map_err(GenericError::new)? + .last() + .map(|ev_type| { + matches!(&ev_type.event_type, InvoiceEventType::InvoicePaymentOkEvent) + }) + .unwrap_or(true); + if let Some(affected) = broadcast.get_mut(&platform) { + affected + .invoices + .push((invoice.invoice_id, invoice.issuer_id)); + } + } + + // Emit debit note & invoice events. + for broadcast in broadcast.into_values() { + // If properties are empty, send OkEvents. Otherwise send the wrapped properties. + if broadcast.properties.is_empty() { + for (debit_note_id, owner_id) in &broadcast.debit_notes { + debit_note_ev_dao + .create( + debit_note_id.clone(), + *owner_id, + DebitNoteEventType::DebitNotePaymentOkEvent, + ) + .await + .map_err(GenericError::new)?; + } + + for (invoice_id, owner_id) in &broadcast.invoices { + invoice_ev_dao + .create( + invoice_id.clone(), + *owner_id, + InvoiceEventType::InvoicePaymentOkEvent, + ) + .await + .map_err(GenericError::new)?; + } + } else { + for prop in broadcast.properties { + for (invoice_id, owner_id) in &broadcast.invoices { + invoice_ev_dao + .create( + invoice_id.clone(), + *owner_id, + InvoiceEventType::InvoicePaymentStatusEvent { + property: prop.clone(), + }, + ) + .await + .map_err(GenericError::new)?; + } + for (debit_note_id, owner_id) in &broadcast.debit_notes { + debit_note_ev_dao + .create( + debit_note_id.clone(), + *owner_id, + DebitNoteEventType::DebitNotePaymentStatusEvent { + property: prop.clone(), + }, + ) + .await + .map_err(GenericError::new)?; + } + } + } + } + + Ok(Ack {}) + } async fn send_payment( db: DbExecutor, From 9334300222bdde931cf386336f21ad6e2af07495 Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Tue, 7 Nov 2023 09:58:43 +0100 Subject: [PATCH 2/7] erc20next: remove rpc.goerli.mudit.blog endpoint --- core/payment-driver/erc20next/config-payments.toml | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/payment-driver/erc20next/config-payments.toml b/core/payment-driver/erc20next/config-payments.toml index 68602e4f6c..8f7e09a642 100644 --- a/core/payment-driver/erc20next/config-payments.toml +++ b/core/payment-driver/erc20next/config-payments.toml @@ -23,11 +23,12 @@ ignore-deadlines = false [chain.goerli] chain-name = "Goerli" chain-id = 5 -rpc-endpoints = ["https://ethereum-goerli-rpc.allthatnode.com", - "https://rpc.goerli.mudit.blog", +rpc-endpoints = [ + "https://ethereum-goerli-rpc.allthatnode.com", "https://rpc.slock.it/goerli", "https://www.ethercluster.com/goerli", - "https://rpc.ankr.com/eth_goerli"] + "https://rpc.ankr.com/eth_goerli", +] currency-symbol = "tETH" priority-fee = 1.01 max-fee-per-gas = 300.0 @@ -41,7 +42,9 @@ block-explorer-url = "https://goerli.etherscan.io" [chain.mumbai] chain-name = "Mumbai testnet" chain-id = 80001 -rpc-endpoints = ["https://rpc-mumbai.maticvigil.com/v1/fd04db1066cae0f44d3461ae6d6a7cbbdd46e4a5"] +rpc-endpoints = [ + "https://rpc-mumbai.maticvigil.com/v1/fd04db1066cae0f44d3461ae6d6a7cbbdd46e4a5", +] # rpc-endpoints = ["http://127.0.0.1:8545"] currency-symbol = "tMATIC" priority-fee = 1.01 @@ -66,4 +69,3 @@ token = { address = "0x0B220b82F3eA3B7F6d9A1D8ab58930C064A2b5Bf", symbol = "GLM" # multi-contract = { address = "0x50100d4faf5f3b09987dea36dc2eddd57a3e561b", max-at-once = 10 } confirmation-blocks = 1 block-explorer-url = "https://polygonscan.com" - From c9187650f63e3c4293f7ccd0cc933e2b2a2ad21f Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Tue, 7 Nov 2023 10:29:37 +0100 Subject: [PATCH 3/7] erc20next: update readme.md --- core/payment-driver/erc20next/Readme.md | 213 ++++++------------------ 1 file changed, 53 insertions(+), 160 deletions(-) diff --git a/core/payment-driver/erc20next/Readme.md b/core/payment-driver/erc20next/Readme.md index e44e2b8330..fd76dcf889 100644 --- a/core/payment-driver/erc20next/Readme.md +++ b/core/payment-driver/erc20next/Readme.md @@ -1,163 +1,56 @@ -## Current ERC20 transactions flow -Date: 2021-10-22 +# Erc20Next Payment driver +## Functionality +A payment driver is an abstraction over any operations relating to funds, which includes: +* Scheduling transfers to run at any point in the future. +* Verifying transfers done by other parties. +* Checking acount balance. +* Reporting status of scheduled transactions and the account. -Disclaimer: This is dev documentation not officially maintained, it is intended for internal development. - -Testing transfers: -This command will send 0.0001 GLMs from internal wallet to address 0x89Ef977db64A2597bA57E3eb4b717D3bAAeBaeC3 (use your own address for testing) -Note that service have to be running otherwise you get no connection error. - -``` -yagna.exe payment transfer --amount 0.0001 --driver erc20next --network mumbai --to-address 0x89Ef977db64A2597bA57E3eb4b717D3bAAeBaeC3 -``` - -You can specify extra options -* --gas-price (starting gas price in gwei) -* --max-gas-price (maximum allowed gas price in gwei) -* --gas-limit (limit of gas used in transaction). Better to leave default as it is not affecting cost of transaction. This is convenient for testing errors on blockchain. - -``` -yagna.exe payment transfer --amount 0.0001 --gas-price 1.1 --max-gas-price 60.4 --gas-limit 80000 --driver erc20next --network mumbai --to-address 0x89Ef977db64A2597bA57E3eb4b717D3bAAeBaeC3 -``` - -Networks currently supported: -* mainnnet (ETH mainnet, do not use) -* rinkeby (ETH testnet, good support) -* goerli (ETH testnet) -* mumbai (Polygon testnet) -* polygon (Polygon mainnet) +The Erc20Next driver is such an abstraction built on top of the [ERC20 standard](https://ethereum.org/en/developers/docs/standards/tokens/erc-20/). ## Implementation - -DB fields explained - -```sql - tx_id TEXT NOT NULL PRIMARY KEY, - sender TEXT NOT NULL, - nonce INTEGER NOT NULL DEFAULT -1, - status INTEGER NOT NULL, - tx_type INTEGER NOT NULL, - tmp_onchain_txs TEXT NULL, - final_tx TEXT NULL, - starting_gas_price DOUBLE NULL, - current_gas_price DOUBLE NULL, - max_gas_price DOUBLE NULL, - final_gas_price DOUBLE NULL, - final_gas_used INTEGER NULL, - gas_limit INTEGER NULL, - time_created DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - time_last_action DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - time_sent DATETIME NULL, - time_confirmed DATETIME NULL, - network INTEGER NOT NULL DEFAULT 4, - last_error_msg TEXT NULL, - resent_times INT DEFAULT 0, - signature TEXT NULL, - encoded TEXT NOT NULL, -``` - -* tx_id - unique UUID4 generated for trasnsaction -* sender - internal yagna address used for sending transaction -* nonce - Ethereum nonce assigned to transaction -* status - status of the transaction: - * CREATED(1) - the transaction is submitted to db and wait to be sent - * SENT(2) - the transaction is successfully sent to the blockchain network - * PENDING(3) - the transaction is found on blockchain but waiting for execution - * CONFIRMED(4) - the transaction is confirmed and succeeded - * ERRORSENT(10) - transaction failed to be sent on-chain (not consuming nonce, has to be repeated) - * ERRORONCHAIN(11) - transaction is confirmed but failed on-chain (consuming nonce, cannot be repeated until new transaction is assigned to payment) -* tx_type (transfer or faucet transaction) -* tmp_onchain_txs - hashes of all transactions that are sent to the chain (important for checking transaction status when gas is increased) -* final_tx - onchain transaction hash only when transaction is CONFIRMED or ERRORONCHAIN. tmp_onchain_txs are removed to reduce clutter. -* starting_gas_price - gas in Gwei -* current_gas_price - starts with null then it has assigned higher level of gas until transaction is processed -* max_gas_price - limit for current_gas_price -* final_gas_price - assigned after transaction is CONFIRMED or ERRORONCHAIN. -* final_gas_used - assigned after transaction is CONFIRMED or ERRORONCHAIN. use final_gas_used * final_gas_price to have transaction cost in Gwei -* gas_limit - assigned max gas for transaction. If set to low ends with error during transaction sent or error on chain depending on set value. -* time_created - UTC time when entry is created. -* time_last_action - UTC time of last change of the entry. -* time_sent - UTC time of last succesfull sent. -* time_confirmed - UTC time of transaction confirmation (can be also when error on-chain) -* network - id of the Network (for example: 80001 is Mumbai, 137 is Polygon) -* last_error_msg - last error during sending or error onchain, Nulled when trasnaction is successfull -* resent_times - not used right now, intended to limit transaction retries -* signature - transaction signature -* encoded - YagnaRawTransaction encoded in json - -## Assigning nonces - -To process transaction in ethereum network you have to assign nonce which has to be strictly one greater than previous nonce. -This makes process of sending transaction tricky and complicated, because when you send two transactions with the same nonce one of them will fail. Other case is when one transaction is blocked or waiting others with higher nonce will get stuck too. -Currently for every transaction nonce is assigned and not changed until transaction will consume nonce on chain. - -Huge issue: When transaction is malformed and it get stuck, resend will not help and all transactions are blocked. - -With the implementation of the driver we are trying to resolve cases automatically but the process is complicated and not perfect right now. - -Good news: Nonce prevents double spending so we don't have to worry about that (unless we change the nonce) - -Note: Error on chain will consume nonce and gas and new nonce has to be assigned to proceed with transaction, which is currently not handled. - -## Bumping gas prices - -Problem on Ethereum network is as follows: -If you sent transaction you have only vague idea when transaction will be performed. -There is function that is estimating current gas price but it is far from perfect. -Also we want to pay for gas as little as possible. -For example on Polygon Network if you sent transaction for 30.01Gwei you can be pretty sure it get proceeded fairly smoothly -right now, but when the network is congested transaction can get stuck for indefinite amount of time. -When network usage is lower you can try to make transaction for lower gas fee (for example 20Gwei). -To resolve this issue we implemented automatic gas bumping. Every set amount of time gas is increased to increase probability of transaction getting processed. -That way we can save money on gas and also have bigger chance of transactions not getting stuck. - -Currently we proposed following gas levels for polygon network: -``` -10.011, //LOW PRIORITY -15.011, //LOW PRIORITY -20.011, //LOW PRIORITY -25.011, //LOW PRIORITY -30.011, //minimum suggested gas price - FAST PRIORITY -33.011, //not used -36.011, //not used -40.011, //not used -50.011, //not used -60.011, //EXPRESS PRIORITY -80.011, -100.011 -``` -Note that we add 0.011 to increase the chance of getting inline of someone setting whole number as gas price (which people tend to do). It costs almost nothing but significally increase your chance of transaction beeing processed. - -Note that on test networks gas doesn't matter and transaction is processed instantly regardless of gas set. So to test this -feature you have to use Polygon network and pay some Matic for gas. - -## VARIABLES: - -POLYGON_PRIORITY: -possible values: -slow - normal, low priority, economic mode, -fast - fast transaction (for testing or normal mode) -express - express transaction (for testing) - -ERC20_WAIT_FOR_PENDING_ON_NETWORK: (duration) -after that time transaction is resent with higher gas - -## List of known errors: - -Error when sending when gas-limit set too low -``` -RPC error: Error { code: ServerError(-32000), message: "intrinsic gas too low", data: None } -``` -``` -RPC error: Error { code: ServerError(-32000), message: "already known", data: None } -``` -``` -RPC error: Error { code: ServerError(-32000), message: "nonce too low", data: None } -``` - - - - - - - +The core implementation is in [erc20_payment_lib](https://github.com/golemfactory/erc20_payment_lib), this crate only serves as an interface connecting +it to yagna. + +## Configuration +### Via environment variables +#### Global settings +* `ERC20NEXT_SENDOUT_INTERVAL_SECS` -- The maximum interval at which transactions are batched and processed. A longer duration may conserve gas at the expense +of delivering payments at a later date. +#### Per-chain settings +In environment variables below, substitute `{CHAIN}` for the actual chain you wish to configure and `{GLM}` for the GLM symbol used on the chain. +To avoid confusion, `TGLM` is used on test chains that can mint GLM and `GLM` on non-test chains. +See `config-payments.toml` for the list of supported chains and token symbols. +* `{CHAIN}_GETH_ADDR` -- List of comma-separated RPC endpoints to be used. +* `{CHAIN}_PRIORITY_FEE` -- [priority fee](https://ethereum.org/nl/developers/docs/gas/#priority-fee). +* `{CHAIN}_MAX_FEE_PER_GAS` -- [max fee per gas](https://ethereum.org/nl/developers/docs/gas/#maxfee). +* `{CHAIN}_{SYMBOL}_CONTRACT_ADDRESS` -- Address of the GLM contract. +* `{CHAIN}_MULTI_PAYMENT_CONTRACT_ADDRESS` -- Address of a custom Golem contract allowing for executing multiple transfers at once. +* `ERC20NEXT_{CHAIN}_REQUIRED_CONFIRMATIONS` -- The number of confirmation blocks required to consider a transaction complete. + +Be aware that options not prefixed with `ERC20NEXT` are also applicable to the old Erc20 driver. + +### Via TOML file +* The default configuration can be seen in `config-payments.toml`. +* It can be overriden by placing a `config-payments.toml` file in yagna data directory. This is not recommended and is not guaranteed to work across versions. + +## Statuses +The Erc20Next driver can report a selection of statuses which indicate possible issues. +* `InsufficientGas`: + * An account does not have sufficient gas to execute further transactions. + * Contains: `driver`, `network`, `address`, `neededGasEst`. +* `InsufficientToken`: + * An account does not have sufficient funds to execute further transactions. + * Contains: `driver`, `network`, `address`, `neededTokenEst`. +* `InvalidChainId`: + * A transaction has been scheduled on a chain that is not present in the configuration. This can only happen if `payments-config.toml` has been changed in an incorrect manner. + * Contains: `driver`, `chainId`. +* `CantSign`: + * The transaction cannot be signed. This means that yagna cannot access the identitiy used for this transfer, which can be caused by it being removed or locked. + * Contains: `driver`, `network`, `address`. +* `TxStuck`: + * A transaction cannot proceed despite being sent to the blockchain. The most likely reason is too low `max fee` setting. + * Contains: `driver`, `network`. +* `RpcError`: + * An RPC endpoint is unreliable. Consider using a better selection of endpoints. + * `driver`, `network` \ No newline at end of file From c773d29815b76c95bda86ee6bb22d52e78bdbdb3 Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Tue, 7 Nov 2023 10:53:08 +0100 Subject: [PATCH 4/7] erc20next: add driver address to InsufficientGas and InsufficientToken statuses --- Cargo.lock | 6 +++--- Cargo.toml | 6 +++--- core/payment-driver/erc20next/src/driver.rs | 8 ++++++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 248e520f15..31e801a818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1869,7 +1869,7 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erc20_payment_lib" version = "0.3.1" -source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=101e6bf331db3fd6b2b55813abc9ccccbc4960b7#101e6bf331db3fd6b2b55813abc9ccccbc4960b7" +source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=f3559980c8998f2865e593be73766ad2abcc11ca#f3559980c8998f2865e593be73766ad2abcc11ca" dependencies = [ "actix-files", "actix-web", @@ -7691,7 +7691,7 @@ dependencies = [ [[package]] name = "ya-client" version = "0.7.0" -source = "git+https://github.com/golemfactory/ya-client.git?rev=ca30ace41d68ee2df0c3ce32fce89c45669453b9#ca30ace41d68ee2df0c3ce32fce89c45669453b9" +source = "git+https://github.com/golemfactory/ya-client.git?rev=afdc1daf1f17a5128d911f8fff9d55c4f213e9af#afdc1daf1f17a5128d911f8fff9d55c4f213e9af" dependencies = [ "actix-codec", "awc", @@ -7715,7 +7715,7 @@ dependencies = [ [[package]] name = "ya-client-model" version = "0.5.0" -source = "git+https://github.com/golemfactory/ya-client.git?rev=ca30ace41d68ee2df0c3ce32fce89c45669453b9#ca30ace41d68ee2df0c3ce32fce89c45669453b9" +source = "git+https://github.com/golemfactory/ya-client.git?rev=afdc1daf1f17a5128d911f8fff9d55c4f213e9af#afdc1daf1f17a5128d911f8fff9d55c4f213e9af" dependencies = [ "bigdecimal 0.2.2", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 8680286580..7eba711be2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -224,7 +224,7 @@ members = [ # diesel 1.4.* supports up to 0.23.0, but sqlx 0.5.9 requires 0.22.0 # sqlx 0.5.10 need 0.23.2, so 0.5.9 is last version possible libsqlite3-sys = { version = "0.26.0", features = ["bundled"] } -erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "101e6bf331db3fd6b2b55813abc9ccccbc4960b7" } +erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "f3559980c8998f2865e593be73766ad2abcc11ca" } #erc20_payment_lib = { path = "../../payments/erc20_payment_lib/crates/erc20_payment_lib" } #erc20_payment_lib = { version = "=0.3.1" } rand = "0.8.5" @@ -270,8 +270,8 @@ ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = #ya-sb-util = { path = "../ya-service-bus/crates/util" } ## CLIENT -ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "ca30ace41d68ee2df0c3ce32fce89c45669453b9" } -ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "ca30ace41d68ee2df0c3ce32fce89c45669453b9" } +ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "afdc1daf1f17a5128d911f8fff9d55c4f213e9af" } +ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "afdc1daf1f17a5128d911f8fff9d55c4f213e9af" } ## RELAY and networking stack ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" } diff --git a/core/payment-driver/erc20next/src/driver.rs b/core/payment-driver/erc20next/src/driver.rs index 4ab70c9e87..a2c7649fc7 100644 --- a/core/payment-driver/erc20next/src/driver.rs +++ b/core/payment-driver/erc20next/src/driver.rs @@ -210,24 +210,28 @@ impl Erc20NextDriver { } LibStatusProperty::NoGas { chain_id, + address, missing_gas, } => { let network = chain_id_to_net(chain_id); network_filter(&network).then(|| DriverStatusProperty::InsufficientGas { driver: DRIVER_NAME.into(), + address, network, - needed_gas_est: missing_gas.unwrap_or_default().to_string(), + needed_gas_est: missing_gas.to_string(), }) } LibStatusProperty::NoToken { chain_id, + address, missing_token, } => { let network = chain_id_to_net(chain_id); network_filter(&network).then(|| DriverStatusProperty::InsufficientToken { driver: DRIVER_NAME.into(), + address, network, - needed_token_est: missing_token.unwrap_or_default().to_string(), + needed_token_est: missing_token.to_string(), }) } }) From faa399b82e72e017440b1e33329dff18ece8e948 Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Mon, 13 Nov 2023 10:48:22 +0100 Subject: [PATCH 5/7] payment: fix driver status events for was_already_ok case --- core/payment/src/service.rs | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/core/payment/src/service.rs b/core/payment/src/service.rs index 36c0d261c7..f34f000c87 100644 --- a/core/payment/src/service.rs +++ b/core/payment/src/service.rs @@ -879,9 +879,6 @@ mod public { let platform = platform_str_to_platform(&debit_note.payment_platform)?; // checks if the last payment-status event was PAYMENT_OK or no such event was emitted - // - // If debit note has reported driver errors before, we *must* send a broadcast on status change. - // This will either be a new problem, or PaymentOkEvent if no errors are found. let was_already_ok = debit_note_ev_dao .get_for_debit_note_id( debit_note.debit_note_id.clone(), @@ -903,8 +900,16 @@ mod public { .unwrap_or(true); if !was_already_ok { - if let Some(affected) = broadcast.get_mut(&platform) { - affected + // If debit note has reported driver errors before, we *must* send a broadcast on status change. + // This will either be a new problem, or PaymentOkEvent if no errors are found. + broadcast + .entry(platform) + .or_default() + .debit_notes + .push((debit_note.debit_note_id, debit_note.issuer_id)); + } else { + if let Some(broadcast) = broadcast.get_mut(&platform) { + broadcast .debit_notes .push((debit_note.debit_note_id, debit_note.issuer_id)); } @@ -921,9 +926,6 @@ mod public { let platform = platform_str_to_platform(&invoice.payment_platform)?; // checks if the last payment-status event was PAYMENT_OK or no such event was emitted - // - // If debit note has reported driver errors before, we *must* send a broadcast on status change. - // This will either be a new problem, or PaymentOkEvent if no errors are found. let was_already_ok = invoice_ev_dao .get_for_invoice_id( invoice.invoice_id.clone(), @@ -940,10 +942,21 @@ mod public { matches!(&ev_type.event_type, InvoiceEventType::InvoicePaymentOkEvent) }) .unwrap_or(true); - if let Some(affected) = broadcast.get_mut(&platform) { - affected + + if !was_already_ok { + // If invoice has reported driver errors before, we *must* send a broadcast on status change. + // This will either be a new problem, or PaymentOkEvent if no errors are found. + broadcast + .entry(platform) + .or_default() .invoices .push((invoice.invoice_id, invoice.issuer_id)); + } else { + if let Some(broadcast) = broadcast.get_mut(&platform) { + broadcast + .invoices + .push((invoice.invoice_id, invoice.issuer_id)); + } } } From 2a83376a4b987cc45eaa10b98f92584d16bdeed2 Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Mon, 13 Nov 2023 11:50:30 +0100 Subject: [PATCH 6/7] payment: Add indexes on status columnt for pay_{invoice,debit_note} --- .../migrations/2023-11-13-123456_index_on_doc_status/drop.sql | 3 +++ .../migrations/2023-11-13-123456_index_on_doc_status/up.sql | 3 +++ 2 files changed, 6 insertions(+) create mode 100644 core/payment/migrations/2023-11-13-123456_index_on_doc_status/drop.sql create mode 100644 core/payment/migrations/2023-11-13-123456_index_on_doc_status/up.sql diff --git a/core/payment/migrations/2023-11-13-123456_index_on_doc_status/drop.sql b/core/payment/migrations/2023-11-13-123456_index_on_doc_status/drop.sql new file mode 100644 index 0000000000..89726cac49 --- /dev/null +++ b/core/payment/migrations/2023-11-13-123456_index_on_doc_status/drop.sql @@ -0,0 +1,3 @@ +drop index pay_invoice_status; +drop index pay_debit_note_status; +drop index pay_debit_note_due_date; diff --git a/core/payment/migrations/2023-11-13-123456_index_on_doc_status/up.sql b/core/payment/migrations/2023-11-13-123456_index_on_doc_status/up.sql new file mode 100644 index 0000000000..1ebbce112d --- /dev/null +++ b/core/payment/migrations/2023-11-13-123456_index_on_doc_status/up.sql @@ -0,0 +1,3 @@ +create index if not exists pay_invoice_status on pay_invoice ("status"); +create index if not exists pay_debit_note_status on pay_debit_note ("status"); +create index if not exists pay_debit_note_due_date on pay_debit_note (payment_due_date); From d4321e886c83effdfdbeb44abb89c16bf44831ce Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Mon, 13 Nov 2023 12:18:28 +0100 Subject: [PATCH 7/7] payment: Only generate payment events for payable debit notes --- core/payment/src/dao/debit_note.rs | 9 +++++++++ core/payment/src/service.rs | 6 +++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/core/payment/src/dao/debit_note.rs b/core/payment/src/dao/debit_note.rs index 75782c86fc..1bcaa4e676 100644 --- a/core/payment/src/dao/debit_note.rs +++ b/core/payment/src/dao/debit_note.rs @@ -203,6 +203,7 @@ impl<'c> DebitNoteDao<'c> { &self, role: Option, status: Option, + payable: Option, ) -> DbResult> { readonly_transaction(self.pool, move |conn| { let mut query = query!().into_boxed(); @@ -212,6 +213,14 @@ impl<'c> DebitNoteDao<'c> { if let Some(status) = status { query = query.filter(dsl::status.eq(status.to_string())); } + if let Some(payable) = payable { + // Payable debit notes have not-null payment_due_date. + if payable { + query = query.filter(dsl::payment_due_date.is_not_null()); + } else { + query = query.filter(dsl::payment_due_date.is_null()); + } + } let debit_notes: Vec = query.order_by(dsl::timestamp.desc()).load(conn)?; debit_notes.into_iter().map(TryInto::try_into).collect() diff --git a/core/payment/src/service.rs b/core/payment/src/service.rs index f34f000c87..7b16ebb7b4 100644 --- a/core/payment/src/service.rs +++ b/core/payment/src/service.rs @@ -870,7 +870,11 @@ mod public { let invoice_ev_dao: InvoiceEventDao = db.as_dao(); let accepted_notes = debit_note_dao - .list(Some(Role::Requestor), Some(DocumentStatus::Accepted)) + .list( + Some(Role::Requestor), + Some(DocumentStatus::Accepted), + Some(true), + ) .await .map_err(GenericError::new)?;