Skip to content

Commit

Permalink
changes page_size for orb subscriptions to 75 (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahadarsh authored Oct 29, 2024
1 parent b3e6e01 commit 7369e6c
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "orb_fdw"
version = "0.13.0"
version = "0.13.1"
edition = "2021"
publish = false

Expand Down
2 changes: 1 addition & 1 deletion Trunk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for withorb.com Backend API"
homepage = "https://github.com/tembo-io/orb_fdw"
documentation = "https://github.com/tembo-io/orb_fdw"
categories = ["connectors"]
version = "0.13.0"
version = "0.13.1"

[build]
postgres_version = "15"
Expand Down
44 changes: 36 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use pgrx::{pg_sys, prelude::*, warning, JsonB};
use serde_json::Value as JsonValue;
use std::time::Duration;
use std::{collections::HashMap, env, str::FromStr};
use supabase_wrappers::prelude::*;
use tokio::runtime::Runtime;
Expand All @@ -9,8 +10,7 @@ use crate::orb_fdw::{OrbFdwError, OrbFdwResult};
use futures::StreamExt;
use orb_billing::{
Client as OrbClient, ClientConfig as OrbClientConfig, Customer as OrbCustomer,
Invoice as OrbInvoice, InvoiceListParams, ListParams, Subscription as OrbSubscription,
SubscriptionListParams,
Invoice as OrbInvoice, InvoiceListParams, ListParams, SubscriptionListParams,
};

fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> OrbFdwResult<Vec<Row>> {
Expand Down Expand Up @@ -233,13 +233,41 @@ impl ForeignDataWrapper<OrbFdwError> for OrbFdw {
}
}
"subscriptions" => {
let subscriptions_stream = self
.client
.list_subscriptions(&SubscriptionListParams::DEFAULT.page_size(500));
let subscriptions = subscriptions_stream.collect::<Vec<_>>().await;
let processed_subscriptions: Vec<OrbSubscription> = process_data(subscriptions);
pub const MAX_ATTEMPTS: u8 = 3;
let mut attempt = 1;
let initial_page_size = 75;

let subscriptions = 'outer: loop {
// Reduce page size at every attempt, should reduce odds of request timeouts
let page_size = initial_page_size / (attempt as u64);
let params = SubscriptionListParams::DEFAULT.page_size(page_size);

let subscriptions_stream = self.client.list_subscriptions(&params);
let mut subscriptions_stream = Box::pin(subscriptions_stream);
let mut subscriptions = Vec::new();

while let Some(subscription_result) = subscriptions_stream.next().await {
match subscription_result {
Ok(subscription) => {
subscriptions.push(subscription);
}
Err(err) if attempt >= MAX_ATTEMPTS => {
warning!("Error getting Orb subscription after {attempt} attempts: {err}");
}
Err(err) => {
attempt += 1;
let backoff_duration = Duration::from_secs(2_u64.pow((attempt - 1) as u32));
warning!("Attempt no. {attempt} of fetching Orb subscriptions failed with {err}, trying again in {:.2} secs.", backoff_duration.as_secs_f64());

tokio::time::sleep(backoff_duration).await;
continue 'outer;
}
}
}
break subscriptions;
};

match serde_json::to_value(processed_subscriptions) {
match serde_json::to_value(subscriptions) {
Ok(value) => value,
Err(e) => error!("{}", OrbFdwError::JsonSerializationError(e)),
}
Expand Down
3 changes: 3 additions & 0 deletions src/orb_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub enum OrbFdwError {

#[error("Missing required option: '{0}'")]
MissingRequiredOption(String),

#[error("Failed to fetch from Orb: '{0}'")]
NetworkError(String),
}

impl From<OrbFdwError> for ErrorReport {
Expand Down

0 comments on commit 7369e6c

Please sign in to comment.