WtDataHelper

WtDatahelper是什么


作为一个内核使用C++开发的量化交易框架,WonderTrader是天然追求更高的性能的。作为一个量化交易框架,如果高效地读写数据就成了追求高性能的必须要解决的问题。

WonderTrader采用自定义的数据结构对文件进行存储,为了提高访问性能,实时数据直接采用不原始数据结构连续内存排列的方式存储数据,而历史数据则采用zstd进行压缩存放,在读取的时候做一次解压就可以全部加载到内存中。采用这样的机制,WonderTrader在数据读写的性能上可以达到非常高的效率。

我们在使用的过程中,新的数据可以通过datakit来落地,历史数据我们也可以通过WtDHFacor来拉取。但是WonderTrader毕竟不可能覆盖全部的数据源,另外在做数据维护和管理的时候,也会面临着检查指定数据文件的场景。

WtDataHelper就是为了给更多场景下的数据读写提供灵活的访问接口的。WtDataHelper主要提供的就是数据文件的读写能力。具体如下:

class WtDataHelper:

    def dump_bars(self, binFolder:str, csvFolder:str, strFilter:str=""):
        '''
        将目录下的.dsb格式的历史K线数据导出为.csv格式
        @binFolder  .dsb文件存储目录
        @csvFolder  .csv文件的输出目录
        @strFilter  代码过滤器(暂未启用)
        '''
        pass

    def dump_ticks(self, binFolder: str, csvFolder: str, strFilter: str=""):
        '''
        将目录下的.dsb格式的历史Tik数据导出为.csv格式
        @binFolder  .dsb文件存储目录
        @csvFolder  .csv文件的输出目录
        @strFilter  代码过滤器(暂未启用)
        '''
        pass

    def trans_csv_bars(self, csvFolder: str, binFolder: str, period: str):
        '''
        将目录下的.csv格式的历史K线数据转成.dsb格式
        @csvFolder  .csv文件的输出目录
        @binFolder  .dsb文件存储目录
        @period     K线周期,m1-1分钟线,m5-5分钟线,d-日线
        '''
        pass

    def read_dsb_ticks(self, tickFile: str) -> WtTickRecords:
        '''
        读取.dsb格式的tick数据
        @tickFile   .dsb的tick数据文件
        @return     WtTickRecords
        '''
        pass


    def read_dsb_bars(self, barFile: str, isDay:bool = False) -> WtBarRecords:
        '''
        读取.dsb格式的K线数据
        @tickFile   .dsb的K线数据文件
        @return     WtBarRecords
        '''
        pass

    def read_dmb_ticks(self, tickFile: str) -> WtTickRecords:
        '''
        读取.dmb格式的tick数据
        @tickFile   .dmb的tick数据文件
        @return     WTSTickStruct的list
        '''
        pass

    def read_dmb_bars(self, barFile: str) -> WtBarRecords:
        '''
        读取.dmb格式的K线数据
        @tickFile   .dmb的K线数据文件
        @return     WTSBarStruct的list
        '''
        pass

    def store_bars(self, barFile:str, firstBar:POINTER(WTSBarStruct), count:int, period:str) -> bool:
        '''
        将K线转储到dsb文件中
        @barFile    要存储的文件路径
        @firstBar   第一条bar的指针
        @count      一共要写入的数据条数
        @period     周期,m1/m5/d
        '''
        pass

    def store_ticks(self, tickFile:str, firstTick:POINTER(WTSTickStruct), count:int) -> bool:
        '''
        将Tick数据转储到dsb文件中
        @tickFile   要存储的文件路径
        @firstTick  第一条tick的指针
        @count      一共要写入的数据条数
        '''
        pass

    def resample_bars(self, barFile:str, period:str, times:int, fromTime:int, endTime:int, sessInfo:SessionInfo) -> WtBarRecords:
        '''
        重采样K线
        @barFile    dsb格式的K线数据文件
        @period     基础K线周期,m1/m5/d
        @times      重采样倍数,如利用m1生成m3数据时,times为3
        @fromTime   开始时间,日线数据格式yyyymmdd,分钟线数据为格式为yyyymmddHHMMSS
        @endTime    结束时间,日线数据格式yyyymmdd,分钟线数据为格式为yyyymmddHHMMSS
        @sessInfo   交易时间模板
        '''
        pass

