Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / gdp / gdp_main.c @ master

History | View | Annotate | Download (34.6 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 :*/
2

    
3
/*
4
**  GDP Initialization and main event loop.
5
**
6
**        ----- BEGIN LICENSE BLOCK -----
7
**        GDP: Global Data Plane Support Library
8
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
9
**
10
**        Copyright (c) 2015-2019, Regents of the University of California.
11
**        All rights reserved.
12
**
13
**        Permission is hereby granted, without written agreement and without
14
**        license or royalty fees, to use, copy, modify, and distribute this
15
**        software and its documentation for any purpose, provided that the above
16
**        copyright notice and the following two paragraphs appear in all copies
17
**        of this software.
18
**
19
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
20
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
21
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
22
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23
**
24
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
25
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
27
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
28
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
29
**        OR MODIFICATIONS.
30
**        ----- END LICENSE BLOCK -----
31
*/
32

    
33
#include "gdp.h"
34
#include "gdp_chan.h"
35
#include "gdp_event.h"
36
#include "gdp_hongd.h"
37
#include "gdp_priv.h"
38
#include "gdp_version.h"
39

    
40
#include <ep/ep.h>
41
#include <ep/ep_app.h>
42
#include <ep/ep_dbg.h>
43
#include <ep/ep_funclist.h>
44
#include <ep/ep_log.h>
45
#include <ep/ep_syslog.h>
46

    
47
#include <event2/buffer.h>
48
#include <event2/thread.h>
49

    
50
#include <errno.h>
51
#include <pwd.h>
52
#include <signal.h>
53
#include <string.h>
54

    
55
static EP_DBG        Dbg = EP_DBG_INIT("gdp.main", "GDP initialization and main loop");
56
static EP_DBG        DbgEvLock = EP_DBG_INIT("gdp.libevent.locks", "GDP libevent lock debugging");
57
static EP_DBG        DbgProcResp = EP_DBG_INIT("gdp.response", "GDP response processing");
58
static EP_DBG        DbgTimers = EP_DBG_INIT("gdp.libevent.timers", "GDP timer events");
59

    
60
struct event_base        *_GdpIoEventBase;        // the base for GDP I/O events
61
gdp_name_t                        _GdpMyRoutingName;        // source name for PDUs
62
gdp_chan_t                        *_GdpChannel;                // our primary app-level protocol port
63
static bool                        _GdpRunCmdInThread = true;                // run commands in threads
64
static bool                        _GdpRunRespInThread = false;        // run responses in threads
65
bool                                _GdpLibInitialized;        // are we initialized?
66

    
67

    
68
/*
69
**  INIT_ERROR --- issue error on initialization problem
70
*/
71

    
72
static EP_STAT
73
init_error(const char *datum, const char *where)
74
{
75
        EP_STAT estat = ep_stat_from_errno(errno);
76
        char nbuf[40];
77

    
78
        (void) (0 == strerror_r(errno, nbuf, sizeof nbuf));
79
        ep_log(estat, "gdp_init: %s: %s", where, datum);
80
        ep_app_error("gdp_init: %s: %s: %s", where, datum, nbuf);
81
        return estat;
82
}
83

    
84

    
85
gdp_cmd_t
86
_gdp_acknak_from_estat(EP_STAT estat, gdp_cmd_t def)
87
{
88
        gdp_cmd_t resp = def;
89

    
90
        if (EP_STAT_ISOK(estat))
91
        {
92
                if (def < GDP_ACK_MIN || def > GDP_ACK_MAX)
93
                        resp = GDP_ACK_SUCCESS;
94
        }
95
        else if (EP_STAT_REGISTRY(estat) == EP_REGISTRY_EPLIB &&
96
                        EP_STAT_MODULE(estat) == EP_STAT_MOD_CRYPTO)
97
        {
98
                resp = GDP_NAK_C_UNAUTH;
99
        }
100
        else if (EP_STAT_IS_SAME(estat, GDP_STAT_NOT_IMPLEMENTED))
101
                resp = GDP_NAK_S_NOTIMPL;
102
        else if (EP_STAT_REGISTRY(estat) == EP_REGISTRY_UCB &&
103
                EP_STAT_MODULE(estat) == GDP_MODULE)
104
        {
105
                // if the estat contains the detail, prefer that
106
                int d = EP_STAT_DETAIL(estat);
107

    
108
                if (!EP_STAT_ISFAIL(estat))
109
                {
110
                        if (d >= 200 && d <= (200 + GDP_ACK_MAX - GDP_ACK_MIN))
111
                                resp = (gdp_cmd_t) (d - 200 + GDP_ACK_MIN);
112
                }
113
                else if (d >= 400 && d <= (400 + GDP_NAK_C_MAX - GDP_NAK_C_MIN))
114
                        resp = (gdp_cmd_t) (d - 400 + GDP_NAK_C_MIN);
115
                else if (d >= 500 && d <= (500 + GDP_NAK_S_MAX - GDP_NAK_S_MIN))
116
                                resp = (gdp_cmd_t) (d - 500 + GDP_NAK_S_MIN);
117
                else if (d >= 600 && d <= (600 + GDP_NAK_R_MAX - GDP_NAK_R_MIN))
118
                        resp = (gdp_cmd_t) (d - 600 + GDP_NAK_R_MIN);
119
        }
120
        else if (resp < GDP_NAK_C_MIN || resp > GDP_NAK_R_MAX)
121
                resp = GDP_NAK_S_INTERNAL;                // default to panic code
122

    
123
        if (ep_dbg_test(Dbg, 41))
124
        {
125
                char ebuf[100];
126

    
127
                ep_dbg_printf("_gdp_acknak_from_estat: %s -> %s\n",
128
                                ep_stat_tostr(estat, ebuf, sizeof ebuf),
129
                                _gdp_proto_cmd_name(resp));
130
        }
131
        return resp;
132
}
133

    
134

    
135
/*
136
**  PROCESS_CMD --- process command PDU
137
**
138
**                Usually done in a thread since it may be heavy weight.
139
**                This usually only applies to gdplogd.
140
*/
141

    
142
static EP_THR_MUTEX                GdpCreateMutex                        EP_THR_MUTEX_INITIALIZER;
143

    
144
static void
145
process_cmd(void *cpdu_)
146
{
147
        gdp_pdu_t *cpdu = (gdp_pdu_t *) cpdu_;
148
        gdp_cmd_t cmd;
149
        EP_STAT estat;
150
        gdp_gob_t *gob = NULL;
151
        gdp_req_t *req = NULL;
152
        EP_TIME_SPEC starttime;
153

    
154
        ep_time_now(&starttime);
155
        GDP_MSG_CHECK(cpdu, return);
156
        cmd = cpdu->msg->cmd;
157

    
158
        ep_dbg_cprintf(Dbg, 40,
159
                        "process_cmd(%s, thread %" EP_THR_PRItid ")\n",
160
                        _gdp_proto_cmd_name(cmd), ep_thr_gettid());
161

    
162
        // create has too many special cases, so we single thread it
163
        if (cmd == GDP_CMD_CREATE)
164
                ep_thr_mutex_lock(&GdpCreateMutex);
165

    
166
        estat = _gdp_gob_cache_get(cpdu->dst, GGCF_NOCREATE, &gob);
167
        if (gob != NULL)
168
        {
169
                GDP_GOB_ASSERT_ISLOCKED(gob);
170
                EP_ASSERT(gob->refcnt > 0);
171
        }
172

    
173
        ep_dbg_cprintf(Dbg, 43,
174
                        "process_cmd: allocating new req for GOB %p\n", gob);
175
        estat = _gdp_req_new(cmd, gob, cpdu->chan, cpdu, 0, &req);
176
        EP_STAT_CHECK(estat, goto fail0);
177
        EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex);
178
        EP_ASSERT(gob == req->gob);
179

    
180
        ep_dbg_cprintf(Dbg, 40, "process_cmd >>> req=%p\n", req);
181

    
182
        // do the per-command processing
183
        estat = _gdp_req_dispatch(req, cmd);
184
        if (ep_dbg_test(Dbg, 59))
185
        {
186
                char ebuf[100];
187
                ep_dbg_printf("process_cmd: dispatch => %s\n    ",
188
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
189
                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 1);
190
        }
191
        bool response_already_sent = EP_STAT_IS_SAME(estat, GDP_STAT_RESPONSE_SENT);
192

    
193
        // make sure request or GOB haven't gotten fubared
194
        EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex);
