/// Reactor aware process management
module mecca.reactor.lib.process;

// Licensed under the Boost license. Full copyright information in the AUTHORS file

import core.stdc.errno;
import core.sys.posix.sys.wait;
import core.sys.posix.unistd;
import std.algorithm : move;
import std.conv : emplace;
import process = std.process;

import mecca.containers.pools;
import mecca.lib.exception;
import mecca.lib.io;
import mecca.lib.time;
import mecca.log;
import mecca.platform.os : OSSignal;
import mecca.reactor;
import mecca.reactor.io.fd;
import mecca.reactor.io.signals;
import mecca.reactor.sync.event;

/** Subprocess handler
 *
 * Do not create direct instances of this struct. Use `ProcessManager.alloc` instead.
 */
struct Process {
    /// The three IO streams that can be redirected
    enum StdIO {
        StdIn = 0,      /// Child's stdin stream.
        StdOut,         /// Child's stdout stream.
        StdErr          /// Child's stderr stream.
    }
private:
    // Prevent accidental copying
    @disable this(this);

    pid_t _pid;             // 0 until `run` has successfully started the child
    Event processDone;      // Set by handleExit once the child's exit has been reaped
    // Child side end of standard streams
    FD[3] childStdIO;
    int exitStatus;         // Raw status, as reported by waitpid in ProcessManager.sigChildHandler
    version(assert) {
        bool managerInitialized;
    }

public:
    /// Parent side end of standard streams
    ReactorFD[3] stdIO;

    /** Redirect a standard IO for the child to an FD
     *
     * Params:
     *  io = which of the IO streams to redirect
     *  fd = An fd to be assigned. Ownership is moved to the child
     */
    void redirectIO(StdIO io, FD fd) nothrow @safe @nogc {
        verifyManagerInitialized();
        ASSERT!"Cannot set redirection on child once it has been run"(pid==0);
        // The parent no longer has its own end of this stream (e.g. from an earlier pipe redirection)
        stdIO[io].close();
        move(fd, childStdIO[io]);
    }

    /** Redirect a standard IO for the child to a pipe
     *
     * Redirects the child's IO to a pipe whose other side is accessible by the parent.
     * After running, the other end of the pipe will be available as `stdIO[io]`
     *
     * Params:
     *  io = which of the IO streams to redirect
     */
    void redirectIO(StdIO io) @trusted @nogc {
        verifyManagerInitialized();
        ASSERT!"Cannot set redirection on child once it has been run"(pid==0);

        FD parentEnd;
        FD* writeEnd, readEnd;
        // For stdin the child reads and the parent writes; for stdout/stderr it is the other way around
        if( io==StdIO.StdIn ) {
            writeEnd = &parentEnd;
            readEnd = &childStdIO[io];
        } else {
            writeEnd = &childStdIO[io];
            readEnd = &parentEnd;
        }

        createPipe( *readEnd, *writeEnd );
        stdIO[io] = ReactorFD( move(parentEnd) );
    }

    /** Redirect stderr to stdout
     *
     * Make the child's stderr point to the same stream as its stdout
     */
    void redirectErrToOut() @safe @nogc {
        verifyManagerInitialized();
        ASSERT!"Cannot set redirection on child once it has been run"(pid==0);
        ASSERT!"Cannot redirect when stdout is not valid"(childStdIO[StdIO.StdOut].isValid);

        childStdIO[StdIO.StdErr] = childStdIO[StdIO.StdOut].dup();
        stdIO[StdIO.StdErr].close();
    }

    /** Test whether the child process is still running
     *
     * Returns `false` only once the child's exit has been reaped by the `ProcessManager`. Note that this also returns
     * `true` for a child that was never run.
     */
    bool isRunning() const nothrow @safe @nogc {
        return !processDone.isSet;
    }

    /** Suspend fiber until the child process finishes
     *
     * Params:
     *  timeout = how long to wait for the child. If it expires first, the outcome is whatever `Event.wait` does on
     *            timeout (NOTE(review): presumably throws a timeout exception — confirm).
     *
     * Returns:
     *  The child's raw wait status, as reported by `waitpid`. Decode it with `WIFEXITED`/`WEXITSTATUS` or
     *  `WIFSIGNALED`/`WTERMSIG`.
     */
    int wait(Timeout timeout = Timeout.infinite) @safe @nogc {
        processDone.wait(timeout);

        return exitStatus;
    }

