历史数据处理

WonderTrader数据存储方式


  环境准备好了以后,还需要确定我们准备使用什么方式存储数据。WonderTrader实盘环境下只支持:自有文件存储。而回测环境下,还支持直接从csv读取数据(仅限于历史K线数据)。

文件存储

  历史K线数据文件,采用zstd压缩后存放。高频历史数据,包括tick数据,股票level2的委托明细、成交明细、委托队列,也采用压缩存放的方式。A股全市场一天的level2数据,压缩以后也就是大概2G不到的样子,对于硬盘来说是相当友好的。   实时数据文件,因为需要实时读写,所以不压缩数据结构,并采用mmap的方式映射到内存中,直接对文件进行读写,提高读写效率。

csv历史数据

  很多用户通过各种渠道获取到的历史数据,供应商为了便于用户直接查看数据,一般都会提供csv格式的。但是csv文件格式的数据,占用空间非常大,而且直接从csv文件读取数据的开销也是非常大的。   WonderTrader的回测框架为了尽量减少这种不必要的开销,在处理csv文件时,第一次会直接从csv文件读取,将读取的数据转成WonderTrader内部数据结构之后,会将数据转储为WonderTrader自有的压缩存放格式。这样下次在使用该数据的时候,读取压缩存放的数据以后,直接解压就可以得到结构化的历史数据,这样就可以直接进行访问了。

datahelper模块


  datahelper模块位于wtpy.apps子模块下,采用工厂模式进行封装,最大限度的降低了使用难度,将各种API的差异全部封装起来,用户在使用的时候只需要调用工厂创建即可,而不用担心因为每个数据源API不同而导致的各种问题。

数据源

datahelper模块目前已封装的数据源包括tusharebaostockRQData

  • tushare是知名度较高的免费数据源,数据比较全,使用的人也很多。但是tushare有些数据需要积分才能下载,下载速度也较慢

  • baostock是一个免费、开源的证券数据平台,无需注册,并且下载速度也非常快,可以拿到5分钟线数据

  • RQData是米筐开发的一个基于Python的金融数据工具包,是一个收费数据源,数据很全,数据质量也很高。对于一些有1分钟线甚至更高频数据的需求,免费的数据源就无法提供了,RQData可能也是一个不错的选择。

数据接口

datahelper模块主要帮助用户进行历史数据的下载,以及一些基础数据的获取。主要包括3种接口:

  • 获取代码列表

  • 获取除权因子

  • 获取历史K线

  对于财务数据,WonderTrader暂时没有从平台层面做标准化的工作。一方面财务数据相对静态,可以相对容易的从不同的渠道拿到。另一方面只有股票才需要财务数据,而现在最流行的选股框架还是多因子框架。相对WonderTrader而言,多因子框架几乎是另一个维度的,所以WonderTrader暂时就不涉及财务数据这块了。

数据辅助模块各个接口的详细定义如下:

class BaseDataHelper:

    def __init__(self):
        self.isAuthed = False
        pass

    def __check__(self):
        if not self.isAuthed:
            raise Exception("This module has not authorized yet!")

    def auth(self, **kwargs):
        '''
        模块认证
        '''
        pass

    def dmpCodeListToFile(self, filename:str, hasIndex:bool=True, hasStock:bool=True):
        '''
        将代码列表导出到文件\n
        @filename   要输出的文件名,json格式\n
        @hasIndex   是否包含指数\n
        @hasStock   是否包含股票\n
        '''
        pass

    def dmpAdjFactorsToFile(self, codes:list, filename:str):
        '''
        将除权因子导出到文件\n
        @codes  股票列表,格式如["SSE.600000","SZSE.000001"]\n
        @filename   要输出的文件名,json格式
        '''
        pass

    def dmpBarsToFile(self, folder:str, codes:list, start_date:datetime=None, end_date:datetime=None, period:str="day"):
        '''
        将K线导出到指定的目录下的csv文件,文件名格式如SSE.600000_d.csv\n
        @folder 要输出的文件夹\n
        @codes  股票列表,格式如["SSE.600000","SZSE.000001"]\n
        @start_date 开始日期,datetime类型,传None则自动设置为1990-01-01\n
        @end_date   结束日期,datetime类型,传None则自动设置为当前日期\n
        @period K线周期,支持day、min1、min5\n
        '''
        pass

    def dmpAdjFactorsToDB(self, dbHelper:DBHelper, codes:list):
        '''
        将除权因子导出到数据库\n
        @codes  股票列表,格式如["SSE.600000","SZSE.000001"]\n
        @dbHelper   数据库辅助模块
        '''
        pass

    def dmpBarsToDB(self, dbHelper:DBHelper, codes:list, start_date:datetime=None, end_date:datetime=None, period:str="day"):
        '''
        将K线导出到数据库\n
        @dbHelper 数据库辅助模块\n
        @codes  股票列表,格式如["SSE.600000","SZSE.000001"]\n
        @start_date 开始日期,datetime类型,传None则自动设置为1990-01-01\n
        @end_date   结束日期,datetime类型,传None则自动设置为当前日期\n
        @period K线周期,支持day、min1、min5\n
        '''
        pass

