Skip to content

Commit

Permalink
Added post migration to sum paid activities into agreements
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Sep 16, 2024
1 parent 822727f commit 858a5f5
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 16 deletions.
27 changes: 14 additions & 13 deletions core/payment/migrations/2024-08-12-171012_extended_order/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ CREATE TABLE pay_batch_order_item(

CONSTRAINT pay_batch_order_item_pk PRIMARY KEY (owner_id, order_id, payee_addr),
CONSTRAINT pay_batch_order_item_fk1 FOREIGN KEY (owner_id, order_id) REFERENCES pay_batch_order(owner_id, id),
CONSTRAINT pay_batch_order_item_fk2 FOREIGN KEY (owner_id, allocation_id) REFERENCES pay_allocation(owner_id, id),
CONSTRAINT pay_batch_order_item_fk2 FOREIGN KEY (owner_id, allocation_id) REFERENCES pay_allocation(owner_id, id)
);

-- We are selecting by payment_id when notifying the pay_batch_order_item
Expand Down Expand Up @@ -173,18 +173,19 @@ SELECT owner_id,
amount
FROM pay_agreement_payment;

-- update total_paid in pay_agreement:
UPDATE pay_agreement
SET total_amount_paid = cast(total_amount_paid + (SELECT sum(total_amount_paid)
FROM pay_activity s
WHERE s.owner_id = pay_agreement.owner_id
AND s.role = pay_agreement.role
AND s.agreement_id = pay_agreement.id) AS VARCHAR)
WHERE EXISTS (SELECT 1 FROM pay_activity s2 WHERE s2.owner_id = pay_agreement.owner_id
AND s2.role = pay_agreement.role
AND s2.agreement_id = pay_agreement.id);


DROP TABLE pay_activity_payment;
DROP TABLE pay_agreement_payment;
DROP TABLE pay_order;


CREATE TABLE pay_post_migration(
job VARCHAR(50) NOT NULL,
done DATETIME,
result TEXT,

CONSTRAINT pay_post_migration_pk PRIMARY KEY (job)
);

