Skip to content

WebSocket 设计与实现

独立库已发布:WebSocket 模块现已作为独立库发布在 github.com/bingo-project/websocket。你可以在不依赖完整 Bingo 框架的情况下单独使用它。

本文档介绍 Bingo 的 WebSocket 实现,采用 JSON-RPC 2.0 协议,支持中间件、分组路由和连接管理。

目录

  1. 设计目标
  2. 消息格式
  3. 中间件架构
  4. 路由与 Handler
  5. 认证流程
  6. 连接管理
  7. 推送与订阅
  8. Prometheus 指标
  9. 连接限制
  10. 适用场景
  11. 目录结构

设计目标

  1. 统一消息格式 - 采用 JSON-RPC 2.0 标准,与 HTTP/gRPC 错误格式一致
  2. 中间件模式 - 与 HTTP (Gin) 和 gRPC (interceptor) 保持一致的编程模型
  3. 灵活分组 - 支持 public/private 分组,不同方法使用不同中间件链
  4. 业务复用 - WebSocket Handler 直接调用 Biz 层,三协议共享业务逻辑

消息格式

WebSocket 采用 JSON-RPC 2.0 规范,这是行业广泛采用的标准(MCP、以太坊、VSCode LSP 等)。

请求格式

json
{
    "jsonrpc": "2.0",
    "method": "auth.login",
    "params": {"username": "test", "password": "123456", "platform": "web"},
    "id": 1
}
字段类型说明
jsonrpcstring固定 "2.0"
methodstring方法名,如 "auth.login"、"subscribe"
paramsobject请求参数,与 HTTP body 格式一致
idstring/number请求 ID,用于匹配响应

成功响应

json
{
    "jsonrpc": "2.0",
    "result": {"accessToken": "xxx", "expiresAt": 1234567890},
    "id": 1
}

result 字段与 HTTP 响应 body 格式完全一致。

错误响应

json
{
    "jsonrpc": "2.0",
    "error": {
        "code": -32001,
        "reason": "Unauthorized",
        "message": "Login required"
    },
    "id": 1
}
字段说明
codeJSON-RPC 错误码(负整数)
reason业务错误码,与 HTTP 响应的 reason 一致
message错误描述

服务端推送

服务端主动推送使用 Notification 格式(无 id 字段):

json
{
    "jsonrpc": "2.0",
    "method": "session.kicked",
    "params": {"reason": "您的账号已在其他设备登录"}
}

HTTP → JSON-RPC 错误码映射

HTTP StatusJSON-RPC Code说明
400-32602Invalid params
401-32001Unauthorized
403-32003Permission denied
404-32004Not found
429-32029Too many requests
500-32603Internal error

中间件架构

WebSocket 中间件与 HTTP/gRPC 保持一致的编程模型。

架构概览

WebSocket Message


┌─────────┐  ┌───────────┐  ┌───────────┐  ┌────────┐
│Recovery │→ │ RequestID │→ │  Logger   │→ │RateLimit│  ← 全局中间件
└─────────┘  └───────────┘  └───────────┘  └────────┘


┌─────────────────────────────────────────────────────┐
│                   路由分发                           │
│  ┌─────────────────┐    ┌─────────────────────┐     │
│  │   Public Group  │    │   Private Group     │     │
│  │                 │    │  ┌──────┐           │     │
│  │  • heartbeat    │    │  │ Auth │           │     │
│  │  • auth.login   │    │  └──────┘           │     │
│  │                 │    │  • subscribe        │     │
│  │                 │    │  • auth.user-info   │     │
│  └─────────────────┘    └─────────────────────┘     │
└─────────────────────────────────────────────────────┘


┌─────────┐
│ Handler │ → Biz Layer
└─────────┘

核心类型

go
// github.com/bingo-project/websocket/context.go

// Context 中间件上下文,嵌入 context.Context 可直接传递给 Biz 层
type Context struct {
    context.Context               // 嵌入标准 context,可直接传递给下层
    Request   *jsonrpc.Request    // JSON-RPC 请求
    Client    *Client             // WebSocket 客户端
    Method    string              // 方法名
    StartTime time.Time           // 请求开始时间
}

// 响应辅助方法
func (c *Context) JSON(data any) *jsonrpc.Response   // 返回成功响应
func (c *Context) Error(err error) *jsonrpc.Response // 返回错误响应

// Handler 消息处理函数
type Handler func(*Context) *jsonrpc.Response

