當前位置:
首頁 > 知識 > PyalgoTrade源碼閱讀完結篇

PyalgoTrade源碼閱讀完結篇

點擊上方「

Python開發

」,選擇「置頂公眾號」


關鍵時刻,第一時間送達!






作者:youerning


http://blog.51cto.com/youerning/2162751



python開發整理髮布,轉載請聯繫作者獲得授權


前言


本文著重於回測相關得模塊。


由於上一篇文章實在是寫得太爛了, 這一篇文章重新開始寫。

Pyalgotrade業務邏輯及實現原理


以官方教程示例為例


下載數據

python -c "from pyalgotrade.tools import yahoofinance; yahoofinance.download_daily_bars("orcl", 2000, "orcl-2000.csv")"

構建策略並運行

from pyalgotrade import strategy
from pyalgotrade.barfeed import yahoofeed
from pyalgotrade.technical import ma

class MyStrategy(strategy.BacktestingStrategy):
   def __init__(self, feed, instrument, smaPeriod):
       super(MyStrategy, self).__init__(feed, 1000)
       self.__position = None
       self.__instrument = instrument
       # We"ll use adjusted close values instead of regular close values.
       self.setUseAdjustedValues(True)
       self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod)

   def onEnterOk(self, position):
       execInfo = position.getEntryOrder().getExecutionInfo()
       self.info("BUY at $%.2f" % (execInfo.getPrice()))

   def onEnterCanceled(self, position):
       self.__position = None

   def onExitOk(self, position):
       execInfo = position.getExitOrder().getExecutionInfo()
       self.info("SELL at $%.2f" % (execInfo.getPrice()))
       self.__position = None

   def onExitCanceled(self, position):
       # If the exit was canceled, re-submit it.
       self.__position.exitMarket()

   def onBars(self, bars):
       # Wait for enough bars to be available to calculate a SMA.
       if self.__sma[-1] is None:
           return

       bar = bars[self.__instrument]
       # If a position was not opened, check if we should enter a long position.
       if self.__position is None:
           if bar.getPrice() > self.__sma[-1]:
               # Enter a buy market order for 10 shares. The order is good till canceled.
               self.__position = self.enterLong(self.__instrument, 10, True)
       # Check if we have to exit the position.
       elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive():
           self.__position.exitMarket()

def run_strategy(smaPeriod):
   # Load the yahoo feed from the CSV file
   feed = yahoofeed.Feed()
   feed.addBarsFromCSV("orcl", "orcl-2000.csv")

   # Evaluate the strategy with the feed.
   myStrategy = MyStrategy(feed, "orcl", smaPeriod)
   myStrategy.run()
   print "Final portfolio value: $%.2f" % myStrategy.getBroker().getEquity()

run_strategy(15)

業務邏輯概括




  1. 創建Feed對象載入回測歷史數據



  2. 創建策略



  3. 將Feed對象傳入策略



  4. 內部創建Broker對象



  5. 在策略中初始化技術指標



  6. 運行策略(內部會創建事件循環,依次讀取每一個bars數據調用策略邏輯,即onBars函)


回測數據 Feed對象


用於承載回測的數據,提供介面訪問,驅動整個事件循環。


創建Feed對象

# 導入yahoofeed模塊
from pyalgotrade.barfeed import yahoofeed

# 創建yahoofeed.Feed類創建其實例
feed = yahoofeed.Feed()

# 通過addBarsFromCSV載入本地csv文件
# 傳入股票代碼名, 文件路徑
feed.addBarsFromCSV("orcl", "orcl-2000.csv")

Feed對象繼承鏈




注: 由IntelliJ Idea生成


由上圖可知, 分別繼承不同的BarFeed,最終業務邏輯基類pyalgotrade.observer.subject.


Feed數據結構構建過程

主要方法調用順序如下:


yahooFeed.addBarsFromCSV


-> csvFeed.BarFeed.addBarsFromCSV


-> membf.BarFeed.addBarsFromSequence


-> barfeed.registerInstrument


-> feed.registerDataSeries


-> barfeed.createDataSeries


Feed數據結構