数据下载


本文将以tushare数据源为例,演示一下数据辅助模块的基本用法。

模块初始化

  • 首先,创建tushare对应的数据辅助模块:

    from **wtpy**.apps.datahelper import DHFactory as DHF
    
    hlper = DHF.createHelper("tushare")
    
  • 创建好了以后,对tushare进行认证:

    hlper.auth(**{"token":"your token of tushare","use_pro":True})
    

      值得一提的是,上面的代码中没有一个参数use_pro,该参数不是tushare认证需要的,而是用于控制tushare调用的接口的,如果use_proTrue,那么就调用tusharepro_bar接口读取历史K线数据,否则就调用老版本的接口get_k_data读取历史K线数据。之所以这样,是因为pro_bar接口获取分钟数据的时候需要积分的,但是老的接口是不需要积分的。

下载数据到文件中

  • 然后调用不同的接口获取数据,下面的代码演示了将数据下载到指定的文件中:

    # 将代码列表下载到文件中
    hlper.dmpCodeListToFile(filename = 'codes.json', hasStock = True, hasIndex = True)
    
    # 将除权因子下载到文件中
    hlper.dmpAdjFactorsToFile(codes=['SSE.600000','SZSE.000001'], filename="./adjfactors.json")
    
    # 将K线下载到指定目录
    hlper.dmpBarsToFile("./", codes = ['SSE.600000','SZSE.000001'], period="day")
    

代码列表下载截图

代码列表下载截图

代码列表文件示意

{
    "SSE": {
        "000001": {
            "code": "000001",
            "exchg": "SSE",
            "name": "上证指数",
            "product": "IDX"
        },
        "600000": {
            "code": "600000",
            "exchg": "SSE",
            "name": "浦发银行",
            "product": "STK"
        }
    },
    "SZSE": {
        "000001": {
            "code": "000001",
            "exchg": "SZSE",
            "name": "平安银行",
            "product": "STK"
        }
        "399001": {
            "code": "399001",
            "exchg": "SZSE",
            "name": "深证成指",
            "product": "IDX"
        }
    }
}

除权因子下载截图

除权因子下载截图

除权因子文件示意

{
    "SSE": {
        "600000": [
            {
                "date": "20160623",
                "factor": 9.267
            },
            {
                "date": "20170525",
                "factor": 12.201
            },
            {
                "date": "20180713",
                "factor": 12.33
            },
            {
                "date": "20190611",
                "factor": 12.713
            },
            {
                "date": "20200723",
                "factor": 13.405
            }
        ]
    },
    "SZSE": {
        "000001": [
            {
                "date": "20160616",
                "factor": 104.758
            },
            {
                "date": "20170721",
                "factor": 106.309
            },
            {
                "date": "20180712",
                "factor": 108.031
            },
            {
                "date": "20190626",
                "factor": 109.169
            },
            {
                "date": "20200528",
                "factor": 111.048
            }
        ]
    }
}

K线数据下载截图
K线数据下载截图

K线数据示意

date,time,open,high,low,close,volume,turnover
20000112,0,26.0,26.0,24.8,25.12,35274900.0,88986734.0
20000113,0,25.0,25.25,24.8,24.9,7975600.0,19924430.4
20000114,0,24.88,25.0,23.91,24.2,17861900.0,43420608.7
20000117,0,24.08,24.44,23.75,24.4,8101500.0,19477696.400000002
20000118,0,24.5,24.57,23.88,24.14,7693300.0,18509168.400000002
20000119,0,24.14,24.29,23.98,24.13,4658400.0,11232523.799999999
20000120,0,24.12,24.65,24.1,24.44,5114400.0,12466817.600000001
20000121,0,24.51,24.63,24.08,24.27,8138700.0,19752321.5
20000124,0,24.2,24.25,23.95,24.18,9250300.0,22240443.5
......
20210210,0,10.67,10.85,10.56,10.69,105092240.99999999,112396135.6
20210218,0,10.8,11.02,10.74,10.83,143397923.0,155850481.1
20210219,0,10.83,11.12,10.77,10.97,122926300.0,135129039.1
20210222,0,10.92,10.95,10.7,10.71,127379413.99999999,137353338.2
20210223,0,10.71,10.99,10.71,10.78,93327786.0,101090131.6
20210224,0,10.81,10.89,10.55,10.6,97783999.0,104747027.89999999
20210225,0,10.66,10.85,10.6,10.8,89855495.0,96473275.5
20210226,0,10.72,10.82,10.54,10.54,85386093.0,90929810.5