// Middleware 中间件函数
type Middleware func(Handler) Handler

// Chain 组合多个中间件
func Chain(middlewares ...Middleware) Middleware

内置中间件

中间件位置说明
Recovery / RecoveryWithLoggermiddleware/recovery.go捕获 panic,返回 500 错误;WithLogger 版本支持自定义日志记录器
RequestIDmiddleware/requestid.go注入 request-id 到 context
Logger / LoggerWithLoggermiddleware/logger.go记录请求日志和延迟;WithLogger 版本支持自定义日志记录器
Authmiddleware/auth.go验证用户已登录
RateLimitWithStoremiddleware/ratelimit.go令牌桶限流,使用 Redis 存储
LoginStateUpdatermiddleware/login.go登录成功后更新客户端状态

自定义日志记录器

go
// 使用自定义日志记录器
import "github.com/marmotedu/iam/pkg/log"

router.Use(
    middleware.RecoveryWithLogger(log.L()),  // 使用项目日志记录器
    middleware.LoggerWithLogger(log.L()),    // 使用项目日志记录器
)

路由与 Handler

Router

go
// github.com/bingo-project/websocket/router.go

// Router WebSocket 方法路由器
type Router struct {
    middlewares []Middleware
    handlers    map[string]*handlerEntry
}

// NewRouter 创建路由器
func NewRouter() *Router

// Use 添加全局中间件
func (r *Router) Use(middlewares ...Middleware) *Router

// Handle 注册方法处理器
func (r *Router) Handle(method string, handler Handler, middlewares ...Middleware)

// Group 创建分组
func (r *Router) Group(middlewares ...Middleware) *Group

// Dispatch 分发请求
func (r *Router) Dispatch(c *Context) *jsonrpc.Response

Handler 注册示例

go
// internal/apiserver/router/ws.go

func RegisterWSHandlers(router *ws.Router, rateLimitStore *middleware.RateLimiterStore) {
    h := wshandler.NewHandler(store.S)

    // 全局中间件
    router.Use(
        middleware.Recovery,
        middleware.RequestID,
        middleware.Logger,
        middleware.RateLimitWithStore(rateLimitStore, &middleware.RateLimitConfig{
            Default: 10,
            Methods: map[string]float64{
                "heartbeat": 0, // 不限制
            },
        }),
    )

    // 公开方法(无需认证)
    public := router.Group()
    public.Handle("heartbeat", ws.HeartbeatHandler)
    public.Handle("system.healthz", h.Healthz)
    public.Handle("auth.login", h.Login, middleware.LoginStateUpdater)

    // 需要认证的方法
    private := router.Group(middleware.Auth)
    private.Handle("subscribe", ws.SubscribeHandler)
    private.Handle("unsubscribe", ws.UnsubscribeHandler)
    private.Handle("auth.user-info", h.UserInfo)
}

Handler 签名

所有 Handler 使用统一的签名,类似 Gin 的 func(c *gin.Context)

go
// github.com/bingo-project/websocket/context.go

// Handler 消息处理函数
type Handler func(*Context) *jsonrpc.Response

// Context 包含请求的所有信息,嵌入 context.Context 可直接传递给 Biz 层
type Context struct {
    context.Context               // 嵌入标准 context
    Request   *jsonrpc.Request    // JSON-RPC 请求
    Client    *Client             // WebSocket 客户端
    Method    string              // 方法名
    StartTime time.Time           // 请求开始时间
}

// BindParams 解析请求参数到结构体
func (c *Context) BindParams(v any) error

// BindValidate 解析并验证请求参数
func (c *Context) BindValidate(v any) error

// JSON 返回成功响应
func (c *Context) JSON(data any) *jsonrpc.Response

// Error 返回错误响应
func (c *Context) Error(err error) *jsonrpc.Response

Handler 实现示例

go
// internal/apiserver/handler/ws/auth.go

func (h *Handler) Login(c *ws.Context) *jsonrpc.Response {
    var req v1.LoginRequest
    if err := c.BindValidate(&req); err != nil {
        return c.Error(errno.ErrInvalidArgument.WithMessage(err.Error()))
    }

    resp, err := h.b.Auth().Login(c, &req)  // ws.Context 嵌入 context.Context,可直接传递
    if err != nil {
        return c.Error(err)
    }

    return c.JSON(resp)
}

