-
Notifications
You must be signed in to change notification settings - Fork 5
/
universal-sub-xpubxsub.c
144 lines (114 loc) · 3.95 KB
/
universal-sub-xpubxsub.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/* This software is the client component of the ND-OV system.
* Each multipart message it receives from the ND-OV system,
* consisting of an envelope and its data, is rebroadcast
* to all connected clients of the service provider.
*
* Requirements: zeromq3.2
* gcc -o universal-sub-xpubxsub universal-sub-xpubxsub.c -lzmq
*
* Changes:
* - Initial version <[email protected]>
* - zeromq 3.2 compatibility added,
* pubsub binding bugfix <[email protected]>
*/
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fcntl.h>
#include <grp.h>
#include <pwd.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>

#include <zmq.h>
int main (int argc, char *argv[]) {
/* Our process ID and Session ID */
pid_t pid, sid;
if (argc < 3) {
printf("%s [subscriber] (filter1 filter2 filterN) [pubsub]\n\nEx.\n" \
"%s tcp://127.0.0.1:7817 tcp://127.0.0.1:7827\n",
argv[0], argv[0]);
exit(-1);
}
/* Fork off the parent process */
pid = fork();
/* If we got a good PID, then we can exit the parent process. */
if (pid > 0) {
exit(EXIT_SUCCESS);
}
/* If forking actually didn't work */
if (pid < 0) {
/* Create a new SID for the child process */
sid = setsid();
/* Close out the standard file descriptors */
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
}
chdir("/var/empty");
void *context = zmq_init (1);
void *pubsub = zmq_socket (context, ZMQ_XPUB);
/* Apply a high water mark at the PubSub */
uint64_t hwm = 8192;
zmq_setsockopt (pubsub, ZMQ_SNDHWM, &hwm, sizeof(hwm));
zmq_setsockopt (pubsub, ZMQ_RCVHWM, &hwm, sizeof(hwm));
zmq_bind (pubsub, argv[argc - 1]);
/* Check if we are root */
if (getuid() == 0 || geteuid() == 0) {
struct passwd *pw;
uid_t puid = 65534; /* just use the traditional value */
gid_t pgid = 65534;
/* Now we chroot to this directory, preventing any write access outside it */
chroot("/var/empty");
/* After we bind to the socket and chrooted, we drop priviledges to nobody */
setuid(puid);
setgid(pgid);
}
zmq_pollitem_t items[1];
init:
items[0].socket = zmq_socket (context, ZMQ_SUB);
items[0].events = ZMQ_POLLIN;
/* Apply filters to the subscription from the remote source */
if (argc > 3) {
unsigned int i;
for (i = 2; i < (argc - 1); i++) {
zmq_setsockopt(items[0].socket, ZMQ_SUBSCRIBE, argv[i], strlen(argv[i]));
}
} else {
zmq_setsockopt(items[0].socket, ZMQ_SUBSCRIBE, "", 0);
}
zmq_connect (items[0].socket, argv[1]);
int rc;
size_t more_size = sizeof(int);
/* Ensure that every 60s there is data */
while ((rc = zmq_poll (items, 1, 60 * 1000L)) >= 0) {
if (rc > 0) {
int more;
do {
/* Create an empty 0MQ message to hold the message part */
zmq_msg_t part;
rc = zmq_msg_init (&part);
assert (rc == 0);
/* Block until a message is available to be received from the socket */
rc = zmq_msg_recv (&part, items[0].socket, 0);
assert (rc != -1);
/* Determine if more message parts are to follow */
rc = zmq_getsockopt (items[0].socket, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
/* Send the message, when more is set, apply the flag, otherwise don't */
zmq_msg_send (&part, pubsub, (more ? ZMQ_SNDMORE : 0));
zmq_msg_close (&part);
} while (more);
} else {
zmq_close (items[0].socket);
sleep (1);
goto init;
}
}
zmq_close (items[0].socket);
zmq_close (pubsub);
zmq_ctx_destroy (context);
return 0;
}