数据的后续处理


  上面演示了datahelper模块的用法,该模块能够帮助用户快速拉取WonderTrader可以直接使用的历史数据,可以有效的降低用户初次使用WonderTrader进行策略回测的门槛。

  不过在实盘的过程中,还有很多实施的细节,本文也做一个大概的梳理。

标的代码的规则

  • 期货合约代码,标准格式为CFFEX.IF.2103,其中郑商所的合约,月份也要扩展为4位

  • 期货主力合约,标准格式为CFFEX.IF.HOTWonderTrader会根据一个主力合约规则文件自动映射到分月合约

  • 证券代码,股票的标准格式为SSE.STK.600000,指数的标准格式为SZSE.IDX.399001

品种合约更新

WonderTrader需要定期维护品种和合约信息,即commodities.jsoncontracts.json文件,来更新至最新期货品种。 对此,WonderTrader提供了ctp_loader,从simnow拉取品种合约信息。 注意:如果出现新的品种,需要自己在map_future.ini中更新相对应的键值,否则会导致更新的基础文件出现对应问题,造成运行引擎WtEngine初始化时报错!(同时需要wtpy==0.9.8)

文件配置

run.py
config.ini
map_future.ini  # 期货品种映射文件,期权对应map_futopt.ini

config.ini

[ctp]
front=tcp://180.168.146.187:10201
broker=9999
user=simnow账号
pass=simnow密码
appid=simnow_client_test
authcode=0000000000000000

[config]
path=../common/
mask=1  # 1|2|4,1-fut期货,2-opt期权,4-stk股票
mapfiles=map_future.ini,map_futopt.ini

map_future.ini

[Name]  # 代码与名称的映射
IC=中证
IF=沪深
c=玉米
a=豆一
ag=沪银

[Session]  # 品种与开收盘时间段的映射,对应common中的session.json文件
CFFEX.IC=SD0930
CFFEX.IF=SD0930
DCE.c=FN2300
SHFE.ag=FN0230

主力合约维护

主力合约映射的规则,需要每天维护,即hots.json文件,WonderTrader会根据规则自动处理映射,用户只需要使用.HOT代码就可以了。

对于主力合约规则文件的更新,WonderTrader提供了hotpicker工具,有两种方式更新:

  1. 根据datakit的落地行情更新

  2. 从交易所官网爬取(不稳定)

主力合约规则文件如下:

{
    "CFFEX": {
        "IC": [
            {
                "date": 20191018,
                "from": "IC1910",
                "newclose": 4923.6,
                "oldclose": 5028.2,
                "to": "IC1912"
            },
            {
                "date": 20191219,
                "from": "IC1912",
                "newclose": 5208.6,
                "oldclose": 5244.4,
                "to": "IC2003"
            },
            {
                "date": 20200320,
                "from": "IC2003",
                "newclose": 5099.6,
                "oldclose": 5147.4,
                "to": "IC2004"
            }
        ]
    }
}

  WonderTrader在读取主力合约的历史数据时,会优先读取直接对应的历史数据文件。如存储模式为文件时会先读取名为CFFEX.IF_HOT.dsb的文件,然后再根据主力合约规则读取分月合约的数据进行拼接。而如果存储模式为数据库,则会优先读取代码为xx.HOT的数据,然后再根据主力合约规则读取分月合约的数据。

datakit实时数据录制

工作逻辑

  1. 实时录制:datakit负责在实盘中录制实时行情数据,存储在指定的数据目录中,同时通知策略进行接收

  2. 收盘作业:在每个交易日结束以后,会对实时行情数据做一个盘后处理,默认是在每天的16点

    • 将实时高频数据按天按代码压缩存放(ticklevel2高频数据)

    • 将当日的K线数据(min1min5)合并到历史K线数据中

    • 根据当天最新的tick数据,生成当天的日K线数据并合并到历史日K线数据中