    /** Run the child with the given arguments.
     *
     * This function returns immediately. Use `wait` if you want to wait for the child to finish execution.
     *
     * A `Process` can only be run once.
     *
     * Errors:
     * On failure to create a child process, this function will throw an `ErrnoException`.
     * If the exec itself fails (e.g.: trying to run a command that doesn't exist), this function will report success,
     * but calling `wait` will return a status whose exit code (`WEXITSTATUS`) is 255.
     */
    void run(string[] args...) @trusted @nogc {
        verifyManagerInitialized();
        import mecca.lib.reflection : as;

        /*
           We are @nogc. @nogc states that the function will not trigger a GC before it returns.

           This function creates a child process. The parent doesn't trigger a GC. The child calls execve, and doesn't
           return.

           So we are @nogc.
         */
        ASSERT!"run must be called with at least one argument"(args.length>0);
        ASSERT!"Cannot run a child more than once"(pid==0);

        // Follow the classic vfork rule: the child only writes to the pid_t variable holding vfork's return value,
        // so store it in a local rather than directly into `_pid`.
        pid_t childPid = vfork();

        if( childPid==0 ) {
            // In the child. runChildHelper either execs or calls _exit, so it never returns here.
            as!"@nogc"({ runChildHelper(args); });
        }

        // vfork failed: report it instead of registering (and logging) a bogus pid of -1
        errnoEnforceNGC( childPid>0, "Failed to create child process" );
        _pid = childPid;

        // Cleanups: the parent has no further use for the child side ends of the streams
        foreach( ref fd; childStdIO ) {
            fd.close();
        }

        as!"@nogc"({ theProcessManager.processes[_pid] = &this; });

        INFO!"Launched child process %s"(pid);
    }

    /** Send the process a signal
     *
     * Params:
     *  signal = Signal to send. SIGTERM by default
     *
     * Throws:
     *  Throws `ErrnoException` if sending the signal failed
     */
    void kill(OSSignal signal = OSSignal.SIGTERM) @trusted @nogc {
        ASSERT!"Cannot kill process before it was started"(pid!=0);

        errnoEnforceNGC( .kill( pid, signal )==0, "Failed to send signal to process" );
    }

    /// Returns the pid of the child.
    ///
    /// Returns 0 if child has not started running
    @property pid_t pid() const pure nothrow @safe @nogc {
        return _pid;
    }

    // Do not call!!! Resets the struct to its initial state (presumably invoked by SimplePool when handing out an
    // element).
    void _poolElementInit() nothrow @safe @nogc {
        emplace(&this);
    }

private:
    // runChildHelper is called in a vfork'd child, any data modifications affect the parent process, so must not affect
    // tracing indent level/etc, thus @notrace is mandatory!
    // NOTE(review): std.process.execvp converts its string arguments to C strings, which presumably allocates from the
    // GC heap shared with the parent under vfork — confirm this is acceptable.
    @notrace void runChildHelper(string[] args) nothrow @trusted {
        try {
            import core.sys.posix.signal : sigemptyset;

            // Unblock all signals in the child; the mask must be built with sigemptyset rather than relying on the
            // default-initialized sigset_t happening to represent the empty set.
            sigset_t emptyMask;
            sigemptyset(&emptyMask);
            errnoEnforceNGC( sigprocmask( SIG_SETMASK, &emptyMask, null )==0, "sigprocmask failed" );

            // Perform IO redirection
            foreach( int ioNum, ref fd ; childStdIO ) {
                // Leave IO streams that were not redirected as they are
                if( !fd.isValid )
                    continue;

                fd.checkedCall!dup2(ioNum, "Failed to redirect IO");
            }

            process.execvp(args[0], args);
            assert(false, "Must never reach this point");
        } catch(Exception ex) {
            ERROR!"Execution of %s failed: %s"( args[0], ex.msg );
            _exit(255);
        }
    }

    // Called by ProcessManager.sigChildHandler once this child has exited. `status` is the raw waitpid status.
    void handleExit(int status) nothrow @safe {
        // Stop tracking the pid *before* waking up waiters. The pid was reaped, so the kernel may hand it to a new child.
        // (The previous `destroy(processes[pid])` only nulled the value and left the key in place, so a later child
        // reusing the pid could be dispatched to a null Process*, or have its entry removed by our stale ProcessPtr.)
        theProcessManager.processes.remove(pid);

        exitStatus = status;
        processDone.set();
    }

    void verifyManagerInitialized() pure const nothrow @safe @nogc {
        version(assert) {
            DBG_ASSERT!"Process was directly initialized. Use ProcessManager.alloc instead."( managerInitialized );
        }
    }
}