func (h *Handler) UserInfo(c *ws.Context) *jsonrpc.Response {
    uid := c.UserID()
    if uid == "" {
        return c.Error(errno.ErrTokenInvalid)
    }

    user, err := store.S.User().GetByUID(c, uid)
    if err != nil {
        return c.Error(errno.ErrUserNotFound)
    }

    return c.JSON(&v1.UserInfo{...})
}

内置 Handler

Handler说明
HeartbeatHandler心跳响应,返回服务器时间
SubscribeHandler订阅主题
UnsubscribeHandler取消订阅

认证流程

WebSocket 支持两种认证方式:

方式一:连接后登录

  1. 客户端建立 WebSocket 连接(匿名状态)
  2. 发送 auth.login 请求
  3. 服务端验证成功后,LoginStateUpdater 中间件更新客户端状态
  4. 后续请求可访问需要认证的方法
Client                          Server
  │                                │
  │──── WebSocket Connect ────────>│  匿名连接
  │                                │
  │──── auth.login ───────────────>│
  │<─── {accessToken: xxx} ────────│  登录成功
  │                                │  LoginStateUpdater 更新状态
  │                                │
  │──── subscribe ────────────────>│  已认证,允许
  │<─── {subscribed: [...]} ───────│

方式二:连接时携带 Token(可选扩展)

可以在 WebSocket 升级时通过 query 参数或 header 传递 token:

ws://example.com/ws?token=xxx

认证状态管理

go
// github.com/bingo-project/websocket/client.go

// IsAuthenticated 检查是否已登录
func (c *Client) IsAuthenticated() bool {
    return c.UserID != "" && c.Platform != "" && c.LoginTime > 0
}

// NotifyLogin 通知 Hub 登录成功
func (c *Client) NotifyLogin(userID, platform string, tokenExpiresAt int64)

连接管理

Hub

Hub 是 WebSocket 连接的中央管理器,负责:

  • 客户端注册/注销
  • 用户登录状态管理
  • 主题订阅/发布
  • 连接清理
go
// github.com/bingo-project/websocket/hub.go

type Hub struct {
    anonymous   map[*Client]bool  // 匿名连接
    clients     map[*Client]bool  // 已认证连接
    users       map[string]*Client // platform_userID -> Client
    clientsByID map[string]*Client // clientID -> Client
    topics      map[string]map[*Client]bool // 主题订阅
}

// 管理 API
func (h *Hub) GetClient(clientID string) *Client
func (h *Hub) GetClientsByUser(userID string) []*Client
func (h *Hub) KickClient(clientID string, reason string) bool
func (h *Hub) KickUser(userID string, reason string) int
func (h *Hub) Stats() *HubStats

// 推送 API
func (h *Hub) PushToTopic(topic, method string, data any)
func (h *Hub) PushToUser(platform, userID, method string, data any)
func (h *Hub) PushToUserAllPlatforms(userID, method string, data any)

连接生命周期

┌─────────────────────────────────────────────────────────────┐
│                      连接生命周期                            │
│                                                             │
│  Connect ──> Register ──> [Anonymous]                       │
│                              │                              │
│                         auth.login                          │
│                              │                              │
│                              ▼                              │
│                        [Authenticated]                      │
│                              │                              │
│            ┌─────────────────┼─────────────────┐            │
│            ▼                 ▼                 ▼            │
│      Token 过期         心跳超时          被踢下线           │
│            │                 │                 │            │
│            └─────────────────┼─────────────────┘            │
│                              ▼                              │
│                         Unregister                          │
└─────────────────────────────────────────────────────────────┘

单设备登录

同一用户在同一平台(web/mobile/desktop)只能有一个活跃连接。新连接登录时,旧连接会收到踢出通知:

json
{
    "jsonrpc": "2.0",
    "method": "session.kicked",
    "params": {"reason": "您的账号已在其他设备登录"}
}

心跳机制

采用双层心跳架构:

┌─────────────────────────────────────────────────────────────┐
│                    协议层 (WebSocket)                        │
│                                                              │
│  服务端 ──── ping (每54s) ────→ 客户端                       │
│  服务端 ←─── pong ─────────── 客户端                        │
│                                                              │
│  目的:检测 TCP 连接活性                                      │
│  超时:60s 未收到 pong → 断开                                │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                    应用层 (JSON-RPC)                         │
│                                                              │
│  服务端 ←─── heartbeat (每30s) ─── 客户端                   │
│  服务端 ──── response ──────────→ 客户端                    │
│                                                              │
│  目的:确认客户端还在消费数据、保持 NAT 映射                   │
│  超时:90s 未收到任何消息 → 断开                             │
└─────────────────────────────────────────────────────────────┘

