"""
Network connector for subgraphs
"""
from datetime import datetime, timedelta, timezone
from decimal import Decimal
import pandas as pd
from eth_utils import to_checksum_address
from curvesim.logging import get_logger
from ..exceptions import CurvesimValueError, SubgraphResultError
from ..overrides import override_subgraph_data
from .http import HTTP
from .utils import sync
# pylint: disable=redefined-outer-name
logger = get_logger(__name__)
async def query(url, q):
"""
Core async function to query subgraphs.
Parameters
----------
url : str
URL for the subgraph.
q : str
A GraphQL query.
Returns
-------
str
The returned results.
"""
r = await HTTP.post(url, json={"query": q})
return r
query_sync = sync(query)
# Convex Community subgraphs
CONVEX_COMMUNITY_URL = (
"https://api.thegraph.com/subgraphs/name/convex-community/volume-%s"
)
STAGING_CONVEX_COMMUNITY_URL = (
"https://api.thegraph.com/subgraphs/name/convex-community/volume-%s-staging"
)
def _get_subgraph_url(chain, env="prod"):
if env.lower() == "prod":
url = CONVEX_COMMUNITY_URL % chain
elif env.lower() == "staging":
url = STAGING_CONVEX_COMMUNITY_URL % chain
else:
raise CurvesimValueError("'env' must be 'prod' or 'staging'")
return url
async def convex(chain, q, env):
"""
Async function to query convex community subgraphs
Parameters
----------
chain : str
The chain of interest.
Currently supports:
”mainnet”, “arbitrum”, “optimism”, “fantom”, “avalanche” “matic”, “xdai”
q : str
A GraphQL query.
env: str
Environment name. Supported: "prod", "staging"
Returns
-------
str
The returned results.
"""
url = _get_subgraph_url(chain, env)
r = await query(url, q)
if "data" not in r:
raise SubgraphResultError(
f"No data returned from Convex: chain: {chain}, query: {q}"
)
return r["data"]
async def symbol_address(symbol, chain, env="prod"):
"""
Async function to get a pool's address from it's (LP token) symbol.
Parameters
----------
symbol: str
The pool's (LP token) symbol
.. warning::
An LP token symbol need not be unique. In particular, factory pools
are deployed permissionlessly and no checks are done to ensure unique
LP token symbol. Currently the first pool retrieved from the subgraph
is used, which can be effectively random if token symbols clash.
chain : str
The pool's chain.
Currently supports:
”mainnet”, “arbitrum”, “optimism”, “fantom”, “avalanche” “matic”, “xdai”
Returns
-------
str
Pool address.
"""
# pylint: disable=consider-using-f-string
q = (
"""
{
pools(
where:
{symbol_starts_with_nocase: "%s"}
)
{
symbol
address
}
}
"""
% symbol
)
data = await convex(chain, q, env)
if len(data["pools"]) > 1:
pool_list = "\n\n"
for pool in data["pools"]:
pool_list += f"\"{pool['symbol']}\": {pool['address']}\n"
raise SubgraphResultError(
"Multiple pools returned for symbol query:" + pool_list
)
if len(data["pools"]) < 1:
raise SubgraphResultError("No pools found for symbol query.")
addr = to_checksum_address(data["pools"][0]["address"])
return addr
async def _pool_snapshot(address, chain, env, end_ts=None):
if not end_ts:
end_date = datetime.now(timezone.utc)
end_ts = int(end_date.timestamp())
# pylint: disable=consider-using-f-string
q = """
{
dailyPoolSnapshots(
orderBy: timestamp,
orderDirection: desc,
first: 1,
where:
{
pool: "%s"
timestamp_lte: %d
}
)
{
pool {
name
address
symbol
metapool
basePool
coins
coinNames
coinDecimals
poolType
isV2
}
A
fee
adminFee
offPegFeeMultiplier
reserves
normalizedReserves
virtualPrice
timestamp
gamma
midFee
outFee
feeGamma
allowedExtraProfit
adjustmentStep
maHalfTime
priceScale
priceOracle
lastPrices
lastPricesTimestamp
xcpProfit
xcpProfitA
}
}
""" % (
address.lower(),
end_ts,
)
r = await convex(chain, q, env)
try:
r = r["dailyPoolSnapshots"][0]
except IndexError as e:
raise SubgraphResultError(
f"No daily snapshot for this pool: {address}, {chain}"
) from e
return r
# pylint: disable-next=too-many-locals
[docs]async def pool_snapshot(address, chain, env="prod", end_ts=None):
"""
Async function to pull pool state and metadata from daily snapshots.
Parameters
----------
address : str
The pool address.
chain : str
The blockchain the pool is on.
Returns
-------
dict
A formatted dict of pool state/metadata information.
"""
r = await _pool_snapshot(address, chain, env, end_ts)
logger.debug("Pool snapshot: %s", r)
# Flatten
pool = r.pop("pool")
r.update(pool)
# Version
if r["isV2"]:
version = 2
else:
version = 1
# Fee_mul
if r["offPegFeeMultiplier"] == "0":
fee_mul = None
else:
fee_mul = int(r["offPegFeeMultiplier"]) * 10**10
# Coins
names = r["coinNames"]
addrs = [to_checksum_address(c) for c in r["coins"]]
decimals = [int(d) for d in r["coinDecimals"]]
coins = {"names": names, "addresses": addrs, "decimals": decimals}
# Reserves
normalized_reserves = [int(r) for r in r["normalizedReserves"]]
unnormalized_reserves = [int(r) for r in r["reserves"]]
# Basepool
if r["metapool"]:
basepool = await pool_snapshot(r["basePool"], chain, env, end_ts)
else:
basepool = None
# Output
if version == 1:
data = {
"name": r["name"],
"address": to_checksum_address(r["address"]),
"chain": chain,
"symbol": r["symbol"].strip(),
"version": version,
"pool_type": r["poolType"],
"params": {
"A": int(r["A"]),
"fee": int(Decimal(r["fee"]) * 10**10),
"fee_mul": fee_mul,
"admin_fee": int(Decimal(r["adminFee"]) * 10**10),
},
"coins": coins,
"reserves": {
"by_coin": normalized_reserves,
"unnormalized_by_coin": unnormalized_reserves,
"virtual_price": int(r["virtualPrice"]),
},
"basepool": basepool,
"timestamp": int(r["timestamp"]),
}
else:
# Until mainnet subgraph is fixed (or we use the new curve-prices API),
# 2-coin crypto pools will have an integer instead of list and
# 3-coin crypto pools actually return a zero.
#
# So we fix the outer type here.
if not isinstance(r["priceScale"], list):
r["priceScale"] = [r["priceScale"]]
if not isinstance(r["priceOracle"], list):
r["priceOracle"] = [r["priceOracle"]]
if not isinstance(r["lastPrices"], list):
r["lastPrices"] = [r["lastPrices"]]
ma_half_time = r["maHalfTime"]
if ma_half_time: # subgraph bug returns None
ma_half_time = int(ma_half_time)
data = {
"name": r["name"],
"address": to_checksum_address(r["address"]),
"chain": chain,
"symbol": r["symbol"].strip(),
"version": version,
"pool_type": r["poolType"],
"params": {
"A": int(r["A"]),
"gamma": int(r["gamma"]),
"fee_gamma": int(r["feeGamma"]),
"mid_fee": int(r["midFee"]),
"out_fee": int(r["outFee"]),
"allowed_extra_profit": int(r["allowedExtraProfit"]),
"adjustment_step": int(r["adjustmentStep"]),
"ma_half_time": ma_half_time,
"price_scale": [int(p) for p in r["priceScale"]],
"price_oracle": [int(p) for p in r["priceOracle"]],
"last_prices": [int(p) for p in r["lastPrices"]],
"last_prices_timestamp": int(r["lastPricesTimestamp"]),
"admin_fee": int(Decimal(r["adminFee"]) * 10**10),
"xcp_profit": int(r["xcpProfit"]),
"xcp_profit_a": int(r["xcpProfitA"]),
},
"coins": coins,
"reserves": {
"by_coin": normalized_reserves,
"unnormalized_by_coin": unnormalized_reserves,
"virtual_price": int(r["virtualPrice"]),
},
"basepool": basepool,
"timestamp": int(r["timestamp"]),
}
return override_subgraph_data(data, "pool_snapshot", (address, chain))
convex_sync = sync(convex)
symbol_address_sync = sync(symbol_address)
pool_snapshot_sync = sync(pool_snapshot)
# Reflexer Subgraph
RAI_ADDR = ("0x618788357D0EBd8A37e763ADab3bc575D54c2C7d", "mainnet")
def has_redemption_prices(address, chain):
"""
Return True if the given pool has RAI redemption prices available.
"""
return (address, chain) == RAI_ADDR
async def _redemption_prices(address, chain, t_start, t_end, n):
if not has_redemption_prices(address, chain):
return None
t_end = int(t_end.timestamp())
t_start = int(t_start.timestamp())
url = "https://api.thegraph.com/subgraphs/name/reflexer-labs/rai-mainnet"
q = """{
redemptionPrices(
orderBy: timestamp,
orderDirection: desc,
first: %d,
where: {timestamp_lte: %d}
)
{
timestamp
value
}
}"""
t_earliest = t_end
data = []
while t_earliest >= t_start:
r = await query(url, q % (n, t_earliest))
data += r["data"]["redemptionPrices"]
t_earliest = int(data[-1]["timestamp"])
return data
[docs]async def redemption_prices(
address=RAI_ADDR[0], chain=RAI_ADDR[1], days=60, n=1000, end=None
):
"""
Async function to pull RAI redemption prices.
Returns None if input pool is not RAI3CRV.
Parameters
----------
address : str
The pool address.
chain : str
The blockchain the pool is on.
days : int, default=60
Number of days to fetch data for.
n : int, default=1000
Number of data entries to request per query (max: 1000)
Note: the function will re-query until the requested time range is complete.
Returns
-------
dict
A formatted dict of pool state/metadata information.
"""
if end is None:
t_end = datetime.now(timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
)
else:
t_end = datetime.fromtimestamp(end, tz=timezone.utc)
t_end = t_end.replace(tzinfo=timezone.utc)
t_start = t_end - timedelta(days=days)
r = await _redemption_prices(address, chain, t_start, t_end, n)
if r is None:
return None
data = pd.DataFrame(r)
data.columns = ["timestamp", "price"]
data.price = (data.price.astype(float) * 10**18).astype(int)
data.timestamp = pd.to_datetime(data.timestamp, unit="s", utc=True)
data.sort_values("timestamp", inplace=True)
data.set_index("timestamp", inplace=True)
data.drop_duplicates(inplace=True)
t0 = data.index.asof(t_start)
return data[data.index >= t0]
redemption_prices_sync = sync(redemption_prices)
if __name__ == "__main__":
chain = "mainnet"
symbol = "3Crv"
print("Chain:", chain)
print("Symbol:", symbol)
address = symbol_address_sync(symbol, chain)
print("Address:", address)