在Feed中有兩個比較重要的數據對象




  1. self.__bars = {}



  2. self.__ds = BarDataSeries()
    其中BarDataSeries對象有以下定義

pyalgotrade/pyalgotrade/dataseries/bards.py

class BarDataSeries(dataseries.SequenceDataSeries):
   def __init__(self, maxLen=None):
       super(BarDataSeries, self).__init__(maxLen)
       self.__openDS = dataseries.SequenceDataSeries(maxLen)
       self.__closeDS = dataseries.SequenceDataSeries(maxLen)
       self.__highDS = dataseries.SequenceDataSeries(maxLen)
       self.__lowDS = dataseries.SequenceDataSeries(maxLen)
       self.__volumeDS = dataseries.SequenceDataSeries(maxLen)
       self.__adjCloseDS = dataseries.SequenceDataSeries(maxLen)
       self.__extraDS = {}
       self.__useAdjustedValues = False

BarDataSeries提供一系列方法返回相應的數據序列,以getOpenDataSeries為例

pyalgotrade/pyalgotrade/dataseries/bards.py:87

   def getOpenDataSeries(self):
       """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the open prices."""
       return self.__openDS

而dataseries.SequenceDataSeries對象是一個數據存儲在collections.ListDeque對象上,並集成事件監聽的類對象.


self._bars在membf.BarFeed.addBarsFromSequence方法中讀取csv文件生成.
self.
_ds在barfeed.createDataSeries方法中創建一個默認長度為1024的BarDataSeries空數據對象.


小結


bar是含有時間, 開盤價, 收盤價, 當日最高價, 當日最低價, 成交量,復權收盤價的數據對象.


self.__bars是key為股票代碼, value是元素為bars數據對象的列表的字典.


self.__ds是BarDataSeries對象


事件循環


事件循環是PyalgoTrade的數據引擎,驅動著整個策略運轉.

下面是Pyalgotrade內部事件循環的一個簡單的實現。

# coding: utf8
import abc

class Event(object):
   """事件類.
   用於訂閱指定的操作,如函數
   當事件執行emit方法的時候,遍歷訂閱了的操作,並執行該操作"""
   def __init__(self):
       # 內部handlers列表
       self.__handlers = []

   def subscribe(self, handler):
       if handler not in self.__handlers:
           self.__handlers.append(handler)

   def emit(self, *args, **kwargs):
       """執行所有訂閱了的操作"""
       for handler in self.__handlers:
           handler(*args, **kwargs)

class Subject(object):
   """將元類指向abc.ABCMeta元類
   1. 當抽象方法未被實現的時候,不能新建該類的實例
   2. abstractmethod相當於子類要實現的介面,如果不實現,則不能新建該類的實例"""
   __metaclass__ = abc.ABCMeta

   @abc.abstractmethod
   def start(self):
       pass

   @abc.abstractmethod
   def stop(self):
       pass

   @abc.abstractmethod
   def dispatch(self):
       raise NotImplementedError()

   @abc.abstractmethod
   def eof(self):
       raise NotImplementedError()

class Dispatcher(object):
   """調度類
   1. 維護事件循環
   2. 不斷的調度subject的disptch操作並判斷是否結束"""
   def __init__(self):
       self.__subjects = []
       self.__stop = False

   def run(self):
       """運行整個事件循環並在調度之前,之後分別調用subject的start, stop方法"""
       try:
           for subject in self.__subjects:
               subject.start()

           while not self.__stop:
               eof, dispatched = self.dispatch()
               if eof:
                   self.__stop = True
       finally:
           for subject in self.__subjects:
               subject.stop()

   def dispatch(self):
       ret = False
       eof = False
       for subject in self.__subjects:
           ret = subject.dispatch() is True
           eof = subject.eof()

       return eof, ret

   def addSubject(self, subject):
       self.__subjects.append(subject)

class Broker(Subject):
   """Broker 類"""
   def dispatch(self):
       return None

   def eof(self):
       return None

   def start(self):
       pass

   def stop(self):
       pass