为什么需要双层心跳?

层级检测目标场景
协议层TCP 连接是否存活网络断开、客户端崩溃
应用层客户端是否在消费数据App 被杀但 TCP 未断

客户端要求

  1. 登录后 每 30 秒 发送一次 heartbeat 请求
  2. 90 秒 内无任何消息将被服务端断开
  3. 断开后需自行实现 重连逻辑
  4. 重连后需 重新登录重新订阅 Topic

推送与订阅

推送 API

go
// 用户推送
hub.PushToUser("ios", "user123", "order.created", data)
hub.PushToUserAllPlatforms("user123", "security.alert", data)

// Topic 推送
hub.PushToTopic("group:123", "message.new", data)

// 广播
hub.Broadcast("system.maintenance", data)

Topic 订阅

Topic 用于发布/订阅模式,支持多种实时数据场景:

前缀用途示例
group:群聊group:123
room:聊天室room:lobby
ticker:实时行情ticker:BTC/USDT
device:IoT 设备device:12345

订阅请求

json
{
    "jsonrpc": "2.0",
    "method": "subscribe",
    "params": {"topics": ["group:123", "room:lobby"]},
    "id": 2
}

订阅响应

json
{
    "jsonrpc": "2.0",
    "result": {"subscribed": ["group:123", "room:lobby"]},
    "id": 2
}

Prometheus 指标

WebSocket 库内置 Prometheus 指标支持,便于监控连接状态和消息吞吐。

启用指标

go
import "github.com/prometheus/client_golang/prometheus"

// 创建并注册指标
metrics := websocket.NewMetrics("myapp", "websocket")
metrics.MustRegister(prometheus.DefaultRegisterer)

// 将指标附加到 Hub
hub := websocket.NewHub(websocket.WithMetrics(metrics))

可用指标

指标名类型说明
{namespace}_{subsystem}_connections_totalCounter连接总数
{namespace}_{subsystem}_connections_currentGauge当前连接数
{namespace}_{subsystem}_connections_authenticatedGauge已认证连接数
{namespace}_{subsystem}_connections_anonymousGauge匿名连接数
{namespace}_{subsystem}_messages_sent_totalCounter发送消息总数
{namespace}_{subsystem}_broadcasts_totalCounter广播总数
{namespace}_{subsystem}_errors_totalCounter错误总数(按类型分组)
{namespace}_{subsystem}_topics_currentGauge当前主题数
{namespace}_{subsystem}_subscriptions_totalCounter订阅总数

连接限制

WebSocket 库支持配置最大连接数和单用户连接数限制,防止资源耗尽。

配置

go
cfg := &websocket.HubConfig{
    MaxConnections: 10000,  // 最大总连接数 (0 = 不限制)
    MaxConnsPerUser: 5,     // 每用户最大连接数 (0 = 不限制)
    // ... 其他配置
}

hub := websocket.NewHubWithConfig(cfg)

使用

go
// 接受连接前检查(可选,用于提前拒绝)
if !hub.CanAcceptConnection() {
    http.Error(w, "连接数过多", http.StatusServiceUnavailable)
    return
}

// 登录前检查(可选)
if !hub.CanUserConnect(userID) {
    return c.Error(errors.New(429, "TooManyConnections", "已达到最大连接数"))
}

// 限制也会在 Hub 内部自动执行

适用场景

场景Topic 示例消息特点
即时通讯group:{groupID}广播、@提醒
协同文档doc:{docID}多人编辑、光标同步
实时行情ticker:{symbol}高频小消息
订单通知用户私有推送状态变更
系统维护广播全局通知

目录结构

WebSocket 模块现已作为独立库发布。导入路径:

go
import (
    "github.com/bingo-project/websocket"          // 核心类型 (Hub, Client, Router, Context)
    "github.com/bingo-project/websocket/jsonrpc"  // JSON-RPC 消息类型
    "github.com/bingo-project/websocket/middleware" // 内置中间件
)

Bingo 集成文件:

internal/apiserver/
├── ws.go               # WebSocket 初始化
├── router/
│   └── ws.go           # Handler 注册
└── handler/ws/
    ├── handler.go      # Handler 定义
    ├── auth.go         # 认证相关
    └── system.go       # 系统方法

相关文档


下一步:了解 gRPC-Gateway 集成,学习如何让一份代码同时支持 HTTP 和 gRPC。

Released under the Apache 2.0 License.