Source code for curvesim.pool.cryptoswap.pool

"""
Mainly a module to house the `CurveCryptoPool`, a cryptoswap implementation in Python.
"""
import time
from math import isqrt, prod
from typing import List, Optional, Tuple, Type

from curvesim.exceptions import CalculationError, CryptoPoolError, CurvesimValueError
from curvesim.logging import get_logger
from curvesim.pool.base import Pool
from curvesim.pool.snapshot import CurveCryptoPoolBalanceSnapshot, Snapshot

from .calcs import (
    factory_2_coin,
    geometric_mean,
    get_alpha,
    get_p,
    get_y,
    newton_D,
    tricrypto_ng,
)

logger = get_logger(__name__)

NOISE_FEE = 10**5  # 0.1 bps
EXP_PRECISION = 10**10
PRECISION = 10**18


[docs]class CurveCryptoPool(Pool): # pylint: disable=too-many-instance-attributes """Cryptoswap implementation in Python.""" snapshot_class: Type[Snapshot] = CurveCryptoPoolBalanceSnapshot __slots__ = ( "A", "gamma", "n", "precisions", "mid_fee", "out_fee", "allowed_extra_profit", "fee_gamma", "adjustment_step", "admin_fee", "ma_half_time", "price_scale", "_price_oracle", "last_prices", "last_prices_timestamp", "_block_timestamp", "balances", "D", "virtual_price", "tokens", "xcp_profit", "xcp_profit_a", "not_adjusted", ) # pylint: disable-next=too-many-locals,too-many-arguments,too-many-branches
[docs] def __init__( self, A: int, gamma: int, n: int, precisions: List[int], mid_fee: int, out_fee: int, allowed_extra_profit: int, fee_gamma: int, adjustment_step: int, ma_half_time: int, price_scale: List[int], price_oracle=None, last_prices=None, last_prices_timestamp=None, balances=None, D=None, tokens=None, admin_fee: int = 5 * 10**9, xcp_profit=10**18, xcp_profit_a=10**18, virtual_price=None, ) -> None: """ Parameters ---------- A : int Amplification coefficient; this is :math:`A n^n` in the whitepaper multiplied by 10**4 for greater precision. gamma: int Decay factor for A. n: int Number of coins; currently only n = 2 is supported. precisions: list of int Precision adjustments to convert native token units to 18 decimals; this assumes tokens have at most 18 decimals i.e. balance in native units * precision = balance in D units mid_fee: int Fee with 10**10 precision, for trades near price scale out_fee: int Fee with 10**10 precision, used to adjust `mid_fee` for trades further away from price_scale allowed_extra_profit: int "Buffer" used to determine if the price adjustment algorithm should run. fee_gamma: int Factor used to control the transition from `mid_fee` to `out_fee`. adjustment_step: Minimum step size to adjust the price scale. ma_half_time: int "Half-life" for exponential moving average of trade prices. price_scale: List[Int] Price scale value for the pool. This is where liquidity is concentrated. price_oracle: List[Int], optional Price oracle value for the pool. This is the EMA price used to adjust the price scale toward. Defaults to `price_scale`. last_prices: List[Int], optional Last trade price for the pool. Defaults to `price_scale`. last_prices_timestamp: int, optional Timestamp for last operation altering pool price. Defaults to unix timestamp. balances: list of int, optional Coin balances in native token units; either `balances` or `D` is required D : int, optional Stableswap invariant for given balances, precisions, price_scale, A, and gamma; either `balances` or `D` is required tokens: int, optional LP token supply (default is calculated from `D`, which is also calculated if needed) admin_fee: int, optional Percentage of `fee` with 10**10 precision. Fee paid to the DAO (default = 5*10**9) xcp_profit: int, optional Counter for accumulated profits, no losses (default = 10**18) xcp_profit_a: int, optional Value of `xcp_profit` when admin fees last claimed (default = 10**18) virtual_price: int, optional amount of XCP invariant per LP token; can be used when missing `tokens` value. """ self.A = A self.gamma = gamma self.mid_fee = mid_fee self.out_fee = out_fee self.allowed_extra_profit = allowed_extra_profit self.fee_gamma = fee_gamma self.adjustment_step = adjustment_step self.admin_fee = admin_fee self.price_scale = price_scale.copy() self._price_oracle = price_oracle.copy() if price_oracle else price_scale.copy() self.last_prices = last_prices.copy() if last_prices else price_scale.copy() self.ma_half_time = ma_half_time self._block_timestamp = _get_unix_timestamp() self.last_prices_timestamp = last_prices_timestamp or 0 self.xcp_profit = xcp_profit self.xcp_profit_a = xcp_profit_a # Full profit at last claim of admin fees self.not_adjusted = False if n not in [2, 3]: raise CryptoPoolError( "Only 2 or 3-coin crypto pools are currently supported." ) self.n = n self.precisions = precisions if len(precisions) != n: raise ValueError("`len(precisions)` must equal `n`") if balances is None and D is None: raise ValueError("Must provide at least one of `balances` or `D`.") # All state variables needed for balance conversions or "newton" # calculations should have been set by this point. if balances: self.balances = balances.copy() if D is not None: self.D = D if not balances: self.balances = self._convert_D_to_balances(D) else: # If user passes both `D` and `balances`, it's possible they may # be inconsistent; however we allow this for unanticipated use-cases. logger.debug( "Both `D` and `balances` were passed into `__init__`. " "Inconsistent values may create issues." ) else: xp = self._xp() D = newton_D(A, gamma, xp) self.D = D if tokens and virtual_price: raise CurvesimValueError( "Should not set both `tokens` and `virtual_price`." ) xcp = self._get_xcp(D) if virtual_price: self.virtual_price = virtual_price self.tokens = xcp * 10**18 // virtual_price else: if tokens: self.tokens = tokens else: self.tokens = xcp self.virtual_price = 10**18 * xcp // self.tokens
def _convert_D_to_balances(self, D): price_scale = self.price_scale precisions = self.precisions n = self.n return [D // n // precisions[0]] + [ D * PRECISION // (p * n) // prec for p, prec in zip(price_scale, precisions[1:]) ] def _xp(self) -> List[int]: """ Calculate the balances in units of `D`, converting using `price_scale` so a unit of each token has equal value. Returns -------- List[int] The pool balances in units of `D`. Note ----- This intentionally always return a new copy of the balances. """ balances = self.balances return self._xp_mem(balances) def _xp_mem(self, balances) -> List[int]: """ Parameters ---------- balances: List[int] The pool balances in native token units. Returns -------- List[int] The pool balances in units of `D`. Note ----- This intentionally always returns a new copy of the balances. """ precisions = self.precisions price_scale = self.price_scale return [balances[0] * precisions[0]] + [ balance * precision * price // PRECISION for balance, precision, price in zip( balances[1:], precisions[1:], price_scale ) ] def _get_xcp(self, D: int) -> int: """ Calculate the constant-product profit, using the balances at equilibrium point. """ n_coins: int = self.n price_scale: List[int] = self.price_scale x = [D // n_coins] + [ D * PRECISION // (price * n_coins) for price in price_scale ] return geometric_mean(x) def _increment_timestamp(self, blocks=1, timestamp=None) -> None: """Update the internal clock used to mimic the block timestamp.""" if timestamp: self._block_timestamp = timestamp return self._block_timestamp += 12 * blocks # pylint: disable-next=R0912,R0913,R0914,R0915 def _tweak_price( # noqa: complexity: 12 self, A: int, gamma: int, _xp: List[int], i: int, p_i: Optional[int], new_D: int, K0_prev: int = 0, ) -> None: """ Applies several kinds of updates: - EMA price update: price_oracle - Profit counters: D, virtual_price, xcp_profit - price adjustment: price_scale - If p_i is None, the spot price will be used as the last price Also claims admin fees if appropriate (enough profit and price scale and oracle is close enough). """ price_oracle: List[int] = self._price_oracle last_prices: List[int] = self.last_prices price_scale: List[int] = self.price_scale last_prices_timestamp: int = self.last_prices_timestamp block_timestamp: int = self._block_timestamp n_coins: int = self.n # Update EMA price oracle for a new block. Happens once per block. # EMA uses price of the last trade and oracle price in previous block. if last_prices_timestamp < block_timestamp: ma_half_time: int = self.ma_half_time alpha: int = get_alpha( ma_half_time, block_timestamp, last_prices_timestamp, n_coins ) if n_coins == 2: price_oracle = [ (last_p * (10**18 - alpha) + oracle_p * alpha) // 10**18 for last_p, oracle_p in zip(last_prices, price_oracle) ] elif n_coins == 3: # Cap state price that goes into the EMA with 2*price_scale. price_oracle = [ (min(last_p, 2 * p) * (10**18 - alpha) + oracle_p * alpha) // 10**18 for last_p, oracle_p, p in zip( last_prices, price_oracle, price_scale ) ] else: raise CalculationError("More than 3 coins is not supported.") self._price_oracle = price_oracle self.last_prices_timestamp = block_timestamp D_unadjusted: int = new_D # Withdrawal methods know new D already if new_D == 0: D_unadjusted = newton_D(A, gamma, _xp, K0_prev) if p_i is None: if n_coins == 2: # calculate real prices __xp: List[int] = _xp.copy() dx_price: int = __xp[0] // 10**6 __xp[0] += dx_price last_prices = [ price_scale[k - 1] * dx_price // ( __xp[k] - factory_2_coin.newton_y(A, gamma, __xp, D_unadjusted, k) ) for k in range(1, n_coins) ] else: last_prices = get_p(_xp, D_unadjusted, A, gamma) last_prices = [ last_p * p // 10**18 for last_p, p in zip(last_prices, price_scale) ] elif p_i > 0: # Save the last price if i > 0: last_prices[i - 1] = p_i else: # If 0th price changed - change all prices instead for k in range(n_coins - 1): last_prices[k] = last_prices[k] * 10**18 // p_i else: raise CalculationError(f"p_i (last price) cannot be {p_i}.") self.last_prices = last_prices total_supply: int = self.tokens old_xcp_profit: int = self.xcp_profit old_virtual_price: int = self.virtual_price # Update profit numbers without price adjustment first xp: List[int] = [D_unadjusted // n_coins] + [ D_unadjusted * PRECISION // (n_coins * price) for price in price_scale ] xcp_profit: int = 10**18 virtual_price: int = 10**18 if old_virtual_price > 0: xcp: int = geometric_mean(xp) virtual_price = 10**18 * xcp // total_supply if virtual_price < old_virtual_price: raise CryptoPoolError("Loss") xcp_profit = old_xcp_profit * virtual_price // old_virtual_price self.xcp_profit = xcp_profit if virtual_price * 2 - 10**18 > xcp_profit + 2 * self.allowed_extra_profit: norm: int = 0 ratio: int = 0 for k in range(n_coins - 1): ratio = price_oracle[k] * 10**18 // price_scale[k] ratio = abs(ratio - 10**18) norm += ratio**2 norm = isqrt(norm) adjustment_step: int = max(self.adjustment_step, norm // 5) if norm > adjustment_step: new_prices = [ (p * (norm - adjustment_step) + adjustment_step * p_oracle) // norm for p, p_oracle in zip(price_scale, price_oracle) ] # Calculate balances * prices xp = [_xp[0]] + [ balance * p_new // p for balance, p, p_new in zip(_xp[1:], price_scale, new_prices) ] # Calculate "extended constant product" invariant xCP and virtual price D: int = newton_D(A, gamma, xp) xp = [D // n_coins] + [ D * PRECISION // (n_coins * p_new) for p_new in new_prices ] new_virtual_price = 10**18 * geometric_mean(xp) // total_supply # Proceed if we've got enough profit: # new_virtual_price > 10**18 # new_virtual_price - 10**18 > (xcp_profit - 10**18) / 2 if (new_virtual_price > 10**18) and ( 2 * new_virtual_price - 10**18 > xcp_profit ): self.price_scale = new_prices self.D = D self.virtual_price = new_virtual_price return # If we are here, the price_scale adjustment did not happen # Still need to update the profit counter and D self.D = D_unadjusted self.virtual_price = virtual_price def _claim_admin_fees(self) -> None: """ If the pool's profit has increased since the last fee claim, update profit, pool value, and LP token supply to reflect the admin taking its share of the fees by minting itself LP tokens. Otherwise, change nothing. Tricrypto-NG and Cryptopool implement this functionality differently, so we copy only Tricrypto-NG's way in this class for consistency. """ # no gulping logic needed for the python code A: int = self.A gamma: int = self.gamma xcp_profit: int = self.xcp_profit xcp_profit_a: int = self.xcp_profit_a total_supply: int = self.tokens if xcp_profit <= xcp_profit_a or total_supply < 10**18: return vprice: int = self.virtual_price fees: int = (xcp_profit - xcp_profit_a) * self.admin_fee // (2 * 10**10) if fees > 0: frac: int = vprice * 10**18 // (vprice - fees) - 10**18 d_supply: int = total_supply * frac // 10**18 self.tokens += d_supply xcp_profit -= fees * 2 self.xcp_profit = xcp_profit D: int = newton_D(A, gamma, self._xp()) self.D = D self.virtual_price = 10**18 * self._get_xcp(D) // self.tokens self.xcp_profit_a = xcp_profit
[docs] def get_dy(self, i: int, j: int, dx: int) -> int: """ Calculate the amount received from swapping `dx` amount of the `i`-th coin for the `j`-th coin. Parameters ---------- i: int Index of 'in' coin j: int Index of 'out' coin dx: int Amount of 'in' coin Returns ------- int The 'out' coin amount Note ---- This is a "view" function; it doesn't change the state of the pool. """ assert i != j # dev: same input and output coin assert i < self.n # dev: coin index out of range assert j < self.n # dev: coin index out of range xp: List[int] = self.balances.copy() xp[i] += dx xp = self._xp_mem(xp) A: int = self.A gamma: int = self.gamma D: int = self.D y: int = get_y(A, gamma, xp, D, j)[0] dy: int = xp[j] - y - 1 xp[j] = y precisions: List[int] = self.precisions price_scale: List[int] = self.price_scale if j > 0: dy = dy * PRECISION // (price_scale[j - 1] * precisions[j]) else: dy = dy // precisions[0] dy -= self._fee(xp) * dy // 10**10 return dy
[docs] def get_y(self, i: int, j: int, x: int, xp: List[int]) -> int: r""" Calculate x[j] if one makes x[i] = x. Parameters ---------- i: int index of coin; usually the "in"-token j: int index of coin; usually the "out"-token x: int balance of i-th coin in units of `D` xp: list of int coin balances in units of `D` Returns ------- int The balance of the j-th coin, in units of `D`, for the other coin balances given. Note ---- This is a "view" function; it doesn't change the state of the pool. """ A: int = self.A gamma: int = self.gamma D: int = newton_D(A, gamma, xp) xp = xp.copy() xp[i] = x y, _ = get_y(A, gamma, xp, D, j) return y
def _fee(self, xp: List[int]) -> int: """ f = fee_gamma / (fee_gamma + (1 - K)) where K = prod(x) / (sum(x) / N)**N (all normalized to 1e18) """ n_coins: int = self.n fee_gamma: int = self.fee_gamma if n_coins == 2: f: int = xp[0] + xp[1] f = ( fee_gamma * 10**18 // ( fee_gamma + 10**18 - (10**18 * n_coins**n_coins) * xp[0] // f * xp[1] // f ) ) else: _sum_xp: int = sum(xp) K: int = 10**18 for _x in xp: K = K * n_coins * _x // _sum_xp f = fee_gamma * 10**18 // (fee_gamma + 10**18 - K) return (self.mid_fee * f + self.out_fee * (10**18 - f)) // 10**18 # pylint: disable-next=too-many-locals def _exchange( self, i: int, j: int, dx: int, min_dy: int, ) -> Tuple[int, int]: assert i != j, "Indices must be different" assert i < self.n, "Index out of bounds" assert j < self.n, "Index out of bounds" assert dx > 0, "Can't swap zero amount" A = self.A gamma = self.gamma xp: List[int] = self.balances.copy() ix: int = j y: int = xp[j] xp[i] += dx self.balances[i] = xp[i] xp = self._xp_mem(xp) y_out = get_y(A, gamma, xp, self.D, j) dy: int = xp[j] - y_out[0] assert dy >= 0, f"Invalid dy: dx: {dx}, dy: {dy}, i: {i}, j: {j} " xp[j] -= dy dy -= 1 price_scale: int = self.price_scale[j - 1] prec_i: int = self.precisions[i] prec_j: int = self.precisions[j] if j > 0: dy = dy * PRECISION // (price_scale) dy = dy // prec_j fee = self._fee(xp) * dy // 10**10 dy -= fee assert dy >= min_dy, f"Slippage: dy: {dy}" y -= dy self.balances[j] = y y *= prec_j if j > 0: y = y * price_scale // PRECISION xp[j] = y p: Optional[int] = None K0_prev: int = 0 if self.n == 2: if dx > 10**5 and dy > 10**5: _dx: int = dx * prec_i _dy: int = dy * prec_j if i != 0 and j != 0: p = self.last_prices[i - 1] * _dx // _dy elif i == 0: p = _dx * 10**18 // _dy else: # j == 0 p = _dy * 10**18 // _dx ix = i else: K0_prev = y_out[1] self._tweak_price(A, gamma, xp, ix, p, 0, K0_prev) return dy, fee
[docs] def exchange( self, i: int, j: int, dx: int, min_dy: int = 0, ) -> Tuple[int, int]: """ Swap `dx` amount of the `i`-th coin for the `j`-th coin. Parameters ---------- i: int 'In' coin index j: int 'Out' coin index dx: int 'In' coin amount min_dy: int, optional Minimum 'out' coin amount required (default = 0) Returns ------- (int, int) (amount of coin `j` received, trading fee) Note ----- In the vyper contract, there is an option to exchange using WETH or ETH. """ return self._exchange(i, j, dx, min_dy)
[docs] def exchange_underlying( self, i: int, j: int, dx: int, min_dy: int = 0, ) -> Tuple[int, int]: """ In the vyper contract, this exchanges using ETH instead of WETH. In Curvesim, this is the same as `exchange`. """ return self.exchange(i, j, dx, min_dy)
# pylint: disable-next=too-many-locals, too-many-statements
[docs] def add_liquidity( self, amounts: List[int], min_mint_amount: int = 0, ) -> int: """ Add liquidity into the pool by depositing coins for LP tokens. Parameters ---------- amounts: List[int] Deposit amounts. At least one coin amount must be nonzero. min_mint_amount: int Minimum amount of LP tokens required (default = 0) Returns ------- int Amount of LP tokens minted. """ assert all(x >= 0 for x in amounts) assert sum(amounts) > 0 A = self.A gamma = self.gamma n_coins: int = self.n xp_old: List[int] = self._xp_mem(self.balances) for i in range(n_coins): self.balances[i] += amounts[i] xp: List[int] = self._xp_mem(self.balances) amountsp: List[int] = [xp[i] - xp_old[i] for i in range(n_coins)] old_D: int = self.D D: int = newton_D(A, gamma, xp) d_token: int = 0 token_supply: int = self.tokens if old_D > 0: d_token = token_supply * D // old_D - token_supply else: # sets initial virtual price to 1 d_token = self._get_xcp(D) assert d_token > 0 # dev: nothing minted d_token_fee: int = 0 if old_D > 0: d_token_fee = self._calc_token_fee(amountsp, xp) * d_token // 10**10 + 1 d_token -= d_token_fee token_supply += d_token self.tokens += d_token p: Optional[int] = None ix: int = -1 if n_coins == 2 and d_token > 10**5: nonzero_indices = [i for i, a in enumerate(amounts) if a != 0] if len(nonzero_indices) == 1: # Calculate price for 2 coins: # p_i * (dx_i - dtoken / token_supply * xx_i) # = sum{k!=i}(p_k * (dtoken / token_supply * xx_k - dx_k)) # only ix is nonzero prec: List[int] = self.precisions last_prices: List[int] = self.last_prices balances: List[int] = self.balances ix = amounts.index(0) S: int = 0 for i in range(n_coins): if i == ix: continue if i == 0: S += balances[i] * prec[i] else: S += balances[i] * prec[i] * last_prices[i - 1] // PRECISION S = S * d_token // token_supply p = ( S * PRECISION // ( amounts[ix] * prec[ix] - d_token * balances[ix] * prec[ix] // token_supply ) ) self._tweak_price(A, gamma, xp, ix, p, D) else: self.D = D self.virtual_price = 10**18 self.xcp_profit = 10**18 self.xcp_profit_a = 10**18 self.tokens += d_token assert d_token >= min_mint_amount, "Slippage" self._claim_admin_fees() return d_token
def _calc_token_fee(self, amounts: List[int], xp: List[int]) -> int: n_coins: int = self.n # fee = sum(amounts_i - avg(amounts)) * fee' / sum(amounts) fee: int = self._fee(xp) * n_coins // (4 * (n_coins - 1)) S: int = sum(amounts) avg: int = S // n_coins Sdiff: int = sum(abs(_x - avg) for _x in amounts) return fee * Sdiff // S + NOISE_FEE
[docs] def remove_liquidity( self, _amount: int, min_amounts=None, ) -> List[int]: """ Remove liquidity (burn LP tokens) to receive back part (or all) of the deposited funds. Parameters ---------- _amount: int Amount LP tokens to burn. min_amounts: List[int], optional Minimum required amounts for each coin. Default is 0 each. Returns ------- List[int] The amounts of each coin received. Note ---- "This withdrawal method is very safe, does no complex math" """ n_coins: int = self.n min_amounts = min_amounts or [0] * n_coins amount: int = _amount balances: List[int] = self.balances withdraw_amounts: List[int] = [0] * n_coins self._claim_admin_fees() total_supply: int = self.tokens assert amount <= total_supply self.tokens -= amount if amount != total_supply: amount -= 1 # Make rounding errors favor other LPs a tiny bit for i in range(n_coins): withdraw_amounts[i] = balances[i] * amount // total_supply assert withdraw_amounts[i] >= min_amounts[i] self.balances[i] = balances[i] - withdraw_amounts[i] D: int = self.D self.D = D - D * amount // total_supply return withdraw_amounts
[docs] def remove_liquidity_one_coin( self, token_amount: int, i: int, min_amount: int ) -> int: """ Remove liquidity entirely in one type of coin. Fees will be extracted and there may be significant price impact incurred. Parameters ---------- token_amount: int Amount of LP tokens to burn. i: int Index of the `out` coin. min_amount: int Minimum amount of the 'out' coin required (default = 0) Returns ------- int Amount of the `i`-th coin received. """ A: int = self.A gamma: int = self.gamma dy: int = 0 D: int = 0 p: Optional[int] = None xp: List[int] = [0] * self.n self._claim_admin_fees() calc_price: bool = self.n == 2 dy, p, D, xp = self._calc_withdraw_one_coin( A, gamma, token_amount, i, False, calc_price ) assert dy >= min_amount, "Slippage" self.balances[i] -= dy self.tokens -= token_amount self._tweak_price(A, gamma, xp, i, p, D) return dy
# pylint: disable-next=too-many-locals,too-many-arguments, too-many-branches def _calc_withdraw_one_coin( self, A: int, gamma: int, token_amount: int, i: int, update_D: bool, calc_price: bool, ) -> Tuple[int, Optional[int], int, List[int]]: """ Calculate the output amount from burning `token amount` of LP tokens and receiving entirely in the `i`-th coin. Parameters ---------- A: int Amplification coefficient equal to `A * n**n` from the whitepaper. gamma: int Gamma coefficient from the whitepaper. token_amount: int Amount of LP tokens to burn. i: int Index of the `out` coin. update_D: bool Switch for recomputing `D` during calculation. calc_price: bool Switch for recomputing the price of token `i` after simulating a withdrawal. Only does something if `n` = 2. Returns ------- int Output amount of the `i`-th coin. Optional[int] Price of the `i`-th coin after the withdrawal. int `D` after the withdrawal, accounting for fee. List[int] `xp` after the withdrawal, accounting for fee. Note ---- The 3-coin pool contract doesn't have calc_price even though the 2-coin one does, so the price value returned is `None` when the price calculation doesn't occur. This is a "view" function; it doesn't change the state of the pool. """ token_supply: int = self.tokens assert token_amount <= token_supply assert i < self.n, "Index out of bounds" xx: List[int] = self.balances.copy() precisions: List[int] = self.precisions xp: List[int] = self._xp_mem(xx) if update_D: D0: int = newton_D(A, gamma, xp) else: D0 = self.D D: int = D0 if self.n == 2: fee: int = self._fee(xp) elif self.n == 3: # For n = 3, charge max fee if xp_correction > xp_imprecise[i] # Specifically, if % of xp[i] withdrawn > 1/n = 1/3 # Otherwise, _fee() underflows n_coins: int = self.n xp_imprecise: List[int] = xp.copy() xp_correction: int = xp[i] * n_coins * token_amount // token_supply if xp_correction < xp_imprecise[i]: xp_imprecise[i] -= xp_correction fee = self._fee(xp_imprecise) else: fee = self.out_fee else: raise CalculationError( "_calc_withdraw_one_coin doesn't support more than 3 coins" ) # Charge fee on D, not on y, e.g. reducing invariant LESS than charging user dD: int = token_amount * D // token_supply D_fee: int = fee * dD // (2 * 10**10) + 1 D -= dD - D_fee y: int = get_y(A, gamma, xp, D, i)[0] if i == 0: dy: int = (xp[i] - y) // precisions[i] else: dy = (xp[i] - y) * PRECISION // (precisions[i] * self.price_scale[i - 1]) xp[i] = y # Price calc p: Optional[int] = None if self.n == 2 and calc_price: if dy > 10**5 and token_amount > 10**5: # p_i = dD / D0 * sum'(p_k * x_k) / (dy - dD / D0 * y0) j = (i + 1) % 2 precision: int = precisions[i] S: int = xx[j] * precisions[j] S = S * dD // D0 p = S * PRECISION // (dy * precision - dD * xx[i] * precision // D0) if i == 0: p = (10**18) ** 2 // p return dy, p, D, xp
[docs] def calc_withdraw_one_coin(self, token_amount: int, i: int) -> int: """ Calculate the output amount from burning `token amount` of LP tokens and receiving entirely in the `i`-th coin. Parameters ---------- token_amount: int Amount of LP tokens to burn. i: int Index of the `out` coin. Returns ------- int Output amount of the `i`-th coin. Note ---- This is a "view" function; it doesn't change the state of the pool. """ return self._calc_withdraw_one_coin( self.A, self.gamma, token_amount, i, True, False )[0]
[docs] def lp_price(self) -> int: """ Returns the price of an LP token in units of token 0. Derived from the equilibrium point of a constant-product AMM that approximates Cryptoswap's behavior. Returns ------- int Liquidity redeemable per LP token in units of token 0. Note ---- This is a "view" function; it doesn't change the state of the pool. """ if self.n == 2: virtual_price: int = self.virtual_price price_oracle: List[int] = self.internal_price_oracle() price: int = factory_2_coin.lp_price(virtual_price, price_oracle) elif self.n == 3: # 3-coin vyper contract uses cached packed oracle prices # instead of internal_price_oracle() virtual_price = self.virtual_price price_oracle = self._price_oracle price = tricrypto_ng.lp_price(virtual_price, price_oracle) else: raise CalculationError("LP price calc doesn't support more than 3 coins") return price
[docs] def internal_price_oracle(self) -> List[int]: """ Return the value of the EMA price oracle. """ price_oracle: List[int] = self._price_oracle last_prices_timestamp: int = self.last_prices_timestamp block_timestamp: int = self._block_timestamp if last_prices_timestamp < block_timestamp: if self.n == 2: last_prices: List[int] = self.last_prices elif self.n == 3: # 3-coin vyper contract caps every "last price" that gets fed to the EMA last_prices = self.last_prices.copy() price_scale: List[int] = self.price_scale last_prices = [ min(last_p, 2 * storage_p) for last_p, storage_p in zip(last_prices, price_scale) ] else: raise CalculationError( "Price oracle calc doesn't support more than 3 coins" ) ma_half_time: int = self.ma_half_time n_coins: int = self.n alpha: int = get_alpha( ma_half_time, block_timestamp, last_prices_timestamp, n_coins ) return [ (last_p * (10**18 - alpha) + oracle_p * alpha) // 10**18 for last_p, oracle_p in zip(last_prices, price_oracle) ] return price_oracle
[docs] def price_oracle(self) -> List[int]: """ Return the value of the EMA price oracle. Same as `internal_price_oracle`. Kept for compatability with the vyper interface. """ return self.internal_price_oracle()
[docs] def get_virtual_price(self) -> int: """ Return the virtual price of an LP token. """ return 10**18 * self._get_xcp(self.D) // self.tokens
[docs] def calc_token_amount(self, amounts: List[int]) -> int: """ Calculate the amount of LP tokens minted by depositing given amounts. Parameters ---------- amounts: List[int] Deposit amounts. At least one coin amount must be nonzero. Returns ------- int Amount of LP tokens minted. Note ---- This is a "view" function; it doesn't change the state of the pool. """ token_supply: int = self.tokens A: int = self.A gamma: int = self.gamma xp: List[int] = self._xp() amountsp: List[int] = self._xp_mem(amounts) D0: int = self.D for i, a in enumerate(amountsp): xp[i] += a D: int = newton_D(A, gamma, xp) d_token: int = token_supply * D // D0 - token_supply d_token -= self._calc_token_fee(amountsp, xp) * d_token // 10**10 + 1 return d_token
[docs] def dydxfee(self, i, j): """ Returns the spot price of i-th coin quoted in terms of j-th coin, i.e. the ratio of output coin amount to input coin amount for an "infinitesimally" small trade. Trading fees are deducted. Parameters ---------- i: int Index of coin to be priced; in a swapping context, this is the "in"-token. j: int Index of quote currency; in a swapping context, this is the "out"-token. Returns ------- float Price of i-th coin quoted in j-th coin with fees deducted. Note ---- This is a "view" function; it doesn't change the state of the pool. """ return self.dydx(i, j, use_fee=True)
[docs] def dydx(self, i, j, use_fee=False): # pylint: disable=too-many-locals """ Returns the spot price of i-th coin quoted in terms of j-th coin, i.e. the ratio of output coin amount to input coin amount for an "infinitesimally" small trade. Defaults to no fees deducted. Parameters ---------- i: int Index of coin to be priced; in a swapping context, this is the "in"-token. j: int Index of quote currency; in a swapping context, this is the "out"-token. Returns ------- float Price of i-th coin quoted in j-th coin Note ---- This is a "view" function; it doesn't change the state of the pool. """ xp = self._xp() x_i = xp[i] x_j = xp[j] n = len(xp) D = self.D A = self.A A_multiplier = 10**4 gamma = self.gamma K0 = 10**18 * n**n * prod(xp) / D**n coeff = A * gamma**2 / (10**18 + gamma - K0) ** 2 frac = (10**18 + gamma + K0) * (sum(xp) - D) / (10**18 + gamma - K0) dydx_top = x_j * (A_multiplier * D + coeff * (x_i + frac)) dydx_bottom = x_i * (A_multiplier * D + coeff * (x_j + frac)) dydx = dydx_top / dydx_bottom if j > 0: price_scale = self.price_scale[j - 1] dydx = dydx * 10**18 / price_scale if i > 0: price_scale = self.price_scale[i - 1] dydx = dydx * price_scale / 10**18 if use_fee: fee = self._fee(xp) dydx = dydx - dydx * fee / 10**10 return dydx
def _get_unix_timestamp(): """Get the timestamp in Unix time.""" return int(time.time())