跳到主要內容

如何建立一個基本的 Socket.IO 客户端

在本指南中,我們將使用 JavaScript 實作一個基本的 Socket.IO 客户端,以便更深入地了解 Socket.IO 協定。

我們將實作下列功能

  • 建立 WebSocket 連線
  • 管理重新連線
  • 傳送事件
  • 接收事件
  • 手動斷開連線

官方客户端顯然包含更多功能

但這應足以讓您對程式庫在幕後的運作方式有良好的概觀。

我們的目標是達成類似這樣的結果

import { io } from "./basic-client.js";

const socket = io();

// connection
socket.on("connect", () => {
// ...
});

// receiving an event
socket.on("foo", (value) => {
// ...
});

// sending an event
socket.emit("bar", "abc");

準備好了嗎?讓我們開始吧!

事件發射器

Socket.IO API 大量採用 Node.js EventEmitter 類別的靈感。

import { EventEmitter } from "node:events";

const myEmitter = new EventEmitter();

myEmitter.on("foo", () => {
console.log("foo!");
});

myEmitter.emit("foo");

該程式庫提供類似的 API,但介於伺服器和客户端之間

  • 伺服器
io.on("connection", (socket) => {
// send a "foo" event to the client
socket.emit("foo");

// receive a "bar" event from the client
socket.on("bar", () => {
// ...
});
});
  • 客户端
import { io } from "socket.io-client";

const socket = io();

// receive a "foo" event from the server
socket.on("foo", () => {
// ...
});

// send a "bar" event to the server
socket.emit("bar");

伺服器與用戶端之間的基礎連線(WebSocket 或 HTTP 長輪詢)由程式庫抽象化並管理。

我們來建立一個最小的 EventEmitter 類別

class EventEmitter {
#listeners = new Map();

on(event, listener) {
let listeners = this.#listeners.get(event);
if (!listeners) {
this.#listeners.set(event, listeners = []);
}
listeners.push(listener);
}

emit(event, ...args) {
const listeners = this.#listeners.get(event);
if (listeners) {
for (const listener of listeners) {
listener.apply(null, args);
}
}
}
}

我們的 Socket 類別會延伸這個類別,以便同時公開 on()emit() 方法

class Socket extends EventEmitter {
constructor(uri, opts) {
super();
}
}

在我們的建構函式中,uri 參數可能是

  • 由使用者提供
const socket = io("https://example.com");
const socket = io();

我們來建立一個進入點

export function io(uri, opts) {
if (typeof uri !== "string") {
opts = uri;
uri = location.origin;
}
return new Socket(uri, opts);
}

好的,這是一個好的開始!

WebSocket 連線

現在,我們來建立到伺服器的 WebSocket 連線

class Socket extends EventEmitter {
+ #uri;
+ #opts;
+ #ws;

constructor(uri, opts) {
super();
+ this.#uri = uri;
+ this.#opts = Object.assign({
+ path: "/socket.io/"
+ }, opts);
+ this.#open();
}

+ #open() {
+ this.#ws = new WebSocket(this.#createUrl());
+ }
+
+ #createUrl() {
+ const uri = this.#uri.replace(/^http/, "ws");
+ const queryParams = "?EIO=4&transport=websocket";
+ return `${uri}${this.#opts.path}${queryParams}`;
+ }
}

參考:https://developer.mozilla.org/en-US/docs/Web/API/WebSocket

關於 createUrl() 方法的一些說明

  • WebSocket URL 以 ws://wss:// 開頭,因此我們在 replace() 呼叫中處理這一點
  • Socket.IO URL 始終包含特定的請求路徑,預設為 /socket.io/
  • 有兩個強制性的查詢參數
    • EIO=4:Engine.IO 協定的版本
    • transport=websocket:使用的傳輸

因此,最終的 URL 將如下所示:wss://example.com/socket.io/?EIO=4&transport=websocket

Engine.IO 協定

Socket.IO 程式碼庫分為兩個不同的層級

  • 低階管道:我們稱之為 Engine.IO,Socket.IO 內部的引擎
  • 高階 API:Socket.IO 本身

另請參閱

使用 WebSocket 時,透過網路傳送訊息的格式很簡單:<封包類型><有效負載>

以下是協定第 4 版(因此上面有 EIO=4)的不同封包類型

名稱表示說明
OPEN0在握手期間使用。
CLOSE1用於表示傳輸可以關閉。
PING2在心跳機制中使用。
PONG3在心跳機制中使用。
MESSAGE4用於將有效負載傳送至另一方。
UPGRADE5在升級過程中使用(這裡未使用)。
NOOP6在升級過程中使用(這裡未使用)。

範例

4hello

with:

4 => MESSAGE packet type
hello => message payload (UTF-8 encoded)

我們來處理 WebSocket 訊息

