fix: avoid duplicate messages

avoid message getting queued and sent twice
This commit is contained in:
Siddharth Gelera
2025-09-17 17:48:38 +05:30
parent f2cdf8d595
commit daa1ab60cb

View File

@@ -94,7 +94,7 @@ class WsClient {
});
// Set up event handlers
this.#setupWsEventHandlers(wsConnection, requestId, collectionUid, {keepAlive,keepAliveInterval});
this.#setupWsEventHandlers(wsConnection, requestId, collectionUid, { keepAlive, keepAliveInterval });
// Store the connection
this.#addConnection(requestId, wsConnection);
@@ -115,17 +115,16 @@ class WsClient {
queueMessage(requestId, collectionUid, message) {
const connection = this.activeConnections.get(requestId);
if (connection && connection.readyState === WebSocket.OPEN) {
this.#flushQueue(requestId, collectionUid);
this.sendMessage(requestId, collectionUid, message);
return;
}
this.messageQueue.push({
requestId,
collectionUid,
payload: message
});
if (connection && connection.readyState === WebSocket.OPEN) {
this.#flushQueue(requestId, collectionUid);
return;
}
}
#flushQueue(requestId, collectionUid) {
@@ -191,7 +190,6 @@ class WsClient {
*/
close(requestId, code = 1000, reason = 'Client initiated close') {
const connection = this.activeConnections.get(requestId);
if (connection) {
connection.close(code, reason);
this.#removeConnection(requestId);
@@ -244,7 +242,7 @@ class WsClient {
* @param {WebSocket} ws - The WebSocket instance
* @param {string} requestId - The request ID
* @param {string} collectionUid - The collection UID
* @param {object} options
* @param {object} options
* @param {boolean} options.keepAlive - keep the connection alive
* @param {number} options.keepAliveInterval - What the interval for keeping interval
* @private
@@ -253,12 +251,12 @@ class WsClient {
ws.on('open', () => {
this.#flushQueue(requestId, collectionUid);
if(options.keepAlive){
if (options.keepAlive) {
const handle = setInterval(() => {
console.log("pinging to keep alive")
ws.isAlive = false;
ws.ping();
}, options.keepAliveInterval);
console.log('pinging to keep alive');
ws.isAlive = false;
ws.ping();
}, options.keepAliveInterval);
this.connectionKeepAlive.set(requestId, handle);
}
@@ -270,23 +268,23 @@ class WsClient {
});
ws.on('redirect', (url, req) => {
const headerNames = req.getHeaderNames()
const headers = Object.fromEntries(headerNames.map(d=> [d,req.getHeader(d)]))
const headerNames = req.getHeaderNames();
const headers = Object.fromEntries(headerNames.map((d) => [d, req.getHeader(d)]));
this.eventCallback('ws:redirect', requestId, collectionUid, {
message:`Redirected to ${url}`,
type: 'info',
timestamp: Date.now(),
headers: headers
message: `Redirected to ${url}`,
type: 'info',
timestamp: Date.now(),
headers: headers
});
})
});
ws.on('upgrade', (response) => {
this.eventCallback('ws:upgrade', requestId, collectionUid, {
type: 'info',
timestamp: Date.now(),
headers: {...response.headers}
type: 'info',
timestamp: Date.now(),
headers: { ...response.headers }
});
})
});
ws.on('message', (data) => {
try {
@@ -350,9 +348,12 @@ class WsClient {
clearInterval(this.connectionKeepAlive.get(requestId));
this.connectionKeepAlive.delete(requestId);
}
if (this.activeConnections.has(requestId)) {
this.activeConnections.delete(requestId);
this.messageQueue = this.messageQueue.filter((d) => d.requestId != requestId);
// Emit an event with all active connection IDs
this.eventCallback('ws:connections-changed', {
type: 'removed',