mirror of
https://github.com/usebruno/bruno.git
synced 2026-06-11 09:51:30 +00:00
fix: close previous SSE connection before sending new request (#7474)
* fix: update system proxy fetching to use finally (#7652) * fix: update system proxy fetching to use finally for improved reliability * Update packages/bruno-electron/src/index.js Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: allow file selection in multipart form without entering a key first (#7640) * fix: close previous SSE connection before sending new request When resending an SSE (Server-Sent Events) request using Cmd+Enter, the previous connection was not being closed, causing connection leaks. Changes: - Add SSE cancellation logic to sendRequest action - checks for running stream and cancels it before sending new request - Add return to cancelRequest action to make it properly chainable - Simplify RequestTabPanel by removing duplicate cancel logic (now handled centrally in sendRequest) - Add SSE endpoints to test server for e2e testing - Add Playwright e2e test to verify SSE connection cancellation * fix: address PR review feedback for SSE connection cancellation - Use platform-aware modifier (Meta on macOS, Control on Linux/Windows) instead of hardcoded Meta+Enter for cross-platform CI compatibility - Replace waitForTimeout with expect.poll for deterministic assertions - Remove dead try/catch around cancelRequest (errors already swallowed by cancelRequest's internal .catch) * fix: updated the test to check of connectionIds --------- Co-authored-by: Sid <siddharth@usebruno.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pooja <pooja@usebruno.com> Co-authored-by: Chirag Chandrashekhar <cchirag85@gmail.com>
This commit is contained in:
committed by
GitHub
parent
9465de02ee
commit
5e1a36f8c8
@@ -8,7 +8,7 @@ import GrpcRequestPane from 'components/RequestPane/GrpcRequestPane/index';
|
||||
import ResponsePane from 'components/ResponsePane';
|
||||
import GrpcResponsePane from 'components/ResponsePane/GrpcResponsePane';
|
||||
import { findItemInCollection } from 'utils/collections';
|
||||
import { cancelRequest, sendRequest } from 'providers/ReduxStore/slices/collections/actions';
|
||||
import { sendRequest } from 'providers/ReduxStore/slices/collections/actions';
|
||||
import { updateGqlDocsOpen } from 'providers/ReduxStore/slices/tabs';
|
||||
import RequestNotFound from './RequestNotFound';
|
||||
import QueryUrl from 'components/RequestPane/QueryUrl/index';
|
||||
@@ -298,13 +298,7 @@ const RequestTabPanel = () => {
|
||||
toast.error('Please enter a valid WebSocket URL');
|
||||
return;
|
||||
}
|
||||
|
||||
if (item.response?.stream?.running) {
|
||||
dispatch(cancelRequest(item.cancelTokenUid, item, collection)).catch((err) =>
|
||||
toast.custom((t) => <NetworkError onClose={() => toast.dismiss(t.id)} />, {
|
||||
duration: 5000
|
||||
}));
|
||||
} else if (item.requestState !== 'sending' && item.requestState !== 'queued') {
|
||||
if (item.requestState !== 'sending' && item.requestState !== 'queued') {
|
||||
dispatch(sendRequest(item, collection.uid)).catch((err) =>
|
||||
toast.custom((t) => <NetworkError onClose={() => toast.dismiss(t.id)} />, {
|
||||
duration: 5000
|
||||
|
||||
@@ -530,6 +530,10 @@ export const sendRequest = (item, collectionUid) => (dispatch, getState) => {
|
||||
return reject(new Error('Collection not found'));
|
||||
}
|
||||
|
||||
if (item.response?.stream?.running && item.cancelTokenUid) {
|
||||
await dispatch(cancelRequest(item.cancelTokenUid, item, collection));
|
||||
}
|
||||
|
||||
let collectionCopy = cloneDeep(collection);
|
||||
|
||||
const itemCopy = cloneDeep(item);
|
||||
@@ -638,7 +642,7 @@ export const sendRequest = (item, collectionUid) => (dispatch, getState) => {
|
||||
};
|
||||
|
||||
export const cancelRequest = (cancelTokenUid, item, collection) => (dispatch) => {
|
||||
cancelNetworkRequest(cancelTokenUid)
|
||||
return cancelNetworkRequest(cancelTokenUid)
|
||||
.then(() => {
|
||||
dispatch(
|
||||
requestCancelled({
|
||||
|
||||
@@ -10,6 +10,7 @@ const redirectRouter = require('./redirect');
|
||||
const mixRouter = require('./mix');
|
||||
const wsRouter = require('./ws');
|
||||
const setupGraphQL = require('./graphql');
|
||||
const sseRouter = require('./sse');
|
||||
|
||||
const app = new express();
|
||||
const port = process.env.PORT || 8081;
|
||||
@@ -49,6 +50,7 @@ app.use('/api/echo', echoRouter);
|
||||
app.use('/api/multipart', multipartRouter);
|
||||
app.use('/api/redirect', redirectRouter);
|
||||
app.use('/api/mix', mixRouter);
|
||||
app.use('/api/sse', sseRouter);
|
||||
|
||||
app.get('/ping', function (req, res) {
|
||||
return res.send('pong');
|
||||
|
||||
58
packages/bruno-tests/src/sse/index.js
Normal file
58
packages/bruno-tests/src/sse/index.js
Normal file
@@ -0,0 +1,58 @@
|
||||
const express = require('express');
|
||||
const router = express.Router();
|
||||
|
||||
// Track active SSE connections
|
||||
let activeConnections = new Set();
|
||||
let connectionIdCounter = 0;
|
||||
|
||||
// GET /api/sse/stream - SSE endpoint that streams data
|
||||
router.get('/stream', (req, res) => {
|
||||
const connectionId = ++connectionIdCounter;
|
||||
activeConnections.add(connectionId);
|
||||
|
||||
console.log(`[SSE] Connection ${connectionId} opened. Active: ${activeConnections.size}`);
|
||||
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
'Connection': 'keep-alive'
|
||||
});
|
||||
|
||||
// Send initial connection message
|
||||
res.write(`data: ${JSON.stringify({ message: 'Connected', connectionId, activeConnections: activeConnections.size })}\n\n`);
|
||||
|
||||
// Send data every 500ms
|
||||
const interval = setInterval(() => {
|
||||
const data = {
|
||||
connectionId,
|
||||
timestamp: new Date().toISOString(),
|
||||
activeConnections: activeConnections.size,
|
||||
seq: Date.now()
|
||||
};
|
||||
res.write(`data: ${JSON.stringify(data)}\n\n`);
|
||||
}, 500);
|
||||
|
||||
// Handle client disconnect
|
||||
req.on('close', () => {
|
||||
clearInterval(interval);
|
||||
activeConnections.delete(connectionId);
|
||||
console.log(`[SSE] Connection ${connectionId} closed. Active: ${activeConnections.size}`);
|
||||
});
|
||||
});
|
||||
|
||||
// GET /api/sse/connections - Returns count of active connections
|
||||
router.get('/connections', (req, res) => {
|
||||
res.json({
|
||||
activeConnections: activeConnections.size,
|
||||
connectionIds: Array.from(activeConnections)
|
||||
});
|
||||
});
|
||||
|
||||
// POST /api/sse/reset - Reset connection tracking (for test cleanup)
|
||||
router.post('/reset', (req, res) => {
|
||||
activeConnections.clear();
|
||||
connectionIdCounter = 0;
|
||||
res.json({ message: 'Reset complete', activeConnections: 0 });
|
||||
});
|
||||
|
||||
module.exports = router;
|
||||
6
tests/sse/fixtures/collection/bruno.json
Normal file
6
tests/sse/fixtures/collection/bruno.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"version": "1",
|
||||
"name": "sse-test",
|
||||
"type": "collection",
|
||||
"ignore": ["node_modules", ".git"]
|
||||
}
|
||||
11
tests/sse/fixtures/collection/sse-stream-request.bru
Normal file
11
tests/sse/fixtures/collection/sse-stream-request.bru
Normal file
@@ -0,0 +1,11 @@
|
||||
meta {
|
||||
name: sse-stream-request
|
||||
type: http
|
||||
seq: 1
|
||||
}
|
||||
|
||||
get {
|
||||
url: http://localhost:8081/api/sse/stream
|
||||
body: none
|
||||
auth: none
|
||||
}
|
||||
12
tests/sse/init-user-data/preferences.json
Normal file
12
tests/sse/init-user-data/preferences.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"maximized": false,
|
||||
"lastOpenedCollections": [
|
||||
"{{collectionPath}}"
|
||||
],
|
||||
"preferences": {
|
||||
"onboarding": {
|
||||
"hasLaunchedBefore": true,
|
||||
"hasSeenWelcomeModal": true
|
||||
}
|
||||
}
|
||||
}
|
||||
36
tests/sse/sse-cancellation.spec.ts
Normal file
36
tests/sse/sse-cancellation.spec.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
import { test, expect } from '../../playwright';
|
||||
|
||||
test.describe('SSE Connection Cancellation', () => {
|
||||
test.beforeEach(async ({ pageWithUserData: page }) => {
|
||||
// Reset SSE connections before each test
|
||||
await page.request.post('http://localhost:8081/api/sse/reset');
|
||||
});
|
||||
|
||||
test('should close previous SSE connection when resending request', async ({ pageWithUserData: page }) => {
|
||||
await page.locator('#sidebar-collection-name').click();
|
||||
|
||||
await page.getByTestId('sidebar-collection-item-row').filter({ hasText: 'sse-stream-request' }).click();
|
||||
|
||||
await page.getByTestId('send-arrow-icon').waitFor({ state: 'visible' });
|
||||
|
||||
await page.getByTestId('send-arrow-icon').click();
|
||||
|
||||
// Poll until the SSE connection is established
|
||||
await expect.poll(async () => {
|
||||
const response = await page.request.get('http://localhost:8081/api/sse/connections');
|
||||
const data = await response.json();
|
||||
return data.connectionIds;
|
||||
}, { timeout: 5000 }).toStrictEqual([1]);
|
||||
|
||||
// Resend the request (this should cancel the old connection and start a new one)
|
||||
const resendShortcut = process.platform === 'darwin' ? 'Meta+Enter' : 'Control+Enter';
|
||||
await page.keyboard.press(resendShortcut);
|
||||
|
||||
// Poll until the old connection is closed and a new one is established
|
||||
await expect.poll(async () => {
|
||||
const response = await page.request.get('http://localhost:8081/api/sse/connections');
|
||||
const data = await response.json();
|
||||
return data.connectionIds;
|
||||
}, { timeout: 5000 }).toStrictEqual([2]);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user