跳转至

gateway的订单管理

  • 网关模块负责管理和同步订单。确保整个系统内,所有的进程拿到的订单快照数据是一致的
  • 网关模块是订单的唯一维护者,其它所有进程只需要读取本地同步后,最新的订单快照数据即可,订单的跨进程同步基于底层的 IPC 发布&订阅 模块来实现的

订单数据流

  • IPC Command:mm1/mm2 → Gateway(下单/撤单指令)
  • 同步:Gateway ↔ Exchange(订单状态双向同步)
  • IPC Pub/Sub:Gateway → mm1/mm2 orders(订单状态广播)
  ┌──────────────┐     IPC Command      ┌──────────────┐
  │     mm1      │─────────────────────▶│              │
  │  (做市策略1)  │                      │              │
  │              │                      │              │
  │ ┌──────────┐ │                      │   Gateway    │◀═══════▶┌──────────────┐
  │ │  orders  │ │◀──┐                  │              │         │   Exchange   │
  │ └──────────┘ │   │                  │ ┌──────────┐ │         │   (Kraken)   │
  └──────────────┘   │                  │ │  orders  │ │         │              │
                     │   IPC Pub/Sub    │ └──────────┘ │         │ ┌──────────┐ │
  ┌──────────────┐   │   (订单状态)      │              │         │ │  orders  │ │
  │     mm2      │───│─────────────────▶│              │         │ └──────────┘ │
  │  (做市策略2)  │   │                  │              │         │              │
  │              │   │                  └──────────────┘         └──────────────┘
  │ ┌──────────┐ │   │                         │
  │ │  orders  │ │◀──┘                         │
  │ └──────────┘ │                             │
  └──────────────┘                             │
                                               │
                                               ▼
┌────────────────────────────────────────────────────────────────────────────────┐
│                                                                                │
│   ╔═══════════╗     ╔═══════════╗     ╔═══════════╗     ╔═══════════╗          │
│   ║  orders   ║  ≡  ║  orders   ║  ≡  ║  orders   ║  ≡  ║  orders   ║          │
│   ║   (mm1)   ║     ║   (mm2)   ║     ║ (gateway) ║     ║ (exchange)║          │
│   ╚═══════════╝     ╚═══════════╝     ╚═══════════╝     ╚═══════════╝          │
│                                                                                │
│                        ▲  同步机制保证订单快照一致  ▲                              │
│                                                                                │
└────────────────────────────────────────────────────────────────────────────────┘

数据流向

┌───────────────────────────────────────────────────┐
│  指令:   mm ──IPC Command──▶ Gateway ──▶ Exchange │
│  状态:   Exchange ──▶ Gateway ──IPC Pub/Sub──▶ mm │
└───────────────────────────────────────────────────┘

订单状态

  • 不同交易所推送的订单状态会有差异,这里是我们系统内部对订单状态的定义
  • 只有NONE,NEW,PARTIAL_FILLED三种状态的订单应该留在本地订单列表中,其它状态都应该立即从本地删除
enum class OrderStatus : uint8_t
{
    UNKNOWN = 0,       // 未知状态
    NONE = 1,          // 下单时预占位,订单已发送但尚未被交易所确认
    NEW,               // 订单已被交易所接受,挂在订单册上
    FAILED,            // 下单失败
    PARTIAL_FILLED,    // 部分成交
    FILLED,            // 完全成交
    CANCELED,          // 已撤销
    CLOSED,            // 已关闭
    EXPIRED            // 已过期
};

如何维护订单

  • 初始化:系统初始化的时候,通过REST API或者WS推送获取全量的订单数据,初始化gateway本地订单缓存 → 广播order
  • 订单状态变化:每次收到交易所推送的订单状态变化信息,更新gateway本地订单缓存 → 广播order
  • 下单:收到系统内部其它组件的下单请求,会立即将这个订单添加到gateway本地订单缓存,不需要等交易所返回 → 广播order
  • 定时同步:每60秒 REST API 校验并修正,有具体的校验和修正逻辑,并把修正后的结果广播出去 → 广播order_batch
    • 每60秒gateway会广播一次订单snapshot,所有的service在收到订单快照之后,直接覆盖本地的订单列表即可
    • 所有的service进程在启动之后,都会有大于60秒的预热期,这段时间是不会下单的,确保service在下单前,一定能同步到一份最新的订单数据
  • 错误处理:撤单的时候,如果遇到订单不存在的的返回值,gateway会把订单从本地缓存删除,同时广播订单CLOSED的消息进行订单的同步 → 广播order
    • 只会广播一次。如果gateway本地存在这个订单,才会广播CLOSED消息;如果本地不存在,则直接忽略订单不存在的返回值
    • 在其它节点同步订单的时候,由于异常导致的僵尸订单会被snapshot覆盖,从而在其它节点上被删除
  • 网络异常,时序问题:有记住处理网络异常和订单同步时候的时序问题

注意

  • 总的来说我们采用一种保守的订单同步机制,确保本地的订单是交易所订单的超集,这种机制在风控,头寸管理,资金管理上更有优势
  • 交易所有,而本地没有:不管任何错误或者异常发生,有机制确保交易所的订单一定会被同步到本地而不会漏掉,从而一定会被撤掉,而不会有订单一直留在交易所的订单册上
  • 本地有,而交易所没有:不管任何错误或者异常发生,本地都不会有僵尸订单长期存在,这些订单一定会被清理掉
  • 下单之后,会立即将订单添加到本地订单缓存中,不需要等交易所的返回
  • 下单之后,也可以立即撤单,不需要等交易所推送的下单成功消息

Service如何同步订单

对于service来说,在配置中增加如下配置,订单同步信息会自动路由到对应的on_orderon_my_order_batch回调中,所有的同步逻辑,框架会在底层自动完成

配置信息:

{
    "service.mm_demo": {
        "class_name": "MMDemoService",
        "setting": {
            ...
            "ipc_subscribers": [
                {
                    "exchange": "binance",
                    "channel": "my_order",
                    "symbols": []
                },
                {
                    "exchange": "binance",
                    "channel": "my_order_batch",
                    "symbols": []
                }
            ],
            ...
        }
    }
}

回调函数:

class MMDemoService(ServiceBase):
    def on_my_order(self, my_order: Any) -> None:
        pass
    def on_my_order_batch(self, my_order_list: List[Any]) -> None:
        pass