From 0abed8833343904dee95976b62c00d399cfabcf0 Mon Sep 17 00:00:00 2001 From: "Schmeits Cedric (DC-AE/ESW5)" Date: Thu, 15 Sep 2022 16:27:10 +0200 Subject: [PATCH] Replaced all tabs by spaces within the C and c++ examples Replaced all tabs by 4 spaces for readability within the documentation --- examples/C++/hwserver.cpp | 2 +- examples/C++/lbbroker2.cpp | 216 +++++++++++++++---------------- examples/C++/mdbroker.cpp | 12 +- examples/C++/mspoller.cpp | 4 +- examples/C++/msreader.cpp | 6 +- examples/C++/mtrelay.cpp | 28 ++--- examples/C++/pathosub.cpp | 4 +- examples/C++/psenvsub.cpp | 12 +- examples/C++/rrclient.cpp | 22 ++-- examples/C++/rrworker.cpp | 33 +++-- examples/C++/syncpub.cpp | 24 ++-- examples/C++/taskwork2.cpp | 8 +- examples/C++/wuclient.cpp | 12 +- examples/C++/wuserver.cpp | 4 +- examples/C++/zhelpers.hpp | 126 +++++++++---------- examples/C/.gitignore | 8 +- examples/C/hwclient2.c | 78 ++++++------ examples/C/hwclient3.c | 6 +- examples/C/hwclient4.c | 6 +- examples/C/hwserver2.c | 2 +- examples/C/hwserver3.c | 2 +- examples/C/hwserver4.c | 2 +- examples/C/lbbroker2.c | 252 ++++++++++++++++++------------------- 23 files changed, 436 insertions(+), 433 deletions(-) diff --git a/examples/C++/hwserver.cpp b/examples/C++/hwserver.cpp index bc9f40e7b..bb401f3b5 100644 --- a/examples/C++/hwserver.cpp +++ b/examples/C++/hwserver.cpp @@ -11,7 +11,7 @@ #else #include -#define sleep(n) Sleep(n) +#define sleep(n) Sleep(n) #endif int main () { diff --git a/examples/C++/lbbroker2.cpp b/examples/C++/lbbroker2.cpp index 884029ef9..8c076d422 100644 --- a/examples/C++/lbbroker2.cpp +++ b/examples/C++/lbbroker2.cpp @@ -15,25 +15,25 @@ static void * client_task(void *args) { - zctx_t *ctx = zctx_new(); - void *client = zsocket_new(ctx, ZMQ_REQ); + zctx_t *ctx = zctx_new(); + void *client = zsocket_new(ctx, ZMQ_REQ); #if (defined (WIN32)) - zsocket_connect(client, "tcp://localhost:5672"); // frontend + zsocket_connect(client, "tcp://localhost:5672"); // frontend #else - zsocket_connect(client, "ipc://frontend.ipc"); + zsocket_connect(client, "ipc://frontend.ipc"); #endif - // Send request, get reply - zstr_send(client, "HELLO"); - char *reply = zstr_recv(client); - if (reply) { - std::cout << "Client: " << reply << std::endl; - free(reply); - } + // Send request, get reply + zstr_send(client, "HELLO"); + char *reply = zstr_recv(client); + if (reply) { + std::cout << "Client: " << reply << std::endl; + free(reply); + } - zctx_destroy(&ctx); - return NULL; + zctx_destroy(&ctx); + return NULL; } // Worker using REQ socket to do load-balancing @@ -41,126 +41,126 @@ client_task(void *args) static void * worker_task(void *args) { - zctx_t *ctx = zctx_new(); - void *worker = zsocket_new(ctx, ZMQ_REQ); + zctx_t *ctx = zctx_new(); + void *worker = zsocket_new(ctx, ZMQ_REQ); #if (defined (WIN32)) - zsocket_connect(worker, "tcp://localhost:5673"); // backend + zsocket_connect(worker, "tcp://localhost:5673"); // backend #else - zsocket_connect(worker, "ipc://backend.ipc"); + zsocket_connect(worker, "ipc://backend.ipc"); #endif - // Tell broker we're ready for work - zframe_t *frame = zframe_new(WORKER_READY, strlen(WORKER_READY)); - zframe_send(&frame, worker, 0); - - // Process messages as they arrive - while (1) { - zmsg_t *msg = zmsg_recv(worker); - if (!msg) - break; // Interrupted - zframe_print(zmsg_last(msg), "Worker: "); - zframe_reset(zmsg_last(msg), "OK", 2); - zmsg_send(&msg, worker); - } - zctx_destroy(&ctx); - return NULL; + // Tell broker we're ready for work + zframe_t *frame = zframe_new(WORKER_READY, strlen(WORKER_READY)); + zframe_send(&frame, worker, 0); + + // Process messages as they arrive + while (1) { + zmsg_t *msg = zmsg_recv(worker); + if (!msg) + break; // Interrupted + zframe_print(zmsg_last(msg), "Worker: "); + zframe_reset(zmsg_last(msg), "OK", 2); + zmsg_send(&msg, worker); + } + zctx_destroy(&ctx); + return NULL; } // .split main task // Now we come to the main task. This has the identical functionality to -// the previous {{lbbroker}} broker example, but uses CZMQ to start child +// the previous {{lbbroker}} broker example, but uses CZMQ to start child // threads, to hold the list of workers, and to read and send messages: int main(void) { - zctx_t *ctx = zctx_new(); - void *frontend = zsocket_new(ctx, ZMQ_ROUTER); - void *backend = zsocket_new(ctx, ZMQ_ROUTER); + zctx_t *ctx = zctx_new(); + void *frontend = zsocket_new(ctx, ZMQ_ROUTER); + void *backend = zsocket_new(ctx, ZMQ_ROUTER); - // IPC doesn't yet work on MS Windows. + // IPC doesn't yet work on MS Windows. #if (defined (WIN32)) - zsocket_bind(frontend, "tcp://*:5672"); - zsocket_bind(backend, "tcp://*:5673"); + zsocket_bind(frontend, "tcp://*:5672"); + zsocket_bind(backend, "tcp://*:5673"); #else - zsocket_bind(frontend, "ipc://frontend.ipc"); - zsocket_bind(backend, "ipc://backend.ipc"); + zsocket_bind(frontend, "ipc://frontend.ipc"); + zsocket_bind(backend, "ipc://backend.ipc"); #endif - int client_nbr; - for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) - zthread_new(client_task, NULL); - int worker_nbr; - for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) - zthread_new(worker_task, NULL); - - // Queue of available workers - zlist_t *workers = zlist_new(); - - // .split main load-balancer loop - // Here is the main loop for the load balancer. It works the same way - // as the previous example, but is a lot shorter because CZMQ gives - // us an API that does more with fewer calls: - while (1) { - zmq_pollitem_t items[] = { - { backend, 0, ZMQ_POLLIN, 0 }, - { frontend, 0, ZMQ_POLLIN, 0 } - }; - // Poll frontend only if we have available workers - int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1, -1); - if (rc == -1) - break; // Interrupted - - // Handle worker activity on backend - if (items[0].revents & ZMQ_POLLIN) { - // Use worker identity for load-balancing - zmsg_t *msg = zmsg_recv(backend); - if (!msg) - break; // Interrupted + int client_nbr; + for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) + zthread_new(client_task, NULL); + int worker_nbr; + for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) + zthread_new(worker_task, NULL); + + // Queue of available workers + zlist_t *workers = zlist_new(); + + // .split main load-balancer loop + // Here is the main loop for the load balancer. It works the same way + // as the previous example, but is a lot shorter because CZMQ gives + // us an API that does more with fewer calls: + while (1) { + zmq_pollitem_t items[] = { + { backend, 0, ZMQ_POLLIN, 0 }, + { frontend, 0, ZMQ_POLLIN, 0 } + }; + // Poll frontend only if we have available workers + int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1, -1); + if (rc == -1) + break; // Interrupted + + // Handle worker activity on backend + if (items[0].revents & ZMQ_POLLIN) { + // Use worker identity for load-balancing + zmsg_t *msg = zmsg_recv(backend); + if (!msg) + break; // Interrupted #if 0 - // zmsg_unwrap is DEPRECATED as over-engineered, poor style - zframe_t *identity = zmsg_unwrap(msg); + // zmsg_unwrap is DEPRECATED as over-engineered, poor style + zframe_t *identity = zmsg_unwrap(msg); #else - zframe_t *identity = zmsg_pop(msg); - zframe_t *delimiter = zmsg_pop(msg); - zframe_destroy(&delimiter); + zframe_t *identity = zmsg_pop(msg); + zframe_t *delimiter = zmsg_pop(msg); + zframe_destroy(&delimiter); #endif - zlist_append(workers, identity); - - // Forward message to client if it's not a READY - zframe_t *frame = zmsg_first(msg); - if (memcmp(zframe_data(frame), WORKER_READY, strlen(WORKER_READY)) == 0) { - zmsg_destroy(&msg); - } else { - zmsg_send(&msg, frontend); - if (--client_nbr == 0) - break; // Exit after N messages - } - } - if (items[1].revents & ZMQ_POLLIN) { - // Get client request, route to first available worker - zmsg_t *msg = zmsg_recv(frontend); - if (msg) { + zlist_append(workers, identity); + + // Forward message to client if it's not a READY + zframe_t *frame = zmsg_first(msg); + if (memcmp(zframe_data(frame), WORKER_READY, strlen(WORKER_READY)) == 0) { + zmsg_destroy(&msg); + } else { + zmsg_send(&msg, frontend); + if (--client_nbr == 0) + break; // Exit after N messages + } + } + if (items[1].revents & ZMQ_POLLIN) { + // Get client request, route to first available worker + zmsg_t *msg = zmsg_recv(frontend); + if (msg) { #if 0 - // zmsg_wrap is DEPRECATED as unsafe - zmsg_wrap(msg, (zframe_t *)zlist_pop(workers)); + // zmsg_wrap is DEPRECATED as unsafe + zmsg_wrap(msg, (zframe_t *)zlist_pop(workers)); #else - zmsg_pushmem(msg, NULL, 0); // delimiter - zmsg_push(msg, (zframe_t *)zlist_pop(workers)); + zmsg_pushmem(msg, NULL, 0); // delimiter + zmsg_push(msg, (zframe_t *)zlist_pop(workers)); #endif - zmsg_send(&msg, backend); - } - } - } - // When we're done, clean up properly - while (zlist_size(workers)) { - zframe_t *frame = (zframe_t *)zlist_pop(workers); - zframe_destroy(&frame); - } - zlist_destroy(&workers); - zctx_destroy(&ctx); - return 0; + zmsg_send(&msg, backend); + } + } + } + // When we're done, clean up properly + while (zlist_size(workers)) { + zframe_t *frame = (zframe_t *)zlist_pop(workers); + zframe_destroy(&frame); + } + zlist_destroy(&workers); + zctx_destroy(&ctx); + return 0; } diff --git a/examples/C++/mdbroker.cpp b/examples/C++/mdbroker.cpp index 5294d39cf..dae545010 100644 --- a/examples/C++/mdbroker.cpp +++ b/examples/C++/mdbroker.cpp @@ -99,7 +99,7 @@ class broker { m_socket->bind(m_endpoint.c_str()); s_console ("I: MDP broker/0.1.1 is active at %s", endpoint.c_str()); } - + private: // --------------------------------------------------------------------- @@ -114,9 +114,9 @@ class broker { { if ((*wrk)->m_expiry <= now) toCull.push_back(*wrk); - } + } for (std::deque::iterator wrk = toCull.begin(); wrk != toCull.end(); ++wrk) - { + { if (m_verbose) { s_console ("I: deleting expired worker: %s", (*wrk)->m_identity.c_str()); @@ -168,7 +168,7 @@ class broker { if ((*next)->m_expiry > (*wrk)->m_expiry) wrk = next; } - + zmsg *msg = srv->m_requests.front(); srv->m_requests.pop_front(); worker_send (*wrk, (char*)MDPW_REQUEST, "", msg); @@ -382,7 +382,7 @@ class broker { service_dispatch (srv, msg); } } - + public: // Get and process messages forever or until interrupted @@ -470,5 +470,3 @@ int main (int argc, char *argv []) return 0; } - - diff --git a/examples/C++/mspoller.cpp b/examples/C++/mspoller.cpp index bd1b81bb0..15c752bd0 100644 --- a/examples/C++/mspoller.cpp +++ b/examples/C++/mspoller.cpp @@ -16,7 +16,7 @@ int main (int argc, char *argv[]) // Connect to weather server zmq::socket_t subscriber(context, ZMQ_SUB); - subscriber.connect("tcp://localhost:5556"); + subscriber.connect("tcp://localhost:5556"); subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6); // Initialize poll set @@ -28,7 +28,7 @@ int main (int argc, char *argv[]) while (1) { zmq::message_t message; zmq::poll (&items [0], 2, -1); - + if (items [0].revents & ZMQ_POLLIN) { receiver.recv(&message); // Process task diff --git a/examples/C++/msreader.cpp b/examples/C++/msreader.cpp index 3bf18425a..13f0744df 100644 --- a/examples/C++/msreader.cpp +++ b/examples/C++/msreader.cpp @@ -24,7 +24,7 @@ int main (int argc, char *argv[]) // Process messages from both sockets // We prioritize traffic from the task ventilator while (1) { - + // Process any waiting tasks bool rc; do { @@ -33,7 +33,7 @@ int main (int argc, char *argv[]) // process task } } while(rc == true); - + // Process any waiting weather updates do { zmq::message_t update; @@ -42,7 +42,7 @@ int main (int argc, char *argv[]) } } while(rc == true); - + // No activity, so sleep for 1 msec s_sleep(1); } diff --git a/examples/C++/mtrelay.cpp b/examples/C++/mtrelay.cpp index 85ece1b79..ea9dbf456 100644 --- a/examples/C++/mtrelay.cpp +++ b/examples/C++/mtrelay.cpp @@ -8,26 +8,26 @@ // Step 1 pushes one message to step 2 void *step1 (void *arg) { - - zmq::context_t * context = static_cast(arg); - - // Signal downstream to step 2 - zmq::socket_t sender (*context, ZMQ_PAIR); - sender.connect("inproc://step2"); - s_send (sender, ""); + zmq::context_t * context = static_cast(arg); - return (NULL); + // Signal downstream to step 2 + zmq::socket_t sender (*context, ZMQ_PAIR); + sender.connect("inproc://step2"); + + s_send (sender, ""); + + return (NULL); } // Step 2 relays the signal to step 3 void *step2 (void *arg) { - zmq::context_t * context = static_cast(arg); - + zmq::context_t * context = static_cast(arg); + // Bind to inproc: endpoint, then start upstream thread - zmq::socket_t receiver (*context, ZMQ_PAIR); + zmq::socket_t receiver (*context, ZMQ_PAIR); receiver.bind("inproc://step2"); pthread_t thread; @@ -47,8 +47,8 @@ void *step2 (void *arg) { // Main program starts steps 1 and 2 and acts as step 3 int main () { - - zmq::context_t context(1); + + zmq::context_t context(1); // Bind to inproc: endpoint, then start upstream thread zmq::socket_t receiver (context, ZMQ_PAIR); @@ -59,7 +59,7 @@ int main () { // Wait for signal s_recv (receiver); - + std::cout << "Test successful!" << std::endl; return 0; diff --git a/examples/C++/pathosub.cpp b/examples/C++/pathosub.cpp index a948f9082..683905361 100644 --- a/examples/C++/pathosub.cpp +++ b/examples/C++/pathosub.cpp @@ -23,8 +23,8 @@ int main (int argc, char *argv []) subscriber.setsockopt( ZMQ_SUBSCRIBE, ss.str().c_str(), ss.str().size()); while (1) { - std::string topic = s_recv (subscriber); - std::string data = s_recv (subscriber); + std::string topic = s_recv (subscriber); + std::string data = s_recv (subscriber); if (topic != ss.str()) break; std::cout << data << std::endl; diff --git a/examples/C++/psenvsub.cpp b/examples/C++/psenvsub.cpp index cf2c9fa4c..f4a6912aa 100644 --- a/examples/C++/psenvsub.cpp +++ b/examples/C++/psenvsub.cpp @@ -12,12 +12,12 @@ int main () { subscriber.setsockopt( ZMQ_SUBSCRIBE, "B", 1); while (1) { - - // Read envelope with address - std::string address = s_recv (subscriber); - // Read message contents - std::string contents = s_recv (subscriber); - + + // Read envelope with address + std::string address = s_recv (subscriber); + // Read message contents + std::string contents = s_recv (subscriber); + std::cout << "[" << address << "] " << contents << std::endl; } return 0; diff --git a/examples/C++/rrclient.cpp b/examples/C++/rrclient.cpp index 40a54cf83..f67400ea7 100644 --- a/examples/C++/rrclient.cpp +++ b/examples/C++/rrclient.cpp @@ -4,20 +4,20 @@ // #include "zhelpers.hpp" - + int main (int argc, char *argv[]) { zmq::context_t context(1); - zmq::socket_t requester(context, ZMQ_REQ); - requester.connect("tcp://localhost:5559"); - - for( int request = 0 ; request < 10 ; request++) { - - s_send (requester, std::string("Hello")); + zmq::socket_t requester(context, ZMQ_REQ); + requester.connect("tcp://localhost:5559"); + + for( int request = 0 ; request < 10 ; request++) { + + s_send (requester, std::string("Hello")); std::string string = s_recv (requester); - - std::cout << "Received reply " << request - << " [" << string << "]" << std::endl; - } + + std::cout << "Received reply " << request + << " [" << string << "]" << std::endl; + } } diff --git a/examples/C++/rrworker.cpp b/examples/C++/rrworker.cpp index 9785e88cf..3f379d6e0 100644 --- a/examples/C++/rrworker.cpp +++ b/examples/C++/rrworker.cpp @@ -6,27 +6,26 @@ #include "zhelpers.hpp" - + int main (int argc, char *argv[]) { zmq::context_t context(1); - zmq::socket_t responder(context, ZMQ_REP); - responder.connect("tcp://localhost:5560"); - - while(1) - { - // Wait for next request from client - std::string string = s_recv (responder); - - std::cout << "Received request: " << string << std::endl; - - // Do some 'work' + zmq::socket_t responder(context, ZMQ_REP); + responder.connect("tcp://localhost:5560"); + + while(1) + { + // Wait for next request from client + std::string string = s_recv (responder); + + std::cout << "Received request: " << string << std::endl; + + // Do some 'work' sleep (1); - + // Send reply back to client - s_send (responder, std::string("World")); - - } -} + s_send (responder, std::string("World")); + } +} diff --git a/examples/C++/syncpub.cpp b/examples/C++/syncpub.cpp index 5f375b858..ac4926e7e 100644 --- a/examples/C++/syncpub.cpp +++ b/examples/C++/syncpub.cpp @@ -8,7 +8,7 @@ #define SUBSCRIBERS_EXPECTED 10 int main () { - zmq::context_t context(1); + zmq::context_t context(1); // Socket to talk to clients zmq::socket_t publisher (context, ZMQ_PUB); @@ -25,23 +25,23 @@ int main () { // Get synchronization from subscribers int subscribers = 0; while (subscribers < SUBSCRIBERS_EXPECTED) { - - // - wait for synchronization request - s_recv (syncservice); - - // - send synchronization reply - s_send (syncservice, ""); + + // - wait for synchronization request + s_recv (syncservice); + + // - send synchronization reply + s_send (syncservice, ""); subscribers++; } - + // Now broadcast exactly 1M updates followed by END int update_nbr; - for (update_nbr = 0; update_nbr < 1000000; update_nbr++) { - s_send (publisher, "Rhubarb"); - } - + for (update_nbr = 0; update_nbr < 1000000; update_nbr++) { + s_send (publisher, "Rhubarb"); + } + s_send (publisher, "END"); sleep (1); // Give 0MQ time to flush output diff --git a/examples/C++/taskwork2.cpp b/examples/C++/taskwork2.cpp index f1ccbef84..4cefa6b0b 100644 --- a/examples/C++/taskwork2.cpp +++ b/examples/C++/taskwork2.cpp @@ -9,7 +9,7 @@ int main (int argc, char *argv[]) { zmq::context_t context(1); - + // Socket to receive messages on zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect("tcp://localhost:5557"); @@ -32,13 +32,13 @@ int main (int argc, char *argv[]) while (1) { zmq::message_t message; zmq::poll (&items [0], 2, -1); - + if (items [0].revents & ZMQ_POLLIN) { receiver.recv(&message); // Process task int workload; // Workload in msecs - + std::string sdata(static_cast(message.data()), message.size()); std::istringstream iss(sdata); iss >> workload; @@ -56,7 +56,7 @@ int main (int argc, char *argv[]) } // Any waiting controller command acts as 'KILL' if (items [1].revents & ZMQ_POLLIN) { - std::cout << std::endl; + std::cout << std::endl; break; // Exit loop } } diff --git a/examples/C++/wuclient.cpp b/examples/C++/wuclient.cpp index aa6ea49e3..f71884c3c 100644 --- a/examples/C++/wuclient.cpp +++ b/examples/C++/wuclient.cpp @@ -18,7 +18,7 @@ int main (int argc, char *argv[]) subscriber.connect("tcp://localhost:5556"); // Subscribe to zipcode, default is NYC, 10001 - const char *filter = (argc > 1)? argv [1]: "10001 "; + const char *filter = (argc > 1)? argv [1]: "10001 "; subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); // Process 100 updates @@ -32,12 +32,12 @@ int main (int argc, char *argv[]) subscriber.recv(update, zmq::recv_flags::none); std::istringstream iss(static_cast(update.data())); - iss >> zipcode >> temperature >> relhumidity ; + iss >> zipcode >> temperature >> relhumidity ; - total_temp += temperature; + total_temp += temperature; } - std::cout << "Average temperature for zipcode '"<< filter - <<"' was "<<(int) (total_temp / update_nbr) <<"F" - << std::endl; + std::cout << "Average temperature for zipcode '" << filter + << "' was " << (int) (total_temp / update_nbr) << "F" + << std::endl; return 0; } diff --git a/examples/C++/wuserver.cpp b/examples/C++/wuserver.cpp index 5775b9937..cddfd2810 100644 --- a/examples/C++/wuserver.cpp +++ b/examples/C++/wuserver.cpp @@ -20,7 +20,7 @@ int main () { zmq::context_t context (1); zmq::socket_t publisher (context, zmq::socket_type::pub); publisher.bind("tcp://*:5556"); - publisher.bind("ipc://weather.ipc"); // Not usable on Windows. + publisher.bind("ipc://weather.ipc"); // Not usable on Windows. // Initialize random number generator srandom ((unsigned) time (NULL)); @@ -36,7 +36,7 @@ int main () { // Send message to all subscribers zmq::message_t message(20); snprintf ((char *) message.data(), 20 , - "%05d %d %d", zipcode, temperature, relhumidity); + "%05d %d %d", zipcode, temperature, relhumidity); publisher.send(message, zmq::send_flags::none); } diff --git a/examples/C++/zhelpers.hpp b/examples/C++/zhelpers.hpp index 004cc9347..8228366df 100644 --- a/examples/C++/zhelpers.hpp +++ b/examples/C++/zhelpers.hpp @@ -30,7 +30,7 @@ // On some version of Windows, POSIX subsystem is not installed by default. // So define srandom and random ourself. -// +// #if (defined (WIN32)) # define srandom srand # define random rand @@ -43,29 +43,29 @@ #define snprintf c99_snprintf #define vsnprintf c99_vsnprintf - inline int c99_vsnprintf(char *outBuf, size_t size, const char *format, va_list ap) - { - int count = -1; + inline int c99_vsnprintf(char *outBuf, size_t size, const char *format, va_list ap) + { + int count = -1; - if (size != 0) - count = _vsnprintf_s(outBuf, size, _TRUNCATE, format, ap); - if (count == -1) - count = _vscprintf(format, ap); + if (size != 0) + count = _vsnprintf_s(outBuf, size, _TRUNCATE, format, ap); + if (count == -1) + count = _vscprintf(format, ap); - return count; - } + return count; + } - inline int c99_snprintf(char *outBuf, size_t size, const char *format, ...) - { - int count; - va_list ap; + inline int c99_snprintf(char *outBuf, size_t size, const char *format, ...) + { + int count; + va_list ap; - va_start(ap, format); - count = c99_vsnprintf(outBuf, size, format, ap); - va_end(ap); + va_start(ap, format); + count = c99_vsnprintf(outBuf, size, format, ap); + va_end(ap); - return count; - } + return count; + } #endif @@ -76,20 +76,20 @@ // Caller must free returned string. inline static char * s_recv(void *socket, int flags = 0) { - zmq_msg_t message; - zmq_msg_init(&message); + zmq_msg_t message; + zmq_msg_init(&message); - int rc = zmq_msg_recv(&message, socket, flags); + int rc = zmq_msg_recv(&message, socket, flags); - if (rc < 0) - return nullptr; // Context terminated, exit + if (rc < 0) + return nullptr; // Context terminated, exit - size_t size = zmq_msg_size(&message); - char *string = (char*)malloc(size + 1); - memcpy(string, zmq_msg_data(&message), size); - zmq_msg_close(&message); - string[size] = 0; - return (string); + size_t size = zmq_msg_size(&message); + char *string = (char*)malloc(size + 1); + memcpy(string, zmq_msg_data(&message), size); + zmq_msg_close(&message); + string[size] = 0; + return (string); } // Receive 0MQ string from socket and convert into string @@ -104,27 +104,27 @@ s_recv (zmq::socket_t & socket, int flags = 0) { inline static bool s_recv(zmq::socket_t & socket, std::string & ostring, int flags = 0) { - zmq::message_t message; - bool rc = socket.recv(&message, flags); - - if (rc) { - ostring = std::string(static_cast(message.data()), message.size()); - } - - return (rc); + zmq::message_t message; + bool rc = socket.recv(&message, flags); + + if (rc) { + ostring = std::string(static_cast(message.data()), message.size()); + } + + return (rc); } // Convert C string to 0MQ string and send to socket inline static int s_send(void *socket, const char *string, int flags = 0) { - int rc; - zmq_msg_t message; - zmq_msg_init_size(&message, strlen(string)); - memcpy(zmq_msg_data(&message), string, strlen(string)); - rc = zmq_msg_send(&message, socket, flags); - assert(-1 != rc); - zmq_msg_close(&message); - return (rc); + int rc; + zmq_msg_t message; + zmq_msg_init_size(&message, strlen(string)); + memcpy(zmq_msg_data(&message), string, strlen(string)); + rc = zmq_msg_send(&message, socket, flags); + assert(-1 != rc); + zmq_msg_close(&message); + return (rc); } // Convert string to 0MQ string and send to socket @@ -141,15 +141,15 @@ s_send (zmq::socket_t & socket, const std::string & string, int flags = 0) { // Sends string as 0MQ string, as multipart non-terminal inline static int s_sendmore(void *socket, char *string) { - int rc; - zmq_msg_t message; - zmq_msg_init_size(&message, strlen(string)); - memcpy(zmq_msg_data(&message), string, strlen(string)); - //rc = zmq_send(socket, string, strlen(string), ZMQ_SNDMORE); - rc = zmq_msg_send(&message, socket, ZMQ_SNDMORE); - assert(-1 != rc); - zmq_msg_close(&message); - return (rc); + int rc; + zmq_msg_t message; + zmq_msg_init_size(&message, strlen(string)); + memcpy(zmq_msg_data(&message), string, strlen(string)); + //rc = zmq_send(socket, string, strlen(string), ZMQ_SNDMORE); + rc = zmq_msg_send(&message, socket, ZMQ_SNDMORE); + assert(-1 != rc); + zmq_msg_close(&message); + return (rc); } // Sends string as 0MQ string, as multipart non-terminal @@ -210,7 +210,7 @@ s_dump (zmq::socket_t & socket) // Set simple random printable identity on socket // Caution: // DO NOT call this version of s_set_id from multiple threads on MS Windows -// since s_set_id will call rand() on MS Windows. rand(), however, is not +// since s_set_id will call rand() on MS Windows. rand(), however, is not // reentrant or thread-safe. See issue #521. inline std::string s_set_id (zmq::socket_t & socket) @@ -264,13 +264,13 @@ inline static int64_t s_clock (void) { #if (defined (WIN32)) - FILETIME fileTime; - GetSystemTimeAsFileTime(&fileTime); - unsigned __int64 largeInt = fileTime.dwHighDateTime; - largeInt <<= 32; - largeInt |= fileTime.dwLowDateTime; - largeInt /= 10000; // FILETIME is in units of 100 nanoseconds - return (int64_t)largeInt; + FILETIME fileTime; + GetSystemTimeAsFileTime(&fileTime); + unsigned __int64 largeInt = fileTime.dwHighDateTime; + largeInt <<= 32; + largeInt |= fileTime.dwLowDateTime; + largeInt /= 10000; // FILETIME is in units of 100 nanoseconds + return (int64_t)largeInt; #else struct timeval tv; gettimeofday (&tv, NULL); diff --git a/examples/C/.gitignore b/examples/C/.gitignore index b67cde10e..eb185b95e 100644 --- a/examples/C/.gitignore +++ b/examples/C/.gitignore @@ -3,7 +3,13 @@ core *.lst *.log hwclient +hwclient2 +hwclient3 +hwclient4 hwserver +hwserver2 +hwserver3 +hwserver4 msgqueue mspoller msreader @@ -54,6 +60,7 @@ mdclient mdbroker mdworker interrupt +mktestdata mmiecho mdclient2 callgrind.out.* @@ -102,4 +109,3 @@ udpping3 eagain rtreq dechat - diff --git a/examples/C/hwclient2.c b/examples/C/hwclient2.c index afb8e9367..679372cc6 100644 --- a/examples/C/hwclient2.c +++ b/examples/C/hwclient2.c @@ -8,43 +8,43 @@ send "Hello" to server,expect receive "World" int main (void) { - printf ("Connecting to hello world server...\n"); - void *context = zmq_ctx_new (); - void *requester = zmq_socket (context, ZMQ_DEALER); - zmq_connect (requester, "tcp://localhost:5555"); - int request_nbr; //request number - int reply_nbr = 0; //receive respond number - for (request_nbr = 0; request_nbr < 10; request_nbr++) - { - char buffer [10]; - memset(buffer,0,sizeof(buffer)); - printf ("Sending request msg: Hello NO=%d...\n", request_nbr+1); - //send request msg to server - s_sendmore(requester,""); //send multi part msg,the first part is empty part - zmq_send (requester, "Hello", 5, 0); //the second part is your request msg - //receive reply msg - int len; - len = zmq_recv (requester, buffer, 10, 0); - if(len == -1){ - printf("Error:%s\n", zmq_strerror(errno)); - exit(-1); - } - //if the first part you received is empty part,then continue receiving next part - if (strcmp(buffer,"") == 0){ - memset(buffer,0,sizeof(buffer)); - len = zmq_recv(requester, buffer, 10, 0); - if(len == -1){ - printf("Error:%s\n", zmq_strerror(errno)); - exit(-1); - } - printf("Received respond msg: %s NO=%d\n\n", buffer,++reply_nbr); - } - //if the first part you received is not empty part,discard the whole ZMQ msg - else{ - printf("Discard the ZMQ message!\n"); - } - } - zmq_close(requester); - zmq_ctx_destroy (context); - return 0; + printf ("Connecting to hello world server...\n"); + void *context = zmq_ctx_new (); + void *requester = zmq_socket (context, ZMQ_DEALER); + zmq_connect (requester, "tcp://localhost:5555"); + int request_nbr; //request number + int reply_nbr = 0; //receive respond number + for (request_nbr = 0; request_nbr < 10; request_nbr++) + { + char buffer [10]; + memset(buffer,0,sizeof(buffer)); + printf ("Sending request msg: Hello NO=%d...\n", request_nbr+1); + //send request msg to server + s_sendmore(requester,""); //send multi part msg,the first part is empty part + zmq_send (requester, "Hello", 5, 0); //the second part is your request msg + //receive reply msg + int len; + len = zmq_recv (requester, buffer, 10, 0); + if(len == -1){ + printf("Error:%s\n", zmq_strerror(errno)); + exit(-1); + } + //if the first part you received is empty part,then continue receiving next part + if (strcmp(buffer,"") == 0){ + memset(buffer,0,sizeof(buffer)); + len = zmq_recv(requester, buffer, 10, 0); + if(len == -1){ + printf("Error:%s\n", zmq_strerror(errno)); + exit(-1); + } + printf("Received respond msg: %s NO=%d\n\n", buffer,++reply_nbr); + } + //if the first part you received is not empty part,discard the whole ZMQ msg + else{ + printf("Discard the ZMQ message!\n"); + } + } + zmq_close(requester); + zmq_ctx_destroy (context); + return 0; } diff --git a/examples/C/hwclient3.c b/examples/C/hwclient3.c index 6e6e913f5..e0d371da5 100644 --- a/examples/C/hwclient3.c +++ b/examples/C/hwclient3.c @@ -16,12 +16,12 @@ int main (void) zmq_connect (requester, "tcp://localhost:5555"); int request_nbr; for (request_nbr = 0; request_nbr < 10; request_nbr++) - { + { char buffer [10]={0}; - //send request msg + //send request msg printf ("Sending request msg: Hello NO=%d...\n", request_nbr+1); zmq_send (requester, "Hello", 5, 0); - //receive respond msg + //receive respond msg zmq_recv (requester, buffer, 10, 0); printf ("Received respond msg: %s NO=%d\n\n", buffer,request_nbr+1); } diff --git a/examples/C/hwclient4.c b/examples/C/hwclient4.c index 1466fa588..503351578 100644 --- a/examples/C/hwclient4.c +++ b/examples/C/hwclient4.c @@ -16,12 +16,12 @@ int main (void) zmq_connect (requester, "tcp://localhost:5555"); int request_nbr; for (request_nbr = 0; request_nbr < 10; request_nbr++) - { + { printf ("Sending request msg: Hello NO=%d...\n", request_nbr+1); - //send request msg + //send request msg s_sendmore(requester, ""); //send empty delimiter frame s_send(requester, "Hello"); //send data frame - //recv reply msg + //recv reply msg char *empty=s_recv(requester); assert(empty[0] == 0); char *reply=s_recv(requester); diff --git a/examples/C/hwserver2.c b/examples/C/hwserver2.c index ac52134c0..27055ba57 100644 --- a/examples/C/hwserver2.c +++ b/examples/C/hwserver2.c @@ -14,7 +14,7 @@ int main (void) int rc = zmq_bind (responder, "tcp://*:5555"); assert (rc == 0); while (1) - { + { char buffer [10]={0}; zmq_recv (responder, buffer, 10, 0); printf("Received request msg: %s\n", buffer); diff --git a/examples/C/hwserver3.c b/examples/C/hwserver3.c index 48e9efec4..594ab2c4c 100644 --- a/examples/C/hwserver3.c +++ b/examples/C/hwserver3.c @@ -14,7 +14,7 @@ int main (void) assert (rc == 0); while (1) - { + { char *identity, *string, *content; //receive client request msg identity=s_recv(responder); //receive the first part is identity frame diff --git a/examples/C/hwserver4.c b/examples/C/hwserver4.c index 89dd5b9ec..fbb3892fc 100644 --- a/examples/C/hwserver4.c +++ b/examples/C/hwserver4.c @@ -14,7 +14,7 @@ int main (void) assert (rc == 0); while (1) - { + { char identity[10]={0}; //recv client`s request msg //the 1st received frame is identity frame diff --git a/examples/C/lbbroker2.c b/examples/C/lbbroker2.c index 44ab3e530..478934647 100644 --- a/examples/C/lbbroker2.c +++ b/examples/C/lbbroker2.c @@ -12,25 +12,25 @@ static void client_task(zsock_t *pipe, void *args) { - // Signal caller zactor has started - zsock_signal(pipe, 0); - zsock_t *client = zsock_new(ZMQ_REQ); + // Signal caller zactor has started + zsock_signal(pipe, 0); + zsock_t *client = zsock_new(ZMQ_REQ); #if (defined (WIN32)) - zsock_connect(client, "tcp://localhost:5672"); // frontend + zsock_connect(client, "tcp://localhost:5672"); // frontend #else - zsock_connect(client, "ipc://frontend.ipc"); + zsock_connect(client, "ipc://frontend.ipc"); #endif - // Send request, get reply - zstr_send(client, "HELLO"); - char *reply = zstr_recv(client); - if (reply) { - printf("Client: %s\n", reply); - free(reply); - } + // Send request, get reply + zstr_send(client, "HELLO"); + char *reply = zstr_recv(client); + if (reply) { + printf("Client: %s\n", reply); + free(reply); + } - zsock_destroy(&client); + zsock_destroy(&client); } // Worker using REQ socket to do load-balancing @@ -38,43 +38,43 @@ client_task(zsock_t *pipe, void *args) static void worker_task(zsock_t *pipe, void *args) { - // Signal caller zactor has started - zsock_signal(pipe, 0); - zsock_t *worker = zsock_new(ZMQ_REQ); + // Signal caller zactor has started + zsock_signal(pipe, 0); + zsock_t *worker = zsock_new(ZMQ_REQ); #if (defined (WIN32)) - zsock_connect(worker, "tcp://localhost:5673"); // backend + zsock_connect(worker, "tcp://localhost:5673"); // backend #else - zsock_connect(worker, "ipc://backend.ipc"); + zsock_connect(worker, "ipc://backend.ipc"); #endif // Tell broker we're ready for work - zframe_t *frame = zframe_new(WORKER_READY, strlen(WORKER_READY)); - zframe_send(&frame, worker, 0); - - // Process messages as they arrive - zpoller_t *poll = zpoller_new(pipe, worker, NULL); - while (true) { - zsock_t *ready = zpoller_wait(poll, -1); - if (ready == pipe) + zframe_t *frame = zframe_new(WORKER_READY, strlen(WORKER_READY)); + zframe_send(&frame, worker, 0); + + // Process messages as they arrive + zpoller_t *poll = zpoller_new(pipe, worker, NULL); + while (true) { + zsock_t *ready = zpoller_wait(poll, -1); + if (ready == pipe) break; // Done - assert(ready == worker); - zmsg_t *msg = zmsg_recv(worker); - if (!msg) - break; // Interrupted - zframe_print(zmsg_last(msg), "Worker: "); - zframe_reset(zmsg_last(msg), "OK", 2); - zmsg_send(&msg, worker); - } - - if (frame) - zframe_destroy(&frame); - zsock_destroy(&worker); - zpoller_destroy(&poll); - - // Signal done - zsock_signal(pipe, 0); + assert(ready == worker); + zmsg_t *msg = zmsg_recv(worker); + if (!msg) + break; // Interrupted + zframe_print(zmsg_last(msg), "Worker: "); + zframe_reset(zmsg_last(msg), "OK", 2); + zmsg_send(&msg, worker); + } + + if (frame) + zframe_destroy(&frame); + zsock_destroy(&worker); + zpoller_destroy(&poll); + + // Signal done + zsock_signal(pipe, 0); } // .split main task @@ -84,102 +84,102 @@ worker_task(zsock_t *pipe, void *args) int main(void) { - zsock_t *frontend = zsock_new(ZMQ_ROUTER); - zsock_t *backend = zsock_new(ZMQ_ROUTER); + zsock_t *frontend = zsock_new(ZMQ_ROUTER); + zsock_t *backend = zsock_new(ZMQ_ROUTER); - // IPC doesn't yet work on MS Windows. + // IPC doesn't yet work on MS Windows. #if (defined (WIN32)) - zsock_bind(frontend, "tcp://*:5672"); - zsock_bind(backend, "tcp://*:5673"); + zsock_bind(frontend, "tcp://*:5672"); + zsock_bind(backend, "tcp://*:5673"); #else - zsock_bind(frontend, "ipc://frontend.ipc"); - zsock_bind(backend, "ipc://backend.ipc"); + zsock_bind(frontend, "ipc://frontend.ipc"); + zsock_bind(backend, "ipc://backend.ipc"); #endif - int actor_nbr = 0; - zactor_t *actors[NBR_CLIENTS + NBR_WORKERS]; - - int client_nbr; - for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) - actors[actor_nbr++] = zactor_new(client_task, NULL); - int worker_nbr; - for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) - actors[actor_nbr++] = zactor_new(worker_task, NULL); - - // Queue of available workers - zlist_t *workers = zlist_new(); - - // .split main load-balancer loop - // Here is the main loop for the load balancer. It works the same way - // as the previous example, but is a lot shorter because CZMQ gives - // us an API that does more with fewer calls: - zpoller_t *poll1 = zpoller_new(backend, NULL); - zpoller_t *poll2 = zpoller_new(backend, frontend, NULL); - while (true) { - // Poll frontend only if we have available workers - zpoller_t *poll = zlist_size(workers) ? poll2 : poll1; - zsock_t *ready = zpoller_wait(poll, -1); - if (ready == NULL) - break; // Interrupted - - // Handle worker activity on backend - if (ready == backend) { - // Use worker identity for load-balancing - zmsg_t *msg = zmsg_recv(backend); - if (!msg) - break; // Interrupted + int actor_nbr = 0; + zactor_t *actors[NBR_CLIENTS + NBR_WORKERS]; + + int client_nbr; + for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) + actors[actor_nbr++] = zactor_new(client_task, NULL); + int worker_nbr; + for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) + actors[actor_nbr++] = zactor_new(worker_task, NULL); + + // Queue of available workers + zlist_t *workers = zlist_new(); + + // .split main load-balancer loop + // Here is the main loop for the load balancer. It works the same way + // as the previous example, but is a lot shorter because CZMQ gives + // us an API that does more with fewer calls: + zpoller_t *poll1 = zpoller_new(backend, NULL); + zpoller_t *poll2 = zpoller_new(backend, frontend, NULL); + while (true) { + // Poll frontend only if we have available workers + zpoller_t *poll = zlist_size(workers) ? poll2 : poll1; + zsock_t *ready = zpoller_wait(poll, -1); + if (ready == NULL) + break; // Interrupted + + // Handle worker activity on backend + if (ready == backend) { + // Use worker identity for load-balancing + zmsg_t *msg = zmsg_recv(backend); + if (!msg) + break; // Interrupted #if 0 - // zmsg_unwrap is DEPRECATED as over-engineered, poor style - zframe_t *identity = zmsg_unwrap(msg); + // zmsg_unwrap is DEPRECATED as over-engineered, poor style + zframe_t *identity = zmsg_unwrap(msg); #else - zframe_t *identity = zmsg_pop(msg); - zframe_t *delimiter = zmsg_pop(msg); - zframe_destroy(&delimiter); + zframe_t *identity = zmsg_pop(msg); + zframe_t *delimiter = zmsg_pop(msg); + zframe_destroy(&delimiter); #endif - zlist_append(workers, identity); - - // Forward message to client if it's not a READY - zframe_t *frame = zmsg_first(msg); - if (memcmp(zframe_data(frame), WORKER_READY, strlen(WORKER_READY)) == 0) { - zmsg_destroy(&msg); - } else { - zmsg_send(&msg, frontend); - if (--client_nbr == 0) - break; // Exit after N messages - } - } - else if (ready == frontend) { - // Get client request, route to first available worker - zmsg_t *msg = zmsg_recv(frontend); - if (msg) { + zlist_append(workers, identity); + + // Forward message to client if it's not a READY + zframe_t *frame = zmsg_first(msg); + if (memcmp(zframe_data(frame), WORKER_READY, strlen(WORKER_READY)) == 0) { + zmsg_destroy(&msg); + } else { + zmsg_send(&msg, frontend); + if (--client_nbr == 0) + break; // Exit after N messages + } + } + else if (ready == frontend) { + // Get client request, route to first available worker + zmsg_t *msg = zmsg_recv(frontend); + if (msg) { #if 0 - // zmsg_wrap is DEPRECATED as unsafe - zmsg_wrap(msg, (zframe_t *)zlist_pop(workers)); + // zmsg_wrap is DEPRECATED as unsafe + zmsg_wrap(msg, (zframe_t *)zlist_pop(workers)); #else - zmsg_pushmem(msg, NULL, 0); // delimiter - zmsg_push(msg, (zframe_t *)zlist_pop(workers)); + zmsg_pushmem(msg, NULL, 0); // delimiter + zmsg_push(msg, (zframe_t *)zlist_pop(workers)); #endif - zmsg_send(&msg, backend); - } - } - } - // When we're done, clean up properly - while (zlist_size(workers)) { - zframe_t *frame = (zframe_t *)zlist_pop(workers); - zframe_destroy(&frame); - } - zlist_destroy(&workers); - - for (actor_nbr = 0; actor_nbr < NBR_CLIENTS + NBR_WORKERS; actor_nbr++) { - zactor_destroy(&actors[actor_nbr]); - } - - zpoller_destroy(&poll1); - zpoller_destroy(&poll2); - zsock_destroy(&frontend); - zsock_destroy(&backend); - return 0; + zmsg_send(&msg, backend); + } + } + } + // When we're done, clean up properly + while (zlist_size(workers)) { + zframe_t *frame = (zframe_t *)zlist_pop(workers); + zframe_destroy(&frame); + } + zlist_destroy(&workers); + + for (actor_nbr = 0; actor_nbr < NBR_CLIENTS + NBR_WORKERS; actor_nbr++) { + zactor_destroy(&actors[actor_nbr]); + } + + zpoller_destroy(&poll1); + zpoller_destroy(&poll2); + zsock_destroy(&frontend); + zsock_destroy(&backend); + return 0; }