历史数据处理
WonderTrader数据存储方式
环境准备好了以后,还需要确定我们准备使用什么方式存储数据。WonderTrader实盘环境下只支持:自有文件存储。而回测环境下,还支持直接从csv读取数据(仅限于历史K线数据)。
文件存储
历史K线数据文件,采用zstd压缩后存放。高频历史数据,包括tick数据,股票level2的委托明细、成交明细、委托队列,也采用压缩存放的方式。A股全市场一天的level2数据,压缩以后也就是大概2G不到的样子,对于硬盘来说是相当友好的。
实时数据文件,因为需要实时读写,所以不压缩数据结构,并采用mmap的方式映射到内存中,直接对文件进行读写,提高读写效率。
csv历史数据
很多用户通过各种渠道获取到的历史数据,供应商为了便于用户直接查看数据,一般都会提供csv格式的。但是csv文件格式的数据,占用空间非常大,而且直接从csv文件读取数据的开销也是非常大的。
WonderTrader的回测框架为了尽量减少这种不必要的开销,在处理csv文件时,第一次会直接从csv文件读取,将读取的数据转成WonderTrader内部数据结构之后,会将数据转储为WonderTrader自有的压缩存放格式。这样下次在使用该数据的时候,读取压缩存放的数据以后,直接解压就可以得到结构化的历史数据,这样就可以直接进行访问了。
datahelper模块
datahelper模块位于wtpy.apps子模块下,采用工厂模式进行封装,最大限度的降低了使用难度,将各种API的差异全部封装起来,用户在使用的时候只需要调用工厂创建即可,而不用担心因为每个数据源API不同而导致的各种问题。
数据源
datahelper模块目前已封装的数据源包括tushare、baostock、RQData。
tushare是知名度较高的免费数据源,数据比较全,使用的人也很多。但是tushare有些数据需要积分才能下载,下载速度也较慢baostock是一个免费、开源的证券数据平台,无需注册,并且下载速度也非常快,可以拿到5分钟线数据RQData是米筐开发的一个基于Python的金融数据工具包,是一个收费数据源,数据很全,数据质量也很高。对于一些有1分钟线甚至更高频数据的需求,免费的数据源就无法提供了,RQData可能也是一个不错的选择。
数据接口
datahelper模块主要帮助用户进行历史数据的下载,以及一些基础数据的获取。主要包括3种接口:
获取代码列表
获取除权因子
获取历史K线
对于财务数据,WonderTrader暂时没有从平台层面做标准化的工作。一方面财务数据相对静态,可以相对容易的从不同的渠道拿到。另一方面只有股票才需要财务数据,而现在最流行的选股框架还是多因子框架。相对WonderTrader而言,多因子框架几乎是另一个维度的,所以WonderTrader暂时就不涉及财务数据这块了。
数据辅助模块各个接口的详细定义如下:
class BaseDataHelper:
def __init__(self):
self.isAuthed = False
pass
def __check__(self):
if not self.isAuthed:
raise Exception("This module has not authorized yet!")
def auth(self, **kwargs):
'''
模块认证
'''
pass
def dmpCodeListToFile(self, filename:str, hasIndex:bool=True, hasStock:bool=True):
'''
将代码列表导出到文件\n
@filename 要输出的文件名,json格式\n
@hasIndex 是否包含指数\n
@hasStock 是否包含股票\n
'''
pass
def dmpAdjFactorsToFile(self, codes:list, filename:str):
'''
将除权因子导出到文件\n
@codes 股票列表,格式如["SSE.600000","SZSE.000001"]\n
@filename 要输出的文件名,json格式
'''
pass
def dmpBarsToFile(self, folder:str, codes:list, start_date:datetime=None, end_date:datetime=None, period:str="day"):
'''
将K线导出到指定的目录下的csv文件,文件名格式如SSE.600000_d.csv\n
@folder 要输出的文件夹\n
@codes 股票列表,格式如["SSE.600000","SZSE.000001"]\n
@start_date 开始日期,datetime类型,传None则自动设置为1990-01-01\n
@end_date 结束日期,datetime类型,传None则自动设置为当前日期\n
@period K线周期,支持day、min1、min5\n
'''
pass
def dmpAdjFactorsToDB(self, dbHelper:DBHelper, codes:list):
'''
将除权因子导出到数据库\n
@codes 股票列表,格式如["SSE.600000","SZSE.000001"]\n
@dbHelper 数据库辅助模块
'''
pass
def dmpBarsToDB(self, dbHelper:DBHelper, codes:list, start_date:datetime=None, end_date:datetime=None, period:str="day"):
'''
将K线导出到数据库\n
@dbHelper 数据库辅助模块\n
@codes 股票列表,格式如["SSE.600000","SZSE.000001"]\n
@start_date 开始日期,datetime类型,传None则自动设置为1990-01-01\n
@end_date 结束日期,datetime类型,传None则自动设置为当前日期\n
@period K线周期,支持day、min1、min5\n
'''
pass
数据下载
本文将以tushare数据源为例,演示一下数据辅助模块的基本用法。
模块初始化
首先,创建
tushare对应的数据辅助模块:from **wtpy**.apps.datahelper import DHFactory as DHF hlper = DHF.createHelper("tushare")
创建好了以后,对
tushare进行认证:hlper.auth(**{"token":"your token of tushare","use_pro":True})
值得一提的是,上面的代码中没有一个参数
use_pro,该参数不是tushare认证需要的,而是用于控制tushare调用的接口的,如果use_pro为True,那么就调用tushare的pro_bar接口读取历史K线数据,否则就调用老版本的接口get_k_data读取历史K线数据。之所以这样,是因为pro_bar接口获取分钟数据的时候需要积分的,但是老的接口是不需要积分的。
下载数据到文件中
然后调用不同的接口获取数据,下面的代码演示了将数据下载到指定的文件中:
# 将代码列表下载到文件中 hlper.dmpCodeListToFile(filename = 'codes.json', hasStock = True, hasIndex = True) # 将除权因子下载到文件中 hlper.dmpAdjFactorsToFile(codes=['SSE.600000','SZSE.000001'], filename="./adjfactors.json") # 将K线下载到指定目录 hlper.dmpBarsToFile("./", codes = ['SSE.600000','SZSE.000001'], period="day")
代码列表下载截图

