Project

General

Profile

Statistics
| Branch: | Revision:

gdp-if / mqtt-gateway / mqtt-gdp-gateway.c @ master

History | View | Annotate | Download (17.6 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 :*/
2

    
3
/*
4
**        ----- BEGIN LICENSE BLOCK -----
5
**        MQTT gateway to the Global Data Plane
6
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
7
**
8
**        Copyright (c) 2015-2017, Regents of the University of California.
9
**        All rights reserved.
10
**
11
**        Permission is hereby granted, without written agreement and without
12
**        license or royalty fees, to use, copy, modify, and distribute this
13
**        software and its documentation for any purpose, provided that the above
14
**        copyright notice and the following two paragraphs appear in all copies
15
**        of this software.
16
**
17
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
18
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
19
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
20
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
21
**
22
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
23
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
25
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
26
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
27
**        OR MODIFICATIONS.
28
**        ----- END LICENSE BLOCK -----
29
*/
30

    
31
#include <gdp/gdp.h>
32
#include <ep/ep_app.h>
33
#include <ep/ep_dbg.h>
34
#include <ep/ep_log.h>
35
#include <ep/ep_sd.h>
36
#include <ep/ep_string.h>
37

    
38
#include <mosquitto.h>
39

    
40
#include <errno.h>
41
#include <getopt.h>
42
#include <regex.h>
43
#include <sysexits.h>
44
#include <sys/stat.h>
45

    
46

    
47
static EP_DBG        Dbg = EP_DBG_INIT("mqtt-gdp-gateway", "MQTT to GDP gateway");
48

    
49

    
50
bool                SkipMetadata = false;                // exclude topic, qos, and len on output
51
bool                DropDups = false;                        // drop duplicate records
52
time_t                HeartbeatTime = -1;                        // include dups every so often
53
time_t                WarnMessageInterval = 600;        // issue traffic warnings every 10m
54
const char        *MqttBroker = NULL;                        // host name of broker
55
int                        MaxRestartBackoff = 30;                // max reconnect backoff in seconds
56

    
57

    
58
/*
59
**  Called by Mosquitto to do logging
60
*/
61

    
62
void
63
log_cb(struct mosquitto *mosq,
64
                                void *udata,
65
                                int level,
66
                                const char *str)
67
{
68
        const char *level_str = NULL;
69

    
70
        switch (level)
71
        {
72
        case MOSQ_LOG_DEBUG:
73
                //level_str = "debug";
74
                break;
75

    
76
        case MOSQ_LOG_INFO:
77
                level_str = "info";
78
                break;
79

    
80
        case MOSQ_LOG_WARNING:
81
                level_str = "warning";
82
                break;
83

    
84
        case MOSQ_LOG_ERR:
85
                level_str = "error";
86
                break;
87

    
88
        case MOSQ_LOG_NOTICE:
89
                level_str = "notice";
90
                break;
91

    
92
        default:
93
                fprintf(stderr, "unknown log level %d\n", level);
94
                level_str = "???";
95
                break;
96
        }
97
        if (level_str != NULL)
98
                fprintf(stderr, "MOSQ(%s): %s\n", level_str, str);
99
}
100

    
101

    
102
/*
103
**  GET_TOPIC_INFO --- get information about a given topic
104
**
105
**                This implementation is pretty bad.
106
*/
107

    
108
struct topic_info
109
{
110
        const char        *topic_pat;                        // pattern for topic
111
        const char        *log_xname;                        // printable external log name
112
        gdp_gin_t        *gin;                                // associated GDP Instance
113
        char                *oldrec;                        // text of old record
114
        char                *currec;                        // text of current record
115
        size_t                recbuflen;                        // size of previous two buffers
116
        time_t                oldrectime;                        // time of old record
117
        time_t                warntime;                        // warn if no record written in N seconds
118
        time_t                lastwarn;                        // time of last warning
119
};
120

    
121
struct topic_info        **Topics;                // information about topics
122
int                                        NTopics = 0;        // number of topics allocated
123

    
124
struct topic_info *
125
get_topic_info(const struct mosquitto_message *msg)
126
{
127
        int tno;
128

    
129
        ep_dbg_cprintf(Dbg, 22, "get_topic_info(%s): ", msg->topic);
130
        for (tno = 0; tno < NTopics; tno++)
131
        {
132
                struct topic_info *t = Topics[tno];
133
                if (t == NULL)
134
                        continue;
135
                bool res;
136
                int istat = mosquitto_topic_matches_sub(t->topic_pat, msg->topic, &res);
137
                if (istat != 0 || !res)
138
                        continue;
139

    
140
                // topic matches
141
                ep_dbg_cprintf(Dbg, 22, "%p (%s)\n", t, t->topic_pat);
142
                return t;
143
        }
144

    
145
        // no match
146
        if (ep_dbg_test(Dbg, 22))
147
                ep_dbg_printf("not found\n");
148
        else
149
                ep_dbg_cprintf(Dbg, 2, "get_topic_info(%s): not found\n", msg->topic);
150
        return NULL;
151
}
152

    
153

    
154
struct topic_info *
155
add_topic_info(const char *topic_pat)
156
{
157
        struct topic_info *t;
158

    
159
        t = ep_mem_zalloc(sizeof *t);
160
        t->topic_pat = topic_pat;
161

    
162
        Topics = ep_mem_realloc(Topics, (NTopics + 1) * sizeof t);
163
        Topics[NTopics++] = t;
164

    
165
        ep_dbg_cprintf(Dbg, 3, "add_topic_info(%s): %p\n", topic_pat, t);
166
        return t;
167
}
168

    
169

    
170
int
171
subscribe_to_all_topics(struct mosquitto *mosq, int qos)
172
{
173
        int tno;
174

    
175
        for (tno = 0; tno < NTopics; tno++)
176
        {
177
                int istat;
178
                int mid;                                // message id --- do we need this?
179
                struct topic_info *t = Topics[tno];
180

    
181
                if (t == NULL)
182
                        continue;
183
                ep_dbg_cprintf(Dbg, 5, "subscribe_to_all_topics(%s)\n", t->topic_pat);
184
                istat = mosquitto_subscribe(mosq, &mid, t->topic_pat, qos);
185
                if (istat != 0)
186
                        return istat;
187
        }
188
        return 0;
189
}
190

    
191

    
192
/*
193
**  Determine if a payload is a duplicate of the previous record,
194
**  i.e. if the record payload has changed in an interesting way.
195
**
196
**                Not just strcmp because we treat dates as being identical.
197
*/
198

    
199
bool
200
record_is_dup(const char *newrec, struct topic_info *t)
201
{
202
        int istat;
203
        size_t newreclen = strlen(newrec) + 1;
204
        struct timeval currectime;
205
        static bool regex_compiled = false;
206
        static regex_t datepat;
207
        // regular expression to match ISO 8601 dates (a subset thereof)
208
        const char *textdatepat =
209
                "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}(:[0-9]{2}(\\.[0-9]*)?)?)Z";
210
        regmatch_t pmatch[1];
211

    
212
        // compile the regular expression if not already done
213
        if (!regex_compiled)
214
        {
215
                istat = regcomp(&datepat, textdatepat, REG_EXTENDED);
216
                if (istat != 0)
217
                {
218
                        char ebuf[256];
219

    
220
                        regerror(istat, &datepat, ebuf, sizeof ebuf);
221
                        ep_app_fatal("Cannot compile regex \"%s\": %s\n",
222
                                                textdatepat, ebuf);
223
                }
224
                regex_compiled = true;
225
        }
226

    
227
        // copy the new record to the current record so we can step on it
228
        if (newreclen > t->recbuflen)
229
        {
230
                t->currec = ep_mem_realloc(t->currec, newreclen);
231
                t->oldrec = ep_mem_realloc(t->oldrec, newreclen);
232
                t->recbuflen = newreclen;
233
        }
234
        memcpy(t->currec, newrec, newreclen);
235

    
236
        while ((istat = regexec(&datepat, t->currec, 1, pmatch, 0)) == 0)
237
        {
238
                if (pmatch[0].rm_so >= 0)
239
                {
240
                        size_t i;
241

    
242
                        ep_dbg_cprintf(Dbg, 81,
243
                                        "record_is_dup found date from %zd to %zd\n",
244
                                        (size_t) pmatch[0].rm_so,
245
                                        (size_t) pmatch[0].rm_eo);
246

    
247
                        for (i = pmatch[0].rm_so; i < pmatch[0].rm_eo; i++)
248
                                t->currec[i] = '_';                // any fixed character will do
249
                }
250
        }
251

    
252
        ep_dbg_cprintf(Dbg, 80,
253
                        "record_is_dup: normalized record:\n   %s\n",
254
                        t->currec);
255

    
256
        gettimeofday(&currectime, NULL);
257

    
258
        // dates are normalized; let's see if there is a match
259
        if (strcmp(t->oldrec, t->currec) == 0)
260
        {
261
                // yes, they match, but check time
262
                if (HeartbeatTime > 0 &&
263
                        currectime.tv_sec >= t->oldrectime + HeartbeatTime)
264
                {
265
                        ep_dbg_cprintf(Dbg, 80,
266
                                        "dup record, but passed timeout (cur = %ld, old = %ld)\n",
267
                                        currectime.tv_sec, t->oldrectime + HeartbeatTime);
268
                }
269
                else
270
                {
271
                        ep_dbg_cprintf(Dbg, 81, "duplicate record\n");
272
                        return true;
273
                }
274
        }
275

    
276
        // not a dup --- save currec for next time
277
        memcpy(t->oldrec, t->currec, newreclen);
278
        t->oldrectime = currectime.tv_sec;
279
        ep_dbg_cprintf(Dbg, 80, "new record\n");
280
        return false;
281
}
282

    
283

    
284
/*
285
**  Check all topics to see that some data has been received recently.
286
**  If not, give a warning so we can poke things.
287
*/
288

    
289
void
290
show_idle(struct topic_info *t, time_t now)
291
{
292
        if (t->lastwarn + WarnMessageInterval > now)
293
        {
294
                // suppress output
295
                return;
296
        }
297
#if HUMAN_OUTPUT
298
        printf("Topic %s%s%s idle for %ld sec (log %s%s%s)\n",
299
                        EpChar->lquote, t->topic_pat, EpChar->rquote,
300
                        now - t->oldrectime,
301
                        EpChar->lquote, t->log_xname, EpChar->rquote);
302
#else
303
        printf("{ \"tag\": \"idle mqtt topic\", "
304
                        "\"broker\": \"%s\", "
305
                        "\"topic\": \"%s\", "
306
                        "\"log\": \"%s\", "
307
                        "\"idle\": %ld }\n",
308
                        MqttBroker, t->topic_pat, t->log_xname, now - t->oldrectime);
309
#endif
310
        t->lastwarn = now;
311
}
312

    
313
void
314
check_traffic(void)
315
{
316
        int tno;
317
        struct timeval now;
318

    
319
        gettimeofday(&now, NULL);
320

    
321
        for (tno = 0; tno < NTopics; tno++)
322
        {
323
                struct topic_info *t = Topics[tno];
324

    
325
                if (t->warntime > 0 && t->oldrectime + t->warntime <= now.tv_sec)
326
                {
327
                        // nothing has been sent recently
328
                        show_idle(t, now.tv_sec);
329
                }
330
        }
331
}
332

    
333

    
334
/*
335
**  Called by Mosquitto when a message comes in.
336
*/
337

    
338
void
339
message_cb(struct mosquitto *mosq,
340
                                void *udata,
341
                                const struct mosquitto_message *msg)
342
{
343
        struct topic_info *tinfo;
344
        const char *payload;
345
        size_t paylen;
346

    
347
        ep_dbg_cprintf(Dbg, 65, "message_cb: %s @%d => %s\n",
348
                        msg->topic, msg->qos, (const char *) msg->payload);
349

    
350
        paylen = msg->payloadlen;
351
        payload = msg->payload;
352
        if (paylen <= 0)
353
        {
354
                payload = "null";
355
                paylen = strlen(payload) + 1;
356
        }
357

    
358
        tinfo = get_topic_info(msg);
359
        if (tinfo == NULL)
360
        {
361
                // no place to log this message
362
                return;
363
        }
364

    
365
        // if this record is a duplicate, skip it
366
        if (DropDups && record_is_dup(payload, tinfo))
367
                        return;
368

    
369
        if (tinfo->gin != NULL)
370
        {
371
                EP_STAT estat;
372
                gdp_datum_t *datum = gdp_datum_new();
373
                gdp_recno_t oldnrecs;
374

    
375
                if (SkipMetadata)
376
                {
377
                        gdp_buf_printf(gdp_datum_getbuf(datum),
378
                                        "%s\n", payload);
379
                }
380
                else
381
                {
382
                        gdp_buf_printf(gdp_datum_getbuf(datum),
383
                                        "{\"topic\":\"%s\", \"qos\":%d, \"len\":%d, \"payload\":%s}\n",
384
                                        msg->topic, msg->qos, paylen, payload);
385
                }
386
                oldnrecs = gdp_gin_getnrecs(tinfo->gin);
387
                estat = gdp_gin_append(tinfo->gin, datum, NULL);
388
                if (!EP_STAT_ISOK(estat))
389
                {
390
                        ep_log(estat, "cannot log MQTT message for %s "
391
                                        "(nrecs %" PRIgdp_recno " => %" PRIgdp_recno ")",
392
                                        msg->topic, oldnrecs, gdp_gin_getnrecs(tinfo->gin));
393
                }
394
                gdp_datum_free(datum);
395
        }
396
        else
397
        {
398
                if (SkipMetadata)
399
                {
400
                        printf("%s\n", payload);
401
                }
402
                else
403
                {
404
                        printf("{\"topic\":\"%s\", \"qos\":%d, \"len\":%zd, \"payload\":%s}\n",
405
                                        msg->topic, msg->qos, paylen, payload);
406
                }
407
        }
408
}
409

    
410

    
411
/*
412
**  Run Mosquitto loop until failure.
413
*/
414

    
415
int
416
run_mosquitto_loop(struct mosquitto *mosq)
417
{
418
        int istat;
419

    
420
        do
421
        {
422
                // check for MQTT messages (1 second timeout)
423
                istat = mosquitto_loop(mosq, 1000, 1);
424

    
425
                if (istat == MOSQ_ERR_SUCCESS)
426
                {
427
                        // check for stuck topics
428
                        check_traffic();
429
                }
430
        } while (istat == MOSQ_ERR_SUCCESS);
431

    
432
        // some failure, can we recover?
433
        ep_dbg_cprintf(Dbg, 3, "mosquitto_loop stat %d (%s) errno %d (%s)\n",
434
                        istat, mosquitto_strerror(istat), errno, strerror(errno));
435
        return istat;
436
}
437

    
438

    
439
/*
440
**  Run Mosquitto against a designated broker.
441
*/
442

    
443
EP_STAT
444
run_new_mosquitto(
445
                        const char *broker,
446
                        int subscr_qos,
447
                        void *udata)
448
{
449
        struct mosquitto *mosq;
450
        const char *phase;
451
        int istat = MOSQ_ERR_ERRNO;
452
        EP_STAT estat = EP_STAT_ABORT;
453

    
454
        // get a mosquitto context
455
        phase = "setup";
456
        mosq = mosquitto_new(NULL, true, udata);
457
        if (mosq == NULL)
458
                goto fail0;
459

    
460
        // can't just pass in NULL for udata, since mosquitto_new automatically
461
        // maps NULL to mosq
462
        mosquitto_user_data_set(mosq, udata);
463

    
464
        // adjust reconnection parameters
465
        mosquitto_reconnect_delay_set(mosq, 2, 30, true);
466

    
467
        // set up callbacks
468
        mosquitto_log_callback_set(mosq, &log_cb);
469
        mosquitto_message_callback_set(mosq, &message_cb);
470

    
471
        // connect to the broker
472
        phase = "connect";
473
        {
474
                // parse the broker information
475
                int mqtt_port = 1883;
476
                char *bname = ep_mem_strdup(broker);
477
                char *p = strrchr(bname, ':');
478
                if (p != NULL && strchr(p, ']') == NULL)
479
                {
480
                        *p++ = '\0';
481
                        mqtt_port = atol(p);
482
                }
483
                ep_dbg_cprintf(Dbg, 1,
484
                                "mosquitto_connect(%s:%d)\n", bname, mqtt_port);
485
                istat = mosquitto_connect(mosq, bname, mqtt_port, 60);
486
                ep_mem_free(bname);
487
        }
488
        if (istat != 0)
489
                goto fail0;
490

    
491
        phase = "subscribe";
492
        istat = subscribe_to_all_topics(mosq, subscr_qos);
493
        if (istat != 0)
494
                goto fail0;
495

    
496
        // we can claim to be started now
497
        ep_sd_notifyf("READY=1\n");
498

    
499
        phase = "loop";
500
        istat = run_mosquitto_loop(mosq);
501

    
502
        // recoverable?
503
        switch (istat)
504
        {
505
                case MOSQ_ERR_NO_CONN:
506
                case MOSQ_ERR_CONN_REFUSED:
507
                case MOSQ_ERR_CONN_LOST:
508
                        estat = EP_STAT_ERROR;
509
                        break;
510

    
511
                case MOSQ_ERR_ERRNO:
512
                        if (errno != EPROTO)
513
                                estat = ep_stat_from_errno(errno);
514
                        break;
515
        }
516

    
517
fail0:
518
        fprintf(stderr, "mosquitto error in %s: %s\n",
519
                        phase, mosquitto_strerror(istat));
520

    
521
        if (EP_STAT_ISABORT(estat))
522
                ep_sd_notifyf("STOPPING=1\n");
523
        else
524
                ep_sd_notifyf("RELOADING=1\n");
525
        if (mosq != NULL)
526
        {
527
                (void) mosquitto_disconnect(mosq);        // ignore possible MOSQ_ERR_NO_CONN
528
                                                                                        // "not connected" error
529
                mosquitto_destroy(mosq);
530
        }
531
        return estat;
532
}
533

    
534

    
535
gdp_gin_t *
536
open_signed_log(const char *gdp_log_name, const char *signing_key_file)
537
{
538
        EP_STAT estat;
539
        gdp_name_t gname;
540
        gdp_open_info_t *info = NULL;
541
        gdp_gin_t *gin = NULL;
542

    
543
        // make sure we can parse the log name
544
        estat = gdp_parse_name(gdp_log_name, gname);
545
        if (!EP_STAT_ISOK(estat))
546
        {
547
                char ebuf[60];
548

    
549
                ep_app_error("cannot parse log name %s%s%s: %s",
550
                                EpChar->lquote, gdp_log_name, EpChar->rquote,
551
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
552
                goto fail0;
553
        }
554

    
555
        if (signing_key_file != NULL)
556
        {
557
                FILE *fp;
558
                EP_CRYPTO_KEY *skey;
559
                struct stat st;
560

    
561
                // set up any open information
562
                info = gdp_open_info_new();
563

    
564
                if (stat(signing_key_file, &st) == 0 &&
565
                                (st.st_mode & S_IFMT) == S_IFDIR)
566
                {
567
                        size_t sz;
568
                        char *p;
569
                        gdp_pname_t pname;
570

    
571
                        // we need the printable name for getting the key file name
572
                        gdp_printable_name(gname, pname);
573

    
574
                        // find the file in the directory
575
                        sz = strlen(signing_key_file) + sizeof pname + 6;
576
                        p = alloca(sz);
577
                        snprintf(p, sz, "%s/%s.pem", signing_key_file, pname);
578
                        signing_key_file = p;
579
                }
580

    
581
                fp = fopen(signing_key_file, "r");
582
                if (fp == NULL)
583
                {
584
                        estat = ep_stat_from_errno(errno);
585
                        ep_app_error("cannot open signing key file %s, log %s",
586
                                        signing_key_file, gdp_log_name);
587
                        goto fail1;
588
                }
589

    
590
                skey = ep_crypto_key_read_fp(fp, signing_key_file,
591
                                EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET);
592
                if (skey == NULL)
593
                {
594
                        estat = ep_stat_from_errno(errno);
595
                        if (EP_STAT_ISOK(estat))
596
                                estat = EP_STAT_CRYPTO_FAIL;
597
                        ep_app_error("cannot read signing key file %s, log %s",
598
                                        signing_key_file, gdp_log_name);
599
                        goto fail1;
600
                }
601

    
602
                estat = gdp_open_info_set_signing_key(info, skey);
603
fail1:
604
                if (fp != NULL)
605
                        fclose(fp);
606
        }
607

    
608
        EP_STAT_CHECK(estat, goto fail0);
609

    
610
        // open a log with the provided name
611
        estat = gdp_gin_open(gname, GDP_MODE_AO, info, &gin);
612
        if (!EP_STAT_ISOK(estat))
613
        {
614
                char ebuf[60];
615

    
616
                ep_app_error("cannot open log %s%s%s: %s",
617
                                EpChar->lquote, gdp_log_name, EpChar->rquote,
618
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
619
                goto fail0;
620
        }
621

    
622
fail0:
623
        if (info != NULL)
624
                gdp_open_info_free(info);
625
        return gin;
626
}
627

    
628

    
629
void
630
usage(void)
631
{
632
        fprintf(stderr,
633
                        "Usage: %s [-d] [-D dbgspec] [-G router_addr] [-h]\n"
634
                        "\t[-K signing_key_file] [-M broker_addr] [-q qos] [-s]\n"
635
                        "\t[-w idletime] [-W msg_interval] mqtt_topic gdp_log ...\n"
636
                        "    -d  drop duplicate records\n"
637
                        "    -D  set debugging flags\n"
638
                        "    -G  IP host to contact for gdp_router\n"
639
                        "    -h  record \"heartbeat\" interval, even if dups (seconds)\n"
640
                        "    -K  key file to use to sign GDP log writes\n"
641
                        "    -M  IP host to contact for MQTT broker\n"
642
                        "    -q  MQTT Quality of Service to request\n"
643
                        "    -s  skip metadata in output (topic, qos, len)\n"
644
                        "    -w  set warn interval for no topic traffic (seconds)\n"
645
                        "    -W  only issue warning messages every N seconds\n"
646
                        "mqtt_topic and gdp_log must be in pairs\n"
647
                        "",
648
                        ep_app_getprogname());
649
        exit(EX_USAGE);
650
}
651

    
652

    
653
int
654
main(int argc, char **argv)
655
{
656
        EP_STAT estat;
657
        bool show_usage = false;
658
        int mqtt_qos = 2;
659
        char *gdp_addr = NULL;
660
        char *signing_key_file = NULL;
661
        int opt;
662
        time_t warntime = 0;
663
        struct timeval now;
664
        char ebuf[60];
665

    
666
        while ((opt = getopt(argc, argv, "dD:G:h:K:M:q:sw:W:")) > 0)
667
        {
668
                switch (opt)
669
                {
670
                case 'd':
671
                        DropDups = true;
672
                        break;
673

    
674
                case 'D':
675
                        ep_dbg_set(optarg);
676
                        break;
677

    
678
                case 'G':
679
                        gdp_addr = optarg;
680
                        break;
681

    
682
                case 'h':
683
                        HeartbeatTime = atol(optarg);
684
                        break;
685

    
686
                case 'K':
687
                        signing_key_file = optarg;
688
                        break;
689

    
690
                case 'M':
691
                        MqttBroker = optarg;
692
                        break;
693

    
694
                case 'q':
695
                        mqtt_qos = atoi(optarg);
696
                        break;
697

    
698
                case 's':
699
                        SkipMetadata = true;
700
                        break;
701

    
702
                case 'w':
703
                        warntime = atoi(optarg);
704
                        break;
705

    
706
                case 'W':
707
                        WarnMessageInterval = atoi(optarg);
708
                        break;
709

    
710
                default:
711
                        show_usage = true;
712
                        break;
713
                }
714
        }
715
        argc -= optind;
716
        argv += optind;
717

    
718
        if (show_usage)
719
                usage();
720
        if (argc < 2)
721
        {
722
                fprintf(stderr, "missing required argument (argc = %d)\n", argc);
723
                usage();
724
        }
725

    
726
        // initialize GDP library
727
        estat = gdp_init(gdp_addr);
728
        if (!EP_STAT_ISOK(estat))
729
        {
730
                ep_app_error("GDP Initialization failed: %s",
731
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
732
                exit(EX_UNAVAILABLE);
733
        }
734

    
735
        // allow thread to settle to avoid interspersed debug output
736
        ep_time_nanosleep(INT64_C(100000000));
737

    
738
        // open any logs we may use
739
        gettimeofday(&now, NULL);
740
        for (; argc > 1; argv += 2, argc -= 2)
741
        {
742
                struct topic_info *t;
743
                char *mqtt_topic = argv[0];
744
                char *gdp_log_name = argv[1];
745
                gdp_gin_t *gin = NULL;
746

    
747
                if (strcmp(gdp_log_name, "-") != 0)
748
                {
749
                        gin = open_signed_log(gdp_log_name, signing_key_file);
750
                        if (gin == NULL)
751
                        {
752
                                // message already given
753
                                show_usage = true;
754
                                continue;
755
                        }
756
                }
757

    
758
                t = add_topic_info(mqtt_topic);
759
                t->gin = gin;
760
                t->log_xname = gdp_log_name;
761
                t->warntime = warntime;
762
                t->oldrectime = now.tv_sec;
763

    
764
                if (warntime > 0)
765
                {
766
                        printf("{ \"tag\": \"initial mqtt topic\", "
767
                                        "\"broker\": \"%s\", "
768
                                        "\"topic\": \"%s\", "
769
                                        "\"log\": \"%s\" } \n",
770
                                        MqttBroker, t->topic_pat, t->log_xname);
771
                }
772
        }
773

    
774
        if (show_usage)
775
                usage();
776

    
777
        if (MqttBroker == NULL)
778
        {
779
                MqttBroker = ep_adm_getstrparam("swarm.mqtt-gdp-gateway.broker",
780
                                                        "127.0.0.1");
781
        }
782

    
783
        if (HeartbeatTime < 0)
784
        {
785
                HeartbeatTime = ep_adm_getlongparam("swarm.mqtt-gdp-gateway.heartbeat.time",
786
                                60);
787
        }
788

    
789
        mosquitto_lib_init();
790
        int backoff = 0;
791
        for (;;)
792
        {
793
                estat = run_new_mosquitto(MqttBroker, mqtt_qos, NULL);
794
                ep_log(estat, "run_new_mosquitto %s returned", MqttBroker);
795

    
796
                // if this is an abort condition, die now
797
                if (EP_STAT_ISABORT(estat))
798
                        break;
799

    
800
                // try again (with backoff)
801
                if (backoff > 0)
802
                {
803
                        sleep(backoff);
804
                        backoff *= 2;
805
                }
806
                else
807
                {
808
                        backoff = 1;
809
                }
810
                if (backoff > MaxRestartBackoff)
811
                        backoff = MaxRestartBackoff;
812
        }
813

    
814
        exit(EX_SOFTWARE);
815
}