refactor: faster flushing

This commit is contained in:
Siddharth Gelera
2025-09-16 11:52:49 +05:30
parent 8435239099
commit 2d7bd54e24
4 changed files with 75 additions and 70 deletions

View File

@@ -72,7 +72,7 @@ const WsQueryUrl = ({ item, collection, handleRun }) => {
};
const handleConnect = (e) => {
connectWS(item, collection);
connectWS(item, collection, undefined, undefined, {connectOnly:true});
};
const onSave = (finalValue) => {

View File

@@ -209,9 +209,9 @@ export const generateGrpcSampleMessage = async (methodPath, existingMessage = nu
});
};
export const connectWS = async (item, collection, environment, runtimeVariables) => {
export const connectWS = async (item, collection, environment, runtimeVariables, options) => {
return new Promise((resolve, reject) => {
startWsConnection(item, collection, environment, runtimeVariables)
startWsConnection(item, collection, environment, runtimeVariables, options)
.then((initialState) => {
// Return an initial state object to update the UI
// The real response data will be handled by event listeners
@@ -232,9 +232,8 @@ export const sendWsRequest = (item, collection, environment, runtimeVariables) =
await connectWS(item, collection, environment, runtimeVariables);
}
};
await ensureConnection()
const { request } = item.draft ? item.draft : item;
sendWsMessage(item, collection.uid, request.body.ws[0].content)
queueWsMessage(item, collection.uid, request.body.ws[0].content)
.then((initialState) => {
// Return an initial state object to update the UI
// The real response data will be handled by event listeners
@@ -243,15 +242,15 @@ export const sendWsRequest = (item, collection, environment, runtimeVariables) =
});
})
.catch((err) => reject(err));
await ensureConnection();
});
};
export const startWsConnection = async (item, collection, environment, runtimeVariables) => {
export const startWsConnection = async (item, collection, environment, runtimeVariables, options) => {
return new Promise((resolve, reject) => {
const { ipcRenderer } = window;
const request = item.draft ? item.draft : item;
const settings = item.draft ? item.draft.settings : item.settings
const settings = item.draft ? item.draft.settings : item.settings;
ipcRenderer
.invoke('ws:start-connection', {
@@ -259,7 +258,8 @@ export const startWsConnection = async (item, collection, environment, runtimeVa
collection,
environment,
runtimeVariables,
settings
settings,
options
})
.then(() => {
resolve();
@@ -270,7 +270,6 @@ export const startWsConnection = async (item, collection, environment, runtimeVa
});
};
/**
* Sends a message to an existing WebSocket connection
* @param {string} requestId - The request ID to send a message to

View File

@@ -186,63 +186,68 @@ const registerWsEventHandlers = (window) => {
});
// Start a new WebSocket connection
ipcMain.handle('ws:start-connection', async (event, { request, collection, environment, runtimeVariables, settings }) => {
try {
const requestCopy = cloneDeep(request);
const preparedRequest = await prepareWsRequest(requestCopy, collection, environment, runtimeVariables, {});
ipcMain.handle(
'ws:start-connection',
async (event, { request, collection, environment, runtimeVariables, settings, options = {} }) => {
try {
const requestCopy = cloneDeep(request);
const preparedRequest = await prepareWsRequest(requestCopy, collection, environment, runtimeVariables, {});
const connectOnly = options?.connectOnly ?? false;
const requestSent = {
type: 'request',
url: preparedRequest.url,
headers: preparedRequest.headers,
body: preparedRequest.body,
timestamp: Date.now()
};
const requestSent = {
type: 'request',
url: preparedRequest.url,
headers: preparedRequest.headers,
body: preparedRequest.body,
timestamp: Date.now()
};
const hasMessages = preparedRequest.body.ws.some((msg) => msg.content.length);
if (hasMessages) {
preparedRequest.body.ws.forEach((message) => {
wsClient.queueMessage(preparedRequest.uid, collection.uid, message.content);
});
}
// Start WebSocket connection
await wsClient.startConnection({
request: preparedRequest,
collection,
options: {
timeout: settings.connectionTimeout,
keepAlive: settings.keepAliveInterval > 0 ? true : false,
keepAliveInterval: settings.keepAliveInterval
if (!connectOnly) {
const hasMessages = preparedRequest.body.ws.some((msg) => msg.content.length);
if (hasMessages) {
preparedRequest.body.ws.forEach((message) => {
wsClient.queueMessage(preparedRequest.uid, collection.uid, message.content);
});
}
}
});
sendEvent('ws:request', preparedRequest.uid, collection.uid, requestSent);
// Send OAuth credentials update if available
if (preparedRequest?.oauth2Credentials) {
window.webContents.send('main:credentials-update', {
credentials: preparedRequest.oauth2Credentials?.credentials,
url: preparedRequest.oauth2Credentials?.url,
collectionUid: collection.uid,
credentialsId: preparedRequest.oauth2Credentials?.credentialsId,
...(preparedRequest.oauth2Credentials?.folderUid
? { folderUid: preparedRequest.oauth2Credentials.folderUid }
: { itemUid: preparedRequest.uid }),
debugInfo: preparedRequest.oauth2Credentials.debugInfo
// Start WebSocket connection
await wsClient.startConnection({
request: preparedRequest,
collection,
options: {
timeout: settings.connectionTimeout,
keepAlive: settings.keepAliveInterval > 0 ? true : false,
keepAliveInterval: settings.keepAliveInterval
}
});
}
return { success: true };
} catch (error) {
console.error('Error starting WebSocket connection:', error);
if (error instanceof Error) {
throw error;
sendEvent('ws:request', preparedRequest.uid, collection.uid, requestSent);
// Send OAuth credentials update if available
if (preparedRequest?.oauth2Credentials) {
window.webContents.send('main:credentials-update', {
credentials: preparedRequest.oauth2Credentials?.credentials,
url: preparedRequest.oauth2Credentials?.url,
collectionUid: collection.uid,
credentialsId: preparedRequest.oauth2Credentials?.credentialsId,
...(preparedRequest.oauth2Credentials?.folderUid
? { folderUid: preparedRequest.oauth2Credentials.folderUid }
: { itemUid: preparedRequest.uid }),
debugInfo: preparedRequest.oauth2Credentials.debugInfo
});
}
return { success: true };
} catch (error) {
console.error('Error starting WebSocket connection:', error);
if (error instanceof Error) {
throw error;
}
sendEvent('ws:error', request.uid, collection.uid, { error: error.message });
return { success: false, error: error.message };
}
sendEvent('ws:error', request.uid, collection.uid, { error: error.message });
return { success: false, error: error.message };
}
});
);
// Get all active connection IDs
ipcMain.handle('ws:get-active-connections', (event) => {

View File

@@ -128,28 +128,29 @@ class WsClient {
}
}
queueMessage(requestId, collectionId, message) {
queueMessage(requestId, collectionUid, message) {
const connection = this.activeConnections.get(requestId);
if (connection && connection.readyState === WebSocket.OPEN) {
this.#flushQueue(requestId, collectionId);
this.sendMessage(requestId, collectionId, message);
this.#flushQueue(requestId, collectionUid);
this.sendMessage(requestId, collectionUid, message);
return;
}
this.messageQueue.push({
requestId,
collectionId,
collectionUid,
payload: message
});
}
#flushQueue(requestId, collectionId) {
const connection = this.activeConnections.get(requestId);
for (const message of this.messageQueue) {
if (message.requestId !== requestId) continue;
if (message.collectionId !== collectionId) continue;
this.sendMessage(requestId, collectionId, message.payload);
#flushQueue(requestId, collectionUid) {
for (const ind in this.messageQueue) {
const message = this.messageQueue[ind];
if (message.requestId != requestId) continue;
if (message.collectionUid != collectionUid) continue;
this.sendMessage(requestId, collectionUid, message.payload);
this.messageQueue.splice(ind, 1);
}
}