Skip to content

WebSocket Design and Implementation

Standalone Library Available: The WebSocket module is now available as a standalone library at github.com/bingo-project/websocket. You can use it independently without the full Bingo framework.

This document describes Bingo's WebSocket implementation, using JSON-RPC 2.0 protocol with middleware, grouped routing, and connection management support.

Table of Contents

  1. Design Goals
  2. Message Format
  3. Middleware Architecture
  4. Routing and Handler
  5. Authentication Flow
  6. Connection Management
  7. Push and Subscribe
  8. Prometheus Metrics
  9. Connection Limits
  10. Use Cases
  11. Directory Structure

Design Goals

  1. Unified Message Format - Use JSON-RPC 2.0 standard, consistent error format with HTTP/gRPC
  2. Middleware Pattern - Consistent programming model with HTTP (Gin) and gRPC (interceptor)
  3. Flexible Grouping - Support public/private groups, different methods use different middleware chains
  4. Business Reuse - WebSocket Handler directly calls Biz layer, all protocols share business logic

Message Format

WebSocket uses JSON-RPC 2.0 specification, a widely adopted industry standard (MCP, Ethereum, VSCode LSP, etc.).

Request Format

json
{
    "jsonrpc": "2.0",
    "method": "auth.login",
    "params": {"username": "test", "password": "123456", "platform": "web"},
    "id": 1
}
FieldTypeDescription
jsonrpcstringFixed "2.0"
methodstringMethod name, e.g., "auth.login", "subscribe"
paramsobjectRequest parameters, same format as HTTP body
idstring/numberRequest ID for matching responses

Success Response

json
{
    "jsonrpc": "2.0",
    "result": {"accessToken": "xxx", "expiresAt": 1234567890},
    "id": 1
}

The result field format is identical to HTTP response body.

Error Response

json
{
    "jsonrpc": "2.0",
    "error": {
        "code": -32001,
        "reason": "Unauthorized",
        "message": "Login required"
    },
    "id": 1
}
FieldDescription
codeJSON-RPC error code (negative integer)
reasonBusiness error code, same as HTTP response reason
messageError description

Server Push

Server-initiated push uses Notification format (no id field):

json
{
    "jsonrpc": "2.0",
    "method": "session.kicked",
    "params": {"reason": "Your account has logged in on another device"}
}

HTTP → JSON-RPC Error Code Mapping

HTTP StatusJSON-RPC CodeDescription
400-32602Invalid params
401-32001Unauthorized
403-32003Permission denied
404-32004Not found
429-32029Too many requests
500-32603Internal error

Middleware Architecture

WebSocket middleware maintains consistent programming model with HTTP/gRPC.

Architecture Overview

WebSocket Message


┌─────────┐  ┌───────────┐  ┌───────────┐  ┌────────┐
│Recovery │→ │ RequestID │→ │  Logger   │→ │RateLimit│  ← Global middleware
└─────────┘  └───────────┘  └───────────┘  └────────┘


┌─────────────────────────────────────────────────────┐
│                   Route Dispatch                     │
│  ┌─────────────────┐    ┌─────────────────────┐     │
│  │   Public Group  │    │   Private Group     │     │
│  │                 │    │  ┌──────┐           │     │
│  │  • heartbeat    │    │  │ Auth │           │     │
│  │  • auth.login   │    │  └──────┘           │     │
│  │                 │    │  • subscribe        │     │
│  │                 │    │  • auth.user-info   │     │
│  └─────────────────┘    └─────────────────────┘     │
└─────────────────────────────────────────────────────┘


┌─────────┐
│ Handler │ → Biz Layer
└─────────┘

Core Types

go
// github.com/bingo-project/websocket/context.go

// Context middleware context, embeds context.Context for direct Biz layer passing
type Context struct {
    context.Context               // Embedded standard context, can be passed directly to lower layers
    Request   *jsonrpc.Request    // JSON-RPC request
    Client    *Client             // WebSocket client
    Method    string              // Method name
    StartTime time.Time           // Request start time
}

// Response helper methods
func (c *Context) JSON(data any) *jsonrpc.Response   // Return success response
func (c *Context) Error(err error) *jsonrpc.Response // Return error response

// Handler message processing function
type Handler func(*Context) *jsonrpc.Response

// Middleware function
type Middleware func(Handler) Handler

// Chain combines multiple middlewares
func Chain(middlewares ...Middleware) Middleware

