mirror of
https://github.com/usebruno/bruno.git
synced 2026-06-11 09:51:30 +00:00
fix: improve websocket message handling and serialization
- Added normalization for message format to prevent double encoding. - Updated queueMessage and sendMessage methods to handle message format. - Refactored code for better readability and maintainability. fix: enhance message normalization in WebSocket client
This commit is contained in:
@@ -27,7 +27,9 @@ const { setAuthHeaders } = require('./prepare-request');
|
||||
const prepareWsRequest = async (item, collection, environment, runtimeVariables, certsAndProxyConfig = {}) => {
|
||||
const request = item.draft ? item.draft.request : item.request;
|
||||
const collectionRoot = collection?.draft?.root ? get(collection, 'draft.root', {}) : get(collection, 'root', {});
|
||||
const brunoConfig = collection.draft?.brunoConfig ? get(collection, 'draft.brunoConfig', {}) : get(collection, 'brunoConfig', {});
|
||||
const brunoConfig = collection.draft?.brunoConfig
|
||||
? get(collection, 'draft.brunoConfig', {})
|
||||
: get(collection, 'brunoConfig', {});
|
||||
const rawHeaders = cloneDeep(request.headers ?? []);
|
||||
const headers = {};
|
||||
|
||||
@@ -39,7 +41,9 @@ const prepareWsRequest = async (item, collection, environment, runtimeVariables,
|
||||
mergeVars(collection, request, requestTreePath);
|
||||
mergeAuth(collection, request, requestTreePath);
|
||||
request.globalEnvironmentVariables = collection?.globalEnvironmentVariables;
|
||||
request.oauth2CredentialVariables = getFormattedCollectionOauth2Credentials({ oauth2Credentials: collection?.oauth2Credentials });
|
||||
request.oauth2CredentialVariables = getFormattedCollectionOauth2Credentials({
|
||||
oauth2Credentials: collection?.oauth2Credentials
|
||||
});
|
||||
}
|
||||
|
||||
each(get(collectionRoot, 'request.headers', []), (h) => {
|
||||
@@ -54,9 +58,12 @@ const prepareWsRequest = async (item, collection, environment, runtimeVariables,
|
||||
}
|
||||
});
|
||||
|
||||
const socketProtocols = rawHeaders.filter((header) => {
|
||||
return header.name && header.name.toLowerCase() === 'sec-websocket-protocol' && header.enabled;
|
||||
}).map((d) => d.value.trim()).join(',');
|
||||
const socketProtocols = rawHeaders
|
||||
.filter((header) => {
|
||||
return header.name && header.name.toLowerCase() === 'sec-websocket-protocol' && header.enabled;
|
||||
})
|
||||
.map((d) => d.value.trim())
|
||||
.join(',');
|
||||
|
||||
if (socketProtocols.length > 0) {
|
||||
headers['Sec-WebSocket-Protocol'] = socketProtocols;
|
||||
@@ -118,7 +125,7 @@ const prepareWsRequest = async (item, collection, environment, runtimeVariables,
|
||||
const url = new URL(request.url);
|
||||
url?.searchParams?.set(tokenQueryKey, credentials?.access_token);
|
||||
request.url = url?.toString();
|
||||
} catch (error) { }
|
||||
} catch (error) {}
|
||||
}
|
||||
break;
|
||||
case 'client_credentials':
|
||||
@@ -148,7 +155,7 @@ const prepareWsRequest = async (item, collection, environment, runtimeVariables,
|
||||
const url = new URL(request.url);
|
||||
url?.searchParams?.set(tokenQueryKey, credentials?.access_token);
|
||||
request.url = url?.toString();
|
||||
} catch (error) { }
|
||||
} catch (error) {}
|
||||
}
|
||||
break;
|
||||
case 'password':
|
||||
@@ -178,7 +185,7 @@ const prepareWsRequest = async (item, collection, environment, runtimeVariables,
|
||||
const url = new URL(request.url);
|
||||
url?.searchParams?.set(tokenQueryKey, credentials?.access_token);
|
||||
request.url = url?.toString();
|
||||
} catch (error) { }
|
||||
} catch (error) {}
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -207,7 +214,8 @@ const registerWsEventHandlers = (window) => {
|
||||
wsClient = new WsClient(sendEvent);
|
||||
|
||||
// Start a new WebSocket connection
|
||||
ipcMain.handle('renderer:ws:start-connection',
|
||||
ipcMain.handle(
|
||||
'renderer:ws:start-connection',
|
||||
async (event, { request, collection, environment, runtimeVariables, settings, options = {} }) => {
|
||||
try {
|
||||
const requestCopy = cloneDeep(request);
|
||||
@@ -290,7 +298,8 @@ const registerWsEventHandlers = (window) => {
|
||||
sendEvent('main:ws:error', request.uid, collection.uid, { error: error.message });
|
||||
return { success: false, error: error.message };
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Get all active connection IDs
|
||||
ipcMain.handle('renderer:ws:get-active-connections', (event) => {
|
||||
@@ -303,7 +312,8 @@ const registerWsEventHandlers = (window) => {
|
||||
}
|
||||
});
|
||||
|
||||
ipcMain.handle('renderer:ws:queue-message',
|
||||
ipcMain.handle(
|
||||
'renderer:ws:queue-message',
|
||||
async (event, { item, collection, environment, runtimeVariables, messageContent }) => {
|
||||
try {
|
||||
const itemCopy = cloneDeep(item);
|
||||
@@ -318,7 +328,8 @@ const registerWsEventHandlers = (window) => {
|
||||
|
||||
if (messageIndex >= 0 && preparedRequest.body?.ws?.[messageIndex]) {
|
||||
// Queue the interpolated version of the specific message
|
||||
wsClient.queueMessage(preparedRequest.uid, collection.uid, preparedRequest.body.ws[messageIndex].content);
|
||||
const message = preparedRequest.body.ws[messageIndex];
|
||||
wsClient.queueMessage(preparedRequest.uid, collection.uid, message.content, message.type);
|
||||
} else {
|
||||
// Message not found in request body, queue as-is (shouldn't happen in normal flow)
|
||||
wsClient.queueMessage(preparedRequest.uid, collection.uid, messageContent);
|
||||
@@ -329,7 +340,7 @@ const registerWsEventHandlers = (window) => {
|
||||
preparedRequest.body.ws
|
||||
.filter((message) => message && message.content)
|
||||
.forEach((message) => {
|
||||
wsClient.queueMessage(preparedRequest.uid, collection.uid, message.content);
|
||||
wsClient.queueMessage(preparedRequest.uid, collection.uid, message.content, message.type);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -339,7 +350,8 @@ const registerWsEventHandlers = (window) => {
|
||||
console.error('Error queuing WebSocket message:', error);
|
||||
return { success: false, error: error.message };
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Send a message to an existing WebSocket connection
|
||||
ipcMain.handle('renderer:ws:send-message', (event, requestId, collectionUid, message) => {
|
||||
|
||||
@@ -22,6 +22,33 @@ const safeParseJSON = (jsonString, context = 'JSON string') => {
|
||||
}
|
||||
};
|
||||
|
||||
const normalizeMessageByFormat = (message, format) => {
|
||||
if (!message) {
|
||||
return '';
|
||||
}
|
||||
switch (format) {
|
||||
case 'json':
|
||||
// If it was already stringified, do not double encode
|
||||
if (typeof message === 'string') {
|
||||
return message;
|
||||
}
|
||||
return JSON.stringify(message);
|
||||
case 'raw':
|
||||
case 'xml':
|
||||
return message;
|
||||
default: {
|
||||
if (typeof message === 'string') {
|
||||
return message;
|
||||
}
|
||||
if (typeof message === 'object') {
|
||||
return JSON.stringify(message);
|
||||
}
|
||||
console.warn('Received message of unhandled type.', { type: typeof message });
|
||||
return '';
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class WsClient {
|
||||
messageQueues = {};
|
||||
activeConnections = new Map();
|
||||
@@ -53,10 +80,12 @@ class WsClient {
|
||||
// Create WebSocket connection
|
||||
// Note: unlike the standard Websocket constructor the `ws` library doesn't support adding Protocols as a single string
|
||||
// and instead needs it broken down manually, make sure this tested with multiple protocols again.
|
||||
const protocols = [].concat([headers['Sec-WebSocket-Protocol'], headers['sec-websocket-protocol']])
|
||||
const protocols = []
|
||||
.concat([headers['Sec-WebSocket-Protocol'], headers['sec-websocket-protocol']])
|
||||
.filter(Boolean)
|
||||
.map((d) => d.split(','))
|
||||
.flat().map((d) => d.trim());
|
||||
.flat()
|
||||
.map((d) => d.trim());
|
||||
|
||||
const protocolVersion = headers['Sec-WebSocket-Version'] || headers['sec-websocket-version'];
|
||||
|
||||
@@ -105,12 +134,15 @@ class WsClient {
|
||||
return `${requestId}`;
|
||||
}
|
||||
|
||||
queueMessage(requestId, collectionUid, message) {
|
||||
queueMessage(requestId, collectionUid, message, format = 'raw') {
|
||||
const connectionMeta = this.activeConnections.get(requestId);
|
||||
|
||||
const mqKey = this.#getMessageQueueId(requestId);
|
||||
this.messageQueues[mqKey] ||= [];
|
||||
this.messageQueues[mqKey].push(message);
|
||||
this.messageQueues[mqKey].push({
|
||||
message,
|
||||
format
|
||||
});
|
||||
|
||||
if (connectionMeta && connectionMeta.connection && connectionMeta.connection.readyState === WebSocket.OPEN) {
|
||||
this.#flushQueue(requestId, collectionUid);
|
||||
@@ -122,7 +154,8 @@ class WsClient {
|
||||
const mqKey = this.#getMessageQueueId(requestId);
|
||||
if (!(mqKey in this.messageQueues)) return;
|
||||
while (this.messageQueues[mqKey].length > 0) {
|
||||
this.sendMessage(requestId, collectionUid, this.messageQueues[mqKey].shift());
|
||||
const { message, format } = this.messageQueues[mqKey].shift();
|
||||
this.sendMessage(requestId, collectionUid, message, format);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,26 +165,11 @@ class WsClient {
|
||||
* @param {string} collectionUid - The collection UID for the request
|
||||
* @param {Object|string} message - The message to send
|
||||
*/
|
||||
sendMessage(requestId, collectionUid, message) {
|
||||
sendMessage(requestId, collectionUid, message, format = 'raw') {
|
||||
const connectionMeta = this.activeConnections.get(requestId);
|
||||
|
||||
if (connectionMeta.connection && connectionMeta.connection.readyState === WebSocket.OPEN) {
|
||||
let messageToSend;
|
||||
|
||||
// Parse the message if it's a string
|
||||
if (typeof message === 'string') {
|
||||
try {
|
||||
messageToSend = safeParseJSON(message, 'message content');
|
||||
} catch (parseError) {
|
||||
// If parsing fails, send as string
|
||||
messageToSend = message;
|
||||
}
|
||||
} else {
|
||||
messageToSend = message;
|
||||
}
|
||||
|
||||
// If messageToSend is a string, send it raw. If it is an object, stringify it.
|
||||
const payload = typeof messageToSend === 'string' ? messageToSend : JSON.stringify(messageToSend);
|
||||
const payload = normalizeMessageByFormat(message, format);
|
||||
|
||||
// Send the message
|
||||
connectionMeta.connection.send(payload, (error) => {
|
||||
@@ -160,8 +178,8 @@ class WsClient {
|
||||
} else {
|
||||
// Emit message sent event
|
||||
this.eventCallback('main:ws:message', requestId, collectionUid, {
|
||||
message: messageToSend,
|
||||
messageHexdump: hexdump(JSON.stringify(messageToSend)),
|
||||
message: payload,
|
||||
messageHexdump: hexdump(payload),
|
||||
type: 'outgoing',
|
||||
timestamp: Date.now()
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user