exec.js (7787B)
/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

// MODULES //

var fork = require( 'child_process' ).fork;
var path = require( 'path' );
var logger = require( 'debug' );
var objectKeys = require( './../../../keys' );
var getOpts = require( './options.js' );


// VARIABLES //

var debug = logger( 'parallel:exec' );
var WORKER_FILEPATH = path.resolve( __dirname, './worker/index.js' );


// MAIN //

/**
* Executes scripts in parallel.
*
* ## Notes
*
* -   Forks `opts.workers` child processes running the worker script and distributes the provided file paths among them, one file path per message.
* -   Each message received from a worker is treated as the file path of a finished script, after which the worker is handed the next file path.
* -   Once no file paths remain and no scripts are pending, every worker is sent a `'close'` message. The callback is invoked after all workers have emitted `'close'`.
* -   Only the first encountered error is retained and provided to the callback.
*
* @private
* @param {StringArray} files - script absolute file paths
* @param {Options} opts - options
* @param {PositiveInteger} opts.concurrency - number of scripts to execute concurrently
* @param {PositiveInteger} opts.workers - number of workers
* @param {string} opts.cmd - executable file/command
* @param {boolean} opts.ordered - boolean indicating whether to preserve order of script output
* @param {(NonNegativeInteger|null)} opts.uid - process user identity
* @param {(NonNegativeInteger|null)} opts.gid - process group identity
* @param {string} opts.encoding - `stdio` encoding
* @param {NonNegativeInteger} opts.maxBuffer - max child process `stdio` buffer size
* @param {Callback} clbk - callback to invoke after executing all scripts
*/
function exec( files, opts, clbk ) {
	var numClosed;
	var workers;
	var pending;
	var fopts;
	var args;
	var proc;
	var pids;
	var pid;
	var idx;
	var err;
	var i;

	debug( 'Options: %s.', JSON.stringify( opts ) );
	numClosed = 0;

	debug( 'Creating %d workers...', opts.workers );
	workers = {};
	args = [];
	fopts = getOpts( opts );
	for ( i = 0; i < opts.workers; i++ ) {
		debug( 'Creating child process...' );
		proc = fork( WORKER_FILEPATH, args, fopts );

		proc.on( 'error', onError( proc ) );
		proc.on( 'close', onClose( proc ) );
		proc.on( 'exit', onExit( proc ) );
		proc.on( 'disconnect', onDisconnect( proc ) );
		proc.on( 'message', onMessage( proc ) );

		debug( 'Child process created. pid: %d.', proc.pid );

		// Index workers by process id:
		workers[ proc.pid ] = proc;
	}
	pids = objectKeys( workers );
	debug( '%d workers created.', pids.length );

	debug( 'Running %d scripts concurrently...', opts.concurrency );
	pending = {};
	idx = -1;

	// Seed the workers with their initial tasks, cycling through the workers when the concurrency exceeds the number of workers:
	for ( i = 0; i < opts.concurrency; i++ ) {
		pid = pids[ i%pids.length ];
		next( workers[ pid ] ); // eslint-disable-line callback-return
	}

	/**
	* Instructs a child process to run the next script.
	*
	* ## Notes
	*
	* -   If no scripts remain and none are pending, instructs all workers to close.
	*
	* @private
	* @param {Object} child - child process
	* @returns {void}
	*/
	function next( child ) {
		var numPending;
		idx += 1;
		if ( idx >= files.length ) {
			numPending = objectKeys( pending ).length;
			if ( numPending > 0 ) {
				debug( '%d scripts are pending.', numPending );
				return;
			}
			debug( 'All scripts have finished.' );
			return close();
		}
		debug( 'Instructing child process to run script: %s. pid: %d.', files[ idx ], child.pid );
		child.send( files[ idx ] );
		pending[ files[ idx ] ] = true;

		// Note: `idx` is zero-based, so the number of scripts handed to workers thus far is `idx+1`...
		debug( '%d of %d scripts have been processed.', idx+1, files.length );
	}

	/**
	* Returns a callback to be invoked upon receiving a message from a child process.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onMessage( child ) {
		return listener;

		/**
		* Callback invoked upon receiving a message from a child process.
		*
		* @private
		* @param {string} filepath - script filepath
		*/
		function listener( filepath ) {
			debug( 'Child process message: %s. pid: %d.', filepath, child.pid );

			// Remove the script from the listing of pending scripts:
			delete pending[ filepath ];

			// Indicate that the child process is ready for its next task:
			next( child );
		}
	}

	/**
	* Returns a callback to be invoked upon child process close.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onClose( child ) {
		return listener;

		/**
		* Callback invoked upon child process close.
		*
		* @private
		* @param {(number|null)} code - exit code
		* @param {(string|null)} signal - termination signal
		*/
		function listener( code, signal ) {
			debug( 'Child process closed. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
			processExit( code, signal );
			childClosed();
		}
	}

	/**
	* Callback invoked if a child closes.
	*
	* ## Notes
	*
	* -   Once the number of closed children equals `opts.workers`, invokes the completion handler.
	*
	* @private
	*/
	function childClosed() {
		numClosed += 1;
		debug( '%d of %d child processes have closed.', numClosed, opts.workers );
		if ( numClosed === opts.workers ) {
			done(); // eslint-disable-line callback-return
		}
	}

	/**
	* Returns a callback to be invoked upon child process exit.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onExit( child ) {
		return listener;

		/**
		* Callback invoked upon child process exit.
		*
		* @private
		* @param {(number|null)} code - exit code
		* @param {(string|null)} signal - termination signal
		*/
		function listener( code, signal ) {
			debug( 'Child process exited. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
			processExit( code, signal );
		}
	}

	/**
	* Closes all workers.
	*
	* ## Notes
	*
	* -   If provided an error and no error has been recorded, the error is recorded for later delivery to the callback.
	* -   Workers whose IPC channel is no longer connected are skipped.
	*
	* @private
	* @param {Error} [error] - error object
	*/
	function close( error ) {
		var child;
		var pids;
		var pid;
		var i;
		if ( error && !err ) {
			err = error;
		}
		debug( 'Instructing child processes to close...' );
		pids = objectKeys( workers );
		for ( i = 0; i < pids.length; i++ ) {
			pid = pids[ i ];
			child = workers[ pid ];

			// Sending a message over a closed IPC channel causes the child to emit an `'error'` event, which, in turn, would re-enter this function and attempt to message the same disconnected child again. Accordingly, only message children which can still receive messages...
			if ( child.connected ) {
				debug( 'Instructing child process (pid: %d) to close...', pid );
				child.send( 'close' );
			} else {
				debug( 'Child process (pid: %d) is already disconnected.', pid );
			}
		}
	}

	/**
	* Returns a callback to be invoked upon child process disconnect.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onDisconnect( child ) {
		return listener;

		/**
		* Callback invoked upon child process disconnect.
		*
		* @private
		*/
		function listener() {
			debug( 'Child process disconnected. pid: %d.', child.pid );
		}
	}

	/**
	* Returns a callback to be invoked upon encountering a child process error.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onError( child ) {
		return listener;

		/**
		* Callback invoked upon a child process error.
		*
		* @private
		* @param {Error} error - error object
		*/
		function listener( error ) {
			debug( 'Child process error: %s. pid: %d.', error.message, child.pid );
			close( error );
		}
	}

	/**
	* Processes process exit values. If provided a non-zero exit code or a termination signal, instructs all workers to close.
	*
	* ## Notes
	*
	* -   This function is invoked for both `'exit'` and `'close'` events. If an error has already been recorded, the function does nothing.
	*
	* @private
	* @param {(number|null)} code - exit code
	* @param {(string|null)} signal - termination signal
	* @returns {void}
	*/
	function processExit( code, signal ) {
		var error;
		if ( err ) {
			return;
		}
		if ( code !== null && code !== 0 ) {
			error = new Error( 'Child process failed with exit code: '+code+'.' );
		} else if ( signal !== null ) {
			error = new Error( 'Child process failed due to termination signal: '+signal+'.' );
		}
		if ( error ) {
			// Attach the raw exit values so that callers may inspect them:
			error.code = code;
			error.signal = signal;
			return close( error );
		}
	}

	/**
	* Callback invoked once all tasks are finished.
	*
	* ## Notes
	*
	* -   If an error was recorded, the error is provided to the callback; otherwise, the callback is invoked without arguments.
	*
	* @private
	* @returns {void}
	*/
	function done() {
		if ( err ) {
			return clbk( err );
		}
		clbk();
	}
}


// EXPORTS //

module.exports = exec;