• 请不要在回答技术问题时复制粘贴 AI 生成的内容
loveyoubaby5120
V2EX  ›  程序员

[分享创造] mqttkit:给 MQTT 写应用,像写 Elysia / Hono 那样

  •  
  •   loveyoubaby5120 · 10h 5m ago · 346 views

    给 MQTT 写应用,为什么不能像写 Elysia / Hono 一样?

    mqttkit:在 Aedes / EMQX 之上加一层应用框架——有序中间件、类型化 topic 路由、MQTT 5 RPC 、自动生成 AsyncAPI 文档。

    起因:MQTT 的应用层一直是一片散沙

    只要在 Node 里写过认真一点的 MQTT 后端,下面这段代码你大概率写过:

    client.on('message', (topic, payload) => {
      if (topic.startsWith('devices/') && topic.endsWith('/events')) {
        const uid = topic.split('/')[1]
        // 临时手写鉴权
        // 临时手写 JSON.parse + 校验
        // 临时手写错误处理
        // 临时手写埋点
        // ...
      } else if (topic.startsWith('server/')) {
        // ...
      }
    })
    

    这就是 HTTP 世界十年前那种 http.createServer((req, res) => { if (req.url === '/users') ... }) 的写法——只不过换成了 MQTT 。HTTP 那边我们靠 Express 、Koa 、Fastify ,最近还有 Hono 、Elysia 把这个模式彻底解决了。MQTT 这边一直没有。

    mqttkit 想补齐的就是这一层。

    设计决策:不重新实现 MQTT 协议

    Node 生态本来就有很好的 broker:

    • Aedes — 嵌入式 broker ,CONNECT / SUBSCRIBE / PUBLISH / QoS / retain / session / persistence / MQTT-over-WebSocket 全套都给你。
    • EMQX 、Mosquitto 、NanoMQ — 真要扛百万连接时用这些。

    这些东西没必要重写。真正缺的是应用层

    • 怎么声明式地说「这个 topic 必须经过 XX 鉴权」?
    • 怎么用我已经在 HTTP 路由里用着的同一份 schema 校验 MQTT 的 payload ?
    • 怎么用 MQTT 5 做请求/响应,而不用手工管理 correlationData ?
    • 怎么白嫖一份 AsyncAPI 文档?
    • 怎么接 Prometheus / OpenTelemetry ,而不用去 hack broker ?

    mqttkit 就是这一层。通过 @mqttkit/aedes 适配到 Aedes ,broker 是可插拔的——你也可以自己写适配器接 EMQX 、NanoMQ 。

    代码长这样

    import { aedes } from '@mqttkit/aedes'
    import { MqttApp, router } from '@mqttkit/core'
    import { z } from 'zod'
    
    const app = new MqttApp<{ principal?: { uid: string } }>()
      .use(
        aedes({
          tcp: { port: 1883 },
          ws: { port: 8888, path: '/mqtt' },
          authenticate({ clientId, username }) {
            if (!username) return false
            return { uid: username || clientId }
          },
        }),
      )
      .use(
        router<{ principal?: { uid: string } }>()
          .topic('devices/:uid/events', {
            publish: ({ params, principal }) => params.uid === principal?.uid,
            schema: { body: z.object({ temperature: z.number() }) },
            timeout: 1_000,
            concurrency: 100,
            async onMessage(ctx) {
              // ctx.params.uid       → string
              // ctx.body.temperature → number (已校验、有类型)
              await ctx.publish(`server/${ctx.params.uid}/ack`, 'ok')
            },
          }),
      )
    
    await app.listen()
    

    写过 Elysia / Hono 的肌肉记忆可以直接迁移:use()、有序中间件、泛型驱动的类型推断、plugin 组合。

    这些特性是真正省时间的部分

    1. Topic 参数 + Standard Schema 校验

    router().topic('devices/:uid/events', {
      schema: { body: z.object({ temperature: z.number() }) },
      async onMessage(ctx) {
        ctx.params.uid          // string ,从 topic 模式抽出来
        ctx.body.temperature    // number ,校验过、有类型
      },
    })
    

    任何 Standard Schema 实现都能用:zod 、valibot 、arktype 。不需要学 mqttkit 专属的 schema 方言。

    2. 发布/订阅策略写成数据,而不是散落各处的回调

    .topic('devices/:uid/events', {
      publish:   ({ params, principal }) => params.uid === principal?.uid,
      subscribe: ({ params, principal }) => params.uid === principal?.uid,
    })
    

    像路由定义一样易读,在正确的阶段被调用。再也不用满项目搜 aedes.authorizePublish

    3. MQTT 5 RPC 带重试

    const reply = await app.request('devices/alpha/cmd', 'reboot', {
      timeout: 500,
      retries: 2,
      retryDelay: 20,
    })
    

    correlationData 、responseTopic 、超时、重试,全部处理好。设备端用 ctx.reply(...) 收尾。

    4. 路由级护栏

    .topic('expensive/op', {
      timeout: 1_000,          // 超时直接 fail-fast
      concurrency: 100,        // 路由级过载保护
      onError: ({ error }) => metrics.routeFailures.inc(),
      async onMessage(ctx) { /* ... */ },
    })
    

    超时和并发上限内置,触发后会以命名 phase 走 onErrortimeout / overload / validation / policy / handler / middleware / publish

    5. AsyncAPI 3.0 文档零成本生成

    import { asyncapi } from '@mqttkit/asyncapi'
    
    app.use(asyncapi({
      info: { title: 'My IoT API', version: '1.0.0' },
      http: { port: 9000 },     // 9000 端口直接看文档
    }))
    

    topic 路由本来就是声明式的——mqttkit 遍历一遍就能吐出 AsyncAPI 3.0 。配 @mqttkit/zod@mqttkit/typebox,payload 的 JSON Schema 也会一起带上。

    6. 结构化指标接 Prometheus / OTel

    app.onMetric((evt) => {
      // evt.kind: 'dispatch' | 'publish'
      // evt.route, evt.topic, evt.durationMs, evt.outcome
      prometheus.observe(evt)
    })
    

    不用 monkey-patch broker ,不用包装 handler 。框架本身就知道每次 dispatch 的开始和结束。

    7. 共享订阅、生命周期事件、服务端主动 publish

    router().topic('$share/workers/jobs/+/run', { /* 多实例扇出 */ })
    
    app.on('client.connect', ({ clientId }) => audit.log('connect', clientId))
    app.publish('server/broadcast', JSON.stringify({ shutdown: true }))
    

    8. 内存版 TestBroker 跑单测

    import { createTestApp } from '@mqttkit/core/testing'
    
    const { app, broker } = createTestApp()
    // 不开 TCP 、不开 socket ,dispatch 走的是同一条 pipeline
    

    毫秒级跑完真实的中间件 / router / RPC 链路。

    什么时候适合 mqttkit ,什么时候不适合

    适合:

    • 在做 IoT 后端、设备遥测管线、实时游戏服、或者任何 TypeScript 的 MQTT 应用。
    • 想要鉴权 / 校验 / 指标 / 文档,但不想重复写五遍。
    • 喜欢 Elysia / Hono / Fastify 的心智模型。
    • 用 Bun (一等公民)或 Node 20+。

    不适合:

    • 你需要的是一个能扛十万连接的 broker——那应该直接用 EMQX 或 NanoMQ ,mqttkit 是应用层,不是 broker 层。
    • 你写的是五行的桥接脚本——mqtt.js 就够了。

    安装

    bun add @mqttkit/core @mqttkit/aedes aedes
    # 或
    npm install @mqttkit/core @mqttkit/aedes aedes
    

    包的分工:

    • @mqttkit/core — app 、router 、middleware 、RPC 、testing broker
    • @mqttkit/aedes — Aedes 适配器( TCP + WebSocket )
    • @mqttkit/asyncapi — AsyncAPI 3.0 生成器
    • @mqttkit/typebox@mqttkit/zod — schema helper

    链接

    写过 MQTT 后端、踩过同样的坑的朋友,欢迎来仓库提 issue 、PR 或者直接反馈你觉得还缺什么。觉得有用顺手给个 star——这是最便宜的支持方式。


    mqttkit 是 MIT 协议,面向 Bun + TypeScript 构建。

    No Comments Yet
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   920 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 42ms · UTC 20:09 · PVG 04:09 · LAX 13:09 · JFK 16:09
    ♥ Do have faith in what you're doing.