195
        GDP_MSG_CHECK(req->cpdu, goto fail1);
196
        GDP_MSG_CHECK(req->rpdu, goto fail1);
197

    
198
        // special case: if we have deleted a GOB, it will have disappeared now
199
        if (req->gob == NULL && gob != NULL &&
200
                        req->cpdu->msg->cmd == GDP_CMD_DELETE &&
201
                        GDP_CMD_IS_ACK(req->rpdu->msg->cmd))
202
        {
203
                _gdp_gob_unlock(gob);
204
                gob = NULL;
205
        }
206

    
207
        if (gob != NULL)
208
        {
209
                GDP_GOB_ASSERT_ISLOCKED(gob);
210
                if (!EP_ASSERT(gob == req->gob) && ep_dbg_test(Dbg, 1))
211
                {
212
                        ep_dbg_printf("process_cmd, after dispatch:\n  gob = ");
213
                        _gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
214
                        ep_dbg_printf("  req->gob = ");
215
                        _gdp_gob_dump(req->gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
216
                }
217
        }
218

    
219
        // cmd_open and cmd_create can return a new GOB in the req
220
        if (gob == NULL && req->gob != NULL)
221
                gob = _gdp_gob_incref(req->gob);
222

    
223
        if (!response_already_sent)
224
        {
225
                req->stat = _gdp_pdu_out(req->rpdu, req->chan);
226
        }
227

    
228
        EP_ASSERT(gob == req->gob);
229
        if (gob != NULL)
230
        {
231
                GDP_GOB_ASSERT_ISLOCKED(gob);
232
                EP_ASSERT(gob->refcnt > 0);
233
        }
234

    
235
        // do command post processing
236
        if (req->postproc)
237
        {
238
                ep_dbg_cprintf(Dbg, 44, "process_cmd: doing post processing\n");
239
                (req->postproc)(req);
240
                req->postproc = NULL;
241

    
242
                // postproc shouldn't change GOB lock status
243
                EP_ASSERT(gob == req->gob);
244
        }
245

    
246
fail1:
247
        // free up resources
248
        if (EP_UT_BITSET(GDP_REQ_PERSIST, req->flags))
249
                _gdp_req_unlock(req);
250
        else
251
                _gdp_req_free(&req);                // also decref's req->gob (leaves locked)
252
        if (gob != NULL)
253
        {
254
                if (!GDP_GOB_ASSERT_ISLOCKED(gob) || !EP_ASSERT(gob->refcnt > 0))
255
                        _gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
256
                _gdp_gob_decref(&gob, false);        // ref from _gdp_gob_cache_get
257
        }
258

    
259
        if (false)
260
        {
261
fail0:
262
                ep_log(estat, "process_cmd: cannot allocate request; dropping PDU");
263
                if (cpdu != NULL)
264
                        _gdp_pdu_free(&cpdu);
265
        }
266

    
267
        if (cmd == GDP_CMD_CREATE)
268
                ep_thr_mutex_unlock(&GdpCreateMutex);
269

    
270
        ep_dbg_cprintf(Dbg, 40, "process_cmd <<< done\n");
271
        _gdp_show_elapsed("process_cmd", cmd, &starttime);
272
}
273

    
274

    
275
/*
276
**  Search for a request in a channel list.
277
**
278
**                This is a fall-back that should be used only for finding requests
279
**                sent by FWD_APPEND to a server that isn't accessible.  Since that
280
**                command links the request to a different GOB than the destination
281
**                address in the PDU, a NOROUTE response won't find it.
282
*/
283

    
284
static EP_STAT
285
find_req_in_channel_list(
286
                                const gdp_pdu_t *rpdu,
287
                                gdp_chan_t *chan,
288
                                gdp_req_t **reqp)
