From 743ec47f834ee34e311f484269cd9b92b165ffaf Mon Sep 17 00:00:00 2001 From: Deep Bhatt Date: Fri, 13 Sep 2024 20:09:34 -0400 Subject: [PATCH] Refactor OgmiosKupoBackend to improve UTXO retrieval --- example.py | 75 +++-- ruff.toml | 2 +- .../backend/ogmios_kupo/__init__.py | 27 +- src/charli3_dendrite/dexs/amm/vyfi.py | 288 +++++++++++------- 4 files changed, 255 insertions(+), 137 deletions(-) diff --git a/example.py b/example.py index 2a3cc17..b3d3271 100644 --- a/example.py +++ b/example.py @@ -8,7 +8,14 @@ from typing import Any from typing import Optional +from charli3_dendrite import MinswapCPPState # type: ignore +from charli3_dendrite import MinswapV2CPPState # type: ignore +from charli3_dendrite import MuesliSwapCPPState # type: ignore +from charli3_dendrite import SpectrumCPPState # type: ignore from charli3_dendrite import SundaeSwapCPPState # type: ignore +from charli3_dendrite import SundaeSwapV3CPPState # type: ignore +from charli3_dendrite import VyFiCPPState # type: ignore +from charli3_dendrite import WingRidersCPPState # type: ignore from charli3_dendrite.backend import AbstractBackend # type: ignore from charli3_dendrite.backend import DbsyncBackend # type: ignore from charli3_dendrite.backend import get_backend @@ -29,7 +36,13 @@ DEXS: list[type[AbstractPoolState]] = [ SundaeSwapCPPState, - # MinswapV2CPPState, + MinswapV2CPPState, + MinswapCPPState, + WingRidersCPPState, + VyFiCPPState, + SpectrumCPPState, + MuesliSwapCPPState, + SundaeSwapV3CPPState, # Add other DEX states here ] @@ -44,32 +57,56 @@ def save_to_file(data: dict[str, Any], filename: str = "blockchain_data.json") - logger.error("Error saving data to file: %s", e) -def test_get_pool_utxos( +def test_get_pool_utxos( # noqa: PLR0912 backend: AbstractBackend, dex: type[AbstractPoolState], ) -> dict[str, Any]: - """Test get_pool_utxos function.""" + """Test get_pool_utxos function for various DEX implementations.""" logger.info("Testing get_pool_utxos for %s...", dex.__name__) - selector = dex.pool_selector() - selector_dict = selector.model_dump() - - existing_assets = selector_dict.pop("assets", []) - if existing_assets is None: - existing_assets = [] - elif not isinstance(existing_assets, list): - existing_assets = [existing_assets] specific_asset = ( "8e51398904a5d3fc129fbf4f1589701de23c7824d5c90fdb9490e15a434841524c4933" ) - assets = [*existing_assets, specific_asset] - result = backend.get_pool_utxos( - limit=10000, - historical=False, - assets=assets, - **selector_dict, - ) + # Check if the DEX supports asset-based pool selection + if ( + hasattr(dex, "pool_selector") + and "assets" in dex.pool_selector.__code__.co_varnames + ): + try: + selector = dex.pool_selector(assets=[specific_asset]) + logger.info("Using asset-based pool selection for %s", dex.__name__) + except (AttributeError, TypeError, ValueError) as e: + logger.error("Error in asset-based pool_selector: %s", str(e)) + return {} + else: + # Fallback to standard pool selection + selector = dex.pool_selector() + logger.info("Using standard pool selection for %s", dex.__name__) + + selector_dict = selector.model_dump() + + # Handle assets for get_pool_utxos + assets = selector_dict.pop("assets", []) + if assets is None: + assets = [] + elif not isinstance(assets, list): + assets = [assets] + + # Add the specific asset if it's not already included + if specific_asset not in assets: + assets.append(specific_asset) + + try: + result = backend.get_pool_utxos( + limit=10000, + historical=False, + assets=assets, + **selector_dict, + ) + except (ConnectionError, TimeoutError, ValueError) as e: + logger.error("Error in get_pool_utxos: %s", str(e)) + return {} pool_data = {} for pool in result: @@ -82,6 +119,8 @@ def test_get_pool_utxos( } except (NoAssetsError, InvalidLPError, InvalidPoolError) as e: logger.warning("Invalid pool data found: %s", e) + except (TypeError, ValueError, KeyError) as e: + logger.error("Unexpected error processing pool data: %s", str(e)) logger.info("Found %d pools for %s", len(pool_data), dex.__name__) return pool_data diff --git a/ruff.toml b/ruff.toml index 8281c0c..8610a9c 100644 --- a/ruff.toml +++ b/ruff.toml @@ -29,7 +29,7 @@ select = [ "NPY", # NumPy-specific rules "RUF", # Ruff-specific rules ] -ignore = ["ANN101", "ANN102", "UP006", "ARG002", "PLR0913"] +ignore = ["ANN101", "ANN102", "UP006", "UP007", "ARG002", "PLR0913"] unfixable = ["B"] # Avoid trying to fix flake8-bugbear violations. target-version = "py39" # Assume Python 3.9. extend-exclude = ["tests", "examples"] diff --git a/src/charli3_dendrite/backend/ogmios_kupo/__init__.py b/src/charli3_dendrite/backend/ogmios_kupo/__init__.py index 2f49426..2459b7e 100644 --- a/src/charli3_dendrite/backend/ogmios_kupo/__init__.py +++ b/src/charli3_dendrite/backend/ogmios_kupo/__init__.py @@ -122,8 +122,8 @@ def get_pool_utxos( """Get pool UTXOs based on assets and addresses. Args: + addresses (list[str]): List of addresses to query. assets (Optional[list[str]]): List of asset IDs to filter by. - addresses (Optional[list[str]]): List of addresses to query. limit (int): Maximum number of UTXOs to return. page (int): Page number for pagination. historical (bool): Whether to include historical data. @@ -132,7 +132,7 @@ def get_pool_utxos( PoolStateList: List of pool states. """ pool_states = [] - if addresses is None: + if not addresses: return PoolStateList(root=[]) for address in addresses: @@ -140,15 +140,20 @@ def get_pool_utxos( "limit": limit, "offset": page * limit, } + payment_cred = self.get_payment_credential(address) if assets: - params["policy_id"] = assets[0][:POLICY_ID_LENGTH] + last_asset = assets[-1] + params["policy_id"] = last_asset[:POLICY_ID_LENGTH] params["asset_name"] = ( - assets[0][POLICY_ID_LENGTH:] - if len(assets[0]) > POLICY_ID_LENGTH + last_asset[POLICY_ID_LENGTH:] + if len(last_asset) > POLICY_ID_LENGTH else None ) - matches = self._kupo_request(f"matches/{address}?unspent", params=params) + matches = self._kupo_request( + f"matches/{payment_cred}/*?unspent", + params=params, + ) if isinstance(matches.root, list): for match in matches.root: pool_state = self._pool_state_from_kupo(match) @@ -181,15 +186,17 @@ def get_pool_in_tx( "transaction_id": tx_hash, "order": "most_recent_first", } + payment_cred = self.get_payment_credential(address) if assets: - params["policy_id"] = assets[0][:POLICY_ID_LENGTH] + last_asset = assets[-1] + params["policy_id"] = last_asset[:POLICY_ID_LENGTH] params["asset_name"] = ( - assets[0][POLICY_ID_LENGTH:] - if len(assets[0]) > POLICY_ID_LENGTH + last_asset[POLICY_ID_LENGTH:] + if len(last_asset) > POLICY_ID_LENGTH else None ) - matches = self._kupo_request(f"matches/{address}", params=params) + matches = self._kupo_request(f"matches/{payment_cred}/*", params=params) if isinstance(matches.root, list): pool_states = [] if matches.root: diff --git a/src/charli3_dendrite/dexs/amm/vyfi.py b/src/charli3_dendrite/dexs/amm/vyfi.py index 51df6c1..0f2ec4c 100644 --- a/src/charli3_dendrite/dexs/amm/vyfi.py +++ b/src/charli3_dendrite/dexs/amm/vyfi.py @@ -1,6 +1,10 @@ """VyFi DEX Module.""" + +from __future__ import annotations + import json import time +from collections import defaultdict from dataclasses import dataclass from typing import Any from typing import ClassVar @@ -23,31 +27,24 @@ from charli3_dendrite.dexs.core.errors import NotAPoolError from charli3_dendrite.utility import Assets +POOL_REFRESH_INTERVAL = 3600 +ADDRESS_HASH_LENGTH = 28 +POLICY_ID_LENGTH = 56 + @dataclass class VyFiPoolDatum(PoolDatum): - """TODO: Figure out what each of these numbers mean.""" + """VyFi pool datum.""" token_a_fees: int token_b_fees: int lp_tokens: int - def pool_pair(self) -> Assets | None: + def pool_pair(self) -> Optional[Assets]: + """Return the pool pair assets.""" return None -# @dataclass -# class DepositPair(PlutusData): -# CONSTR_ID = 0 -# min_amount_a: int -# min_amount_b: int - -# @dataclass -# class Deposit(PlutusData): -# CONSTR_ID = 1 -# assets: DepositPair - - @dataclass class Deposit(PlutusData): """Deposit assets into the pool.""" @@ -125,11 +122,11 @@ def create_datum( address_source: Address, in_assets: Assets, out_assets: Assets, - batcher_fee: Assets, - deposit: Assets, - address_target: Address | None = None, - datum_target: PlutusData | None = None, - ): + batcher_fee: Assets, # noqa: ARG003 + deposit: Assets, # noqa: ARG003 + address_target: Optional[Address] = None, # noqa: ARG003 + datum_target: Optional[PlutusData] = None, # noqa: ARG003 + ) -> VyFiOrderDatum: """Create a new order datum.""" address_hash = ( address_source.payment_part.to_primitive() @@ -145,77 +142,92 @@ def create_datum( return cls(address=address_hash, order=order) def address_source(self) -> Address: - payment_part = VerificationKeyHash.from_primitive(self.address[:28]) - if len(self.address) == 28: - staking_part = None - else: - staking_part = VerificationKeyHash.from_primitive(self.address[28:56]) + """Get the source address.""" + payment_part = VerificationKeyHash.from_primitive( + self.address[:ADDRESS_HASH_LENGTH], + ) + staking_part = ( + VerificationKeyHash.from_primitive(self.address[ADDRESS_HASH_LENGTH:]) + if len(self.address) > ADDRESS_HASH_LENGTH + else None + ) return Address(payment_part=payment_part, staking_part=staking_part) def requested_amount(self) -> Assets: + """Get the requested amount.""" if isinstance(self.order, BtoA): return Assets({"asset_a": self.order.min_receive}) - elif isinstance(self.order, AtoB): + if isinstance(self.order, AtoB): return Assets({"asset_b": self.order.min_receive}) - elif isinstance(self.order, (ZapInA, ZapInB, Deposit)): + if isinstance(self.order, (ZapInA, ZapInB, Deposit)): return Assets({"lp": self.order.min_lp_receive}) - elif isinstance(self.order, Withdraw): + if isinstance(self.order, Withdraw): return Assets( { "asset_a": self.order.min_lp_receive.min_amount_a, "asset_b": self.order.min_lp_receive.min_amount_b, }, ) + return Assets() - def order_type(self) -> OrderType | None: - order_type = None + def order_type(self) -> Optional[OrderType]: + """Get the order type.""" if isinstance(self.order, (BtoA, AtoB, ZapInA, ZapInB)): - order_type = OrderType.swap - elif isinstance(self.order, Deposit): - order_type = OrderType.deposit - elif isinstance(self.order, Withdraw): - order_type = OrderType.withdraw - - return order_type + return OrderType.swap + if isinstance(self.order, Deposit): + return OrderType.deposit + if isinstance(self.order, Withdraw): + return OrderType.withdraw + return None class VyFiTokenDefinition(BaseModel): """VyFi token definition.""" - tokenName: str - currencySymbol: str + token_name: str = Field(alias="tokenName") + currency_symbol: str = Field(alias="currencySymbol") class VyFiFees(BaseModel): """VyFi fees.""" - barFee: int - processFee: int - liqFee: int + bar_fee: int = Field(alias="barFee") + process_fee: int = Field(alias="processFee") + liq_fee: int = Field(alias="liqFee") class VyFiPoolTokens(BaseModel): """VyFi pool tokens.""" - aAsset: VyFiTokenDefinition - bAsset: VyFiTokenDefinition - mainNFT: VyFiTokenDefinition - operatorToken: VyFiTokenDefinition - lpTokenName: dict[str, str] - feesSettings: VyFiFees - stakeKey: Optional[str] + a_asset: VyFiTokenDefinition = Field(alias="aAsset") + b_asset: VyFiTokenDefinition = Field(alias="bAsset") + main_nft: VyFiTokenDefinition = Field(alias="mainNFT") + operator_token: VyFiTokenDefinition = Field(alias="operatorToken") + lp_token_name: dict[str, str] = Field(alias="lpTokenName") + fees_settings: VyFiFees = Field(alias="feesSettings") + stake_key: Optional[str] = Field(alias="stakeKey") class VyFiPoolDefinition(BaseModel): """VyFi pool definition.""" - unitsPair: str - poolValidatorUtxoAddress: str - lpPolicyId_assetId: str = Field(alias="lpPolicyId-assetId") + units_pair: str = Field(alias="unitsPair") + pool_validator_utxo_address: str = Field(alias="poolValidatorUtxoAddress") + lp_policy_id_asset_id: str = Field(alias="lpPolicyId-assetId") json_: VyFiPoolTokens = Field(alias="json") pair: str - isLive: bool - orderValidatorUtxoAddress: str + is_live: bool = Field(alias="isLive") + order_validator_utxo_address: str = Field(alias="orderValidatorUtxoAddress") + + def __hash__(self) -> int: + """Make VyFiPoolDefinition hashable.""" + return hash( + ( + self.units_pair, + self.pool_validator_utxo_address, + self.order_validator_utxo_address, + ), + ) class VyFiCPPState(AbstractConstantProductPoolState): @@ -223,117 +235,177 @@ class VyFiCPPState(AbstractConstantProductPoolState): _batcher = Assets(lovelace=1900000) _deposit = Assets(lovelace=2000000) - _pools: ClassVar[dict[str, VyFiPoolDefinition] | None] = None - _pools_refresh: ClassVar[float] = time.time() + _pools: ClassVar[Optional[dict[str, VyFiPoolDefinition]]] = None + _pools_refresh: ClassVar[float] = 0.0 lp_fee: int = 0 bar_fee: int = 0 @classmethod def dex(cls) -> str: + """Get the DEX name.""" return "VyFi" @classmethod - @property def pools(cls) -> dict[str, VyFiPoolDefinition]: """Get the pools.""" - if cls._pools is None or (time.time() - cls._pools_refresh) > 3600: + if ( + cls._pools is None + or (time.time() - cls._pools_refresh) > POOL_REFRESH_INTERVAL + ): + cls._refresh_pools() + return cls._pools or {} + + @classmethod + def order_selector(cls) -> list[str]: + """Get order selector addresses.""" + return [p.order_validator_utxo_address for p in cls.pools().values()] + + @classmethod + def pool_selector(cls, assets: Optional[list[str]] = None) -> PoolSelector: + """Get a PoolSelector for VyFi pools, optionally filtered by assets.""" + asset_to_pool = cls._create_asset_to_pool_mapping() + relevant_pools = cls._filter_relevant_pools(asset_to_pool, assets) + addresses = [pool.pool_validator_utxo_address for pool in relevant_pools] + return PoolSelector(addresses=addresses) + + @classmethod + def _create_asset_to_pool_mapping( + cls, + ) -> defaultdict[str, list[VyFiPoolDefinition]]: + """Create a mapping of assets to pools.""" + asset_to_pool: defaultdict[str, list[VyFiPoolDefinition]] = defaultdict(list) + for pool in cls.pools().values(): + asset_a = cls._encode_asset( + pool.json_.a_asset.currency_symbol, + pool.json_.a_asset.token_name, + ) + asset_b = cls._encode_asset( + pool.json_.b_asset.currency_symbol, + pool.json_.b_asset.token_name, + ) + asset_to_pool[asset_a].append(pool) + asset_to_pool[asset_b].append(pool) + return asset_to_pool + + @classmethod + def _filter_relevant_pools( + cls, + asset_to_pool: defaultdict[str, list[VyFiPoolDefinition]], + assets: Optional[list[str]], + ) -> set[VyFiPoolDefinition]: + """Filter relevant pools based on assets.""" + if assets: + relevant_pools = set() + for asset in assets: + relevant_pools.update(asset_to_pool.get(asset, [])) + else: + relevant_pools = set(cls.pools().values()) + return relevant_pools + + @staticmethod + def _encode_asset(policy_id: str, asset_name: str) -> str: + """Encode an asset by combining policy ID and hex-encoded asset name.""" + encoded_name = asset_name.encode("utf-8").hex() + return policy_id + encoded_name + + @staticmethod + def _decode_asset(encoded_asset: str) -> tuple[str, str]: + """Decode an encoded asset into policy ID and asset name.""" + policy_id = encoded_asset[:POLICY_ID_LENGTH] + asset_name = bytes.fromhex(encoded_asset[POLICY_ID_LENGTH:]).decode("utf-8") + return policy_id, asset_name + + @staticmethod + def _split_asset(asset: str) -> tuple[str, str]: + """Split an asset string into policy ID and asset name.""" + if len(asset) == POLICY_ID_LENGTH: # Only policy ID + return asset, "" + return asset[:POLICY_ID_LENGTH], asset[POLICY_ID_LENGTH:] + + @classmethod + def _refresh_pools(cls) -> None: + """Refresh the pools data from the API.""" + try: + response = requests.get( + "https://api.vyfi.io/lp?networkId=1&v2=true", + timeout=10, + ) + response.raise_for_status() cls._pools = {} - for p in requests.get("https://api.vyfi.io/lp?networkId=1&v2=true").json(): + for p in response.json(): p["json"] = json.loads(p["json"]) cls._pools[ p["json"]["mainNFT"]["currencySymbol"] ] = VyFiPoolDefinition.model_validate(p) cls._pools_refresh = time.time() - - return cls._pools - - @classmethod - def order_selector(cls) -> list[str]: - return [p.orderValidatorUtxoAddress for p in cls.pools.values()] - - @classmethod - def pool_selector(cls) -> PoolSelector: - return PoolSelector( - addresses=[pool.poolValidatorUtxoAddress for pool in cls.pools.values()], - ) + except requests.RequestException as e: + # Log the error or handle it as appropriate for your application + print(f"Error refreshing pools: {e}") # noqa: T201 @property def swap_forward(self) -> bool: + """Check if swap is forward.""" return False @property def stake_address(self) -> Address: + """Get the stake address.""" return Address.from_primitive( - VyFiCPPState.pools[self.pool_id].orderValidatorUtxoAddress, + VyFiCPPState.pools()[self.pool_id].order_validator_utxo_address, ) @classmethod - def order_datum_class(self) -> type[VyFiOrderDatum]: + def order_datum_class(cls) -> type[VyFiOrderDatum]: + """Get the order datum class.""" return VyFiOrderDatum @classmethod def pool_datum_class(cls) -> type[VyFiPoolDatum]: + """Get the pool datum class.""" return VyFiPoolDatum @property def pool_id(self) -> str: - """A unique identifier for the pool.""" + """Get a unique identifier for the pool.""" return self.pool_nft.unit() @property def volume_fee(self) -> int: + """Get the volume fee.""" return self.lp_fee + self.bar_fee @classmethod def extract_pool_nft(cls, values: dict[str, Any]) -> Optional[Assets]: - """Extract the dex nft from the UTXO. - - Some DEXs put a DEX nft into the pool UTXO. - - This function checks to see if the DEX nft is in the UTXO if the DEX policy is - defined. - - If the dex nft is in the values, this value is skipped because it is assumed - that this utxo has already been parsed. - - Args: - values: The pool UTXO inputs. - - Returns: - Assets: None or the dex nft. - """ + """Extract the dex nft from the UTXO.""" assets = values["assets"] - # If the dex nft is in the values, it's been parsed already if "pool_nft" in values: - assert any([p in cls.pools for p in values["pool_nft"]]) - if isinstance(values["pool_nft"], dict): - pool_nft = Assets(root=values["pool_nft"]) - else: - pool_nft = values["pool_nft"] - - # Check for the dex nft + pool_nft = ( + Assets(root=values["pool_nft"]) + if isinstance(values["pool_nft"], dict) + else values["pool_nft"] + ) + if not any(p in cls.pools() for p in values["pool_nft"]): + raise ValueError("Invalid pool NFT") else: - nfts = [asset for asset, quantity in assets.items() if asset in cls.pools] + nfts = [asset for asset, quantity in assets.items() if asset in cls.pools()] if len(nfts) < 1: if len(assets) == 0: - raise NoAssetsError( - f"{cls.__name__}: No assets supplied.", - ) - else: - raise NotAPoolError( - f"{cls.__name__}: Pool must have one DEX NFT token.", - ) + raise NoAssetsError(f"{cls.__name__}: No assets supplied.") + raise NotAPoolError( + f"{cls.__name__}: Pool must have one DEX NFT token.", + ) pool_nft = Assets(**{nfts[0]: assets.root.pop(nfts[0])}) values["pool_nft"] = pool_nft - values["lp_fee"] = cls.pools[pool_nft.unit()].json_.feesSettings.liqFee - values["bar_fee"] = cls.pools[pool_nft.unit()].json_.feesSettings.barFee + values["lp_fee"] = cls.pools()[pool_nft.unit()].json_.fees_settings.liq_fee + values["bar_fee"] = cls.pools()[pool_nft.unit()].json_.fees_settings.bar_fee return pool_nft @classmethod - def post_init(cls, values): + def post_init(cls, values: dict[str, Any]) -> None: + """Post-initialization processing.""" super().post_init(values) assets = values["assets"]