Source code for curvesim.pool.stableswap.pool

"""
Mainly a module to house the `Pool`, a basic stableswap implementation in Python.
"""
from math import prod
from typing import Type

from gmpy2 import mpz

from curvesim.exceptions import CurvesimValueError
from curvesim.pool.snapshot import CurvePoolBalanceSnapshot, Snapshot

from ..base import Pool


[docs]class CurvePool(Pool): # pylint: disable=too-many-instance-attributes """ Basic stableswap implementation in Python. """ snapshot_class: Type[Snapshot] = CurvePoolBalanceSnapshot __slots__ = ( "A", "n", "fee", "rates", "balances", "tokens", "fee_mul", "admin_fee", "r", "n_total", "admin_balances", )
[docs] def __init__( # pylint: disable=too-many-arguments self, A, D, n, rates=None, tokens=None, fee=4 * 10**6, fee_mul=None, admin_fee=5 * 10**9, virtual_price=None, ): """ Parameters ---------- A : int Amplification coefficient; this is :math:`A n^{n-1}` in the whitepaper. D : int or list of int virtual total balance or pool coin balances in native token units n: int number of coins rates: list of int precision and rate adjustments tokens: int LP token supply fee: int, optional fee with 10**10 precision (default = .004%) fee_mul: optional fee multiplier for dynamic fee pools admin_fee: int, optional percentage of `fee` with 10**10 precision (default = 50%) virtual_price: int, optional amount of D invariant per LP token; can be used when missing `tokens` value. """ rates = rates or [10**18] * n self.A = A self.n = n self.fee = fee self.rates = rates if isinstance(D, list): self.balances = D.copy() else: self.balances = self._convert_D_to_balances(D) if tokens and virtual_price: raise CurvesimValueError( "Should not set both `tokens` and `virtual_price`." ) # By now, should have set everything needed for D. D = self.D() if tokens: self.tokens = tokens elif virtual_price: self.tokens = D * 10**18 // virtual_price else: self.tokens = D self.fee_mul = fee_mul self.admin_fee = admin_fee self.r = False self.n_total = n self.admin_balances = [0] * n
def _convert_D_to_balances(self, D): n = self.n rates = self.rates return [D // n * 10**18 // _p for _p in rates] def _xp(self): rates = self.rates balances = self.balances return self._xp_mem(rates, balances) def _xp_mem(self, rates, balances): return [x * p // 10**18 for x, p in zip(balances, rates)]
[docs] def D(self, xp=None): """ `D` is the stableswap invariant; this can be thought of as the value of all coin balances if the pool were to become balanced. Convenience wrapper for `get_D` which uses the set `A` and makes `xp` an optional arg. Parameters ---------- xp: list of ints Coin balances in units of D Returns ------- int The stableswap invariant, `D`. Note ---- This is a "view" function; it doesn't change the state of the pool. """ A = self.A xp = xp or self._xp() return self.get_D(xp, A)
[docs] def get_D(self, xp, A): r""" Calculate D invariant iteratively using non-overflowing integer operations. Stableswap equation: .. math:: A n^n \sum{x_i} + D = A n^n D + D^{n+1} / (n^n \prod{x_i}) Converging solution using Newton's method: .. math:: d_{j+1} = (A n^n \sum{x_i} + n d_j^{n+1} / (n^n \prod{x_i})) / (A n^n + (n+1) d_j^n/(n^n \prod{x_i}) - 1) Replace :math:`A n^n` by `An` and :math:`d_j^{n+1}/(n^n \prod{x_i})` by :math:`D_p` to arrive at the iterative formula in the code. Parameters ---------- xp: list of ints Coin balances in units of D A: int Amplification coefficient Returns ------- int The stableswap invariant, `D`. Note ---- This is a "view" function; it doesn't change the state of the pool. """ # noqa Dprev = 0 S = sum(xp) D = S n = self.n Ann = A * n D = mpz(D) Ann = mpz(Ann) while abs(D - Dprev) > 1: D_P = D for x in xp: D_P = D_P * D // (n * x) Dprev = D D = (Ann * S + D_P * n) * D // ((Ann - 1) * D + (n + 1) * D_P) D = int(D) return D
[docs] def get_D_mem(self, balances, A): """ Convenience wrapper for `get_D` which takes in balances in token units. Naming is based on the vyper equivalent. Parameters ---------- balances: list of ints Coin balances in native token units A: int Amplification coefficient Returns ------- int The stableswap invariant, `D`. Note ---- This is a "view" function; it doesn't change the state of the pool. """ xp = [x * p // 10**18 for x, p in zip(balances, self.rates)] return self.get_D(xp, A)
[docs] def get_y(self, i, j, x, xp): r""" Calculate x[j] if one makes x[i] = x. The stableswap equation gives the following: .. math:: x_1^2 + x_1 (\operatorname{sum'} - (A n^n - 1) D / (A n^n)) = D^{n+1}/(n^{2 n} \operatorname{prod'} A) where :math:`\operatorname{sum'}` is the sum of all :math:`x_i` for :math:`i \\neq j` and :math:`\operatorname{prod'}` is the product of all :math:`x_i` for :math:`i \\neq j`. This is a quadratic equation in :math:`x_j`. .. math:: x_1^2 + b x_1 = c which can then be solved iteratively by Newton's method: .. math:: x_1 := (x_1^2 + c) / (2 x_1 + b) Parameters ---------- i: int index of coin; usually the "in"-token j: int index of coin; usually the "out"-token x: int balance of i-th coin in units of D xp: list of int coin balances in units of D Returns ------- int The balance of the j-th coin, in units of D, for the other coin balances given. Note ---- This is a "view" function; it doesn't change the state of the pool. """ # noqa xx = xp[:] D = self.D(xx) D = mpz(D) xx[i] = x # x is quantity of underlying asset brought to 1e18 precision n = self.n xx = [xx[k] for k in range(n) if k != j] Ann = self.A * n c = D for y in xx: c = c * D // (y * n) c = c * D // (n * Ann) b = sum(xx) + D // Ann - D y_prev = 0 y = D while abs(y - y_prev) > 1: y_prev = y y = (y**2 + c) // (2 * y + b) y = int(y) return y # result is in units for D
[docs] def get_y_D(self, A, i, xp, D): """ Calculate x[i] if one uses a reduced `D` than one calculated for given `xp`. See docstring for `get_y`. Parameters ---------- A: int Amplification coefficient for given xp and D i: int index of coin to calculate balance for xp: list of int coin balances in units of D D: int new invariant value Returns ------- int The balance of the i-th coin, in units of D Note ---- This is a "view" function; it doesn't change the state of the pool. """ D = mpz(D) n = self.n xx = [xp[k] for k in range(n) if k != i] S = sum(xx) Ann = A * n c = D for y in xx: c = c * D // (y * n) c = c * D // (n * Ann) b = S + D // Ann y_prev = 0 y = D while abs(y - y_prev) > 1: y_prev = y y = (y**2 + c) // (2 * y + b - D) y = int(y) return y # result is in units for D
[docs] def exchange(self, i, j, dx): """ Perform an exchange between two coins. Index values can be found via the `coins` public getter method. Parameters ---------- i : int Index of "in" coin. j : int Index of "out" coin. dx : int Amount of coin `i` being exchanged. Returns ------- (int, int) (amount of coin `j` received, trading fee) Examples -------- >>> pool = Pool(A=250, D=1000000 * 10**18, n=2, p=[10**30, 10**30]) >>> pool.exchange(0, 1, 150 * 10**6) (149939820, 59999) """ xp = self._xp() x = xp[i] + dx * self.rates[i] // 10**18 y = self.get_y(i, j, x, xp) dy = xp[j] - y - 1 if self.fee_mul is None: fee = dy * self.fee // 10**10 else: fee = dy * self.dynamic_fee((xp[i] + x) // 2, (xp[j] + y) // 2) // 10**10 admin_fee = fee * self.admin_fee // 10**10 # Convert all to real units rate = self.rates[j] dy = (dy - fee) * 10**18 // rate fee = fee * 10**18 // rate admin_fee = admin_fee * 10**18 // rate assert dy >= 0 self.balances[i] += dx self.balances[j] -= dy + admin_fee self.admin_balances[j] += admin_fee return dy, fee
# pylint: disable-next=too-many-locals
[docs] def calc_withdraw_one_coin(self, token_amount, i, use_fee=True): """ Calculate the amount in the i-th coin received from redeeming the given amount of LP tokens. By default, fees are deducted. Parameters ---------- token_amount: int Amount of LP tokens to redeem i: int Index of coin to withdraw in. use_fee: bool, default=True Deduct fees. Returns ------- int Redemption amount in i-th coin Note ---- This is a "view" function; it doesn't change the state of the pool. """ A = self.A xp = self._xp() D0 = self.D() D1 = D0 - token_amount * D0 // self.tokens new_y = self.get_y_D(A, i, xp, D1) dy_before_fee = (xp[i] - new_y) * 10**18 // self.rates[i] xp_reduced = xp if self.fee and use_fee: n_coins = self.n _fee = self.fee * n_coins // (4 * (n_coins - 1)) for j in range(n_coins): dx_expected = 0 if j == i: dx_expected = xp[j] * D1 // D0 - new_y else: dx_expected = xp[j] - xp[j] * D1 // D0 xp_reduced[j] -= _fee * dx_expected // 10**10 dy = xp[i] - self.get_y_D(A, i, xp_reduced, D1) dy = (dy - 1) * 10**18 // self.rates[i] if use_fee: dy_fee = dy_before_fee - dy return dy, dy_fee return dy
[docs] def add_liquidity(self, amounts): """ Deposit coin amounts for LP token. Parameters ---------- amounts: list of int Coin amounts to deposit Returns ------- int LP token amount received for the deposit amounts. """ mint_amount, fees = self.calc_token_amount(amounts, use_fee=True) self.tokens += mint_amount balances = self.balances afee = self.admin_fee admin_fees = [f * afee // 10**10 for f in fees] new_balances = [ bal + amt - fee for bal, amt, fee in zip(balances, amounts, admin_fees) ] self.balances = new_balances self.admin_balances = [t + a for t, a in zip(self.admin_balances, admin_fees)] return mint_amount
[docs] def remove_liquidity_one_coin(self, token_amount, i): """ Redeem given LP token amount for the i-th coin. Parameters ---------- token_amount: int Amount of LP tokens to redeem i: int Index of coin to withdraw in Returns ------- int Redemption amount in i-th coin """ dy, dy_fee = self.calc_withdraw_one_coin(token_amount, i, use_fee=True) admin_fee = dy_fee * self.admin_fee // 10**10 self.balances[i] -= dy + admin_fee self.admin_balances[i] += admin_fee self.tokens -= token_amount return dy, dy_fee
[docs] def remove_liquidity(self, _amount, min_amounts=None): """ Remove liquidity (burn LP tokens) to receive back part (or all) of the deposited funds. Parameters ---------- _amount: int Amount LP tokens to burn. min_amounts: List[int], optional Minimum required amounts for each coin. Default is 0 each. Note ---- "This withdrawal method is very safe, does no complex math" """ min_amounts = min_amounts or [0] * self.n total_supply = self.tokens self.tokens -= _amount balances = self.balances for i in range(self.n): d_balance = balances[i] * _amount // total_supply assert d_balance >= min_amounts[i] self.balances[i] = balances[i] - d_balance
# pylint: disable-next=too-many-locals
[docs] def remove_liquidity_imbalance(self, amounts, max_burn_amount=None): """ Withdraw an imbalanced amount of tokens from the pool. Accounts for fees. Parameters ---------- amounts: List[int] Amounts of tokens to withdraw (positive ints) max_burn_amount: int, optional Maximum amount of LP tokens to burn Returns ------- burn_amount: int Amount of LP token burned in the withdrawal fees: List[int] Amount of fees paid """ A = self.A old_balances = self.balances D0 = self.get_D_mem(old_balances, A) new_balances = self.balances[:] for i in range(self.n): amount = amounts[i] assert amount >= 0 # must input positive ints new_balances[i] -= amount D1 = self.get_D_mem(new_balances, A) fees = [0] * self.n _fee = self.fee * self.n // (4 * (self.n - 1)) for i in range(self.n): ideal_balance = D1 * old_balances[i] // D0 difference = abs(ideal_balance - new_balances[i]) fees[i] = _fee * difference // 10**10 admin_fee = fees[i] * self.admin_fee // 10**10 self.admin_balances[i] += admin_fee self.balances[i] = new_balances[i] - admin_fee # sans admin fees new_balances[i] -= fees[i] D2 = self.get_D_mem(new_balances, A) burn_amount = self.tokens * (D0 - D2) // D0 + 1 # should be positive if max_burn_amount: assert burn_amount <= max_burn_amount assert burn_amount >= 0 self.tokens -= burn_amount return burn_amount, fees
# pylint: disable-next=too-many-locals
[docs] def calc_token_amount(self, amounts, use_fee=False): """ Calculate the amount of LP tokens received for the given coin deposit amounts. Fee logic is based on add_liquidity, which makes this more accurate than the `calc_token_amount` in the actual contract, which neglects fees. By default, it's assumed you the same behavior as the vyper contract, which is to NOT deduct fees. Parameters ---------- amounts: list of int Coin amounts to be deposited. use_fee: bool, default=False Deduct fees. Returns ------- int LP token amount received for the deposit amounts. Note ---- This is a "view" function; it doesn't change the state of the pool. """ A = self.A old_balances = self.balances D0 = self.get_D_mem(old_balances, A) new_balances = self.balances[:] for i in range(self.n): new_balances[i] += amounts[i] D1 = self.get_D_mem(new_balances, A) mint_balances = new_balances[:] if use_fee: _fee = self.fee * self.n // (4 * (self.n - 1)) fees = [0] * self.n for i in range(self.n): ideal_balance = D1 * old_balances[i] // D0 difference = abs(ideal_balance - new_balances[i]) fees[i] = _fee * difference // 10**10 mint_balances[i] -= fees[i] D2 = self.get_D_mem(mint_balances, A) mint_amount = self.tokens * (D2 - D0) // D0 if use_fee: return mint_amount, fees return mint_amount
[docs] def get_virtual_price(self): """ Returns the expected value of one LP token if the pool were to become perfectly balanced (all coins revert to peg). Returns ------- int Amount of the stableswap invariant, `D`, corresponding to one LP token, in units of `D`. Note ---- This is a "view" function; it doesn't change the state of the pool. """ return self.D() * 10**18 // self.tokens
def dynamic_fee(self, xpi, xpj): xps2 = xpi + xpj xps2 *= xps2 # Doing just ** 2 can overflow apparently return (self.fee_mul * self.fee) // ( (self.fee_mul - 10**10) * 4 * xpi * xpj // xps2 + 10**10 )
[docs] def dydxfee(self, i, j): """ Returns the spot price of i-th coin quoted in terms of j-th coin, i.e. the ratio of output coin amount to input coin amount for an "infinitesimally" small trade. Trading fees are deducted. Parameters ---------- i: int Index of coin to be priced; in a swapping context, this is the "in"-token. j: int Index of quote currency; in a swapping context, this is the "out"-token. Returns ------- float Price of i-th coin quoted in j-th coin with fees deducted. Note ---- This is a "view" function; it doesn't change the state of the pool. """ return self.dydx(i, j, use_fee=True)
[docs] def dydx(self, i, j, use_fee=False): """ Returns the spot price of i-th coin quoted in terms of j-th coin, i.e. the ratio of output coin amount to input coin amount for an "infinitesimally" small trade. Defaults to no fees deducted. Parameters ---------- i: int Index of coin to be priced; in a swapping context, this is the "in"-token. j: int Index of quote currency; in a swapping context, this is the "out"-token. Returns ------- float Price of i-th coin quoted in j-th coin Note ---- This is a "view" function; it doesn't change the state of the pool. """ xp = self._xp() return self._dydx(i, j, xp, use_fee)
def _dydx(self, i, j, xp, use_fee): xi = xp[i] xj = xp[j] n = self.n A = self.A D = self.D(xp) D_pow = mpz(D) ** (n + 1) x_prod = prod(xp) A_pow = A * n ** (n + 1) dydx = (xj * (xi * A_pow * x_prod + D_pow)) / ( xi * (xj * A_pow * x_prod + D_pow) ) if use_fee: if self.fee_mul is None: fee_factor = self.fee / 10**10 else: fee_factor = self.dynamic_fee(xi, xj) / 10**10 else: fee_factor = 0 dydx *= 1 - fee_factor return float(dydx)