如何建立一個基本的 Socket.IO 客户端
在本指南中,我們將使用 JavaScript 實作一個基本的 Socket.IO 客户端,以便更深入地了解 Socket.IO 協定。
我們將實作下列功能
- 建立 WebSocket 連線
- 管理重新連線
- 傳送事件
- 接收事件
- 手動斷開連線
官方客户端顯然包含更多功能
但這應足以讓您對程式庫在幕後的運作方式有良好的概觀。
我們的目標是達成類似這樣的結果
import { io } from "./basic-client.js";
const socket = io();
// connection
socket.on("connect", () => {
// ...
});
// receiving an event
socket.on("foo", (value) => {
// ...
});
// sending an event
socket.emit("bar", "abc");
準備好了嗎?讓我們開始吧!
事件發射器
Socket.IO API 大量採用 Node.js EventEmitter 類別的靈感。
import { EventEmitter } from "node:events";
const myEmitter = new EventEmitter();
myEmitter.on("foo", () => {
console.log("foo!");
});
myEmitter.emit("foo");
該程式庫提供類似的 API,但介於伺服器和客户端之間
- 伺服器
io.on("connection", (socket) => {
// send a "foo" event to the client
socket.emit("foo");
// receive a "bar" event from the client
socket.on("bar", () => {
// ...
});
});
- 客户端
import { io } from "socket.io-client";
const socket = io();
// receive a "foo" event from the server
socket.on("foo", () => {
// ...
});
// send a "bar" event to the server
socket.emit("bar");
伺服器與用戶端之間的基礎連線(WebSocket 或 HTTP 長輪詢)由程式庫抽象化並管理。
我們來建立一個最小的 EventEmitter
類別
class EventEmitter {
#listeners = new Map();
on(event, listener) {
let listeners = this.#listeners.get(event);
if (!listeners) {
this.#listeners.set(event, listeners = []);
}
listeners.push(listener);
}
emit(event, ...args) {
const listeners = this.#listeners.get(event);
if (listeners) {
for (const listener of listeners) {
listener.apply(null, args);
}
}
}
}
我們的 Socket
類別會延伸這個類別,以便同時公開 on()
和 emit()
方法
class Socket extends EventEmitter {
constructor(uri, opts) {
super();
}
}
在我們的建構函式中,uri
參數可能是
- 由使用者提供
const socket = io("https://example.com");
- 或從
window.location
物件推斷
const socket = io();
我們來建立一個進入點
export function io(uri, opts) {
if (typeof uri !== "string") {
opts = uri;
uri = location.origin;
}
return new Socket(uri, opts);
}
好的,這是一個好的開始!
WebSocket 連線
現在,我們來建立到伺服器的 WebSocket 連線
class Socket extends EventEmitter {
+ #uri;
+ #opts;
+ #ws;
constructor(uri, opts) {
super();
+ this.#uri = uri;
+ this.#opts = Object.assign({
+ path: "/socket.io/"
+ }, opts);
+ this.#open();
}
+ #open() {
+ this.#ws = new WebSocket(this.#createUrl());
+ }
+
+ #createUrl() {
+ const uri = this.#uri.replace(/^http/, "ws");
+ const queryParams = "?EIO=4&transport=websocket";
+ return `${uri}${this.#opts.path}${queryParams}`;
+ }
}
參考:https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
關於 createUrl()
方法的一些說明
- WebSocket URL 以
ws://
或wss://
開頭,因此我們在replace()
呼叫中處理這一點 - Socket.IO URL 始終包含特定的請求路徑,預設為
/socket.io/
- 有兩個強制性的查詢參數
EIO=4
:Engine.IO 協定的版本transport=websocket
:使用的傳輸
因此,最終的 URL 將如下所示:wss://example.com/socket.io/?EIO=4&transport=websocket
Engine.IO 協定
Socket.IO 程式碼庫分為兩個不同的層級
- 低階管道:我們稱之為 Engine.IO,Socket.IO 內部的引擎
- 高階 API:Socket.IO 本身
另請參閱
使用 WebSocket 時,透過網路傳送訊息的格式很簡單:<封包類型><有效負載>
以下是協定第 4 版(因此上面有 EIO=4
)的不同封包類型
名稱 | 表示 | 說明 |
---|---|---|
OPEN | 0 | 在握手期間使用。 |
CLOSE | 1 | 用於表示傳輸可以關閉。 |
PING | 2 | 在心跳機制中使用。 |
PONG | 3 | 在心跳機制中使用。 |
MESSAGE | 4 | 用於將有效負載傳送至另一方。 |
UPGRADE | 5 | 在升級過程中使用(這裡未使用)。 |
NOOP | 6 | 在升級過程中使用(這裡未使用)。 |
範例
4hello
with:
4 => MESSAGE packet type
hello => message payload (UTF-8 encoded)
我們來處理 WebSocket 訊息
+const EIOPacketType = {
+ OPEN: "0",
+ CLOSE: "1",
+ PING: "2",
+ PONG: "3",
+ MESSAGE: "4",
+};
+function noop() {}
class Socket extends EventEmitter {
[...]
#open() {
this.#ws = new WebSocket(this.#createUrl());
+ this.#ws.onmessage = ({ data }) => this.#onMessage(data);
+ this.#ws.onclose = () => this.#onClose("transport close");
}
+ #onMessage(data) {
+ if (typeof data !== "string") {
+ // TODO handle binary payloads
+ return;
+ }
+
+ switch (data[0]) {
+ case EIOPacketType.CLOSE:
+ this.#onClose("transport close");
+ break;
+
+ default:
+ this.#onClose("parse error");
+ break;
+ }
+ }
+
+ #onClose(reason) {
+ if (this.#ws) {
+ this.#ws.onclose = noop;
+ this.#ws.close();
+ }
+ }
+}
心跳
實作了一種心跳機制,以確保伺服器與用戶端之間的連線正常。
伺服器在初始握手期間傳送兩個值:pingInterval
和 pingTimeout
它會每隔 pingInterval
毫秒傳送一個 PING 封包,並期待從用戶端收到一個 PONG 封包。我們來這樣做
class Socket extends EventEmitter {
+ #pingTimeoutTimer;
+ #pingTimeoutDelay;
[...]
#onMessage(data) {
if (typeof data !== "string") {
// TODO handle binary payloads
return;
}
switch (data[0]) {
+ case EIOPacketType.OPEN:
+ this.#onOpen(data);
+ break;
+
case EIOPacketType.CLOSE:
this.#onClose("transport close");
break;
+ case EIOPacketType.PING:
+ this.#resetPingTimeout();
+ this.#send(EIOPacketType.PONG);
+ break;
default:
this.#onClose("parse error");
break;
}
}
+ #onOpen(data) {
+ let handshake;
+ try {
+ handshake = JSON.parse(data.substring(1));
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
+ this.#resetPingTimeout();
+ }
+
+ #resetPingTimeout() {
+ clearTimeout(this.#pingTimeoutTimer);
+ this.#pingTimeoutTimer = setTimeout(() => {
+ this.#onClose("ping timeout");
+ }, this.#pingTimeoutDelay);
+ }
+
+ #send(data) {
+ if (this.#ws.readyState === WebSocket.OPEN) {
+ this.#ws.send(data);
+ }
+ }
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
+ clearTimeout(this.#pingTimeoutTimer);
}
}
重新連線
當我們這樣做時,我們也會處理重新連線。WebSocket 很棒,但它們可能會(而且在實際情況中會)斷線,所以我們必須處理這個問題
class Socket extends EventEmitter {
[...]
constructor(uri, opts) {
super();
this.#uri = uri;
this.#opts = Object.assign(
{
path: "/socket.io/",
+ reconnectionDelay: 2000,
},
opts
);
this.#open();
}
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
clearTimeout(this.#pingTimeoutTimer);
+ setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
}
官方的 Socket.IO 用戶端使用一個隨機的指數延遲,以防止當大量用戶端同時重新連線時出現負載高峰,但我們將在此處保持簡單並使用一個常數值。
好的,讓我們總結一下,我們現在有一個用戶端,它可以
- 開啟一個到伺服器的 WebSocket 連線
- 透過回應 PING 封包來遵守心跳機制
- 在失敗時自動重新連線
這就是 Engine.IO 協定的全部內容!讓我們現在深入探討 Socket.IO 協定。
Socket.IO 協定
Socket.IO 協定建立在先前描述的 Engine.IO 協定之上,這表示每個 Socket.IO 封包在透過網路傳送時,都會加上前綴「4」(Engine.IO MESSAGE 封包類型)。
參考:Socket.IO 協定
在沒有二進位元元素的情況下,格式如下
<packet type>[JSON-stringified payload]
以下是可用封包類型的清單
類型 | ID | 用法 |
---|---|---|
CONNECT | 0 | 在連線到名稱空間期間使用。 |
DISCONNECT | 1 | 在從名稱空間斷線時使用。 |
EVENT | 2 | 用於將資料傳送到另一端。 |
ACK | 3 | 用於確認事件(在此處未使用)。 |
CONNECT_ERROR | 4 | 在連線到名稱空間期間使用(在此處未使用)。 |
BINARY_EVENT | 5 | 用於將二進位元資料傳送到另一端(在此處未使用)。 |
BINARY_ACK | 6 | 用於確認事件(回應包含二進位元資料)(在此處未使用)。 |
範例
2["hello","world"]
with:
2 => EVENT packet type
["hello","world"] => JSON.stringified() payload
連線
用戶端必須在 Socket.IO 會話開始時傳送一個 CONNECT 封包
+const SIOPacketType = {
+ CONNECT: 0,
+ DISCONNECT: 1,
+ EVENT: 2,
+};
class Socket extends EventEmitter {
[...]
#onOpen(data) {
let handshake;
try {
handshake = JSON.parse(data.substring(1));
} catch (e) {
return this.#onClose("parse error");
}
this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
this.#resetPingTimeout();
+ this.#doConnect();
}
+ #doConnect() {
+ this.#sendPacket({ type: SIOPacketType.CONNECT });
+ }
+
+ #sendPacket(packet) {
+ this.#send(EIOPacketType.MESSAGE + encode(packet));
+ }
}
+function encode(packet) {
+ let output = "" + packet.type;
+
+ return output;
+}
如果允許連線,伺服器將會傳送 CONNECT 封包回來
class Socket extends EventEmitter {
+ id;
[...]
#onMessage(data) {
switch (data[0]) {
[...]
+ case EIOPacketType.MESSAGE:
+ let packet;
+ try {
+ packet = decode(data);
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#onPacket(packet);
+ break;
}
}
+ #onPacket(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ this.#onConnect(packet);
+ break;
+ }
+ }
+ #onConnect(packet) {
+ this.id = packet.data.sid;
+
+ super.emit("connect");
+ }
}
+function decode(data) {
+ let i = 1; // skip "4" prefix
+
+ const packet = {
+ type: parseInt(data.charAt(i++), 10),
+ };
+
+ if (!isPacketValid(packet)) {
+ throw new Error("invalid format");
+ }
+
+ return packet;
+}
+
+function isPacketValid(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ return typeof packet.data === "object";
+ default:
+ return false;
+ }
+}
我們使用 super.emit(...)
,這樣我們才能覆寫 emit()
方法,以便稍後傳送事件。
傳送事件
讓我們傳送一些資料到伺服器。我們需要追蹤基礎連線的狀態,並將封包快取,直到連線就緒
class Socket extends EventEmitter {
+ connected = false;
+ #sendBuffer = [];
[...]
+ emit(...args) {
+ const packet = {
+ type: SIOPacketType.EVENT,
+ data: args,
+ };
+
+ if (this.connected) {
+ this.#sendPacket(packet);
+ } else {
+ this.#sendBuffer.push(packet);
+ }
+ }
#onConnect(packet) {
this.id = packet.data.sid;
+ this.connected = true;
+ this.#sendBuffer.forEach((packet) => this.#sendPacket(packet));
+ this.#sendBuffer.slice(0);
super.emit("connect");
}
}
function encode(packet) {
let output = "" + packet.type;
+ if (packet.data) {
+ output += JSON.stringify(packet.data);
+ }
return output;
}
接收事件
相反地,讓我們處理伺服器傳送的 EVENT 封包
class Socket extends EventEmitter {
[...]
#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;
+ case SIOPacketType.EVENT:
+ super.emit.apply(this, packet.data);
+ break;
}
}
}
function decode(data) {
let i = 1; // skip "4" prefix
const packet = {
type: parseInt(data.charAt(i++), 10),
};
+ if (data.charAt(i)) {
+ packet.data = JSON.parse(data.substring(i));
+ }
if (!isPacketValid(packet)) {
throw new Error("invalid format");
}
return packet;
}
function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.EVENT: {
+ const args = packet.data;
+ return (
+ Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
+ );
+ }
default:
return false;
}
}
手動斷開連線
最後,讓我們處理不應嘗試重新連線的少數情況
- 當用戶端呼叫
socket.disconnect()
時 - 當伺服器呼叫
socket.disconnect()
時
class Socket extends EventEmitter {
+ #reconnectTimer;
+ #shouldReconnect = true;
[...]
#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;
+ case SIOPacketType.DISCONNECT:
+ this.#shouldReconnect = false;
+ this.#onClose("io server disconnect");
+ break;
case SIOPacketType.EVENT:
super.emit.apply(this, packet.data);
break;
}
}
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
clearTimeout(this.#pingTimeoutTimer);
+ clearTimeout(this.#reconnectTimer);
+
+ if (this.#shouldReconnect) {
+ this.#reconnectTimer = setTimeout(
+ () => this.#open(),
+ this.#opts.reconnectionDelay
+ );
+ }
- setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
+ disconnect() {
+ this.#shouldReconnect = false;
+ this.#onClose("io client disconnect");
+ }
}
function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.DISCONNECT:
+ return packet.data === undefined;
case SIOPacketType.EVENT: {
const args = packet.data;
return (
Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
);
}
default:
return false;
}
}
結束備註
我們的基本 Socket.IO 用戶端到此結束!讓我們複習一下。
我們已實作下列功能
- 建立 WebSocket 連線
- 管理重新連線
- 傳送事件
- 接收事件
- 手動斷開連線
希望您現在更了解此函式庫在底層是如何運作的。
完整的原始碼可以在 這裡 找到。
感謝您的閱讀!