代码列表文件示意
{
"SSE": {
"000001": {
"code": "000001",
"exchg": "SSE",
"name": "上证指数",
"product": "IDX"
},
"600000": {
"code": "600000",
"exchg": "SSE",
"name": "浦发银行",
"product": "STK"
}
},
"SZSE": {
"000001": {
"code": "000001",
"exchg": "SZSE",
"name": "平安银行",
"product": "STK"
}
"399001": {
"code": "399001",
"exchg": "SZSE",
"name": "深证成指",
"product": "IDX"
}
}
}
除权因子下载截图

除权因子文件示意
{
"SSE": {
"600000": [
{
"date": "20160623",
"factor": 9.267
},
{
"date": "20170525",
"factor": 12.201
},
{
"date": "20180713",
"factor": 12.33
},
{
"date": "20190611",
"factor": 12.713
},
{
"date": "20200723",
"factor": 13.405
}
]
},
"SZSE": {
"000001": [
{
"date": "20160616",
"factor": 104.758
},
{
"date": "20170721",
"factor": 106.309
},
{
"date": "20180712",
"factor": 108.031
},
{
"date": "20190626",
"factor": 109.169
},
{
"date": "20200528",
"factor": 111.048
}
]
}
}
K线数据下载截图

K线数据示意
date,time,open,high,low,close,volume,turnover
20000112,0,26.0,26.0,24.8,25.12,35274900.0,88986734.0
20000113,0,25.0,25.25,24.8,24.9,7975600.0,19924430.4
20000114,0,24.88,25.0,23.91,24.2,17861900.0,43420608.7
20000117,0,24.08,24.44,23.75,24.4,8101500.0,19477696.400000002
20000118,0,24.5,24.57,23.88,24.14,7693300.0,18509168.400000002
20000119,0,24.14,24.29,23.98,24.13,4658400.0,11232523.799999999
20000120,0,24.12,24.65,24.1,24.44,5114400.0,12466817.600000001
20000121,0,24.51,24.63,24.08,24.27,8138700.0,19752321.5
20000124,0,24.2,24.25,23.95,24.18,9250300.0,22240443.5
......
20210210,0,10.67,10.85,10.56,10.69,105092240.99999999,112396135.6
20210218,0,10.8,11.02,10.74,10.83,143397923.0,155850481.1
20210219,0,10.83,11.12,10.77,10.97,122926300.0,135129039.1
20210222,0,10.92,10.95,10.7,10.71,127379413.99999999,137353338.2
20210223,0,10.71,10.99,10.71,10.78,93327786.0,101090131.6
20210224,0,10.81,10.89,10.55,10.6,97783999.0,104747027.89999999
20210225,0,10.66,10.85,10.6,10.8,89855495.0,96473275.5
20210226,0,10.72,10.82,10.54,10.54,85386093.0,90929810.5
数据的后续处理
上面演示了datahelper模块的用法,该模块能够帮助用户快速拉取WonderTrader可以直接使用的历史数据,可以有效的降低用户初次使用WonderTrader进行策略回测的门槛。
不过在实盘的过程中,还有很多实施的细节,本文也做一个大概的梳理。
标的代码的规则
期货合约代码,标准格式为
CFFEX.IF.2103,其中郑商所的合约,月份也要扩展为4位期货主力合约,标准格式为
CFFEX.IF.HOT,WonderTrader会根据一个主力合约规则文件自动映射到分月合约证券代码,股票的标准格式为
SSE.STK.600000,指数的标准格式为SZSE.IDX.399001
品种合约更新
WonderTrader需要定期维护品种和合约信息,即commodities.json和contracts.json文件,来更新至最新期货品种。
对此,WonderTrader提供了ctp_loader,从simnow拉取品种合约信息。
注意:如果出现新的品种,需要自己在map_future.ini中更新相对应的键值,否则会导致更新的基础文件出现对应问题,造成运行引擎WtEngine初始化时报错!(同时需要wtpy==0.9.8)
文件配置
run.py
config.ini
map_future.ini # 期货品种映射文件,期权对应map_futopt.ini
config.ini
[ctp]
front=tcp://180.168.146.187:10201
broker=9999
user=simnow账号
pass=simnow密码
appid=simnow_client_test
authcode=0000000000000000
[config]
path=../common/
mask=1 # 1|2|4,1-fut期货,2-opt期权,4-stk股票
mapfiles=map_future.ini,map_futopt.ini
map_future.ini
[Name] # 代码与名称的映射
IC=中证
IF=沪深
c=玉米
a=豆一
ag=沪银
[Session] # 品种与开收盘时间段的映射,对应common中的session.json文件
CFFEX.IC=SD0930
CFFEX.IF=SD0930
DCE.c=FN2300
SHFE.ag=FN0230
主力合约维护
主力合约映射的规则,需要每天维护,即hots.json文件,WonderTrader会根据规则自动处理映射,用户只需要使用.HOT代码就可以了。
对于主力合约规则文件的更新,WonderTrader提供了hotpicker工具,有两种方式更新:
根据datakit的落地行情更新
从交易所官网爬取(不稳定)
主力合约规则文件如下:
{
"CFFEX": {
"IC": [
{
"date": 20191018,
"from": "IC1910",
"newclose": 4923.6,
"oldclose": 5028.2,
"to": "IC1912"
},
{
"date": 20191219,
"from": "IC1912",
"newclose": 5208.6,
"oldclose": 5244.4,
"to": "IC2003"
},
{
"date": 20200320,
"from": "IC2003",
"newclose": 5099.6,
"oldclose": 5147.4,
"to": "IC2004"
}
]
}
}
WonderTrader在读取主力合约的历史数据时,会优先读取直接对应的历史数据文件。如存储模式为文件时会先读取名为CFFEX.IF_HOT.dsb的文件,然后再根据主力合约规则读取分月合约的数据进行拼接。而如果存储模式为数据库,则会优先读取代码为xx.HOT的数据,然后再根据主力合约规则读取分月合约的数据。
datakit实时数据录制
工作逻辑
实时录制:
datakit负责在实盘中录制实时行情数据,存储在指定的数据目录中,同时通知策略进行接收收盘作业:在每个交易日结束以后,会对实时行情数据做一个盘后处理,默认是在每天的16点
将实时高频数据按天按代码压缩存放(
tick和level2高频数据)将当日的K线数据(
min1和min5)合并到历史K线数据中根据当天最新的
tick数据,生成当天的日K线数据并合并到历史日K线数据中
正是因为有收盘作业这么一个机制,所以WonderTrader目前还不能很好的适应7×24小时交易的品种,如数字货币。所以WonderTrader对于数据货币的支持的最大的问题,还是7×24小时交易机制的数据处理问题。
demo:datakit_fut
文件配置
runDT.py
dtcfg.yaml # 环境配置
mdparsers.yaml # 行情通道配置,被dtcfg.yaml引用
statemonitor.yaml # 监控配置,被dtcfg.yaml引用
logcfdgt.yaml # 日志配置
DtLogs/ #(运行后生成)日志
CTPMDFlow/ #(运行后生成)
指定的数据落地目录/ #(运行后生成)
rt/ # 实时数据
ticks/
{交易所}/ # 如 CFFEX/
/{合约代码}.dmb # 如 IF2312.dmb
min1/
{交易所}/ # 如 CFFEX/
/{合约代码}.dmb # 如 IF2312.dmb
min5/
{交易所}/ # 如 CFFEX/
/{合约代码}.dmb # 如 IF2312.dmb
his/ # 历史数据(收盘作业生成)
ticks/
{交易所}/ # 如 CFFEX/
{交易日}/ # 如 20230731/
{合约代码}.dsb # 如 IF2312.dsb
min1/
{交易所}/ # 如 CFFEX/
{合约代码}.dsb # 如 IF2312.dsb
min5/
{交易所}/ # 如 CFFEX/
{合约代码}.dsb # 如 IF2312.dsb
day/
{交易所}/ # 如 CFFEX/
{合约代码}.dsb # 如 IF2312.dsb
cache.dmb # 临时缓存
dtcfg.yaml
basefiles: # 基础文件
commodity: ../common/commodities.json
contract: ../common/contracts.json
holiday: ../common/holidays.json
session: ../common/sessions.json
utf-8: true
writer: # 数据落地配置
module: WtDataStorage #数据存储模块
async: false #同步落地还是异步落地,期货推荐同步,股票推荐异步
groupsize: 20 #日志分组大小,主要用于控制日志输出,当订阅合约较多时,推荐1000以上,当订阅的合约数较少时,推荐100以内
path: ../FUT_Data #数据存储的路径
savelog: false #是否保存tick到csv
disabletick: false #不保存tick数据,默认false
disablemin1: false #不保存min1数据,默认false
disablemin5: false #不保存min5数据,默认false
disableday: false #不保存day数据,默认false
disablehis: false #收盘作业不转储历史数据,默认false
broadcaster: # UDP广播器配置项
active: true
bport: 3997 # UDP查询端口,主要是用于查询最新的快照
broadcast: # 广播配置
- host: 255.255.255.255 # 广播地址,255.255.255.255会向整个局域网广播,但是受限于路由器
port: 9001 # 广播端口,接收端口要和广播端口一致
type: 2 # 数据类型,固定为2
parsers: mdparsers.yaml
statemonitor: statemonitor.yaml # 监控配置,设置监控时段(开收盘)
mdparsers.yaml
parsers: # 行情通道配置
- active: true
broker: '9999'
code: '' # 要录制的合约代码,如果为空默认contracts.json中的全部,不为空则只录制指定的合约,注意这里须与contracts中的代码一致!如'CFFEX.IF2408, CFFEX.IF2403'
front: tcp://180.168.146.187:10211
id: parser
module: ParserCTP
user: 账号
pass: 密码
statemonitor.yaml(一般无需改动)
FD0900:
closetime: 1515 # 关闭时间
inittime: 850 # 初始化时间
name: 期白0900
proctime: 1600 # 收盘作业时间
FD0915:
closetime: 1530
inittime: 900
name: 期白0915
proctime: 1600
FN0100:
closetime: 1515
inittime: 2050
name: 期夜0100
proctime: 1600
FN0230:
closetime: 1515
inittime: 2050
name: 期夜0230
proctime: 1600
FN2300:
closetime: 1515
inittime: 2050
name: 期夜2300
proctime: 1600
FN2330:
closetime: 1515
inittime: 2050
name: 期夜2330
proctime: 1600
SD0930:
closetime: 1515
inittime: 915
name: 股白0930
proctime: 1600