Project

General

Profile

Statistics
| Branch: | Revision:

gdp-if / mqtt-gateway / mqtt-gdp-gateway.c @ master

History | View | Annotate | Download (17.6 KB)

1 68f8751e Eric Allman
/* vim: set ai sw=4 sts=4 ts=4 :*/
2
3 271eed7f Eric Allman
/*
4
**        ----- BEGIN LICENSE BLOCK -----
5
**        MQTT gateway to the Global Data Plane
6
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
7
**
8
**        Copyright (c) 2015-2017, Regents of the University of California.
9
**        All rights reserved.
10
**
11
**        Permission is hereby granted, without written agreement and without
12
**        license or royalty fees, to use, copy, modify, and distribute this
13
**        software and its documentation for any purpose, provided that the above
14
**        copyright notice and the following two paragraphs appear in all copies
15
**        of this software.
16
**
17
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
18
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
19
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
20
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
21
**
22
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
23
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
25
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
26
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
27
**        OR MODIFICATIONS.
28
**        ----- END LICENSE BLOCK -----
29
*/
30
31 68f8751e Eric Allman
#include <gdp/gdp.h>
32
#include <ep/ep_app.h>
33
#include <ep/ep_dbg.h>
34
#include <ep/ep_log.h>
35 6857407d Eric Allman
#include <ep/ep_sd.h>
36 68f8751e Eric Allman
#include <ep/ep_string.h>
37
38
#include <mosquitto.h>
39
40
#include <errno.h>
41
#include <getopt.h>
42 a8a86417 Eric Allman
#include <regex.h>
43 68f8751e Eric Allman
#include <sysexits.h>
44 4bee4508 Eric Allman
#include <sys/stat.h>
45 68f8751e Eric Allman
46
47 271eed7f Eric Allman
static EP_DBG        Dbg = EP_DBG_INIT("mqtt-gdp-gateway", "MQTT to GDP gateway");
48 68f8751e Eric Allman
49
50 029f4aec Eric Allman
bool                SkipMetadata = false;                // exclude topic, qos, and len on output
51
bool                DropDups = false;                        // drop duplicate records
52
time_t                HeartbeatTime = -1;                        // include dups every so often
53
time_t                WarnMessageInterval = 600;        // issue traffic warnings every 10m
54 f5a0bdf5 Eric Allman
const char        *MqttBroker = NULL;                        // host name of broker
55 029f4aec Eric Allman
int                        MaxRestartBackoff = 30;                // max reconnect backoff in seconds
56 e8cae538 Eric Allman
57
58 68f8751e Eric Allman
/*
59
**  Called by Mosquitto to do logging
60
*/
61
62
void
63
log_cb(struct mosquitto *mosq,
64
                                void *udata,
65
                                int level,
66
                                const char *str)