class Feed(Subject):
   """Feed類
   1. 承載數據源
   2. 通過數據驅動事件循環"""
   def __init__(self, size):
       self.__data = range(size)
       self.__nextPos = 0
       self.__event = Event()

   def start(self):
       pass

   def stop(self):
       pass

   def dispatch(self):
       value = self.__data[self.__nextPos]
       self.__event.emit(value)
       self.__nextPos += 1
       return True

   def getNewValueEvent(self):
       return self.__event

   def eof(self):
       return self.__nextPos >= len(self.__data)

class Strategy(object):
   def __init__(self, broker, feed):
       self.__dispatcher = Dispatcher()
       self.__feed = feed
       self.__broker = broker
       # 將策略的self.__onBars方法傳入Feed的self.__event裡面
       # 當Feed調用dispatch方法的時候, 會指定self.__onBars函數
       self.__feed.getNewValueEvent().subscribe(self.__onBars)
       # 注意順序,Feed對象必須在最後
       self.__dispatcher.addSubject(self.__broker)
       self.__dispatcher.addSubject(self.__feed)

   def __onBars(self, value):
       print("dispatch before.")
       self.onBars(value)
       print("dispatch after")

   def onBars(self, value):
       print("on Bar: {}".format(value))

   def run(self):
       self.__dispatcher.run()

if __name__ == "__main__":
   feed = Feed(3)
   broker = Broker()
   myStrategy = Strategy(broker, feed)
   myStrategy.run()

output:
dispatch before.
on Bar: 0
dispatch after
dispatch before.
on Bar: 1
dispatch after
dispatch before.
on Bar: 2
dispatch after

上面的代碼主要說明策略的onBars方法是怎麼被調用的。



關於Broker怎麼被驅動,在後面講解




  1. 策略中維護一個調度器dispatcher,當策略啟動的時候, 調度器dipatcher啟動, 並嘗試調用feed,broker start方法.



  2. 不斷調用feed, broker的dispatch方法, 判斷是否結束, 如果結束, 則做結束動作, 調用feed, broker的stop方法




    1. feed對象在調用dispatch方法的時候, feed對象會觸發自身維護的self._event.  而self.event在MyStrategy._init方法中,通過self._feed.getNewValueEvent().subscribe(self._onBars)訂閱了MyStrategy._onBars方法,  所以Feed對象每次dispatch的時候,MyStrategy._onBars都會被調用.


至此, Feed對象怎麼驅動策略的邏輯已經清晰。
接下來,講解BaseStrategy, BacktestingStrategy初始化過程


策略初始化


策略的繼承鏈並不複雜, 所有策略的基類是BaseStartegy, BacktestingStrategy是提供給用戶使用的策略,至少實現onBars函數則可以回測。

BaseStrategy, BacktestingStrategy的初始化源代碼如下

pyalgotrade/pyalgotrade/strategy/__init__.py

class BaseStartegy(object):
   def __init__(self, barFeed, broker):
       # 綁定barFeed對象
       self.__barFeed = barFeed
       # 綁定broker對象
       self.__broker = broker
       # 交易相關的倉位
       self.__activePositions = set()
       # 訂單處理順序
       self.__orderToPosition = {}
       # bar被處理後的事件
       self.__barsProcessedEvent = observer.Event()
       # analyzer列表
       self.__analyzers = []
       # 命名的analyzer列表
       self.__namedAnalyzers = {}
       # 重新取樣的feed對象列表
       self.__resampledBarFeeds = []
       # 調度器對象
       self.__dispatcher = dispatcher.Dispatcher()
       # broker的訂單被更新時的事件, 訂閱self.__onOrderEvent方法
       self.__broker.getOrderUpdatedEvent().subscribe(self.__onOrderEvent)
       # barfeed值被更新的時候的事件(當barfeed被調度的時候),訂閱self.__onBars方法
       self.__barFeed.getNewValuesEvent().subscribe(self.__onBars)

       # 調度器的開始事件,訂閱self.onStart方法
       self.__dispatcher.getStartEvent().subscribe(self.onStart)
       # 調度器的空閑事件, 訂閱self.__onIdle方法
       self.__dispatcher.getIdleEvent().subscribe(self.__onIdle)

       # 分別將繼承了Subject類的broker,barFeed對象加入到調度器的subject列表
       self.__dispatcher.addSubject(self.__broker)
       self.__dispatcher.addSubject(self.__barFeed)

       # 日誌級別的初始化
       self.__logger = logger.getLogger(BaseStrategy.LOGGER_NAME)

