Source code for curvesim.pool.stableswap.metapool
"""
Mainly a module to house the `CurveMetaPool`, a metapool stableswap implementation in Python.
"""
from math import prod
from typing import Type
from gmpy2 import mpz
from curvesim.exceptions import CurvesimValueError
from curvesim.pool.snapshot import CurveMetaPoolBalanceSnapshot, Snapshot
from ..base import Pool
class CurveMetaPool(Pool):  # pylint: disable=too-many-instance-attributes
    """
    Basic stableswap metapool implementation in Python.
    """

    # Snapshot class associated with this pool type.
    snapshot_class: Type[Snapshot] = CurveMetaPoolBalanceSnapshot

    # Attribute names that instances of this class are declared to carry.
    __slots__ = (
        "A",
        "n",
        "max_coin",
        "fee",
        "admin_fee",
        "basepool",
        "rate_multiplier",
        "balances",
        "n_total",
        "tokens",
        "fee_mul",
        "admin_balances",
    )
# pylint: disable-next=too-many-arguments,duplicate-code
def __init__(
    self,
    A,
    D,
    n,
    basepool,
    rate_multiplier=10**18,
    tokens=None,
    fee=4 * 10**6,
    fee_mul=None,
    admin_fee=5 * 10**9,
    virtual_price=None,
):
    """
    Parameters
    ----------
    A : int
        Amplification coefficient; this is :math:`A n^{n-1}` in the whitepaper.
    D : int or list of int
        virtual total balance or coin balances in native token units
    n: int
        number of coins
    basepool: :class:`curvesim.pool.Pool`
        basepool for the metapool
    rate_multiplier: int, optional
        precision and rate adjustment, defaults to 10**18
    tokens: int, optional
        LP token supply; when omitted it is derived from `virtual_price`
        if given, otherwise set equal to the invariant `D`
    fee: int, optional
        fee with 10**10 precision (default = .04%)
    fee_mul: optional
        fee multiplier for dynamic fee pools
    admin_fee: int, optional
        percentage of `fee` with 10**10 precision (default = 50%)
    virtual_price: int, optional
        amount of D invariant per LP token; can be used when
        missing `tokens` value.

    Raises
    ------
    ValueError
        If any basepool rate exceeds 10**30.
    CurvesimValueError
        If both `tokens` and `virtual_price` are given.
    """
    self.A = A
    self.n = n
    self.max_coin = n - 1
    self.fee = fee
    self.admin_fee = admin_fee

    # Basepool coins with too few decimals (i.e. very large rates) break
    # the difference-quotient codepath of `dydx`, so reject them up front.
    for _p in basepool.rates:
        if _p > 10**30:
            raise ValueError(f"{_p} too high: decimals must be >= 6.")

    self.basepool = basepool
    self.rate_multiplier = rate_multiplier

    # A list is taken as explicit balances; a scalar is split evenly.
    self.balances = D if isinstance(D, list) else self._convert_D_to_balances(D)

    if tokens and virtual_price:
        raise CurvesimValueError(
            "Should not set both `tokens` and `virtual_price`."
        )

    # Everything the invariant calculation needs is set by this point.
    invariant = self.D()
    if tokens:
        lp_supply = tokens
    elif virtual_price:
        lp_supply = invariant * 10**18 // virtual_price
    else:
        lp_supply = invariant
    self.tokens = lp_supply

    self.n_total = n + basepool.n - 1
    self.fee_mul = fee_mul
    self.admin_balances = [0] * n