+const EIOPacketType = {
+ OPEN: "0",
+ CLOSE: "1",
+ PING: "2",
+ PONG: "3",
+ MESSAGE: "4",
+};

+function noop() {}

class Socket extends EventEmitter {
[...]

#open() {
this.#ws = new WebSocket(this.#createUrl());
+ this.#ws.onmessage = ({ data }) => this.#onMessage(data);
+ this.#ws.onclose = () => this.#onClose("transport close");
}

+ #onMessage(data) {
+ if (typeof data !== "string") {
+ // TODO handle binary payloads
+ return;
+ }
+
+ switch (data[0]) {
+ case EIOPacketType.CLOSE:
+ this.#onClose("transport close");
+ break;
+
+ default:
+ this.#onClose("parse error");
+ break;
+ }
+ }
+
+ #onClose(reason) {
+ if (this.#ws) {
+ this.#ws.onclose = noop;
+ this.#ws.close();
+ }
+ }
+}

心跳

實作了一種心跳機制,以確保伺服器與用戶端之間的連線正常。

伺服器在初始握手期間傳送兩個值:pingIntervalpingTimeout

它會每隔 pingInterval 毫秒傳送一個 PING 封包,並期待從用戶端收到一個 PONG 封包。我們來這樣做

class Socket extends EventEmitter {
+ #pingTimeoutTimer;
+ #pingTimeoutDelay;

[...]

#onMessage(data) {
if (typeof data !== "string") {
// TODO handle binary payloads
return;
}

switch (data[0]) {
+ case EIOPacketType.OPEN:
+ this.#onOpen(data);
+ break;
+
case EIOPacketType.CLOSE:
this.#onClose("transport close");
break;

+ case EIOPacketType.PING:
+ this.#resetPingTimeout();
+ this.#send(EIOPacketType.PONG);
+ break;

default:
this.#onClose("parse error");
break;
}
}

+ #onOpen(data) {
+ let handshake;
+ try {
+ handshake = JSON.parse(data.substring(1));
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
+ this.#resetPingTimeout();
+ }
+
+ #resetPingTimeout() {
+ clearTimeout(this.#pingTimeoutTimer);
+ this.#pingTimeoutTimer = setTimeout(() => {
+ this.#onClose("ping timeout");
+ }, this.#pingTimeoutDelay);
+ }
+
+ #send(data) {
+ if (this.#ws.readyState === WebSocket.OPEN) {
+ this.#ws.send(data);
+ }
+ }

#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}

+ clearTimeout(this.#pingTimeoutTimer);
}
}

重新連線

當我們這樣做時,我們也會處理重新連線。WebSocket 很棒,但它們可能會(而且在實際情況中會)斷線,所以我們必須處理這個問題

class Socket extends EventEmitter {
[...]

constructor(uri, opts) {
super();
this.#uri = uri;
this.#opts = Object.assign(
{
path: "/socket.io/",
+ reconnectionDelay: 2000,
},
opts
);
this.#open();
}

#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}

clearTimeout(this.#pingTimeoutTimer);

+ setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
}
資訊

官方的 Socket.IO 用戶端使用一個隨機的指數延遲,以防止當大量用戶端同時重新連線時出現負載高峰,但我們將在此處保持簡單並使用一個常數值。

好的,讓我們總結一下,我們現在有一個用戶端,它可以

  • 開啟一個到伺服器的 WebSocket 連線
  • 透過回應 PING 封包來遵守心跳機制
  • 在失敗時自動重新連線

這就是 Engine.IO 協定的全部內容!讓我們現在深入探討 Socket.IO 協定。

Socket.IO 協定

Socket.IO 協定建立在先前描述的 Engine.IO 協定之上,這表示每個 Socket.IO 封包在透過網路傳送時,都會加上前綴「4」(Engine.IO MESSAGE 封包類型)。

參考:Socket.IO 協定

在沒有二進位元元素的情況下,格式如下

<packet type>[JSON-stringified payload]

以下是可用封包類型的清單

類型ID用法
CONNECT0在連線到名稱空間期間使用。
DISCONNECT1在從名稱空間斷線時使用。
EVENT2用於將資料傳送到另一端。
ACK3用於確認事件(在此處未使用)。
CONNECT_ERROR4在連線到名稱空間期間使用(在此處未使用)。
BINARY_EVENT5用於將二進位元資料傳送到另一端(在此處未使用)。
BINARY_ACK6用於確認事件(回應包含二進位元資料)(在此處未使用)。

範例

2["hello","world"]

with:

2 => EVENT packet type
["hello","world"] => JSON.stringified() payload

連線

用戶端必須在 Socket.IO 會話開始時傳送一個 CONNECT 封包

+const SIOPacketType = {
+ CONNECT: 0,
+ DISCONNECT: 1,
+ EVENT: 2,
+};