class BacktestingStrategy(BaseStrategy):
   # 默認初始化一個持有100w現金的虛擬賬戶
   def __init__(self, barFeed, cash_or_brk=1000000):

       # 如果沒有傳入cash_or_brk參數, 或者傳入數值類型的值
       # 則傳入cash_or_brk,barFeed對象新建一個backtesting.Broker實例,並調用父類的__init__方法
       # 如果傳入的cash_or_brk參數值是backtesting.Broker的實例, 則直接使用
       if isinstance(cash_or_brk, pyalgotrade.broker.Broker):
           broker = cash_or_brk
       else:
           broker = backtesting.Broker(cash_or_brk, barFeed)

       BaseStrategy.__init__(self, barFeed, broker)
       # 默認self.__useAdjustedValue=False
       self.__useAdjustedValues = False
       # 配置日誌參數
       self.setUseEventDateTimeInLogs(True)
       self.setDebugMode(True)

總的來說真正Strategy對象,barFeed對象,broker對象訂閱了更多的事件,  以及更多的判斷。但,內核都是調度器驅動著barFeed, broker對象不斷的被調度(調用dispatch方法),  而barFeed對象會不斷的從self._bars中取數據追加到self._ds對象中,並將取出來的數據提交的self._event中,而self._event訂閱了Strategy.__onBars方法,  所以不斷的驅動著Strategy的自定義策略(onBars裡面定義的交易邏輯).


交易賬戶 Broker對象


在Strategy對象初始化時候, 會初始化一個虛擬的回測賬戶.


回測賬戶broker需要傳入barfeed對象, 並在barfeed的event對象裡面訂閱自己的onBars函數,源碼如下:

pyalgotrade/pyalgotrade/broker/__init__.py

class Broker(broker.Broker):
   LOGGER_NAME = "broker.backtesting"

   def __init__(self, cash, barFeed, commission=None):
       super(Broker, self).__init__()

       assert(cash >= 0)
       self.__cash = cash
       if commission is None:
           self.__commission = NoCommission()
       else:
           self.__commission = commission
       self.__shares = {}
       self.__activeOrders = {}
       self.__useAdjustedValues = False
       # 持倉策略, 使用DefaultStrategy
       # 使用DefaultStrategy.volumeLimit = 0.25
       # 當交易訂單的成交量大於當前bar的成交量的25%則不能成交
       # 沒有滑點
       # 沒有手續費
       self.__fillStrategy = fillstrategy.DefaultStrategy()
       self.__logger = logger.getLogger(Broker.LOGGER_NAME)

       # 讓barfeed對象訂閱self.onBars方法
       barFeed.getNewValuesEvent().subscribe(self.onBars)
       self.__barFeed = barFeed
       self.__allowNegativeCash = False
       self.__nextOrderId = 1

由上可知,當barFeed對象數據更新的時候,還會調用BackTestBroker.onBars方法.


交易倉位 Position對象


當使用enterLong之類交易方法,則會返回一個Postion的對象,這個對象承載著當前各股的持倉比例,以及持有現金.


以enterLong方法說明持倉流程.




  1. 實例化一個LongPosition對象



  2. 調用broker的createMarketOrder方法創建一個MarketOrder.



  3. 註冊order, 以便barFeed對象數據驅動的時候,使用該order


以exitMarket方法說明平倉流程.




  1. 使用Position對象的exitMarket方法提交平倉訂單.



  2. 註冊order, 以便barFeed對象數據驅動的時候,使用該order



源代碼調用鏈太長….所以文字概括.


交易訂單 Order對象


當我們買入或者賣出的時候,其實是提交一個訂單給交易賬戶(broker), 交易賬戶會根據交易訂單的類型,動作等相關信息執行相關的操作.



交易訂單的類型參考:  https://www.thebalance.com/understanding-stock-orders-3141318


