Skip to content

Commit

Permalink
payment: add driver status events to debit notes & invoices
Browse files Browse the repository at this point in the history
- Fix details ser/de for events.
- General refactor.
  • Loading branch information
kamirr committed Nov 3, 2023
1 parent 5f969fa commit 6bc0413
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 69 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev =
#ya-sb-util = { path = "../ya-service-bus/crates/util" }

## CLIENT
ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "2bb679e3bb1d61eddd713d7f19ee127595f27162" }
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "2bb679e3bb1d61eddd713d7f19ee127595f27162" }
ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "543c37d9ae360acae954cde88379583f689cdfa4" }
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "543c37d9ae360acae954cde88379583f689cdfa4" }

## RELAY and networking stack
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" }
Expand Down
13 changes: 12 additions & 1 deletion core/model/src/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub enum RpcMessageError {
}

pub mod local {
use super::*;
use super::{public::Ack, *};
use crate::driver::{AccountMode, GasDetails, PaymentConfirmation};
use bigdecimal::{BigDecimal, Zero};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -424,6 +424,17 @@ pub mod local {
Internal(String),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PaymentDriverStatusChange {
pub properties: Vec<DriverStatusProperty>,
}

impl RpcMessage for PaymentDriverStatusChange {
const ID: &'static str = "PaymentDriverStatusChange";
type Item = Ack;
type Error = GenericError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PaymentDriverStatus {
pub driver: Option<String>,
Expand Down
3 changes: 1 addition & 2 deletions core/payment/src/dao/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,10 @@ pub fn increase_amount_paid(
debit_note::update_status(&debit_note_ids, owner_id, &DocumentStatus::Settled, conn)?;

for debit_note_id in debit_note_ids {
debit_note_event::create::<()>(
debit_note_event::create(
debit_note_id,
*owner_id,
DebitNoteEventType::DebitNoteSettledEvent,
None,
conn,
)?;
}
Expand Down
3 changes: 1 addition & 2 deletions core/payment/src/dao/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,10 @@ pub fn increase_amount_paid(

if let Some((invoice_id, role)) = invoice_query {
invoice::update_status(&invoice_id, owner_id, &DocumentStatus::Settled, conn)?;
invoice_event::create::<()>(
invoice_event::create(
invoice_id,
*owner_id,
InvoiceEventType::InvoiceSettledEvent,
None,
conn,
)?;
}
Expand Down
24 changes: 17 additions & 7 deletions core/payment/src/dao/debit_note.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,10 @@ impl<'c> DebitNoteDao<'c> {
diesel::insert_into(dsl::pay_debit_note)
.values(debit_note)
.execute(conn)?;
debit_note_event::create::<()>(
debit_note_event::create(
debit_note_id.clone(),
owner_id,
DebitNoteEventType::DebitNoteReceivedEvent,
None,
conn,
)?;
Ok(debit_note_id)
Expand Down Expand Up @@ -170,11 +169,10 @@ impl<'c> DebitNoteDao<'c> {
diesel::insert_into(dsl::pay_debit_note)
.values(debit_note)
.execute(conn)?;
debit_note_event::create::<()>(
debit_note_event::create(
debit_note_id,
owner_id,
DebitNoteEventType::DebitNoteReceivedEvent,
None,
conn,
)?;
Ok(())
Expand All @@ -201,9 +199,21 @@ impl<'c> DebitNoteDao<'c> {
.await
}

pub async fn get_all(&self) -> DbResult<Vec<DebitNote>> {
pub async fn list(
&self,
role: Option<Role>,
status: Option<DocumentStatus>,
) -> DbResult<Vec<DebitNote>> {
readonly_transaction(self.pool, move |conn| {
let debit_notes: Vec<ReadObj> = query!().order_by(dsl::timestamp.desc()).load(conn)?;
let mut query = query!().into_boxed();
if let Some(role) = role {
query = query.filter(dsl::role.eq(role.to_string()));
}
if let Some(status) = status {
query = query.filter(dsl::status.eq(status.to_string()));
}

let debit_notes: Vec<ReadObj> = query.order_by(dsl::timestamp.desc()).load(conn)?;
debit_notes.into_iter().map(TryInto::try_into).collect()
})
.await
Expand Down Expand Up @@ -272,7 +282,7 @@ impl<'c> DebitNoteDao<'c> {
update_status(&vec![debit_note_id.clone()], &owner_id, &status, conn)?;
activity::set_amount_accepted(&activity_id, &owner_id, &amount, conn)?;
for event in events {
debit_note_event::create::<()>(debit_note_id.clone(), owner_id, event, None, conn)?;
debit_note_event::create(debit_note_id.clone(), owner_id, event, conn)?;
}

Ok(())
Expand Down
50 changes: 43 additions & 7 deletions core/payment/src/dao/debit_note_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::schema::pay_debit_note_event::dsl as write_dsl;
use crate::schema::pay_debit_note_event_read::dsl as read_dsl;
use chrono::NaiveDateTime;
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use serde::Serialize;
use std::borrow::Cow;
use std::collections::HashSet;
use std::convert::TryInto;
Expand All @@ -15,14 +14,13 @@ use ya_persistence::executor::{
};
use ya_persistence::types::{AdaptTimestamp, Role};

pub fn create<T: Serialize>(
pub fn create(
debit_note_id: String,
owner_id: NodeId,
event_type: DebitNoteEventType,
details: Option<T>,
conn: &ConnType,
) -> DbResult<()> {
let event = WriteObj::new(debit_note_id, owner_id, event_type, details)?;
let event = WriteObj::new(debit_note_id, owner_id, event_type)?;
diesel::insert_into(write_dsl::pay_debit_note_event)
.values(event)
.execute(conn)?;
Expand All @@ -40,15 +38,53 @@ impl<'c> AsDao<'c> for DebitNoteEventDao<'c> {
}

impl<'c> DebitNoteEventDao<'c> {
pub async fn create<T: Serialize + Send + 'static>(
pub async fn create(
&self,
debit_note_id: String,
owner_id: NodeId,
event_type: DebitNoteEventType,
details: Option<T>,
) -> DbResult<()> {
do_with_transaction(self.pool, move |conn| {
create(debit_note_id, owner_id, event_type, details, conn)
create(debit_note_id, owner_id, event_type, conn)
})
.await
}

pub async fn get_for_debit_note_id(
&self,
debit_note_id: String,
after_timestamp: Option<NaiveDateTime>,
max_events: Option<u32>,
app_session_id: Option<String>,
requestor_events: Vec<Cow<'static, str>>,
provider_events: Vec<Cow<'static, str>>,
) -> DbResult<Vec<DebitNoteEvent>> {
readonly_transaction(self.pool, move |conn| {
let mut query = read_dsl::pay_debit_note_event_read
.filter(read_dsl::debit_note_id.eq(debit_note_id))
.order_by(read_dsl::timestamp.asc())
.into_boxed();
if let Some(timestamp) = after_timestamp {
query = query.filter(read_dsl::timestamp.gt(timestamp.adapt()));
}
if let Some(app_session_id) = app_session_id {
query = query.filter(read_dsl::app_session_id.eq(app_session_id));
}
if let Some(limit) = max_events {
query = query.limit(limit.into());
}
let events: Vec<ReadObj> = query.load(conn)?;
let requestor_events: HashSet<Cow<'static, str>> =
requestor_events.into_iter().collect();
let provider_events: HashSet<Cow<'static, str>> = provider_events.into_iter().collect();
events
.into_iter()
.filter(|e| match e.role {
Role::Requestor => requestor_events.contains(e.event_type.as_str()),
Role::Provider => provider_events.contains(e.event_type.as_str()),
})
.map(TryInto::try_into)
.collect()
})
.await
}
Expand Down
39 changes: 34 additions & 5 deletions core/payment/src/dao/invoice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,10 @@ impl<'c> InvoiceDao<'c> {
.map(|_| ())
})?;

invoice_event::create::<()>(
invoice_event::create(
invoice_id,
owner_id,
InvoiceEventType::InvoiceReceivedEvent,
None,
conn,
)?;

Expand All @@ -139,6 +138,37 @@ impl<'c> InvoiceDao<'c> {
self.insert(invoice, activity_ids).await
}

pub async fn list(
&self,
role: Option<Role>,
status: Option<DocumentStatus>,
) -> DbResult<Vec<Invoice>> {
readonly_transaction(self.pool, move |conn| {
let mut query = query!().into_boxed();
if let Some(role) = role {
query = query.filter(dsl::role.eq(role.to_string()));
}
if let Some(status) = status {
query = query.filter(dsl::status.eq(status.to_string()));
}

let read_objs: Vec<ReadObj> = query.order_by(dsl::timestamp.desc()).load(conn)?;
let mut invoices = Vec::<Invoice>::new();

for read_obj in read_objs {
let activity_ids = activity_dsl::pay_invoice_x_activity
.select(activity_dsl::activity_id)
.filter(activity_dsl::invoice_id.eq(&read_obj.id))
.filter(activity_dsl::owner_id.eq(read_obj.owner_id))
.load(conn)?;
invoices.push(read_obj.into_api_model(activity_ids)?);
}

Ok(invoices)
})
.await
}

pub async fn get(&self, invoice_id: String, owner_id: NodeId) -> DbResult<Option<Invoice>> {
readonly_transaction(self.pool, move |conn| {
let invoice: Option<ReadObj> = query!()
Expand Down Expand Up @@ -273,7 +303,7 @@ impl<'c> InvoiceDao<'c> {
agreement::set_amount_accepted(&agreement_id, &owner_id, &amount, conn)?;

for event in events {
invoice_event::create::<()>(invoice_id.clone(), owner_id, event, None, conn)?;
invoice_event::create(invoice_id.clone(), owner_id, event, conn)?;
}

Ok(())
Expand Down Expand Up @@ -376,11 +406,10 @@ impl<'c> InvoiceDao<'c> {
agreement::compute_amount_due(&agreement_id, &owner_id, conn)?;

update_status(&invoice_id, &owner_id, &DocumentStatus::Cancelled, conn)?;
invoice_event::create::<()>(
invoice_event::create(
invoice_id,
owner_id,
InvoiceEventType::InvoiceCancelledEvent,
None,
conn,
)?;

Expand Down
11 changes: 4 additions & 7 deletions core/payment/src/dao/invoice_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::schema::pay_invoice_event::dsl as write_dsl;
use crate::schema::pay_invoice_event_read::dsl as read_dsl;
use chrono::NaiveDateTime;
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use serde::Serialize;
use std::borrow::Cow;
use std::collections::HashSet;
use std::convert::TryInto;
Expand All @@ -15,14 +14,13 @@ use ya_persistence::executor::{
};
use ya_persistence::types::{AdaptTimestamp, Role};

pub fn create<T: Serialize>(
pub fn create(
invoice_id: String,
owner_id: NodeId,
event_type: InvoiceEventType,
details: Option<T>,
conn: &ConnType,
) -> DbResult<()> {
let event = WriteObj::new(invoice_id, owner_id, event_type, details)?;
let event = WriteObj::new(invoice_id, owner_id, event_type)?;
diesel::insert_into(write_dsl::pay_invoice_event)
.values(event)
.execute(conn)?;
Expand All @@ -40,15 +38,14 @@ impl<'c> AsDao<'c> for InvoiceEventDao<'c> {
}

impl<'c> InvoiceEventDao<'c> {
pub async fn create<T: Serialize + Send + 'static>(
pub async fn create(
&self,
invoice_id: String,
owner_id: NodeId,
event_type: InvoiceEventType,
details: Option<T>,
) -> DbResult<()> {
do_with_transaction(self.pool, move |conn| {
create(invoice_id, owner_id, event_type, details, conn)
create(invoice_id, owner_id, event_type, conn)
})
.await
}
Expand Down
Loading

0 comments on commit 6bc0413

Please sign in to comment.