/// Process tracking management
struct ProcessManager {
private:
    // Prevent accidental copying
    @disable this(this);

    enum OUTPUT_BUFF_SIZE = 1024;

    SimplePool!Process processPool;
    Process*[pid_t] processes; // XXX change to a nogc construct
    void delegate(pid_t pid, int exitStatus) nothrow @system customHandler;

    /// RAII wrapper for a process
    static struct ProcessPtr {
        Process* ptr;

        @disable this(this);

        ~this() nothrow @safe @nogc {
            if( ptr is null )
                return;

            // Only forget the pid if it is still registered to *this* process. If the child already exited, handleExit
            // removed the entry, and the pid may since have been reused by another tracked child.
            auto entry = ptr.pid in theProcessManager.processes;
            if( entry !is null && *entry is ptr )
                theProcessManager.processes.remove(ptr.pid);

            theProcessManager.processPool.release(ptr);
        }

        alias ptr this;
    }

public:
    /** Initialize the process manager
     *
     * You must call this before trying to spawn new processes. Must be called when the reactor is already active.
     *
     * Params:
     *  maxProcesses = Maximum number of concurrent processes to be tracked.
     */
    void open(size_t maxProcesses) @trusted @nogc {
        processPool.open(maxProcesses);

        reactorSignal.registerHandler(OSSignal.SIGCHLD, &sigChildHandler);
        customHandler = null;
    }

    /** Shut down the process manager
     *
     * There is no real need to call this. This is mostly useful for unit tests. Must be called while the reactor is
     * still active.
     */
    void close() @trusted @nogc {
        reactorSignal.unregisterHandler(OSSignal.SIGCHLD);
        processPool.close();
    }

    /** Allocate a new handler for a child process.
     *
     * This is the way to allocate a new child process handler.
     *
     * Returns a smart object referencing the Process struct. Struct is destructed when the reference is destructed.
     * If the reference is destructed, this is effectively the same as detaching from the running process. The process
     * remains running, but we will no longer be notified when it exits.
     */
    ProcessPtr alloc() nothrow @safe @nogc {
        Process* ptr = processPool.alloc();
        version(assert) {
            ptr.managerInitialized = true;
        }

        return ProcessPtr(ptr);
    }

    alias OutputHandlerDlg = void delegate(Process* child, const(char)[] output) nothrow;
    /**
     * Run a command and pass its output to a callback
     *
     * Run a command with the supplied command line. Call `outputProcessor` whenever the command outputs anything on
     * stdout or stderr.
     *
     * In addition to those, outputProcessor gets called three more times. The first time is right before the process is
     * started (with `output` set to empty). This allows outputProcessor to do any additional manipulations (such as
     * redirecting the input). This can be tested for because `child.pid` is 0.
     *
     * The second time is when the output closes. Again, this call will have `output` set to an empty range.
     *
     * The third time is after the process has already exited. This can be tested for because `child.isRunning` will be
     * false.
     *
     * The command is run from a newly spawned fiber; this function does not wait for it.
     */
    void collectCommandOutput( OutputHandlerDlg outputProcessor, string[] cmdline... ) {
        ASSERT!"cmdline must have at least one argument (command to run)"(cmdline.length > 0);
        theReactor.spawnFiber!_commandOutputFiber(outputProcessor, cmdline);
    }

    /** Register custom SIGCHLD handler
     *
     * With this function you can register a custom SIGCHLD handler. This handler will be called only for child
     * processes that were not started by the ProcessManager, or for child processes that did not exit.
     *
     * The handler delegate will be called under a critical section lock and must not sleep.
     */
    void registerSigChldHandler(void delegate(pid_t pid, int status) nothrow @system handler) nothrow @safe @nogc {
        customHandler = handler;
    }

    package(mecca.reactor) void sigChildHandler(OSSignal) @system {
        // Ignore the siginfo, as SIGCHLD notifications might be merged

        bool handled;
        int status;
        pid_t childPid;
        // Reap every child that has changed state, not just the one that triggered this notification
        while( (childPid = waitpid(-1, &status, WNOHANG))>0 ) {
            handled = true;

            Process** child = childPid in processes;

            if( (WIFEXITED(status) || WIFSIGNALED(status)) && child !is null ) {
                INFO!"Received child exit notification from %s: 0x%x"(childPid, status);
                (*child).handleExit(status);
            } else {
                if( customHandler !is null ) {
                    customHandler( childPid, status );
                } else {
                    // This is probably because we abandoned the child PID
                    INFO!"Received child exit notification from abandoned child %s status 0x%x"( childPid, status );
                }
            }
        }

        if( !handled ) {
            // This isn't as serious a condition as the code might suggest. We might have handled that PID in a previous
            // invocation.
            DEBUG!"SIGCHLD reported, but wait failed with errno %s"(errno);
        }
    }