一般有買入(做多), 賣出(做空)兩種交易類型, 但是這兩種類型成交的方式分別由市價成交, 限價成交.


所以一共由以下四種類型,對應Strategy的四個方法:




  1. enterLong 以市價(下一個Bar的

    開盤價

    )買入



  2. enterLongLimit 當市價(下一個Bar的

    開盤價

    )低於或等於指定的價格時買入



  3. enterShort 與enterLong相反



  4. enterShortLimit 與enterLongShort相反.



以enter開頭是更加上層的方法, 建議使用.


goodTillCanceled為了適配實盤介面, 實盤介面可能有前一天的訂單不會再執行的限制,所以設置goodTillCanceled=True保證第二天或者更後的時間,訂單依然有效,直至手動取消.


除了提交交易訂單還可以提交止損訂單, 分別對應Strategy的兩個方法.




  1. StopOrder 提交一個止損訂單, 傳入止損價格, 當價格突破止損價位, 以市價成交進行止損.



  2. StopLimitOrder 提交一個止損訂單, 傳入止損價格, 當價格突破止損價位, 並且價格在限定的價格區間才會止損.



每個提交的訂單會到下一個事件循環才會判斷條件是否符合,才會執行.


技術指標 EventBasedFilter對象


通過藉助自定義指標或者自帶的指標,如SMA,EMA,MACD等可以更全面的看待股票的走勢以及信號.


下面是技術指標基類的初始化過程.

pyalgotrade/pyalgotrade/technical/__init__.py

class EventWindow(object):
   """數據實際承載類
   數據保存在self__values裡面
   """
   def __init__(self, windowSize, dtype=float, skipNone=True):
       assert(windowSize > 0)
       assert(isinstance(windowSize, int))
       self.__values = collections.NumPyDeque(windowSize, dtype)
       self.__windowSize = windowSize
       self.__skipNone = skipNone

   def onNewValue(self, dateTime, value):
       """提供onNewValue方法將新的值傳入"""
       if value is not None or not self.__skipNone:
           self.__values.append(value)

   def getValues(self):
       """獲取EventWindows的所有值"""
       return self.__values.data()

   def getWindowSize(self):
       """獲取EventWindow Size"""
       return self.__windowSize

   def windowFull(self):
       """eventWindow是否已經填滿"""
       return len(self.__values) == self.__windowSize

   def getValue(self):
       """子類須實現的類"""
       raise NotImplementedError()

class EventBasedFilter(dataseries.SequenceDataSeries):
   def __init__(self, dataSeries, eventWindow, maxLen=None):
       super(EventBasedFilter, self).__init__(maxLen)
       self.__dataSeries = dataSeries
       # 當dataseries數據有新值的時候,調用self.__onNewValues方法
       self.__dataSeries.getNewValueEvent().subscribe(self.__onNewValue)
       self.__eventWindow = eventWindow

   def __onNewValue(self, dataSeries, dateTime, value):
       # 讓EventWindow對象計算新值
       self.__eventWindow.onNewValue(dateTime, value)
       # 獲取計算後的結果
       newValue = self.__eventWindow.getValue()
       # 將值保存到自身實例裡面, 即self.__values
       # 因為繼承了dataseries.SequenceDataSeries類
       # 而dataseries.SequenceDataSeries父類實現了__getitem__方法, 所以可以使用索引取值.
       self.appendWithDateTime(dateTime, newValue)

   def getDataSeries(self):
       return self.__dataSeries

   def getEventWindow(self):
       return self.__eventWindow

在Feed對象初始過程中,會初始化兩個比較重要的數據結構,  一個是self._bars, 一個是self._ds,在整個事件驅動中,  策略不停的從self_bars中取數據,然後使用appendWithDateTime方法將數據追加的self._ds裡面。
源碼如下:

pyalgotrade/pyalgotrade/dataseries/bards.py

