MarketStoreはファイナンスにおける時系列データに特化したデータベースサーバです。 スケーラビリティについて十分に考慮されており、あなたのシステムのあらゆるところからアクセス可能で、かつ拡張可能なDataFrameサービスとして使用することができます。
膨大な金融市場データの扱いにおいて問題となるスケーラビリティを確保するために、MarketStoreはゼロベースから設計されています。アルゴリズム取引におけるバックテストやチャート作成, 何年間にもわたるデータからなる価格履歴をTick(株式、債権、為替などの金融商品における価格変化の最小単位)レベルの粒度で扱うことができます。 米国株式や暗号通貨の分野を主な対象として扱っています。
もしあなたが大量のHDF5ファイルの扱いに苦しんでいるのであれば、MarketStoreはその完璧な解決策となるでしょう。
インストールしていただくことで入る機能に加えて、GDAXから取得した暗号通貨の価格データを用いてDBに書き込みを行うなど、プラグイン設定も簡単に使うことができます。
MarketStoreを使うことで、ネットワーク越しであってもローカルディスクにあるHDF5ファイルに対して行うのと同じくらい低いレイテンシでDataFrameに対してクエリをかけることができます。新規データの書き込みであれば、DataFrameの100倍以上の速度が見込めます。これはストレージの形式を特定のデータのタイプやユースケースに最適化しており、また最近のファイルシステム/ハードウェアの特性を考慮した上でMarketStoreが設計されているためです。
MarketStoreは本番環境で使っていただける品質になっています。すでにAlpacaでは重要なビジネス向けに何年も使用を重ねてきています。 もしバグを発見したり、MarketStoreに興味がある方は開発にご協力いただければと思います。
すぐにMarketStoreを使ってみたい場合は、最新の docker image を使ってDBインスタンスを生成する方法をおすすめします。 デフォルトの mkts.yml (設定ファイル)もあり、 /data
をデータ保存先のルートディレクトリとして定義された状態でコンテナを起動できます。 デフォルト設定でコンテナを立ち上げる場合は以下のコマンドを実行してください。
docker run -i -p 5993:5993 alpacamarkets/marketstore:latest
自分でカスタマイズした設定ファイル mkts.yml
でインスタンスを起動したい場合は、
下記のような手順で自分で新しいコンテナを作って、その中に設定ファイルを埋め込んでください。
docker create --name mktsdb -p 5993:5993 alpacamarkets/marketstore:latest
docker cp mkts.yml mktsdb:/etc/mkts.yml
docker start -i mktsdb
起動中のDockerコンテナに対してセッションを張る場合は以下のようにしてください。
marketstore connect --url localhost:5993
MarketStoreはGoを用いて実装されているので、go install
コマンドでインストールすることができます。
go install github.com/alpacahq/marketstore/v4@latest
# export GOROOT=$HOME/go
# export PATH=$PATH:$GOROOT/bin
marketstore --version
ソースコードからビルドすることも簡単です。Go 1.11
以上のバージョンを使用し、また依存管理にはgo mod
を使用しています。
go get -u github.com/alpacahq/marketstore
以下でバイナリをコンパイル&インストールしてください。
make install
必須ではありませんが、本リポジトリに付属しているプラグインをインストールすることも簡単にできます。
make plugins
marketstore
コマンドで、使用することができるコマンド一覧が得られます。
marketstore
$GOPATHの設定によってはこちら
$GOPATH/bin/marketstore
mkts.yml
という名前で設定ファイルを独自に定義することができますが、デフォルトの設定ファイルであればコマンドで生成することもできます。
$GOPATH/bin/marketstore init
MarketStoreを起動します。
$GOPATH/bin/marketstore start
以下のような出力が得られたら起動成功です。
example@alpaca:~/go/bin/src/github.com/alpacahq/marketstore$ marketstore
I0619 16:29:30.102101 7835 log.go:14] Disabling "enable_last_known" feature until it is fixed...
I0619 16:29:30.102980 7835 log.go:14] Initializing MarketStore...
I0619 16:29:30.103092 7835 log.go:14] WAL Setup: initCatalog true, initWALCache true, backgroundSync true, WALBypass false:
I0619 16:29:30.103179 7835 log.go:14] Root Directory: /example/go/bin/src/github.com/alpacahq/marketstore/project/data/mktsdb
I0619 16:29:30.144461 7835 log.go:14] My WALFILE: WALFile.1529450970104303654.walfile
I0619 16:29:30.144486 7835 log.go:14] Found a WALFILE: WALFile.1529450306968096708.walfile, entering replay...
I0619 16:29:30.244778 7835 log.go:14] Beginning WAL Replay
I0619 16:29:30.244861 7835 log.go:14] Partial Read
I0619 16:29:30.244882 7835 log.go:14] Entering replay of TGData
I0619 16:29:30.244903 7835 log.go:14] Replay of WAL file /example/go/bin/src/github.com/alpacahq/marketstore/project/data/mktsdb/WALFile.1529450306968096708.walfile finished
I0619 16:29:30.289401 7835 log.go:14] Finished replay of TGData
I0619 16:29:30.340760 7835 log.go:14] Launching rpc data server...
I0619 16:29:30.340792 7835 log.go:14] Initializing websocket...
I0619 16:29:30.340814 7835 plugins.go:14] InitializeTriggers
I0619 16:29:30.340824 7835 plugins.go:42] InitializeBgWorkers
MarketStoreを実行するにはYAMLで書かれた設定ファイル mkts.yml
が必要になります。marketstore init
コマンドでデフォルトを生成できます。設定ファイルのパスは marketstore start --config [設定ファイルへのパス]
という形で --config
オプションで指定してください。指定されなかった場合は、marketstoreが実行されたパス内からmkts.yml
を探します。
Var | Type | Description |
---|---|---|
root_directory | string | MarketStore データベースが使用するディレクトリ |
listen_port | int | MarketStoreがJSON-RPC APIに使用するポート番号 |
grpc_listen_port | int | MarketStoreがGRPC APIに使用するポート番号 |
timezone | string | タイムゾーン. TZ に定義されている値 (例 America/New_York) |
log_level | string | 出力する最低ログレベル `(info |
stop_grace_period | int | SIGINT シグナルを受信してから終了するまでに待つ時間 |
wal_rotate_interval | int | ディスクにフラッシュしてWALファイルがトリムされる頻度[分] |
stale_threshold | int | MarketStoreがシンボルを古いものと認識するしきい値[日] |
triggers | slice | triggerプラグインのリスト |
bgworkers | slice | background workerプラグインのリスト |
root_directory: data
listen_port: 5993
grpc_listen_port: 5995
log_level: info
stop_grace_period: 0
wal_rotate_interval: 5
stale_threshold: 5
MarketStoreインスタンスが起動したあとは、Tickデータを読み書きし始めることができます。
pymarketstore が標準のPythonクライアントになっています。すでにMarketStoreが起動していることを確認の上、ご使用ください。
In [1]: import pymarketstore as pymkts
## query (データの参照)
In [2]: param = pymkts.Params('BTC', '1Min', 'OHLCV', limit=10)
In [3]: cli = pymkts.Client()
In [4]: reply = cli.query(param)
In [5]: reply.first().df()
Out[5]:
Open High Low Close Volume
Epoch
2018-01-17 17:19:00+00:00 10400.00 10400.25 10315.00 10337.25 7.772154
2018-01-17 17:20:00+00:00 10328.22 10359.00 10328.22 10337.00 14.206040
2018-01-17 17:21:00+00:00 10337.01 10337.01 10180.01 10192.15 7.906481
2018-01-17 17:22:00+00:00 10199.99 10200.00 10129.88 10160.08 28.119562
2018-01-17 17:23:00+00:00 10140.01 10161.00 10115.00 10115.01 11.283704
2018-01-17 17:24:00+00:00 10115.00 10194.99 10102.35 10194.99 10.617131
2018-01-17 17:25:00+00:00 10194.99 10240.00 10194.98 10220.00 8.586766
2018-01-17 17:26:00+00:00 10210.02 10210.02 10101.00 10138.00 6.616969
2018-01-17 17:27:00+00:00 10137.99 10138.00 10108.76 10124.94 9.962978
2018-01-17 17:28:00+00:00 10124.95 10142.39 10124.94 10142.39 2.262249
## write (データの書き込み)
In [7]: import numpy as np
In [8]: import pandas as pd
In [9]: data = np.array([(pd.Timestamp('2017-01-01 00:00').value / 10**9, 10.0)], dtype=[('Epoch', 'i8'), ('Ask', 'f4')])
In [10]: cli.write(data, 'TEST/1Min/Tick')
Out[10]: {'responses': None}
In [11]: cli.query(pymkts.Params('TEST', '1Min', 'Tick')).first().df()
Out[11]:
Ask
Epoch
2017-01-01 00:00:00+00:00 10.0
- 可変長レコード
marketstoreは一つのtimeframe(時間幅)に存在するレコード数を1つに絞ることで高速化を実現しているデータベースで、
同じtimeframeに対するレコードの書き込みは基本的に上書きされます。
1D
(1日)から1Sec
(1秒)までのtimeframeに対応しており、
データの頻度に応じてより長い時間幅を指定していただくことで、より高速に読み書きを行うことができます。
しかし、板情報やTICKデータなど、特定のtimeframeごとにデータが到着しなかったり、 1秒よりも更に頻繁に到着するデータの保存にも対応しています。 特定のtimeframeに複数のレコードを書き込むことができる機能として、 これをvariable-length(可変長)レコード機能と呼んでいます。
pymarketstoreで該当バケットへのwriteを行う際にisvariablelength=True
を指定していただくことで、
該当のバケットの特定timeframeへ複数レコードを書き込むことができ、
1秒ごとよりも速い間隔で届くTICKデータ等を読み書きする際に活用できます。
import numpy as np, pandas as pd, pymarketstore as pymkts
symbol, timeframe, attribute_group = "TEST", "1Sec", "Tick"
data_type = [('Epoch', 'i8'), ('Bid', 'f4'), ('Ask', 'f4'), ('Nanoseconds', 'i4')]
tbk = "{}/{}/{}".format(symbol, timeframe, attribute_group)
client = pymkts.Client()
# --- write variable length records
data = np.array([
(pd.Timestamp('2021-01-01 00:00:00').value / 10 ** 9, 10.0, 20.0, 1000000),
(pd.Timestamp('2021-01-01 00:00:00').value / 10 ** 9, 30.0, 40.0, 2000000),
(pd.Timestamp('2021-01-01 00:00:00').value / 10 ** 9, 50.0, 60.0, 3000000),
], dtype=data_type)
client.write(data, tbk, isvariablelength=True)
# --- query variable length records
params = pymkts.Params(symbol, timeframe, attribute_group)
print(client.query(params=params).first().df())
# --- tearDown
client.destroy(tbk)
shows
Bid Ask Nanoseconds
Epoch
2021-01-01 00:00:00+00:00 10.0 20.0 1000000
2021-01-01 00:00:00+00:00 30.0 40.0 2000000
2021-01-01 00:00:00+00:00 50.0 60.0 3000000
marketstoreではレコードの時刻は常にEpoch
カラム、'i8'(int64)のエポック秒で表現される作りになっており、
これは1秒より細かい精度まで時刻情報を保存するには十分なデータサイズではありません。
そのためmarketstoreでは別のNanoseconds
('i4'データタイプ)に1秒以下の情報を保存することで可変長レコード機能を実現しています。
marketstoreインスタンスに接続するコマンドはこちらです。
// For a local db-
marketstore connect --dir <path>
// For a server-
marketstore connect --url <address>
接続後はsqlセッションを通じてコマンドを実行することができます。
Goにおけるプラグインは Go 1.10以上(Linux)で最適に動きます。プラグインについての詳細は plugins package をご覧ください。 ここでは特に取り上げたいいくつかのプラグインについてご紹介します。
WebSocketのストリームを利用してリアルタイムでアップデートを受け取ることができます。
/ws
でWebSocket接続を受けつけることができ、データをPushすることもできます。
詳細については the package をご参照ください。
GDAX から暗号通貨のデータを取得することができます。 MarketStoreをインストールすると、すぐにDataFrame型の情報をqueryして取得することができます。ネットワーク越しであっても、ローカルディスク上のHDF5ファイルを読み込むのと同じくらい低いレイテンシで扱うことができますし、最後尾にデータを追加する処理はDataFrameで行う100倍も速いはずです。MarketStoreはストレージの形式を特定のデータ型、ユースケース、またファイルシステムやハードウェアの特性に最適化した形で設計されているためです。
data pollerを設定してGDAXからデータを取得しはじめましょう。 詳細については the package をご参照ください。
このプラグインを使うと、 tick/分 レベルのデータだけを気にすればよくなります。時系列ベースでディスク上のデータのアグリゲーションをしてくれるプラグインです。詳しくは the packageをご参照ください。
marketstoreはmasterからreplicaへのレプリケーションをサポートしています。
mkts.yml
に以下の設定を追記することで、データのレプリケーションを有効化できます。
- master instance
replication:
# when enabled=true, this instance works as master instance and accept connections from replicas
enabled: true
# when tls_enabled=true, transport security between master and replica is enabled.
# tls_enabled: true # both master and replica should have tls_enabled=true to enable TLS
# public/private key pair from a pair of files. The files must contain PEM encoded data.
# The cert file may contain intermediate certificates following the leaf certificate to form a certificate chain.
# cert_file: "/absolute/path/to/server.crt" # both master and replica should have this config to enable TLS
# key_file: "/absolute/path/to/server.key"
# port to be used for the replication protocol
listen_port: 5996
- replica instance(s)
replication:
# when master_host is set, this instance works as a replica instance
master_host: "127.0.0.1:5995"
# when tls_enabled=true on master server, GRPC communication between master and replica is encrypted by SSL.
# tls_enabled: true
# cert_file: "/absolute/path/to/server.crt" # both master and replica should have this config to enable TLS
興味がある方はぜひMarketStoreの開発にご協力ください!
GithubのIssue報告やPullRequestでも構いませんし、oss@alpaca.markets
に直接ご連絡いただいても構いません。
Pull Requestを作る際は、下記のコマンドを実行してテストが通る状態であることをご確認ください。
make unittest
我々の取り組む分野に対する要求は多岐に渡るため、MarketStoreは柔軟にプラグインを追加できるアーキテクチャをとっています。 独自のプラグインを生成したい方は plugins をご覧ください。