WtDataHelper怎么用


WtDataHelper的使用相对简单,就是调用接口读写文件即可,下面是代码示例:

from wtpy.wrapper import WtDataHelper
from wtpy.WtCoreDefs import WTSBarStruct, WTSTickStruct
from ctypes import POINTER
from wtpy.SessionMgr import SessionMgr
import pandas as pd

dtHelper = WtDataHelper()

def test_store_bars():
    df = pd.read_csv('../storage/csv/CFFEX.IF.HOT_m5.csv')
    df = df.rename(columns={
        '<Date>':'date',
        ' <Time>':'time',
        ' <Open>':'open',
        ' <High>':'high',
        ' <Low>':'low',
        ' <Close>':'close',
        ' <Volume>':'vol',
        })
    df['date'] = df['date'].astype('datetime64').dt.strftime('%Y%m%d').astype('int64')
    df['time'] = (df['date']-19900000)*10000 + df['time'].str.replace(':', '').str[:-2].astype('int')

    BUFFER = WTSBarStruct*len(df)
    buffer = BUFFER()

    def assign(procession, buffer):
        tuple(map(lambda x: setattr(buffer[x[0]], procession.name, x[1]), enumerate(procession)))


    df.apply(assign, buffer=buffer)
    print(df)
    print(buffer[0].to_dict)
    print(buffer[-1].to_dict)

    dtHelper.store_bars(barFile="./CFFEX.IF.HOT_m5.bin", firstBar=buffer, count=len(df), period="m5")
    
def test_store_ticks():

    df = pd.read_csv('../storage/csv/rb主力连续_20201030.csv')
    BUFFER = WTSTickStruct*len(df)
    buffer = BUFFER()

    tags = ["一","二","三","四","五"]

    for i in range(len(df)):
        curTick = buffer[i]

        curTick.exchg = b"SHFE"
        curTick.code = b"SHFE.rb.HOT"

        curTick.price = float(df[i]["最新价"])
        curTick.open = float(df[i]["今开盘"])
        curTick.high = float(df[i]["最高价"])
        curTick.low = float(df[i]["最低价"])
        curTick.settle = float(df[i]["本次结算价"])
        
        curTick.total_volume = float(df[i]["数量"])
        curTick.total_turnover = float(df[i]["成交额"])
        curTick.open_interest = float(df[i]["持仓量"])

        curTick.trading_date = int(df[i]["交易日"])
        curTick.action_date = int(df[i]["业务日期"])
        curTick.action_time = int(df[i]["最后修改时间"].replace(":",""))*1000 + int(df[i]["最后修改毫秒"])

        curTick.pre_close = float(df[i]["昨收盘"])
        curTick.pre_settle = float(df[i]["上次结算价"])
        curTick.pre_interest = float(df[i]["昨持仓量"])

        for x in range(5):
            setattr(curTick, f"bid_price_{x}", float(df[i]["申买价"+tags[x]]))
            setattr(curTick, f"bid_qty_{x}", float(df[i]["申买量"+tags[x]]))
            setattr(curTick, f"ask_price_{x}", float(df[i]["申卖价"+tags[x]]))
            setattr(curTick, f"ask_qty_{x}", float(df[i]["申卖量"+tags[x]]))

    dtHelper.store_ticks(tickFile="./SHFE.rb.HOT_ticks.dsb", firstTick=buffer, count=len(df))

def test_resample():
    # 测试重采样
    sessMgr = SessionMgr()
    sessMgr.load("sessions.json")
    sInfo = sessMgr.getSession("SD0930")
    ret = dtHelper.resample_bars("IC2009.dsb",'m1',5,202001010931,202009181500,sInfo)
    print(ret)

更多详情可以参考demo: wtpy/demos/test_datahelper