Built-in Middleware

MiddlewareLocationDescription
Recovery / RecoveryWithLoggermiddleware/recovery.goCatch panic, return 500 error; WithLogger version supports custom logger
RequestIDmiddleware/requestid.goInject request-id into context
Logger / LoggerWithLoggermiddleware/logger.goLog requests and latency; WithLogger version supports custom logger
Authmiddleware/auth.goVerify user is logged in
RateLimitWithStoremiddleware/ratelimit.goToken bucket rate limiting with Redis storage
LoginStateUpdatermiddleware/login.goUpdate client state after successful login

Custom Logger:

go
// Using custom logger
import "github.com/marmotedu/iam/pkg/log"

router.Use(
    middleware.RecoveryWithLogger(log.L()),  // Use project logger
    middleware.LoggerWithLogger(log.L()),    // Use project logger
)

Routing and Handler

Router

go
// github.com/bingo-project/websocket/router.go

// Router WebSocket method router
type Router struct {
    middlewares []Middleware
    handlers    map[string]*handlerEntry
}

// NewRouter creates a router
func NewRouter() *Router

// Use adds global middleware
func (r *Router) Use(middlewares ...Middleware) *Router

// Handle registers method handler
func (r *Router) Handle(method string, handler Handler, middlewares ...Middleware)

// Group creates a group
func (r *Router) Group(middlewares ...Middleware) *Group

// Dispatch dispatches request
func (r *Router) Dispatch(c *Context) *jsonrpc.Response

Handler Registration Example

go
// internal/apiserver/router/ws.go

func RegisterWSHandlers(router *ws.Router, rateLimitStore *middleware.RateLimiterStore) {
    h := wshandler.NewHandler(store.S)

    // Global middleware
    router.Use(
        middleware.Recovery,
        middleware.RequestID,
        middleware.Logger,
        middleware.RateLimitWithStore(rateLimitStore, &middleware.RateLimitConfig{
            Default: 10,
            Methods: map[string]float64{
                "heartbeat": 0, // No limit
            },
        }),
    )

    // Public methods (no authentication required)
    public := router.Group()
    public.Handle("heartbeat", ws.HeartbeatHandler)
    public.Handle("system.healthz", h.Healthz)
    public.Handle("auth.login", h.Login, middleware.LoginStateUpdater)

    // Methods requiring authentication
    private := router.Group(middleware.Auth)
    private.Handle("subscribe", ws.SubscribeHandler)
    private.Handle("unsubscribe", ws.UnsubscribeHandler)
    private.Handle("auth.user-info", h.UserInfo)
}

Handler Signature

All Handlers use a unified signature, similar to Gin's func(c *gin.Context):

go
// github.com/bingo-project/websocket/context.go

// Handler message processing function
type Handler func(*Context) *jsonrpc.Response

// Context contains all request information, embeds context.Context for direct Biz layer passing
type Context struct {
    context.Context               // Embedded standard context
    Request   *jsonrpc.Request    // JSON-RPC request
    Client    *Client             // WebSocket client
    Method    string              // Method name
    StartTime time.Time           // Request start time
}

// BindParams parses request parameters into struct
func (c *Context) BindParams(v any) error

// BindValidate parses and validates request parameters
func (c *Context) BindValidate(v any) error

// JSON returns success response
func (c *Context) JSON(data any) *jsonrpc.Response

// Error returns error response
func (c *Context) Error(err error) *jsonrpc.Response

Handler Implementation Example

go
// internal/apiserver/handler/ws/auth.go

func (h *Handler) Login(c *ws.Context) *jsonrpc.Response {
    var req v1.LoginRequest
    if err := c.BindValidate(&req); err != nil {
        return c.Error(errno.ErrInvalidArgument.WithMessage(err.Error()))
    }

    resp, err := h.b.Auth().Login(c, &req)  // ws.Context embeds context.Context, can be passed directly
    if err != nil {
        return c.Error(err)
    }

    return c.JSON(resp)
}

func (h *Handler) UserInfo(c *ws.Context) *jsonrpc.Response {
    uid := c.UserID()
    if uid == "" {
        return c.Error(errno.ErrTokenInvalid)
    }

    user, err := store.S.User().GetByUID(c, uid)
    if err != nil {
        return c.Error(errno.ErrUserNotFound)
    }

    return c.JSON(&v1.UserInfo{...})
}

Built-in Handlers

