gdp-if / mqtt-gateway / mqtt-gdp-gateway.c @ master
History | View | Annotate | Download (17.6 KB)
1 | 68f8751e | Eric Allman | /* vim: set ai sw=4 sts=4 ts=4 :*/
|
---|---|---|---|
2 | |||
3 | 271eed7f | Eric Allman | /*
|
4 | ** ----- BEGIN LICENSE BLOCK -----
|
||
5 | ** MQTT gateway to the Global Data Plane
|
||
6 | ** From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
|
||
7 | **
|
||
8 | ** Copyright (c) 2015-2017, Regents of the University of California.
|
||
9 | ** All rights reserved.
|
||
10 | **
|
||
11 | ** Permission is hereby granted, without written agreement and without
|
||
12 | ** license or royalty fees, to use, copy, modify, and distribute this
|
||
13 | ** software and its documentation for any purpose, provided that the above
|
||
14 | ** copyright notice and the following two paragraphs appear in all copies
|
||
15 | ** of this software.
|
||
16 | **
|
||
17 | ** IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
|
||
18 | ** SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
|
||
19 | ** PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
|
||
20 | ** EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||
21 | **
|
||
22 | ** REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
|
||
23 | ** LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||
24 | ** FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
|
||
25 | ** IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
|
||
26 | ** OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
|
||
27 | ** OR MODIFICATIONS.
|
||
28 | ** ----- END LICENSE BLOCK -----
|
||
29 | */
|
||
30 | |||
31 | 68f8751e | Eric Allman | #include <gdp/gdp.h> |
32 | #include <ep/ep_app.h> |
||
33 | #include <ep/ep_dbg.h> |
||
34 | #include <ep/ep_log.h> |
||
35 | 6857407d | Eric Allman | #include <ep/ep_sd.h> |
36 | 68f8751e | Eric Allman | #include <ep/ep_string.h> |
37 | |||
38 | #include <mosquitto.h> |
||
39 | |||
40 | #include <errno.h> |
||
41 | #include <getopt.h> |
||
42 | a8a86417 | Eric Allman | #include <regex.h> |
43 | 68f8751e | Eric Allman | #include <sysexits.h> |
44 | 4bee4508 | Eric Allman | #include <sys/stat.h> |
45 | 68f8751e | Eric Allman | |
46 | |||
47 | 271eed7f | Eric Allman | static EP_DBG Dbg = EP_DBG_INIT("mqtt-gdp-gateway", "MQTT to GDP gateway"); |
48 | 68f8751e | Eric Allman | |
49 | |||
50 | 029f4aec | Eric Allman | bool SkipMetadata = false; // exclude topic, qos, and len on output |
51 | bool DropDups = false; // drop duplicate records |
||
52 | time_t HeartbeatTime = -1; // include dups every so often |
||
53 | time_t WarnMessageInterval = 600; // issue traffic warnings every 10m |
||
54 | f5a0bdf5 | Eric Allman | const char *MqttBroker = NULL; // host name of broker |
55 | 029f4aec | Eric Allman | int MaxRestartBackoff = 30; // max reconnect backoff in seconds |
56 | e8cae538 | Eric Allman | |
57 | |||
58 | 68f8751e | Eric Allman | /*
|
59 | ** Called by Mosquitto to do logging
|
||
60 | */
|
||
61 | |||
62 | void
|
||
63 | log_cb(struct mosquitto *mosq,
|
||
64 | void *udata,
|
||
65 | int level,
|
||
66 | const char *str) |
||
67 | { |
||
68 | const char *level_str = NULL; |
||
69 | |||
70 | switch (level)
|
||
71 | { |
||
72 | case MOSQ_LOG_DEBUG:
|
||
73 | //level_str = "debug";
|
||
74 | break;
|
||
75 | |||
76 | case MOSQ_LOG_INFO:
|
||
77 | level_str = "info";
|
||
78 | break;
|
||
79 | |||
80 | case MOSQ_LOG_WARNING:
|
||
81 | level_str = "warning";
|
||
82 | break;
|
||
83 | |||
84 | case MOSQ_LOG_ERR:
|
||
85 | level_str = "error";
|
||
86 | break;
|
||
87 | |||
88 | case MOSQ_LOG_NOTICE:
|
||
89 | level_str = "notice";
|
||
90 | break;
|
||
91 | |||
92 | default:
|
||
93 | fprintf(stderr, "unknown log level %d\n", level);
|
||
94 | level_str = "???";
|
||
95 | break;
|
||
96 | } |
||
97 | if (level_str != NULL) |
||
98 | fprintf(stderr, "MOSQ(%s): %s\n", level_str, str);
|
||
99 | } |
||
100 | |||
101 | |||
102 | /*
|
||
103 | 1cd5bd6c | Eric Allman | ** GET_TOPIC_INFO --- get information about a given topic
|
104 | **
|
||
105 | ** This implementation is pretty bad.
|
||
106 | */
|
||
107 | |||
108 | struct topic_info
|
||
109 | { |
||
110 | const char *topic_pat; // pattern for topic |
||
111 | 0b2f82a9 | Eric Allman | const char *log_xname; // printable external log name |
112 | 3ca57859 | Eric Allman | gdp_gin_t *gin; // associated GDP Instance
|
113 | a8a86417 | Eric Allman | char *oldrec; // text of old record |
114 | char *currec; // text of current record |
||
115 | size_t recbuflen; // size of previous two buffers
|
||
116 | time_t oldrectime; // time of old record
|
||
117 | 0b2f82a9 | Eric Allman | time_t warntime; // warn if no record written in N seconds
|
118 | time_t lastwarn; // time of last warning
|
||
119 | 1cd5bd6c | Eric Allman | }; |
120 | |||
121 | struct topic_info **Topics; // information about topics |
||
122 | int NTopics = 0; // number of topics allocated |
||
123 | |||
124 | struct topic_info *
|
||
125 | get_topic_info(const struct mosquitto_message *msg) |
||
126 | { |
||
127 | int tno;
|
||
128 | |||
129 | cb03d434 | Eric Allman | ep_dbg_cprintf(Dbg, 22, "get_topic_info(%s): ", msg->topic); |
130 | 1cd5bd6c | Eric Allman | for (tno = 0; tno < NTopics; tno++) |
131 | { |
||
132 | struct topic_info *t = Topics[tno];
|
||
133 | if (t == NULL) |
||
134 | continue;
|
||
135 | bool res;
|
||
136 | int istat = mosquitto_topic_matches_sub(t->topic_pat, msg->topic, &res);
|
||
137 | if (istat != 0 || !res) |
||
138 | continue;
|
||
139 | |||
140 | // topic matches
|
||
141 | cb03d434 | Eric Allman | ep_dbg_cprintf(Dbg, 22, "%p (%s)\n", t, t->topic_pat); |
142 | 1cd5bd6c | Eric Allman | return t;
|
143 | } |
||
144 | |||
145 | // no match
|
||
146 | db5b43ee | Eric Allman | if (ep_dbg_test(Dbg, 22)) |
147 | ep_dbg_printf("not found\n");
|
||
148 | else
|
||
149 | ep_dbg_cprintf(Dbg, 2, "get_topic_info(%s): not found\n", msg->topic); |
||
150 | 1cd5bd6c | Eric Allman | return NULL; |
151 | } |
||
152 | |||
153 | |||
154 | 0b2f82a9 | Eric Allman | struct topic_info *
|
155 | add_topic_info(const char *topic_pat) |
||
156 | 1cd5bd6c | Eric Allman | { |
157 | struct topic_info *t;
|
||
158 | |||
159 | t = ep_mem_zalloc(sizeof *t);
|
||
160 | t->topic_pat = topic_pat; |
||
161 | |||
162 | Topics = ep_mem_realloc(Topics, (NTopics + 1) * sizeof t); |
||
163 | Topics[NTopics++] = t; |
||
164 | 0b2f82a9 | Eric Allman | |
165 | 1cd5bd6c | Eric Allman | ep_dbg_cprintf(Dbg, 3, "add_topic_info(%s): %p\n", topic_pat, t); |
166 | 0b2f82a9 | Eric Allman | return t;
|
167 | 1cd5bd6c | Eric Allman | } |
168 | |||
169 | |||
170 | int
|
||
171 | subscribe_to_all_topics(struct mosquitto *mosq, int qos) |
||
172 | { |
||
173 | int tno;
|
||
174 | |||
175 | for (tno = 0; tno < NTopics; tno++) |
||
176 | { |
||
177 | int istat;
|
||
178 | int mid; // message id --- do we need this? |
||
179 | struct topic_info *t = Topics[tno];
|
||
180 | |||
181 | if (t == NULL) |
||
182 | continue;
|
||
183 | ep_dbg_cprintf(Dbg, 5, "subscribe_to_all_topics(%s)\n", t->topic_pat); |
||
184 | istat = mosquitto_subscribe(mosq, &mid, t->topic_pat, qos); |
||
185 | if (istat != 0) |
||
186 | return istat;
|
||
187 | } |
||
188 | return 0; |
||
189 | } |
||
190 | |||
191 | |||
192 | /*
|
||
193 | a8a86417 | Eric Allman | ** Determine if a payload is a duplicate of the previous record,
|
194 | ** i.e. if the record payload has changed in an interesting way.
|
||
195 | **
|
||
196 | ** Not just strcmp because we treat dates as being identical.
|
||
197 | */
|
||
198 | |||
199 | bool
|
||
200 | record_is_dup(const char *newrec, struct topic_info *t) |
||
201 | { |
||
202 | int istat;
|
||
203 | size_t newreclen = strlen(newrec) + 1;
|
||
204 | struct timeval currectime;
|
||
205 | static bool regex_compiled = false; |
||
206 | static regex_t datepat;
|
||
207 | // regular expression to match ISO 8601 dates (a subset thereof)
|
||
208 | const char *textdatepat = |
||
209 | "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}(:[0-9]{2}(\\.[0-9]*)?)?)Z";
|
||
210 | regmatch_t pmatch[1];
|
||
211 | |||
212 | // compile the regular expression if not already done
|
||
213 | if (!regex_compiled)
|
||
214 | { |
||
215 | istat = regcomp(&datepat, textdatepat, REG_EXTENDED); |
||
216 | if (istat != 0) |
||
217 | { |
||
218 | char ebuf[256]; |
||
219 | |||
220 | regerror(istat, &datepat, ebuf, sizeof ebuf);
|
||
221 | ep_app_fatal("Cannot compile regex \"%s\": %s\n",
|
||
222 | textdatepat, ebuf); |
||
223 | } |
||
224 | regex_compiled = true;
|
||
225 | } |
||
226 | |||
227 | // copy the new record to the current record so we can step on it
|
||
228 | if (newreclen > t->recbuflen)
|
||
229 | { |
||
230 | t->currec = ep_mem_realloc(t->currec, newreclen); |
||
231 | t->oldrec = ep_mem_realloc(t->oldrec, newreclen); |
||
232 | t->recbuflen = newreclen; |
||
233 | } |
||
234 | memcpy(t->currec, newrec, newreclen); |
||
235 | |||
236 | while ((istat = regexec(&datepat, t->currec, 1, pmatch, 0)) == 0) |
||
237 | { |
||
238 | if (pmatch[0].rm_so >= 0) |
||
239 | { |
||
240 | size_t i; |
||
241 | |||
242 | ep_dbg_cprintf(Dbg, 81,
|
||
243 | "record_is_dup found date from %zd to %zd\n",
|
||
244 | 95fcea29 | Eric Allman | (size_t) pmatch[0].rm_so,
|
245 | (size_t) pmatch[0].rm_eo);
|
||
246 | a8a86417 | Eric Allman | |
247 | for (i = pmatch[0].rm_so; i < pmatch[0].rm_eo; i++) |
||
248 | t->currec[i] = '_'; // any fixed character will do |
||
249 | } |
||
250 | } |
||
251 | |||
252 | ep_dbg_cprintf(Dbg, 80,
|
||
253 | "record_is_dup: normalized record:\n %s\n",
|
||
254 | t->currec); |
||
255 | |||
256 | gettimeofday(&currectime, NULL);
|
||
257 | |||
258 | // dates are normalized; let's see if there is a match
|
||
259 | if (strcmp(t->oldrec, t->currec) == 0) |
||
260 | { |
||
261 | // yes, they match, but check time
|
||
262 | if (HeartbeatTime > 0 && |
||
263 | currectime.tv_sec >= t->oldrectime + HeartbeatTime) |
||
264 | { |
||
265 | ep_dbg_cprintf(Dbg, 80,
|
||
266 | "dup record, but passed timeout (cur = %ld, old = %ld)\n",
|
||
267 | currectime.tv_sec, t->oldrectime + HeartbeatTime); |
||
268 | } |
||
269 | else
|
||
270 | { |
||
271 | ep_dbg_cprintf(Dbg, 81, "duplicate record\n"); |
||
272 | return true; |
||
273 | } |
||
274 | } |
||
275 | |||
276 | // not a dup --- save currec for next time
|
||
277 | memcpy(t->oldrec, t->currec, newreclen); |
||
278 | t->oldrectime = currectime.tv_sec; |
||
279 | ep_dbg_cprintf(Dbg, 80, "new record\n"); |
||
280 | return false; |
||
281 | } |
||
282 | |||
283 | |||
284 | /*
|
||
285 | 0b2f82a9 | Eric Allman | ** Check all topics to see that some data has been received recently.
|
286 | ** If not, give a warning so we can poke things.
|
||
287 | */
|
||
288 | |||
289 | void
|
||
290 | show_idle(struct topic_info *t, time_t now)
|
||
291 | { |
||
292 | if (t->lastwarn + WarnMessageInterval > now)
|
||
293 | { |
||
294 | // suppress output
|
||
295 | return;
|
||
296 | } |
||
297 | #if HUMAN_OUTPUT
|
||
298 | printf("Topic %s%s%s idle for %ld sec (log %s%s%s)\n",
|
||
299 | EpChar->lquote, t->topic_pat, EpChar->rquote, |
||
300 | now - t->oldrectime, |
||
301 | EpChar->lquote, t->log_xname, EpChar->rquote); |
||
302 | #else
|
||
303 | 7e63faf1 | Eric Allman | printf("{ \"tag\": \"idle mqtt topic\", "
|
304 | f5a0bdf5 | Eric Allman | "\"broker\": \"%s\", "
|
305 | 006a6eee | Eric Allman | "\"topic\": \"%s\", "
|
306 | "\"log\": \"%s\", "
|
||
307 | "\"idle\": %ld }\n",
|
||
308 | f5a0bdf5 | Eric Allman | MqttBroker, t->topic_pat, t->log_xname, now - t->oldrectime); |
309 | 0b2f82a9 | Eric Allman | #endif
|
310 | t->lastwarn = now; |
||
311 | } |
||
312 | |||
313 | void
|
||
314 | check_traffic(void)
|
||
315 | { |
||
316 | int tno;
|
||
317 | struct timeval now;
|
||
318 | |||
319 | gettimeofday(&now, NULL);
|
||
320 | |||
321 | for (tno = 0; tno < NTopics; tno++) |
||
322 | { |
||
323 | struct topic_info *t = Topics[tno];
|
||
324 | |||
325 | if (t->warntime > 0 && t->oldrectime + t->warntime <= now.tv_sec) |
||
326 | { |
||
327 | // nothing has been sent recently
|
||
328 | show_idle(t, now.tv_sec); |
||
329 | } |
||
330 | } |
||
331 | } |
||
332 | |||
333 | |||
334 | /*
|
||
335 | 68f8751e | Eric Allman | ** Called by Mosquitto when a message comes in.
|
336 | */
|
||
337 | |||
338 | void
|
||
339 | message_cb(struct mosquitto *mosq,
|
||
340 | void *udata,
|
||
341 | const struct mosquitto_message *msg) |
||
342 | { |
||
343 | 1cd5bd6c | Eric Allman | struct topic_info *tinfo;
|
344 | 68f8751e | Eric Allman | const char *payload; |
345 | a8a86417 | Eric Allman | size_t paylen; |
346 | 68f8751e | Eric Allman | |
347 | a8a86417 | Eric Allman | ep_dbg_cprintf(Dbg, 65, "message_cb: %s @%d => %s\n", |
348 | 95fcea29 | Eric Allman | msg->topic, msg->qos, (const char *) msg->payload); |
349 | 68f8751e | Eric Allman | |
350 | a8a86417 | Eric Allman | paylen = msg->payloadlen; |
351 | payload = msg->payload; |
||
352 | if (paylen <= 0) |
||
353 | { |
||
354 | c65c18be | Eric Allman | payload = "null";
|
355 | a8a86417 | Eric Allman | paylen = strlen(payload) + 1;
|
356 | } |
||
357 | 1cd5bd6c | Eric Allman | |
358 | tinfo = get_topic_info(msg); |
||
359 | if (tinfo == NULL) |
||
360 | { |
||
361 | // no place to log this message
|
||
362 | return;
|
||
363 | } |
||
364 | |||
365 | a8a86417 | Eric Allman | // if this record is a duplicate, skip it
|
366 | if (DropDups && record_is_dup(payload, tinfo))
|
||
367 | return;
|
||
368 | |||
369 | 3ca57859 | Eric Allman | if (tinfo->gin != NULL) |
370 | 68f8751e | Eric Allman | { |
371 | EP_STAT estat; |
||
372 | gdp_datum_t *datum = gdp_datum_new(); |
||
373 | df7d92f7 | Eric Allman | gdp_recno_t oldnrecs; |
374 | 68f8751e | Eric Allman | |
375 | e8cae538 | Eric Allman | if (SkipMetadata)
|
376 | { |
||
377 | gdp_buf_printf(gdp_datum_getbuf(datum), |
||
378 | "%s\n", payload);
|
||
379 | } |
||
380 | else
|
||
381 | { |
||
382 | gdp_buf_printf(gdp_datum_getbuf(datum), |
||
383 | c8b5ad07 | Eric Allman | "{\"topic\":\"%s\", \"qos\":%d, \"len\":%d, \"payload\":%s}\n",
|
384 | a8a86417 | Eric Allman | msg->topic, msg->qos, paylen, payload); |
385 | e8cae538 | Eric Allman | } |
386 | 3ca57859 | Eric Allman | oldnrecs = gdp_gin_getnrecs(tinfo->gin); |
387 | estat = gdp_gin_append(tinfo->gin, datum, NULL);
|
||
388 | 68f8751e | Eric Allman | if (!EP_STAT_ISOK(estat))
|
389 | { |
||
390 | df7d92f7 | Eric Allman | ep_log(estat, "cannot log MQTT message for %s "
|
391 | "(nrecs %" PRIgdp_recno " => %" PRIgdp_recno ")", |
||
392 | 3ca57859 | Eric Allman | msg->topic, oldnrecs, gdp_gin_getnrecs(tinfo->gin)); |
393 | 68f8751e | Eric Allman | } |
394 | gdp_datum_free(datum); |
||
395 | } |
||
396 | else
|
||
397 | { |
||
398 | e8cae538 | Eric Allman | if (SkipMetadata)
|
399 | { |
||
400 | printf("%s\n", payload);
|
||
401 | } |
||
402 | else
|
||
403 | { |
||
404 | c8b5ad07 | Eric Allman | printf("{\"topic\":\"%s\", \"qos\":%d, \"len\":%zd, \"payload\":%s}\n",
|
405 | a8a86417 | Eric Allman | msg->topic, msg->qos, paylen, payload); |
406 | e8cae538 | Eric Allman | } |
407 | 68f8751e | Eric Allman | } |
408 | } |
||
409 | |||
410 | |||
411 | /*
|
||
412 | 029f4aec | Eric Allman | ** Run Mosquitto loop until failure.
|
413 | */
|
||
414 | |||
415 | int
|
||
416 | run_mosquitto_loop(struct mosquitto *mosq)
|
||
417 | { |
||
418 | int istat;
|
||
419 | |||
420 | do
|
||
421 | { |
||
422 | // check for MQTT messages (1 second timeout)
|
||
423 | istat = mosquitto_loop(mosq, 1000, 1); |
||
424 | |||
425 | if (istat == MOSQ_ERR_SUCCESS)
|
||
426 | { |
||
427 | // check for stuck topics
|
||
428 | check_traffic(); |
||
429 | } |
||
430 | } while (istat == MOSQ_ERR_SUCCESS);
|
||
431 | |||
432 | // some failure, can we recover?
|
||
433 | ep_dbg_cprintf(Dbg, 3, "mosquitto_loop stat %d (%s) errno %d (%s)\n", |
||
434 | istat, mosquitto_strerror(istat), errno, strerror(errno)); |
||
435 | return istat;
|
||
436 | } |
||
437 | |||
438 | |||
439 | /*
|
||
440 | 68f8751e | Eric Allman | ** Run Mosquitto against a designated broker.
|
441 | */
|
||
442 | |||
443 | EP_STAT |
||
444 | 029f4aec | Eric Allman | run_new_mosquitto( |
445 | const char *broker, |
||
446 | int subscr_qos,
|
||
447 | 68f8751e | Eric Allman | void *udata)
|
448 | { |
||
449 | struct mosquitto *mosq;
|
||
450 | const char *phase; |
||
451 | 029f4aec | Eric Allman | int istat = MOSQ_ERR_ERRNO;
|
452 | EP_STAT estat = EP_STAT_ABORT; |
||
453 | 68f8751e | Eric Allman | |
454 | // get a mosquitto context
|
||
455 | 029f4aec | Eric Allman | phase = "setup";
|
456 | 68f8751e | Eric Allman | mosq = mosquitto_new(NULL, true, udata); |
457 | if (mosq == NULL) |
||
458 | 029f4aec | Eric Allman | goto fail0;
|
459 | 68f8751e | Eric Allman | |
460 | // can't just pass in NULL for udata, since mosquitto_new automatically
|
||
461 | // maps NULL to mosq
|
||
462 | mosquitto_user_data_set(mosq, udata); |
||
463 | |||
464 | e9df66b8 | Eric Allman | // adjust reconnection parameters
|
465 | mosquitto_reconnect_delay_set(mosq, 2, 30, true); |
||
466 | |||
467 | 68f8751e | Eric Allman | // set up callbacks
|
468 | mosquitto_log_callback_set(mosq, &log_cb); |
||
469 | mosquitto_message_callback_set(mosq, &message_cb); |
||
470 | |||
471 | // connect to the broker
|
||
472 | 029f4aec | Eric Allman | phase = "connect";
|
473 | c42b17eb | Eric Allman | { |
474 | // parse the broker information
|
||
475 | int mqtt_port = 1883; |
||
476 | 029f4aec | Eric Allman | char *bname = ep_mem_strdup(broker);
|
477 | char *p = strrchr(bname, ':'); |
||
478 | c42b17eb | Eric Allman | if (p != NULL && strchr(p, ']') == NULL) |
479 | { |
||
480 | *p++ = '\0';
|
||
481 | mqtt_port = atol(p); |
||
482 | } |
||
483 | 3cb74c50 | Eric Allman | ep_dbg_cprintf(Dbg, 1,
|
484 | 029f4aec | Eric Allman | "mosquitto_connect(%s:%d)\n", bname, mqtt_port);
|
485 | istat = mosquitto_connect(mosq, bname, mqtt_port, 60);
|
||
486 | ep_mem_free(bname); |
||
487 | c42b17eb | Eric Allman | } |
488 | 029f4aec | Eric Allman | if (istat != 0) |
489 | goto fail0;
|
||
490 | 68f8751e | Eric Allman | |
491 | phase = "subscribe";
|
||
492 | 1cd5bd6c | Eric Allman | istat = subscribe_to_all_topics(mosq, subscr_qos); |
493 | 68f8751e | Eric Allman | if (istat != 0) |
494 | 029f4aec | Eric Allman | goto fail0;
|
495 | 68f8751e | Eric Allman | |
496 | 6857407d | Eric Allman | // we can claim to be started now
|
497 | ep_sd_notifyf("READY=1\n");
|
||
498 | |||
499 | 68f8751e | Eric Allman | phase = "loop";
|
500 | 029f4aec | Eric Allman | istat = run_mosquitto_loop(mosq); |
501 | |||
502 | // recoverable?
|
||
503 | switch (istat)
|
||
504 | 0b2f82a9 | Eric Allman | { |
505 | 029f4aec | Eric Allman | case MOSQ_ERR_NO_CONN:
|
506 | case MOSQ_ERR_CONN_REFUSED:
|
||
507 | case MOSQ_ERR_CONN_LOST:
|
||
508 | estat = EP_STAT_ERROR; |
||
509 | 0b2f82a9 | Eric Allman | break;
|
510 | |||
511 | 029f4aec | Eric Allman | case MOSQ_ERR_ERRNO:
|
512 | if (errno != EPROTO)
|
||
513 | estat = ep_stat_from_errno(errno); |
||
514 | break;
|
||
515 | 0b2f82a9 | Eric Allman | } |
516 | 68f8751e | Eric Allman | |
517 | 029f4aec | Eric Allman | fail0:
|
518 | e22005b8 | Eric Allman | fprintf(stderr, "mosquitto error in %s: %s\n",
|
519 | phase, mosquitto_strerror(istat)); |
||
520 | 68f8751e | Eric Allman | |
521 | 029f4aec | Eric Allman | if (EP_STAT_ISABORT(estat))
|
522 | ep_sd_notifyf("STOPPING=1\n");
|
||
523 | else
|
||
524 | ep_sd_notifyf("RELOADING=1\n");
|
||
525 | if (mosq != NULL) |
||
526 | { |
||
527 | (void) mosquitto_disconnect(mosq); // ignore possible MOSQ_ERR_NO_CONN |
||
528 | // "not connected" error
|
||
529 | mosquitto_destroy(mosq); |
||
530 | } |
||
531 | return estat;
|
||
532 | 68f8751e | Eric Allman | } |
533 | |||
534 | |||
535 | 3ca57859 | Eric Allman | gdp_gin_t * |
536 | 1cd5bd6c | Eric Allman | open_signed_log(const char *gdp_log_name, const char *signing_key_file) |
537 | 68f8751e | Eric Allman | { |
538 | EP_STAT estat; |
||
539 | 1cd5bd6c | Eric Allman | gdp_name_t gname; |
540 | 3ca57859 | Eric Allman | gdp_open_info_t *info = NULL;
|
541 | gdp_gin_t *gin = NULL;
|
||
542 | 68f8751e | Eric Allman | |
543 | 4bee4508 | Eric Allman | // make sure we can parse the log name
|
544 | estat = gdp_parse_name(gdp_log_name, gname); |
||
545 | if (!EP_STAT_ISOK(estat))
|
||
546 | { |
||
547 | char ebuf[60]; |
||
548 | |||
549 | ep_app_error("cannot parse log name %s%s%s: %s",
|
||
550 | EpChar->lquote, gdp_log_name, EpChar->rquote, |
||
551 | ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
||
552 | goto fail0;
|
||
553 | } |
||
554 | 68f8751e | Eric Allman | |
555 | if (signing_key_file != NULL) |
||
556 | { |
||
557 | FILE *fp; |
||
558 | EP_CRYPTO_KEY *skey; |
||
559 | 4bee4508 | Eric Allman | struct stat st;
|
560 | |||
561 | // set up any open information
|
||
562 | 3ca57859 | Eric Allman | info = gdp_open_info_new(); |
563 | 4bee4508 | Eric Allman | |
564 | if (stat(signing_key_file, &st) == 0 && |
||
565 | (st.st_mode & S_IFMT) == S_IFDIR) |
||
566 | { |
||
567 | size_t sz; |
||
568 | char *p;
|
||
569 | gdp_pname_t pname; |
||
570 | |||
571 | // we need the printable name for getting the key file name
|
||
572 | gdp_printable_name(gname, pname); |
||
573 | |||
574 | // find the file in the directory
|
||
575 | sz = strlen(signing_key_file) + sizeof pname + 6; |
||
576 | p = alloca(sz); |
||
577 | snprintf(p, sz, "%s/%s.pem", signing_key_file, pname);
|
||
578 | signing_key_file = p; |
||
579 | } |
||
580 | 68f8751e | Eric Allman | |
581 | fp = fopen(signing_key_file, "r");
|
||
582 | if (fp == NULL) |
||
583 | { |
||
584 | e60c9d55 | Eric Allman | estat = ep_stat_from_errno(errno); |
585 | 34c11eee | Eric Allman | ep_app_error("cannot open signing key file %s, log %s",
|
586 | signing_key_file, gdp_log_name); |
||
587 | 68f8751e | Eric Allman | goto fail1;
|
588 | } |
||
589 | |||
590 | skey = ep_crypto_key_read_fp(fp, signing_key_file, |
||
591 | EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET); |
||
592 | if (skey == NULL) |
||
593 | { |
||
594 | e60c9d55 | Eric Allman | estat = ep_stat_from_errno(errno); |
595 | if (EP_STAT_ISOK(estat))
|
||
596 | estat = EP_STAT_CRYPTO_FAIL; |
||
597 | 34c11eee | Eric Allman | ep_app_error("cannot read signing key file %s, log %s",
|
598 | signing_key_file, gdp_log_name); |
||
599 | 68f8751e | Eric Allman | goto fail1;
|
600 | } |
||
601 | |||
602 | 3ca57859 | Eric Allman | estat = gdp_open_info_set_signing_key(info, skey); |
603 | 1cd5bd6c | Eric Allman | fail1:
|
604 | e60c9d55 | Eric Allman | if (fp != NULL) |
605 | fclose(fp); |
||
606 | 68f8751e | Eric Allman | } |
607 | |||
608 | e60c9d55 | Eric Allman | EP_STAT_CHECK(estat, goto fail0);
|
609 | |||
610 | 3ca57859 | Eric Allman | // open a log with the provided name
|
611 | estat = gdp_gin_open(gname, GDP_MODE_AO, info, &gin); |
||
612 | 1cd5bd6c | Eric Allman | if (!EP_STAT_ISOK(estat))
|
613 | { |
||
614 | char ebuf[60]; |
||
615 | 68f8751e | Eric Allman | |
616 | 1cd5bd6c | Eric Allman | ep_app_error("cannot open log %s%s%s: %s",
|
617 | EpChar->lquote, gdp_log_name, EpChar->rquote, |
||
618 | ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
||
619 | goto fail0;
|
||
620 | } |
||
621 | 68f8751e | Eric Allman | |
622 | fail0:
|
||
623 | 1cd5bd6c | Eric Allman | if (info != NULL) |
624 | 3ca57859 | Eric Allman | gdp_open_info_free(info); |
625 | return gin;
|
||
626 | 68f8751e | Eric Allman | } |
627 | |||
628 | |||
629 | void
|
||
630 | usage(void)
|
||
631 | { |
||
632 | fprintf(stderr, |
||
633 | a8a86417 | Eric Allman | "Usage: %s [-d] [-D dbgspec] [-G router_addr] [-h]\n"
|
634 | "\t[-K signing_key_file] [-M broker_addr] [-q qos] [-s]\n"
|
||
635 | 0b2f82a9 | Eric Allman | "\t[-w idletime] [-W msg_interval] mqtt_topic gdp_log ...\n"
|
636 | a8a86417 | Eric Allman | " -d drop duplicate records\n"
|
637 | 68f8751e | Eric Allman | " -D set debugging flags\n"
|
638 | " -G IP host to contact for gdp_router\n"
|
||
639 | 0b2f82a9 | Eric Allman | " -h record \"heartbeat\" interval, even if dups (seconds)\n"
|
640 | 68f8751e | Eric Allman | " -K key file to use to sign GDP log writes\n"
|
641 | " -M IP host to contact for MQTT broker\n"
|
||
642 | " -q MQTT Quality of Service to request\n"
|
||
643 | e8cae538 | Eric Allman | " -s skip metadata in output (topic, qos, len)\n"
|
644 | 0b2f82a9 | Eric Allman | " -w set warn interval for no topic traffic (seconds)\n"
|
645 | " -W only issue warning messages every N seconds\n"
|
||
646 | a8a86417 | Eric Allman | "mqtt_topic and gdp_log must be in pairs\n"
|
647 | 68f8751e | Eric Allman | "",
|
648 | ep_app_getprogname()); |
||
649 | exit(EX_USAGE); |
||
650 | } |
||
651 | |||
652 | |||
653 | int
|
||
654 | main(int argc, char **argv) |
||
655 | { |
||
656 | EP_STAT estat; |
||
657 | bool show_usage = false; |
||
658 | int mqtt_qos = 2; |
||
659 | 1cd5bd6c | Eric Allman | char *gdp_addr = NULL; |
660 | 68f8751e | Eric Allman | char *signing_key_file = NULL; |
661 | int opt;
|
||
662 | 0b2f82a9 | Eric Allman | time_t warntime = 0;
|
663 | struct timeval now;
|
||
664 | 1cd5bd6c | Eric Allman | char ebuf[60]; |
665 | 68f8751e | Eric Allman | |
666 | 0b2f82a9 | Eric Allman | while ((opt = getopt(argc, argv, "dD:G:h:K:M:q:sw:W:")) > 0) |
667 | 68f8751e | Eric Allman | { |
668 | switch (opt)
|
||
669 | { |
||
670 | a8a86417 | Eric Allman | case 'd': |
671 | DropDups = true;
|
||
672 | break;
|
||
673 | |||
674 | 68f8751e | Eric Allman | case 'D': |
675 | ep_dbg_set(optarg); |
||
676 | break;
|
||
677 | |||
678 | case 'G': |
||
679 | 1cd5bd6c | Eric Allman | gdp_addr = optarg; |
680 | 68f8751e | Eric Allman | break;
|
681 | |||
682 | a8a86417 | Eric Allman | case 'h': |
683 | HeartbeatTime = atol(optarg); |
||
684 | break;
|
||
685 | |||
686 | 68f8751e | Eric Allman | case 'K': |
687 | signing_key_file = optarg; |
||
688 | break;
|
||
689 | |||
690 | case 'M': |
||
691 | f5a0bdf5 | Eric Allman | MqttBroker = optarg; |
692 | 68f8751e | Eric Allman | break;
|
693 | |||
694 | case 'q': |
||
695 | mqtt_qos = atoi(optarg); |
||
696 | break;
|
||
697 | |||
698 | e8cae538 | Eric Allman | case 's': |
699 | SkipMetadata = true;
|
||
700 | break;
|
||
701 | |||
702 | 0b2f82a9 | Eric Allman | case 'w': |
703 | warntime = atoi(optarg); |
||
704 | break;
|
||
705 | |||
706 | case 'W': |
||
707 | WarnMessageInterval = atoi(optarg); |
||
708 | break;
|
||
709 | |||
710 | 68f8751e | Eric Allman | default:
|
711 | show_usage = true;
|
||
712 | break;
|
||
713 | } |
||
714 | } |
||
715 | argc -= optind; |
||
716 | argv += optind; |
||
717 | |||
718 | if (show_usage)
|
||
719 | usage(); |
||
720 | 1cd5bd6c | Eric Allman | if (argc < 2) |
721 | 68f8751e | Eric Allman | { |
722 | fprintf(stderr, "missing required argument (argc = %d)\n", argc);
|
||
723 | usage(); |
||
724 | } |
||
725 | |||
726 | 1cd5bd6c | Eric Allman | // initialize GDP library
|
727 | estat = gdp_init(gdp_addr); |
||
728 | if (!EP_STAT_ISOK(estat))
|
||
729 | { |
||
730 | ep_app_error("GDP Initialization failed: %s",
|
||
731 | ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
||
732 | exit(EX_UNAVAILABLE); |
||
733 | } |
||
734 | |||
735 | // allow thread to settle to avoid interspersed debug output
|
||
736 | ep_time_nanosleep(INT64_C(100000000));
|
||
737 | |||
738 | // open any logs we may use
|
||
739 | 0b2f82a9 | Eric Allman | gettimeofday(&now, NULL);
|
740 | 1cd5bd6c | Eric Allman | for (; argc > 1; argv += 2, argc -= 2) |
741 | { |
||
742 | 0b2f82a9 | Eric Allman | struct topic_info *t;
|
743 | 1cd5bd6c | Eric Allman | char *mqtt_topic = argv[0]; |
744 | char *gdp_log_name = argv[1]; |
||
745 | 3ca57859 | Eric Allman | gdp_gin_t *gin = NULL;
|
746 | c42b17eb | Eric Allman | |
747 | 1cd5bd6c | Eric Allman | if (strcmp(gdp_log_name, "-") != 0) |
748 | { |
||
749 | 3ca57859 | Eric Allman | gin = open_signed_log(gdp_log_name, signing_key_file); |
750 | if (gin == NULL) |
||
751 | 1cd5bd6c | Eric Allman | { |
752 | // message already given
|
||
753 | show_usage = true;
|
||
754 | continue;
|
||
755 | } |
||
756 | } |
||
757 | |||
758 | 0b2f82a9 | Eric Allman | t = add_topic_info(mqtt_topic); |
759 | 3ca57859 | Eric Allman | t->gin = gin; |
760 | 0b2f82a9 | Eric Allman | t->log_xname = gdp_log_name; |
761 | t->warntime = warntime; |
||
762 | t->oldrectime = now.tv_sec; |
||
763 | 7e63faf1 | Eric Allman | |
764 | if (warntime > 0) |
||
765 | { |
||
766 | printf("{ \"tag\": \"initial mqtt topic\", "
|
||
767 | "\"broker\": \"%s\", "
|
||
768 | "\"topic\": \"%s\", "
|
||
769 | "\"log\": \"%s\" } \n",
|
||
770 | MqttBroker, t->topic_pat, t->log_xname); |
||
771 | } |
||
772 | 1cd5bd6c | Eric Allman | } |
773 | |||
774 | if (show_usage)
|
||
775 | usage(); |
||
776 | 68f8751e | Eric Allman | |
777 | f5a0bdf5 | Eric Allman | if (MqttBroker == NULL) |
778 | 68f8751e | Eric Allman | { |
779 | f5a0bdf5 | Eric Allman | MqttBroker = ep_adm_getstrparam("swarm.mqtt-gdp-gateway.broker",
|
780 | c42b17eb | Eric Allman | "127.0.0.1");
|
781 | 68f8751e | Eric Allman | } |
782 | |||
783 | a8a86417 | Eric Allman | if (HeartbeatTime < 0) |
784 | { |
||
785 | HeartbeatTime = ep_adm_getlongparam("swarm.mqtt-gdp-gateway.heartbeat.time",
|
||
786 | 60);
|
||
787 | } |
||
788 | |||
789 | 029f4aec | Eric Allman | mosquitto_lib_init(); |
790 | int backoff = 0; |
||
791 | for (;;)
|
||
792 | { |
||
793 | estat = run_new_mosquitto(MqttBroker, mqtt_qos, NULL);
|
||
794 | ep_log(estat, "run_new_mosquitto %s returned", MqttBroker);
|
||
795 | |||
796 | // if this is an abort condition, die now
|
||
797 | if (EP_STAT_ISABORT(estat))
|
||
798 | break;
|
||
799 | |||
800 | // try again (with backoff)
|
||
801 | if (backoff > 0) |
||
802 | { |
||
803 | sleep(backoff); |
||
804 | backoff *= 2;
|
||
805 | } |
||
806 | else
|
||
807 | { |
||
808 | backoff = 1;
|
||
809 | } |
||
810 | if (backoff > MaxRestartBackoff)
|
||
811 | backoff = MaxRestartBackoff; |
||
812 | } |
||
813 | 68f8751e | Eric Allman | |
814 | exit(EX_SOFTWARE); |
||
815 | } |