Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / gdp / gdp_chan.c @ master

History | View | Annotate | Download (28.9 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 :*/
2

    
3
/*
4
**        I/O CHANNEL HANDLING
5
**                This communicates between the client and the routing layer.
6
**
7
**        ----- BEGIN LICENSE BLOCK -----
8
**        GDP: Global Data Plane Support Library
9
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
10
**
11
**        Copyright (c) 2015-2019, Regents of the University of California.
12
**        All rights reserved.
13
**
14
**        Permission is hereby granted, without written agreement and without
15
**        license or royalty fees, to use, copy, modify, and distribute this
16
**        software and its documentation for any purpose, provided that the above
17
**        copyright notice and the following two paragraphs appear in all copies
18
**        of this software.
19
**
20
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
21
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
22
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
23
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24
**
25
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
26
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
28
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
29
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
30
**        OR MODIFICATIONS.
31
**        ----- END LICENSE BLOCK -----
32
*/
33

    
34
#include "gdp.h"
35
#include "gdp_chan.h"
36
#include "gdp_priv.h"
37
#include "gdp_zc_client.h"
38

    
39
#include <ep/ep_app.h>
40
#include <ep/ep_dbg.h>
41
#include <ep/ep_hexdump.h>
42
#include <ep/ep_log.h>
43
#include <ep/ep_prflags.h>
44
#include <ep/ep_string.h>
45

    
46
#include <ctype.h>
47
#include <errno.h>
48
#include <string.h>
49
#include <sys/queue.h>
50
#include <sys/signal.h>
51
#include <sys/socket.h>
52

    
53
#include <netinet/tcp.h>
54

    
55
static EP_DBG        Dbg = EP_DBG_INIT("gdp.chan", "GDP channel processing");
56
#if GDP_DEBUG_EXTREME_TESTS
57
static EP_DBG        DbgT1 = EP_DBG_INIT(".test.gdp.chan.seqno", "TEST: GDP channel seqno randomization");
58
#endif
59

    
60
// protocol version number in layer 4 (transport) PDU
61
#define GDP_CHAN_PROTO_VERSION        4
62

    
63
static struct event_base        *EventBase;
64
static EP_STAT                                chan_reopen(gdp_chan_t *);
65

    
66
/*
67
**  Channel internal structure
68
*/
69

    
70
struct gdp_chan
71
{
72
        EP_THR_MUTEX                        mutex;                        // data structure lock
73
        EP_THR_COND                                cond;                        // wake up after state change
74
        int16_t                                        state;                        // current state of channel
75
        uint16_t                                flags;                        // status flags
76
        struct bufferevent                *bev;                        // associated bufferevent (socket)
77
        char                                        *router_addr;        // text version of router address
78
        gdp_name_t                                gdpname;                // GDPname of the router
79
        gdp_chan_x_t                        *cdata;                        // arbitrary user data
80

    
81
        // callbacks
82
        gdp_chan_recv_cb_t                *recv_cb;                // receive callback
83
        gdp_chan_send_cb_t                *send_cb;                // send callback
84
        gdp_chan_ioevent_cb_t        *ioevent_cb;        // close/error/eof callback
85
        gdp_chan_router_cb_t        *router_cb;                // router event callback
86
        gdp_chan_advert_func_t        *advert_cb;                // advertising function
87
};
88

    
89
/* Channel states */
90
#define GDP_CHAN_UNCONNECTED        0                // channel is not connected yet
91
#define GDP_CHAN_CONNECTING                1                // connection being initiated
92
#define GDP_CHAN_CONNECTED                2                // channel is connected and active
93
#define GDP_CHAN_ERROR                        3                // channel has had error
94
#define GDP_CHAN_CLOSING                4                // channel is closing
95

    
96

    
97
//XXX following needs to be changed if ADDR_FMT != 0
98
// magic, hdrlen, type, ttl, seq_mf_foff, fraglen, paylen, dst, src
99
#define MIN_HEADER_LENGTH        (1 + 1 + 1 + 1 + 4 + 2 + 2 + 32 + 32)
100
#define MAX_HEADER_LENGTH        (255 * 4)
101

    
102

    
103
/*
104
**  _GDP_CHAN_INIT --- initialize channel subsystem
105
*/
106

    
107
EP_STAT
108
_gdp_chan_init(
109
                struct event_base *evbase,
110
                void *options_unused)
111
{
112
        if (evbase == NULL)
113
                return EP_STAT_ABORT;
114
        EventBase = evbase;
115
        return EP_STAT_OK;
116
}
117

    
118

    
119
/*
120
**  Lock and Unlock a channel
121
*/
122

    
123
void
124
_gdp_chan_lock(gdp_chan_t *chan)
125
{
126
        ep_thr_mutex_lock(&chan->mutex);
127
}
128

    
129
void
130
_gdp_chan_unlock(gdp_chan_t *chan)
131
{
132
        ep_thr_mutex_unlock(&chan->mutex);
133
}
134

    
135

    
136
/*
137
**  Read and decode fixed PDU header
138
**                On return, the header has been consumed from the input but
139
**                        the complete payload is still in the input buffer.
140
**                        The payload length is returned through plenp.
141
**                Returns GDP_STAT_KEEP_READING and leaves the header in the
142
**                        input buffer if the entire payload is not yet in memory.
143
**                Returns GDP_STAT_NAK_NOROUTE if the router cannot find a
144
**                        path to the destination.
145
*/
146

    
147
static EP_PRFLAGS_DESC        L4Flags[] =
148
{
149
        // address type portion
150
        { GDP_PKT_ADDR_TYPE_2FULL,        GDP_PKT_ADDR_TYPE_MASK,        "2FULL"                        },
151

    
152
        // packet type portion
153
        { GDP_PKT_TYPE_REGULAR,                GDP_PKT_TYPE_MASK,                "REGULAR"                },
154
        { GDP_PKT_TYPE_FORWARD,                GDP_PKT_TYPE_MASK,                "FORWARD"                },
155
        { GDP_PKT_TYPE_ADVERTISE,        GDP_PKT_TYPE_MASK,                "ADVERTISE"                },
156
        { GDP_PKT_TYPE_WITHDRAW,        GDP_PKT_TYPE_MASK,                "WITHDRAW"                },
157
        { GDP_PKT_TYPE_NAK_NOROUTE,        GDP_PKT_TYPE_MASK,                "NAK_NOROUTE"        },
158
        { GDP_PKT_TYPE_SEQ_PACKET,        GDP_PKT_TYPE_MASK,                "SEQ_PACKET"        },
159
        { GDP_PKT_TYPE_NAK_PACKET,        GDP_PKT_TYPE_MASK,                "NAK_PACKET"        },
160
        { GDP_PKT_TYPE_ACK_PACKET,        GDP_PKT_TYPE_MASK,                "ACK_PACKET"        },
161

    
162
        // flags portion
163
        { GDP_PKT_TYPE_RELIABLE,        GDP_PKT_TYPE_RELIABLE,        "RELIABLE"                },
164
        { GDP_PKT_TYPE_SSEQ,                GDP_PKT_TYPE_SSEQ,                "SSEQ"                        },
165
        { 0,                                                0,                                                NULL                        }
166
};
167

    
168
static EP_STAT
169
read_header(gdp_chan_t *chan,
170
                gdp_buf_t *ibuf,
171
                gdp_name_t *src,
172
                gdp_name_t *dst,
173
                gdp_seqno_t *seqnop,
174
                size_t *plenp)
175
{
176
        uint8_t *pbp = gdp_buf_getptr(ibuf, MIN_HEADER_LENGTH);
177
        size_t hdr_len = 0;
178
        size_t payload_len = 0;
179
        EP_STAT estat = EP_STAT_OK;
180

    
181
        if (pbp == NULL)
182
        {
183
                // fewer than MIN_HEADER_LENGTH bytes in buffer
184
                ep_dbg_cprintf(Dbg, 11, "read_header: pbp == NULL\n");
185
                estat = GDP_STAT_KEEP_READING;
186
                goto done;
187
        }
188

    
189
        if (ep_dbg_test(Dbg, 66))
190
        {
191
                ep_dbg_printf("read_header: initial header:\n");
192
                ep_hexdump(pbp, MIN_HEADER_LENGTH, ep_dbg_getfile(), EP_HEXDUMP_HEX, 0);
193
        }
194

    
195
        int ver;
196
        GET8(ver);                                // PDU version number (offset 0)
197
        if (ver != GDP_CHAN_PROTO_VERSION)
198
        {
199
                ep_dbg_cprintf(Dbg, 1, "wrong protocol version %d (%d expected)\n",
200
                                ver, GDP_CHAN_PROTO_VERSION);
201
                estat = GDP_STAT_PDU_VERSION_MISMATCH;
202

    
203
                // for lack of anything better, flush the entire input buffer
204
                gdp_buf_drain(ibuf, gdp_buf_getlength(ibuf));
205
                goto fail0;
206
        }
207
        GET8(hdr_len);                        // header length / 4
208
        hdr_len &= 0x3F;                // top two bits reserved
209
        hdr_len *= 4;
210
        if (hdr_len < MIN_HEADER_LENGTH)
211
        {
212
                ep_dbg_cprintf(Dbg, 1,
213
                                "read_header: short header, need %d got %zd\n",
214
                                MIN_HEADER_LENGTH, hdr_len);
215
                estat = GDP_STAT_PDU_CORRUPT;
216
                goto done;
217
        }
218

    
219
        // if we don't yet have the whole header, wait until we do
220
        if (gdp_buf_getlength(ibuf) < hdr_len)
221
                return GDP_STAT_KEEP_READING;
222

    
223
        int flags;
224
        GET8(flags);                        // type of service/flags/address format
225
        int ttl;
226
        GET8(ttl);                                // time to live (ignored by endpoints)
227
        ttl &= 0x3f;
228
        uint32_t seq_mf_foff;
229
        GET32(seq_mf_foff);                // seqno, more frags bit, and frag offset
230
        uint16_t seqno = (seq_mf_foff >> GDP_PKT_SEQNO_SHIFT) & GDP_PKT_SEQNO_MASK;
231
        uint16_t frag_off = seq_mf_foff & GDP_PKT_SEQNO_FOFF_MASK;
232
        uint16_t frag_len;
233
        GET16(frag_len);                // fragment length
234
        GET16(payload_len);                // length of opaque payload (reassembled)
235
        if ((flags & GDP_PKT_ADDR_TYPE_MASK) == 0)
236
        {
237
                memcpy(dst, pbp, sizeof (gdp_name_t));
238
                pbp += sizeof (gdp_name_t);
239
                memcpy(src, pbp, sizeof (gdp_name_t));
240
                pbp += sizeof (gdp_name_t);
241
        }
242
        else
243
        {
244
                ep_dbg_cprintf(Dbg, 1,
245
                                "read_header: unknown address format 0x%02x\n",
246
                                flags & GDP_PKT_ADDR_TYPE_MASK);
247
                estat = GDP_STAT_PDU_CORRUPT;
248
                goto done;
249
        }
250

    
251
#if GDP_DEBUG_EXTREME_TESTS
252
        if (ep_dbg_test(DbgT1, 102))
253
        {
254
                // randomize sequence number to test recovery
255
                // (no need to be cryptographically secure, hence trivial algorithm)
256
                seqno = random() % GDP_SEQNO_BASE;
257
        }
258
#endif
259

    
260
        if (ep_dbg_test(Dbg, 55))
261
        {
262
                gdp_pname_t src_p, dst_p;
263
                ep_dbg_printf("read_header(%zd): ver %d ttl %d seqno %d off %d paylen %zd\n"
264
                                        "    src %s\n"
265
                                        "    dst %s\n"
266
                                        "    flags ",
267
                                hdr_len, ver, ttl, seqno, frag_off, payload_len,
268
                                gdp_printable_name(*src, src_p),
269
                                gdp_printable_name(*dst, dst_p));
270
                        ep_prflags(flags, L4Flags, NULL);
271
                        ep_dbg_printf("\n");
272
        }
273

    
274
        // check for router meta-commands (type)
275
        if ((flags & GDP_PKT_TYPE_MASK) == GDP_PKT_TYPE_NAK_NOROUTE)
276
        {
277
                estat = GDP_STAT_NAK_NOROUTE;
278
                goto done;
279
        }
280
        else if ((flags & GDP_PKT_TYPE_MASK) != GDP_PKT_TYPE_REGULAR)
281
        {
282
                if (ep_dbg_test(Dbg, 1))
283
                {
284
                        gdp_pname_t src_p, dst_p;
285
                        ep_dbg_printf("read_header(%zd): unxpected PDU router type\n"
286
                                                "    ver %d ttl %d seqno %d off %d paylen %zd\n"
287
                                                "    src %s\n"
288
                                                "    dst %s\n"
289
                                                "    flags ",
290
                                        hdr_len, ver, ttl, seqno, frag_off, payload_len,
291
                                        gdp_printable_name(*src, src_p),
292
                                        gdp_printable_name(*dst, dst_p));
293
                                ep_prflags(flags, L4Flags, NULL);
294
                                ep_dbg_printf("\n");
295
                }
296
                estat = GDP_STAT_PDU_CORRUPT;
297
                goto done;
298
        }
299

    
300
        // XXX check for rational payload_len here? XXX
301

    
302
        // make sure entire PDU is in memory
303
        if (gdp_buf_getlength(ibuf) < hdr_len + payload_len)
304
                return GDP_STAT_KEEP_READING;
305

    
306
        // consume the header, but leave the payload
307
        gdp_buf_drain(ibuf, hdr_len);
308

    
309
done:
310
        if (EP_STAT_ISOK(estat))
311
        {
312
                estat = EP_STAT_FROM_INT(payload_len);
313
                *seqnop = seqno;
314
        }
315
        else
316
        {
317
                ep_dbg_cprintf(Dbg, 19, "read_header: draining %zd on error\n",
318
                                                hdr_len + payload_len);
319
                gdp_buf_drain(ibuf, hdr_len + payload_len);
320
                payload_len = 0;
321
        }
322

    
323
fail0:
324
        {
325
                char ebuf[100];
326
                ep_dbg_cprintf(Dbg, 32, "read_header: hdr %zd pay %zd stat %s\n",
327
                                hdr_len, payload_len,
328
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
329
        }
330
        *plenp = payload_len;
331
        return estat;
332
}
333

    
334

    
335
/*
336
**        CHAN_READ_CB --- data is available for reading from network socket
337
**
338
**                Minimal implementation: read in PDU and hand it to
339
**                processing routine.  If that processing is going to be
340
**                lengthy it should use a thread.
341
**
342
**                We insist that the entire PDU be in memory before passing
343
**                the cursor up.  To fix that we would need to associate the
344
**                cursor with a {src, dst, seqno} tuple, but our naive
345
**                implementation will never intersperse portions of messages,
346
**                so this is safe.
347
*/
348

    
349
static void
350
chan_read_cb(struct bufferevent *bev, void *ctx)
351
{
352
        EP_STAT estat;
353
        gdp_buf_t *ibuf = GDP_BUF_FROM_EVBUFFER(bufferevent_get_input(bev));
354
        gdp_chan_t *chan = (gdp_chan_t *) ctx;
355
        gdp_name_t src, dst;
356
        gdp_seqno_t seqno = 0;
357

    
358
        ep_dbg_cprintf(Dbg, 50, "chan_read_cb: fd %d, %zd bytes\n",
359
                        bufferevent_getfd(bev), gdp_buf_getlength(ibuf));
360

    
361
        EP_ASSERT(bev == chan->bev);
362

    
363
        while (gdp_buf_getlength(ibuf) >= MIN_HEADER_LENGTH)
364
        {
365
                // get the transport layer header
366
                size_t payload_len;
367
                estat = read_header(chan, ibuf, &src, &dst, &seqno, &payload_len);
368

    
369
                // if we don't have enough input, wait for more (we'll be called again)
370
                if (EP_STAT_IS_SAME(estat, GDP_STAT_KEEP_READING))
371
                        break;
372

    
373
                if (!EP_STAT_ISOK(estat))
374
                {
375
                        // deliver routing error to upper level
376
                        ep_dbg_cprintf(Dbg, 27, "chan_read_cb: sending to router_cb %p\n",
377
                                                chan->router_cb);
378
                        if (chan->router_cb != NULL)
379
                        {
380
                                estat = (*chan->router_cb)(chan, src, dst, payload_len, estat);
381
                        }
382
                        else
383
                        {
384
                                ep_dbg_cprintf(Dbg, 1, "chan_read_cb: NULL router_cb\n");
385
                                estat = GDP_STAT_NOT_IMPLEMENTED;
386
                                gdp_buf_drain(ibuf, payload_len);
387
                        }
388
                }
389

    
390
                // pass it to the L5 callback
391
                // note that if the callback is not set, the PDU is thrown away
392
                if (EP_STAT_ISOK(estat))
393
                {
394
                        if (chan->recv_cb != NULL)
395
                        {
396
                                // call upper level processing
397
                                estat = (*chan->recv_cb)(chan, src, dst, seqno,
398
                                                                        ibuf, payload_len);
399
                        }
400
                        else
401
                        {
402
                                // discard input
403
                                ep_dbg_cprintf(Dbg, 1, "chan_read_cb: NULL recv_cb\n");
404
                                estat = GDP_STAT_NOT_IMPLEMENTED;
405
                                gdp_buf_drain(ibuf, payload_len);
406
                        }
407
                }
408
                char ebuf[100];
409
                ep_dbg_cprintf(Dbg, 32, "chan_read_cb: %s\n",
410
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
411
        }
412
}
413

    
414

    
415
/*
416
**        CHAN_EVENT_CB --- events or errors occur on network socket
417
*/
418

    
419
static EP_PRFLAGS_DESC        EventWhatFlags[] =
420
{
421
        { BEV_EVENT_READING,        BEV_EVENT_READING,                "READING"                        },
422
        { BEV_EVENT_WRITING,        BEV_EVENT_WRITING,                "WRITING"                        },
423
        { BEV_EVENT_EOF,                BEV_EVENT_EOF,                        "EOF"                                },
424
        { BEV_EVENT_ERROR,                BEV_EVENT_ERROR,                "ERROR"                                },
425
        { BEV_EVENT_TIMEOUT,        BEV_EVENT_TIMEOUT,                "TIMEOUT"                        },
426
        { BEV_EVENT_CONNECTED,        BEV_EVENT_CONNECTED,        "CONNECTED"                        },
427
        { GDP_IOEVENT_USER_CLOSE,        0xffff,                                "USER_CLOSE"                },
428
        { 0, 0, NULL }
429
};
430

    
431
static void
432
chan_event_cb(struct bufferevent *bev, short events, void *ctx)
433
{
434
        bool restart_connection = false;
435
        gdp_chan_t *chan = (gdp_chan_t *) ctx;
436
        uint32_t cbflags = 0;
437

    
438
        if (ep_dbg_test(Dbg, 10))
439
        {
440
                int sockerr = EVUTIL_SOCKET_ERROR();
441
                ep_dbg_printf("chan_event_cb[%d]: ", getpid());
442
                ep_prflags(events, EventWhatFlags, ep_dbg_getfile());
443
                ep_dbg_printf(", fd=%d , errno=%d %s\n",
444
                                bufferevent_getfd(bev),
445
                                sockerr, evutil_socket_error_to_string(sockerr));
446
        }
447

    
448
        EP_ASSERT(bev == chan->bev);
449

    
450
        if (EP_UT_BITSET(BEV_EVENT_CONNECTED, events))
451
        {
452
                // sometimes libevent says we're connected when we're not
453
                if (EVUTIL_SOCKET_ERROR() == ECONNREFUSED)
454
                {
455
                        chan->state = GDP_CHAN_ERROR;
456
                        cbflags |= GDP_IOEVENT_ERROR;
457
                }
458
                else
459
                {
460
                        chan->state = GDP_CHAN_CONNECTED;
461
                        cbflags |= GDP_IOEVENT_CONNECTED;
462
                }
463
                ep_thr_cond_broadcast(&chan->cond);
464
        }
465
        if (EP_UT_BITSET(BEV_EVENT_EOF, events))
466
        {
467
                gdp_buf_t *ibuf = GDP_BUF_FROM_EVBUFFER(bufferevent_get_input(bev));
468
                size_t l = gdp_buf_getlength(ibuf);
469

    
470
                ep_dbg_cprintf(Dbg, 1, "chan_event_cb[%d]: got EOF, %zu bytes left\n",
471
                                        getpid(), l);
472
                cbflags |= GDP_IOEVENT_EOF;
473
                restart_connection = true;
474
        }
475
        if (EP_UT_BITSET(BEV_EVENT_ERROR, events))
476
        {
477
                int sockerr = EVUTIL_SOCKET_ERROR();
478

    
479
                ep_dbg_cprintf(Dbg, 1, "chan_event_cb[%d]: error: %s\n",
480
                                getpid(), evutil_socket_error_to_string(sockerr));
481
                cbflags |= GDP_IOEVENT_ERROR;
482
                restart_connection = true;
483
        }
484

    
485
        if (chan->ioevent_cb != NULL)
486
                (*chan->ioevent_cb)(chan, cbflags);
487

    
488
        // if we need to restart, let it run
489
        if (restart_connection)
490
        {
491
                EP_STAT estat;
492

    
493
                chan->state = GDP_CHAN_ERROR;
494
                ep_thr_cond_broadcast(&chan->cond);
495

    
496
                do
497
                {
498
                        long delay = ep_adm_getlongparam("swarm.gdp.reconnect.delay", 1000L);
499
                        if (delay > 0)
500
                                ep_time_nanosleep(delay * INT64_C(1000000));
501
                        estat = chan_reopen(chan);
502
                } while (!EP_STAT_ISOK(estat));
503
        }
504

    
505
        if (chan->state == GDP_CHAN_CONNECTED)
506
                (*chan->advert_cb)(chan, GDP_CMD_ADVERTISE, ctx);
507
}
508

    
509

    
510
/*
511
**  Helper for close, error, and eof handlers
512
*/
513

    
514
static EP_STAT
515
chan_do_close(gdp_chan_t *chan, int what)
516
{
517
        if (ep_dbg_test(Dbg, 7))
518
        {
519
                ep_dbg_printf("chan_do_close: chan %p what ", chan);
520
                ep_prflags(what, EventWhatFlags, NULL);
521
        }
522
        if (chan == NULL)
523
                return EP_STAT_ERROR;
524

    
525
        chan->state = GDP_CHAN_CLOSING;
526
        ep_thr_cond_broadcast(&chan->cond);
527
        _gdp_chan_flush(chan);
528
        if (chan->ioevent_cb != NULL)
529
                (*chan->ioevent_cb)(chan, what);
530
        if (chan->bev != NULL)
531
                bufferevent_free(chan->bev);
532
        chan->bev = NULL;
533
        if (chan->router_addr != NULL)
534
                ep_mem_free(chan->router_addr);
535
        ep_thr_cond_destroy(&chan->cond);
536
        ep_thr_mutex_destroy(&chan->mutex);
537
        ep_mem_free(chan);
538
        return EP_STAT_OK;
539
}
540

    
541

    
542
/*
543
**        _GDP_CHAN_OPEN_HELPER --- open channel to the routing layer
544
*/
545

    
546
static EP_STAT
547
chan_open_helper(
548
                gdp_chan_t *chan,
549
                void *adata)
550
{
551
        EP_STAT estat = EP_STAT_OK;
552
        char abuf[500] = "";
553
        char *host;
554
        char *port = NULL;
555

    
556
        // get the host:port info into abuf
557
        if (chan->router_addr != NULL && chan->router_addr[0] != '\0')
558
        {
559
                strlcpy(abuf, chan->router_addr, sizeof abuf);
560
        }
561
        else
562
        {
563
#if GDP_OSCF_USE_ZEROCONF
564
                if (ep_adm_getboolparam("swarm.gdp.zeroconf.enable", true))
565
                {
566
                        ep_dbg_cprintf(Dbg, 1, "Trying Zeroconf:\n");
567

    
568
                        if (gdp_zc_scan())
569
                        {
570
                                ep_dbg_cprintf(Dbg, 20, "... after gdp_zc_scan\n");
571
                                zcinfo_t **list = gdp_zc_get_infolist();
572
                                ep_dbg_cprintf(Dbg, 20, "... after gdp_zc_get_infolist: %p\n",
573
                                                list);
574
                                if (list != NULL)
575
                                {
576
                                        char *info = gdp_zc_addr_str(list);
577
                                        ep_dbg_cprintf(Dbg, 20, "... after gdp_zc_addr_str: %p\n",
578
                                                        info);
579
                                        gdp_zc_free_infolist(list);
580
                                        ep_dbg_cprintf(Dbg, 20, "... after gdp_zc_free_infolist\n");
581
                                        if (info != NULL)
582
                                        {
583
                                                if (info[0] != '\0')
584
                                                {
585
                                                        ep_dbg_cprintf(Dbg, 1, "Zeroconf found %s\n",
586
                                                                        info);
587
                                                        strlcpy(abuf, info, sizeof abuf);
588
                                                        strlcat(abuf, ";", sizeof abuf);
589
                                                }
590
                                                free(info);
591
                                        }
592
                                }
593
                        }
594
                        else
595
                                ep_dbg_cprintf(Dbg, 20, "gdp_zc_scan failed\n");
596
                }
597
#endif // GDP_OSCF_USE_ZEROCONF
598
                strlcat(abuf,
599
                                ep_adm_getstrparam("swarm.gdp.routers", "127.0.0.1"),
600
                                sizeof abuf);
601
        }
602

    
603
        ep_dbg_cprintf(Dbg, 28, "chan_open_helper(%s)\n", abuf);
604

    
605
        /*
606
        **  Sort-of ABNF syntax is:
607
        **
608
        **  addr-list =                addr-spec *( ";" addr-spec )
609
        **  addr-spec =                host-ip [ ":" port ] [ "/" gdpame ]
610
        **  host-ip =                dns-name | v4-ip | "[" v6-ip "]"
611
        **        gdpname =                43*b64-char
612
        **        b64-char =                ( "a" - "z" | "A" - "Z" | "0" - "9" | "-" | "_" )
613
        **
614
        **        Hopefully the missing non-terminals (port, etc.) will be obvious.
615
        */
616

    
617
        // strip off addresses and try them
618
        estat = GDP_STAT_NOTFOUND;                                // anything that is not OK
619
        {
620
                char *delim = abuf;
621
                do
622
                {
623
                        char pbuf[10];
624
                        char *gdppname = NULL;
625

    
626
                        host = delim;                                                // beginning of address spec
627
                        delim = strchr(delim, ';');                        // end of address spec
628
                        if (delim != NULL)
629
                                *delim++ = '\0';
630

    
631
                        host = &host[strspn(host, " \t\n\f\f\r")];        // strip whitespace
632
                        if (*host == '\0')
633
                                continue;                                                // empty spec
634

    
635
                        ep_dbg_cprintf(Dbg, 1, "Trying %s\n", host);
636

    
637
                        // if host is an IPv6 literal it may have colons
638
                        char *sep = host;
639
                        if (*host == '[')
640
                        {
641
                                // IPv6 literal --- strip [] to satisfy getaddrinfo
642
                                host++;
643
                                sep = &host[strcspn(host, "]/")];
644
                                if (*sep == ']')
645
                                        *sep++ = '\0';
646
                        }
647

    
648
                        // extract port and gdpname
649
                        sep = &sep[strcspn(sep, ":/")];
650
                        if (*sep == ':')
651
                        {
652
                                *sep++ = '\0';
653
                                port = sep;
654
                                sep = &sep[strcspn(sep, "/")];
655
                        }
656
                        if (*sep == '/')
657
                        {
658
                                *sep++ = '\0';
659
                                gdppname = sep;
660
                                sep = &sep[strcspn(sep, " \t\n\f\f\r")];
661
                        }
662

    
663
                        // allow trailing white space
664
                        if (isspace(*sep))
665
                                *sep++ = '\0';
666

    
667
                        // extract gdpname if it exists
668
                        if (gdppname != NULL)
669
                        {
670
                                estat = gdp_internal_name(gdppname, chan->gdpname);
671
                                if (!EP_STAT_ISOK(estat))
672
                                {
673
                                        char ebuf[100];
674
                                        ep_dbg_cprintf(Dbg, 2,
675
                                                        "chan_open_helper: bad gdpname in router spec:\n"
676
                                                        "    %s\n    %s\n",
677
                                                        gdppname, ep_stat_tostr(estat, ebuf, sizeof ebuf));
678
                                        continue;
679
                                }
680
                        }
681
                        else
682
                        {
683
                                ep_dbg_cprintf(Dbg, 20,
684
                                                "chan_open_helper: no gdpname in router spec\n");
685
                                memset(chan->gdpname, 0, sizeof chan->gdpname);
686
                        }
687

    
688
                        // if no explicit port number, use a default
689
                        if (port == NULL || *port == '\0')
690
                        {
691
                                int portno;
692

    
693
                                portno = ep_adm_getintparam("swarm.gdp.router.port",
694
                                                                GDP_PORT_DEFAULT);
695
                                snprintf(pbuf, sizeof pbuf, "%d", portno);
696
                                port = pbuf;
697
                        }
698

    
699
                        ep_dbg_cprintf(Dbg, 20, "chan_open_helper: trying host %s port %s\n",
700
                                        host, port);
701

    
702
                        // parsing done....  let's try the lookup
703
                        struct addrinfo *res, *a;
704
                        struct addrinfo hints;
705
                        int r;
706

    
707
                        memset(&hints, '\0', sizeof hints);
708
                        hints.ai_socktype = SOCK_STREAM;
709
                        hints.ai_protocol = IPPROTO_TCP;
710
                        r = getaddrinfo(host, port, &hints, &res);
711
                        if (r != 0)
712
                        {
713
                                // address resolution failed; try the next one
714
                                switch (r)
715
                                {
716
                                case EAI_SYSTEM:
717
                                        estat = ep_stat_from_errno(errno);
718
                                        if (!EP_STAT_ISOK(estat))
719
                                                break;
720
                                        // ... fall through
721

    
722
                                case EAI_NONAME:
723
                                        estat = EP_STAT_DNS_NOTFOUND;
724
                                        break;
725

    
726
                                default:
727
                                        estat = EP_STAT_DNS_FAILURE;
728
                                }
729
                                ep_dbg_cprintf(Dbg, 1,
730
                                                "chan_open_helper: getaddrinfo(%s, %s) =>\n"
731
                                                "    %s\n",
732
                                                host, port, gai_strerror(r));
733
                                continue;
734
                        }
735

    
736
                        // attempt connects on all available addresses
737
                        _gdp_chan_lock(chan);
738
                        for (a = res; a != NULL; a = a->ai_next)
739
                        {
740
                                // make the actual connection
741
                                // it would be nice to have a private timeout here...
742
                                evutil_socket_t sock = socket(a->ai_family, SOCK_STREAM, 0);
743
                                if (sock < 0)
744
                                {
745
                                        // bad news, but keep trying
746
                                        estat = ep_stat_from_errno(errno);
747
                                        ep_log(estat, "chan_open_helper: cannot create socket");
748
                                        continue;
749
                                }
750

    
751
                                // shall we disable Nagle algorithm?
752
                                if (ep_adm_getboolparam("swarm.gdp.tcp.nodelay", false))
753
                                {
754
                                        int enable = 1;
755
                                        if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
756
                                                                (void *) &enable, sizeof enable) != 0)
757
                                        {
758
                                                estat = ep_stat_from_errno(errno);
759
                                                ep_log(estat, "chan_open_helper: cannot set TCP_NODELAY");
760
                                                // error not fatal, let's just go on
761
                                        }
762
                                }
763
                                if (connect(sock, a->ai_addr, a->ai_addrlen) < 0)
764
                                {
765
                                        // connection failure
766
                                        estat = ep_stat_from_errno(errno);
767
                                        ep_dbg_cprintf(Dbg, 38,
768
                                                        "chan_open_helper[%d]: connect failed: %s\n",
769
                                                        getpid(), strerror(errno));
770
                                        close(sock);
771
                                        continue;
772
                                }
773

    
774
                                // success!  Make it non-blocking and associate with bufferevent
775
                                ep_dbg_cprintf(Dbg, 39, "successful connect\n");
776
                                estat = EP_STAT_OK;
777

    
778
                                evutil_make_socket_nonblocking(sock);
779
                                chan->bev = bufferevent_socket_new(EventBase, sock,
780
                                                                BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE |
781
                                                                BEV_OPT_DEFER_CALLBACKS |
782
                                                                BEV_OPT_UNLOCK_CALLBACKS);
783
                                bufferevent_setcb(chan->bev,
784
                                                                chan_read_cb, NULL, chan_event_cb, chan);
785
                                bufferevent_setwatermark(chan->bev,
786
                                                                EV_READ, MIN_HEADER_LENGTH, 0);
787
                                bufferevent_enable(chan->bev, EV_READ | EV_WRITE);
788

    
789
                                // disable SIGPIPE so that we'll get an error instead of death
790
#ifdef SO_NOSIGPIPE
791
                                int sockopt_set = 1;
792
                                setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE,
793
                                                (void *) &sockopt_set, sizeof sockopt_set);
794
#else
795
                                if (ep_adm_getboolparam("swarm.gdp.ignore.sigpipe", false))
796
                                {
797
                                        // It isn't clear you want to do this, since this may
798
                                        // cause stdout writes to dead pipes to keep going in
799
                                        // programs that are not fastidious about error checking.
800
                                        // (Most programs do not check e.g., printf for error.)
801
                                        signal(SIGPIPE, SIG_IGN);
802
                                }
803
#endif
804
                                break;
805
                        }
806

    
807
                        _gdp_chan_unlock(chan);
808
                        freeaddrinfo(res);
809

    
810
                        if (EP_STAT_ISOK(estat))
811
                        {
812
                                // success
813
                                break;
814
                        }
815
                } while (delim != NULL);
816
        }
817

    
818
        // error cleanup and return
819
        if (!EP_STAT_ISOK(estat))
820
        {
821
                if (ep_dbg_test(Dbg, 2))
822
                {
823
                        char ebuf[80];
824
                        ep_dbg_printf("chan_open_helper[%d]: could not open channel: %s\n",
825
                                        getpid(), ep_stat_tostr(estat, ebuf, sizeof ebuf));
826
                        //ep_log(estat, "chan_open_helper: could not open channel");
827
                }
828
        }
829
        else
830
        {
831
                if (ep_dbg_test(Dbg, 1))
832
                {
833
                        ep_dbg_printf("chan_open_helper[%d]: talking to router at %s:%s\n    (",
834
                                        getpid(), host, port);
835
                        if (gdp_name_is_valid(chan->gdpname))
836
                                gdp_print_name(chan->gdpname, ep_dbg_getfile());
837
                        else
838
                                ep_dbg_printf("no gdpname");
839
                        ep_dbg_printf(")\n");
840
                }
841
                (*chan->advert_cb)(chan, GDP_CMD_ADVERTISE, adata);
842
        }
843
        return estat;
844
}
845

    
846

    
847
/*
848
**  _GDP_CHAN_OPEN --- open a channel
849
*/
850

    
851
EP_STAT
852
_gdp_chan_open(
853
                const char *router_addr,
854
                void *qos,
855
                gdp_chan_recv_cb_t *recv_cb,
856
                gdp_chan_send_cb_t *send_cb,
857
                gdp_chan_ioevent_cb_t *ioevent_cb,
858
                gdp_chan_router_cb_t *router_cb,
859
                gdp_chan_advert_func_t *advert_func,
860
                gdp_chan_x_t *cdata,
861
                gdp_chan_t **pchan)
