feat: async parser workers (#3834)

This commit is contained in:
Anoop M D
2025-01-27 23:44:17 +05:30
parent e34ac3de7c
commit a06a339d0c
6 changed files with 120 additions and 181 deletions

View File

@@ -3,7 +3,7 @@ const fs = require('fs');
const path = require('path');
const chokidar = require('chokidar');
const { hasBruExtension, isWSLPath, normalizeAndResolvePath, normalizeWslPath, sizeInMB } = require('../utils/filesystem');
const { bruToEnvJson, bruToJson, collectionBruToJson, bruToJsonSync } = require('../bru');
const { bruToEnvJson, bruToJson, collectionBruToJson, bruToJsonViaWorker, collectionBruToJsonViaWorker } = require('../bru');
const { dotenvToJson } = require('@usebruno/lang');
const { uuid } = require('../utils/common');
@@ -160,7 +160,7 @@ const unlinkEnvironmentFile = async (win, pathname, collectionUid) => {
}
};
const add = async ({ win, pathname, collectionUid, watchPath: collectionPath, shouldLoadAsync }) => {
const add = async (win, pathname, collectionUid, collectionPath, useWorkerThread) => {
console.log(`watcher add: ${pathname}`);
if (isBrunoConfigFile(pathname, collectionPath)) {
@@ -254,63 +254,60 @@ const add = async ({ win, pathname, collectionUid, watchPath: collectionPath, sh
}
};
let fileStats;
try {
let bruContent = fs.readFileSync(pathname, 'utf8');
if (shouldLoadAsync) {
try {
const fileStats = fs.statSync(pathname);
const metaJson = await bruToJson(getBruFileMeta(bruContent), true);
file.data = metaJson;
file.partial = true;
file.loading = false;
file.size = sizeInMB(fileStats?.size);
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
if (fileStats.size < MAX_FILE_SIZE) {
file.data = metaJson;
file.partial = false;
file.loading = true;
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
file.data = await bruToJson(bruContent);
file.partial = false;
file.loading = false;
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
}
}
catch(error) {
const file = {
meta: {
collectionUid,
pathname,
name: path.basename(pathname)
}
};
file.data = {};
file.partial = true;
file.loading = false;
file.size = sizeInMB(fileStats?.size);
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
}
}
else {
file.data = bruToJsonSync(bruContent);
const fileStats = fs.statSync(pathname);
let bruContent = fs.readFileSync(pathname, 'utf8');
// If worker thread is not used, we can directly parse the file
if (!useWorkerThread) {
try {
file.data = bruToJson(bruContent);
file.partial = false;
file.loading = false;
file.size = sizeInMB(fileStats?.size);
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
} catch (error) {
console.error(error);
}
} catch (err) {
console.error(err);
return;
}
try {
// we need to send a partial file info to the UI
// so that the UI can display the file in the collection tree
file.data = {
name: path.basename(pathname),
type: 'http-request'
};
file.partial = true;
file.loading = false;
file.size = sizeInMB(fileStats?.size);
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
// If the file is smaller than the max file size, we can parse the file
// and send the full file info to the UI
if (fileStats.size < MAX_FILE_SIZE) {
file.data = await bruToJsonViaWorker(bruContent);
file.partial = false;
file.loading = false;
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
}
} catch(error) {
file.data = {
name: path.basename(pathname),
type: 'http-request'
};
file.partial = true;
file.loading = false;
file.size = sizeInMB(fileStats?.size);
hydrateRequestWithUuid(file.data, pathname);
win.webContents.send('main:collection-tree-updated', 'addFile', file);
}
}
};
const addDirectory = ({ win, pathname, collectionUid, watchPath: collectionPath }) => {
const addDirectory = (win, pathname, collectionUid, collectionPath) => {
const envDirectory = path.join(collectionPath, 'environments');
if (pathname === envDirectory) {
@@ -327,7 +324,7 @@ const addDirectory = ({ win, pathname, collectionUid, watchPath: collectionPath
win.webContents.send('main:collection-tree-updated', 'addDir', directory);
};
const change = async ({ win, pathname, collectionUid, watchPath: collectionPath }) => {
const change = async (win, pathname, collectionUid, collectionPath) => {
if (isBrunoConfigFile(pathname, collectionPath)) {
try {
const content = fs.readFileSync(pathname, 'utf8');
@@ -411,7 +408,7 @@ const change = async ({ win, pathname, collectionUid, watchPath: collectionPath
}
};
const unlink = ({ win, pathname, collectionUid, watchPath: collectionPath }) => {
const unlink = (win, pathname, collectionUid, collectionPath) => {
console.log(`watcher unlink: ${pathname}`);
if (isBruEnvironmentConfig(pathname, collectionPath)) {
@@ -430,7 +427,7 @@ const unlink = ({ win, pathname, collectionUid, watchPath: collectionPath }) =>
}
};
const unlinkDir = ({ win, pathname, collectionUid, watchPath: collectionPath }) => {
const unlinkDir = (win, pathname, collectionUid, collectionPath) => {
const envDirectory = path.join(collectionPath, 'environments');
if (pathname === envDirectory) {
@@ -447,10 +444,10 @@ const unlinkDir = ({ win, pathname, collectionUid, watchPath: collectionPath })
win.webContents.send('main:collection-tree-updated', 'unlinkDir', directory);
};
const onWatcherSetupComplete = ({ win, watchPath: collectionPath }) => {
const onWatcherSetupComplete = (win, watchPath) => {
const UiStateSnapshotStore = new UiStateSnapshot();
const collectionsSnapshotState = UiStateSnapshotStore.getCollections();
const collectionSnapshotState = collectionsSnapshotState?.find(c => c?.pathname == collectionPath);
const collectionSnapshotState = collectionsSnapshotState?.find(c => c?.pathname == watchPath);
win.webContents.send('main:hydrate-app-with-ui-state-snapshot', collectionSnapshotState);
};
@@ -459,7 +456,7 @@ class Watcher {
this.watchers = {};
}
addWatcher(win, watchPath, collectionUid, brunoConfig, forcePolling = false, shouldLoadAsync) {
addWatcher(win, watchPath, collectionUid, brunoConfig, forcePolling = false, useWorkerThread) {
if (this.watchers[watchPath]) {
this.watchers[watchPath].close();
}
@@ -489,12 +486,12 @@ class Watcher {
let startedNewWatcher = false;
watcher
.on('ready', () => onWatcherSetupComplete({ win, watchPath }))
.on('add', (pathname) => add({win, pathname, collectionUid, watchPath, shouldLoadAsync }))
.on('addDir', (pathname) => addDirectory({ win, pathname, collectionUid, watchPath, shouldLoadAsync }))
.on('change', (pathname) => change({ win, pathname, collectionUid, watchPath, shouldLoadAsync }))
.on('unlink', (pathname) => unlink({ win, pathname, collectionUid, watchPath, shouldLoadAsync }))
.on('unlinkDir', (pathname) => unlinkDir({ win, pathname, collectionUid, watchPath, shouldLoadAsync }))
.on('ready', () => onWatcherSetupComplete(win, watchPath))
.on('add', (pathname) => add(win, pathname, collectionUid, watchPath, useWorkerThread))
.on('addDir', (pathname) => addDirectory(win, pathname, collectionUid, watchPath))
.on('change', (pathname) => change(win, pathname, collectionUid, watchPath))
.on('unlink', (pathname) => unlink(win, pathname, collectionUid, watchPath))
.on('unlinkDir', (pathname) => unlinkDir(win, pathname, collectionUid, watchPath))
.on('error', (error) => {
// `EMFILE` is an error code thrown when too many files are watched at the same time; see: https://github.com/usebruno/bruno/issues/627
// `ENOSPC` stands for "Error No space" but is also thrown if the file watcher limit is reached.
@@ -511,7 +508,7 @@ class Watcher {
'Update you system config to allow more concurrently watched files with:',
'"echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p"'
);
this.addWatcher(win, watchPath, collectionUid, brunoConfig, true, shouldLoadAsync);
this.addWatcher(win, watchPath, collectionUid, brunoConfig, true, useWorkerThread);
} else {
console.error(`An error occurred in the watcher for: ${watchPath}`, error);
}

View File

@@ -1,21 +1,18 @@
const _ = require('lodash');
const { bruToJsonV2, bruToEnvJsonV2, envJsonToBruV2 } = require('@usebruno/lang');
const BruWorker = require('./workers');
const {
bruToJsonV2,
bruToEnvJsonV2,
envJsonToBruV2,
collectionBruToJson: _collectionBruToJson,
jsonToCollectionBru: _jsonToCollectionBru
} = require('@usebruno/lang');
const BruParserWorker = require('./workers');
// collections can have bru files of varying sizes. we use two worker threads:
// - one thread handles smaller files (<0.1MB), so they get processed quickly and show up in the gui faster.
// - the other thread takes care of larger files (>=0.1MB). Splitting the processing like this helps with parsing performance.
const bruWorker = new BruWorker({
lanes: [{
maxSize: 0.1
},{
maxSize: 100
}]
});
const bruParserWorker = new BruParserWorker();
const collectionBruToJson = async (bru) => {
const collectionBruToJson = async (data, parsed = false) => {
try {
const json = await bruWorker?.collectionBruToJson(bru);
const json = parsed ? data : _collectionBruToJson(data);
const transformedJson = {
request: {
@@ -43,6 +40,16 @@ const collectionBruToJson = async (bru) => {
}
};
/**
 * Parses a collection-level BRU file on a worker thread and transforms the
 * result into the DSL format that the app uses.
 *
 * The worker performs the BRU -> JSON parsing step, so the result is handed
 * to `collectionBruToJson` with `parsed = true`; otherwise the already-parsed
 * object would be fed into the BRU parser a second time.
 *
 * @param {string} bru The collection BRU file content.
 * @returns {Promise<object>} The transformed JSON representation; rejects if parsing or transformation fails.
 */
const collectionBruToJsonViaWorker = async (bru) => {
  try {
    const json = await bruParserWorker?.collectionBruToJson(bru);
    // `json` is already parsed by the worker: skip the parse step in the transformer
    return collectionBruToJson(json, true);
  } catch (error) {
    return Promise.reject(error);
  }
};
const jsonToCollectionBru = async (json, isFolder) => {
try {
const collectionBruJson = {
@@ -72,8 +79,7 @@ const jsonToCollectionBru = async (json, isFolder) => {
collectionBruJson.auth = _.get(json, 'request.auth', {});
}
const bru = await bruWorker?.jsonToCollectionBru(collectionBruJson);
return bru;
return _jsonToCollectionBru(collectionBruJson);
} catch (error) {
return Promise.reject(error);
}
@@ -111,18 +117,12 @@ const envJsonToBru = async (json) => {
* We map the json response from the bru lang and transform it into the DSL
* format that the app uses
*
* @param {string} bru The BRU file content.
* @param {string} data The BRU file content.
* @returns {object} The JSON representation of the BRU file.
*/
const bruToJson = async (data, parsed = false) => {
const bruToJson = (data, parsed = false) => {
try {
let json;
if (parsed) {
json = data;
}
else {
json = await bruWorker?.bruToJson(data);
}
const json = parsed ? data : bruToJsonV2(data);
let requestType = _.get(json, 'meta.type');
if (requestType === 'http') {
@@ -162,58 +162,11 @@ const bruToJson = async (data, parsed = false) => {
}
};
/**
* The transformer function for converting a BRU file to JSON.
*
* We map the json response from the bru lang and transform it into the DSL
* format that the app uses
*
* @param {string} bru The BRU file content.
* @returns {object} The JSON representation of the BRU file.
*/
const bruToJsonSync = (data, parsed = false) => {
const bruToJsonViaWorker = async (data) => {
try {
let json;
if (parsed) {
json = data;
}
else {
json = bruToJsonV2(data);
}
const json = await bruParserWorker?.bruToJson(data);
let requestType = _.get(json, 'meta.type');
if (requestType === 'http') {
requestType = 'http-request';
} else if (requestType === 'graphql') {
requestType = 'graphql-request';
} else {
requestType = 'http-request';
}
const sequence = _.get(json, 'meta.seq');
const transformedJson = {
type: requestType,
name: _.get(json, 'meta.name'),
seq: !isNaN(sequence) ? Number(sequence) : 1,
request: {
method: _.upperCase(_.get(json, 'http.method')),
url: _.get(json, 'http.url'),
params: _.get(json, 'params', []),
headers: _.get(json, 'headers', []),
auth: _.get(json, 'auth', {}),
body: _.get(json, 'body', {}),
script: _.get(json, 'script', {}),
vars: _.get(json, 'vars', {}),
assertions: _.get(json, 'assertions', []),
tests: _.get(json, 'tests', ''),
docs: _.get(json, 'docs', '')
}
};
transformedJson.request.auth.mode = _.get(json, 'http.auth', 'none');
transformedJson.request.body.mode = _.get(json, 'http.body', 'none');
return transformedJson;
return bruToJson(json, true);
} catch (e) {
return Promise.reject(e);
}
@@ -265,16 +218,17 @@ const jsonToBru = async (json) => {
docs: _.get(json, 'request.docs', '')
};
const bru = await bruWorker?.jsonToBru(bruJson)
const bru = await bruParserWorker?.jsonToBru(bruJson)
return bru;
};
module.exports = {
bruToJson,
bruToJsonSync,
bruToJsonViaWorker,
jsonToBru,
bruToEnvJson,
envJsonToBru,
collectionBruToJson,
collectionBruToJsonViaWorker,
jsonToCollectionBru
};

View File

@@ -5,39 +5,53 @@ const getSize = (data) => {
return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : Buffer.byteLength(JSON.stringify(data), 'utf8');
}
class BruWorker {
constructor({ lanes = [] }) {
this.workerQueues = lanes?.map(lane => ({
/**
 * Lanes are used to determine which worker queue to use based on the size of the data.
 *
 * `maxSize` is expressed in bytes, because the size it is compared against
 * (the result of `getSize`) is a byte count from `Buffer.byteLength`.
 *
 * The first lane is for smaller files (<=0.1MB), the second lane is for larger files (up to 100MB).
 * Anything bigger than the last lane's limit still falls back to the last lane's queue.
 * This helps with parsing performance.
 */
const LANES = [
  {
    maxSize: 0.1 * 1024 * 1024 // 0.1MB in bytes
  },
  {
    maxSize: 100 * 1024 * 1024 // 100MB in bytes
  }
];
class BruParserWorker {
constructor() {
this.workerQueues = LANES?.map(lane => ({
maxSize: lane?.maxSize,
workerQueue: new WorkerQueue()
}));
}
getWorkerQueue(size) {
return this.workerQueues.find((wq) => wq?.maxSize >= size)?.workerQueue || this.workerQueues.at(-1)?.workerQueue;
// Find the first queue that can handle the given size
// or fallback to the last queue for largest files
const queueForSize = this.workerQueues.find((queue) =>
queue.maxSize >= size
);
return queueForSize?.workerQueue ?? this.workerQueues.at(-1).workerQueue;
}
async enqueueTask({data, scriptFile }) {
const size = getSize(data);
const workerQueue = this.getWorkerQueue(size);
return workerQueue.enqueue({ data, priority: size, scriptPath: path.join(__dirname, `./scripts/${scriptFile}.js`) });
return workerQueue.enqueue({
data,
priority: size,
scriptPath: path.join(__dirname, `./scripts/${scriptFile}.js`)
});
}
async bruToJson(data) {
return this.enqueueTask({ data, scriptFile: `bru-to-json` });
}
async jsonToBru(data) {
return this.enqueueTask({ data, scriptFile: `json-to-bru` });
}
async collectionBruToJson(data) {
return this.enqueueTask({ data, scriptFile: `collection-bru-to-json` });
}
async jsonToCollectionBru(data) {
return this.enqueueTask({ data, scriptFile: `json-to-collection-bru` });
}
}
module.exports = BruWorker;
module.exports = BruParserWorker;

View File

@@ -1,13 +0,0 @@
const { workerData, parentPort } = require('worker_threads');
const {
jsonToBruV2,
} = require('@usebruno/lang');
// Worker entry point: serialize the JSON received via `workerData` into BRU
// text and post the result back to the parent thread.
try {
  const serialized = jsonToBruV2(workerData);
  parentPort.postMessage(serialized);
} catch (err) {
  // Failures are only logged here; nothing is posted to the parent in that case.
  console.error(err);
}

View File

@@ -1,13 +0,0 @@
const { workerData, parentPort } = require('worker_threads');
const {
jsonToCollectionBru,
} = require('@usebruno/lang');
// Worker entry point: serialize the collection JSON received via `workerData`
// into BRU text and post the result back to the parent thread.
try {
  const serialized = jsonToCollectionBru(workerData);
  parentPort.postMessage(serialized);
} catch (err) {
  // Failures are only logged here; nothing is posted to the parent in that case.
  console.error(err);
}

View File

@@ -4,7 +4,7 @@ const fsExtra = require('fs-extra');
const os = require('os');
const path = require('path');
const { ipcMain, shell, dialog, app } = require('electron');
const { envJsonToBru, bruToJson, jsonToBru, jsonToCollectionBru, bruToJsonSync } = require('../bru');
const { envJsonToBru, bruToJson, jsonToBru, jsonToCollectionBru, bruToJsonViaWorker } = require('../bru');
const {
isValidPathname,
@@ -873,7 +873,7 @@ const registerRendererEventHandlers = (mainWindow, watcher, lastOpenedCollection
}
};
let bruContent = fs.readFileSync(pathname, 'utf8');
file.data = bruToJsonSync(bruContent);
file.data = bruToJson(bruContent);
file.partial = false;
file.loading = true;
file.size = sizeInMB(fileStats?.size);