# Source code for curvesim.pipelines

"""
Tools for implementing and running simulation pipelines.

The basic model for a pipeline is demonstrated in the implementation of
:func:`run_pipeline`.  It takes in a `param_sampler`, `price_sampler`, and
`strategy`.

Pipelines iterate over pools with parameters set by
:mod:`curvesim.iterators.param_samplers` and time-series data produced by
:mod:`curvesim.iterators.price_samplers`.  At each timestep, the
:class:`~curvesim.pipelines.templates.Strategy` dictates what is done.

A typical pipeline implementation is a function that takes in whatever market data
is needed and pool data such as
:class:`~curvesim.pool_data.metadata.PoolMetaDataInterface`; instantiates a
param_sampler, price_sampler, and strategy; and invokes `run_pipeline`, returning
its result metrics.
"""
from multiprocessing import Pool as cpu_pool

from curvesim.logging import (
    configure_multiprocess_logging,
    get_logger,
    multiprocessing_logging_queue,
)

logger = get_logger(__name__)


def run_pipeline(param_sampler, price_sampler, strategy, ncpu=4):
    """
    Core function for running pipelines.

    Typically called within a function specifying the pipeline components
    (see, e.g., :func:`curvesim.pipelines.vol_limited_arb.pipeline`)

    Parameters
    ----------
    param_sampler : iterator
        An iterator that returns pool parameters (see :mod:`.param_samplers`).
        Each item is unpacked as a ``(pool, params)`` pair.

    price_sampler : iterator
        An iterator that returns (minimally) a time-series of prices
        (see :mod:`.price_samplers`). It is passed unchanged to every
        strategy call (pickled into the worker processes when ``ncpu > 1``).

    strategy : callable
        A function dictating what happens at each timestep. Called as
        ``strategy(pool, params, price_sampler)``.

    ncpu : int, default=4
        Number of cores to use. When greater than 1, runs are distributed
        over a :class:`multiprocessing.Pool` of that size; otherwise they are
        executed serially in the current process.

    Returns
    -------
    results : tuple
        Contains the metrics produced by the strategy, transposed so that
        ``results[i]`` collects the ``i``-th metric of every run, in the order
        the runs were yielded by `param_sampler`. Empty if `param_sampler`
        yields nothing.
    """
    if ncpu > 1:
        # Workers send their log records through this queue (see
        # wrapped_strategy), so the pool must run inside the queue's context.
        with multiprocessing_logging_queue() as logging_queue:
            strategy_args_list = [
                (pool, params, price_sampler) for pool, params in param_sampler
            ]

            wrapped_args_list = [
                (strategy, logging_queue, *args) for args in strategy_args_list
            ]

            with cpu_pool(ncpu) as clust:
                # starmap preserves input order, so results line up with
                # the order of param_sampler.
                results = tuple(
                    zip(*clust.starmap(wrapped_strategy, wrapped_args_list))
                )
                # Pool.__exit__ calls terminate(). Closing and joining first
                # lets the workers exit cleanly; coverage needs this.
                clust.close()
                clust.join()

    else:
        per_run_metrics = [
            strategy(pool, params, price_sampler) for pool, params in param_sampler
        ]
        results = tuple(zip(*per_run_metrics))

    return results
def wrapped_strategy(strategy, logging_queue, *strategy_args):
    """
    Run `strategy` in a worker process after enabling queued logging.

    Before delegating, the current process is configured (via
    :func:`configure_multiprocess_logging`) to use the multiprocessing
    enqueueing logging logic backed by `logging_queue`.

    This function is kept at module top level so that it can be pickled
    when handed to a multiprocessing pool.

    Parameters
    ----------
    strategy : callable
        The pipeline strategy to execute.
    logging_queue
        Queue handed to :func:`configure_multiprocess_logging`.
    *strategy_args
        Positional arguments forwarded unchanged to `strategy`.

    Returns
    -------
    Whatever `strategy` returns.
    """
    configure_multiprocess_logging(logging_queue)
    outcome = strategy(*strategy_args)
    return outcome