862
{
863
        EP_STAT estat;
864
        gdp_chan_t *chan;
865

    
866
        ep_dbg_cprintf(Dbg, 11, "_gdp_chan_open(%s)\n", router_addr);
867

    
868
        // allocate a new channel structure
869
        chan = (gdp_chan_t *) ep_mem_zalloc(sizeof *chan);
870
        ep_thr_mutex_init(&chan->mutex, EP_THR_MUTEX_DEFAULT);
871
        ep_thr_mutex_setorder(&chan->mutex, GDP_MUTEX_LORDER_CHAN);
872
        ep_thr_cond_init(&chan->cond);
873
        chan->state = GDP_CHAN_CONNECTING;
874
        chan->recv_cb = recv_cb;
875
        chan->send_cb = send_cb;                        //XXX unused at this time
876
        chan->ioevent_cb = ioevent_cb;
877
        chan->router_cb = router_cb;
878
        chan->advert_cb = advert_func;
879
        chan->cdata = cdata;
880
        if (router_addr != NULL)
881
                chan->router_addr = ep_mem_strdup(router_addr);
882

    
883
        estat = chan_open_helper(chan, NULL);
884

    
885
        if (EP_STAT_ISOK(estat))
886
                *pchan = chan;
887
        else
888
        {
889
                ep_app_message(estat, "Cannot open connection to GDP");
890
                chan_do_close(chan, BEV_EVENT_ERROR);
891
        }
892
        return estat;
893
}
894

    
895

    
896
/*
897
**  CHAN_REOPEN --- re-open a channel (e.g., on router failure)
898
*/
899

    
900
static EP_STAT
901
chan_reopen(gdp_chan_t *chan)
902
{
903
        EP_STAT estat;
904

    
905
        ep_dbg_cprintf(Dbg, 12, "chan_reopen: %p\n         advert_cb = %p\n",
906
                        chan, chan->advert_cb);
907

    
908
        // close the (now dead) bufferevent
909
        if (chan->bev != NULL)
910
                bufferevent_free(chan->bev);
911
        chan->bev = NULL;
912
        estat = chan_open_helper(chan, NULL);
913
        return estat;
914
}
915

    
916

    
917
/*
918
**        _GDP_CHAN_CLOSE --- close a channel (user-driven)
919
*/
920

    
921
EP_STAT
922
_gdp_chan_close(gdp_chan_t *chan)
923
{
924
        return chan_do_close(chan, GDP_IOEVENT_USER_CLOSE);
925
}
926

    
927

    
928
/*
929
**  _GDP_CHAN_SEND --- send a message to a channel
930
*/
931

    
932
static EP_STAT
933
send_helper(gdp_chan_t *chan,
934
                        gdp_target_t *target,
935
                        gdp_name_t src,
936
                        gdp_name_t dst,
937
                        gdp_buf_t *payload,
938
                        int tos)
