fix: add a hard sequence in SSE and WS requests (#6569)

* fix: sse sequence in ipc layer

* fix: remove tick rate and flushing

* fix: added sequence logic for websockets

* fix: added sequence logic for websockets per request based

* fix: correct the order for how the messages are added.

`WSMessagesList` already handles a lot of the ordering for us, don't modify the order the messages are added since redirect and connection are internal states, it changes the execution trail

* chore: reduce whitespace diffs

* fix: a possible null case exception

Though we always create an empty data buffer at source so shouldn't happen unless that is modified

* fix: implement sequence logic for WebSocket messages

* fix: remove unused sequenceState property from WsClient

* fix: update message sorting logic to handle missing sequence numbers

* fix: remove unused lodash import

* fix: add clean method to sequencer for better sequence management

* fix: don't show dropdown when streaming

---------

Co-authored-by: Sid <siddharth@usebruno.com>
This commit is contained in:
SHUBH
2026-01-03 11:29:46 +05:30
committed by GitHub
parent c83c055654
commit 30dbe34e2e
5 changed files with 86 additions and 20 deletions

View File

@@ -6,7 +6,6 @@ import CodeEditor from 'components/CodeEditor/index';
import { useTheme } from 'providers/Theme';
import { useState } from 'react';
import { useSelector } from 'react-redux';
import _ from 'lodash';
import { useRef } from 'react';
import { useEffect } from 'react';
@@ -183,12 +182,16 @@ const WSMessagesList = ({ order = -1, messages = [] }) => {
if (!messages.length) {
return <StyledWrapper><div className="empty-state">No messages yet.</div></StyledWrapper>;
}
const ordered = order === -1 ? messages : messages.slice().reverse();
// sort based on order, seq was newly added and might be missing in some cases and when missing,
// the timestamp will be used instead
const ordered = messages.toSorted((x, y) => ((x.seq ?? x.timestamp) - (y.seq ?? y.timestamp)) * order);
return (
<StyledWrapper className="ws-messages-list flex flex-col">
{ordered.map((msg, idx, src) => {
const inFocus = order === -1 ? src.length - 1 === idx : idx === 0;
return <WSMessageItem key={msg.timestamp} inFocus={inFocus} id={idx} message={msg} />;
return <WSMessageItem key={msg.seq ? msg.seq : msg.timestamp} inFocus={inFocus} id={idx} message={msg} />;
})}
</StyledWrapper>
);

View File

@@ -225,7 +225,7 @@ const ResponsePane = ({ item, collection }) => {
onClick={() => setShowScriptErrorCard(true)}
/>
)}
{focusedTab?.responsePaneTab === 'response' && item?.response ? (
{focusedTab?.responsePaneTab === 'response' && item?.response && !(item.response?.stream ?? false) ? (
<>
{/* Result View Tabs (Visualizations + Response Format) */}
<div className="result-view-tabs">

View File

@@ -383,7 +383,7 @@ export const collectionsSlice = createSlice({
}
},
requestCancelled: (state, action) => {
const { itemUid, collectionUid } = action.payload;
const { itemUid, collectionUid, seq, timestamp } = action.payload;
const collection = findCollectionByUid(state.collections, collectionUid);
if (collection) {
@@ -394,7 +394,7 @@ export const collectionsSlice = createSlice({
const startTimestamp = item.requestSent.timestamp;
item.response.duration = startTimestamp ? Date.now() - startTimestamp : item.response.duration;
item.response.data = [{ type: 'info', timestamp: Date.now(), message: 'Connection Closed' }].concat(item.response.data);
item.response.data = [{ type: 'info', timestamp: Date.now(), seq: seq, message: 'Connection Closed' }].concat(item.response.data);
} else {
item.response = null;
item.requestUid = null;
@@ -3118,7 +3118,7 @@ export const collectionsSlice = createSlice({
}
},
streamDataReceived: (state, action) => {
const { itemUid, collectionUid, data } = action.payload;
const { itemUid, collectionUid, seq, timestamp, data } = action.payload;
const collection = findCollectionByUid(state.collections, collectionUid);
if (collection) {
@@ -3127,12 +3127,16 @@ export const collectionsSlice = createSlice({
item.response.data ||= [];
item.response.data = [{
type: 'incoming',
seq,
message: data.data,
messageHexdump: hexdump(data.data),
timestamp: Date.now()
timestamp: timestamp || Date.now()
}].concat(item.response.data);
}
item.response.dataBuffer = Buffer.concat([Buffer.from(item.response.dataBuffer), Buffer.from(data.dataBuffer)]);
if (item.response.dataBuffer && item.response.dataBuffer.length && data.dataBuffer) {
item.response.dataBuffer = Buffer.concat([Buffer.from(item.response.dataBuffer), Buffer.from(data.dataBuffer)]);
}
item.response.size = data.data?.length + (item.response.size || 0);
}
},
@@ -3255,7 +3259,8 @@ export const collectionsSlice = createSlice({
updatedResponse.responses.push({
message: eventData.message,
type: eventData.type,
timestamp: eventData.timestamp
timestamp: eventData.timestamp,
seq: eventData.seq
});
break;
@@ -3271,7 +3276,8 @@ export const collectionsSlice = createSlice({
updatedResponse.responses.push({
message: `Connected to ${eventData.url}`,
type: 'info',
timestamp: eventData.timestamp
timestamp: eventData.timestamp,
seq: eventData.seq
});
break;
@@ -3287,7 +3293,8 @@ export const collectionsSlice = createSlice({
updatedResponse.responses.push({
type: code !== 1000 ? 'info' : 'error',
message: reason.trim().length ? ['Closed:', reason.trim()].join(' ') : 'Closed',
timestamp
timestamp: eventData.timestamp,
seq: eventData.seq
});
break;
@@ -3302,7 +3309,8 @@ export const collectionsSlice = createSlice({
updatedResponse.responses.push({
type: 'error',
message: errorDetails || 'WebSocket error occurred',
timestamp
timestamp: eventData.timestamp,
seq: eventData.seq
});
break;

View File

@@ -1003,6 +1003,7 @@ const registerNetworkIpc = (mainWindow) => {
// handler for sending http request
ipcMain.handle('send-http-request', async (event, item, collection, environment, runtimeVariables) => {
let seq = 0;
const collectionUid = collection.uid;
const envVars = getEnvVars(environment);
const processEnvVars = getProcessEnvVars(collectionUid);
@@ -1012,16 +1013,29 @@ const registerNetworkIpc = (mainWindow) => {
response.stream = { running: response.status >= 200 && response.status < 300 };
stream.on('data', (newData) => {
seq += 1;
const parsed = parseDataFromResponse({ data: newData, headers: {} });
mainWindow.webContents.send('main:http-stream-new-data', { collectionUid, itemUid: item.uid, data: parsed });
mainWindow.webContents.send('main:http-stream-new-data', {
collectionUid,
itemUid: item.uid,
seq,
timestamp: Date.now(),
data: parsed
});
});
stream.on('close', () => {
if (!cancelTokens[response.cancelTokenUid]) {
return;
}
if (!cancelTokens[response.cancelTokenUid]) return;
mainWindow.webContents.send('main:http-stream-end', {
collectionUid,
itemUid: item.uid,
seq: seq + 1,
timestamp: Date.now()
});
mainWindow.webContents.send('main:http-stream-end', { collectionUid, itemUid: item.uid });
deleteCancelToken(response.cancelTokenUid);
});
}

View File

@@ -49,6 +49,36 @@ const normalizeMessageByFormat = (message, format) => {
}
};
const createSequencer = () => {
const seq = {};
const nextSeq = (requestId, collectionId) => {
seq[requestId] ||= {};
seq[requestId][collectionId] ||= 0;
return ++seq[requestId][collectionId];
};
/**
* @param {string} requestId
* @param {string} [collectionId]
*/
const clean = (requestId, collectionId = undefined) => {
if (collectionId) {
delete seq[requestId][collectionId];
}
if (!Object.keys(seq[requestId]).length) {
delete seq[requestId];
}
};
return {
next: nextSeq,
clean
};
};
const seq = createSequencer();
class WsClient {
messageQueues = {};
activeConnections = new Map();
@@ -181,6 +211,7 @@ class WsClient {
message: payload,
messageHexdump: hexdump(payload),
type: 'outgoing',
seq: seq.next(requestId, collectionUid),
timestamp: Date.now()
});
}
@@ -204,6 +235,7 @@ class WsClient {
if (connectionMeta?.connection) {
connectionMeta.connection.close(code, reason);
this.#removeConnection(requestId);
seq.clean(requestId);
}
}
@@ -283,7 +315,8 @@ class WsClient {
this.eventCallback('main:ws:open', requestId, collectionUid, {
timestamp: Date.now(),
url: ws.url
url: ws.url,
seq: seq.next(requestId, collectionUid)
});
});
@@ -294,7 +327,8 @@ class WsClient {
message: `Redirected to ${url}`,
type: 'info',
timestamp: Date.now(),
headers: headers
headers: headers,
seq: seq.next(requestId, collectionUid)
});
});
@@ -302,6 +336,7 @@ class WsClient {
this.eventCallback('main:ws:upgrade', requestId, collectionUid, {
type: 'info',
timestamp: Date.now(),
seq: seq.next(requestId, collectionUid),
headers: { ...response.headers }
});
});
@@ -313,6 +348,7 @@ class WsClient {
message,
messageHexdump: hexdump(Buffer.from(data)),
type: 'incoming',
seq: seq.next(requestId, collectionUid),
timestamp: Date.now()
});
} catch (error) {
@@ -321,6 +357,7 @@ class WsClient {
message: data.toString(),
messageHexdump: hexdump(data),
type: 'incoming',
seq: seq.next(requestId, collectionUid),
timestamp: Date.now()
});
}
@@ -330,14 +367,17 @@ class WsClient {
this.eventCallback('main:ws:close', requestId, collectionUid, {
code,
reason: Buffer.from(reason).toString(),
seq: seq.next(requestId, collectionUid),
timestamp: Date.now()
});
seq.clean(requestId, collectionUid);
this.#removeConnection(requestId);
});
ws.on('error', (error) => {
this.eventCallback('main:ws:error', requestId, collectionUid, {
error: error.message,
seq: seq.next(requestId, collectionUid),
timestamp: Date.now()
});
});
@@ -356,6 +396,7 @@ class WsClient {
this.eventCallback('main:ws:connections-changed', {
type: 'added',
requestId,
seq: seq.next(requestId, collectionUid),
activeConnectionIds: this.getActiveConnectionIds()
});
}