
Event-driven Python backtesting engine - Data - Part 3

Anthony Ori

As mentioned in the previous article of the series, we will take a closer look at the data layer.

A snapshot of the data looks like:

AAPL price ($)

         Date     Open     High      Low    Close    Volume  OpenInt
0  1984-09-07  0.42388  0.42902  0.41874  0.42388  23220030        0
1  1984-09-10  0.42388  0.42516  0.41366  0.42134  18022532        0
2  1984-09-11  0.42516  0.43668  0.42516  0.42902  42498199        0
3  1984-09-12  0.42902  0.43157  0.41618  0.41618  37125801        0
4  1984-09-13  0.43927  0.44052  0.43927  0.43927  57822062        0

Whatever data you use will have to follow the tabular/spreadsheet format the engine expects, i.e. aapl.csv would look like this:

Date,Open,High,Low,Close,Volume,OpenInt
1984-09-07,0.42388,0.42902,0.41874,0.42388,23220030,0
1984-09-10,0.42388,0.42516,0.41366,0.42134,18022532,0
1984-09-11,0.42516,0.43668,0.42516,0.42902,42498199,0
1984-09-12,0.42902,0.43157,0.41618,0.41618,37125801,0
1984-09-13,0.43927,0.44052,0.43927,0.43927,57822062,0

If this format doesn’t fit your data, you can adapt this layer at will.
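For instance, if your provider ships data under different column names, a small pandas script can map it onto the expected schema. The source column names below (`timestamp`, `o`, `h`, ...) are invented placeholders; adjust the mapping to whatever your feed actually uses:

```python
import pandas as pd

# Hypothetical provider columns -- adjust the mapping to your own source
raw = pd.DataFrame({
    "timestamp": ["1984-09-07", "1984-09-10"],
    "o": [0.42388, 0.42388],
    "h": [0.42902, 0.42516],
    "l": [0.41874, 0.41366],
    "c": [0.42388, 0.42134],
    "vol": [23220030, 18022532],
})

# Rename onto the schema the engine expects
df = raw.rename(columns={
    "timestamp": "Date", "o": "Open", "h": "High",
    "l": "Low", "c": "Close", "vol": "Volume",
})
df["OpenInt"] = 0  # the engine expects this column even if your feed lacks it
df = df[["Date", "Open", "High", "Low", "Close", "Volume", "OpenInt"]]
df.to_csv("aapl.csv", index=False)  # one file per symbol, lower-case name
```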

The important takeaway is that this is the only part of the engine that should handle data.

The final code will look like this:

Details

# trading_engine/data.py

import pandas as pd

from abc import ABC, abstractmethod
from pathlib import Path

from events import MarketEvent
from utils import truncate_dfs


class DataHandler(ABC):
    """
    DataHandler is an abstract base class providing an interface for all subsequent inherited
    data handlers (live and historic).

    The goal of a derived data handler object is to output a generated set of bars (OHLCVI) for each
    symbol requested.

    This will replicate how a live strategy would function as current market data would be sent "down
    the pipe" (drip feed). An historic and live system will be treated identically in terms of data feed.
    """

    @abstractmethod
    def get_latest_bars(self, symbol, N=1):
        """
        Returns the last N bars from the latest_symbol list, or fewer if fewer bars are available.
        """
        raise NotImplementedError("Should implement get_latest_bars()")

    @abstractmethod
    def update_bars(self):
        """
        Pushes the latest bar to the latest symbol structure for all symbols in the symbol list
        """
        raise NotImplementedError("Should implement update_bars()")


class HistoricCsvDataHandler(DataHandler):
    """
    HistoricCsvDataHandler is designed to read CSV files for each requested symbol from disk and
    provide an interface to obtain the "latest" (in the context of historic csv data of course) bar
    in a manner identical to a live trading interface.
    """

    def __init__(self, events, csv_dir, symbol_list):
        """
        Initialises the historic data handler by requesting the location of the CSV files
        and a list of symbols.

        It will be assumed that all files are of the form 'symbol.csv', where symbol is a string
        in the list.

        :param events: the Event queue -> Queue object
        :param csv_dir: directory path to the csv files -> Path or Path-like object
        :param symbol_list: a list of symbol strings -> list[str]
        """

        self.events = events
        self.csv_dir = csv_dir
        self.symbol_list = symbol_list

        self.symbol_data = {}
        self.latest_symbol_data = {}
        self.continue_backtest = True

        self._open_convert_csv_files()

    def _open_convert_csv_files(self):
        """
        Opens the csv files from the data directory, converting them into pandas DataFrames
        within a symbol dictionary.
        """
      
        # Note: combining the index is particularly useful for mean-reverting strategies,
        # because it allows graceful handling of missing data
        comb_index = None
        for symbol in self.symbol_list:
            self.symbol_data[symbol] = pd.read_csv(
                self.csv_dir / f"{symbol.lower()}.csv",
                header=0,
                names=["datetime", "open", "high", "low", "close", "volume", "oi"]
            )

            # combine the index column to pad forward values
            if comb_index is None:
                comb_index = self.symbol_data[symbol].index
            else:
                comb_index = comb_index.union(self.symbol_data[symbol].index)

            self.latest_symbol_data[symbol] = []  # initialise to empty list (for now)

        # reindex the data frames
        for symbol in self.symbol_list:
            self.symbol_data[symbol] = self.symbol_data[symbol].reindex(index=comb_index, method="pad").itertuples()

    def _get_new_bar(self, symbol):
        """
        Returns the latest bar from the data feed as a tuple of

        (symbol, datetime, open, high, low, close, volume)

        :param symbol: symbol to retrieve -> str
        """

        for bar in self.symbol_data[symbol]:
            yield tuple([
                symbol,
                bar.datetime,
                bar.open,
                bar.high,
                bar.low,
                bar.close,
                bar.volume
            ])

    def get_latest_bars(self, symbol, N=1):
        """
        Returns the last N bars from the latest_symbol list, or fewer if fewer are available.

        :param symbol: symbol to retrieve
        :param N: non-negative integer indicating number of bars to retrieve
        """

        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print(f"Symbol {symbol} is not available in the historical data set.")
        else:
            return bars_list[-N:]

    def update_bars(self):
        """
        Pushes the latest bar to the latest_symbol data structure for all symbols in the list
        """

        for symbol in self.symbol_list:
            try:
                bar = next(self._get_new_bar(symbol))
            except StopIteration:
                self.continue_backtest = False
            else:
                if bar is not None:
                    self.latest_symbol_data[symbol].append(bar)

        self.events.put(MarketEvent())

Starting from the initialisation method __init__, we have:


class HistoricCsvDataHandler(DataHandler):
    """
    HistoricCsvDataHandler is designed to read CSV files for each requested symbol from disk and
    provide an interface to obtain the "latest" bar, to simulate a live trading drip feed.
    """

    def __init__(self, events, csv_dir, symbol_list):
        """
        Initialise the historic data handler by requesting the location of the CSV files
        and a list of symbols.

        It will be assumed that all files are of the form 'symbol.csv', where symbol is a string
        in the list.

        :param events: the Event queue -> Queue object
        :param csv_dir: directory path to the csv files -> Path or Path-like object
        :param symbol_list: a list of symbol strings -> list[str]
        """

        self.events = events
        self.csv_dir = csv_dir
        self.symbol_list = symbol_list

        self.symbol_data = {}
        self.latest_symbol_data = {}
        self.continue_backtest = True

        self._open_convert_csv_files()

This method takes in a queue of events, a directory from which ticker data will be retrieved, and a list of tickers. The params will be assigned to instance attributes with the same names. This naming convention will be followed across the entire engine, wherever possible.
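A minimal construction sketch, assuming a `data/stocks` directory containing one lower-case CSV per symbol (the paths and symbols here are hypothetical):

```python
import queue
from pathlib import Path

# Hypothetical setup mirroring the constructor's three parameters
events = queue.Queue()          # shared event queue for the whole engine
csv_dir = Path("data/stocks")   # directory containing aapl.csv, msft.csv, ...
symbol_list = ["AAPL", "MSFT"]

# With the CSVs on disk, the handler would then be built as:
# bars = HistoricCsvDataHandler(events, csv_dir, symbol_list)
```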

Next is the _open_convert_csv_files() method:


def _open_convert_csv_files(self):
        """
        Opens the csv files from the data directory, converting them into pandas DataFrames
        within a symbol dictionary.
        """
        
        # Note: combining the index is particularly useful for mean-reverting strategies,
        # because it allows graceful handling of missing data
        comb_index = None
        for symbol in self.symbol_list:
            self.symbol_data[symbol] = pd.read_csv(
                self.csv_dir / f"{symbol.lower()}.csv",
                header=0,
                names=["datetime", "open", "high", "low", "close", "volume", "oi"]
            )

            # combine the index column to pad forward values
            if comb_index is None:
                comb_index = self.symbol_data[symbol].index
            else:
                comb_index = comb_index.union(self.symbol_data[symbol].index)

            self.latest_symbol_data[symbol] = []  # initialise to empty list (for now)

        # reindex the data frames
        for symbol in self.symbol_list:
            self.symbol_data[symbol] = self.symbol_data[symbol].reindex(index=comb_index, method="pad").itertuples()

The above is straightforward: go through each ticker in the list, retrieve its data, then take the union of the indices so that time series of different lengths can be aligned.
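Note that `Index.union` returns a new index rather than mutating in place, which is why its result must be assigned back to `comb_index`. A minimal illustration of the union-then-pad pattern, with invented data:

```python
import pandas as pd

# Two series of different lengths, indexed by date
a = pd.DataFrame({"close": [1.0, 2.0, 3.0]},
                 index=pd.to_datetime(["2020-01-01", "2020-01-02", "2020-01-03"]))
b = pd.DataFrame({"close": [10.0, 30.0]},
                 index=pd.to_datetime(["2020-01-01", "2020-01-03"]))

# union() returns a new Index, so it must be reassigned
comb_index = a.index.union(b.index)

# Forward-fill ("pad") the shorter series onto the combined index
b_aligned = b.reindex(index=comb_index, method="pad")
print(b_aligned)  # 2020-01-02 is filled with the previous close, 10.0
```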

The other two important methods are get_latest_bars and update_bars:


def get_latest_bars(self, symbol, N=1):
        """
        Returns the last N bars from the latest_symbol list, or fewer if fewer are available.

        :param symbol: symbol to retrieve
        :param N: non-negative integer indicating number of bars to retrieve
        """

        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print(f"Symbol {symbol} is not available in the historical data set.")
        else:
            return bars_list[-N:]

def update_bars(self):
    """
    Pushes the latest bar to the latest_symbol data structure for all symbols in the list
    """

    for symbol in self.symbol_list:
        try:
            bar = next(self._get_new_bar(symbol))
        except StopIteration:
            self.continue_backtest = False
        else:
            if bar is not None:
                self.latest_symbol_data[symbol].append(bar)

    self.events.put(MarketEvent())

The first simply retrieves the latest N bars; N defaults to 1, so it retrieves one data bar at a time. This is consistent with how a live trading system would receive data. The second method appends the latest bar to the self.latest_symbol_data structure. It internally calls the _get_new_bar method, which I have not explained because it’s self-explanatory.
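The subtle point is that `_get_new_bar` creates a fresh generator on every call, but that generator wraps the shared `itertuples()` iterator stored in `symbol_data`, so each call still advances the stream by exactly one bar. A self-contained mock of the mechanism (plain tuples and strings stand in for the handler's classes and `MarketEvent`):

```python
import queue
import pandas as pd

events = queue.Queue()
df = pd.DataFrame({
    "datetime": ["1984-09-07", "1984-09-10", "1984-09-11"],
    "close": [0.42388, 0.42134, 0.42902],
})

# Mirror of the handler's internals: symbol_data holds an itertuples
# iterator, so each fresh generator advances the same underlying stream
symbol_data = {"AAPL": df.itertuples()}
latest_symbol_data = {"AAPL": []}
continue_backtest = True

def get_new_bar(symbol):
    for bar in symbol_data[symbol]:
        yield (symbol, bar.datetime, bar.close)

while continue_backtest:
    try:
        bar = next(get_new_bar("AAPL"))  # one bar per "heartbeat"
    except StopIteration:
        continue_backtest = False        # data exhausted, stop the backtest
    else:
        latest_symbol_data["AAPL"].append(bar)
        events.put("MARKET")             # stand-in for MarketEvent()

print(latest_symbol_data["AAPL"][-1:])   # what get_latest_bars(N=1) would return
```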

If you’re lost, refer to the intro post with the driver code in the backtester.py file, to see how update_bars is invoked. get_latest_bars will be relevant later when we discuss the Portfolio and the Strategy components.

In the next article we will go over the events layer, but first we’ll write tests for the code of the data layer we’ve discussed so far.

Data tests

/
├── tests
│   ├── __init__.py
│   └── test_utils.py
│   └── test_data_handler.py  <--- new file

# trading_engine/tests/test_data_handler.py

import queue
import pytest

from data import HistoricCsvDataHandler
from events import MarketEvent
from utils import data_dir_setup, asset_class_selector, cli_parser


@pytest.fixture
def stocks_setup():
    events = queue.Queue()
    symbols = ["AMD", "ZVV"]
    args = ['--asset_class', 'stocks', "--ticker", "AMD ZVV"]
    parsed_args = cli_parser(args)
    asset_class = asset_class_selector(parsed_args)
    data_dir = data_dir_setup(asset_class)

    return events, data_dir, symbols


def test_update_bars(stocks_setup):
    events, data_dir, symbols = stocks_setup
    bars = HistoricCsvDataHandler(events, data_dir, symbols)
    # test two things:
    #   1. that a bar data for a symbol is returned
    #   2. that a MarketEvent is added to the event queue
    bars.update_bars()
    first_symbol_data = bars.latest_symbol_data[symbols[0]]
    second_symbol_data = bars.latest_symbol_data[symbols[1]]
    assert first_symbol_data is not None
    assert second_symbol_data is not None
    assert events.empty() is False
    assert isinstance(events.get(False), MarketEvent)


def test_update_bars_when_backtest_data_is_exhausted(stocks_setup):
    events, data_dir, symbols = stocks_setup
    bars = HistoricCsvDataHandler(events, data_dir, symbols[1:])
    # stock "ZVV" has only 5 observations in my dataset, so when we call
    # update_bars() the 6th time, it'll trigger a StopIteration error,
    # and set the continue_backtest flag to False.
    for _ in range(6):
        bars.update_bars()

    assert bars.continue_backtest is False

Notice the code comment in the last test case. Doing that is OK, but it tightly couples the test environment’s data to the application’s dataset; we can do better. In a production setting you would create data specific to the test environment, so the two can run in isolation. For our specific case, that means creating mock or stub versions of the AMD and ZVV .csv files.
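One way to do that is to write a tiny, test-only CSV into a throwaway directory (with pytest you would typically use the `tmp_path` fixture; `tempfile` is shown here so the sketch is self-contained, and the stub values are invented):

```python
import tempfile
from pathlib import Path

STUB_CSV = """Date,Open,High,Low,Close,Volume,OpenInt
2020-01-02,10.0,10.5,9.5,10.2,1000,0
2020-01-03,10.2,10.8,10.0,10.6,1200,0
"""

def make_stub_data_dir():
    # Create an isolated directory with a test-only zvv.csv, so tests
    # no longer depend on the real dataset shipped with the engine
    data_dir = Path(tempfile.mkdtemp())
    (data_dir / "zvv.csv").write_text(STUB_CSV)
    return data_dir

data_dir = make_stub_data_dir()
# A test would then point HistoricCsvDataHandler at data_dir instead of
# the real data directory returned by data_dir_setup()
```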


Feel free to share the article on socials.