推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
matters
V2EX  ›  Python

Python WebSocket 长连接到底怎么写才稳?

  •  
  •   matters · 11h 55m ago · 450 views

    最近在做实时行情相关的项目,刚开始觉得很简单:连上、收消息、处理数据就完事了。结果线上跑几天后发现各种问题:

    • 网络抖动导致连接断开
    • 服务端主动踢连接
    • 长时间没消息被中间设备断开
    • 重连太频繁把自己服务器打挂

    后来重新整理了一套比较稳的方案,核心有 3 个:

    • 心跳
    • 重连
    • 退避机制

    1.心跳不能省

    WebSocket 建立连接不一定就一直在线。运营商网络、NAT 设备、负载均衡器都可能清理长时间无流量的连接。所以一定要做心跳。 如果服务端支持 ping/pong ,直接使用协议层心跳即可。 如果不支持,可以定时发送业务心跳消息。 例如:

    async def heartbeat(ws):
    
        while True:
    
            try:
    
                await ws.ping()
    
                await asyncio.sleep(30)
    
            except Exception:
    
                break
    

    30 秒左右发一次通常就够用了。

    2.重连是标配

    WebSocket 断开是常态,不是异常。所以不要把代码写成:

    await websocket.recv()
    

    然后指望它永远不掉线。 我觉得应该是:

    while True:
    
        try:
    
            await connect()
    
        except Exception as e:
    
            print(e)
    
        await asyncio.sleep(5)
    

    连接断开以后自动重新建立连接。这样即使服务端重启或者网络闪断,也能自动恢复。

    3.别直接疯狂重连

    一个常见错误可能是⬇️

    断线:

    while True:
    
        try:
    
            connect()
    
        except:
    
            pass
    

    结果:

    • CPU 飙高
    • 日志刷爆
    • 服务端压力暴增

    这时候就需要退避机制,也就是每次失败后逐渐增加等待时间。例如:

    delay = 1
    
    while True:
    
        try:
    
            await connect()
    
            delay = 1
    
        except Exception:
    
            await asyncio.sleep(delay)
    
            delay = min(delay \* 2, 60)
    

    等待时间变成 1s 、2s 、4s 、8s 、16s 、32s 、60s...

    连接恢复后再重置。这样会稳定很多。

    4.WebSocket 行情订阅

    连接建立后订阅需要的品种即可,以 BTC 为例:

    import asyncio
    
    import json
    
    import websockets
    
    WS\_URL = "wss://quote.alltick.io/quote-b-ws-api"
    
    API\_TOKEN = "YOUR\_API\_TOKEN"
    
    async def subscribe():
    
        async with websockets.connect(WS\_URL) as ws:
    
            sub\_msg = {
    
                "cmd\_id": 22002,
    
                "seq\_id": 1,
    
                "trace": "test",
    
                "data": {
    
                    "symbol\_list": \[
    
                        {
    
                            "code": "BTCUSDT"
    
                        }
    
                    \]
    
                },
    
                "token": API\_TOKEN
    
            }
    
            await ws.send(json.dumps(sub\_msg))
    
            while True:
    
                msg = await ws.recv()
    
                print(msg)
    
    asyncio.run(subscribe())
    

    如果是生产环境,我一般会把:

    • 连接管理
    • 心跳任务
    • 重连逻辑
    • 退避机制

    全部封装到一个 ConnectionManager 里面。业务层只负责消费消息,不关心连接状态。这样后面维护会轻松很多。

    No Comments Yet
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   962 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 19:48 · PVG 03:48 · LAX 12:48 · JFK 15:48
    ♥ Do have faith in what you're doing.