class Socket extends EventEmitter {
[...]

#onOpen(data) {
let handshake;
try {
handshake = JSON.parse(data.substring(1));
} catch (e) {
return this.#onClose("parse error");
}
this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
this.#resetPingTimeout();
+ this.#doConnect();
}

+ #doConnect() {
+ this.#sendPacket({ type: SIOPacketType.CONNECT });
+ }
+
+ #sendPacket(packet) {
+ this.#send(EIOPacketType.MESSAGE + encode(packet));
+ }
}

+function encode(packet) {
+ let output = "" + packet.type;
+
+ return output;
+}

如果允許連線,伺服器將會傳送 CONNECT 封包回來

class Socket extends EventEmitter {
+ id;

[...]

#onMessage(data) {
switch (data[0]) {
[...]

+ case EIOPacketType.MESSAGE:
+ let packet;
+ try {
+ packet = decode(data);
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#onPacket(packet);
+ break;
}
}

+ #onPacket(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ this.#onConnect(packet);
+ break;
+ }
+ }

+ #onConnect(packet) {
+ this.id = packet.data.sid;
+
+ super.emit("connect");
+ }
}

+function decode(data) {
+ let i = 1; // skip "4" prefix
+
+ const packet = {
+ type: parseInt(data.charAt(i++), 10),
+ };
+
+ if (!isPacketValid(packet)) {
+ throw new Error("invalid format");
+ }
+
+ return packet;
+}
+
+function isPacketValid(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ return typeof packet.data === "object";
+ default:
+ return false;
+ }
+}
注意

我們使用 super.emit(...),這樣我們才能覆寫 emit() 方法,以便稍後傳送事件。

傳送事件

讓我們傳送一些資料到伺服器。我們需要追蹤基礎連線的狀態,並將封包快取,直到連線就緒

class Socket extends EventEmitter {
+ connected = false;

+ #sendBuffer = [];

[...]

+ emit(...args) {
+ const packet = {
+ type: SIOPacketType.EVENT,
+ data: args,
+ };
+
+ if (this.connected) {
+ this.#sendPacket(packet);
+ } else {
+ this.#sendBuffer.push(packet);
+ }
+ }

#onConnect(packet) {
this.id = packet.data.sid;
+ this.connected = true;

+ this.#sendBuffer.forEach((packet) => this.#sendPacket(packet));
+ this.#sendBuffer.slice(0);

super.emit("connect");
}
}

function encode(packet) {
let output = "" + packet.type;

+ if (packet.data) {
+ output += JSON.stringify(packet.data);
+ }

return output;
}

接收事件

相反地,讓我們處理伺服器傳送的 EVENT 封包

class Socket extends EventEmitter {
[...]

#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;

+ case SIOPacketType.EVENT:
+ super.emit.apply(this, packet.data);
+ break;
}
}
}

function decode(data) {
let i = 1; // skip "4" prefix

const packet = {
type: parseInt(data.charAt(i++), 10),
};

+ if (data.charAt(i)) {
+ packet.data = JSON.parse(data.substring(i));
+ }

if (!isPacketValid(packet)) {
throw new Error("invalid format");
}

return packet;
}

function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.EVENT: {
+ const args = packet.data;
+ return (
+ Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
+ );
+ }
default:
return false;
}
}

手動斷開連線

最後,讓我們處理不應嘗試重新連線的少數情況

  • 當用戶端呼叫 socket.disconnect()
  • 當伺服器呼叫 socket.disconnect()
class Socket extends EventEmitter {
+ #reconnectTimer;
+ #shouldReconnect = true;

[...]

#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;

+ case SIOPacketType.DISCONNECT:
+ this.#shouldReconnect = false;
+ this.#onClose("io server disconnect");
+ break;

case SIOPacketType.EVENT:
super.emit.apply(this, packet.data);
break;
}
}

#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}

clearTimeout(this.#pingTimeoutTimer);
+ clearTimeout(this.#reconnectTimer);
+
+ if (this.#shouldReconnect) {
+ this.#reconnectTimer = setTimeout(
+ () => this.#open(),
+ this.#opts.reconnectionDelay
+ );
+ }
- setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}

+ disconnect() {
+ this.#shouldReconnect = false;
+ this.#onClose("io client disconnect");
+ }
}

function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.DISCONNECT:
+ return packet.data === undefined;
case SIOPacketType.EVENT: {
const args = packet.data;
return (
Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
);
}
default:
return false;
}
}

結束備註

我們的基本 Socket.IO 用戶端到此結束!讓我們複習一下。

我們已實作下列功能

  • 建立 WebSocket 連線
  • 管理重新連線
  • 傳送事件
  • 接收事件
  • 手動斷開連線

希望您現在更了解此函式庫在底層是如何運作的。

完整的原始碼可以在 這裡 找到。

感謝您的閱讀!