-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathappendix_2.2_createSnapshotDbAndTable_main.dos
91 lines (79 loc) · 11.9 KB
/
appendix_2.2_createSnapshotDbAndTable_main.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/*
* 此文件为普通snapshot及array Vector类型snapshot建库建表
* 如若您有Snapshot快照数据可以直接入库,如没有,则可以跑模拟数据部分写入库表
* 数据跨度为5个工作日,4000只股票
* 一共94个字段,5个工作日的数据压缩前大约为50G,分天导入。
* 此处创建一个按天VALUE分区,股票哈希20的数据库,每个分区压缩前大约500M(每天约10G / 20个哈希分区)
*/
/*
 * Create the TSDB database and the partitioned table that stores regular
 * ("wide", one column per price level) snapshot data.
 *
 * dbName: DFS path of the database. If a database already exists at this
 *         path it is DROPPED first, so all of its data is lost.
 * tbName: name of the partitioned table created inside that database.
 *
 * Partitioning is COMPO: VALUE by date on TradeTime, then HASH(SYMBOL, 20)
 * on SecurityID. The table has 94 columns.
 */
def createSnapshotDbTable(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	// The simulated data in this script is generated for business days starting
	// at 2020.01.01, so the VALUE scheme must include 2020. The original range
	// (2021 only) is kept as part of the scheme so real 2021 data still fits.
	// NOTE(review): whether dates outside the scheme get a partition added
	// automatically depends on the server's newValuePartitionPolicy setting.
	dbTime = database(, VALUE, 2020.01.01..2021.12.31)
	dbSymbol = database(, HASH, [SYMBOL, 20])
	db = database(dbName, COMPO, [dbTime, dbSymbol], , 'TSDB')
	// Column names and their types, position by position (94 entries each).
	name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`BidOrders0`BidOrders1`BidOrders2`BidOrders3`BidOrders4`BidOrders5`BidOrders6`BidOrders7`BidOrders8`BidOrders9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9`OfferOrders0`OfferOrders1`OfferOrders2`OfferOrders3`OfferOrders4`OfferOrders5`OfferOrders6`OfferOrders7`OfferOrders8`OfferOrders9`NumTrades`IOPV`TotalBidQty`TotalOfferQty`WeightedAvgBidPx`WeightedAvgOfferPx`TotalBidNumber`TotalOfferNumber`BidTradeMaxDuration`OfferTradeMaxDuration`NumBidOrders`NumOfferOrders`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney
	type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE
	// Empty in-memory table used only as the schema template.
	tbTemp = table(1:0, name, type)
	// The handle returned when the database was created is used directly;
	// re-opening it with database(dbName) is unnecessary.
	createPartitionedTable(dbHandle=db, table=tbTemp, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns = `SecurityID`TradeTime, keepDuplicates=LAST)
}
/*
 * Create the TSDB database and the partitioned table that stores snapshots
 * with the ten price levels of each quote field packed into one array-vector
 * column (BidPrice, BidOrderQty, BidOrders, OfferPrice, OfferOrderQty,
 * OfferOrders).
 *
 * dbName: DFS path of the database. If a database already exists at this
 *         path it is DROPPED first, so all of its data is lost.
 * tbName: name of the partitioned table created inside that database.
 *
 * Partitioning is COMPO: VALUE by date (2020.01.01..2020.12.31) on TradeTime,
 * then HASH(SYMBOL, 20) on SecurityID.
 */
def createSnapshotArrayVectorDbTable(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db1 = database("", VALUE, 2020.01.01..2020.12.31)
	db2 = database("", HASH, [SYMBOL, 20])
	db = database(dbName, COMPO, [db1,db2], , 'TSDB')
	// Final column order of the table (40 columns).
	name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice`BidOrderQty`BidOrders`OfferPrice`OfferOrderQty`OfferOrders`NumTrades`IOPV`TotalBidQty`TotalOfferQty`WeightedAvgBidPx`WeightedAvgOfferPx`TotalBidNumber`TotalOfferNumber`BidTradeMaxDuration`OfferTradeMaxDuration`NumBidOrders`NumOfferOrders`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney
	// Scalar types, position by position. The entries for the six array-vector
	// columns are placeholders: those columns are dropped and re-added below
	// with their array types.
	type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`SYMBOL`DOUBLE`INT`INT`DOUBLE`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE`INT`INT`DOUBLE
	tbTemp = table(1:0, name, type)
	// Swap the six placeholder columns for array-vector columns.
	arrayCols = `BidPrice`BidOrderQty`BidOrders`OfferPrice`OfferOrderQty`OfferOrders
	tbTemp.dropColumns!(arrayCols)
	tbTemp.addColumn(arrayCols, [DOUBLE[],INT[],INT[],DOUBLE[],INT[],INT[]])
	// addColumn appends at the end; restore the intended order, which is
	// exactly the order listed in `name`.
	tbTemp.reorderColumns!(name)
	// Use the schema's exact column spelling (TradeTime) for the partition and
	// sort columns, consistent with createSnapshotDbTable.
	db.createPartitionedTable(tbTemp, tbName, `TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns = `SecurityID`TradeTime, keepDuplicates=LAST)
}
/*
 * Simulated data: generate one day of random snapshot rows and append them to
 * the table "snapshot_SH_L2_TSDB" in database "dfs://snapshot_SH_L2_TSDB".
 *
 * n: business-day offset from 2020.01.01 (passed to temporalAdd with unit "B").
 *
 * One row is produced for every combination of 4000 security codes
 * ("sz000001" .. "sz004000") and every timestamp of the two sessions starting
 * at 09:30:00.000 and 13:00:00.000. Values are random: rand(100.0, ...) for
 * DOUBLE columns, rand(100, ...) for INT columns, and one of a/b/c for
 * InstrumentStatus.
 */
def genSnapshotOrigin(n){
	// Single-row table holding the trading date for this offset.
	tradeDateTb = table(temporalAdd(2020.01.01,n, "B") as tradeDate)
	// Intraday time points of the morning and afternoon sessions.
	tradeMinTb = table((09:30:00.000+0..2400*3*1000) join (13:00:00.000+0..2400*3*1000) as tradeMin)
	// Date x time cross join, combined into one timestamp column.
	tradeTimeTb = select concatDateTime(tradeDate,tradeMin) as tradetime from cj(tradeDateTb,tradeMinTb)
	codes ="sz"+lpad(string(000001..004000), 6, `0)
	// Security x timestamp cross join: the key columns of the result.
	tmpTable = cj(table(codes as securityid),tradeTimeTb)
	// Every random column needs exactly one value per row of tmpTable.
	rowCount = size(tmpTable)
	resTable = select *,
		rand(100.0,rowCount) as PreClosePx, rand(100.0,rowCount) as OpenPx, rand(100.0,rowCount) as HighPx, rand(100.0,rowCount) as LowPx, rand(100.0,rowCount) as LastPx,
		rand(100,rowCount) as TotalVolumeTrade, rand(100.0,rowCount) as TotalValueTrade, rand(`a`b`c,rowCount) as InstrumentStatus,
		rand(100.0,rowCount) as BidPrice0, rand(100.0,rowCount) as BidPrice1, rand(100.0,rowCount) as BidPrice2, rand(100.0,rowCount) as BidPrice3, rand(100.0,rowCount) as BidPrice4, rand(100.0,rowCount) as BidPrice5, rand(100.0,rowCount) as BidPrice6, rand(100.0,rowCount) as BidPrice7, rand(100.0,rowCount) as BidPrice8, rand(100.0,rowCount) as BidPrice9,
		rand(100,rowCount) as BidOrderQty0, rand(100,rowCount) as BidOrderQty1, rand(100,rowCount) as BidOrderQty2, rand(100,rowCount) as BidOrderQty3, rand(100,rowCount) as BidOrderQty4, rand(100,rowCount) as BidOrderQty5, rand(100,rowCount) as BidOrderQty6, rand(100,rowCount) as BidOrderQty7, rand(100,rowCount) as BidOrderQty8, rand(100,rowCount) as BidOrderQty9,
		rand(100,rowCount) as BidOrders0, rand(100,rowCount) as BidOrders1, rand(100,rowCount) as BidOrders2, rand(100,rowCount) as BidOrders3, rand(100,rowCount) as BidOrders4, rand(100,rowCount) as BidOrders5, rand(100,rowCount) as BidOrders6, rand(100,rowCount) as BidOrders7, rand(100,rowCount) as BidOrders8, rand(100,rowCount) as BidOrders9,
		rand(100.0,rowCount) as OfferPrice0, rand(100.0,rowCount) as OfferPrice1, rand(100.0,rowCount) as OfferPrice2, rand(100.0,rowCount) as OfferPrice3, rand(100.0,rowCount) as OfferPrice4, rand(100.0,rowCount) as OfferPrice5, rand(100.0,rowCount) as OfferPrice6, rand(100.0,rowCount) as OfferPrice7, rand(100.0,rowCount) as OfferPrice8, rand(100.0,rowCount) as OfferPrice9,
		rand(100,rowCount) as OfferOrderQty0, rand(100,rowCount) as OfferOrderQty1, rand(100,rowCount) as OfferOrderQty2, rand(100,rowCount) as OfferOrderQty3, rand(100,rowCount) as OfferOrderQty4, rand(100,rowCount) as OfferOrderQty5, rand(100,rowCount) as OfferOrderQty6, rand(100,rowCount) as OfferOrderQty7, rand(100,rowCount) as OfferOrderQty8, rand(100,rowCount) as OfferOrderQty9,
		rand(100,rowCount) as OfferOrders0, rand(100,rowCount) as OfferOrders1, rand(100,rowCount) as OfferOrders2, rand(100,rowCount) as OfferOrders3, rand(100,rowCount) as OfferOrders4, rand(100,rowCount) as OfferOrders5, rand(100,rowCount) as OfferOrders6, rand(100,rowCount) as OfferOrders7, rand(100,rowCount) as OfferOrders8, rand(100,rowCount) as OfferOrders9,
		rand(100,rowCount) as NumTrades, rand(100.0,rowCount) as IOPV, rand(100,rowCount) as TotalBidQty, rand(100,rowCount) as TotalOfferQty,
		rand(100.0,rowCount) as WeightedAvgBidPx, rand(100.0,rowCount) as WeightedAvgOfferPx,
		rand(100,rowCount) as TotalBidNumber, rand(100,rowCount) as TotalOfferNumber, rand(100,rowCount) as BidTradeMaxDuration, rand(100,rowCount) as OfferTradeMaxDuration, rand(100,rowCount) as NumBidOrders, rand(100,rowCount) as NumOfferOrders,
		rand(100,rowCount) as WithdrawBuyNumber, rand(100,rowCount) as WithdrawBuyAmount, rand(100.0,rowCount) as WithdrawBuyMoney,
		rand(100,rowCount) as WithdrawSellNumber, rand(100,rowCount) as WithdrawSellAmount, rand(100.0,rowCount) as WithdrawSellMoney,
		rand(100,rowCount) as ETFBuyNumber, rand(100,rowCount) as ETFBuyAmount, rand(100.0,rowCount) as ETFBuyMoney,
		rand(100,rowCount) as ETFSellNumber, rand(100,rowCount) as ETFSellAmount, rand(100.0,rowCount) as ETFSellMoney
	from tmpTable
	// Append the generated day to the regular snapshot table.
	snapshotTb = loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")
	snapshotTb.append!(resTable)
}
/*
 * Generate and write simulated snapshot data for `numOfdays` consecutive
 * business days, one day at a time.
 *
 * Each day is roughly 10G of data, so days are written sequentially in the
 * calling session instead of through submitJob, to keep memory usage bounded.
 *
 * numOfdays: number of days to generate; nothing is written when it is <= 0.
 */
def writeInSnapshotByDay(numOfdays){
	// Guard: 0..(numOfdays-1) is a descending sequence when numOfdays <= 0
	// (e.g. 0..-1 is [0,-1]), which would otherwise still generate data.
	if(numOfdays > 0){
		for (n in 0..(numOfdays-1)){
			genSnapshotOrigin(n)
		}
	}
}
/*
 * Build array-vector snapshot rows from the regular snapshot table.
 *
 * Reads table "snapshot_SH_L2_TSDB" of "dfs://snapshot_SH_L2_TSDB", packs the
 * ten per-level columns of each quote field (BidPrice0..9, BidOrderQty0..9,
 * ...) into one array-vector column with fixedLengthArrayVector, and appends
 * the result to table "Snap" of "dfs://LEVEL2_Snapshot_ArrayVector".
 *
 * d:    the date to convert; rows are selected with date(TradeTime) = d.
 * syms: the security codes to convert; they are processed one at a time.
 */
def importDataDaily(d, syms){
	snapshot = loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")
	// The destination handle does not depend on the symbol, so it is loaded
	// once here rather than once per symbol inside the loop.
	arrayVectorTb = loadTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap")
	for(sym in syms){
		tmp = select SecurityID,TradeTime,PreClosePx,OpenPx,HighPx,LowPx,LastPx,TotalVolumeTrade,TotalValueTrade,InstrumentStatus,
			fixedLengthArrayVector(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9) as BidPrice,
			fixedLengthArrayVector(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9) as BidOrderQty,
			fixedLengthArrayVector(BidOrders0,BidOrders1,BidOrders2,BidOrders3,BidOrders4,BidOrders5,BidOrders6,BidOrders7,BidOrders8,BidOrders9) as BidOrders,
			fixedLengthArrayVector(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9) as OfferPrice,
			fixedLengthArrayVector(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9) as OfferOrderQty,
			fixedLengthArrayVector(OfferOrders0,OfferOrders1,OfferOrders2,OfferOrders3,OfferOrders4,OfferOrders5,OfferOrders6,OfferOrders7,OfferOrders8,OfferOrders9) as OfferOrders,
			NumTrades,IOPV,TotalBidQty,TotalOfferQty,WeightedAvgBidPx,WeightedAvgOfferPx,TotalBidNumber,TotalOfferNumber,BidTradeMaxDuration,OfferTradeMaxDuration,NumBidOrders,NumOfferOrders,
			WithdrawBuyNumber,WithdrawBuyAmount,WithdrawBuyMoney,WithdrawSellNumber,WithdrawSellAmount,WithdrawSellMoney,
			ETFBuyNumber,ETFBuyAmount,ETFBuyMoney,ETFSellNumber,ETFSellAmount,ETFSellMoney
		from snapshot where SecurityID=sym, date(TradeTime) = d
		// Skip symbols that have no rows on this date.
		if(tmp.size()>0){
			arrayVectorTb.append!(tmp)
		}
	}
}
/*
 * Submit background jobs that convert the regular snapshot data into the
 * array-vector table, one job per (hash bucket, day).
 *
 * The 4000 security codes are grouped by hashBucket(securityid, 20); each job
 * calls importDataDaily for one day and the codes of one bucket.
 *
 * numOfdays: number of business days to convert, counted from 2020.01.01;
 *            nothing is submitted when it is <= 0.
 *
 * The function returns once the jobs are submitted; it does not wait for them.
 */
def writeInSnapshotArrayVectorByDay(numOfdays){
	// Guard: 0..(numOfdays-1) is a descending sequence when numOfdays <= 0.
	if(numOfdays > 0){
		// The code list and its bucket assignment are the same for every bucket
		// index, so they are built once, outside the loops.
		securityid ="sz"+lpad(string(000001..004000), 6, `0)
		hash_bucket_table = table(securityid,hashBucket(securityid,20) as bucket)
		for (i in 0..19){
			syms = exec securityid from hash_bucket_table where bucket = i
			for (n in 0..(numOfdays-1)){
				// Use the same business-day offsets that genSnapshotOrigin uses when
				// writing the source data. Plain calendar offsets (2020.01.01+n)
				// would land on weekend dates that hold no data and would miss the
				// later business days.
				d = temporalAdd(2020.01.01, n, "B")
				submitJob("array_bucket"+string(i),"importDataDaily_array"+string(d)+"bucket"+string(i),importDataDaily,d,syms)
			}
		}
	}
}
// 1. Create the database and table for regular snapshot data: VALUE-partitioned by day, HASH 20 on the security code.
//    Any existing database at this path is dropped first.
createSnapshotDbTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")
// 2. Create the database and table that store the data as array vectors. Partitioning is the same as for the regular
//    table; the only difference is that the multi-level quote columns are stored as one ArrayVector column per field.
//    Any existing database at this path is dropped first.
createSnapshotArrayVectorDbTable("dfs://LEVEL2_Snapshot_ArrayVector", "Snap")
// 3. Write 5 days of simulated data into each table.
//    The first call writes the regular table synchronously; the second call submits background jobs (submitJob)
//    that read the regular table and fill the array-vector table.
writeInSnapshotByDay(5)
writeInSnapshotArrayVectorByDay(5)