289
{
290
        EP_STAT estat = EP_STAT_OK;
291
        gdp_req_t *req;
292
        gdp_chan_x_t *chanx;
293

    
294
        if (ep_dbg_test(DbgProcResp, 14))
295
        {
296
                gdp_pname_t src_p, dst_p;
297

    
298
                ep_dbg_printf("find_req_in_channel_list: searching for rpdu rid "
299
                                                "%" PRIgdp_rid
300
                                        "\n    src %s\n    dst %s\n",
301
                                rpdu->msg->rid,
302
                                gdp_printable_name(rpdu->src, src_p),
303
                                gdp_printable_name(rpdu->dst, dst_p));
304
        }
305
        _gdp_chan_lock(chan);
306
        chanx = _gdp_chan_get_cdata(chan);
307
        if (!EP_ASSERT(chanx != NULL))
308
        {
309
                req = NULL;
310
                estat = EP_STAT_ASSERT_ABORT;
311
                goto fail0;
312
        }
313

    
314
        LIST_FOREACH(req, &chanx->reqs, chanlist)
315
        {
316
                if (req->cpdu == NULL)
317
                        continue;
318
                if (ep_dbg_test(Dbg, 48))
319
                {
320
                        gdp_pname_t src_p, dst_p;
321
                        ep_dbg_printf("    ?cpdu rid %" PRIgdp_rid "\t%s =>\n\t    %s\n",
322
                                        req->cpdu->msg->rid,
323
                                        gdp_printable_name(req->cpdu->src, src_p),
324
                                        gdp_printable_name(req->cpdu->dst, dst_p));
325
                }
326
                if ((rpdu->msg->rid == GDP_PDU_ANY_RID ||
327
                                        req->cpdu->msg->rid == rpdu->msg->rid) &&
328
                                GDP_NAME_SAME(req->cpdu->src, rpdu->dst) &&
329
                                GDP_NAME_SAME(req->cpdu->dst, rpdu->src))
330
                        break;
331
        }
332
        if (ep_dbg_test(DbgProcResp, 40))
333
        {
334
                if (req == NULL)
335
                        ep_dbg_printf("    ... not found\n");
336
                else
337
                        ep_dbg_printf("    ... found req @ %p\n", req);
338
        }
339
fail0:
340
        _gdp_chan_unlock(chan);
341

    
342
        // request should be locked for symmetry with _gdp_req_find
343
        if (req != NULL)
344
        {
345
                char ebuf[100];
346

    
347
                // since GOB has to be locked before req, do it now
348
                if (req->gob != NULL)
349
                        _gdp_gob_lock(req->gob);
350
                estat = _gdp_req_lock(req);
351
                ep_dbg_cprintf(DbgProcResp, 44,
352
                                "find_req_in_channel_list: _gdp_req_lock => %s\n",
353
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
354
                if (!EP_STAT_ISOK(estat))
355
                {
356
                        if (req->gob != NULL)
357
                                _gdp_gob_unlock(req->gob);
358
                        req = NULL;
359
                }
360
        }
361
        if (EP_STAT_ISOK(estat))
362
                *reqp = req;
363
        return estat;
364
}
365

    
366

    
367
/*
368
**  PROCESS_RESP --- process response (ack/nak) PDU
369
**
370
**                When this is called, the rpdu passed in will be the actual
371
**                PDU off the wire and req->cpdu should be the original command
372
**                PDU that prompted this response.  We save the passed rpdu
373
**                into req->rpdu for processing in _gdp_req_dispatch.
374
**
375
**                XXX This is not tested for running in a thread.
376
*/
377

    
378
static void
379
process_resp(void *rpdu_)
380
{
381
        gdp_pdu_t *rpdu = (gdp_pdu_t *) rpdu_;
382
        gdp_chan_t *chan = _GdpChannel;
383
        int cmd = rpdu->msg->cmd;
384
        EP_STAT estat;
385
        gdp_gob_t *gob = NULL;
386
        gdp_req_t *req = NULL;
387
        int resp;
388
        int ocmd;                                        // original command prompting this response
389

    
390
        estat = _gdp_gob_cache_get(rpdu->src,
391
                                                GGCF_NOCREATE | GGCF_GET_PENDING, &gob);
392
        if (ep_dbg_test(DbgProcResp, 20))
393
        {
394
                char ebuf[120];
395
                gdp_pname_t rpdu_pname;
396

    
397
                ep_dbg_printf("process_resp: cmd %s rpdu %p ->src %s) gob %p stat %s\n",
398
                        _gdp_proto_cmd_name(cmd),
399
                        rpdu, gdp_printable_name(rpdu->src, rpdu_pname), gob,
400
                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
401
        }
402

    
403
        // check estat here, or is just checking gob enough?
404
        if (gob == NULL)
405
        {
406
                // gob was not in cache
407
                char ebuf[200];
408

    
409
                estat = find_req_in_channel_list(rpdu, chan, &req);
410
                ep_dbg_cprintf(DbgProcResp, 20,
411
                                "    rpdu = %p req = %p stat = %s\n", rpdu, req,
412
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
413

    
414
                if (!EP_STAT_ISOK(estat) || req == NULL)
415
                {
416
                        if (ep_dbg_test(DbgProcResp,
417
                                                rpdu->msg->cmd == GDP_NAK_R_NOROUTE ? 19 : 1))
418
                        {
419
                                gdp_pname_t pname;
420
                                ep_dbg_printf("process_resp: discarding %d (%s) PDU"
421
                                                        " for unknown GOB\n",
422
                                                        rpdu->msg->cmd, _gdp_proto_cmd_name(rpdu->msg->cmd));
423
                                if (ep_dbg_test(DbgProcResp, 24))
424
                                        _gdp_pdu_dump(rpdu, ep_dbg_getfile(), 0);
425
                                else
426
                                        ep_dbg_printf("    %s\n", gdp_printable_name(rpdu->src, pname));
427
                        }
428
                        _gdp_pdu_free(&rpdu);
429
                        return;
430
                }
431

    
432
                EP_ASSERT_ELSE(req->state != GDP_REQ_FREE, return);
433
                if (req->gob != NULL)
434
                {
435
                        GDP_GOB_ASSERT_ISLOCKED(req->gob);
436
                }
437

    
438
                // remove the request from the GOB list it is already on
439
                // req is already locked by find_req_in_channel_list
440
                if (EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
441
                {
442
                        LIST_REMOVE(req, goblist);
443
                        req->flags &= ~GDP_REQ_ON_GOB_LIST;
444
                        //DEBUG: without this incref, a gdp_gob_create call on a log
445
                        //        that already exists throws the error:
446
                        //        Assertion failed at gob-create:gdp_gob_mgmt.c:435: GDP_GOB_ISGOOD(gob)
447
                        //        because the refcnt has gone to zero prematurely.  But with it,
448
                        //        a successful gdp_gob_create leaves the refcnt one too high
449
                        //        leading to a resource leak.
450
                        _gdp_gob_incref(req->gob);                //DEBUG:
451

    
452
                        // code below expects request to remain locked
453
                }
454
        }
455
        else
456
        {
457
                // gob was found in cache
458
                GDP_GOB_ASSERT_ISLOCKED(gob);
459

    
460
                // find the corresponding request
461
                ep_dbg_cprintf(DbgProcResp, 23,
462
                                "process_resp: searching gob %p for rid %" PRIgdp_rid "\n",
463
                                gob, rpdu->msg->rid);
464

    
465
                // find request to which this PDU applies
466
                req = _gdp_req_find(gob, rpdu->msg->rid);
467
                if (ep_dbg_test(DbgProcResp, 51))
468
                {
469
                        ep_dbg_printf("... found ");
470
                        _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
471
                }
472

    
473
                // req is already locked by _gdp_req_find
474
                if (req == NULL)
475
                {
476
                        // no req for incoming response --- "can't happen"
477
                        if (ep_dbg_test(DbgProcResp, 1))
478
                        {
479
                                ep_dbg_printf("process_resp: no req for incoming response\n");
480
                                _gdp_pdu_dump(rpdu, ep_dbg_getfile(), 0);
481
                                _gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_DETAILED, 0);
482
                        }
483
                        _gdp_gob_decref(&gob, false);
484
                        _gdp_pdu_free(&rpdu);
485
                        return;
486
                }
487
                else if (!EP_ASSERT(req->state != GDP_REQ_FREE))
488
                {
489
                        if (ep_dbg_test(DbgProcResp, 1))
490
                        {
491
                                ep_dbg_printf("process_resp: trying to use free ");
492
                                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_DETAILED, 0);
493
                        }
494
                        _gdp_pdu_free(&rpdu);
495
                        return;
496
                }
497
                else if (rpdu == req->rpdu)
498
                {
499
                        // this could be an assertion
500
                        if (ep_dbg_test(DbgProcResp, 1))
501
                        {
502
                                ep_dbg_printf("process_resp(%d): rpdu == req->rpdu\n",
503
                                                        rpdu->msg->cmd);
504
                                _gdp_pdu_dump(rpdu, ep_dbg_getfile(), 0);
505
                        }
506
                }
507
                EP_ASSERT(gob == req->gob);
508
        }
509

    
510
        GDP_GOB_ASSERT_ISLOCKED(req->gob);
511

    
512
        if (req->cpdu == NULL)
513
        {
514
                ep_dbg_cprintf(DbgProcResp, 1,
515
                                "process_resp(%d): no corresponding command PDU\n",
516
                                rpdu->msg->cmd);
517
                ocmd = rpdu->msg->cmd;
518
                //XXX return here?  with req->pdu == NULL, _gdp_req_dispatch
519
                //XXX will probably die
520
        }
521
        else
522
        {
523
                ocmd = req->cpdu->msg->cmd;
524
        }
525

    
526
        // save the response PDU for further processing
527
        if (req->rpdu != NULL)
528
        {
529
                // this can happen in multiread/subscription and async I/O
530
                if (ep_dbg_test(DbgProcResp, 41))
531
                {
532
                        ep_dbg_printf("process_resp: req->rpdu already set\n    ");
533
                        _gdp_pdu_dump(req->rpdu, ep_dbg_getfile(), 1);
534
                }
535
                _gdp_pdu_free(&req->rpdu);
536
        }
537
        req->rpdu = rpdu;
538

    
539
        if (ep_dbg_test(DbgProcResp, 43))
540
        {
541
                ep_dbg_printf("process_resp: ");
542
                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
543
        }
544

    
545
        // request is locked, GOB should be too
546
        if (req->gob != NULL)
547
                GDP_GOB_ASSERT_ISLOCKED(req->gob);
548

    
549
        // mark this request as active (for subscriptions)
550
        ep_time_now(&req->act_ts);
551

    
552
        // do ack/nak specific processing
553
        estat = _gdp_req_dispatch(req, cmd);
554

    
555
        // dispatch should leave it locked
556
        if (req->gob != NULL)
557
                GDP_GOB_ASSERT_ISLOCKED(req->gob);
558

    
559
        // figure out potential response code
560
        // we compute even if unused so we can log server errors
561
        resp = _gdp_acknak_from_estat(estat, req->rpdu->msg->cmd);
562

    
563
        if (ep_dbg_test(DbgProcResp,
564
                                (resp >= GDP_NAK_S_MIN && resp <= GDP_NAK_S_MAX) ? 1 : 44))
565
        {
566
                char ebuf[100];
567

    
568
                ep_dbg_printf("process_resp(%s for %s): %s\n",
569
                                _gdp_proto_cmd_name(cmd),
570
                                _gdp_proto_cmd_name(ocmd),
571
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
572
                if (ep_dbg_test(DbgProcResp, 55))
573
                        _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
574
        }
575

    
576
        // ASSERT(all data from chan has been consumed);
577

    
578
        if (EP_UT_BITSET(GDP_REQ_ASYNCIO, req->flags))
579
        {
580
                // send the status as an event
581
                estat = _gdp_event_add_from_req(req);
582

    
583
                // since this is asynchronous we can release the PDU
584
                _gdp_pdu_free(&req->rpdu);
585
        }
586
        else if (req->state == GDP_REQ_WAITING)
587
        {
588
                // return our status via the request
589
                req->stat = estat;
590
                req->flags |= GDP_REQ_DONE;
591

    
592
                // any further data or status is delivered via event
593
                req->flags |= GDP_REQ_ASYNCIO | GDP_REQ_PERSIST;        //XXX PERSIST?
594

    
595
                if (ep_dbg_test(DbgProcResp, 40))
596
                {
597
                        ep_dbg_printf("process_resp: signaling ");
598
                        _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
599
                }
600

    
601
                // wake up invoker, which will return the status
602
                ep_thr_cond_signal(&req->cond);
603

    
604
                // give _gdp_invoke a chance to run; not necessary, but
605
                // avoids having to wait on condition variables
606
                ep_thr_yield();
607
        }
608
        else if (req->rpdu->msg->cmd == GDP_NAK_R_NOROUTE)
609
        {
610
                // since this is common and expected, don't sully output
611
                ep_dbg_cprintf(DbgProcResp, 19,
612
                                "process_resp: discarding GDP_NAK_R_NOROUTE\n");
613
        }
614
        else if (ep_dbg_test(DbgProcResp, 1))
615
        {
616
                ep_dbg_printf("process_resp: discarding response ");
617
                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
618
        }
619

    
620
        // free up resources
621
        gob = req->gob;
622
        if (EP_UT_BITSET(GDP_REQ_PERSIST, req->flags))
623
                _gdp_req_unlock(req);
624
        else
625
                _gdp_req_free(&req);                // also decref's req->gob (leaves locked)
626
        if (gob != NULL)
627
        {
628
                if (!GDP_GOB_ASSERT_ISLOCKED(gob) || !EP_ASSERT(gob->refcnt > 0))
629
                        _gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
630
                _gdp_gob_decref(&gob, false);        // ref from _gdp_gob_cache_get
631
        }
632

    
633
        ep_dbg_cprintf(DbgProcResp, 40, "process_resp <<< done\n");
634
}
635

    
636

    
637
/*
638
**  _GDP_PDU_PROCESS --- process a PDU
639
**
640
**                This is responsible for the lightweight stuff that can happen
641
**                in the I/O thread, such as matching an ack/nak PDU with the
642
**                corresponding req.  It should never block.  The heavy lifting
643
**                is done in the routine above.
644
*/
645

    
646
void
647
_gdp_pdu_process(gdp_pdu_t *pdu, gdp_chan_t *chan)
648
{
649
        // use "cheat" field in pdu to pass chan up
650
        pdu->chan = chan;
651

    
652
        if (GDP_CMD_IS_COMMAND(pdu->msg->cmd))
653
        {
654
                if (_GdpRunCmdInThread)
655
                        ep_thr_pool_run(&process_cmd, pdu);
656
                else
657
                        process_cmd(pdu);
658
        }
659
        else
660
        {
661
                if (_GdpRunRespInThread)
662
                        ep_thr_pool_run(&process_resp, pdu);
663
                else
664
                        process_resp(pdu);
665
        }
666
}
667

    
668

    
669
/*
670
**  _GDP_RECLAIM_RESOURCES --- find unused GDP resources and reclaim them
671
**
672
**                This should really also have a maximum number of GOBs to leave
673
**                open so we don't run out of file descriptors under high load.
674
**
675
**                This implementation locks the GclsByUse list during the
676
**                entire operation.  That's probably not the best idea.
677
*/
678

    
679
void
680
_gdp_reclaim_resources(void *null)
681
{
682
        char pbuf[200];
683
        time_t reclaim_age;                // how long to leave GOBs open before reclaiming
684

    
685
        ep_dbg_cprintf(Dbg, 69, "_gdp_reclaim_resources\n");
686
        snprintf(pbuf, sizeof pbuf, "swarm.%s.reclaim.age", ep_app_getprogname());
687
        reclaim_age = ep_adm_getlongparam(pbuf, -1);
688
        if (reclaim_age == -1)
689
                reclaim_age = ep_adm_getlongparam("swarm.gdp.reclaim.age",
690
                                                                        GDP_RECLAIM_AGE_DEF);
691
        _gdp_gob_cache_reclaim(reclaim_age);
692
}
693

    
694
// stub for libevent
695

    
696
static void
697
gdp_reclaim_resources_callback(int fd, short what, void *ctx)
698
{
699
        ep_dbg_cprintf(Dbg, 69, "gdp_reclaim_resources_callback\n");
700
        if (ep_adm_getboolparam("swarm.gdp.reclaim.inthread", false))
701
                ep_thr_pool_run(_gdp_reclaim_resources, NULL);
702
        else
703
                _gdp_reclaim_resources(NULL);
704
}
705

    
706

    
707
void
708
_gdp_reclaim_resources_init(void (*f)(int, short, void *))
709
{
710
        static bool running = false;
711

    
712
        if (running)
713
                return;
714
        running = true;
715
        if (f == NULL)
716
                f = &gdp_reclaim_resources_callback;
717

    
718
        long gc_intvl;
719
        char pbuf[200];
720

    
721
        snprintf(pbuf, sizeof pbuf, "swarm.%s.reclaim.interval",
722
                        ep_app_getprogname());
723
        gc_intvl = ep_adm_getlongparam(pbuf, -1);
724
        if (gc_intvl == -1)
725
                gc_intvl = ep_adm_getlongparam("swarm.gdp.reclaim.interval", 15L);
726

    
727
        struct timeval tv = { gc_intvl, 0 };
728
        struct event *evtimer = event_new(_GdpIoEventBase, -1, EV_PERSIST,
729
                                                                f, NULL);
730
        event_add(evtimer, &tv);
731
}
732

    
733

    
734
/*
735
**  Set libevent timer.
736
**
737
**                Timeout is in units of microseconds.
738
**                Not general purpose (assumes *pev never changes type).
739
*/
740

    
741
void
742
_gdp_evloop_timer_set(uint32_t timeout,
743
                                        libevent_event_t *cbfunc,
744
                                        void *cbarg,
745
                                        struct event **pev)
746
{
747
        struct timeval tv;
748
        struct event *ev = *pev;
749

    
750
        if (ev != NULL)
751
                event_free(ev);
752
        *pev = ev = event_new(_GdpIoEventBase, -1, 0, cbfunc, cbarg);
753
        ep_dbg_cprintf(DbgTimers, 52,
754
                        "_gdp_evloop_timer_set(%" PRIu32 ") => %p\n", timeout,  ev);
755
        tv.tv_sec = timeout / 1000000;
756
        tv.tv_usec = timeout % 1000000;
757
        event_add(ev, &tv);
758
}
759

    
760

    
761
/*
762
**  Clear libevent timer.
763
*/
764

    
765
void
766
_gdp_evloop_timer_clr(struct event **pev)
767
{
768
        ep_dbg_cprintf(DbgTimers, 52, "_gdp_evloop_timer_clr(%p)\n", *pev);
769
        if (*pev != NULL)
770
                event_free(*pev);
771
        *pev = NULL;
772
}
773

    
774

    
775
/*
776
**        Base loop to be called for event-driven systems.
777
**        Their events should have already been added to the event base.
778
**
779
**                GdpIoEventLoopThread is also used by gdplogd, hence non-static.
780
*/
781

    
782
EP_THR                        _GdpIoEventLoopThread;
783

    
784
// to ensure event loop is running before we proceed
785
bool                                        GdpIoEventLoopRunning                = false;
786
static EP_THR_MUTEX                GdpIoEventLoopRunningMutex        EP_THR_MUTEX_INITIALIZER;
787
static EP_THR_COND                GdpIoEventLoopRunningCond        EP_THR_COND_INITIALIZER;
788

    
789
static void
790
event_loop_timeout(int fd, short what, void *unused)
791
{
792
        ep_dbg_cprintf(Dbg, 79, "event loop timeout\n");
793
}
794

    
795
void *
796
_gdp_run_event_loop(void *eli_)
797
{
798
        long evdelay = ep_adm_getlongparam("swarm.gdp.event.loopdelay", 100000L);
799
                                                                        // loopdelay in microseconds
800

    
801
        // keep the loop alive if EVLOOP_NO_EXIT_ON_EMPTY isn't available
802
        long ev_timeout = ep_adm_getlongparam("swarm.gdp.event.looptimeout", 30L);
803
                                                                        // looptimeout in seconds
804
        struct timeval tv = { ev_timeout, 0 };
805
        struct event *evtimer = event_new(_GdpIoEventBase, -1, EV_PERSIST,
806
                        &event_loop_timeout, NULL);
807
        event_add(evtimer, &tv);
808

    
809
        for (;;)
810
        {
811
                if (ep_dbg_test(Dbg, 20))
812
                {
813
                        ep_dbg_printf("gdp_event_loop: starting up base loop\n");
814
                        event_base_dump_events(_GdpIoEventBase, ep_dbg_getfile());
815
                }
816

    
817
                ep_thr_mutex_lock(&GdpIoEventLoopRunningMutex);
818
                GdpIoEventLoopRunning = true;
819
                ep_thr_cond_broadcast(&GdpIoEventLoopRunningCond);
820
                ep_thr_mutex_unlock(&GdpIoEventLoopRunningMutex);
821

    
822
#ifdef EVLOOP_NO_EXIT_ON_EMPTY
823
                event_base_loop(_GdpIoEventBase, EVLOOP_NO_EXIT_ON_EMPTY);
824
#else
825
                event_base_loop(_GdpIoEventBase, 0);
826
#endif
827

    
828
                GdpIoEventLoopRunning = false;
829

    
830
                if (ep_dbg_test(Dbg, 1))
831
                {
832
                        ep_dbg_printf("gdp_event_loop: event_base_loop returned\n");
833
                        if (event_base_got_break(_GdpIoEventBase))
834
                                ep_dbg_printf(" ... as a result of loopbreak\n");
835
                        if (event_base_got_exit(_GdpIoEventBase))
836
                                ep_dbg_printf(" ... as a result of loopexit\n");
837
                }
838
                if (event_base_got_exit(_GdpIoEventBase) ||
839
                                event_base_got_break(_GdpIoEventBase))
840
                {
841
                        // the GDP daemon went away intentionally
842
                        return NULL;
843
                }
844

    
845
                if (evdelay > 0)
846
                        ep_time_nanosleep(evdelay * 1000LL);                // avoid CPU hogging
847
        }
848

    
849
        ep_log(GDP_STAT_DEAD_DAEMON, "lost channel to gdp");
850
        ep_app_abort("lost channel to gdp");
851
}
852

    
853
static EP_STAT
854
_gdp_start_event_loop_thread(EP_THR *thr)
855
{
856
        if (ep_thr_spawn(thr, _gdp_run_event_loop, NULL) != 0)
857
                return init_error("cannot create event loop thread",
858
                                                "_gdp_start_event_loop_thread");
859
        else
860
                return EP_STAT_OK;
861
}
862

    
863
void
864
_gdp_stop_event_loop(void)
865
{
866
        event_base_loopbreak(_GdpIoEventBase);
867
        ep_thr_mutex_lock(&GdpIoEventLoopRunningMutex);
868
        GdpIoEventLoopRunning = false;
869
        ep_thr_cond_broadcast(&GdpIoEventLoopRunningCond);
870
        ep_thr_mutex_unlock(&GdpIoEventLoopRunningMutex);
871
}
872

    
873

    
874
/*
875
**   Logging callback for event library (for debugging).
876
*/
877

    
878
static EP_DBG        EvlibDbg = EP_DBG_INIT("gdp.libevent", "GDP Libevent");
879

    
880
static void
881
evlib_log_cb(int severity, const char *msg)
882
{
883
        const char *sev;
884
        const char *sevstrings[] = { "debug", "msg", "warn", "error" };
885

    
886
        if (severity < 0 || severity > 3)
887
                sev = "?";
888
        else
889
                sev = sevstrings[severity];
890
        ep_dbg_cprintf(EvlibDbg, ((4 - severity) * 20) + 2, "[%s] %s\n", sev, msg);
891
}
892

    
893

    
894
/*
**  Arrange to call atexit(3) functions on SIGINT and SIGTERM
**
**		Signal handler: warns, stops the event loop, and then leaves
**		through exit(3).
*/

static void
exit_on_signal(int sig)
{
	ep_app_warn("Exiting on signal %d", sig);
	_gdp_stop_event_loop();

	// the signal number is used as the exit status
	exit(sig);
}
905

    
906

    
907
/*
**  Change user id to something innocuous.
**
**		Does nothing if runasuser is NULL or empty.  If the user
**		cannot be found by getpwnam, 1:1 is used instead.  A failure
**		to change ids is only reported as a warning.
**
**		NOTE(review): only setgid and setuid are called here;
**		supplementary groups are left as they are -- confirm that
**		this is intended.
*/

void
_gdp_run_as(const char *runasuser)
{
	if (runasuser == NULL || *runasuser == '\0')
		return;

	// fall back to 1:1 unless the named user is known
	uid_t uid = 1;
	gid_t gid = 1;
	struct passwd *pw = getpwnam(runasuser);

	if (pw != NULL)
	{
		gid = pw->pw_gid;
		uid = pw->pw_uid;
	}
	else
	{
		ep_app_warn("User %s unknown; running as 1:1 (daemon)",
				runasuser);
	}

	// the group id is changed before the user id
	if (setgid(gid) < 0 || setuid(uid) < 0)
		ep_app_warn("Cannot set user/group id (%d:%d)", uid, gid);
}
935

    
936

    
937
/*
938
**  Print all outstanding requests on a channel
939
*/
940

    
941
void
942
_gdp_chan_dumpreqs(gdp_chan_t *chan, FILE *fp)
943
{
944
        gdp_req_t *req;
945
        gdp_chan_x_t *chanx = _gdp_chan_get_cdata(chan);
946

    
947
        if (chanx == NULL)
948
        {
949
                fprintf(fp, "\n<<< No Requests >>>\n");
950
                return;
951
        }
952
        fprintf(fp, "\n<<< Active requests >>>\n");
953
        LIST_FOREACH(req, &chanx->reqs, chanlist)
954
        {
955
                _gdp_req_dump(req, fp, GDP_PR_PRETTY, 0);
956
        }
957
}
958

    
959

    
960
/*
961
**  SIGINFO --- called to print out internal state (for debugging)
962
**
963
**                On BSD and MacOS this is implemented as a SIGINFO (^T from
964
**                the command line), but since Linux doesn't have that we use
965
**                SIGUSR1 instead.
966
*/
967

    
968
extern const char        GdpVersion[];
969
EP_FUNCLIST                        *_GdpDumpFuncs;
970

    
971
void
972
_gdp_dump_state(int plev)
973
{
974
        FILE *fp = stderr;                        // should this be the debug file?
975
        flockfile(fp);
976
        fprintf(fp, "\n<<< GDP STATE >>>\nVersion: %s\n", GdpVersion);
977

    
978
        _gdp_gob_cache_dump(plev, fp);                        // GOB cache contents
979
        _gdp_chan_dumpreqs(_GdpChannel, fp);        // outstanding requests
980

    
981
        if (_GdpDumpFuncs != NULL)
982
                ep_funclist_invoke(_GdpDumpFuncs, (void *) fp);
983

    
984
        fprintf(fp, "\n<<< Open file descriptors >>>\n");
985
        ep_app_dumpfds(fp);
986
        fprintf(fp, "\n<<< Stack backtrace >>>\n");
987
        ep_dbg_backtrace(fp);
988
        fprintf(fp, "\n<<< Statistics >>>\n");
989
        _gdp_req_pr_stats(fp);
990
        _gdp_gob_pr_stats(fp);
991
        funlockfile(fp);
992
}
993

    
994

    
995
static void
996
siginfo(int sig, short what, void *arg)
997
{
998
        if (ep_dbg_test(Dbg, 1))
999
                _gdp_dump_state(GDP_PR_DETAILED);
1000
        else
1001
                _gdp_dump_state(GDP_PR_PRETTY);
1002
}
1003

    
1004

    
1005

    
1006
/*
1007
**  Initialization, Part 0:
1008
**                Initialize external libraries.
1009
**
1010
**                Used by a few of the utility routines, but unusual in that
1011
**                it doesn't actually start up GDP communications.
1012
*/
1013

    
1014
EP_STAT
1015
gdp_init_phase_0(const char *progname, uint32_t flags)
1016
{
1017
        ep_dbg_cprintf(Dbg, 4, "gdp_init_phase_0: %s\n", GdpVersion);
1018

    
1019
        if (_GdpInitState >= GDP_INIT_PHASE_0)
1020
                return EP_STAT_OK;
1021

    
1022
        // initialize the EP library
1023
        ep_lib_init(EP_LIB_USEPTHREADS);
1024
        _GdpDumpFuncs = ep_funclist_new("GDP debug dump functions");
1025

    
1026
        // initialize runtime parameters
1027
        _gdp_adm_readparams("gdp");
1028
        if (progname == NULL)
1029
                progname = ep_app_getprogname();
1030
        if (progname != NULL)
1031
                _gdp_adm_readparams(progname);
1032
        ep_crypto_init(0);
1033

    
1034
        // clear out spurious errors
1035
        errno = 0;
1036

    
1037
        // we can now re-adjust debugging
1038
        ep_dbg_setfile(NULL);
1039

    
1040
        // register status strings
1041
        _gdp_stat_init();
1042

    
1043
        // if not using Zeroconf, disable it
1044
        if (EP_UT_BITSET(GDP_INIT_NO_ZEROCONF, flags))
1045
                ep_adm_setparam("swarm.gdp.zeroconf.enable", "false");
1046

    
1047
        _GdpInitState = GDP_INIT_PHASE_0;
1048

    
1049
        return EP_STAT_OK;
1050
}
1051

    
1052

    
1053
/*
1054
**  Initialization, Part 1:
1055
**                Initialize the various external libraries.
1056
**                Set up the I/O event loop base.
1057
**                Initialize the GOB cache.
1058
**                Start the event loop.
1059
*/
1060

    
1061
// locks out multiple calls to gdp_lib_init
1062
static EP_THR_MUTEX                GdpInitMutex        EP_THR_MUTEX_INITIALIZER;
1063
int                                                _GdpInitState;
1064

    
1065
EP_STAT
1066
gdp_lib_init(const char *progname, const char *myname, uint32_t flags)
1067
{
1068
        EP_STAT estat = EP_STAT_OK;
1069
        const char *phase = NULL;
1070

    
1071
        ep_dbg_cprintf(Dbg, 4, "_gdp_lib_init(%s)\n",
1072
                        myname == NULL ? "NULL" : myname);
1073

    
1074
        // need to initialize libep before using mutexes
1075
        phase = "ep_lib_init";
1076
        estat = ep_lib_init(EP_LIB_USEPTHREADS);
1077
        EP_STAT_CHECK(estat, goto fail0);
1078

    
1079
        ep_thr_mutex_lock(&GdpInitMutex);
1080
        if (_GdpInitState >= GDP_INIT_LIB)
1081
                goto done;
1082

    
1083
        gdp_init_phase_0(progname, flags);
1084

    
1085
        // initialize external -> internal name mapping
1086
        if (!EP_UT_BITSET(GDP_INIT_NO_HONGDS, flags))
1087
        {
1088
                if (ep_adm_getboolparam("swarm.gdp.hongd.optional", false))
1089
                        flags |= GDP_INIT_OPT_HONGDS;
1090
                phase = "_gdp_name_init";
1091
                estat = _gdp_name_init(NULL);
1092
                if (EP_UT_BITSET(GDP_INIT_OPT_HONGDS, flags))
1093
                        estat = EP_STAT_OK;
1094
                EP_STAT_CHECK(estat, goto fail0);
1095
        }
1096

    
1097
        if (ep_dbg_test(EvlibDbg, 80))
1098
        {
1099
                // according to the book...
1100
                //event_enable_debug_logging(EVENT_DBG_ALL);
1101
                // according to the code...
1102
                event_enable_debug_mode();
1103
        }
1104

    
1105
        // arrange to call atexit(3) functions on SIGTERM
1106
        if (ep_adm_getboolparam("swarm.gdp.catch.sigint", true))
1107
                (void) signal(SIGINT, exit_on_signal);
1108
        if (ep_adm_getboolparam("swarm.gdp.catch.sigterm", true))
1109
                (void) signal(SIGTERM, exit_on_signal);
1110

    
1111
        // get assertion behavior information
1112
        // [DEPRECATED: use libep.assert.allabort]
1113
        EpAssertAllAbort = ep_adm_getboolparam("swarm.gdp.debug.assert.allabort",
1114
                                                                        EpAssertAllAbort);
1115

    
1116
        // check to see if commands/responses should be run in threads
1117
        _GdpRunCmdInThread = ep_adm_getboolparam("swarm.gdp.command.runinthread",
1118
                                                                        true);
1119
        _GdpRunRespInThread = ep_adm_getboolparam("swarm.gdp.response.runinthread",
1120
                                                                        false);
1121

    
1122
        // figure out or generate our name (for routing)
1123
        if (myname == NULL && progname != NULL)
1124
        {
1125
                char argname[100];
1126

    
1127
                snprintf(argname, sizeof argname, "swarm.%s.gdpname", progname);
1128
                myname = ep_adm_getstrparam(argname, NULL);
1129
        }
1130

    
1131
        if (myname != NULL)
1132
        {
1133
                gdp_pname_t pname;
1134

    
1135
                estat = gdp_parse_name(myname, _GdpMyRoutingName);
1136
                ep_dbg_cprintf(Dbg, 9, "Setting my name:\n\t%s\n\t%s\n",
1137
                                myname, gdp_printable_name(_GdpMyRoutingName, pname));
1138
                if (EP_STAT_ISFAIL(estat))
1139
                        myname = NULL;
1140
        }
1141

    
1142
        if (!gdp_name_is_valid(_GdpMyRoutingName))
1143
                _gdp_newname(_GdpMyRoutingName, NULL);
1144

    
1145
        // avoid running as root if possible (and another user specified)
1146
        if (progname != NULL)
1147
        {
1148
                char argname[100];
1149
                const char *logfac;
1150

    
1151
                if (getuid() == 0)
1152
                {
1153
                        snprintf(argname, sizeof argname, "swarm.%s.runasuser", progname);
1154
                        _gdp_run_as(ep_adm_getstrparam(argname, NULL));
1155
                }
1156

    
1157
                // allow log facilities on a per-app basis
1158
                snprintf(argname, sizeof argname, "swarm.%s.syslog.facility", progname);
1159
                logfac = ep_adm_getstrparam(argname, NULL);
1160
                if (logfac == NULL)
1161
                        logfac = ep_adm_getstrparam("swarm.gdp.syslog.facility", "local4");
1162
                ep_log_init(progname, ep_syslog_fac_from_name(logfac), stderr);
1163
        }
1164

    
1165
        if (getuid() == 0)
1166
                _gdp_run_as(ep_adm_getstrparam("swarm.gdp.runasuser", NULL));
1167

    
1168
        if (ep_dbg_test(Dbg, 1))
1169
        {
1170
                gdp_pname_t pname;
1171

    
1172
                ep_dbg_printf("My GDP routing name = %s\n",
1173
                                gdp_printable_name(_GdpMyRoutingName, pname));
1174
        }
1175

    
1176
        // initialize the GOB cache.  In theory this "cannot fail"
1177
        phase = "_gdp_gob_cache_init";
1178
        estat = _gdp_gob_cache_init();
1179
        EP_STAT_CHECK(estat, goto fail0);
1180

    
1181
        // tell the event library that we're using pthreads
1182
        if (evthread_use_pthreads() < 0)
1183
                return init_error("cannot use pthreads", "gdp_lib_init");
1184
        if (ep_dbg_test(DbgEvLock, 90))
1185
        {
1186
                evthread_enable_lock_debuging();
1187
        }
1188

    
1189
        // use our debugging printer
1190
        event_set_log_callback(evlib_log_cb);
1191

    
1192
        // set up the event base
1193
        if (_GdpIoEventBase == NULL)
1194
        {
1195
                // Initialize for I/O events
1196
                struct event_config *ev_cfg = event_config_new();
1197

    
1198
                phase = "event_base_new_with_config";
1199
                event_config_require_features(ev_cfg, 0);
1200
                _GdpIoEventBase = event_base_new_with_config(ev_cfg);
1201
                if (_GdpIoEventBase == NULL)
1202
                        estat = init_error("could not create event base", "gdp_lib_init");
1203
                event_config_free(ev_cfg);
1204
                EP_STAT_CHECK(estat, goto fail0);
1205

    
1206
                // add a debugging signal to print out some internal data structures
1207
#ifdef SIGINFO
1208
                event_add(evsignal_new(_GdpIoEventBase, SIGINFO, siginfo, NULL), NULL);
1209
#endif
1210
                event_add(evsignal_new(_GdpIoEventBase, SIGUSR1, siginfo, NULL), NULL);
1211
        }
1212

    
1213
        phase = "_gdp_chan_init";
1214
        estat = _gdp_chan_init(_GdpIoEventBase, NULL);
1215
        EP_STAT_CHECK(estat, goto fail0);
1216

    
1217
        phase = NULL;
1218

    
1219
done:
1220
fail0:
1221
        if (ep_dbg_test(Dbg, EP_STAT_ISOK(estat) ? 8 : 1))
1222
        {
1223
                char ebuf[200];
1224
                ep_stat_tostr(estat, ebuf, sizeof ebuf);
1225

    
1226
                if (phase == NULL)
1227
                        ep_dbg_printf("gdp_lib_init: %s\n", ebuf);
1228
                else
1229
                        ep_dbg_printf("gdp_lib_init: %s: %s\n", phase, ebuf);
1230
        }
1231

    
1232
        _GdpInitState = GDP_INIT_LIB;
1233
        ep_thr_mutex_unlock(&GdpInitMutex);
1234
        return estat;
1235
}
1236

    
1237

    
1238
EP_STAT
1239
_gdp_evloop_init(void)
1240
{
1241
        EP_STAT estat;
1242

    
1243
        // set up synchronization for event loop thread startup
1244
        ep_thr_mutex_lock(&GdpIoEventLoopRunningMutex);
1245

    
1246
        if (!GdpIoEventLoopRunning)
1247
        {
1248
                // create a thread to run the event loop
1249
                estat = _gdp_start_event_loop_thread(&_GdpIoEventLoopThread);
1250
        }
1251

    
1252
        while (!GdpIoEventLoopRunning)
1253
                ep_thr_cond_wait(&GdpIoEventLoopRunningCond,
1254
                                                &GdpIoEventLoopRunningMutex, NULL);
1255
        ep_thr_mutex_unlock(&GdpIoEventLoopRunningMutex);
1256

    
1257
        return estat;
1258
}
1259

    
1260

    
1261
/*
1262
*/
1263

    
1264

    
1265
/*
1266
**  Data Ready (Receive) callback
1267
**
1268
**                Called whenever there is input from the channel.
1269
**                It is up to this routine to actually read the data from the
1270
**                chan level buffer into active memory.
1271
*/
1272

    
1273
EP_STAT
1274
_gdp_io_recv(
1275
                gdp_chan_t *chan,
1276
                gdp_name_t src,
1277
                gdp_name_t dst,
1278
                gdp_seqno_t seqno,
1279
                gdp_buf_t *payload_buf,
1280
                size_t payload_len)
1281
{
1282
        EP_STAT estat;
1283

    
1284
        gdp_pdu_t *pdu = _gdp_pdu_new(NULL, src, dst, seqno);
1285
        estat = _gdp_pdu_in(pdu, payload_buf, payload_len, chan);
1286
        EP_STAT_CHECK(estat, goto fail0);
1287

    
1288
        _gdp_pdu_process(pdu, chan);
1289
        // _gdp_pdu_process frees pdu, possibly in a thread
1290

    
1291
fail0:
1292
        {
1293
                char ebuf[100];
1294
                ep_dbg_cprintf(Dbg, EP_STAT_ISOK(estat) ? 21 : 3,
1295
                                "_gdp_io_recv: %s\n",
1296
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
1297
        }
1298
        return estat;
1299
}
1300

    
1301

    
1302
/*
1303
**  Router Event callback
1304
**
1305
**                Called when the router has something to signal to the higher level.
1306
*/
1307

    
1308
EP_STAT
1309
_gdp_router_event(
1310
                gdp_chan_t *chan,
1311
                gdp_name_t src,
1312
                gdp_name_t dst,
1313
                size_t payload_len,
1314
                EP_STAT estat)
1315
{
1316
        // fake up a PDU for the router event
1317
        gdp_cmd_t cmd = (gdp_cmd_t) 0;
1318

    
1319
        if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_NOROUTE))
1320
                cmd = GDP_NAK_R_NOROUTE;
1321
        else
1322
                goto fail0;
1323

    
1324
        {
1325
                //XXX wildcard seqno?
1326
                GdpMessage *msg = _gdp_msg_new(cmd, GDP_PDU_ANY_RID, GDP_PDU_NO_L5SEQNO);
1327
                gdp_pdu_t *pdu = _gdp_pdu_new(msg, src, dst, GDP_SEQNO_NONE);
1328

    
1329
                if (msg->cmd != 0)
1330
                        _gdp_pdu_process(pdu, chan);
1331
                else
1332
                        _gdp_pdu_free(&pdu);
1333
        }
1334

    
1335
fail0:
1336
        if (ep_dbg_test(Dbg, 23))
1337
        {
1338
                char ebuf[100];
1339
                ep_dbg_printf("_gdp_router_event: %s\n",
1340
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
1341
        }
1342
        return estat;
1343
}
1344

    
1345

    
1346

    
1347
/*
1348
**  Data I/O Event callback
1349
**
1350
**                Typically connects, disconnects, and errors
1351
**
1352
**                Following is a list of actions that should be undertaken in
1353
**                response to various events:
1354
**
1355
**                Event                                        Client Action                                Gdplogd Action
1356
**
1357
**                connection established        advertise one                                advertise all
1358
**                                                                re-subscribe all
1359
**
1360
**                connection lost [1]                retry open                                        retry open
1361
**
1362
**                data available                        process command/ack                        process command/ack
1363
**
1364
**                write complete                        anything needed?                        anything needed?
1365
**
1366
**                advertise timeout                re-advertise me                                re-advertise all
1367
**
1368
**                connection close                withdraw me                                        withdraw all
1369
**
1370
**                [1] Should be handled automatically by the channel layer, but should
1371
**                generate a "connection established" event.
1372
*/
1373

    
1374
EP_STAT
1375
_gdp_io_event(
1376
                gdp_chan_t *chan,
1377
                uint32_t what)
1378
{
1379
        EP_STAT estat = EP_STAT_OK;
1380

    
1381
        if (EP_UT_BITSET(BEV_EVENT_CONNECTED, what))
1382
        {
1383
                // connection up; do advertising and resend subscriptions (if any)
1384
                gdp_chan_x_t *cx = _gdp_chan_get_cdata(chan);
1385
                if (cx->connect_cb != NULL)
1386
                        estat = (*cx->connect_cb)(chan);
1387
        }
1388
        return estat;
1389
}