# 首先調用BarDataSeries的appendWithDateTime方法
class BarDataSeries(dataseries.SequenceDataSeries):
   def appendWithDateTime(self, dateTime, bar):
       assert(dateTime is not None)
       assert(bar is not None)
       bar.setUseAdjustedValue(self.__useAdjustedValues)

       super(BarDataSeries, self).appendWithDateTime(dateTime, bar)

       self.__openDS.appendWithDateTime(dateTime, bar.getOpen())
       self.__closeDS.appendWithDateTime(dateTime, bar.getClose())
       self.__highDS.appendWithDateTime(dateTime, bar.getHigh())
       self.__lowDS.appendWithDateTime(dateTime, bar.getLow())
       self.__volumeDS.appendWithDateTime(dateTime, bar.getVolume())
       self.__adjCloseDS.appendWithDateTime(dateTime, bar.getAdjClose())

       # Process extra columns.
       for name, value in bar.getExtraColumns().iteritems():
           extraDS = self.__getOrCreateExtraDS(name)
           extraDS.appendWithDateTime(dateTime, value)

pyalgotrade/dataseries/__init__.py

# 然後調用SequenceDataSeries對象的appendWithDateTime
# 在這個方法中提交數據更新的事件
class SequenceDataSeries(DataSeries):
   def appendWithDateTime(self, dateTime, value):
       """
       Appends a value with an associated datetime.

       .. note::
           If dateTime is not None, it must be greater than the last one.
       """

       if dateTime is not None and len(self.__dateTimes) != 0 and self.__dateTimes[-1] >= dateTime:
           raise Exception("Invalid datetime. It must be bigger than that last one")

       assert(len(self.__values) == len(self.__dateTimes))
       self.__dateTimes.append(dateTime)
       self.__values.append(value)

       self.getNewValueEvent().emit(self, dateTime, value)

小結


使用技術指標需要傳入dataSeries對象, 可以通過getPriceDataSeries, getOpenDataSeries等獲得.


創建策略


由於上面已經有完整版本的代碼,這裡做一定的刪減, 並做註解.

# 集成strategy.BacktestingStrategy類
class MyStrategy(strategy.BacktestingStrategy):
   def __init__(self, feed, instrument, smaPeriod):
       # 調用父類__init__方法
       super(MyStrategy, self).__init__(feed, 1000)
       # 初始情況下,postion設置為零, postion一般只持倉比例
       self.__position = None
       # 股票代碼
       self.__instrument = instrument
       # We"ll use adjusted close values instead of regular close values.
       # 是否使用復權收盤價
       self.setUseAdjustedValues(True)
       # 初始化策略指標
       self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod)

   # 省略其他鉤子函數

   # 必須實現的onBars函數,用於買賣的主要邏輯
   def onBars(self, bars):
       # 如果沒有簡單移動平均值則什麼都不做
       if self.__sma[-1] is None:
           return

       # 取出指定股票代碼的bar對象
       bar = bars[self.__instrument]

       # 如果postion is None,即持倉為0
       if self.__position is None:
           # 如果收盤價大於簡單移動平均值則買入
           if bar.getPrice() > self.__sma[-1]:
               # 買入,enterLong=做多
               self.__position = self.enterLong(self.__instrument, 10, True)
       # 反之賣出
       elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive():
           self.__position.exitMarket()

總結


BarFeed像是PyalgoTrade中的燃料,不斷的供給給策略的Dispatcher調度器, 使整個策略不斷運行,直至沒有燃料(沒有新的數據.)


BarFeed使數據源的一個抽象,裡面保存著兩個重要的數據結構, self._bars, self._ds.


self.__bars是key為股票代碼, value是元素為bar數據對象的列表的字典.


self.__ds為BarDataSeries對象.


Broker維護著虛擬賬戶裡面的現金以及相關股票的倉位.接收訂單並實時的處理訂單, 計算收益等.


Position為股票倉位持有情況的對象, 提供交易的相關介面.


EventBasedFilter為技術指標, 可以計算相關指標如MACD, SMA等, 也可以自定義自己的技術指標.


Strategy為自定義策略,只需實現onBars函數即可完成買賣邏輯, 將Broker,Position相關介面放在Strategy實例方法裡面, 同一調用介面.


【點擊成為Java大神】

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發 的精彩文章:

神秘技術, 防好友撤回QQ消息神器, Python碼農神發明!
為什麼要在2018年學習Python?

TAG:Python開發 |