gateway的订单管理¶
- 网关模块负责管理和同步订单。确保整个系统内,所有的进程拿到的订单快照数据是一致的
- 网关模块是订单的唯一维护者,其它所有进程只需要读取本地同步后,最新的订单快照数据即可,订单的跨进程同步基于底层的 IPC 发布&订阅 模块来实现的
订单数据流¶
- IPC Command:mm1/mm2 → Gateway(下单/撤单指令)
- 用 IPC 命令 模块来实现的
- 同步:Gateway ↔ Exchange(订单状态双向同步)
- IPC Pub/Sub:Gateway → mm1/mm2 orders(订单状态广播)
┌──────────────┐ IPC Command ┌──────────────┐
│ mm1 │─────────────────────▶│ │
│ (做市策略1) │ │ │
│ │ │ │
│ ┌──────────┐ │ │ Gateway │◀═══════▶┌──────────────┐
│ │ orders │ │◀──┐ │ │ │ Exchange │
│ └──────────┘ │ │ │ ┌──────────┐ │ │ (Kraken) │
└──────────────┘ │ │ │ orders │ │ │ │
│ IPC Pub/Sub │ └──────────┘ │ │ ┌──────────┐ │
┌──────────────┐ │ (订单状态) │ │ │ │ orders │ │
│ mm2 │───│─────────────────▶│ │ │ └──────────┘ │
│ (做市策略2) │ │ │ │ │ │
│ │ │ └──────────────┘ └──────────────┘
│ ┌──────────┐ │ │ │
│ │ orders │ │◀──┘ │
│ └──────────┘ │ │
└──────────────┘ │
│
▼
┌────────────────────────────────────────────────────────────────────────────────┐
│ │
│ ╔═══════════╗ ╔═══════════╗ ╔═══════════╗ ╔═══════════╗ │
│ ║ orders ║ ≡ ║ orders ║ ≡ ║ orders ║ ≡ ║ orders ║ │
│ ║ (mm1) ║ ║ (mm2) ║ ║ (gateway) ║ ║ (exchange)║ │
│ ╚═══════════╝ ╚═══════════╝ ╚═══════════╝ ╚═══════════╝ │
│ │
│ ▲ 同步机制保证订单快照一致 ▲ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
数据流向
┌───────────────────────────────────────────────────┐
│ 指令: mm ──IPC Command──▶ Gateway ──▶ Exchange │
│ 状态: Exchange ──▶ Gateway ──IPC Pub/Sub──▶ mm │
└───────────────────────────────────────────────────┘
订单状态¶
- 不同交易所推送的订单状态会有差异,这里是我们系统内部对订单状态的定义
- 只有
NONE,NEW,PARTIAL_FILLED三种状态的订单应该留在本地订单列表中,其它状态都应该立即从本地删除
enum class OrderStatus : uint8_t
{
UNKNOWN = 0, // 未知状态
NONE = 1, // 下单时预占位,订单已发送但尚未被交易所确认
NEW, // 订单已被交易所接受,挂在订单册上
FAILED, // 下单失败
PARTIAL_FILLED, // 部分成交
FILLED, // 完全成交
CANCELED, // 已撤销
CLOSED, // 已关闭
EXPIRED // 已过期
};
如何维护订单¶
- 初始化:系统初始化的时候,通过REST API或者WS推送获取全量的订单数据,初始化gateway本地订单缓存 → 广播order
- 订单状态变化:每次收到交易所推送的订单状态变化信息,更新gateway本地订单缓存 → 广播order
- 下单:收到系统内部其它组件的下单请求,会立即将这个订单添加到gateway本地订单缓存,不需要等交易所返回 → 广播order
- 定时同步:每60秒 REST API 校验并修正,有具体的校验和修正逻辑,并把修正后的结果广播出去 → 广播order_batch
- 每60秒gateway会广播一次订单snapshot,所有的service在收到订单快照之后,直接覆盖本地的订单列表即可
- 所有的service进程在启动之后,都会有大于60秒的预热期,这段时间是不会下单的,确保service在下单前,一定能同步到一份最新的订单数据
- 错误处理:撤单的时候,如果遇到订单不存在的的返回值,gateway会把订单从本地缓存删除,同时广播订单
CLOSED的消息进行订单的同步 → 广播order- 只会广播一次。如果gateway本地存在这个订单,才会广播
CLOSED消息;如果本地不存在,则直接忽略订单不存在的返回值 - 在其它节点同步订单的时候,由于异常导致的僵尸订单会被snapshot覆盖,从而在其它节点上被删除
- 只会广播一次。如果gateway本地存在这个订单,才会广播
- 网络异常,时序问题:有记住处理网络异常和订单同步时候的时序问题
注意
- 总的来说我们采用一种保守的订单同步机制,确保本地的订单是交易所订单的超集,这种机制在风控,头寸管理,资金管理上更有优势
- 交易所有,而本地没有:不管任何错误或者异常发生,有机制确保交易所的订单一定会被同步到本地而不会漏掉,从而一定会被撤掉,而不会有订单一直留在交易所的订单册上
- 本地有,而交易所没有:不管任何错误或者异常发生,本地都不会有僵尸订单长期存在,这些订单一定会被清理掉
- 下单之后,会立即将订单添加到本地订单缓存中,不需要等交易所的返回
- 下单之后,也可以立即撤单,不需要等交易所推送的下单成功消息
Service如何同步订单¶
对于service来说,在配置中增加如下配置,订单同步信息会自动路由到对应的on_order和on_my_order_batch回调中,所有的同步逻辑,框架会在底层自动完成
配置信息:
{
"service.mm_demo": {
"class_name": "MMDemoService",
"setting": {
...
"ipc_subscribers": [
{
"exchange": "binance",
"channel": "my_order",
"symbols": []
},
{
"exchange": "binance",
"channel": "my_order_batch",
"symbols": []
}
],
...
}
}
}
回调函数:
class MMDemoService(ServiceBase):
def on_my_order(self, my_order: Any) -> None:
pass
def on_my_order_batch(self, my_order_list: List[Any]) -> None:
pass