939
{
940
        EP_STAT estat = EP_STAT_OK;
941
        int i;
942
        size_t payload_len = 0;
943
        uint16_t seqno = 0;                        //FIXME: should be useful
944

    
945
        if (payload != NULL)
946
                payload_len = gdp_buf_getlength(payload);
947

    
948
        if (ep_dbg_test(Dbg, 51))
949
        {
950
                gdp_pname_t src_printable;
951
                gdp_pname_t dst_printable;
952
                ep_dbg_printf("send_helper:\n\tsrc %s\n\tdst %s\n\tflags ",
953
                                gdp_printable_name(src, src_printable),
954
                                gdp_printable_name(dst, dst_printable));
955
                ep_prflags(tos, L4Flags, NULL);
956
                ep_dbg_printf("\n\tpayload %p ", payload);
957
                if (payload == NULL)
958
                        ep_dbg_printf("(no payload)\n");
959
                else
960
                {
961
                        ep_dbg_printf("len %zd\n", payload_len);
962
                        ep_hexdump(gdp_buf_getptr(payload, payload_len), payload_len,
963
                                        ep_dbg_getfile(), EP_HEXDUMP_ASCII, 0);
964
                }
965
        }
966

    
967
        if (chan->bev == NULL)
968
        {
969
                ep_dbg_cprintf(Dbg, 1, "send_helper: no channel\n");
970
                return GDP_STAT_DEAD_DAEMON;
971
        }
972

    
973
        if (payload_len > UINT16_MAX)
974
        {
975
                ep_dbg_cprintf(Dbg, 1, "send_helper: payload_len = %zd, max %d\n",
976
                                payload_len, UINT16_MAX);
977
                return GDP_STAT_PDU_TOO_LONG;
978
        }
979

    
980
        // build the header in memory
981
        char pb[MAX_HEADER_LENGTH];
982
        char *pbp = pb;
983

    
984
        PUT8(GDP_CHAN_PROTO_VERSION);                // version number
985
        PUT8(MIN_HEADER_LENGTH / 4);                // header length (= 72 / 4)
986
        PUT8(tos);                                                        // flags / type of service
987
        PUT8(GDP_TTL_DEFAULT);                                // time to live
988
        uint32_t seq_mf_foff = (seqno & GDP_PKT_SEQNO_MASK) << GDP_PKT_SEQNO_SHIFT;
989
        PUT32(seq_mf_foff);                                        // more frag bit, seqno, frag offset
990
        uint16_t frag_len = 0;
991
        PUT16(frag_len);                                        // length of this fragment
992
        PUT16(payload_len);                                        // length of opaque payload
993
        memcpy(pbp, dst, sizeof (gdp_name_t));        // destination address
994
        pbp += sizeof (gdp_name_t);
995
        memcpy(pbp, src, sizeof (gdp_name_t));        // source address
996
        pbp += sizeof (gdp_name_t);
997

    
998
        // now write header to the socket
999
        bufferevent_lock(chan->bev);
1000
        EP_ASSERT((pbp - pb) == MIN_HEADER_LENGTH);
1001
        if (ep_dbg_test(Dbg, 42))
1002
        {
1003
                ep_dbg_printf("send_helper: sending %zd octets:\n",
1004
                                        payload_len + (pbp - pb));
1005
                ep_hexdump(pb, pbp - pb, ep_dbg_getfile(), 0, 0);
1006
                if (payload_len > 0)
1007
                {
1008
                        ep_hexdump(gdp_buf_getptr(payload, payload_len), payload_len,
1009
                                        ep_dbg_getfile(), EP_HEXDUMP_ASCII, pbp - pb);
1010
                }
1011
        }
1012

    
1013
        i = bufferevent_write(chan->bev, pb, pbp - pb);
1014
        if (i < 0)
1015
        {
1016
                estat = GDP_STAT_PDU_WRITE_FAIL;
1017
                goto fail0;
1018
        }
1019

    
1020
        // and the payload
1021
        if (payload_len > 0)
1022
        {
1023
                i = bufferevent_write_buffer(chan->bev, payload);
1024
                if (i < 0)
1025
                {
1026
                        estat = GDP_STAT_PDU_WRITE_FAIL;
1027
                        goto fail0;
1028
                }
1029
        }
1030

    
1031
fail0:
1032
        if (!EP_STAT_ISOK(estat) && ep_dbg_test(Dbg, 4))
1033
        {
1034
                char ebuf[100];
1035
                ep_dbg_printf("send_helper failure: %s\n",
1036
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
1037
        }
1038
        bufferevent_unlock(chan->bev);
1039
        return estat;
1040
}
1041

    
1042
EP_STAT
1043
_gdp_chan_send(gdp_chan_t *chan,
1044
                        gdp_target_t *target,
1045
                        gdp_name_t src,
1046
                        gdp_name_t dst,
1047
                        gdp_buf_t *payload,
1048
                        int tos)
1049
{
1050
        if (ep_dbg_test(Dbg, 42))
1051
        {
1052
                size_t l = evbuffer_get_length(payload);
1053
                uint8_t *p = evbuffer_pullup(payload, l);
1054
                ep_dbg_printf("_gdp_chan_send: sending PDU:\n");
1055
                ep_hexdump(p, l, ep_dbg_getfile(), EP_HEXDUMP_ASCII, 0);
1056
        }
1057
        return send_helper(chan, target, src, dst, payload, tos);
1058
}
1059

    
1060

    
1061
EP_STAT
1062
_gdp_chan_flush(gdp_chan_t *chan)
1063
{
1064
        // if we aren't running an event loop, do one pass to flush any
1065
        // buffered output.
1066
        extern bool GdpIoEventLoopRunning;
1067

    
1068
        if (!GdpIoEventLoopRunning)
1069
        {
1070
                int i = event_base_loop(_GdpIoEventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);
1071
                if (i < 0)
1072
                        return GDP_STAT_PDU_WRITE_FAIL;
1073
        }
1074
        return EP_STAT_OK;
1075
}
1076

    
1077

    
1078
/*
1079
**  Advertising primitives
1080
*/
1081

    
1082
EP_STAT
1083
_gdp_chan_advertise(
1084
                        gdp_chan_t *chan,
1085
                        gdp_name_t gname,
1086
                        gdp_adcert_t *adcert,
1087
                        gdp_chan_advert_cr_t *challenge_cb,
1088
                        void *adata)
1089
{
1090
        EP_STAT estat = EP_STAT_OK;
1091
        gdp_pname_t pname;
1092

    
1093
        ep_dbg_cprintf(Dbg, 39, "_gdp_chan_advertise(%s):\n",
1094
                        gdp_printable_name(gname, pname));
1095

    
1096
        estat = send_helper(chan, NULL, _GdpMyRoutingName, gname,
1097
                                                NULL, GDP_PKT_TYPE_ADVERTISE);
1098

    
1099
        if (ep_dbg_test(Dbg, 21))
1100
        {
1101
                char ebuf[100];
1102

    
1103
                ep_dbg_printf("_gdp_chan_advertise => %s\n",
1104
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
1105
        }
1106

    
1107
        return estat;
1108
}
1109

    
1110

    
1111
EP_STAT
1112
_gdp_chan_withdraw(
1113
                        gdp_chan_t *chan,
1114
                        gdp_name_t gname,
1115
                        void *adata)
1116
{
1117
        EP_STAT estat;
1118
        gdp_pname_t pname;
1119

    
1120
        ep_dbg_cprintf(Dbg, 39, "_gdp_chan_withdraw(%s)\n",
1121
                        gdp_printable_name(gname, pname));
1122
//        gdp_buf_t *payload = gdp_buf_new();
1123
//        gdp_buf_write(payload, gname, sizeof (gdp_name_t));
1124
        estat = send_helper(chan, NULL, _GdpMyRoutingName, gname,
1125
                                                NULL, GDP_PKT_TYPE_WITHDRAW);
1126

    
1127
        if (ep_dbg_test(Dbg, 21))
1128
        {
1129
                char ebuf[100];
1130

    
1131
                ep_dbg_printf("_gdp_chan_withdraw => %s\n",
1132
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
1133
        }
1134

    
1135
        return estat;
1136
}
1137

    
1138

    
1139
EP_STAT
1140
_gdp_chan_advert_flush(
1141
                        gdp_chan_t *chan)
1142
{
1143
        // eventually, flush out any pending piggybacked advertisements
1144
        return EP_STAT_OK;
1145
}
1146

    
1147

    
1148
/*
1149
**  _GDP_CHAN_GET_UDATA --- get user data from channel
1150
*/
1151

    
1152
gdp_chan_x_t *
1153
_gdp_chan_get_cdata(gdp_chan_t *chan)
1154
{
1155
        return chan->cdata;
1156
}
1157

    
1158

    
1159
/* vim: set noexpandtab : */