From daa1ab60cb09d4266cc820dc8f7e62ee4f179f55 Mon Sep 17 00:00:00 2001 From: Siddharth Gelera Date: Wed, 17 Sep 2025 17:48:38 +0530 Subject: [PATCH] fix: avoid duplicate messages avoid message getting queued and sent twice --- packages/bruno-requests/src/ws/ws-client.js | 51 +++++++++++---------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/packages/bruno-requests/src/ws/ws-client.js b/packages/bruno-requests/src/ws/ws-client.js index c645332f6..37b9f2a79 100644 --- a/packages/bruno-requests/src/ws/ws-client.js +++ b/packages/bruno-requests/src/ws/ws-client.js @@ -94,7 +94,7 @@ class WsClient { }); // Set up event handlers - this.#setupWsEventHandlers(wsConnection, requestId, collectionUid, {keepAlive,keepAliveInterval}); + this.#setupWsEventHandlers(wsConnection, requestId, collectionUid, { keepAlive, keepAliveInterval }); // Store the connection this.#addConnection(requestId, wsConnection); @@ -115,17 +115,16 @@ class WsClient { queueMessage(requestId, collectionUid, message) { const connection = this.activeConnections.get(requestId); - if (connection && connection.readyState === WebSocket.OPEN) { - this.#flushQueue(requestId, collectionUid); - this.sendMessage(requestId, collectionUid, message); - return; - } - this.messageQueue.push({ requestId, collectionUid, payload: message }); + + if (connection && connection.readyState === WebSocket.OPEN) { + this.#flushQueue(requestId, collectionUid); + return; + } } #flushQueue(requestId, collectionUid) { @@ -191,7 +190,6 @@ class WsClient { */ close(requestId, code = 1000, reason = 'Client initiated close') { const connection = this.activeConnections.get(requestId); - if (connection) { connection.close(code, reason); this.#removeConnection(requestId); @@ -244,7 +242,7 @@ class WsClient { * @param {WebSocket} ws - The WebSocket instance * @param {string} requestId - The request ID * @param {string} collectionUid - The collection UID - * @param {object} options + * @param {object} options * @param {boolean} options.keepAlive - keep the connection alive * @param {number} options.keepAliveInterval - What the interval for keeping interval * @private @@ -253,12 +251,12 @@ class WsClient { ws.on('open', () => { this.#flushQueue(requestId, collectionUid); - if(options.keepAlive){ + if (options.keepAlive) { const handle = setInterval(() => { - console.log("pinging to keep alive") - ws.isAlive = false; - ws.ping(); - }, options.keepAliveInterval); + console.log('pinging to keep alive'); + ws.isAlive = false; + ws.ping(); + }, options.keepAliveInterval); this.connectionKeepAlive.set(requestId, handle); } @@ -270,23 +268,23 @@ class WsClient { }); ws.on('redirect', (url, req) => { - const headerNames = req.getHeaderNames() - const headers = Object.fromEntries(headerNames.map(d=> [d,req.getHeader(d)])) + const headerNames = req.getHeaderNames(); + const headers = Object.fromEntries(headerNames.map((d) => [d, req.getHeader(d)])); this.eventCallback('ws:redirect', requestId, collectionUid, { - message:`Redirected to ${url}`, - type: 'info', - timestamp: Date.now(), - headers: headers + message: `Redirected to ${url}`, + type: 'info', + timestamp: Date.now(), + headers: headers }); - }) + }); ws.on('upgrade', (response) => { this.eventCallback('ws:upgrade', requestId, collectionUid, { - type: 'info', - timestamp: Date.now(), - headers: {...response.headers} + type: 'info', + timestamp: Date.now(), + headers: { ...response.headers } }); - }) + }); ws.on('message', (data) => { try { @@ -350,9 +348,12 @@ class WsClient { clearInterval(this.connectionKeepAlive.get(requestId)); this.connectionKeepAlive.delete(requestId); } + if (this.activeConnections.has(requestId)) { this.activeConnections.delete(requestId); + this.messageQueue = this.messageQueue.filter((d) => d.requestId != requestId); + // Emit an event with all active connection IDs this.eventCallback('ws:connections-changed', { type: 'removed',