67
{
68
        const char *level_str = NULL;
69
70
        switch (level)
71
        {
72
        case MOSQ_LOG_DEBUG:
73
                //level_str = "debug";
74
                break;
75
76
        case MOSQ_LOG_INFO:
77
                level_str = "info";
78
                break;
79
80
        case MOSQ_LOG_WARNING:
81
                level_str = "warning";
82
                break;
83
84
        case MOSQ_LOG_ERR:
85
                level_str = "error";
86
                break;
87
88
        case MOSQ_LOG_NOTICE:
89
                level_str = "notice";
90
                break;
91
92
        default:
93
                fprintf(stderr, "unknown log level %d\n", level);
94
                level_str = "???";
95
                break;
96
        }
97
        if (level_str != NULL)
98
                fprintf(stderr, "MOSQ(%s): %s\n", level_str, str);
99
}
100
101
102
/*
103 1cd5bd6c Eric Allman
**  GET_TOPIC_INFO --- get information about a given topic
104
**
105
**                This implementation is pretty bad.
106
*/
107
108
struct topic_info
109
{
110
        const char        *topic_pat;                        // pattern for topic
111 0b2f82a9 Eric Allman
        const char        *log_xname;                        // printable external log name
112 3ca57859 Eric Allman
        gdp_gin_t        *gin;                                // associated GDP Instance
113 a8a86417 Eric Allman
        char                *oldrec;                        // text of old record
114
        char                *currec;                        // text of current record
115
        size_t                recbuflen;                        // size of previous two buffers
116
        time_t                oldrectime;                        // time of old record
117 0b2f82a9 Eric Allman
        time_t                warntime;                        // warn if no record written in N seconds
118
        time_t                lastwarn;                        // time of last warning
119 1cd5bd6c Eric Allman
};
120
121
struct topic_info        **Topics;                // information about topics
122
int                                        NTopics = 0;        // number of topics allocated
123
124
struct topic_info *
125
get_topic_info(const struct mosquitto_message *msg)
126
{
127
        int tno;
128
129 cb03d434 Eric Allman
        ep_dbg_cprintf(Dbg, 22, "get_topic_info(%s): ", msg->topic);
130 1cd5bd6c Eric Allman
        for (tno = 0; tno < NTopics; tno++)
131
        {
132
                struct topic_info *t = Topics[tno];
133
                if (t == NULL)
134
                        continue;
135
                bool res;
136
                int istat = mosquitto_topic_matches_sub(t->topic_pat, msg->topic, &res);
137
                if (istat != 0 || !res)
138
                        continue;
139
140
                // topic matches
141 cb03d434 Eric Allman
                ep_dbg_cprintf(Dbg, 22, "%p (%s)\n", t, t->topic_pat);
142 1cd5bd6c Eric Allman
                return t;
143
        }
144
145
        // no match
146 db5b43ee Eric Allman
        if (ep_dbg_test(Dbg, 22))
147
                ep_dbg_printf("not found\n");
148
        else
149
                ep_dbg_cprintf(Dbg, 2, "get_topic_info(%s): not found\n", msg->topic);
150 1cd5bd6c Eric Allman
        return NULL;
151
}
152
153
154 0b2f82a9 Eric Allman
struct topic_info *
155
add_topic_info(const char *topic_pat)
156 1cd5bd6c Eric Allman
{
157
        struct topic_info *t;
158
159
        t = ep_mem_zalloc(sizeof *t);
160
        t->topic_pat = topic_pat;
161
162
        Topics = ep_mem_realloc(Topics, (NTopics + 1) * sizeof t);
163
        Topics[NTopics++] = t;
164 0b2f82a9 Eric Allman
165 1cd5bd6c Eric Allman
        ep_dbg_cprintf(Dbg, 3, "add_topic_info(%s): %p\n", topic_pat, t);
166 0b2f82a9 Eric Allman
        return t;
167 1cd5bd6c Eric Allman
}
168
169
170
int
171
subscribe_to_all_topics(struct mosquitto *mosq, int qos)
172
{
173
        int tno;
174
175
        for (tno = 0; tno < NTopics; tno++)
176
        {
177
                int istat;
178
                int mid;                                // message id --- do we need this?
179
                struct topic_info *t = Topics[tno];
180
181
                if (t == NULL)
182
                        continue;
183
                ep_dbg_cprintf(Dbg, 5, "subscribe_to_all_topics(%s)\n", t->topic_pat);
184
                istat = mosquitto_subscribe(mosq, &mid, t->topic_pat, qos);
185
                if (istat != 0)
186
                        return istat;
187
        }
188
        return 0;
189
}
190
191
192
/*
193 a8a86417 Eric Allman
**  Determine if a payload is a duplicate of the previous record,
194
**  i.e. if the record payload has changed in an interesting way.
195
**
196
**                Not just strcmp because we treat dates as being identical.
197
*/
198
199
bool
200
record_is_dup(const char *newrec, struct topic_info *t)
201
{
202
        int istat;
203
        size_t newreclen = strlen(newrec) + 1;
204
        struct timeval currectime;
205
        static bool regex_compiled = false;
206
        static regex_t datepat;
207
        // regular expression to match ISO 8601 dates (a subset thereof)
208
        const char *textdatepat =
209
                "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}(:[0-9]{2}(\\.[0-9]*)?)?)Z";
210
        regmatch_t pmatch[1];
211
212
        // compile the regular expression if not already done
213
        if (!regex_compiled)
214
        {
215
                istat = regcomp(&datepat, textdatepat, REG_EXTENDED);
216
                if (istat != 0)
217
                {
218
                        char ebuf[256];
219
220
                        regerror(istat, &datepat, ebuf, sizeof ebuf);
221
                        ep_app_fatal("Cannot compile regex \"%s\": %s\n",
222
                                                textdatepat, ebuf);
223
                }
224
                regex_compiled = true;
225
        }
226
227
        // copy the new record to the current record so we can step on it
228
        if (newreclen > t->recbuflen)
229
        {
230
                t->currec = ep_mem_realloc(t->currec, newreclen);
231
                t->oldrec = ep_mem_realloc(t->oldrec, newreclen);
232
                t->recbuflen = newreclen;
233
        }
234
        memcpy(t->currec, newrec, newreclen);
235
236
        while ((istat = regexec(&datepat, t->currec, 1, pmatch, 0)) == 0)
237
        {
238
                if (pmatch[0].rm_so >= 0)
239
                {
240
                        size_t i;
241
242
                        ep_dbg_cprintf(Dbg, 81,
243
                                        "record_is_dup found date from %zd to %zd\n",
244 95fcea29 Eric Allman
                                        (size_t) pmatch[0].rm_so,
245
                                        (size_t) pmatch[0].rm_eo);
246 a8a86417 Eric Allman
247
                        for (i = pmatch[0].rm_so; i < pmatch[0].rm_eo; i++)
248
                                t->currec[i] = '_';                // any fixed character will do
249
                }
250
        }
251
252
        ep_dbg_cprintf(Dbg, 80,
253
                        "record_is_dup: normalized record:\n   %s\n",
254
                        t->currec);
255
256
        gettimeofday(&currectime, NULL);
257
258
        // dates are normalized; let's see if there is a match
259
        if (strcmp(t->oldrec, t->currec) == 0)
260
        {
261
                // yes, they match, but check time
262
                if (HeartbeatTime > 0 &&
263
                        currectime.tv_sec >= t->oldrectime + HeartbeatTime)
264
                {
265
                        ep_dbg_cprintf(Dbg, 80,
266
                                        "dup record, but passed timeout (cur = %ld, old = %ld)\n",
267
                                        currectime.tv_sec, t->oldrectime + HeartbeatTime);
268
                }
269
                else
270
                {
271
                        ep_dbg_cprintf(Dbg, 81, "duplicate record\n");
272
                        return true;
273
                }
274
        }
275
276
        // not a dup --- save currec for next time
277
        memcpy(t->oldrec, t->currec, newreclen);
278
        t->oldrectime = currectime.tv_sec;
279
        ep_dbg_cprintf(Dbg, 80, "new record\n");
280
        return false;
281
}
282
283
284
/*
285 0b2f82a9 Eric Allman
**  Check all topics to see that some data has been received recently.
286
**  If not, give a warning so we can poke things.
287
*/
288
289
void
290
show_idle(struct topic_info *t, time_t now)
291
{
292
        if (t->lastwarn + WarnMessageInterval > now)
293
        {
294
                // suppress output
295
                return;
296
        }
297
#if HUMAN_OUTPUT
298
        printf("Topic %s%s%s idle for %ld sec (log %s%s%s)\n",
299
                        EpChar->lquote, t->topic_pat, EpChar->rquote,
300
                        now - t->oldrectime,
301
                        EpChar->lquote, t->log_xname, EpChar->rquote);
302
#else
303 7e63faf1 Eric Allman
        printf("{ \"tag\": \"idle mqtt topic\", "
304 f5a0bdf5 Eric Allman
                        "\"broker\": \"%s\", "
305 006a6eee Eric Allman
                        "\"topic\": \"%s\", "
306
                        "\"log\": \"%s\", "
307
                        "\"idle\": %ld }\n",
308 f5a0bdf5 Eric Allman
                        MqttBroker, t->topic_pat, t->log_xname, now - t->oldrectime);
309 0b2f82a9 Eric Allman
#endif
310
        t->lastwarn = now;
311
}
312
313
void
314
check_traffic(void)
315
{
316
        int tno;
317
        struct timeval now;
318
319
        gettimeofday(&now, NULL);
320
321
        for (tno = 0; tno < NTopics; tno++)
322
        {
323
                struct topic_info *t = Topics[tno];
324
325
                if (t->warntime > 0 && t->oldrectime + t->warntime <= now.tv_sec)
326
                {
327
                        // nothing has been sent recently
328
                        show_idle(t, now.tv_sec);
329
                }
330
        }
331
}
332
333
334
/*
335 68f8751e Eric Allman
**  Called by Mosquitto when a message comes in.
336
*/
337
338
void
339
message_cb(struct mosquitto *mosq,
340
                                void *udata,
341
                                const struct mosquitto_message *msg)