-- update total_paid in pay_agreement after migrations
INSERT INTO pay_post_migration (job) VALUES ('sum_activities_into_agreement');
INSERT INTO pay_post_migration (job) VALUES ('dummy_job');
6 changes: 4 additions & 2 deletions core/payment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ impl Service for PaymentService {
impl PaymentService {
pub async fn gsb<Context: Provider<Self, DbExecutor>>(context: &Context) -> anyhow::Result<()> {
let db = context.component();
db.apply_migration(migrations::run_with_output)?;
db.apply_migration(migrations::run_with_output)
.map_err(|e| anyhow::anyhow!("Failed to apply payment service migrations: {}", e))?;

let config = Arc::new(Config::from_env()?);

let processor = Arc::new(PaymentProcessor::new(db.clone()));
self::service::bind_service(&db, processor.clone(), config);
self::service::bind_service(&db, processor.clone(), BindOptions::default(), config);
processor.process_post_migration_jobs().await?;

tokio::task::spawn(async move {
processor.release_allocations(false).await;
Expand Down
149 changes: 148 additions & 1 deletion core/payment/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use crate::error::processor::{
AccountNotRegistered, GetStatusError, NotifyPaymentError, ValidateAllocationError,
VerifyPaymentError,
};
use crate::error::DbResult;
use crate::models::cycle::DbPayBatchCycle;
use crate::payment_sync::SYNC_NOTIFS_NOTIFY;
use crate::timeout_lock::{MutexTimeoutExt, RwLockTimeoutExt};

use actix_web::web::Data;
use bigdecimal::{BigDecimal, Zero};
use chrono::{DateTime, Utc};
use diesel::sql_types::Text;
use diesel::RunQueryDsl;
use futures::{FutureExt, TryFutureExt};
use metrics::counter;
use std::collections::hash_map::Entry;
Expand Down Expand Up @@ -46,7 +49,7 @@ use ya_core_model::payment::local::{
use ya_core_model::payment::public::{SendPayment, SendSignedPayment, BUS_ID};
use ya_core_model::{identity, NodeId};
use ya_net::RemoteEndpoint;
use ya_persistence::executor::DbExecutor;
use ya_persistence::executor::{do_with_transaction, DbExecutor};
use ya_persistence::types::Role;
use ya_service_bus::typed::{service, Endpoint};
use ya_service_bus::{typed as bus, RpcEndpoint, RpcMessage};
Expand Down Expand Up @@ -446,6 +449,150 @@ impl PaymentProcessor {
.get_platform(driver, network, token)
}

pub async fn process_post_migration_jobs(&self) -> DbResult<()> {
let db_executor = self
.db_executor
.timeout_lock(DB_LOCK_TIMEOUT)
.await
.expect("db lock timeout");

/*
-- we have to run this query but by hand because of lack of decimal support:
UPDATE pay_agreement
SET total_amount_paid = cast(total_amount_paid + (SELECT sum(total_amount_paid)
FROM pay_activity s
WHERE s.owner_id = pay_agreement.owner_id
AND s.role = pay_agreement.role
AND s.agreement_id = pay_agreement.id) AS VARCHAR)
WHERE EXISTS (SELECT 1 FROM pay_activity s2 WHERE s2.owner_id = pay_agreement.owner_id
AND s2.role = pay_agreement.role
AND s2.agreement_id = pay_agreement.id);
*/
#[derive(QueryableByName, PartialEq, Debug)]
struct JobRecord {
#[sql_type = "Text"]
job: String,
}

#[derive(QueryableByName, PartialEq, Debug)]
struct AgreementActivityRecord {
#[sql_type = "Text"]
agreement_id: String,
#[sql_type = "Text"]
owner_id: String,
#[sql_type = "Text"]
role: String,
#[sql_type = "Text"]
total_amount_paid_agreement: String,
#[sql_type = "Text"]
total_amount_paid_activity: String,
}

do_with_transaction(&db_executor.pool, "run_post_migration", move |conn| {
const JOB_NAME: &str = "sum_activities_into_agreement";
let job_records = diesel::sql_query(
r#"
SELECT job FROM pay_post_migration WHERE done IS NULL AND job = ?
"#,
)
.bind::<Text, _>(JOB_NAME)
.load::<JobRecord>(conn)?;
let job_record = job_records.first();

if let Some(job_record) = job_record {
log::info!("Running post migration job: sum_activities_into_agreement");

let records: Vec<AgreementActivityRecord> = diesel::sql_query(
r#"
SELECT pag.id AS agreement_id,
pag.owner_id AS owner_id,
pag.role AS role,
pag.total_amount_paid AS total_amount_paid_agreement,
pac.total_amount_paid AS total_amount_paid_activity
FROM pay_agreement AS pag
JOIN pay_activity AS pac
ON pac.agreement_id = pag.id
AND pac.owner_id = pag.owner_id
AND pac.role = pag.role
ORDER BY agreement_id
"#,
)
.load(conn)?;

let mut current_idx: usize = 0;
if let Some(first_record) = records.get(current_idx) {
let mut current_sum: BigDecimal = Zero::zero();
let mut current_agreement_id = first_record.agreement_id.clone();

while current_idx < records.len() {
let record = &records
.get(current_idx)
.expect("record has to be found on index");

current_sum +=
BigDecimal::from_str(&records[current_idx].total_amount_paid_activity)
.unwrap_or_default();

let write_total_sum = records
.get(current_idx + 1)
.map(|rec| rec.agreement_id != current_agreement_id.as_str())
.unwrap_or(true);
if write_total_sum {
current_sum +=
BigDecimal::from_str(&record.total_amount_paid_agreement)
.unwrap_or_default();

diesel::sql_query(
r#"
UPDATE pay_agreement
SET total_amount_paid = $1
WHERE id = $2
AND owner_id = $3
AND role = $4
"#,
)
.bind::<Text, _>(current_sum.to_string())
.bind::<Text, _>(current_agreement_id)
.bind::<Text, _>(&record.owner_id)
.bind::<Text, _>(&record.role)
.execute(conn)?;
current_sum = Zero::zero();
current_agreement_id = records
.get(current_idx + 1)
.map(|rec| rec.agreement_id.clone())
.unwrap_or_default();
}
current_idx += 1;
}
}

log::info!(
"Post migration job: sum_activities_into_agreement done. Marking as done."
);
let marked = diesel::sql_query(
r#"
UPDATE pay_post_migration
SET done = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'),
result = 'ok'
WHERE job = ?
"#,
)
.bind::<Text, _>(JOB_NAME)
.execute(conn)?;
if marked != 1 {
log::error!(
"Post migration job: sum_activities_into_agreement not marked as done"
);
}
} else {
log::info!("No post migration jobs to run");
}
Ok(())
})
.await
}

async fn send_batch_order_payments(
&self,
owner: NodeId,
Expand Down

0 comments on commit 858a5f5

Please sign in to comment.