In the previous article we discussed the Strategy component and its role in generating trading signals in line with the strategy logic.
In this article we’ll go through a central part of the system, the Portfolio component. This component communicates with multiple other parts of the system. For example on one side it takes in signals from the Signal layer, on the other it generates order events to be passed on to the Broker component, and then later does the bookkeeping to calculate exposure (in terms of positions and cash), PnL, etc. after the order has been filled by the broker.
The final code for the Portfolio is:
Details
# trading_engine/portfolio.py
import pandas as pd
from abc import ABC, abstractmethod
from math import floor
from events import OrderEvent
from performance import create_sharpe_ratio, create_drawdowns
class Portfolio(ABC):
"""
The Portfolio class handles the positions and market value of all instruments at a resolution of a "bar",
i.e., secondly, minutely, hourly, etc.
"""
@abstractmethod
def update_signal(self, event):
"""
Acts on a SignalEvent to generate new orders based on the portfolio logic.
:param event: the Queue event object
"""
raise NotImplementedError("Should implement update_signal()")
@abstractmethod
def update_fill(self, event):
"""
Updates the portfolio current positions and holdings from a FillEvent.
:param event: a FillEvent object
"""
raise NotImplementedError("Should implement update_fill()")
class BuyAndHoldPortfolio(Portfolio):
"""
The BuyAndHoldPortfolio is designed to send orders to a brokerage object with a
specified quantity and size.
"""
def __init__(self, bars, events, start_date, initial_capital=100000.0):
"""
Initialises the portfolio with bars, an event queue, a starting time index,
and initial capital (in USD unless stated otherwise).
:param bars: the DataHandler object with current market data -> pandas-like obj
:param events: the Event queue object _> Queue
:param start_date: the start date (bar) of the portfolio -> datetime
:param initial_capital: the starting capital in USD -> float
"""
self.bars = bars
self.events = events
self.symbol_list = self.bars.symbol_list
self.start_date = start_date
self.initial_capital = initial_capital
self.all_positions = self.construct_all_positions()
self.current_positions = {k: v for k, v in [(s, 0) for s in self.symbol_list]}
self.all_holdings = self.construct_all_holdings()
self.current_holdings = self.construct_current_holdings()
def construct_all_positions(self):
"""
Constructs the positions list using the start_date attribute to determine when the time index will begin.
"""
d = {k: v for k, v in [(s, 0) for s in self.symbol_list]}
d['datetime'] = self.start_date
return [d]
def construct_all_holdings(self):
"""
Constructs the holdings list using the start_date to determine when the time index will begin.
"""
d = {k: v for k, v in [(s, 0.0) for s in self.symbol_list]}
d['datetime'] = self.start_date
d['cash'] = self.initial_capital
d['commission'] = 0.0
d['total'] = self.initial_capital
return [d]
def construct_current_holdings(self):
"""
This constructs the dictionary which will hold the instantaneous
value of the portfolio across all symbols.
"""
d = {k: v for k, v in [(s, 0.0) for s in self.symbol_list]}
d['cash'] = self.initial_capital
d['commission'] = 0.0
d['total'] = self.initial_capital
return d
def update_timeindex(self, event):
"""
Adds a new record to the positions matrix for the current market data bar. This reflects the
PREVIOUS bar, i.e., all current market data at this state is known (OHLCVI).
Makes use of a MarketEvent from the events queue.
N.B. this doesn't update the actual position values -- those are updated after a FillEvent.
:param event: a MarketEvent
"""
if event.type == "MARKET":
bars = {}
for sym in self.symbol_list:
bars[sym] = self.bars.get_latest_bars(sym, N=1)
# update positions
dp = {k: v for k, v in [(s, 0) for s in self.symbol_list]}
dp['datetime'] = bars[self.symbol_list[0]][0][1]
for s in self.symbol_list:
dp[s] = self.current_positions[s]
# append the current positions
self.all_positions.append(dp)
# update holdings
dh = {k: v for k, v in [(s, 0) for s in self.symbol_list]}
dh['datetime'] = bars[self.symbol_list[0]][0][1]
dh['cash'] = self.current_holdings['cash']
dh['commission'] = self.current_holdings['commission']
dh['total'] = self.current_holdings['cash']
for s in self.symbol_list:
# bars[s][0][5] = close price of the symbol; an approximation to the market value
market_value = self.current_positions[s] * bars[s][0][5]
dh[s] = market_value
dh['total'] += market_value
# append the current holdings
self.all_holdings.append(dh)
def update_positions_from_fill(self, fill):
"""
Takes a FillEvent object and updates the positions matrix to reflect new positions.
:param fill: the FillEvent to update positions with -> Event
"""
fill_dir = 0
if fill.direction == 'BUY':
fill_dir = 1
elif fill.direction == 'SELL':
fill_dir = -1
self.current_positions[fill.symbol] += fill_dir * fill.filled_quantity
def update_holdings_from_fill(self, fill):
"""
Takes a FillEvent object and updates the holdings matrix to reflect the holdings value.
:param fill: a FillEvent object
"""
fill_dir = 0
if fill.direction == 'BUY':
fill_dir = 1
if fill.direction == 'SELL':
fill_dir = -1
# update holdings list with new quantities
fill_cost = self.bars.get_latest_bars(fill.symbol)[0][5] # close price
cost = fill_dir * fill_cost
self.current_holdings[fill.symbol] += cost
self.current_holdings['commission'] += fill.commission
self.current_holdings['cash'] -= (cost + fill.commission)
self.current_holdings['total'] -= (cost + fill.commission)
def update_fill(self, event):
"""
Updates the portfolio current positions and holdings from a FillEvent.
:param event: a FillEvent
"""
if event.type == 'FILL':
event.print_order()
self.update_positions_from_fill(event)
self.update_holdings_from_fill(event)
def generate_naive_order(self, signal):
"""
Simply transact an OrderEvent object with a specified quantity of the symbol(s) in the signal
:param signal: a SignalEvent object
"""
order = None
symbol = signal.symbol
direction = signal.signal_type
strength = signal.strength
max_cash_per_trade = 0.10 * self.current_holdings['cash'] # max. 10% cash per trade
# position size = max cash per trade / unit cost of asset
position_size = max_cash_per_trade // self.bars.get_latest_bars(symbol, N=1)[0][5] # [5] = closing price
mkt_quantity = floor(position_size * strength) # floor() rounds to lower integer -- no fractional shares
curr_quantity = self.current_positions[symbol]
order_type = 'MKT'
if direction == 'LONG' and curr_quantity == 0:
order = OrderEvent(symbol, order_type, mkt_quantity, 'BUY')
if direction == 'SHORT' and curr_quantity == 0:
order = OrderEvent(symbol, order_type, mkt_quantity, 'SELL')
if direction == 'EXIT' and curr_quantity > 0:
order = OrderEvent(symbol, order_type, abs(curr_quantity), 'SELL')
if direction == 'EXIT' and curr_quantity < 0:
order = OrderEvent(symbol, order_type, abs(curr_quantity), 'BUY')
return order
def update_signal(self, event):
"""
Acts on a SignalEvent to generate new orders based on portfolio logic.
:param event: a SignalEvent
"""
if event.type == 'SIGNAL':
order_event = self.generate_naive_order(event)
self.events.put(order_event)
def create_equity_curve_dataframe(self):
"""
Creates a pandas Dataframe from the all holdings structure.
"""
curve = pd.DataFrame(self.all_holdings)
curve.set_index('datetime', inplace=True)
curve['returns'] = curve['total'].pct_change()
curve['equity_curve'] = (1.0 + curve['returns']).cumprod()
self.equity_curve = curve
def output_summary_stats(self, periods="days"):
"""
Creates a list of output summary statistics for the portfolio such as Sharpe Ratio and drawdown info.
:param periods: max drawdown duration periods, either in days, weeks, years, depnding on data frequency -> str
"""
total_return = self.equity_curve['equity_curve'].iloc[-1]
returns = self.equity_curve['returns']
pnl = self.equity_curve['equity_curve']
sharpe_ratio = create_sharpe_ratio(returns)
max_dd, dd_duration = create_drawdowns(pnl)
stats = [
f"Initial cash balance ${self.initial_capital:0,.2f}",
f"Final cash balance ${self.all_holdings[-1]['cash']:0,.2f}",
f"Final account value ${self.all_holdings[-1]['total']:0,.2f}",
f"Total Return {(total_return - 1.0) * 100.0:0.2f}%",
f"Sharpe Ratio {sharpe_ratio:0.4f}",
f"Max Drawdown {max_dd * 100.0:0.2f}%",
f"Max Drawdown Duration {dd_duration:g} {periods}"
]
return stats
However, we’ll go through the most significant parts to better understand the whole. Starting from the
__init__ method,
# trading_engine/portfolio.py
# [...]
class BuyAndHoldPortfolio(Portfolio):
"""
The BuyAndHoldPortfolio is designed to send orders to a brokerage object with a
specified quantity and size.
"""
def __init__(self, bars, events, start_date, initial_capital=100000.0):
"""
Initialises the portfolio with bars, an event queue, a starting time index,
and initial capital (in USD unless stated otherwise).
:param bars: the DataHandler object with current market data -> pandas-like obj
:param events: the Event queue object _> Queue
:param start_date: the start date (bar) of the portfolio -> datetime
:param initial_capital: the starting capital in USD -> float
"""
self.bars = bars
self.events = events
self.symbol_list = self.bars.symbol_list
self.start_date = start_date
self.initial_capital = initial_capital
self.all_positions = self.construct_all_positions()
self.current_positions = {k: v for k, v in [(s, 0) for s in self.symbol_list]}
self.all_holdings = self.construct_all_holdings()
self.current_holdings = self.construct_current_holdings()
The key parts are the last four lines that respectively build dicts for keeping track of all positions; current positions; all holdings; and current holdings. In Python parlance a dict is what is considered a hash map in other programming languages. Positions are simply the quantities or units of the assets in the Portfolio, and the holdings are the $ quantities * price $. For the actual methods the variables are assigned to, refer to the first snippet with the final code.
Next is the update_timeindex method,
def update_timeindex(self, event):
"""
Adds a new record to the positions matrix for the current market data bar. This reflects the
PREVIOUS bar, i.e., all current market data at this state is known (OHLCVI).
Makes use of a MarketEvent from the events queue.
N.B. this doesn't update the actual position values -- those are updated after a FillEvent.
:param event: a MarketEvent
"""
if event.type == "MARKET":
bars = {}
for sym in self.symbol_list:
bars[sym] = self.bars.get_latest_bars(sym, N=1)
# update positions
dp = {k: v for k, v in [(s, 0) for s in self.symbol_list]}
dp['datetime'] = bars[self.symbol_list[0]][0][1]
for s in self.symbol_list:
dp[s] = self.current_positions[s]
# append the current positions
self.all_positions.append(dp)
# update holdings
dh = {k: v for k, v in [(s, 0) for s in self.symbol_list]}
dh['datetime'] = bars[self.symbol_list[0]][0][1]
dh['cash'] = self.current_holdings['cash']
dh['commission'] = self.current_holdings['commission']
dh['total'] = self.current_holdings['cash']
for s in self.symbol_list:
# bars[s][0][5] = close price of the symbol; an approximation to the market value
market_value = self.current_positions[s] * bars[s][0][5]
dh[s] = market_value
dh['total'] += market_value
# append the current holdings
self.all_holdings.append(dh)
To better simulate the passing of time and the arrival of new information in a real trading system, we will need to progressively, as market events are triggered and absorbed, adjust our positions and holdings to reflect arrival of a new data bar and thus add these new data, if any, to the Portfolio to keep it in sync with the market feed and look out for market events that lead up to a signal being generated.
Once a trading signal is generated, the Portfolio will capture that signal, from the update_signal(),
and act on it by triggering an OrderEvent and adding it to the queue:
# [...]
def generate_naive_order(self, signal):
"""
Simply transact an OrderEvent object with a specified quantity of the symbol(s) in the signal
:param signal: a SignalEvent object
"""
order = None
symbol = signal.symbol
direction = signal.signal_type
strength = signal.strength
max_cash_per_trade = 0.10 * self.current_holdings['cash'] # max. 10% cash per trade
# position size = max cash per trade / unit cost of asset
position_size = max_cash_per_trade // self.bars.get_latest_bars(symbol, N=1)[0][5] # [5] = closing price
mkt_quantity = floor(position_size * strength) # floor() rounds to lower integer, aka no fractional shares
curr_quantity = self.current_positions[symbol]
order_type = 'MKT'
if direction == 'LONG' and curr_quantity == 0:
order = OrderEvent(symbol, order_type, mkt_quantity, 'BUY')
if direction == 'SHORT' and curr_quantity == 0:
order = OrderEvent(symbol, order_type, mkt_quantity, 'SELL')
if direction == 'EXIT' and curr_quantity > 0:
order = OrderEvent(symbol, order_type, abs(curr_quantity), 'SELL')
if direction == 'EXIT' and curr_quantity < 0:
order = OrderEvent(symbol, order_type, abs(curr_quantity), 'BUY')
return order
def update_signal(self, event):
"""
Acts on a SignalEvent to generate new orders based on portfolio logic.
:param event: a SignalEvent
"""
if event.type == 'SIGNAL':
order_event = self.generate_naive_order(event)
self.events.put(order_event)
Refer back to the main event loop in Part 1 of the series. Once an order event is triggered, the Broker will act on it, but we will get to that later.
Next is the update_fill method, which captures a FillEvent generated by the broker, and
updates the positions, holdings, etc.,
# [...]
def update_fill(self, event):
"""
Updates the portfolio current positions and holdings from a FillEvent.
:param event: a FillEvent
"""
if event.type == 'FILL':
event.print_order()
self.update_positions_from_fill(event)
self.update_holdings_from_fill(event)
The two methods update_positions_from_fill and update_holdings_from_fill do what it says on the tin.
You can look at their definitions in the final code at the top.
And finally, we cover the create_equity_curve_dataframe and output_summary_stats:
# [...]
def create_equity_curve_dataframe(self):
"""
Creates a pandas Dataframe from the all holdings structure.
"""
curve = pd.DataFrame(self.all_holdings)
curve.set_index('datetime', inplace=True)
curve['returns'] = curve['total'].pct_change()
curve['equity_curve'] = (1.0 + curve['returns']).cumprod()
self.equity_curve = curve
def output_summary_stats(self, periods="days"):
"""
Creates a list of output summary statistics for the portfolio such as Sharpe Ratio and drawdown info.
:param periods: max drawdown duration periods, either in days, weeks, years, depnding on data frequency -> str
"""
total_return = self.equity_curve['equity_curve'].iloc[-1]
returns = self.equity_curve['returns']
pnl = self.equity_curve['equity_curve']
sharpe_ratio = create_sharpe_ratio(returns)
max_dd, dd_duration = create_drawdowns(pnl)
stats = [
f"Initial cash balance ${self.initial_capital:0,.2f}",
f"Final cash balance ${self.all_holdings[-1]['cash']:0,.2f}",
f"Final account value ${self.all_holdings[-1]['total']:0,.2f}",
f"Total Return {(total_return - 1.0) * 100.0:0.2f}%",
f"Sharpe Ratio {sharpe_ratio:0.4f}",
f"Max Drawdown {max_dd * 100.0:0.2f}%",
f"Max Drawdown Duration {dd_duration:g} {periods}"
]
return stats
The first builds the equity curve, while the second prints key portfolio metrics to the console. You could get creative and instead of printing to console, write to a .pdf, .html, email, whatever, or a combination of those.
That’s it for the Portfolio.
In the next article we will cover the Broker component.
Portfolio tests#
# trading_engine/tests/test_portfolio.py
import queue
import pytest
import datetime
from broker import SimulatedExecutionHandler
from portfolio import BuyAndHoldPortfolio
from data import HistoricCsvDataHandler
from strategy import BuyAndHoldStrategy
from events import MarketEvent, SignalEvent, OrderEvent, FillEvent
from utils import data_dir_setup, asset_class_selector, cli_parser
@pytest.fixture
def portfolio_setup():
# CLI data
args = ["--asset_class", "stocks", "--ticker", "AMD INTC"]
parsed_args = cli_parser(args)
asset_class = asset_class_selector(parsed_args)
# Assets data
data_dir = data_dir_setup(asset_class)
# Event queue
events = queue.Queue()
# The symbols below match those hard-coded from the CLI above. In theory,
# we should extract the list below from the DataHandler, two lines below this,
# but the symbols_filterer has already been tested, so we know that if the CLI
# tickers are invalid, the invalid ticker won't be passed to the data layer.
symbols = ["AMD", "INTC"]
bars = HistoricCsvDataHandler(events, data_dir, symbols)
buy_hold_portfolio = BuyAndHoldPortfolio(bars, events, datetime.datetime(1960, 1, 1).strftime("%Y-%m-%d"))
return buy_hold_portfolio
class TestBuyAndHoldPortfolio:
def test_update_timeindex(self, portfolio_setup):
portfolio = portfolio_setup
portfolio.bars.update_bars() # triggers a MarketEvent
market_event = portfolio.events.get(False)
previous_time_index_length = len(portfolio.all_positions)
portfolio.update_timeindex(market_event)
current_time_index_length = len(portfolio.all_positions)
assert current_time_index_length > previous_time_index_length
# The actual date is of no interest since it changes based on the assets under
# analysis;what we care about is knowing that the time index is updated when
# a new market event is captured.
def test_update_signal(self, portfolio_setup):
portfolio = portfolio_setup
portfolio.bars.update_bars() # triggers a MarketEvent
strategy = BuyAndHoldStrategy(portfolio.bars, portfolio.events)
market_event = portfolio.events.get(False)
strategy.calculate_signals(market_event) # triggers a SignalEvent
signal_event = portfolio.events.get(False)
assert type(signal_event) == SignalEvent
portfolio.update_signal(signal_event) # triggers an OrderEvent
portfolio.events.get(False) # pop out the SignalEvent first
assert type(portfolio.events.get(False)) == OrderEvent
def test_update_fill(self, portfolio_setup):
portfolio = portfolio_setup
portfolio.bars.update_bars() # triggers a MarketEvent
strategy = BuyAndHoldStrategy(portfolio.bars, portfolio.events)
market_event = portfolio.events.get(False)
strategy.calculate_signals(market_event) # triggers a SignalEvent
signal_event = portfolio.events.get(False)
assert type(signal_event) == SignalEvent
portfolio.update_signal(signal_event) # triggers an OrderEvent
portfolio.events.get(False) # pop out the SignalEvent first
order_event = portfolio.events.get(False)
assert type(order_event) == OrderEvent
broker = SimulatedExecutionHandler(portfolio.events)
broker.execute_order(order_event)
fill_event = broker.events.get(False)
assert type(fill_event) == FillEvent
# We test to see that the current positions and current holdings have changed
# after the fill.
previous_positions = portfolio.current_positions[fill_event.symbol]
previous_holdings = portfolio.current_holdings[fill_event.symbol]
portfolio.update_fill(fill_event)
assert portfolio.current_positions[fill_event.symbol] != previous_positions
assert portfolio.current_holdings[fill_event.symbol] != previous_holdings
Feel free to share the article on socials: