From a06a339d0c0cf558b94824d2b420ab18aa1aea34 Mon Sep 17 00:00:00 2001 From: Anoop M D Date: Mon, 27 Jan 2025 23:44:17 +0530 Subject: [PATCH] feat: async parser workers (#3834) --- packages/bruno-electron/src/app/watcher.js | 121 +++++++++--------- packages/bruno-electron/src/bru/index.js | 108 +++++----------- .../bruno-electron/src/bru/workers/index.js | 42 ++++-- .../src/bru/workers/scripts/json-to-bru.js | 13 -- .../workers/scripts/json-to-collection-bru.js | 13 -- packages/bruno-electron/src/ipc/collection.js | 4 +- 6 files changed, 120 insertions(+), 181 deletions(-) delete mode 100644 packages/bruno-electron/src/bru/workers/scripts/json-to-bru.js delete mode 100644 packages/bruno-electron/src/bru/workers/scripts/json-to-collection-bru.js diff --git a/packages/bruno-electron/src/app/watcher.js b/packages/bruno-electron/src/app/watcher.js index ede91f963..fd3fe2491 100644 --- a/packages/bruno-electron/src/app/watcher.js +++ b/packages/bruno-electron/src/app/watcher.js @@ -3,7 +3,7 @@ const fs = require('fs'); const path = require('path'); const chokidar = require('chokidar'); const { hasBruExtension, isWSLPath, normalizeAndResolvePath, normalizeWslPath, sizeInMB } = require('../utils/filesystem'); -const { bruToEnvJson, bruToJson, collectionBruToJson, bruToJsonSync } = require('../bru'); +const { bruToEnvJson, bruToJson, collectionBruToJson, bruToJsonViaWorker, collectionBruToJsonViaWorker } = require('../bru'); const { dotenvToJson } = require('@usebruno/lang'); const { uuid } = require('../utils/common'); @@ -160,7 +160,7 @@ const unlinkEnvironmentFile = async (win, pathname, collectionUid) => { } }; -const add = async ({ win, pathname, collectionUid, watchPath: collectionPath, shouldLoadAsync }) => { +const add = async (win, pathname, collectionUid, collectionPath, useWorkerThread) => { console.log(`watcher add: ${pathname}`); if (isBrunoConfigFile(pathname, collectionPath)) { @@ -254,63 +254,60 @@ const add = async ({ win, pathname, collectionUid, watchPath: collectionPath, sh } }; - let fileStats; - try { - let bruContent = fs.readFileSync(pathname, 'utf8'); - if (shouldLoadAsync) { - try { - const fileStats = fs.statSync(pathname); - const metaJson = await bruToJson(getBruFileMeta(bruContent), true); - file.data = metaJson; - file.partial = true; - file.loading = false; - file.size = sizeInMB(fileStats?.size); - hydrateRequestWithUuid(file.data, pathname); - win.webContents.send('main:collection-tree-updated', 'addFile', file); - if (fileStats.size < MAX_FILE_SIZE) { - file.data = metaJson; - file.partial = false; - file.loading = true; - hydrateRequestWithUuid(file.data, pathname); - win.webContents.send('main:collection-tree-updated', 'addFile', file); - file.data = await bruToJson(bruContent); - file.partial = false; - file.loading = false; - hydrateRequestWithUuid(file.data, pathname); - win.webContents.send('main:collection-tree-updated', 'addFile', file); - } - } - catch(error) { - const file = { - meta: { - collectionUid, - pathname, - name: path.basename(pathname) - } - }; - file.data = {}; - file.partial = true; - file.loading = false; - file.size = sizeInMB(fileStats?.size); - hydrateRequestWithUuid(file.data, pathname); - win.webContents.send('main:collection-tree-updated', 'addFile', file); - } - } - else { - file.data = bruToJsonSync(bruContent); + const fileStats = fs.statSync(pathname); + let bruContent = fs.readFileSync(pathname, 'utf8'); + // If worker thread is not used, we can directly parse the file + if (!useWorkerThread) { + try { + file.data = bruToJson(bruContent); file.partial = false; file.loading = false; file.size = sizeInMB(fileStats?.size); hydrateRequestWithUuid(file.data, pathname); win.webContents.send('main:collection-tree-updated', 'addFile', file); + } catch (error) { + console.error(error); } - } catch (err) { - console.error(err); + return; + } + + try { + // we need to send a partial file info to the UI + // so that the UI can display the file in the collection tree + file.data = { + name: path.basename(pathname), + type: 'http-request' + }; + file.partial = true; + file.loading = false; + file.size = sizeInMB(fileStats?.size); + hydrateRequestWithUuid(file.data, pathname); + win.webContents.send('main:collection-tree-updated', 'addFile', file); + + // If the file is smaller than the max file size, we can parse the file + // and send the full file info to the UI + if (fileStats.size < MAX_FILE_SIZE) { + file.data = await bruToJsonViaWorker(bruContent); + file.partial = false; + file.loading = false; + hydrateRequestWithUuid(file.data, pathname); + win.webContents.send('main:collection-tree-updated', 'addFile', file); + } + } catch(error) { + file.data = { + name: path.basename(pathname), + type: 'http-request' + }; + file.partial = true; + file.loading = false; + file.size = sizeInMB(fileStats?.size); + hydrateRequestWithUuid(file.data, pathname); + win.webContents.send('main:collection-tree-updated', 'addFile', file); } } }; -const addDirectory = ({ win, pathname, collectionUid, watchPath: collectionPath }) => { +const addDirectory = (win, pathname, collectionUid, collectionPath) => { const envDirectory = path.join(collectionPath, 'environments'); if (pathname === envDirectory) { @@ -327,7 +324,7 @@ const addDirectory = ({ win, pathname, collectionUid, watchPath: collectionPath win.webContents.send('main:collection-tree-updated', 'addDir', directory); }; -const change = async ({ win, pathname, collectionUid, watchPath: collectionPath }) => { +const change = async (win, pathname, collectionUid, collectionPath) => { if (isBrunoConfigFile(pathname, collectionPath)) { try { const content = fs.readFileSync(pathname, 'utf8'); @@ -411,7 +408,7 @@ const change = async ({ win, pathname, collectionUid, watchPath: collectionPath } }; -const unlink = ({ win, pathname, collectionUid, watchPath: collectionPath }) => { +const unlink = (win, pathname, collectionUid, collectionPath) => { console.log(`watcher unlink: ${pathname}`); if (isBruEnvironmentConfig(pathname, collectionPath)) { @@ -430,7 +427,7 @@ const unlink = ({ win, pathname, collectionUid, watchPath: collectionPath }) => } }; -const unlinkDir = ({ win, pathname, collectionUid, watchPath: collectionPath }) => { +const unlinkDir = (win, pathname, collectionUid, collectionPath) => { const envDirectory = path.join(collectionPath, 'environments'); if (pathname === envDirectory) { @@ -447,10 +444,10 @@ const unlinkDir = ({ win, pathname, collectionUid, watchPath: collectionPath }) win.webContents.send('main:collection-tree-updated', 'unlinkDir', directory); }; -const onWatcherSetupComplete = ({ win, watchPath: collectionPath }) => { +const onWatcherSetupComplete = (win, watchPath) => { const UiStateSnapshotStore = new UiStateSnapshot(); const collectionsSnapshotState = UiStateSnapshotStore.getCollections(); - const collectionSnapshotState = collectionsSnapshotState?.find(c => c?.pathname == collectionPath); + const collectionSnapshotState = collectionsSnapshotState?.find(c => c?.pathname == watchPath); win.webContents.send('main:hydrate-app-with-ui-state-snapshot', collectionSnapshotState); }; @@ -459,7 +456,7 @@ class Watcher { this.watchers = {}; } - addWatcher(win, watchPath, collectionUid, brunoConfig, forcePolling = false, shouldLoadAsync) { + addWatcher(win, watchPath, collectionUid, brunoConfig, forcePolling = false, useWorkerThread) { if (this.watchers[watchPath]) { this.watchers[watchPath].close(); } @@ -489,12 +486,12 @@ class Watcher { let startedNewWatcher = false; watcher - .on('ready', () => onWatcherSetupComplete({ win, watchPath })) - .on('add', (pathname) => add({win, pathname, collectionUid, watchPath, shouldLoadAsync })) - .on('addDir', (pathname) => addDirectory({ win, pathname, collectionUid, watchPath, shouldLoadAsync })) - .on('change', (pathname) => change({ win, pathname, collectionUid, watchPath, shouldLoadAsync })) - .on('unlink', (pathname) => unlink({ win, pathname, collectionUid, watchPath, shouldLoadAsync })) - .on('unlinkDir', (pathname) => unlinkDir({ win, pathname, collectionUid, watchPath, shouldLoadAsync })) + .on('ready', () => onWatcherSetupComplete(win, watchPath)) + .on('add', (pathname) => add(win, pathname, collectionUid, watchPath, useWorkerThread)) + .on('addDir', (pathname) => addDirectory(win, pathname, collectionUid, watchPath)) + .on('change', (pathname) => change(win, pathname, collectionUid, watchPath)) + .on('unlink', (pathname) => unlink(win, pathname, collectionUid, watchPath)) + .on('unlinkDir', (pathname) => unlinkDir(win, pathname, collectionUid, watchPath)) .on('error', (error) => { // `EMFILE` is an error code thrown when to many files are watched at the same time see: https://github.com/usebruno/bruno/issues/627 // `ENOSPC` stands for "Error No space" but is also thrown if the file watcher limit is reached. @@ -511,7 +508,7 @@ class Watcher { 'Update you system config to allow more concurrently watched files with:', '"echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p"' ); - this.addWatcher(win, watchPath, collectionUid, brunoConfig, true, shouldLoadAsync); + this.addWatcher(win, watchPath, collectionUid, brunoConfig, true, useWorkerThread); } else { console.error(`An error occurred in the watcher for: ${watchPath}`, error); } diff --git a/packages/bruno-electron/src/bru/index.js b/packages/bruno-electron/src/bru/index.js index 62217681c..31386f869 100644 --- a/packages/bruno-electron/src/bru/index.js +++ b/packages/bruno-electron/src/bru/index.js @@ -1,21 +1,18 @@ const _ = require('lodash'); -const { bruToJsonV2, bruToEnvJsonV2, envJsonToBruV2 } = require('@usebruno/lang'); -const BruWorker = require('./workers'); +const { + bruToJsonV2, + bruToEnvJsonV2, + envJsonToBruV2, + collectionBruToJson: _collectionBruToJson, + jsonToCollectionBru: _jsonToCollectionBru +} = require('@usebruno/lang'); +const BruParserWorker = require('./workers'); -// collections can have bru files of varying sizes. we use two worker threads: -// - one thread handles smaller files (<0.1MB), so they get processed quickly and show up in the gui faster. -// - the other thread takes care of larger files (>=0.1MB). Splitting the processing like this helps with parsing performance. -const bruWorker = new BruWorker({ - lanes: [{ - maxSize: 0.1 - },{ - maxSize: 100 - }] -}); +const bruParserWorker = new BruParserWorker(); -const collectionBruToJson = async (bru) => { +const collectionBruToJson = async (data, parsed = false) => { try { - const json = await bruWorker?.collectionBruToJson(bru); + const json = parsed ? data : _collectionBruToJson(data); const transformedJson = { request: { @@ -43,6 +40,16 @@ const collectionBruToJson = async (bru) => { } }; +const collectionBruToJsonViaWorker = async (bru) => { + try { + const json = await bruParserWorker?.collectionBruToJson(bru); + + return collectionBruToJson(json); + } catch (error) { + return Promise.reject(error); + } +}; + const jsonToCollectionBru = async (json, isFolder) => { try { const collectionBruJson = { @@ -72,8 +79,7 @@ const jsonToCollectionBru = async (json, isFolder) => { collectionBruJson.auth = _.get(json, 'request.auth', {}); } - const bru = await bruWorker?.jsonToCollectionBru(collectionBruJson); - return bru; + return _jsonToCollectionBru(collectionBruJson); } catch (error) { return Promise.reject(error); } @@ -111,18 +117,12 @@ const envJsonToBru = async (json) => { * We map the json response from the bru lang and transform it into the DSL * format that the app uses * - * @param {string} bru The BRU file content. + * @param {string} data The BRU file content. * @returns {object} The JSON representation of the BRU file. */ -const bruToJson = async (data, parsed = false) => { +const bruToJson = (data, parsed = false) => { try { - let json; - if (parsed) { - json = data; - } - else { - json = await bruWorker?.bruToJson(data); - } + const json = parsed ? data : bruToJsonV2(data); let requestType = _.get(json, 'meta.type'); if (requestType === 'http') { @@ -162,58 +162,11 @@ const bruToJson = async (data, parsed = false) => { } }; -/** - * The transformer function for converting a BRU file to JSON. - * - * We map the json response from the bru lang and transform it into the DSL - * format that the app uses - * - * @param {string} bru The BRU file content. - * @returns {object} The JSON representation of the BRU file. - */ -const bruToJsonSync = (data, parsed = false) => { +const bruToJsonViaWorker = async (data) => { try { - let json; - if (parsed) { - json = data; - } - else { - json = bruToJsonV2(data); - } + const json = await bruParserWorker?.bruToJson(data); - let requestType = _.get(json, 'meta.type'); - if (requestType === 'http') { - requestType = 'http-request'; - } else if (requestType === 'graphql') { - requestType = 'graphql-request'; - } else { - requestType = 'http-request'; - } - - const sequence = _.get(json, 'meta.seq'); - const transformedJson = { - type: requestType, - name: _.get(json, 'meta.name'), - seq: !isNaN(sequence) ? Number(sequence) : 1, - request: { - method: _.upperCase(_.get(json, 'http.method')), - url: _.get(json, 'http.url'), - params: _.get(json, 'params', []), - headers: _.get(json, 'headers', []), - auth: _.get(json, 'auth', {}), - body: _.get(json, 'body', {}), - script: _.get(json, 'script', {}), - vars: _.get(json, 'vars', {}), - assertions: _.get(json, 'assertions', []), - tests: _.get(json, 'tests', ''), - docs: _.get(json, 'docs', '') - } - }; - - transformedJson.request.auth.mode = _.get(json, 'http.auth', 'none'); - transformedJson.request.body.mode = _.get(json, 'http.body', 'none'); - - return transformedJson; + return bruToJson(json, true); } catch (e) { return Promise.reject(e); } @@ -265,16 +218,17 @@ const jsonToBru = async (json) => { docs: _.get(json, 'request.docs', '') }; - const bru = await bruWorker?.jsonToBru(bruJson) + const bru = await bruParserWorker?.jsonToBru(bruJson) return bru; }; module.exports = { bruToJson, - bruToJsonSync, + bruToJsonViaWorker, jsonToBru, bruToEnvJson, envJsonToBru, collectionBruToJson, + collectionBruToJsonViaWorker, jsonToCollectionBru }; diff --git a/packages/bruno-electron/src/bru/workers/index.js b/packages/bruno-electron/src/bru/workers/index.js index e4ed05762..8f992a058 100644 --- a/packages/bruno-electron/src/bru/workers/index.js +++ b/packages/bruno-electron/src/bru/workers/index.js @@ -5,39 +5,53 @@ const getSize = (data) => { return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : Buffer.byteLength(JSON.stringify(data), 'utf8'); } -class BruWorker { - constructor({ lanes = [] }) { - this.workerQueues = lanes?.map(lane => ({ +/** + * Lanes are used to determine which worker queue to use based on the size of the data. + * + * The first lane is for smaller files (<0.1MB), the second lane is for larger files (>=0.1MB). + * This helps with parsing performance. + */ +const LANES = [{ + maxSize: 0.1 +},{ + maxSize: 100 +}]; + +class BruParserWorker { + constructor() { + this.workerQueues = LANES?.map(lane => ({ maxSize: lane?.maxSize, workerQueue: new WorkerQueue() })); } getWorkerQueue(size) { - return this.workerQueues.find((wq) => wq?.maxSize >= size)?.workerQueue || this.workerQueues.at(-1)?.workerQueue; + // Find the first queue that can handle the given size + // or fallback to the last queue for largest files + const queueForSize = this.workerQueues.find((queue) => + queue.maxSize >= size + ); + + return queueForSize?.workerQueue ?? this.workerQueues.at(-1).workerQueue; } async enqueueTask({data, scriptFile }) { const size = getSize(data); const workerQueue = this.getWorkerQueue(size); - return workerQueue.enqueue({ data, priority: size, scriptPath: path.join(__dirname, `./scripts/${scriptFile}.js`) }); + return workerQueue.enqueue({ + data, + priority: size, + scriptPath: path.join(__dirname, `./scripts/${scriptFile}.js`) + }); } async bruToJson(data) { return this.enqueueTask({ data, scriptFile: `bru-to-json` }); } - async jsonToBru(data) { - return this.enqueueTask({ data, scriptFile: `json-to-bru` }); - } - async collectionBruToJson(data) { return this.enqueueTask({ data, scriptFile: `collection-bru-to-json` }); } - - async jsonToCollectionBru(data) { - return this.enqueueTask({ data, scriptFile: `json-to-collection-bru` }); - } } -module.exports = BruWorker; \ No newline at end of file +module.exports = BruParserWorker; \ No newline at end of file diff --git a/packages/bruno-electron/src/bru/workers/scripts/json-to-bru.js b/packages/bruno-electron/src/bru/workers/scripts/json-to-bru.js deleted file mode 100644 index 796d8e246..000000000 --- a/packages/bruno-electron/src/bru/workers/scripts/json-to-bru.js +++ /dev/null @@ -1,13 +0,0 @@ -const { workerData, parentPort } = require('worker_threads'); -const { - jsonToBruV2, -} = require('@usebruno/lang'); - -try { - const json = workerData; - const bru = jsonToBruV2(json); - parentPort.postMessage(bru); -} -catch(error) { - console.error(error); -} \ No newline at end of file diff --git a/packages/bruno-electron/src/bru/workers/scripts/json-to-collection-bru.js b/packages/bruno-electron/src/bru/workers/scripts/json-to-collection-bru.js deleted file mode 100644 index fbecfd47e..000000000 --- a/packages/bruno-electron/src/bru/workers/scripts/json-to-collection-bru.js +++ /dev/null @@ -1,13 +0,0 @@ -const { workerData, parentPort } = require('worker_threads'); -const { - jsonToCollectionBru, -} = require('@usebruno/lang'); - -try { - const json = workerData; - const bru = jsonToCollectionBru(json); - parentPort.postMessage(bru); -} -catch(error) { - console.error(error); -} \ No newline at end of file diff --git a/packages/bruno-electron/src/ipc/collection.js b/packages/bruno-electron/src/ipc/collection.js index ad1208f7b..b262692d4 100644 --- a/packages/bruno-electron/src/ipc/collection.js +++ b/packages/bruno-electron/src/ipc/collection.js @@ -4,7 +4,7 @@ const fsExtra = require('fs-extra'); const os = require('os'); const path = require('path'); const { ipcMain, shell, dialog, app } = require('electron'); -const { envJsonToBru, bruToJson, jsonToBru, jsonToCollectionBru, bruToJsonSync } = require('../bru'); +const { envJsonToBru, bruToJson, jsonToBru, jsonToCollectionBru, bruToJsonViaWorker } = require('../bru'); const { isValidPathname, @@ -873,7 +873,7 @@ const registerRendererEventHandlers = (mainWindow, watcher, lastOpenedCollection } }; let bruContent = fs.readFileSync(pathname, 'utf8'); - file.data = bruToJsonSync(bruContent); + file.data = bruToJson(bruContent); file.partial = false; file.loading = true; file.size = sizeInMB(fileStats?.size);