HandlerDescription
HeartbeatHandlerHeartbeat response, returns server time
SubscribeHandlerSubscribe to topic
UnsubscribeHandlerUnsubscribe from topic

Authentication Flow

WebSocket supports two authentication methods:

Method 1: Login After Connection

  1. Client establishes WebSocket connection (anonymous state)
  2. Send auth.login request
  3. After server verification, LoginStateUpdater middleware updates client state
  4. Subsequent requests can access authenticated methods
Client                          Server
  │                                │
  │──── WebSocket Connect ────────>│  Anonymous connection
  │                                │
  │──── auth.login ───────────────>│
  │<─── {accessToken: xxx} ────────│  Login successful
  │                                │  LoginStateUpdater updates state
  │                                │
  │──── subscribe ────────────────>│  Authenticated, allowed
  │<─── {subscribed: [...]} ───────│

Method 2: Token in Connection (Optional Extension)

Token can be passed via query parameter or header during WebSocket upgrade:

ws://example.com/ws?token=xxx

Authentication State Management

go
// github.com/bingo-project/websocket/client.go

// IsAuthenticated checks if logged in
func (c *Client) IsAuthenticated() bool {
    return c.UserID != "" && c.Platform != "" && c.LoginTime > 0
}

// NotifyLogin notifies Hub of successful login
func (c *Client) NotifyLogin(userID, platform string, tokenExpiresAt int64)

Connection Management

Hub

Hub is the central manager for WebSocket connections, responsible for:

  • Client registration/unregistration
  • User login state management
  • Topic subscription/publishing
  • Connection cleanup
go
// github.com/bingo-project/websocket/hub.go

type Hub struct {
    anonymous   map[*Client]bool  // Anonymous connections
    clients     map[*Client]bool  // Authenticated connections
    users       map[string]*Client // platform_userID -> Client
    clientsByID map[string]*Client // clientID -> Client
    topics      map[string]map[*Client]bool // Topic subscriptions
}

// Management API
func (h *Hub) GetClient(clientID string) *Client
func (h *Hub) GetClientsByUser(userID string) []*Client
func (h *Hub) KickClient(clientID string, reason string) bool
func (h *Hub) KickUser(userID string, reason string) int
func (h *Hub) Stats() *HubStats

// Push API
func (h *Hub) PushToTopic(topic, method string, data any)
func (h *Hub) PushToUser(platform, userID, method string, data any)
func (h *Hub) PushToUserAllPlatforms(userID, method string, data any)

Connection Lifecycle

┌─────────────────────────────────────────────────────────────┐
│                      Connection Lifecycle                    │
│                                                              │
│  Connect ──> Register ──> [Anonymous]                        │
│                              │                               │
│                         auth.login                           │
│                              │                               │
│                              ▼                               │
│                        [Authenticated]                       │
│                              │                               │
│            ┌─────────────────┼─────────────────┐             │
│            ▼                 ▼                 ▼             │
│      Token Expired     Heartbeat Timeout    Kicked          │
│            │                 │                 │             │
│            └─────────────────┼─────────────────┘             │
│                              ▼                               │
│                         Unregister                           │
└─────────────────────────────────────────────────────────────┘

Single Device Login

Only one active connection per user per platform (web/mobile/desktop). When a new connection logs in, the old connection receives a kick notification:

json
{
    "jsonrpc": "2.0",
    "method": "session.kicked",
    "params": {"reason": "Your account has logged in on another device"}
}

Heartbeat Mechanism

Dual-layer heartbeat architecture:

┌─────────────────────────────────────────────────────────────┐
│                    Protocol Layer (WebSocket)                │
│                                                              │
│  Server ──── ping (every 54s) ────→ Client                  │
│  Server ←─── pong ─────────────── Client                    │
│                                                              │
│  Purpose: Detect TCP connection liveness                     │
│  Timeout: 60s without pong → disconnect                      │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                    Application Layer (JSON-RPC)              │
│                                                              │
│  Server ←─── heartbeat (every 30s) ─── Client               │
│  Server ──── response ──────────────→ Client                │
│                                                              │
│  Purpose: Confirm client is consuming data, maintain NAT     │
│  Timeout: 90s without any message → disconnect              │
└─────────────────────────────────────────────────────────────┘

Why Dual-Layer Heartbeat?

LayerDetection TargetScenario
ProtocolTCP connection aliveNetwork disconnect, client crash
ApplicationClient consuming dataApp killed but TCP not closed