    // Fiber function for collectCommandOutput
    // DMDBUG public because otherwise we can't template on it
    public static void _commandOutputFiber( OutputHandlerDlg outputProcessor, string[] cmdline ) {
        auto child = theProcessManager.alloc();
        child.redirectIO( Process.StdIO.StdOut );
        child.redirectErrToOut();

        // First call, before we run anything
        outputProcessor(child, null);

        child.run(cmdline);

        char[OUTPUT_BUFF_SIZE] buffer;
        ssize_t numRead;
        do {
            numRead = child.stdIO[Process.StdIO.StdOut].read(buffer);
            // Call with what we got. This will be the empty range the last time we call
            outputProcessor( child, buffer[0..numRead] );
        } while( numRead>0 );

        child.wait();

        // Final call, after the child has exited
        outputProcessor( child, null );
    }
}

@notrace ref ProcessManager theProcessManager() nothrow @trusted @nogc {
    return _theProcessManager;
}

private __gshared ProcessManager _theProcessManager;

unittest {
    import mecca.reactor;
    enum GRACE_UT_TIMEOUT = 10.seconds;

    testWithReactor({
        theProcessManager.open(24);
        scope(exit) theProcessManager.close();

        {
            // Test detached child handling
            auto child2 = theProcessManager.alloc();
            child2.run("true");
        }

        auto child = theProcessManager.alloc();
        child.redirectIO(Process.StdIO.StdOut);
        child.redirectErrToOut();

        child.run("echo", "-e", "Hello\\r", "world");

        version (linux)
            enum expectedBuffer = "Hello\r world\n";
        else version (Darwin)
            enum expectedBuffer = "-e Hello\\r world\n"; // BSD `echo` doesn't support the `-e` flag
        else
            static assert(false, "Unsupported platform");

        char[] buffer;
        ssize_t res;
        do {
            enum readChunk = 128;
            size_t oldLen = buffer.length;
            buffer.length = oldLen + readChunk;
            res = child.stdIO[Process.StdIO.StdOut].read(buffer[oldLen..oldLen+readChunk], Timeout(GRACE_UT_TIMEOUT));
            buffer.length = oldLen + res;
        } while( res>0 );

        DEBUG!"Child stdout: %s"(buffer);
        child.wait(Timeout(10.seconds));

        theReactor.sleep(4.msecs); // Allow time for child2 to actually exit

        ASSERT!"Child closed stdout but is still running"(!child.isRunning);
        ASSERT!"Child output is \"%s\", not as expected"( buffer == expectedBuffer, buffer );
    });
}

unittest {
    import mecca.reactor;

    struct RunContext {
        Event done;
        uint howManyWaysMustAManWalkDown;
        bool killed;

        void callback( Process* child, const(char)[] buffer ) nothrow {
            try {
                if( child.pid == 0 ) {
                    // First call, before we started
                    child.redirectIO(Process.StdIO.StdIn, FD( "/dev/zero", O_RDONLY ));

                    return;
                }

                foreach(i, c; buffer) {
                    ASSERT!"buffer not filled with 0, has %s instead at offset %s"(
                            c=='\0', c, i+howManyWaysMustAManWalkDown);
                }

                howManyWaysMustAManWalkDown += buffer.length;
                if( !killed && howManyWaysMustAManWalkDown > 8196 ) {
                    child.kill();
                    killed = true;
                }

                if( !child.isRunning )
                    done.set();
            } catch(Exception ex) {
                ERROR!"Failed: %s"(ex.msg);
                assert(false);
            }
        }
    }

    testWithReactor({
        theProcessManager.open(24);
        scope(exit) theProcessManager.close();

        import mecca.reactor.io.signals;

        void termHandler(OSSignal) {
            // Nothing. We don't expect this to get called
        }

        // Register a TERM handler so that the signal is masked in the parent
        reactorSignal.registerHandler(OSSignal.SIGTERM, &termHandler);

        RunContext ctx;

        theProcessManager.collectCommandOutput( &ctx.callback, "cat" );

        ctx.done.wait();
    });
}