fix: message queue

This commit is contained in:
Siddharth Gelera
2025-09-16 11:24:05 +05:30
parent 1bf47039d1
commit 8435239099
3 changed files with 77 additions and 23 deletions

View File

@@ -270,6 +270,21 @@ export const startWsConnection = async (item, collection, environment, runtimeVa
});
};
/**
* Sends a message to an existing WebSocket connection
* @param {string} requestId - The request ID to send a message to
* @param {string} collectionUid - The collection ID the message is for
* @param {*} message - The message
* @returns {Promise<Object>} - The result of the send operation
*/
export const queueWsMessage = async (item, collectionUid, message) => {
return new Promise((resolve, reject) => {
const { ipcRenderer } = window;
ipcRenderer.invoke('ws:queue-message', item.uid, collectionUid, message).then(resolve).catch(reject);
});
};
/**
* Sends a message to an existing WebSocket connection
* @param {string} requestId - The request ID to send a message to

View File

@@ -30,7 +30,7 @@ const prepareWsRequest = async (item, collection, environment, runtimeVariables,
const envVars = getEnvVars(environment);
const processEnvVars = getProcessEnvVars(collection.uid);
const headers = {}
const headers = {};
each(get(request, 'headers', []), (h) => {
if (h.enabled) {
@@ -199,8 +199,15 @@ const registerWsEventHandlers = (window) => {
timestamp: Date.now()
};
const hasMessages = preparedRequest.body.ws.some((msg) => msg.content.length);
if (hasMessages) {
preparedRequest.body.ws.forEach((message) => {
wsClient.queueMessage(preparedRequest.uid, collection.uid, message.content);
});
}
// Start WebSocket connection
const connectionInstance = await wsClient.startConnection({
await wsClient.startConnection({
request: preparedRequest,
collection,
options: {
@@ -210,16 +217,6 @@ const registerWsEventHandlers = (window) => {
}
});
// If the body already has messages then send it after connection
connectionInstance.on('open', async () => {
const hasMessages = preparedRequest.body.ws.some((msg) => msg.content.length);
if (hasMessages) {
preparedRequest.body.ws.forEach((message) => {
wsClient.sendMessage(preparedRequest.uid, collection.uid, message.content);
});
}
});
sendEvent('ws:request', preparedRequest.uid, collection.uid, requestSent);
// Send OAuth credentials update if available
@@ -258,6 +255,16 @@ const registerWsEventHandlers = (window) => {
}
});
ipcMain.handle('ws:queue-message', (event, requestId, collectionUid, message) => {
try {
wsClient.queueMessage(requestId, collectionUid, message);
return { success: true };
} catch (error) {
console.error('Error queuing WebSocket message:', error);
return { success: false, error: error.message };
}
});
// Send a message to an existing WebSocket connection
ipcMain.handle('ws:send-message', (event, requestId, collectionUid, message) => {
try {

View File

@@ -61,9 +61,11 @@ const getParsedWsUrlObject = (url) => {
};
class WsClient {
messageQueue = [];
activeConnections = new Map();
connectionKeepAlive = new Map();
constructor(eventCallback) {
this.activeConnections = new Map();
this.connectionKeepAlive = new Map();
this.eventCallback = eventCallback;
}
@@ -104,14 +106,19 @@ class WsClient {
timestamp: Date.now()
});
if (keepAlive) {
const handle = setInterval(() => {
wsConnection.isAlive = false;
wsConnection.ping();
}, keepAliveInterval);
this.connectionKeepAlive.set(requestId, handle);
}
return wsConnection
wsConnection.addEventListener('open', () => {
this.#flushQueue(requestId, collectionUid);
if (keepAlive) {
const handle = setInterval(() => {
wsConnection.isAlive = false;
wsConnection.ping();
}, keepAliveInterval);
this.connectionKeepAlive.set(requestId, handle);
}
});
return wsConnection;
} catch (error) {
console.error('Error creating WebSocket connection:', error);
this.eventCallback('ws:error', requestId, collectionUid, {
@@ -121,6 +128,31 @@ class WsClient {
}
}
queueMessage(requestId, collectionId, message) {
const connection = this.activeConnections.get(requestId);
if (connection && connection.readyState === WebSocket.OPEN) {
this.#flushQueue(requestId, collectionId);
this.sendMessage(requestId, collectionId, message);
return;
}
this.messageQueue.push({
requestId,
collectionId,
payload: message
});
}
#flushQueue(requestId, collectionId) {
const connection = this.activeConnections.get(requestId);
for (const message of this.messageQueue) {
if (message.requestId !== requestId) continue;
if (message.collectionId !== collectionId) continue;
this.sendMessage(requestId, collectionId, message.payload);
}
}
/**
* Send a message to an active WebSocket connection
* @param {string} requestId - The request ID of the active connection
@@ -296,7 +328,7 @@ class WsClient {
#removeConnection(requestId) {
if (this.connectionKeepAlive.has(requestId)) {
clearInterval(this.connectionKeepAlive.get(requestId));
this.connectionKeepAlive.delete(requestId)
this.connectionKeepAlive.delete(requestId);
}
if (this.activeConnections.has(requestId)) {
this.activeConnections.delete(requestId);