zproc(3)
Class for process configuration and status
Description
ZPROC
NAME
zproc - Class for process configuration and status
SYNOPSIS
// This is a draft class, and may change without notice. It is disabled in
// stable builds by default. If you use this in applications, please ask
// for it to be pushed to stable state. Use --enable-drafts to enable.
#ifdef CZMQ_BUILD_DRAFT_API
// *** Draft method, for development use, may change without
warning ***
// Create a new zproc.
// NOTE: On Windows and with libzmq3 and libzmq2 this
function
// returns NULL. Code needs to be ported there.
CZMQ_EXPORT zproc_t *
zproc_new (void);
// *** Draft
method, for development use, may change without warning ***
// Destroy zproc, wait until process ends.
CZMQ_EXPORT void
zproc_destroy (zproc_t **self_p);
// *** Draft
method, for development use, may change without warning ***
// Return command line arguments (the first item is the
executable) or
// NULL if not set.
// Caller owns return value and must destroy it when done.
CZMQ_EXPORT zlist_t *
zproc_args (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// Setup the command line arguments, the first item must be
an (absolute) filename
// to run.
CZMQ_EXPORT void
zproc_set_args (zproc_t *self, zlist_t **arguments);
// *** Draft
method, for development use, may change without warning ***
// Setup the command line arguments, the first item must be
an (absolute) filename
// to run. Variadic function, must be NULL terminated.
CZMQ_EXPORT void
zproc_set_argsx (zproc_t *self, const char *arguments,
...);
// *** Draft
method, for development use, may change without warning ***
// Setup the environment variables for the process.
CZMQ_EXPORT void
zproc_set_env (zproc_t *self, zhash_t **arguments);
// *** Draft
method, for development use, may change without warning ***
// Connects process stdin with a readable ('>', connect) zeromq socket. If
// socket argument is NULL, zproc creates its own managed pair of inproc
// sockets. The writable one is then accessible via the zproc_stdin method.
CZMQ_EXPORT void
zproc_set_stdin (zproc_t *self, void *socket);
// *** Draft
method, for development use, may change without warning ***
// Connects process stdout with a writable ('@', bind) zeromq socket. If
// socket argument is NULL, zproc creates its own managed pair of inproc
// sockets. The readable one is then accessible via the zproc_stdout method.
CZMQ_EXPORT void
zproc_set_stdout (zproc_t *self, void *socket);
// *** Draft
method, for development use, may change without warning ***
// Connects process stderr with a writable ('@', bind) zeromq socket. If
// socket argument is NULL, zproc creates its own managed pair of inproc
// sockets. The readable one is then accessible via the zproc_stderr method.
CZMQ_EXPORT void
zproc_set_stderr (zproc_t *self, void *socket);
// *** Draft
method, for development use, may change without warning ***
// Return subprocess stdin writable socket. NULL for
// not initialized or external sockets.
CZMQ_EXPORT void *
zproc_stdin (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// Return subprocess stdout readable socket. NULL for
// not initialized or external sockets.
CZMQ_EXPORT void *
zproc_stdout (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// Return subprocess stderr readable socket. NULL for
// not initialized or external sockets.
CZMQ_EXPORT void *
zproc_stderr (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// Starts the process, return just before
execve/CreateProcess.
CZMQ_EXPORT int
zproc_run (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// process exit code
CZMQ_EXPORT int
zproc_returncode (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// PID of the process
CZMQ_EXPORT int
zproc_pid (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// return true if process is running, false if not yet
started or finished
CZMQ_EXPORT bool
zproc_running (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// The timeout should be zero or greater, or -1 to wait indefinitely.
// Waits for or polls the process status and returns the return code.
CZMQ_EXPORT int
zproc_wait (zproc_t *self, int timeout);
// *** Draft
method, for development use, may change without warning ***
// send SIGTERM signal to the subprocess, wait for grace
period and
// eventually send SIGKILL
CZMQ_EXPORT void
zproc_shutdown (zproc_t *self, int timeout);
// *** Draft
method, for development use, may change without warning ***
// return internal actor, useful for the polling if process
died
CZMQ_EXPORT void *
zproc_actor (zproc_t *self);
// *** Draft
method, for development use, may change without warning ***
// send a signal to the subprocess
CZMQ_EXPORT void
zproc_kill (zproc_t *self, int signal);
// *** Draft
method, for development use, may change without warning ***
// set verbose mode
CZMQ_EXPORT void
zproc_set_verbose (zproc_t *self, bool verbose);
// *** Draft
method, for development use, may change without warning ***
// Self test of this class.
CZMQ_EXPORT void
zproc_test (bool verbose);
#endif //
CZMQ_BUILD_DRAFT_API
Please add '@interface' section in './../src/zproc.c'.
DESCRIPTION
zproc - process configuration and status, plus unix pipes on steroids
Warning
The zproc class currently has several limitations:
* it is tested on zmq4 on Linux and OSX;
* it does not work on Windows, where you get empty stubs for most of the methods;
* it does not work on libzmq3 and libzmq2. We have experienced stalls and timeouts when running tests against such old versions.
Note: zproc is not yet stable, so there are no guarantees regarding API stability. Some methods can have weird semantics or strange API.
The zproc class runs an external process and uses ZeroMQ sockets to communicate with it. In other words, standard input and outputs MAY be connected with appropriate zeromq sockets, and the data flow is managed by zproc itself. This makes zproc the best-in-class way to run and manage subprocesses.
Data are sent and received as zframes (zframe_t), so zproc does not try to interpret content of the messages in any way. See test example on how to use it.
+----------------------------------------+
|    /bin/cat cat /etc/passwd            |
|    stdin   | stdout      |    stderr   |
|------||--------||---------------||-----|
|      fd1       fd2              fd3    |
|       ^         v                v     |
|zmq://stdin |zmq://stdout |zmq://stderr |
|           [zproc supervisor]           |
+----------------------------------------+
----------> zeromq magic here <-----------
+----------------------------------------+
|zmq://stdin |zmq://stdout |zmq://stderr |
|                                        |
|                consumer                |
|                                        |
|                                        |
+----------------------------------------+
Please add @discuss section in ./../src/zproc.c.
EXAMPLE
From zproc_test method.
// variable
file contains path to zsp executable:
// char *file = "path/to/zsp";
#if defined
(__WINDOWS__)
printf ("Very limited (on Windows) ");
{
zsys_init ();
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose
(self, verbose);
zproc_set_argsx (self, file, "-v", NULL);
zproc_run (self);
zclock_sleep (100); // to let actor start the process
assert (zproc_pid (self));
zproc_kill
(self, SIGTERM);
assert (zproc_returncode (self) == 255);
zproc_destroy (&self);
}
printf ("OK\n");
return;
#endif
{
// Test case #1: run command, wait until it ends and get the
// (standard) output
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
// join stdout
of the process to zeromq socket
// all data will be readable from zproc_stdout socket
assert (!zproc_stdout (self));
zproc_set_stdout (self, NULL);
assert (zproc_stdout (self));
zproc_set_argsx (self, file, "--help", NULL);
if (verbose)
zsys_debug("zproc_test() : launching helper '%s'
--help", file );
int r =
zproc_run (self);
assert (r == 0);
zframe_t *frame;
zsock_brecv (zproc_stdout (self), "f",
&frame);
assert (frame);
assert (zframe_data (frame));
// TODO: real test
if (verbose)
zframe_print (frame, "1:");
zframe_destroy (&frame);
r = zproc_wait (self, -1);
assert (r == 0);
zproc_destroy (&self);
}
{
// Test case#2: run zsp helper with a content written on
stdin, check if it was passed to stdout
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
// forward input from stdin to stderr
zproc_set_argsx (self, file, "--stdin",
"--stderr", NULL);
// FIXME: there is a BUG in zproc somewhere, you can't get
// output from both stdout/stderr
//zproc_set_argsx (self, file, "--stdin",
"--stdout", "--stderr", NULL);
zproc_set_stdin (self, NULL);
// FIXME: the bug
//zproc_set_stdout (self, NULL);
zproc_set_stderr (self, NULL);
// send data to
stdin
int r = zproc_run (self);
assert (r == 0);
zframe_t *frame = zframe_new ("Lorem ipsum\0\0",
strlen ("Lorem ipsum")+2);
assert (frame);
zsock_bsend (zproc_stdin (self), "f", frame);
zframe_destroy (&frame);
// FIXME: the
bug
//zproc_set_stdout (self, NULL);
// read data from stdout
/*
zsys_debug ("BAF1");
zsock_brecv (zproc_stdout (self), "f",
&frame);
zsys_debug ("BAF2");
assert (frame);
assert (zframe_data (frame));
if (verbose)
zframe_print (frame, "2.stdout:");
assert (!strncmp ((char*) zframe_data (frame), "Lorem
ipsum", 11));
*/
// read data
from stderr
zsock_brecv (zproc_stderr (self), "f",
&frame);
assert (frame);
assert (zframe_data (frame));
if (verbose)
zframe_print (frame, "2.stderr:");
assert (!strncmp ((char*) zframe_data (frame), "Lorem
ipsum", 11));
zproc_kill (self, SIGTERM);
zproc_wait (self, -1);
zframe_destroy (&frame);
zproc_destroy (&self);
}
{
// Test case#3: run non existing binary
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
// try to run a binary that does not exist; zproc_run must fail
zproc_set_argsx (self, "/not/existing/file",
NULL);
int r =
zproc_run (self);
assert (r == -1);
zproc_destroy (&self);
}
{
// Test case #4: child abort itself
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
zproc_set_argsx (self, file, "--verbose",
"--abrt", NULL);
zproc_set_stdout (self, NULL);
zproc_set_stderr (self, NULL);
zproc_set_stdin (self, NULL);
int r =
zproc_run (self);
zclock_sleep (100); // to let actor start the process
assert (r != -1);
zclock_sleep (100);
zframe_t *frame;
zsock_brecv (zproc_stdout (self), "f",
&frame);
assert (zframe_is (frame));
assert (zframe_size (frame) > 0);
zframe_destroy (&frame);
zproc_wait (self, -1);
assert (zproc_returncode (self) == -SIGABRT);
zproc_destroy (&self);
}
{
// Test case #5: use never ending subprocess and poller to
read data from it
// Create new zproc instance
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
// join stdout of the process to zeromq socket
// all data will be readable from zproc_stdout socket
zproc_set_stdout (self, NULL);
zlist_t *args =
zlist_new ();
zlist_autofree (args);
zlist_append (args, file);
zlist_append (args, "--stdout");
zproc_set_args (self, &args);
zhash_t *env =
zhash_new ();
zhash_autofree (env);
zhash_insert (env, "ZSP_MESSAGE", "czmq is
great\n");
zproc_set_env (self, &env);
// execute the binary. It runs in its own actor, which monitors the process
// and passes data across pipes and zeromq sockets
if (verbose)
zsys_debug("zproc_test() : launching helper '%s'",
file );
zproc_run (self);
zpoller_t *poller = zpoller_new (zproc_stdout (self),
NULL);
// kill the
binary, it never ends, but the test must:
// termination also flushes the output streams so we can
// read them entirely; note that other process runs in
// parallel to this thread
if (verbose)
zsys_debug("zproc_test() : sleeping 4000 msec to gather
some output from helper");
zclock_sleep (4000);
zproc_kill (self, SIGTERM);
zproc_wait (self, -1);
// read the
content from zproc_stdout - use zpoller and a loop
bool stdout_read = false;
int64_t zproc_timeout_msec = 10000;
int64_t zproc_test_start_msec = zclock_mono();
int64_t zproc_test_elapsed_msec = 0;
while
(!zsys_interrupted) {
void *which = zpoller_wait (poller, 800);
zproc_test_elapsed_msec = zclock_mono() -
zproc_test_start_msec;
if (!which) {
if (stdout_read) {
if (verbose)
zsys_debug("zproc_test() : did not get stdout from
helper, but we already have some (%" PRIi64 " msec
remaining to retry)", (zproc_timeout_msec -
zproc_test_elapsed_msec) );
break;
}
if (zproc_timeout_msec > zproc_test_elapsed_msec) {
if (verbose)
zsys_debug("zproc_test() : did not get stdout from
helper, %" PRIi64 " msec remaining to retry",
(zproc_timeout_msec - zproc_test_elapsed_msec) );
continue;
}
// ...else : we've slept a lot and got no response; kill the
helper
if (verbose)
zsys_debug("zproc_test() : did not get stdout from
helper, patience expired (%" PRIi64 " msec
remaining to retry)", (zproc_timeout_msec -
zproc_test_elapsed_msec) );
break;
}
if (which ==
zproc_stdout (self)) {
// it suffices for us to have read something
// we only check the first frame, others may start with the
// expected key string broken mid-way due to alignment etc.,
// but we drain the whole incoming queue of stdout frames.
zframe_t *frame;
zsock_brecv (zproc_stdout (self), "f",
&frame);
assert (frame);
assert (zframe_data (frame));
if (!stdout_read) {
if (verbose)
zsys_debug("zproc_test() : got stdout from helper,
%" PRIi64 " msec was remaining to retry",
(zproc_timeout_msec - zproc_test_elapsed_msec));
assert (!strncmp(
"czmq is great\n",
(char*) zframe_data (frame),
14));
stdout_read = true;
}
if (verbose)
zframe_print (frame, "zproc_test");
zframe_destroy
(&frame);
continue;
}
// should not
get there
if (verbose)
zsys_debug("zproc_test() : reached the unreachable
point (unexpected zpoller result), %" PRIi64 "
msec was remaining to retry", (zproc_timeout_msec -
zproc_test_elapsed_msec) );
assert (false);
}
assert
(stdout_read);
zpoller_destroy (&poller);
zproc_destroy (&self);
}
{
// testcase #6 wait for process that hangs, kill it
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
zproc_set_argsx (self, file, NULL);
if (verbose)
zsys_debug("zproc_test() : launching helper '%s'",
file);
int r =
zproc_run (self);
assert (r == 0);
r = zproc_wait (self, 1000);
assert (r == ZPROC_RUNNING);
assert (zproc_running (self));
zproc_shutdown (self, 1000);
assert (!zproc_running (self));
zproc_destroy (&self);
}
{
// testcase #7 wait for process that exits earlier
zproc_t *self = zproc_new ();
assert (self);
zproc_set_verbose (self, verbose);
zproc_set_argsx (self, file, "--quit", "1", NULL);
if (verbose)
zsys_debug("zproc_test() : launching helper '%s' --quit
1", file);
int r =
zproc_run (self);
assert (r == 0);
int t = zclock_mono ();
r = zproc_wait (self, 8000);
assert (r == 0);
t = zclock_mono () - t;
assert (t < 2000);
zproc_destroy (&self);
}
AUTHORS
The czmq manual was written by the authors in the AUTHORS file.
RESOURCES
Main web site: http://czmq.zeromq.org
Report bugs to the email <zeromq-dev@lists.zeromq.org> [1]
COPYRIGHT
Copyright (c) the Contributors as noted in the AUTHORS file. This file is part of CZMQ, the high-level C binding for 0MQ: http://czmq.zeromq.org. This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. LICENSE included with the czmq distribution.
NOTES
1. zeromq-dev@lists.zeromq.org
   mailto:zeromq-dev@lists.zeromq.org