342
{
343 1cd5bd6c Eric Allman
        struct topic_info *tinfo;
344 68f8751e Eric Allman
        const char *payload;
345 a8a86417 Eric Allman
        size_t paylen;
346 68f8751e Eric Allman
347 a8a86417 Eric Allman
        ep_dbg_cprintf(Dbg, 65, "message_cb: %s @%d => %s\n",
348 95fcea29 Eric Allman
                        msg->topic, msg->qos, (const char *) msg->payload);
349 68f8751e Eric Allman
350 a8a86417 Eric Allman
        paylen = msg->payloadlen;
351
        payload = msg->payload;
352
        if (paylen <= 0)
353
        {
354 c65c18be Eric Allman
                payload = "null";
355 a8a86417 Eric Allman
                paylen = strlen(payload) + 1;
356
        }
357 1cd5bd6c Eric Allman
358
        tinfo = get_topic_info(msg);
359
        if (tinfo == NULL)
360
        {
361
                // no place to log this message
362
                return;
363
        }
364
365 a8a86417 Eric Allman
        // if this record is a duplicate, skip it
366
        if (DropDups && record_is_dup(payload, tinfo))
367
                        return;
368
369 3ca57859 Eric Allman
        if (tinfo->gin != NULL)
370 68f8751e Eric Allman
        {
371
                EP_STAT estat;
372
                gdp_datum_t *datum = gdp_datum_new();
373 df7d92f7 Eric Allman
                gdp_recno_t oldnrecs;
374 68f8751e Eric Allman
375 e8cae538 Eric Allman
                if (SkipMetadata)
376
                {
377
                        gdp_buf_printf(gdp_datum_getbuf(datum),
378
                                        "%s\n", payload);
379
                }
380
                else
381
                {
382
                        gdp_buf_printf(gdp_datum_getbuf(datum),
383 c8b5ad07 Eric Allman
                                        "{\"topic\":\"%s\", \"qos\":%d, \"len\":%d, \"payload\":%s}\n",
384 a8a86417 Eric Allman
                                        msg->topic, msg->qos, paylen, payload);
385 e8cae538 Eric Allman
                }
386 3ca57859 Eric Allman
                oldnrecs = gdp_gin_getnrecs(tinfo->gin);
387
                estat = gdp_gin_append(tinfo->gin, datum, NULL);
388 68f8751e Eric Allman
                if (!EP_STAT_ISOK(estat))
389
                {
390 df7d92f7 Eric Allman
                        ep_log(estat, "cannot log MQTT message for %s "
391
                                        "(nrecs %" PRIgdp_recno " => %" PRIgdp_recno ")",
392 3ca57859 Eric Allman
                                        msg->topic, oldnrecs, gdp_gin_getnrecs(tinfo->gin));
393 68f8751e Eric Allman
                }
394
                gdp_datum_free(datum);
395
        }
396
        else
397
        {
398 e8cae538 Eric Allman
                if (SkipMetadata)
399
                {
400
                        printf("%s\n", payload);
401
                }
402
                else
403
                {
404 c8b5ad07 Eric Allman
                        printf("{\"topic\":\"%s\", \"qos\":%d, \"len\":%zd, \"payload\":%s}\n",
405 a8a86417 Eric Allman
                                        msg->topic, msg->qos, paylen, payload);
406 e8cae538 Eric Allman
                }
407 68f8751e Eric Allman
        }
408
}
409
410
411
/*
412 029f4aec Eric Allman
**  Run Mosquitto loop until failure.
413
*/
414
415
int
416
run_mosquitto_loop(struct mosquitto *mosq)
417
{
418
        int istat;
419
420
        do
421
        {
422
                // check for MQTT messages (1 second timeout)
423
                istat = mosquitto_loop(mosq, 1000, 1);
424
425
                if (istat == MOSQ_ERR_SUCCESS)
426
                {
427
                        // check for stuck topics
428
                        check_traffic();
429
                }
430
        } while (istat == MOSQ_ERR_SUCCESS);
431
432
        // some failure, can we recover?
433
        ep_dbg_cprintf(Dbg, 3, "mosquitto_loop stat %d (%s) errno %d (%s)\n",
434
                        istat, mosquitto_strerror(istat), errno, strerror(errno));
435
        return istat;
436
}
437
438
439
/*
440 68f8751e Eric Allman
**  Run Mosquitto against a designated broker.
441
*/
442
443
EP_STAT
444 029f4aec Eric Allman
run_new_mosquitto(
445
                        const char *broker,
446
                        int subscr_qos,
447 68f8751e Eric Allman
                        void *udata)