def _convert_D_to_balances(self, D):
    """
    Split a total virtual balance `D` evenly across the coins and convert
    each share to native token units using the current `rates`.
    """
    per_coin = D // self.n
    balances = []
    for rate in self.rates:
        balances.append(per_coin * 10**18 // rate)
    return balances
def D(self, xp=None):
    """
    `D` is the stableswap invariant; this can be thought of as the value of
    all coin balances if the pool were to become balanced.

    Convenience wrapper for `get_D` which uses the pool's own `A` and makes
    `xp` optional: when `xp` is missing or empty, the pool's current
    balances scaled by `rates` are used.

    Parameters
    ----------
    xp: list of ints, optional
        Coin balances in units of D

    Returns
    -------
    int
        The stableswap invariant, `D`.

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    amp = self.A
    if xp:
        return self.get_D(xp, amp)

    rates = self.rates
    virtual_balances = [
        balance * rate // 10**18 for balance, rate in zip(self.balances, rates)
    ]
    return self.get_D(virtual_balances, amp)
def get_D(self, xp, A):
    r"""
    Calculate D invariant iteratively using non-overflowing integer operations.

    Stableswap equation:

    .. math::
         A n^n \sum{x_i} + D = A n^n D + D^{n+1} / (n^n \prod{x_i})

    Converging solution using Newton's method:

    .. math::
         d_{j+1} = (A n^n \sum{x_i} + n d_j^{n+1} / (n^n \prod{x_i}))
                 / (A n^n + (n+1) d_j^n/(n^n \prod{x_i}) - 1)

    Replace :math:`A n^n` by `An` and :math:`d_j^{n+1}/(n^n \prod{x_i})` by
    :math:`D_p` to arrive at the iterative formula in the code.

    Parameters
    ----------
    xp: list of ints
        Coin balances in units of D
    A: int
        Amplification coefficient

    Returns
    -------
    int
        The stableswap invariant, `D`.

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """  # noqa
    n_coins = self.n
    total = sum(xp)
    amp_n = mpz(A * n_coins)

    # Start from the sum of balances and iterate until successive
    # estimates differ by at most 1.
    d_cur = mpz(total)
    d_prev = 0
    while abs(d_cur - d_prev) > 1:
        d_p = d_cur
        for balance in xp:
            d_p = d_p * d_cur // (n_coins * balance)
        d_prev = d_cur
        numerator = (amp_n * total + d_p * n_coins) * d_cur
        denominator = (amp_n - 1) * d_cur + (n_coins + 1) * d_p
        d_cur = numerator // denominator

    return int(d_cur)
def _xp(self):
    """Return the pool's current balances scaled by `rates` (units of D)."""
    return self._xp_mem(self.rates, self.balances)
def _xp_mem(self, rates, balances):
    """
    Scale each balance by its rate (10**18 precision), pairing entries
    positionally; the result has the length of the shorter input.
    """
    scaled = []
    for balance, rate in zip(balances, rates):
        scaled.append(balance * rate // 10**18)
    return scaled
def get_D_mem(self, rates, balances, A):
    """
    Convenience wrapper for `get_D` which takes in balances in token units.
    Naming is based on the vyper equivalent.

    Parameters
    ----------
    rates: list of ints
        Per-coin rate multipliers used to scale `balances`
    balances: list of ints
        Coin balances in native token units
    A: int
        Amplification coefficient

    Returns
    -------
    int
        The stableswap invariant, `D`.

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    return self.get_D(self._xp_mem(rates, balances), A)
def get_y(self, i, j, x, xp):
    r"""
    Calculate x[j] if one makes x[i] = x.

    The stableswap equation gives the following:

    .. math::
        x_j^2 + x_j (\operatorname{sum'} - (A n^n - 1) D / (A n^n))
           = D^{n+1}/(n^{2 n} \operatorname{prod'} A)

    where :math:`\operatorname{sum'}` is the sum of all :math:`x_i` for
    :math:`i \neq j` and :math:`\operatorname{prod'}` is the product
    of all :math:`x_i` for :math:`i \neq j`.

    This is a quadratic equation in :math:`x_j`.

    .. math:: x_j^2 + b x_j = c

    which can then be solved iteratively by Newton's method:

    .. math:: x_j := (x_j^2 + c) / (2 x_j + b)

    Parameters
    ----------
    i: int
        index of coin; usually the "in"-token
    j: int
        index of coin; usually the "out"-token
    x: int
        balance of i-th coin in units of D
    xp: list of int
        coin balances in units of D

    Returns
    -------
    int
        The balance of the j-th coin, in units of D, for the other
        coin balances given.

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """  # noqa
    n_coins = self.n

    # The invariant is taken from the balances *before* the change to x[i].
    balances = xp[:]
    invariant = mpz(self.D(balances))

    balances[i] = x  # x is quantity of underlying asset brought to 1e18 precision
    others = [balances[k] for k in range(n_coins) if k != j]

    amp_n = self.A * n_coins
    c = invariant
    for balance in others:
        c = c * invariant // (balance * n_coins)
    c = c * invariant // (n_coins * amp_n)
    b = sum(others) + invariant // amp_n - invariant

    # Newton iteration until successive estimates differ by at most 1.
    y_prev = 0
    y = invariant
    while abs(y - y_prev) > 1:
        y_prev = y
        y = (y * y + c) // (2 * y + b)

    return int(y)  # result is in units for D
def get_y_D(self, A, i, xp, D):
    """
    Calculate x[i] if one uses a reduced `D` than one calculated for given `xp`.

    See docstring for `get_y`.

    Parameters
    ----------
    A: int
        Amplification coefficient for given xp and D
    i: int
        index of coin to calculate balance for
    xp: list of int
        coin balances in units of D
    D: int
        new invariant value

    Returns
    -------
    int
        The balance of the i-th coin, in units of D

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    n_coins = self.n
    invariant = mpz(D)
    others = [xp[k] for k in range(n_coins) if k != i]
    amp_n = A * n_coins

    c = invariant
    for balance in others:
        c = c * invariant // (balance * n_coins)
    c = c * invariant // (n_coins * amp_n)

    # Integer arithmetic, so folding `- D` into `b` is exact.
    b = sum(others) + invariant // amp_n - invariant

    # Newton iteration until successive estimates differ by at most 1.
    y_prev = 0
    y = invariant
    while abs(y - y_prev) > 1:
        y_prev = y
        y = (y * y + c) // (2 * y + b)

    return int(y)  # result is in units for D
def exchange(self, i, j, dx):
    """
    Perform an exchange between two top-level coins of the metapool.

    Index values can be found via the `coins` public getter method.

    The pool's `balances` and `admin_balances` are updated in place.

    Parameters
    ----------
    i : int
        Index of "in" coin.
    j : int
        Index of "out" coin.
    dx : int
        Amount of coin `i` being exchanged.

    Returns
    -------
    (int, int)
        (amount of coin `j` received, trading fee)
    """
    rates = self.rates
    xp = self._xp_mem(rates, self.balances)

    x_new = xp[i] + dx * rates[i] // 10**18
    y_new = self.get_y(i, j, x_new, xp)
    # Subtract one unit so rounding never favours the trader.
    dy_virtual = xp[j] - y_new - 1

    # Static fee unless a dynamic-fee multiplier is configured.
    if self.fee_mul is None:
        fee_rate = self.fee
    else:
        fee_rate = self.dynamic_fee((xp[i] + x_new) // 2, (xp[j] + y_new) // 2)
    fee_virtual = dy_virtual * fee_rate // 10**10
    admin_virtual = fee_virtual * self.admin_fee // 10**10

    # Convert all to real units
    out_rate = rates[j]
    dy = (dy_virtual - fee_virtual) * 10**18 // out_rate
    fee = fee_virtual * 10**18 // out_rate
    admin_fee = admin_virtual * 10**18 // out_rate
    assert dy >= 0

    self.balances[i] += dx
    self.balances[j] -= dy + admin_fee
    self.admin_balances[j] += admin_fee
    return dy, fee
# pylint: disable-next=too-many-locals
def exchange_underlying(self, i, j, dx):
    """
    Perform an exchange between two coins.

    Index values include underlyer indices.  The zero index
    is the "primary" stable for the metapool, while indices
    1, 2, ..., correspond to the basepool indices offset
    by one.

    Three routes are handled: both coins in the basepool (delegated to
    `basepool.exchange`), basepool coin in (deposited into the basepool
    first), and basepool coin out (LP tokens redeemed from the basepool
    afterwards).

    Parameters
    ----------
    i : int
        Index of "in" coin.
    j : int
        Index of "out" coin.
    dx : int
        Amount of coin `i` being exchanged.

    Returns
    -------
    (int, int)
        (amount of coin `j` received, trading fee)
    """
    # Read rates before any basepool deposit changes its state.
    rates = self.rates
    lp_index = self.max_coin

    # A non-negative base index means the coin lives in the basepool.
    base_i = i - lp_index
    base_j = j - lp_index
    i_is_meta = base_i < 0
    j_is_meta = base_j < 0

    if not (i_is_meta or j_is_meta):
        # If both are from the base pool
        dy, dy_fee = self.basepool.exchange(base_i, base_j, dx)
        return dy, dy_fee

    # Basepool coins are represented at the metapool level by the LP token.
    meta_i = i if i_is_meta else lp_index
    meta_j = j if j_is_meta else lp_index

    xp = [balance * rate // 10**18 for balance, rate in zip(self.balances, rates)]

    if i_is_meta:
        x = xp[i] + dx * rates[i] // 10**18
    else:
        # i is from BasePool: deposit it and trade the minted LP tokens.
        deposit = [0] * self.basepool.n
        deposit[base_i] = dx
        dx = self.basepool.add_liquidity(deposit)
        # Convert pool tokens to "virtual" units and add to the LP balance.
        x = xp[lp_index] + dx * rates[lp_index] // 10**18

    y = self.get_y(meta_i, meta_j, x, xp)

    # Either a real coin or token
    out_rate = rates[meta_j]
    dy_virtual = xp[meta_j] - y - 1
    fee_virtual = dy_virtual * self.fee // 10**10

    # Convert all to real units; works for both pool coins and real coins.
    dy = (dy_virtual - fee_virtual) * 10**18 // out_rate
    dy_admin_fee = fee_virtual * self.admin_fee // 10**10 * 10**18 // out_rate
    dy_fee = fee_virtual * 10**18 // out_rate

    # Change balances exactly in same way as we change actual ERC20 coin amounts
    self.balances[meta_i] += dx
    # When rounding errors happen, we undercharge admin fee in favor of LP
    self.balances[meta_j] -= dy + dy_admin_fee
    self.admin_balances[meta_j] += dy_admin_fee

    # Withdraw from the base pool if needed
    if not j_is_meta:
        dy, dy_fee = self.basepool.remove_liquidity_one_coin(dy, base_j)

    return dy, dy_fee
# pylint: disable-next=too-many-locals
def calc_withdraw_one_coin(self, token_amount, i, use_fee=True):
    """
    Calculate the amount in the i-th coin received from
    redeeming the given amount of LP tokens.

    By default, fees are deducted.

    Parameters
    ----------
    token_amount: int
        Amount of LP tokens to redeem
    i: int
        Index of coin to withdraw in.
    use_fee: bool, default=True
        Deduct fees.

    Returns
    -------
    (int, int) or int
        With `use_fee=True`: (redemption amount in i-th coin, fee), where
        the fee is the difference between the no-fee and with-fee amounts.
        With `use_fee=False`: only the redemption amount.

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    A = self.A
    rates = self.rates
    xp = self._xp_mem(rates, self.balances)

    # Invariant before and after burning `token_amount` LP tokens.
    D0 = self.D()
    D1 = D0 - token_amount * D0 // self.tokens
    new_y = self.get_y_D(A, i, xp, D1)
    dy_before_fee = (xp[i] - new_y) * 10**18 // rates[i]

    # Work on an explicit copy so `xp` keeps the pre-fee balances; the
    # original code aliased the two lists and relied on that by accident.
    xp_reduced = list(xp)
    if self.fee and use_fee:
        n_coins = self.n
        _fee = self.fee * n_coins // (4 * (n_coins - 1))
        for j in range(n_coins):
            # Deviation of each coin from a proportional withdrawal.
            if j == i:
                dx_expected = xp[j] * D1 // D0 - new_y
            else:
                dx_expected = xp[j] - xp[j] * D1 // D0
            xp_reduced[j] -= _fee * dx_expected // 10**10

    dy = xp_reduced[i] - self.get_y_D(A, i, xp_reduced, D1)
    # Subtract one unit so rounding never favours the withdrawer.
    dy = (dy - 1) * 10**18 // rates[i]

    if use_fee:
        dy_fee = dy_before_fee - dy
        return dy, dy_fee

    return dy
def add_liquidity(self, amounts):
    """
    Deposit coin amounts for LP token.

    Mints LP tokens per `calc_token_amount` (fees included), credits the
    deposits to `balances`, and moves the admin share of each coin's fee
    from `balances` to `admin_balances`.

    Parameters
    ----------
    amounts: list of int
        Coin amounts to deposit

    Returns
    -------
    int
        LP token amount received for the deposit amounts.
    """
    minted, fees = self.calc_token_amount(amounts, use_fee=True)
    self.tokens += minted

    admin_share = self.admin_fee
    admin_fees = [fee * admin_share // 10**10 for fee in fees]

    updated_balances = []
    for balance, amount, admin_cut in zip(self.balances, amounts, admin_fees):
        updated_balances.append(balance + amount - admin_cut)

    updated_admin_balances = []
    for held, admin_cut in zip(self.admin_balances, admin_fees):
        updated_admin_balances.append(held + admin_cut)

    self.balances = updated_balances
    self.admin_balances = updated_admin_balances
    return minted
def remove_liquidity_one_coin(self, token_amount, i):
    """
    Redeem given LP token amount for the i-th coin.

    Updates `balances`, `admin_balances` and the LP token supply.

    Parameters
    ----------
    token_amount: int
        Amount of LP tokens to redeem
    i: int
        Index of coin to withdraw in

    Returns
    -------
    (int, int)
        (redemption amount in i-th coin, fee), as computed by
        `calc_withdraw_one_coin` with fees enabled.
    """
    amount_out, fee = self.calc_withdraw_one_coin(token_amount, i, use_fee=True)
    admin_cut = fee * self.admin_fee // 10**10

    # The admin share leaves the pool balance along with the payout.
    self.balances[i] = self.balances[i] - (amount_out + admin_cut)
    self.admin_balances[i] = self.admin_balances[i] + admin_cut
    self.tokens = self.tokens - token_amount

    return amount_out, fee
@property
def rates(self):
    """
    Return rates conversion for each top-level token: the configured
    `rate_multiplier` followed by the basepool's current virtual price.
    """
    lp_rate = self.basepool.get_virtual_price()
    primary_rate = self.rate_multiplier
    return [primary_rate, lp_rate]
# pylint: disable-next=too-many-locals
def calc_token_amount(self, amounts, use_fee=False):
    """
    Calculate the amount of LP tokens received for the given coin
    deposit amounts.

    Fee logic is based on add_liquidity, which makes this more accurate than
    the `calc_token_amount` in the actual contract, which neglects fees.

    By default, the behavior matches the vyper contract, which is to NOT
    deduct fees.

    Parameters
    ----------
    amounts: list of int
        Coin amounts to be deposited.
    use_fee: bool, default=False
        Deduct fees.

    Returns
    -------
    int or (int, list of int)
        With `use_fee=False`: the LP token amount received.
        With `use_fee=True`: (LP token amount, per-coin fees).

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    amp = self.A
    n_coins = self.n
    old_balances = self.balances
    rates = self.rates

    D0 = self.get_D_mem(rates, old_balances, amp)

    new_balances = old_balances[:]
    for k in range(n_coins):
        new_balances[k] += amounts[k]
    D1 = self.get_D_mem(rates, new_balances, amp)

    if not use_fee:
        D2 = self.get_D_mem(rates, new_balances[:], amp)
        return self.tokens * (D2 - D0) // D0

    # Charge each coin on its deviation from a perfectly balanced deposit.
    fee_rate = self.fee * n_coins // (4 * (n_coins - 1))
    fees = []
    mint_balances = new_balances[:]
    for k in range(n_coins):
        ideal_balance = D1 * old_balances[k] // D0
        coin_fee = fee_rate * abs(ideal_balance - new_balances[k]) // 10**10
        fees.append(coin_fee)
        mint_balances[k] -= coin_fee

    D2 = self.get_D_mem(rates, mint_balances, amp)
    mint_amount = self.tokens * (D2 - D0) // D0
    return mint_amount, fees
def get_virtual_price(self):
    """
    Returns the expected value of one LP token if the pool were
    to become perfectly balanced (all coins revert to peg).

    Returns
    -------
    int
        Amount of the stableswap invariant, `D`, corresponding to
        one LP token, in units of `D`.

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    invariant = self.D()
    return invariant * 10**18 // self.tokens
def dynamic_fee(self, xpi, xpj):
    """
    Return a fee based on the amount of imbalance.

    When the two balances are equal the result is the base `fee`; as they
    diverge the result rises towards `fee_mul * fee / 10**10`.

    Parameters
    ----------
    xpi: int
        i-th coin balance in units of D
    xpj: int
        j-th coin balance in units of D

    Returns
    -------
    int
        The dynamic fee in 10 decimals.
    """
    fee_mul = self.fee_mul
    balance_sum = xpi + xpj
    sum_squared = balance_sum * balance_sum
    scaled_excess = (fee_mul - 10**10) * 4 * xpi * xpj // sum_squared
    return fee_mul * self.fee // (scaled_excess + 10**10)
def dydxfee(self, i, j):
    """
    Returns the spot price of i-th coin quoted in terms of j-th coin,
    i.e. the ratio of output coin amount to input coin amount for
    an "infinitesimally" small trade.

    The indices are assumed to include base pool underlyer indices.

    Trading fees are deducted; this is `dydx` with `use_fee=True`.

    Parameters
    ----------
    i: int
        Index of coin to be priced; in a swapping context, this is
        the "in"-token.
    j: int
        Index of quote currency; in a swapping context, this is the
        "out"-token.

    Returns
    -------
    float
        Price of i-th coin quoted in j-th coin with fees deducted.

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    price_after_fees = self.dydx(i, j, use_fee=True)
    return price_after_fees
# pylint: disable-next=too-many-locals
def dydx(self, i, j, use_fee=False):
    r"""
    Returns the spot price of i-th coin quoted in terms of j-th coin,
    i.e. the ratio of output coin amount to input coin amount for
    an "infinitesimally" small trade.

    The indices are assumed to include base pool underlyer indices.

    Three cases are handled: both coins in the basepool (delegated to
    `basepool.dydx`), primary coin in (closed-form derivative via the
    chain rule below), and basepool coin in (difference quotient on a
    small fixed-size trade).

    Parameters
    ----------
    i: int
        Index of coin to be priced; in a swapping context, this is
        the "in"-token.
    j: int
        Index of quote currency; in a swapping context, this is the
        "out"-token.
    use_fee: bool, default=False
        Deduct fees.

    Returns
    -------
    float
        Price of i-th coin quoted in j-th coin

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.

    The following formulae are useful when swapping the primary stablecoin
    for one of the basepool underlyers:

    | $z$: primary coin virtual balance
    | $w$: basepool virtual balance in the metapool
    | $x_i$: basepool coin virtual balances
    | $D$: basepool invariant

    The chain rule gives:

    .. math::
        \frac{dz}{dx_i} = \frac{dz}{dw} \frac{dw}{dx_i}
        = \frac{dz}{dw} \frac{dD}{dx_i} = \frac{dz}{dw} D'

    where

    .. math::
        D' = -1 ( A n^{n+1} \prod{x_k} + D^{n+1} / x_i)
                / ( n^n \prod{x_k} - A n^{n+1} \prod{x_k} - (n + 1) D^n )
    """  # noqa
    # Non-negative base indices refer to coins inside the basepool.
    base_i = i - self.max_coin
    base_j = j - self.max_coin

    # both in and out tokens are in basepool
    if base_i >= 0 and base_j >= 0:
        _dydx = self.basepool.dydx(base_i, base_j, use_fee=use_fee)
        return float(_dydx)

    # Metapool balances in virtual units, as mpz for big-integer math.
    rates = self.rates
    xp = [mpz(x) * p // 10**18 for x, p in zip(self.balances, rates)]

    # Basepool quantities needed for the D' formula in the docstring.
    bp = self.basepool
    base_xp = [mpz(x) * p // 10**18 for x, p in zip(bp.balances, bp.rates)]
    x_prod = prod(base_xp)
    n = bp.n
    A = bp.A
    D = mpz(bp.D())
    D_pow = D ** (n + 1)
    A_pow = A * n ** (n + 1)

    if base_i < 0:  # i is primary
        xj = base_xp[base_j]
        # Derivative of the basepool invariant w.r.t. the out-coin balance.
        D_prime = (
            -1
            * (A_pow * x_prod + D_pow / xj)
            / (n**n * x_prod - A_pow * x_prod - (n + 1) * D**n)
        )
        D_prime = float(D_prime)

        # Price of the primary coin against the basepool LP token.
        dwdz = self._dydx(0, self.max_coin, xp, use_fee)

        _dydx = dwdz / D_prime

        # Approximate basepool fee for the single-coin withdrawal leg.
        # NOTE(review): the 5 * 10**5 addend is not explained in this
        # file; confirm its origin.
        if use_fee and bp.fee:
            fee = bp.fee - bp.fee * xj // sum(base_xp) + 5 * 10**5
        else:
            fee = 0
        _dydx *= 1 - fee / 10**10
    else:  # i is from basepool
        # Difference quotient: simulate a small trade of `dx` virtual
        # units and measure the output.
        dx = 10**12
        base_inputs = [0] * self.basepool.n
        base_inputs[base_i] = dx * 10**18 // self.basepool.rates[base_i]

        # NOTE(review): fees are always applied to this basepool deposit
        # estimate, regardless of `use_fee`; confirm this is intended.
        dw, _ = self.basepool.calc_token_amount(base_inputs, use_fee=True)
        # Convert lp token amount to virtual units
        dw = dw * rates[self.max_coin] // 10**18
        x = xp[self.max_coin] + dw

        meta_i = self.max_coin
        meta_j = j
        y = self.get_y(meta_i, meta_j, x, xp)

        dy = xp[meta_j] - y - 1
        if use_fee:
            dy_fee = dy * self.fee // 10**10
        else:
            dy_fee = 0
        dy -= dy_fee

        _dydx = dy / dx

    return float(_dydx)
def _dydx(self, i, j, xp, use_fee=False):
    """
    Treats indices as applying to the "top-level" pool if a metapool.
    Basically this is the "regular" pricing calc with no special metapool
    handling.

    Returns the spot price of i-th coin quoted in terms of j-th coin,
    i.e. the ratio of output coin amount to input coin amount for
    an "infinitesimally" small trade.

    Defaults to no fees deducted.

    Parameters
    ----------
    i: int
        Index of coin to be priced; in a swapping context, this is
        the "in"-token
    j: int
        Index of quote currency; in a swapping context, this is the
        "out"-token
    xp: list of int
        "Virtual" coin balances, i.e. balances in units of D
    use_fee: bool, default=False
        Deduct fees

    Returns
    -------
    float
        Price of i-th coin quoted in j-th coin

    Note
    ----
    This is a "view" function; it doesn't change the state of the pool.
    """
    x_in = xp[i]
    x_out = xp[j]
    n_coins = self.n
    amp = self.A

    invariant = self.D(xp)
    d_pow = mpz(invariant) ** (n_coins + 1)
    balance_prod = prod(xp)
    amp_pow = amp * n_coins ** (n_coins + 1)

    numerator = x_out * (x_in * amp_pow * balance_prod + d_pow)
    denominator = x_in * (x_out * amp_pow * balance_prod + d_pow)
    price = numerator / denominator

    # Static fee unless a dynamic-fee multiplier is configured.
    if not use_fee:
        fee_factor = 0
    elif self.fee_mul is None:
        fee_factor = self.fee / 10**10
    else:
        fee_factor = self.dynamic_fee(x_in, x_out) / 10**10

    price *= 1 - fee_factor
    return float(price)