正是因为有收盘作业这么一个机制,所以WonderTrader目前还不能很好的适应7×24小时交易的品种,如数字货币。所以WonderTrader对于数据货币的支持的最大的问题,还是7×24小时交易机制的数据处理问题。

demo:datakit_fut

文件配置

runDT.py
dtcfg.yaml  # 环境配置
mdparsers.yaml  # 行情通道配置,被dtcfg.yaml引用
statemonitor.yaml  # 监控配置,被dtcfg.yaml引用
logcfdgt.yaml  # 日志配置
DtLogs/  #(运行后生成)日志
CTPMDFlow/  #(运行后生成)
指定的数据落地目录/  #(运行后生成)
    rt/  # 实时数据
        ticks/
            {交易所}/  # 如 CFFEX/
                /{合约代码}.dmb  # 如 IF2312.dmb
        min1/
            {交易所}/  # 如 CFFEX/
                /{合约代码}.dmb  # 如 IF2312.dmb
        min5/
            {交易所}/  # 如 CFFEX/
                /{合约代码}.dmb  # 如 IF2312.dmb
    his/  # 历史数据(收盘作业生成)
		    ticks/
			      {交易所}/  # 如 CFFEX/
				        {交易日}/  # 如 20230731/
					          {合约代码}.dsb  # 如 IF2312.dsb
		    min1/
			      {交易所}/  # 如 CFFEX/
					      {合约代码}.dsb  # 如 IF2312.dsb
		    min5/
			      {交易所}/  # 如 CFFEX/
					      {合约代码}.dsb  # 如 IF2312.dsb
		    day/
			      {交易所}/  # 如 CFFEX/
					      {合约代码}.dsb  # 如 IF2312.dsb
    cache.dmb  # 临时缓存

dtcfg.yaml

basefiles:  # 基础文件
    commodity: ../common/commodities.json
    contract: ../common/contracts.json
    holiday: ../common/holidays.json
    session: ../common/sessions.json
    utf-8: true
    
writer:  # 数据落地配置
    module: WtDataStorage #数据存储模块
    async: false          #同步落地还是异步落地,期货推荐同步,股票推荐异步
    groupsize: 20         #日志分组大小,主要用于控制日志输出,当订阅合约较多时,推荐1000以上,当订阅的合约数较少时,推荐100以内
    path: ../FUT_Data     #数据存储的路径
    savelog: false        #是否保存tick到csv
    disabletick: false    #不保存tick数据,默认false
    disablemin1: false    #不保存min1数据,默认false
    disablemin5: false    #不保存min5数据,默认false
    disableday: false     #不保存day数据,默认false
    disablehis: false     #收盘作业不转储历史数据,默认false

broadcaster:  # UDP广播器配置项
    active: true
    bport: 3997                 # UDP查询端口,主要是用于查询最新的快照
    broadcast:                  # 广播配置
    -   host: 255.255.255.255   # 广播地址,255.255.255.255会向整个局域网广播,但是受限于路由器
        port: 9001              # 广播端口,接收端口要和广播端口一致
        type: 2                 # 数据类型,固定为2

parsers: mdparsers.yaml
statemonitor: statemonitor.yaml  # 监控配置,设置监控时段(开收盘)

mdparsers.yaml

parsers:  # 行情通道配置
-   active: true
    broker: '9999'
    code: ''  # 要录制的合约代码,如果为空默认contracts.json中的全部,不为空则只录制指定的合约,注意这里须与contracts中的代码一致!如'CFFEX.IF2408, CFFEX.IF2403'
    front: tcp://180.168.146.187:10211
    id: parser
    module: ParserCTP
    user: 账号
    pass: 密码

statemonitor.yaml(一般无需改动)

FD0900:
    closetime: 1515  # 关闭时间
    inittime: 850  # 初始化时间
    name: 期白0900
    proctime: 1600  # 收盘作业时间
FD0915:
    closetime: 1530
    inittime: 900
    name: 期白0915
    proctime: 1600
FN0100:
    closetime: 1515
    inittime: 2050
    name: 期夜0100
    proctime: 1600
FN0230:
    closetime: 1515
    inittime: 2050
    name: 期夜0230
    proctime: 1600
FN2300:
    closetime: 1515
    inittime: 2050
    name: 期夜2300
    proctime: 1600
FN2330:
    closetime: 1515
    inittime: 2050
    name: 期夜2330
    proctime: 1600
SD0930:
    closetime: 1515
    inittime: 915
    name: 股白0930
    proctime: 1600