448
{
449
        struct mosquitto *mosq;
450
        const char *phase;
451 029f4aec Eric Allman
        int istat = MOSQ_ERR_ERRNO;
452
        EP_STAT estat = EP_STAT_ABORT;
453 68f8751e Eric Allman
454
        // get a mosquitto context
455 029f4aec Eric Allman
        phase = "setup";
456 68f8751e Eric Allman
        mosq = mosquitto_new(NULL, true, udata);
457
        if (mosq == NULL)
458 029f4aec Eric Allman
                goto fail0;
459 68f8751e Eric Allman
460
        // can't just pass in NULL for udata, since mosquitto_new automatically
461
        // maps NULL to mosq
462
        mosquitto_user_data_set(mosq, udata);
463
464 e9df66b8 Eric Allman
        // adjust reconnection parameters
465
        mosquitto_reconnect_delay_set(mosq, 2, 30, true);
466
467 68f8751e Eric Allman
        // set up callbacks
468
        mosquitto_log_callback_set(mosq, &log_cb);
469
        mosquitto_message_callback_set(mosq, &message_cb);
470
471
        // connect to the broker
472 029f4aec Eric Allman
        phase = "connect";
473 c42b17eb Eric Allman
        {
474
                // parse the broker information
475
                int mqtt_port = 1883;
476 029f4aec Eric Allman
                char *bname = ep_mem_strdup(broker);
477
                char *p = strrchr(bname, ':');
478 c42b17eb Eric Allman
                if (p != NULL && strchr(p, ']') == NULL)
479
                {
480
                        *p++ = '\0';
481
                        mqtt_port = atol(p);
482
                }
483 3cb74c50 Eric Allman
                ep_dbg_cprintf(Dbg, 1,
484 029f4aec Eric Allman
                                "mosquitto_connect(%s:%d)\n", bname, mqtt_port);
485
                istat = mosquitto_connect(mosq, bname, mqtt_port, 60);
486
                ep_mem_free(bname);
487 c42b17eb Eric Allman
        }
488 029f4aec Eric Allman
        if (istat != 0)
489
                goto fail0;
490 68f8751e Eric Allman
491
        phase = "subscribe";
492 1cd5bd6c Eric Allman
        istat = subscribe_to_all_topics(mosq, subscr_qos);
493 68f8751e Eric Allman
        if (istat != 0)
494 029f4aec Eric Allman
                goto fail0;
495 68f8751e Eric Allman
496 6857407d Eric Allman
        // we can claim to be started now
497
        ep_sd_notifyf("READY=1\n");
498
499 68f8751e Eric Allman
        phase = "loop";
500 029f4aec Eric Allman
        istat = run_mosquitto_loop(mosq);
501
502
        // recoverable?
503
        switch (istat)
504 0b2f82a9 Eric Allman
        {
505 029f4aec Eric Allman
                case MOSQ_ERR_NO_CONN:
506
                case MOSQ_ERR_CONN_REFUSED:
507
                case MOSQ_ERR_CONN_LOST:
508
                        estat = EP_STAT_ERROR;
509 0b2f82a9 Eric Allman
                        break;
510
511 029f4aec Eric Allman
                case MOSQ_ERR_ERRNO:
512
                        if (errno != EPROTO)
513
                                estat = ep_stat_from_errno(errno);
514
                        break;
515 0b2f82a9 Eric Allman
        }
516 68f8751e Eric Allman
517 029f4aec Eric Allman
fail0:
518 e22005b8 Eric Allman
        fprintf(stderr, "mosquitto error in %s: %s\n",
519
                        phase, mosquitto_strerror(istat));
520 68f8751e Eric Allman
521 029f4aec Eric Allman
        if (EP_STAT_ISABORT(estat))
522
                ep_sd_notifyf("STOPPING=1\n");
523
        else
524
                ep_sd_notifyf("RELOADING=1\n");
525
        if (mosq != NULL)
526
        {
527
                (void) mosquitto_disconnect(mosq);        // ignore possible MOSQ_ERR_NO_CONN
528
                                                                                        // "not connected" error
529
                mosquitto_destroy(mosq);
530
        }
531
        return estat;
532 68f8751e Eric Allman
}
533
534
535 3ca57859 Eric Allman
gdp_gin_t *
536 1cd5bd6c Eric Allman
open_signed_log(const char *gdp_log_name, const char *signing_key_file)
537 68f8751e Eric Allman
{
538
        EP_STAT estat;
539 1cd5bd6c Eric Allman
        gdp_name_t gname;
540 3ca57859 Eric Allman
        gdp_open_info_t *info = NULL;
541
        gdp_gin_t *gin = NULL;
542 68f8751e Eric Allman
543 4bee4508 Eric Allman
        // make sure we can parse the log name
544
        estat = gdp_parse_name(gdp_log_name, gname);
545
        if (!EP_STAT_ISOK(estat))
546
        {
547
                char ebuf[60];
548
549
                ep_app_error("cannot parse log name %s%s%s: %s",
550
                                EpChar->lquote, gdp_log_name, EpChar->rquote,
551
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
552
                goto fail0;
553
        }
554 68f8751e Eric Allman
555
        if (signing_key_file != NULL)
556
        {
557
                FILE *fp;
558
                EP_CRYPTO_KEY *skey;
559 4bee4508 Eric Allman
                struct stat st;
560
561
                // set up any open information
562 3ca57859 Eric Allman
                info = gdp_open_info_new();
563 4bee4508 Eric Allman
564
                if (stat(signing_key_file, &st) == 0 &&
565
                                (st.st_mode & S_IFMT) == S_IFDIR)
566
                {
567
                        size_t sz;
568
                        char *p;
569
                        gdp_pname_t pname;
570
571
                        // we need the printable name for getting the key file name
572
                        gdp_printable_name(gname, pname);
573
574
                        // find the file in the directory
575
                        sz = strlen(signing_key_file) + sizeof pname + 6;
576
                        p = alloca(sz);
577
                        snprintf(p, sz, "%s/%s.pem", signing_key_file, pname);
578
                        signing_key_file = p;
579
                }
580 68f8751e Eric Allman
581
                fp = fopen(signing_key_file, "r");
582
                if (fp == NULL)
583
                {
584 e60c9d55 Eric Allman
                        estat = ep_stat_from_errno(errno);
585 34c11eee Eric Allman
                        ep_app_error("cannot open signing key file %s, log %s",
586
                                        signing_key_file, gdp_log_name);
587 68f8751e Eric Allman
                        goto fail1;
588
                }
589
590
                skey = ep_crypto_key_read_fp(fp, signing_key_file,
591
                                EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET);
592
                if (skey == NULL)
593
                {
594 e60c9d55 Eric Allman
                        estat = ep_stat_from_errno(errno);
595
                        if (EP_STAT_ISOK(estat))
596
                                estat = EP_STAT_CRYPTO_FAIL;
597 34c11eee Eric Allman
                        ep_app_error("cannot read signing key file %s, log %s",
598
                                        signing_key_file, gdp_log_name);
599 68f8751e Eric Allman
                        goto fail1;
600
                }
601
602 3ca57859 Eric Allman
                estat = gdp_open_info_set_signing_key(info, skey);
603 1cd5bd6c Eric Allman
fail1:
604 e60c9d55 Eric Allman
                if (fp != NULL)
605
                        fclose(fp);
606 68f8751e Eric Allman
        }
607
608 e60c9d55 Eric Allman
        EP_STAT_CHECK(estat, goto fail0);
609
610 3ca57859 Eric Allman
        // open a log with the provided name
611
        estat = gdp_gin_open(gname, GDP_MODE_AO, info, &gin);
612 1cd5bd6c Eric Allman
        if (!EP_STAT_ISOK(estat))
613
        {
614
                char ebuf[60];
615 68f8751e Eric Allman
616 1cd5bd6c Eric Allman
                ep_app_error("cannot open log %s%s%s: %s",
617
                                EpChar->lquote, gdp_log_name, EpChar->rquote,
618
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
619
                goto fail0;
620
        }
621 68f8751e Eric Allman
622
fail0:
623 1cd5bd6c Eric Allman
        if (info != NULL)
624 3ca57859 Eric Allman
                gdp_open_info_free(info);
625
        return gin;
626 68f8751e Eric Allman
}
627
628
629
void
630
usage(void)
631
{
632
        fprintf(stderr,
633 a8a86417 Eric Allman
                        "Usage: %s [-d] [-D dbgspec] [-G router_addr] [-h]\n"
634
                        "\t[-K signing_key_file] [-M broker_addr] [-q qos] [-s]\n"
635 0b2f82a9 Eric Allman
                        "\t[-w idletime] [-W msg_interval] mqtt_topic gdp_log ...\n"
636 a8a86417 Eric Allman
                        "    -d  drop duplicate records\n"
637 68f8751e Eric Allman
                        "    -D  set debugging flags\n"
638
                        "    -G  IP host to contact for gdp_router\n"
639 0b2f82a9 Eric Allman
                        "    -h  record \"heartbeat\" interval, even if dups (seconds)\n"
640 68f8751e Eric Allman
                        "    -K  key file to use to sign GDP log writes\n"
641
                        "    -M  IP host to contact for MQTT broker\n"
642
                        "    -q  MQTT Quality of Service to request\n"
643 e8cae538 Eric Allman
                        "    -s  skip metadata in output (topic, qos, len)\n"
644 0b2f82a9 Eric Allman
                        "    -w  set warn interval for no topic traffic (seconds)\n"
645
                        "    -W  only issue warning messages every N seconds\n"
646 a8a86417 Eric Allman
                        "mqtt_topic and gdp_log must be in pairs\n"
647 68f8751e Eric Allman
                        "",
648
                        ep_app_getprogname());
649
        exit(EX_USAGE);
650
}
651
652
653
int
654
main(int argc, char **argv)
655
{
656
        EP_STAT estat;
657
        bool show_usage = false;
658
        int mqtt_qos = 2;
659 1cd5bd6c Eric Allman
        char *gdp_addr = NULL;
660 68f8751e Eric Allman
        char *signing_key_file = NULL;
661
        int opt;
662 0b2f82a9 Eric Allman
        time_t warntime = 0;
663
        struct timeval now;
664 1cd5bd6c Eric Allman
        char ebuf[60];
665 68f8751e Eric Allman
666 0b2f82a9 Eric Allman
        while ((opt = getopt(argc, argv, "dD:G:h:K:M:q:sw:W:")) > 0)
667 68f8751e Eric Allman
        {
668
                switch (opt)
669
                {
670 a8a86417 Eric Allman
                case 'd':
671
                        DropDups = true;
672
                        break;
673
674 68f8751e Eric Allman
                case 'D':
675
                        ep_dbg_set(optarg);
676
                        break;
677
678
                case 'G':
679 1cd5bd6c Eric Allman
                        gdp_addr = optarg;
680 68f8751e Eric Allman
                        break;
681
682 a8a86417 Eric Allman
                case 'h':
683
                        HeartbeatTime = atol(optarg);
684
                        break;
685
686 68f8751e Eric Allman
                case 'K':
687
                        signing_key_file = optarg;
688
                        break;
689
690
                case 'M':
691 f5a0bdf5 Eric Allman
                        MqttBroker = optarg;
692 68f8751e Eric Allman
                        break;
693
694
                case 'q':
695
                        mqtt_qos = atoi(optarg);
696
                        break;
697
698 e8cae538 Eric Allman
                case 's':
699
                        SkipMetadata = true;
700
                        break;
701
702 0b2f82a9 Eric Allman
                case 'w':
703
                        warntime = atoi(optarg);
704
                        break;
705
706
                case 'W':
707
                        WarnMessageInterval = atoi(optarg);
708
                        break;
709
710 68f8751e Eric Allman
                default:
711
                        show_usage = true;
712
                        break;
713
                }
714
        }
715
        argc -= optind;
716
        argv += optind;
717
718
        if (show_usage)
719
                usage();
720 1cd5bd6c Eric Allman
        if (argc < 2)
721 68f8751e Eric Allman
        {
722
                fprintf(stderr, "missing required argument (argc = %d)\n", argc);
723
                usage();
724
        }
725
726 1cd5bd6c Eric Allman
        // initialize GDP library
727
        estat = gdp_init(gdp_addr);
728
        if (!EP_STAT_ISOK(estat))
729
        {
730
                ep_app_error("GDP Initialization failed: %s",
731
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
732
                exit(EX_UNAVAILABLE);
733
        }
734
735
        // allow thread to settle to avoid interspersed debug output
736
        ep_time_nanosleep(INT64_C(100000000));
737
738
        // open any logs we may use
739 0b2f82a9 Eric Allman
        gettimeofday(&now, NULL);
740 1cd5bd6c Eric Allman
        for (; argc > 1; argv += 2, argc -= 2)
741
        {
742 0b2f82a9 Eric Allman
                struct topic_info *t;
743 1cd5bd6c Eric Allman
                char *mqtt_topic = argv[0];
744
                char *gdp_log_name = argv[1];
745 3ca57859 Eric Allman
                gdp_gin_t *gin = NULL;
746 c42b17eb Eric Allman
747 1cd5bd6c Eric Allman
                if (strcmp(gdp_log_name, "-") != 0)
748
                {
749 3ca57859 Eric Allman
                        gin = open_signed_log(gdp_log_name, signing_key_file);
750
                        if (gin == NULL)
751 1cd5bd6c Eric Allman
                        {
752
                                // message already given
753
                                show_usage = true;
754
                                continue;
755
                        }
756
                }
757
758 0b2f82a9 Eric Allman
                t = add_topic_info(mqtt_topic);
759 3ca57859 Eric Allman
                t->gin = gin;
760 0b2f82a9 Eric Allman
                t->log_xname = gdp_log_name;
761
                t->warntime = warntime;
762
                t->oldrectime = now.tv_sec;
763 7e63faf1 Eric Allman
764
                if (warntime > 0)
765
                {
766
                        printf("{ \"tag\": \"initial mqtt topic\", "
767
                                        "\"broker\": \"%s\", "
768
                                        "\"topic\": \"%s\", "
769
                                        "\"log\": \"%s\" } \n",
770
                                        MqttBroker, t->topic_pat, t->log_xname);
771
                }
772 1cd5bd6c Eric Allman
        }
773
774
        if (show_usage)
775
                usage();
776 68f8751e Eric Allman
777 f5a0bdf5 Eric Allman
        if (MqttBroker == NULL)
778 68f8751e Eric Allman
        {
779 f5a0bdf5 Eric Allman
                MqttBroker = ep_adm_getstrparam("swarm.mqtt-gdp-gateway.broker",
780 c42b17eb Eric Allman
                                                        "127.0.0.1");
781 68f8751e Eric Allman
        }
782
783 a8a86417 Eric Allman
        if (HeartbeatTime < 0)
784
        {
785
                HeartbeatTime = ep_adm_getlongparam("swarm.mqtt-gdp-gateway.heartbeat.time",
786
                                60);
787
        }
788
789 029f4aec Eric Allman
        mosquitto_lib_init();
790
        int backoff = 0;
791
        for (;;)
792
        {
793
                estat = run_new_mosquitto(MqttBroker, mqtt_qos, NULL);
794
                ep_log(estat, "run_new_mosquitto %s returned", MqttBroker);
795
796
                // if this is an abort condition, die now
797
                if (EP_STAT_ISABORT(estat))
798
                        break;
799
800
                // try again (with backoff)
801
                if (backoff > 0)
802
                {
803
                        sleep(backoff);
804
                        backoff *= 2;
805
                }
806
                else
807
                {
808
                        backoff = 1;
809
                }
810
                if (backoff > MaxRestartBackoff)
811
                        backoff = MaxRestartBackoff;
812
        }
813 68f8751e Eric Allman
814
        exit(EX_SOFTWARE);
815
}