最近在做实时行情相关的项目,刚开始觉得很简单:连上、收消息、处理数据就完事了。结果线上跑几天后发现各种问题:
- 网络抖动导致连接断开
- 服务端主动踢连接
- 长时间没消息被中间设备断开
- 重连太频繁把自己服务器打挂
后来重新整理了一套比较稳的方案,核心有 3 个:
- 心跳
- 重连
- 退避机制
1.心跳不能省
WebSocket 建立连接不一定就一直在线。运营商网络、NAT 设备、负载均衡器都可能清理长时间无流量的连接。所以一定要做心跳。 如果服务端支持 ping/pong ,直接使用协议层心跳即可。 如果不支持,可以定时发送业务心跳消息。 例如:
async def heartbeat(ws):
while True:
try:
await ws.ping()
await asyncio.sleep(30)
except Exception:
break
30 秒左右发一次通常就够用了。
2.重连是标配
WebSocket 断开是常态,不是异常。所以不要把代码写成:
await websocket.recv()
然后指望它永远不掉线。 我觉得应该是:
while True:
try:
await connect()
except Exception as e:
print(e)
await asyncio.sleep(5)
连接断开以后自动重新建立连接。这样即使服务端重启或者网络闪断,也能自动恢复。
3.别直接疯狂重连
一个常见错误可能是⬇️
断线:
while True:
try:
connect()
except:
pass
结果:
- CPU 飙高
- 日志刷爆
- 服务端压力暴增
这时候就需要退避机制,也就是每次失败后逐渐增加等待时间。例如:
delay = 1
while True:
try:
await connect()
delay = 1
except Exception:
await asyncio.sleep(delay)
delay = min(delay \* 2, 60)
等待时间变成 1s 、2s 、4s 、8s 、16s 、32s 、60s...
连接恢复后再重置。这样会稳定很多。
4.WebSocket 行情订阅
连接建立后订阅需要的品种即可,以 BTC 为例:
import asyncio
import json
import websockets
WS\_URL = "wss://quote.alltick.io/quote-b-ws-api"
API\_TOKEN = "YOUR\_API\_TOKEN"
async def subscribe():
async with websockets.connect(WS\_URL) as ws:
sub\_msg = {
"cmd\_id": 22002,
"seq\_id": 1,
"trace": "test",
"data": {
"symbol\_list": \[
{
"code": "BTCUSDT"
}
\]
},
"token": API\_TOKEN
}
await ws.send(json.dumps(sub\_msg))
while True:
msg = await ws.recv()
print(msg)
asyncio.run(subscribe())
如果是生产环境,我一般会把:
- 连接管理
- 心跳任务
- 重连逻辑
- 退避机制
全部封装到一个 ConnectionManager 里面。业务层只负责消费消息,不关心连接状态。这样后面维护会轻松很多。