Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Ogmios-Kupo Backend Queries and VyFi DEX Integration #102

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 57 additions & 18 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
from typing import Any
from typing import Optional

from charli3_dendrite import MinswapCPPState # type: ignore
from charli3_dendrite import MinswapV2CPPState # type: ignore
from charli3_dendrite import MuesliSwapCPPState # type: ignore
from charli3_dendrite import SpectrumCPPState # type: ignore
from charli3_dendrite import SundaeSwapCPPState # type: ignore
from charli3_dendrite import SundaeSwapV3CPPState # type: ignore
from charli3_dendrite import VyFiCPPState # type: ignore
from charli3_dendrite import WingRidersCPPState # type: ignore
from charli3_dendrite.backend import AbstractBackend # type: ignore
from charli3_dendrite.backend import DbsyncBackend # type: ignore
from charli3_dendrite.backend import get_backend
Expand All @@ -29,7 +36,13 @@

DEXS: list[type[AbstractPoolState]] = [
SundaeSwapCPPState,
# MinswapV2CPPState,
MinswapV2CPPState,
MinswapCPPState,
WingRidersCPPState,
VyFiCPPState,
SpectrumCPPState,
MuesliSwapCPPState,
SundaeSwapV3CPPState,
# Add other DEX states here
]

Expand All @@ -44,32 +57,56 @@ def save_to_file(data: dict[str, Any], filename: str = "blockchain_data.json") -
logger.error("Error saving data to file: %s", e)


def test_get_pool_utxos(
def test_get_pool_utxos( # noqa: PLR0912
backend: AbstractBackend,
dex: type[AbstractPoolState],
) -> dict[str, Any]:
"""Test get_pool_utxos function."""
"""Test get_pool_utxos function for various DEX implementations."""
logger.info("Testing get_pool_utxos for %s...", dex.__name__)
selector = dex.pool_selector()
selector_dict = selector.model_dump()

existing_assets = selector_dict.pop("assets", [])
if existing_assets is None:
existing_assets = []
elif not isinstance(existing_assets, list):
existing_assets = [existing_assets]

specific_asset = (
"8e51398904a5d3fc129fbf4f1589701de23c7824d5c90fdb9490e15a434841524c4933"
)
assets = [*existing_assets, specific_asset]

result = backend.get_pool_utxos(
limit=10000,
historical=False,
assets=assets,
**selector_dict,
)
# Check if the DEX supports asset-based pool selection
if (
hasattr(dex, "pool_selector")
and "assets" in dex.pool_selector.__code__.co_varnames
):
try:
selector = dex.pool_selector(assets=[specific_asset])
logger.info("Using asset-based pool selection for %s", dex.__name__)
except (AttributeError, TypeError, ValueError) as e:
logger.error("Error in asset-based pool_selector: %s", str(e))
return {}
else:
# Fallback to standard pool selection
selector = dex.pool_selector()
logger.info("Using standard pool selection for %s", dex.__name__)

selector_dict = selector.model_dump()

# Handle assets for get_pool_utxos
assets = selector_dict.pop("assets", [])
if assets is None:
assets = []
elif not isinstance(assets, list):
assets = [assets]

# Add the specific asset if it's not already included
if specific_asset not in assets:
assets.append(specific_asset)

try:
result = backend.get_pool_utxos(
limit=10000,
historical=False,
assets=assets,
**selector_dict,
)
except (ConnectionError, TimeoutError, ValueError) as e:
logger.error("Error in get_pool_utxos: %s", str(e))
return {}

pool_data = {}
for pool in result:
Expand All @@ -82,6 +119,8 @@ def test_get_pool_utxos(
}
except (NoAssetsError, InvalidLPError, InvalidPoolError) as e:
logger.warning("Invalid pool data found: %s", e)
except (TypeError, ValueError, KeyError) as e:
logger.error("Unexpected error processing pool data: %s", str(e))

logger.info("Found %d pools for %s", len(pool_data), dex.__name__)
return pool_data
Expand Down
2 changes: 1 addition & 1 deletion ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ select = [
"NPY", # NumPy-specific rules
"RUF", # Ruff-specific rules
]
ignore = ["ANN101", "ANN102", "UP006", "ARG002", "PLR0913"]
ignore = ["ANN101", "ANN102", "UP006", "UP007", "ARG002", "PLR0913"]
unfixable = ["B"] # Avoid trying to fix flake8-bugbear violations.
target-version = "py39" # Assume Python 3.9.
extend-exclude = ["tests", "examples"]
Expand Down
27 changes: 17 additions & 10 deletions src/charli3_dendrite/backend/ogmios_kupo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ def get_pool_utxos(
"""Get pool UTXOs based on assets and addresses.

Args:
addresses (list[str]): List of addresses to query.
assets (Optional[list[str]]): List of asset IDs to filter by.
addresses (Optional[list[str]]): List of addresses to query.
limit (int): Maximum number of UTXOs to return.
page (int): Page number for pagination.
historical (bool): Whether to include historical data.
Expand All @@ -132,23 +132,28 @@ def get_pool_utxos(
PoolStateList: List of pool states.
"""
pool_states = []
if addresses is None:
if not addresses:
return PoolStateList(root=[])

for address in addresses:
params: dict[str, Union[int, str, None]] = {
"limit": limit,
"offset": page * limit,
}
payment_cred = self.get_payment_credential(address)
if assets:
params["policy_id"] = assets[0][:POLICY_ID_LENGTH]
last_asset = assets[-1]
params["policy_id"] = last_asset[:POLICY_ID_LENGTH]
params["asset_name"] = (
assets[0][POLICY_ID_LENGTH:]
if len(assets[0]) > POLICY_ID_LENGTH
last_asset[POLICY_ID_LENGTH:]
if len(last_asset) > POLICY_ID_LENGTH
else None
)

matches = self._kupo_request(f"matches/{address}?unspent", params=params)
matches = self._kupo_request(
f"matches/{payment_cred}/*?unspent",
params=params,
)
if isinstance(matches.root, list):
for match in matches.root:
pool_state = self._pool_state_from_kupo(match)
Expand Down Expand Up @@ -181,15 +186,17 @@ def get_pool_in_tx(
"transaction_id": tx_hash,
"order": "most_recent_first",
}
payment_cred = self.get_payment_credential(address)
if assets:
params["policy_id"] = assets[0][:POLICY_ID_LENGTH]
last_asset = assets[-1]
params["policy_id"] = last_asset[:POLICY_ID_LENGTH]
params["asset_name"] = (
assets[0][POLICY_ID_LENGTH:]
if len(assets[0]) > POLICY_ID_LENGTH
last_asset[POLICY_ID_LENGTH:]
if len(last_asset) > POLICY_ID_LENGTH
else None
)

matches = self._kupo_request(f"matches/{address}", params=params)
matches = self._kupo_request(f"matches/{payment_cred}/*", params=params)
if isinstance(matches.root, list):
pool_states = []
if matches.root:
Expand Down
Loading
Loading