time-to-botec

Benchmark sampling in different programming languages
Log | Files | Refs | README

exec.js (7787B)


      1 /**
      2 * @license Apache-2.0
      3 *
      4 * Copyright (c) 2018 The Stdlib Authors.
      5 *
      6 * Licensed under the Apache License, Version 2.0 (the "License");
      7 * you may not use this file except in compliance with the License.
      8 * You may obtain a copy of the License at
      9 *
     10 *    http://www.apache.org/licenses/LICENSE-2.0
     11 *
     12 * Unless required by applicable law or agreed to in writing, software
     13 * distributed under the License is distributed on an "AS IS" BASIS,
     14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15 * See the License for the specific language governing permissions and
     16 * limitations under the License.
     17 */
     18 
     19 'use strict';
     20 
     21 // MODULES //
     22 
     23 var fork = require( 'child_process' ).fork;
     24 var path = require( 'path' );
     25 var logger = require( 'debug' );
     26 var objectKeys = require( './../../../keys' );
     27 var getOpts = require( './options.js' );
     28 
     29 
     30 // VARIABLES //
     31 
     32 var debug = logger( 'parallel:exec' );
     33 var WORKER_FILEPATH = path.resolve( __dirname, './worker/index.js' );
     34 
     35 
     36 // MAIN //
     37 
     38 /**
     39 * Executes scripts in parallel.
     40 *
     41 * @private
     42 * @param {StringArray} files - script absolute file paths
     43 * @param {Options} opts - options
     44 * @param {PositiveInteger} opts.concurrency - number of scripts to execute concurrently
     45 * @param {PositiveInteger} opts.workers - number of workers
     46 * @param {string} opts.cmd - executable file/command
     47 * @param {boolean} opts.ordered - boolean indicating whether to preserve order of script output
     48 * @param {(NonNegativeInteger|null)} opts.uid - process user identity
     49 * @param {(NonNegativeInteger|null)} opts.gid - process group identity
     50 * @param {string} opts.encoding - `stdio` encoding
     51 * @param {NonNegativeInteger} opts.maxBuffer - max child process `stdio` buffer size
     52 * @param {Callback} clbk - callback to invoke after executing all scripts
     53 */
     54 function exec( files, opts, clbk ) {
     55 	var numClosed;
     56 	var workers;
     57 	var pending;
     58 	var fopts;
     59 	var args;
     60 	var proc;
     61 	var pids;
     62 	var pid;
     63 	var idx;
     64 	var err;
     65 	var i;
     66 
     67 	debug( 'Options: %s.', JSON.stringify( opts ) );
     68 	numClosed = 0;
     69 
     70 	debug( 'Creating %d workers...', opts.workers );
     71 	workers = {};
     72 	args = [];
     73 	fopts = getOpts( opts );
     74 	for ( i = 0; i < opts.workers; i++ ) {
     75 		debug( 'Creating child process...' );
     76 		proc = fork( WORKER_FILEPATH, args, fopts );
     77 
     78 		proc.on( 'error', onError( proc ) );
     79 		proc.on( 'close', onClose( proc ) );
     80 		proc.on( 'exit', onExit( proc ) );
     81 		proc.on( 'disconnect', onDisconnect( proc ) );
     82 		proc.on( 'message', onMessage( proc ) );
     83 
     84 		debug( 'Child process created. pid: %d.', proc.pid );
     85 		workers[ proc.pid ] = proc;
     86 	}
     87 	pids = objectKeys( workers );
     88 	debug( '%d workers created.', pids.length );
     89 
     90 	debug( 'Running %d scripts concurrently...', opts.concurrency );
     91 	pending = {};
     92 	idx = -1;
     93 	for ( i = 0; i < opts.concurrency; i++ ) {
     94 		pid = pids[ i%pids.length ];
     95 		next( workers[ pid ] ); // eslint-disable-line callback-return
     96 	}
     97 
     98 	/**
     99 	* Instructs a child process to run the next script.
    100 	*
    101 	* @private
    102 	* @param {Object} child - child process
    103 	* @returns {void}
    104 	*/
    105 	function next( child ) {
    106 		var numPending;
    107 		idx += 1;
    108 		if ( idx >= files.length ) {
    109 			numPending = objectKeys( pending ).length;
    110 			if ( numPending > 0 ) {
    111 				debug( '%d scripts are pending.', numPending );
    112 				return;
    113 			}
    114 			debug( 'All scripts have finished.' );
    115 			return close();
    116 		}
    117 		debug( 'Instructing child process to run script: %s. pid: %d.', files[ idx ], child.pid );
    118 		child.send( files[ idx ] );
    119 		pending[ files[ idx ] ] = true;
    120 
    121 		debug( '%d of %d scripts have been processed.', idx, files.length );
    122 	}
    123 
    124 	/**
    125 	* Returns a callback to be invoked upon receiving a message from a child process.
    126 	*
    127 	* @private
    128 	* @param {Object} child - child process
    129 	* @returns {Callback} callback
    130 	*/
    131 	function onMessage( child ) {
    132 		return listener;
    133 
    134 		/**
    135 		* Callback invoked upon receiving a message from a child process.
    136 		*
    137 		* @private
    138 		* @param {string} filepath - script filepath
    139 		*/
    140 		function listener( filepath ) {
    141 			debug( 'Child process message: %s. pid: %d.', filepath, child.pid );
    142 
    143 			// Remove the script from the listing of pending scripts:
    144 			delete pending[ filepath ];
    145 
    146 			// Indicate that the child process is ready for its next task:
    147 			next( child );
    148 		}
    149 	}
    150 
    151 	/**
    152 	* Returns a callback to be invoked upon child process close.
    153 	*
    154 	* @private
    155 	* @param {Object} child - child process
    156 	* @returns {Callback} callback
    157 	*/
    158 	function onClose( child ) {
    159 		return listener;
    160 
    161 		/**
    162 		* Callback invoked upon child process close.
    163 		*
    164 		* @private
    165 		* @param {(number|null)} code - exit code
    166 		* @param {(string|null)} signal - termination signal
    167 		*/
    168 		function listener( code, signal ) {
    169 			debug( 'Child process closed. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
    170 			processExit( code, signal );
    171 			childClosed();
    172 		}
    173 	}
    174 
    175 	/**
    176 	* Callback invoked if a child closes.
    177 	*
    178 	* @private
    179 	*/
    180 	function childClosed() {
    181 		numClosed += 1;
    182 		debug( '%d of %d child processes have closed.', numClosed, opts.workers );
    183 		if ( numClosed === opts.workers ) {
    184 			done(); // eslint-disable-line callback-return
    185 		}
    186 	}
    187 
    188 	/**
    189 	* Returns a callback to be invoked upon child process exit.
    190 	*
    191 	* @private
    192 	* @param {Object} child - child process
    193 	* @returns {Callback} callback
    194 	*/
    195 	function onExit( child ) {
    196 		return listener;
    197 
    198 		/**
    199 		* Callback invoked upon child process exit.
    200 		*
    201 		* @private
    202 		* @param {(number|null)} code - exit code
    203 		* @param {(string|null)} signal - termination signal
    204 		*/
    205 		function listener( code, signal ) {
    206 			debug( 'Child process exited. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
    207 			processExit( code, signal );
    208 		}
    209 	}
    210 
    211 	/**
    212 	* Closes all workers.
    213 	*
    214 	* @private
    215 	* @param {Error} [error] - error object
    216 	*/
    217 	function close( error ) {
    218 		var pids;
    219 		var pid;
    220 		var i;
    221 		if ( error && !err ) {
    222 			err = error;
    223 		}
    224 		debug( 'Instructing child processes to close...' );
    225 		pids = objectKeys( workers );
    226 		for ( i = 0; i < pids.length; i++ ) {
    227 			pid = pids[ i ];
    228 			debug( 'Instructing child process (pid: %d) to close...', pid );
    229 			workers[ pid ].send( 'close' );
    230 		}
    231 	}
    232 
    233 	/**
    234 	* Returns a callback to be invoked upon child process disconnect.
    235 	*
    236 	* @private
    237 	* @param {Object} child - child process
    238 	* @returns {Callback} callback
    239 	*/
    240 	function onDisconnect( child ) {
    241 		return listener;
    242 
    243 		/**
    244 		* Callback invoked upon child process disconnect.
    245 		*
    246 		* @private
    247 		*/
    248 		function listener() {
    249 			debug( 'Child process disconnected. pid: %d.', child.pid );
    250 		}
    251 	}
    252 
    253 	/**
    254 	* Returns a callback to be invoked upon encountering a child process error.
    255 	*
    256 	* @private
    257 	* @param {Object} child - child process
    258 	* @returns {Callback} callback
    259 	*/
    260 	function onError( child ) {
    261 		return listener;
    262 
    263 		/**
    264 		* Callback invoked upon a child process error.
    265 		*
    266 		* @private
    267 		* @param {Error} error - error object
    268 		*/
    269 		function listener( error ) {
    270 			debug( 'Child process error: %s. pid: %d.', error.message, child.pid );
    271 			close( error );
    272 		}
    273 	}
    274 
    275 	/**
    276 	* Processes process exit values. If provided a non-zero exit code or termination signal, instructs the process to close.
    277 	*
    278 	* @private
    279 	* @param {(number|null)} code - exit code
    280 	* @param {(string|null)} signal - termination signal
    281 	* @returns {void}
    282 	*/
    283 	function processExit( code, signal ) {
    284 		var error;
    285 		if ( err ) {
    286 			return;
    287 		}
    288 		if ( code !== null && code !== 0 ) {
    289 			error = new Error( 'Child process failed with exit code: '+code+'.' );
    290 		} else if ( signal !== null ) {
    291 			error = new Error( 'Child process failed due to termination signal: '+signal+'.' );
    292 		}
    293 		if ( error ) {
    294 			error.code = code;
    295 			error.signal = signal;
    296 			return close( error );
    297 		}
    298 	}
    299 
    300 	/**
    301 	* Callback invoked once all tasks are finished.
    302 	*
    303 	* @private
    304 	* @returns {void}
    305 	*/
    306 	function done() {
    307 		if ( err ) {
    308 			return clbk( err );
    309 		}
    310 		clbk();
    311 	}
    312 }
    313 
    314 
    315 // EXPORTS //
    316 
    317 module.exports = exec;