Client Requirements

  1. After login, send heartbeat request every 30 seconds
  2. 90 seconds without any message will be disconnected by server
  3. Implement reconnection logic after disconnect
  4. After reconnection, re-login and re-subscribe to topics

Push and Subscribe

Push API

go
// User push
hub.PushToUser("ios", "user123", "order.created", data)
hub.PushToUserAllPlatforms("user123", "security.alert", data)

// Topic push
hub.PushToTopic("group:123", "message.new", data)

// Broadcast
hub.Broadcast("system.maintenance", data)

Topic Subscription

Topics are used for publish/subscribe pattern, supporting various real-time data scenarios:

PrefixPurposeExample
group:Group chatgroup:123
room:Chat roomroom:lobby
ticker:Real-time quotesticker:BTC/USDT
device:IoT devicedevice:12345

Subscribe Request:

json
{
    "jsonrpc": "2.0",
    "method": "subscribe",
    "params": {"topics": ["group:123", "room:lobby"]},
    "id": 2
}

Subscribe Response:

json
{
    "jsonrpc": "2.0",
    "result": {"subscribed": ["group:123", "room:lobby"]},
    "id": 2
}

Prometheus Metrics

The WebSocket library includes built-in Prometheus metrics support for monitoring connection status and message throughput.

Enable Metrics

go
import "github.com/prometheus/client_golang/prometheus"

// Create and register metrics
metrics := websocket.NewMetrics("myapp", "websocket")
metrics.MustRegister(prometheus.DefaultRegisterer)

// Attach metrics to Hub
hub := websocket.NewHub(websocket.WithMetrics(metrics))

Available Metrics

Metric NameTypeDescription
{namespace}_{subsystem}_connections_totalCounterTotal connections
{namespace}_{subsystem}_connections_currentGaugeCurrent connections
{namespace}_{subsystem}_connections_authenticatedGaugeAuthenticated connections
{namespace}_{subsystem}_connections_anonymousGaugeAnonymous connections
{namespace}_{subsystem}_messages_sent_totalCounterTotal messages sent
{namespace}_{subsystem}_broadcasts_totalCounterTotal broadcasts
{namespace}_{subsystem}_errors_totalCounterTotal errors (grouped by type)
{namespace}_{subsystem}_topics_currentGaugeCurrent topics
{namespace}_{subsystem}_subscriptions_totalCounterTotal subscriptions

Connection Limits

The WebSocket library supports configuring maximum connections and per-user connection limits to prevent resource exhaustion.

Configuration

go
cfg := &websocket.HubConfig{
    MaxConnections: 10000,  // Max total connections (0 = unlimited)
    MaxConnsPerUser: 5,     // Max connections per user (0 = unlimited)
    // ... other config
}

hub := websocket.NewHubWithConfig(cfg)

Usage

go
// Check before accepting connection (optional, for early rejection)
if !hub.CanAcceptConnection() {
    http.Error(w, "Too many connections", http.StatusServiceUnavailable)
    return
}

// Check before login (optional)
if !hub.CanUserConnect(userID) {
    return c.Error(errors.New(429, "TooManyConnections", "Max connections reached"))
}

// Limits are also enforced automatically within Hub

Use Cases

ScenarioTopic ExampleMessage Characteristics
Instant Messaginggroup:{groupID}Broadcast, @mentions
Collaborative Docsdoc:{docID}Multi-user editing, cursor sync
Real-time Quotesticker:{symbol}High-frequency small messages
Order NotificationsUser private pushStatus changes
System MaintenanceBroadcastGlobal notifications

Directory Structure

The WebSocket module is now a standalone library. Import paths:

go
import (
    "github.com/bingo-project/websocket"          // Core types (Hub, Client, Router, Context)
    "github.com/bingo-project/websocket/jsonrpc"  // JSON-RPC message types
    "github.com/bingo-project/websocket/middleware" // Built-in middleware
)

Bingo integration files:

internal/apiserver/
├── ws.go               # WebSocket initialization
├── router/
│   └── ws.go           # Handler registration
└── handler/ws/
    ├── handler.go      # Handler definition
    ├── auth.go         # Authentication related
    └── system.go       # System methods


Next Step: Learn about gRPC-Gateway Integration to understand how to support both HTTP and gRPC with a single codebase.

Released under the Apache 2.0 License.