Skip to content

Commit

Permalink
refix prices db
Browse files Browse the repository at this point in the history
  • Loading branch information
canonbrother committed Nov 28, 2024
1 parent 997a654 commit ccae506
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 26 deletions.
27 changes: 26 additions & 1 deletion lib/ain-ocean/src/api/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use super::query::PaginationQuery;
use crate::{
error::{
Error::ToArrayError, InvalidAmountSnafu, InvalidFixedIntervalPriceSnafu,
InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu,
InvalidPriceTickerSortKeySnafu, InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu,
},
hex_encoder::as_sha256,
model::PriceTickerId,
network::Network,
Result,
};
Expand Down Expand Up @@ -128,6 +129,30 @@ pub fn parse_query_height_txid(item: &str) -> Result<(u32, Txid)> {
Ok((height, txid))
}

pub fn parse_price_ticker_sort(item: &str) -> Result<PriceTickerId> {
let mut parts = item.split('-');
let count_height_token = parts.next().context(InvalidPriceTickerSortKeySnafu { item })?;
let encoded_count = &count_height_token[..8];
let encoded_height = &count_height_token[8..16];
let token = &count_height_token[16..];
let token = token.to_string();

let count: [u8; 4] = hex::decode(encoded_count)?
.try_into()
.map_err(|_| ToArrayError)?;

let height: [u8; 4] = hex::decode(encoded_height)?
.try_into()
.map_err(|_| ToArrayError)?;

let currency = parts
.next()
.context(InvalidTokenCurrencySnafu { item })?
.to_string();

Ok((count, height, token, currency))
}

#[must_use]
pub fn format_number(v: Decimal) -> String {
if v == dec!(0) {
Expand Down
32 changes: 15 additions & 17 deletions lib/ain-ocean/src/api/prices.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, str::FromStr, sync::Arc};
use std::{str::FromStr, sync::Arc};

use ain_dftx::{Currency, Token, Weightage, COIN};
use ain_macros::ocean_endpoint;
Expand All @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;

use super::{
common::parse_token_currency,
common::{parse_token_currency, parse_price_ticker_sort},
oracle::OraclePriceFeedResponse,
query::PaginationQuery,
response::{ApiPagedResponse, Response},
Expand Down Expand Up @@ -119,28 +119,26 @@ async fn list_prices(
Query(query): Query<PaginationQuery>,
Extension(ctx): Extension<Arc<AppContext>>,
) -> Result<ApiPagedResponse<PriceTickerResponse>> {
let mut set: HashSet<(Token, Currency)> = HashSet::new();
let next = query
.next
.map(|item| {
let id = parse_price_ticker_sort(&item)?;
Ok::<([u8; 4], [u8; 4], Token, Currency), Error>(id)
})
.transpose()?;

let prices = ctx
.services
.price_ticker
.by_id
.list(None, SortOrder::Descending)?
.flat_map(|item| {
.list(next.clone(), SortOrder::Descending)?
.take(query.size + usize::from(next.clone().is_some()))
.skip(usize::from(next.is_some()))
.map(|item| {
let ((_, _, token, currency), v) = item?;
let has_key = set.contains(&(token.clone(), currency.clone()));
if !has_key {
set.insert((token.clone(), currency.clone()));
Ok::<Option<PriceTickerResponse>, Error>(Some(PriceTickerResponse::from((
(token, currency),
v,
))))
} else {
Ok(None)
}
Ok(PriceTickerResponse::from(((token, currency), v)))
})
.flatten()
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;

Ok(ApiPagedResponse::of(prices, query.size, |price| {
price.sort.to_string()
Expand Down
6 changes: 6 additions & 0 deletions lib/ain-ocean/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid price ticker sort key: {}", item))]
InvalidPriceTickerSortKey {
item: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid amount format: {}", item))]
InvalidAmount {
item: String,
Expand Down
24 changes: 16 additions & 8 deletions lib/ain-ocean/src/indexer/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,8 @@ fn index_set_oracle_data(
context: &Context,
pairs: &HashSet<(Token, Currency)>,
) -> Result<()> {
let oracle_repo = &services.oracle_price_aggregated;

for pair in pairs {
let price_aggregated = map_price_aggregated(services, context, pair)?;

let Some(price_aggregated) = price_aggregated else {
continue;
};
Expand All @@ -377,15 +374,26 @@ fn index_set_oracle_data(
price_aggregated.block.median_time.to_be_bytes(),
price_aggregated.block.height.to_be_bytes(),
);
oracle_repo.by_id.put(&id, &price_aggregated)?;

services.oracle_price_aggregated.by_id.put(&id, &price_aggregated)?;
let price_repo = &services.price_ticker;
let id = (
price_aggregated.aggregated.oracles.total.to_be_bytes(),
price_aggregated.block.height.to_be_bytes(),
token,
currency,
token.clone(),
currency.clone(),
);
services.price_ticker.by_id.put(
let prev_price = price_repo
.by_id
.list(Some(id.clone()), SortOrder::Descending)?
.find(|item| match item {
Ok(((_, _, t, c), _)) => t == &token && c == &currency,
_ => true
})
.transpose()?;
if let Some((k, _)) = prev_price {
price_repo.by_id.delete(&k)?
}
price_repo.by_id.put(
&id,
&PriceTicker {
price: price_aggregated,
Expand Down

0 comments on commit ccae506

Please sign in to comment.