gdp / gdp / gdp_main.c @ master
History | View | Annotate | Download (34.6 KB)
1 |
/* vim: set ai sw=4 sts=4 ts=4 :*/
|
---|---|
2 |
|
3 |
/*
|
4 |
** GDP Initialization and main event loop.
|
5 |
**
|
6 |
** ----- BEGIN LICENSE BLOCK -----
|
7 |
** GDP: Global Data Plane Support Library
|
8 |
** From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
|
9 |
**
|
10 |
** Copyright (c) 2015-2019, Regents of the University of California.
|
11 |
** All rights reserved.
|
12 |
**
|
13 |
** Permission is hereby granted, without written agreement and without
|
14 |
** license or royalty fees, to use, copy, modify, and distribute this
|
15 |
** software and its documentation for any purpose, provided that the above
|
16 |
** copyright notice and the following two paragraphs appear in all copies
|
17 |
** of this software.
|
18 |
**
|
19 |
** IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
|
20 |
** SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
|
21 |
** PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
|
22 |
** EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
23 |
**
|
24 |
** REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
|
25 |
** LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
26 |
** FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
|
27 |
** IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
|
28 |
** OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
|
29 |
** OR MODIFICATIONS.
|
30 |
** ----- END LICENSE BLOCK -----
|
31 |
*/
|
32 |
|
33 |
#include "gdp.h" |
34 |
#include "gdp_chan.h" |
35 |
#include "gdp_event.h" |
36 |
#include "gdp_hongd.h" |
37 |
#include "gdp_priv.h" |
38 |
#include "gdp_version.h" |
39 |
|
40 |
#include <ep/ep.h> |
41 |
#include <ep/ep_app.h> |
42 |
#include <ep/ep_dbg.h> |
43 |
#include <ep/ep_funclist.h> |
44 |
#include <ep/ep_log.h> |
45 |
#include <ep/ep_syslog.h> |
46 |
|
47 |
#include <event2/buffer.h> |
48 |
#include <event2/thread.h> |
49 |
|
50 |
#include <errno.h> |
51 |
#include <pwd.h> |
52 |
#include <signal.h> |
53 |
#include <string.h> |
54 |
|
55 |
static EP_DBG Dbg = EP_DBG_INIT("gdp.main", "GDP initialization and main loop"); |
56 |
static EP_DBG DbgEvLock = EP_DBG_INIT("gdp.libevent.locks", "GDP libevent lock debugging"); |
57 |
static EP_DBG DbgProcResp = EP_DBG_INIT("gdp.response", "GDP response processing"); |
58 |
static EP_DBG DbgTimers = EP_DBG_INIT("gdp.libevent.timers", "GDP timer events"); |
59 |
|
60 |
struct event_base *_GdpIoEventBase; // the base for GDP I/O events |
61 |
gdp_name_t _GdpMyRoutingName; // source name for PDUs
|
62 |
gdp_chan_t *_GdpChannel; // our primary app-level protocol port
|
63 |
static bool _GdpRunCmdInThread = true; // run commands in threads |
64 |
static bool _GdpRunRespInThread = false; // run responses in threads |
65 |
bool _GdpLibInitialized; // are we initialized? |
66 |
|
67 |
|
68 |
/*
|
69 |
** INIT_ERROR --- issue error on initialization problem
|
70 |
*/
|
71 |
|
72 |
static EP_STAT
|
73 |
init_error(const char *datum, const char *where) |
74 |
{ |
75 |
EP_STAT estat = ep_stat_from_errno(errno); |
76 |
char nbuf[40]; |
77 |
|
78 |
(void) (0 == strerror_r(errno, nbuf, sizeof nbuf)); |
79 |
ep_log(estat, "gdp_init: %s: %s", where, datum);
|
80 |
ep_app_error("gdp_init: %s: %s: %s", where, datum, nbuf);
|
81 |
return estat;
|
82 |
} |
83 |
|
84 |
|
85 |
gdp_cmd_t |
86 |
_gdp_acknak_from_estat(EP_STAT estat, gdp_cmd_t def) |
87 |
{ |
88 |
gdp_cmd_t resp = def; |
89 |
|
90 |
if (EP_STAT_ISOK(estat))
|
91 |
{ |
92 |
if (def < GDP_ACK_MIN || def > GDP_ACK_MAX)
|
93 |
resp = GDP_ACK_SUCCESS; |
94 |
} |
95 |
else if (EP_STAT_REGISTRY(estat) == EP_REGISTRY_EPLIB && |
96 |
EP_STAT_MODULE(estat) == EP_STAT_MOD_CRYPTO) |
97 |
{ |
98 |
resp = GDP_NAK_C_UNAUTH; |
99 |
} |
100 |
else if (EP_STAT_IS_SAME(estat, GDP_STAT_NOT_IMPLEMENTED)) |
101 |
resp = GDP_NAK_S_NOTIMPL; |
102 |
else if (EP_STAT_REGISTRY(estat) == EP_REGISTRY_UCB && |
103 |
EP_STAT_MODULE(estat) == GDP_MODULE) |
104 |
{ |
105 |
// if the estat contains the detail, prefer that
|
106 |
int d = EP_STAT_DETAIL(estat);
|
107 |
|
108 |
if (!EP_STAT_ISFAIL(estat))
|
109 |
{ |
110 |
if (d >= 200 && d <= (200 + GDP_ACK_MAX - GDP_ACK_MIN)) |
111 |
resp = (gdp_cmd_t) (d - 200 + GDP_ACK_MIN);
|
112 |
} |
113 |
else if (d >= 400 && d <= (400 + GDP_NAK_C_MAX - GDP_NAK_C_MIN)) |
114 |
resp = (gdp_cmd_t) (d - 400 + GDP_NAK_C_MIN);
|
115 |
else if (d >= 500 && d <= (500 + GDP_NAK_S_MAX - GDP_NAK_S_MIN)) |
116 |
resp = (gdp_cmd_t) (d - 500 + GDP_NAK_S_MIN);
|
117 |
else if (d >= 600 && d <= (600 + GDP_NAK_R_MAX - GDP_NAK_R_MIN)) |
118 |
resp = (gdp_cmd_t) (d - 600 + GDP_NAK_R_MIN);
|
119 |
} |
120 |
else if (resp < GDP_NAK_C_MIN || resp > GDP_NAK_R_MAX) |
121 |
resp = GDP_NAK_S_INTERNAL; // default to panic code
|
122 |
|
123 |
if (ep_dbg_test(Dbg, 41)) |
124 |
{ |
125 |
char ebuf[100]; |
126 |
|
127 |
ep_dbg_printf("_gdp_acknak_from_estat: %s -> %s\n",
|
128 |
ep_stat_tostr(estat, ebuf, sizeof ebuf),
|
129 |
_gdp_proto_cmd_name(resp)); |
130 |
} |
131 |
return resp;
|
132 |
} |
133 |
|
134 |
|
135 |
/*
|
136 |
** PROCESS_CMD --- process command PDU
|
137 |
**
|
138 |
** Usually done in a thread since it may be heavy weight.
|
139 |
** This usually only applies to gdplogd.
|
140 |
*/
|
141 |
|
142 |
static EP_THR_MUTEX GdpCreateMutex EP_THR_MUTEX_INITIALIZER;
|
143 |
|
144 |
static void |
145 |
process_cmd(void *cpdu_)
|
146 |
{ |
147 |
gdp_pdu_t *cpdu = (gdp_pdu_t *) cpdu_; |
148 |
gdp_cmd_t cmd; |
149 |
EP_STAT estat; |
150 |
gdp_gob_t *gob = NULL;
|
151 |
gdp_req_t *req = NULL;
|
152 |
EP_TIME_SPEC starttime; |
153 |
|
154 |
ep_time_now(&starttime); |
155 |
GDP_MSG_CHECK(cpdu, return);
|
156 |
cmd = cpdu->msg->cmd; |
157 |
|
158 |
ep_dbg_cprintf(Dbg, 40,
|
159 |
"process_cmd(%s, thread %" EP_THR_PRItid ")\n", |
160 |
_gdp_proto_cmd_name(cmd), ep_thr_gettid()); |
161 |
|
162 |
// create has too many special cases, so we single thread it
|
163 |
if (cmd == GDP_CMD_CREATE)
|
164 |
ep_thr_mutex_lock(&GdpCreateMutex); |
165 |
|
166 |
estat = _gdp_gob_cache_get(cpdu->dst, GGCF_NOCREATE, &gob); |
167 |
if (gob != NULL) |
168 |
{ |
169 |
GDP_GOB_ASSERT_ISLOCKED(gob); |
170 |
EP_ASSERT(gob->refcnt > 0);
|
171 |
} |
172 |
|
173 |
ep_dbg_cprintf(Dbg, 43,
|
174 |
"process_cmd: allocating new req for GOB %p\n", gob);
|
175 |
estat = _gdp_req_new(cmd, gob, cpdu->chan, cpdu, 0, &req);
|
176 |
EP_STAT_CHECK(estat, goto fail0);
|
177 |
EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex); |
178 |
EP_ASSERT(gob == req->gob); |
179 |
|
180 |
ep_dbg_cprintf(Dbg, 40, "process_cmd >>> req=%p\n", req); |
181 |
|
182 |
// do the per-command processing
|
183 |
estat = _gdp_req_dispatch(req, cmd); |
184 |
if (ep_dbg_test(Dbg, 59)) |
185 |
{ |
186 |
char ebuf[100]; |
187 |
ep_dbg_printf("process_cmd: dispatch => %s\n ",
|
188 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
189 |
_gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 1);
|
190 |
} |
191 |
bool response_already_sent = EP_STAT_IS_SAME(estat, GDP_STAT_RESPONSE_SENT);
|
192 |
|
193 |
// make sure request or GOB haven't gotten fubared
|
194 |
EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex); |
195 |
GDP_MSG_CHECK(req->cpdu, goto fail1);
|
196 |
GDP_MSG_CHECK(req->rpdu, goto fail1);
|
197 |
|
198 |
// special case: if we have deleted a GOB, it will have disappeared now
|
199 |
if (req->gob == NULL && gob != NULL && |
200 |
req->cpdu->msg->cmd == GDP_CMD_DELETE && |
201 |
GDP_CMD_IS_ACK(req->rpdu->msg->cmd)) |
202 |
{ |
203 |
_gdp_gob_unlock(gob); |
204 |
gob = NULL;
|
205 |
} |
206 |
|
207 |
if (gob != NULL) |
208 |
{ |
209 |
GDP_GOB_ASSERT_ISLOCKED(gob); |
210 |
if (!EP_ASSERT(gob == req->gob) && ep_dbg_test(Dbg, 1)) |
211 |
{ |
212 |
ep_dbg_printf("process_cmd, after dispatch:\n gob = ");
|
213 |
_gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
214 |
ep_dbg_printf(" req->gob = ");
|
215 |
_gdp_gob_dump(req->gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
216 |
} |
217 |
} |
218 |
|
219 |
// cmd_open and cmd_create can return a new GOB in the req
|
220 |
if (gob == NULL && req->gob != NULL) |
221 |
gob = _gdp_gob_incref(req->gob); |
222 |
|
223 |
if (!response_already_sent)
|
224 |
{ |
225 |
req->stat = _gdp_pdu_out(req->rpdu, req->chan); |
226 |
} |
227 |
|
228 |
EP_ASSERT(gob == req->gob); |
229 |
if (gob != NULL) |
230 |
{ |
231 |
GDP_GOB_ASSERT_ISLOCKED(gob); |
232 |
EP_ASSERT(gob->refcnt > 0);
|
233 |
} |
234 |
|
235 |
// do command post processing
|
236 |
if (req->postproc)
|
237 |
{ |
238 |
ep_dbg_cprintf(Dbg, 44, "process_cmd: doing post processing\n"); |
239 |
(req->postproc)(req); |
240 |
req->postproc = NULL;
|
241 |
|
242 |
// postproc shouldn't change GOB lock status
|
243 |
EP_ASSERT(gob == req->gob); |
244 |
} |
245 |
|
246 |
fail1:
|
247 |
// free up resources
|
248 |
if (EP_UT_BITSET(GDP_REQ_PERSIST, req->flags))
|
249 |
_gdp_req_unlock(req); |
250 |
else
|
251 |
_gdp_req_free(&req); // also decref's req->gob (leaves locked)
|
252 |
if (gob != NULL) |
253 |
{ |
254 |
if (!GDP_GOB_ASSERT_ISLOCKED(gob) || !EP_ASSERT(gob->refcnt > 0)) |
255 |
_gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
256 |
_gdp_gob_decref(&gob, false); // ref from _gdp_gob_cache_get |
257 |
} |
258 |
|
259 |
if (false) |
260 |
{ |
261 |
fail0:
|
262 |
ep_log(estat, "process_cmd: cannot allocate request; dropping PDU");
|
263 |
if (cpdu != NULL) |
264 |
_gdp_pdu_free(&cpdu); |
265 |
} |
266 |
|
267 |
if (cmd == GDP_CMD_CREATE)
|
268 |
ep_thr_mutex_unlock(&GdpCreateMutex); |
269 |
|
270 |
ep_dbg_cprintf(Dbg, 40, "process_cmd <<< done\n"); |
271 |
_gdp_show_elapsed("process_cmd", cmd, &starttime);
|
272 |
} |
273 |
|
274 |
|
275 |
/*
|
276 |
** Search for a request in a channel list.
|
277 |
**
|
278 |
** This is a fall-back that should be used only for finding requests
|
279 |
** sent by FWD_APPEND to a server that isn't accessible. Since that
|
280 |
** command links the request to a different GOB than the destination
|
281 |
** address in the PDU, a NOROUTE response won't find it.
|
282 |
*/
|
283 |
|
284 |
static EP_STAT
|
285 |
find_req_in_channel_list( |
286 |
const gdp_pdu_t *rpdu,
|
287 |
gdp_chan_t *chan, |
288 |
gdp_req_t **reqp) |
289 |
{ |
290 |
EP_STAT estat = EP_STAT_OK; |
291 |
gdp_req_t *req; |
292 |
gdp_chan_x_t *chanx; |
293 |
|
294 |
if (ep_dbg_test(DbgProcResp, 14)) |
295 |
{ |
296 |
gdp_pname_t src_p, dst_p; |
297 |
|
298 |
ep_dbg_printf("find_req_in_channel_list: searching for rpdu rid "
|
299 |
"%" PRIgdp_rid
|
300 |
"\n src %s\n dst %s\n",
|
301 |
rpdu->msg->rid, |
302 |
gdp_printable_name(rpdu->src, src_p), |
303 |
gdp_printable_name(rpdu->dst, dst_p)); |
304 |
} |
305 |
_gdp_chan_lock(chan); |
306 |
chanx = _gdp_chan_get_cdata(chan); |
307 |
if (!EP_ASSERT(chanx != NULL)) |
308 |
{ |
309 |
req = NULL;
|
310 |
estat = EP_STAT_ASSERT_ABORT; |
311 |
goto fail0;
|
312 |
} |
313 |
|
314 |
LIST_FOREACH(req, &chanx->reqs, chanlist) |
315 |
{ |
316 |
if (req->cpdu == NULL) |
317 |
continue;
|
318 |
if (ep_dbg_test(Dbg, 48)) |
319 |
{ |
320 |
gdp_pname_t src_p, dst_p; |
321 |
ep_dbg_printf(" ?cpdu rid %" PRIgdp_rid "\t%s =>\n\t %s\n", |
322 |
req->cpdu->msg->rid, |
323 |
gdp_printable_name(req->cpdu->src, src_p), |
324 |
gdp_printable_name(req->cpdu->dst, dst_p)); |
325 |
} |
326 |
if ((rpdu->msg->rid == GDP_PDU_ANY_RID ||
|
327 |
req->cpdu->msg->rid == rpdu->msg->rid) && |
328 |
GDP_NAME_SAME(req->cpdu->src, rpdu->dst) && |
329 |
GDP_NAME_SAME(req->cpdu->dst, rpdu->src)) |
330 |
break;
|
331 |
} |
332 |
if (ep_dbg_test(DbgProcResp, 40)) |
333 |
{ |
334 |
if (req == NULL) |
335 |
ep_dbg_printf(" ... not found\n");
|
336 |
else
|
337 |
ep_dbg_printf(" ... found req @ %p\n", req);
|
338 |
} |
339 |
fail0:
|
340 |
_gdp_chan_unlock(chan); |
341 |
|
342 |
// request should be locked for symmetry with _gdp_req_find
|
343 |
if (req != NULL) |
344 |
{ |
345 |
char ebuf[100]; |
346 |
|
347 |
// since GOB has to be locked before req, do it now
|
348 |
if (req->gob != NULL) |
349 |
_gdp_gob_lock(req->gob); |
350 |
estat = _gdp_req_lock(req); |
351 |
ep_dbg_cprintf(DbgProcResp, 44,
|
352 |
"find_req_in_channel_list: _gdp_req_lock => %s\n",
|
353 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
354 |
if (!EP_STAT_ISOK(estat))
|
355 |
{ |
356 |
if (req->gob != NULL) |
357 |
_gdp_gob_unlock(req->gob); |
358 |
req = NULL;
|
359 |
} |
360 |
} |
361 |
if (EP_STAT_ISOK(estat))
|
362 |
*reqp = req; |
363 |
return estat;
|
364 |
} |
365 |
|
366 |
|
367 |
/*
|
368 |
** PROCESS_RESP --- process response (ack/nak) PDU
|
369 |
**
|
370 |
** When this is called, the rpdu passed in will be the actual
|
371 |
** PDU off the wire and req->cpdu should be the original command
|
372 |
** PDU that prompted this response. We save the passed rpdu
|
373 |
** into req->rpdu for processing in _gdp_req_dispatch.
|
374 |
**
|
375 |
** XXX This is not tested for running in a thread.
|
376 |
*/
|
377 |
|
378 |
static void |
379 |
process_resp(void *rpdu_)
|
380 |
{ |
381 |
gdp_pdu_t *rpdu = (gdp_pdu_t *) rpdu_; |
382 |
gdp_chan_t *chan = _GdpChannel; |
383 |
int cmd = rpdu->msg->cmd;
|
384 |
EP_STAT estat; |
385 |
gdp_gob_t *gob = NULL;
|
386 |
gdp_req_t *req = NULL;
|
387 |
int resp;
|
388 |
int ocmd; // original command prompting this response |
389 |
|
390 |
estat = _gdp_gob_cache_get(rpdu->src, |
391 |
GGCF_NOCREATE | GGCF_GET_PENDING, &gob); |
392 |
if (ep_dbg_test(DbgProcResp, 20)) |
393 |
{ |
394 |
char ebuf[120]; |
395 |
gdp_pname_t rpdu_pname; |
396 |
|
397 |
ep_dbg_printf("process_resp: cmd %s rpdu %p ->src %s) gob %p stat %s\n",
|
398 |
_gdp_proto_cmd_name(cmd), |
399 |
rpdu, gdp_printable_name(rpdu->src, rpdu_pname), gob, |
400 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
401 |
} |
402 |
|
403 |
// check estat here, or is just checking gob enough?
|
404 |
if (gob == NULL) |
405 |
{ |
406 |
// gob was not in cache
|
407 |
char ebuf[200]; |
408 |
|
409 |
estat = find_req_in_channel_list(rpdu, chan, &req); |
410 |
ep_dbg_cprintf(DbgProcResp, 20,
|
411 |
" rpdu = %p req = %p stat = %s\n", rpdu, req,
|
412 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
413 |
|
414 |
if (!EP_STAT_ISOK(estat) || req == NULL) |
415 |
{ |
416 |
if (ep_dbg_test(DbgProcResp,
|
417 |
rpdu->msg->cmd == GDP_NAK_R_NOROUTE ? 19 : 1)) |
418 |
{ |
419 |
gdp_pname_t pname; |
420 |
ep_dbg_printf("process_resp: discarding %d (%s) PDU"
|
421 |
" for unknown GOB\n",
|
422 |
rpdu->msg->cmd, _gdp_proto_cmd_name(rpdu->msg->cmd)); |
423 |
if (ep_dbg_test(DbgProcResp, 24)) |
424 |
_gdp_pdu_dump(rpdu, ep_dbg_getfile(), 0);
|
425 |
else
|
426 |
ep_dbg_printf(" %s\n", gdp_printable_name(rpdu->src, pname));
|
427 |
} |
428 |
_gdp_pdu_free(&rpdu); |
429 |
return;
|
430 |
} |
431 |
|
432 |
EP_ASSERT_ELSE(req->state != GDP_REQ_FREE, return);
|
433 |
if (req->gob != NULL) |
434 |
{ |
435 |
GDP_GOB_ASSERT_ISLOCKED(req->gob); |
436 |
} |
437 |
|
438 |
// remove the request from the GOB list it is already on
|
439 |
// req is already locked by find_req_in_channel_list
|
440 |
if (EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
|
441 |
{ |
442 |
LIST_REMOVE(req, goblist); |
443 |
req->flags &= ~GDP_REQ_ON_GOB_LIST; |
444 |
//DEBUG: without this incref, a gdp_gob_create call on a log
|
445 |
// that already exists throws the error:
|
446 |
// Assertion failed at gob-create:gdp_gob_mgmt.c:435: GDP_GOB_ISGOOD(gob)
|
447 |
// because the refcnt has gone to zero prematurely. But with it,
|
448 |
// a successful gdp_gob_create leaves the refcnt one too high
|
449 |
// leading to a resource leak.
|
450 |
_gdp_gob_incref(req->gob); //DEBUG:
|
451 |
|
452 |
// code below expects request to remain locked
|
453 |
} |
454 |
} |
455 |
else
|
456 |
{ |
457 |
// gob was found in cache
|
458 |
GDP_GOB_ASSERT_ISLOCKED(gob); |
459 |
|
460 |
// find the corresponding request
|
461 |
ep_dbg_cprintf(DbgProcResp, 23,
|
462 |
"process_resp: searching gob %p for rid %" PRIgdp_rid "\n", |
463 |
gob, rpdu->msg->rid); |
464 |
|
465 |
// find request to which this PDU applies
|
466 |
req = _gdp_req_find(gob, rpdu->msg->rid); |
467 |
if (ep_dbg_test(DbgProcResp, 51)) |
468 |
{ |
469 |
ep_dbg_printf("... found ");
|
470 |
_gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
471 |
} |
472 |
|
473 |
// req is already locked by _gdp_req_find
|
474 |
if (req == NULL) |
475 |
{ |
476 |
// no req for incoming response --- "can't happen"
|
477 |
if (ep_dbg_test(DbgProcResp, 1)) |
478 |
{ |
479 |
ep_dbg_printf("process_resp: no req for incoming response\n");
|
480 |
_gdp_pdu_dump(rpdu, ep_dbg_getfile(), 0);
|
481 |
_gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_DETAILED, 0);
|
482 |
} |
483 |
_gdp_gob_decref(&gob, false);
|
484 |
_gdp_pdu_free(&rpdu); |
485 |
return;
|
486 |
} |
487 |
else if (!EP_ASSERT(req->state != GDP_REQ_FREE)) |
488 |
{ |
489 |
if (ep_dbg_test(DbgProcResp, 1)) |
490 |
{ |
491 |
ep_dbg_printf("process_resp: trying to use free ");
|
492 |
_gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_DETAILED, 0);
|
493 |
} |
494 |
_gdp_pdu_free(&rpdu); |
495 |
return;
|
496 |
} |
497 |
else if (rpdu == req->rpdu) |
498 |
{ |
499 |
// this could be an assertion
|
500 |
if (ep_dbg_test(DbgProcResp, 1)) |
501 |
{ |
502 |
ep_dbg_printf("process_resp(%d): rpdu == req->rpdu\n",
|
503 |
rpdu->msg->cmd); |
504 |
_gdp_pdu_dump(rpdu, ep_dbg_getfile(), 0);
|
505 |
} |
506 |
} |
507 |
EP_ASSERT(gob == req->gob); |
508 |
} |
509 |
|
510 |
GDP_GOB_ASSERT_ISLOCKED(req->gob); |
511 |
|
512 |
if (req->cpdu == NULL) |
513 |
{ |
514 |
ep_dbg_cprintf(DbgProcResp, 1,
|
515 |
"process_resp(%d): no corresponding command PDU\n",
|
516 |
rpdu->msg->cmd); |
517 |
ocmd = rpdu->msg->cmd; |
518 |
//XXX return here? with req->pdu == NULL, _gdp_req_dispatch
|
519 |
//XXX will probably die
|
520 |
} |
521 |
else
|
522 |
{ |
523 |
ocmd = req->cpdu->msg->cmd; |
524 |
} |
525 |
|
526 |
// save the response PDU for further processing
|
527 |
if (req->rpdu != NULL) |
528 |
{ |
529 |
// this can happen in multiread/subscription and async I/O
|
530 |
if (ep_dbg_test(DbgProcResp, 41)) |
531 |
{ |
532 |
ep_dbg_printf("process_resp: req->rpdu already set\n ");
|
533 |
_gdp_pdu_dump(req->rpdu, ep_dbg_getfile(), 1);
|
534 |
} |
535 |
_gdp_pdu_free(&req->rpdu); |
536 |
} |
537 |
req->rpdu = rpdu; |
538 |
|
539 |
if (ep_dbg_test(DbgProcResp, 43)) |
540 |
{ |
541 |
ep_dbg_printf("process_resp: ");
|
542 |
_gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
543 |
} |
544 |
|
545 |
// request is locked, GOB should be too
|
546 |
if (req->gob != NULL) |
547 |
GDP_GOB_ASSERT_ISLOCKED(req->gob); |
548 |
|
549 |
// mark this request as active (for subscriptions)
|
550 |
ep_time_now(&req->act_ts); |
551 |
|
552 |
// do ack/nak specific processing
|
553 |
estat = _gdp_req_dispatch(req, cmd); |
554 |
|
555 |
// dispatch should leave it locked
|
556 |
if (req->gob != NULL) |
557 |
GDP_GOB_ASSERT_ISLOCKED(req->gob); |
558 |
|
559 |
// figure out potential response code
|
560 |
// we compute even if unused so we can log server errors
|
561 |
resp = _gdp_acknak_from_estat(estat, req->rpdu->msg->cmd); |
562 |
|
563 |
if (ep_dbg_test(DbgProcResp,
|
564 |
(resp >= GDP_NAK_S_MIN && resp <= GDP_NAK_S_MAX) ? 1 : 44)) |
565 |
{ |
566 |
char ebuf[100]; |
567 |
|
568 |
ep_dbg_printf("process_resp(%s for %s): %s\n",
|
569 |
_gdp_proto_cmd_name(cmd), |
570 |
_gdp_proto_cmd_name(ocmd), |
571 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
572 |
if (ep_dbg_test(DbgProcResp, 55)) |
573 |
_gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
574 |
} |
575 |
|
576 |
// ASSERT(all data from chan has been consumed);
|
577 |
|
578 |
if (EP_UT_BITSET(GDP_REQ_ASYNCIO, req->flags))
|
579 |
{ |
580 |
// send the status as an event
|
581 |
estat = _gdp_event_add_from_req(req); |
582 |
|
583 |
// since this is asynchronous we can release the PDU
|
584 |
_gdp_pdu_free(&req->rpdu); |
585 |
} |
586 |
else if (req->state == GDP_REQ_WAITING) |
587 |
{ |
588 |
// return our status via the request
|
589 |
req->stat = estat; |
590 |
req->flags |= GDP_REQ_DONE; |
591 |
|
592 |
// any further data or status is delivered via event
|
593 |
req->flags |= GDP_REQ_ASYNCIO | GDP_REQ_PERSIST; //XXX PERSIST?
|
594 |
|
595 |
if (ep_dbg_test(DbgProcResp, 40)) |
596 |
{ |
597 |
ep_dbg_printf("process_resp: signaling ");
|
598 |
_gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
599 |
} |
600 |
|
601 |
// wake up invoker, which will return the status
|
602 |
ep_thr_cond_signal(&req->cond); |
603 |
|
604 |
// give _gdp_invoke a chance to run; not necessary, but
|
605 |
// avoids having to wait on condition variables
|
606 |
ep_thr_yield(); |
607 |
} |
608 |
else if (req->rpdu->msg->cmd == GDP_NAK_R_NOROUTE) |
609 |
{ |
610 |
// since this is common and expected, don't sully output
|
611 |
ep_dbg_cprintf(DbgProcResp, 19,
|
612 |
"process_resp: discarding GDP_NAK_R_NOROUTE\n");
|
613 |
} |
614 |
else if (ep_dbg_test(DbgProcResp, 1)) |
615 |
{ |
616 |
ep_dbg_printf("process_resp: discarding response ");
|
617 |
_gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
618 |
} |
619 |
|
620 |
// free up resources
|
621 |
gob = req->gob; |
622 |
if (EP_UT_BITSET(GDP_REQ_PERSIST, req->flags))
|
623 |
_gdp_req_unlock(req); |
624 |
else
|
625 |
_gdp_req_free(&req); // also decref's req->gob (leaves locked)
|
626 |
if (gob != NULL) |
627 |
{ |
628 |
if (!GDP_GOB_ASSERT_ISLOCKED(gob) || !EP_ASSERT(gob->refcnt > 0)) |
629 |
_gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
630 |
_gdp_gob_decref(&gob, false); // ref from _gdp_gob_cache_get |
631 |
} |
632 |
|
633 |
ep_dbg_cprintf(DbgProcResp, 40, "process_resp <<< done\n"); |
634 |
} |
635 |
|
636 |
|
637 |
/*
|
638 |
** _GDP_PDU_PROCESS --- process a PDU
|
639 |
**
|
640 |
** This is responsible for the lightweight stuff that can happen
|
641 |
** in the I/O thread, such as matching an ack/nak PDU with the
|
642 |
** corresponding req. It should never block. The heavy lifting
|
643 |
** is done in the routine above.
|
644 |
*/
|
645 |
|
646 |
void
|
647 |
_gdp_pdu_process(gdp_pdu_t *pdu, gdp_chan_t *chan) |
648 |
{ |
649 |
// use "cheat" field in pdu to pass chan up
|
650 |
pdu->chan = chan; |
651 |
|
652 |
if (GDP_CMD_IS_COMMAND(pdu->msg->cmd))
|
653 |
{ |
654 |
if (_GdpRunCmdInThread)
|
655 |
ep_thr_pool_run(&process_cmd, pdu); |
656 |
else
|
657 |
process_cmd(pdu); |
658 |
} |
659 |
else
|
660 |
{ |
661 |
if (_GdpRunRespInThread)
|
662 |
ep_thr_pool_run(&process_resp, pdu); |
663 |
else
|
664 |
process_resp(pdu); |
665 |
} |
666 |
} |
667 |
|
668 |
|
669 |
/*
|
670 |
** _GDP_RECLAIM_RESOURCES --- find unused GDP resources and reclaim them
|
671 |
**
|
672 |
** This should really also have a maximum number of GOBs to leave
|
673 |
** open so we don't run out of file descriptors under high load.
|
674 |
**
|
675 |
** This implementation locks the GclsByUse list during the
|
676 |
** entire operation. That's probably not the best idea.
|
677 |
*/
|
678 |
|
679 |
void
|
680 |
_gdp_reclaim_resources(void *null)
|
681 |
{ |
682 |
char pbuf[200]; |
683 |
time_t reclaim_age; // how long to leave GOBs open before reclaiming
|
684 |
|
685 |
ep_dbg_cprintf(Dbg, 69, "_gdp_reclaim_resources\n"); |
686 |
snprintf(pbuf, sizeof pbuf, "swarm.%s.reclaim.age", ep_app_getprogname()); |
687 |
reclaim_age = ep_adm_getlongparam(pbuf, -1);
|
688 |
if (reclaim_age == -1) |
689 |
reclaim_age = ep_adm_getlongparam("swarm.gdp.reclaim.age",
|
690 |
GDP_RECLAIM_AGE_DEF); |
691 |
_gdp_gob_cache_reclaim(reclaim_age); |
692 |
} |
693 |
|
694 |
// stub for libevent
|
695 |
|
696 |
static void |
697 |
gdp_reclaim_resources_callback(int fd, short what, void *ctx) |
698 |
{ |
699 |
ep_dbg_cprintf(Dbg, 69, "gdp_reclaim_resources_callback\n"); |
700 |
if (ep_adm_getboolparam("swarm.gdp.reclaim.inthread", false)) |
701 |
ep_thr_pool_run(_gdp_reclaim_resources, NULL);
|
702 |
else
|
703 |
_gdp_reclaim_resources(NULL);
|
704 |
} |
705 |
|
706 |
|
707 |
void
|
708 |
_gdp_reclaim_resources_init(void (*f)(int, short, void *)) |
709 |
{ |
710 |
static bool running = false; |
711 |
|
712 |
if (running)
|
713 |
return;
|
714 |
running = true;
|
715 |
if (f == NULL) |
716 |
f = &gdp_reclaim_resources_callback; |
717 |
|
718 |
long gc_intvl;
|
719 |
char pbuf[200]; |
720 |
|
721 |
snprintf(pbuf, sizeof pbuf, "swarm.%s.reclaim.interval", |
722 |
ep_app_getprogname()); |
723 |
gc_intvl = ep_adm_getlongparam(pbuf, -1);
|
724 |
if (gc_intvl == -1) |
725 |
gc_intvl = ep_adm_getlongparam("swarm.gdp.reclaim.interval", 15L); |
726 |
|
727 |
struct timeval tv = { gc_intvl, 0 }; |
728 |
struct event *evtimer = event_new(_GdpIoEventBase, -1, EV_PERSIST, |
729 |
f, NULL);
|
730 |
event_add(evtimer, &tv); |
731 |
} |
732 |
|
733 |
|
734 |
/*
|
735 |
** Set libevent timer.
|
736 |
**
|
737 |
** Timeout is in units of microseconds.
|
738 |
** Not general purpose (assumes *pev never changes type).
|
739 |
*/
|
740 |
|
741 |
void
|
742 |
_gdp_evloop_timer_set(uint32_t timeout, |
743 |
libevent_event_t *cbfunc, |
744 |
void *cbarg,
|
745 |
struct event **pev)
|
746 |
{ |
747 |
struct timeval tv;
|
748 |
struct event *ev = *pev;
|
749 |
|
750 |
if (ev != NULL) |
751 |
event_free(ev); |
752 |
*pev = ev = event_new(_GdpIoEventBase, -1, 0, cbfunc, cbarg); |
753 |
ep_dbg_cprintf(DbgTimers, 52,
|
754 |
"_gdp_evloop_timer_set(%" PRIu32 ") => %p\n", timeout, ev); |
755 |
tv.tv_sec = timeout / 1000000;
|
756 |
tv.tv_usec = timeout % 1000000;
|
757 |
event_add(ev, &tv); |
758 |
} |
759 |
|
760 |
|
761 |
/*
|
762 |
** Clear libevent timer.
|
763 |
*/
|
764 |
|
765 |
void
|
766 |
_gdp_evloop_timer_clr(struct event **pev)
|
767 |
{ |
768 |
ep_dbg_cprintf(DbgTimers, 52, "_gdp_evloop_timer_clr(%p)\n", *pev); |
769 |
if (*pev != NULL) |
770 |
event_free(*pev); |
771 |
*pev = NULL;
|
772 |
} |
773 |
|
774 |
|
775 |
/*
|
776 |
** Base loop to be called for event-driven systems.
|
777 |
** Their events should have already been added to the event base.
|
778 |
**
|
779 |
**	_GdpIoEventLoopThread is also used by gdplogd, hence non-static.
|
780 |
*/
|
781 |
|
782 |
EP_THR _GdpIoEventLoopThread; |
783 |
|
784 |
// to ensure event loop is running before we proceed
|
785 |
bool GdpIoEventLoopRunning = false; |
786 |
static EP_THR_MUTEX GdpIoEventLoopRunningMutex EP_THR_MUTEX_INITIALIZER;
|
787 |
static EP_THR_COND GdpIoEventLoopRunningCond EP_THR_COND_INITIALIZER;
|
788 |
|
789 |
static void |
790 |
event_loop_timeout(int fd, short what, void *unused) |
791 |
{ |
792 |
ep_dbg_cprintf(Dbg, 79, "event loop timeout\n"); |
793 |
} |
794 |
|
795 |
void *
|
796 |
_gdp_run_event_loop(void *eli_)
|
797 |
{ |
798 |
long evdelay = ep_adm_getlongparam("swarm.gdp.event.loopdelay", 100000L); |
799 |
// loopdelay in microseconds
|
800 |
|
801 |
// keep the loop alive if EVLOOP_NO_EXIT_ON_EMPTY isn't available
|
802 |
long ev_timeout = ep_adm_getlongparam("swarm.gdp.event.looptimeout", 30L); |
803 |
// looptimeout in seconds
|
804 |
struct timeval tv = { ev_timeout, 0 }; |
805 |
struct event *evtimer = event_new(_GdpIoEventBase, -1, EV_PERSIST, |
806 |
&event_loop_timeout, NULL);
|
807 |
event_add(evtimer, &tv); |
808 |
|
809 |
for (;;)
|
810 |
{ |
811 |
if (ep_dbg_test(Dbg, 20)) |
812 |
{ |
813 |
ep_dbg_printf("gdp_event_loop: starting up base loop\n");
|
814 |
event_base_dump_events(_GdpIoEventBase, ep_dbg_getfile()); |
815 |
} |
816 |
|
817 |
ep_thr_mutex_lock(&GdpIoEventLoopRunningMutex); |
818 |
GdpIoEventLoopRunning = true;
|
819 |
ep_thr_cond_broadcast(&GdpIoEventLoopRunningCond); |
820 |
ep_thr_mutex_unlock(&GdpIoEventLoopRunningMutex); |
821 |
|
822 |
#ifdef EVLOOP_NO_EXIT_ON_EMPTY
|
823 |
event_base_loop(_GdpIoEventBase, EVLOOP_NO_EXIT_ON_EMPTY); |
824 |
#else
|
825 |
event_base_loop(_GdpIoEventBase, 0);
|
826 |
#endif
|
827 |
|
828 |
GdpIoEventLoopRunning = false;
|
829 |
|
830 |
if (ep_dbg_test(Dbg, 1)) |
831 |
{ |
832 |
ep_dbg_printf("gdp_event_loop: event_base_loop returned\n");
|
833 |
if (event_base_got_break(_GdpIoEventBase))
|
834 |
ep_dbg_printf(" ... as a result of loopbreak\n");
|
835 |
if (event_base_got_exit(_GdpIoEventBase))
|
836 |
ep_dbg_printf(" ... as a result of loopexit\n");
|
837 |
} |
838 |
if (event_base_got_exit(_GdpIoEventBase) ||
|
839 |
event_base_got_break(_GdpIoEventBase)) |
840 |
{ |
841 |
// the GDP daemon went away intentionally
|
842 |
return NULL; |
843 |
} |
844 |
|
845 |
if (evdelay > 0) |
846 |
ep_time_nanosleep(evdelay * 1000LL); // avoid CPU hogging |
847 |
} |
848 |
|
849 |
ep_log(GDP_STAT_DEAD_DAEMON, "lost channel to gdp");
|
850 |
ep_app_abort("lost channel to gdp");
|
851 |
} |
852 |
|
853 |
static EP_STAT
|
854 |
_gdp_start_event_loop_thread(EP_THR *thr) |
855 |
{ |
856 |
if (ep_thr_spawn(thr, _gdp_run_event_loop, NULL) != 0) |
857 |
return init_error("cannot create event loop thread", |
858 |
"_gdp_start_event_loop_thread");
|
859 |
else
|
860 |
return EP_STAT_OK;
|
861 |
} |
862 |
|
863 |
void
|
864 |
_gdp_stop_event_loop(void)
|
865 |
{ |
866 |
event_base_loopbreak(_GdpIoEventBase); |
867 |
ep_thr_mutex_lock(&GdpIoEventLoopRunningMutex); |
868 |
GdpIoEventLoopRunning = false;
|
869 |
ep_thr_cond_broadcast(&GdpIoEventLoopRunningCond); |
870 |
ep_thr_mutex_unlock(&GdpIoEventLoopRunningMutex); |
871 |
} |
872 |
|
873 |
|
874 |
/*
|
875 |
** Logging callback for event library (for debugging).
|
876 |
*/
|
877 |
|
878 |
static EP_DBG EvlibDbg = EP_DBG_INIT("gdp.libevent", "GDP Libevent"); |
879 |
|
880 |
static void |
881 |
evlib_log_cb(int severity, const char *msg) |
882 |
{ |
883 |
const char *sev; |
884 |
const char *sevstrings[] = { "debug", "msg", "warn", "error" }; |
885 |
|
886 |
if (severity < 0 || severity > 3) |
887 |
sev = "?";
|
888 |
else
|
889 |
sev = sevstrings[severity]; |
890 |
ep_dbg_cprintf(EvlibDbg, ((4 - severity) * 20) + 2, "[%s] %s\n", sev, msg); |
891 |
} |
892 |
|
893 |
|
894 |
/*
**  Arrange to call atexit(3) functions on SIGINT and SIGTERM
**
**	Warns, stops the event loop, and calls exit(3) with the
**	signal number as the exit status.
**
**	NOTE(review): if this is installed as a plain signal handler,
**	the calls made here are not async-signal-safe --- confirm how
**	it is registered.
*/

static void
exit_on_signal(int sig)
{
	// say why we are going away
	ep_app_warn("Exiting on signal %d", sig);

	// shut down I/O, then exit (which runs the atexit handlers)
	_gdp_stop_event_loop();
	exit(sig);
}
905 |
|
906 |
|
907 |
/*
**  Change user id to something innocuous.
**
**	Looks the user up with getpwnam(3) and switches the group id
**	and then the user id to that account.  An unknown user falls
**	back to uid/gid 1:1.  A failure to switch is reported with a
**	warning only.
**
**	Parameters:
**		runasuser --- the user name to run as; NULL or an
**			empty string leaves the ids unchanged.
*/

void
_gdp_run_as(const char *runasuser)
{
	uid_t uid;
	gid_t gid;
	struct passwd *pw;

	// nothing requested: stay as we are
	if (runasuser == NULL || *runasuser == '\0')
		return;

	pw = getpwnam(runasuser);
	if (pw == NULL)
	{
		ep_app_warn("User %s unknown; running as 1:1 (daemon)",
				runasuser);
		gid = 1;
		uid = 1;
	}
	else
	{
		gid = pw->pw_gid;
		uid = pw->pw_uid;
	}

	// group first: once the uid is dropped we may no longer be
	// permitted to change the gid
	if (setgid(gid) < 0 || setuid(uid) < 0)
	{
		// uid_t and gid_t are not int, so convert explicitly
		// rather than passing them to %d
		ep_app_warn("Cannot set user/group id (%ld:%ld)",
				(long) uid, (long) gid);
	}
}
935 |
|
936 |
|
937 |
/*
|
938 |
** Print all outstanding requests on a channel
|
939 |
*/
|
940 |
|
941 |
void
|
942 |
_gdp_chan_dumpreqs(gdp_chan_t *chan, FILE *fp) |
943 |
{ |
944 |
gdp_req_t *req; |
945 |
gdp_chan_x_t *chanx = _gdp_chan_get_cdata(chan); |
946 |
|
947 |
if (chanx == NULL) |
948 |
{ |
949 |
fprintf(fp, "\n<<< No Requests >>>\n");
|
950 |
return;
|
951 |
} |
952 |
fprintf(fp, "\n<<< Active requests >>>\n");
|
953 |
LIST_FOREACH(req, &chanx->reqs, chanlist) |
954 |
{ |
955 |
_gdp_req_dump(req, fp, GDP_PR_PRETTY, 0);
|
956 |
} |
957 |
} |
958 |
|
959 |
|
960 |
/*
|
961 |
** SIGINFO --- called to print out internal state (for debugging)
|
962 |
**
|
963 |
** On BSD and MacOS this is implemented as a SIGINFO (^T from
|
964 |
** the command line), but since Linux doesn't have that we use
|
965 |
** SIGUSR1 instead.
|
966 |
*/
|
967 |
|
968 |
extern const char GdpVersion[]; |
969 |
EP_FUNCLIST *_GdpDumpFuncs; |
970 |
|
971 |
void
|
972 |
_gdp_dump_state(int plev)
|
973 |
{ |
974 |
FILE *fp = stderr; // should this be the debug file?
|
975 |
flockfile(fp); |
976 |
fprintf(fp, "\n<<< GDP STATE >>>\nVersion: %s\n", GdpVersion);
|
977 |
|
978 |
_gdp_gob_cache_dump(plev, fp); // GOB cache contents
|
979 |
_gdp_chan_dumpreqs(_GdpChannel, fp); // outstanding requests
|
980 |
|
981 |
if (_GdpDumpFuncs != NULL) |
982 |
ep_funclist_invoke(_GdpDumpFuncs, (void *) fp);
|
983 |
|
984 |
fprintf(fp, "\n<<< Open file descriptors >>>\n");
|
985 |
ep_app_dumpfds(fp); |
986 |
fprintf(fp, "\n<<< Stack backtrace >>>\n");
|
987 |
ep_dbg_backtrace(fp); |
988 |
fprintf(fp, "\n<<< Statistics >>>\n");
|
989 |
_gdp_req_pr_stats(fp); |
990 |
_gdp_gob_pr_stats(fp); |
991 |
funlockfile(fp); |
992 |
} |
993 |
|
994 |
|
995 |
static void |
996 |
siginfo(int sig, short what, void *arg) |
997 |
{ |
998 |
if (ep_dbg_test(Dbg, 1)) |
999 |
_gdp_dump_state(GDP_PR_DETAILED); |
1000 |
else
|
1001 |
_gdp_dump_state(GDP_PR_PRETTY); |
1002 |
} |
1003 |
|
1004 |
|
1005 |
|
1006 |
/*
|
1007 |
** Initialization, Part 0:
|
1008 |
** Initialize external libraries.
|
1009 |
**
|
1010 |
** Used by a few of the utility routines, but unusual in that
|
1011 |
** it doesn't actually start up GDP communications.
|
1012 |
*/
|
1013 |
|
1014 |
EP_STAT |
1015 |
gdp_init_phase_0(const char *progname, uint32_t flags) |
1016 |
{ |
1017 |
ep_dbg_cprintf(Dbg, 4, "gdp_init_phase_0: %s\n", GdpVersion); |
1018 |
|
1019 |
if (_GdpInitState >= GDP_INIT_PHASE_0)
|
1020 |
return EP_STAT_OK;
|
1021 |
|
1022 |
// initialize the EP library
|
1023 |
ep_lib_init(EP_LIB_USEPTHREADS); |
1024 |
_GdpDumpFuncs = ep_funclist_new("GDP debug dump functions");
|
1025 |
|
1026 |
// initialize runtime parameters
|
1027 |
_gdp_adm_readparams("gdp");
|
1028 |
if (progname == NULL) |
1029 |
progname = ep_app_getprogname(); |
1030 |
if (progname != NULL) |
1031 |
_gdp_adm_readparams(progname); |
1032 |
ep_crypto_init(0);
|
1033 |
|
1034 |
// clear out spurious errors
|
1035 |
errno = 0;
|
1036 |
|
1037 |
// we can now re-adjust debugging
|
1038 |
ep_dbg_setfile(NULL);
|
1039 |
|
1040 |
// register status strings
|
1041 |
_gdp_stat_init(); |
1042 |
|
1043 |
// if not using Zeroconf, disable it
|
1044 |
if (EP_UT_BITSET(GDP_INIT_NO_ZEROCONF, flags))
|
1045 |
ep_adm_setparam("swarm.gdp.zeroconf.enable", "false"); |
1046 |
|
1047 |
_GdpInitState = GDP_INIT_PHASE_0; |
1048 |
|
1049 |
return EP_STAT_OK;
|
1050 |
} |
1051 |
|
1052 |
|
1053 |
/*
|
1054 |
** Initialization, Part 1:
|
1055 |
** Initialize the various external libraries.
|
1056 |
** Set up the I/O event loop base.
|
1057 |
** Initialize the GOB cache.
|
1058 |
** Start the event loop.
|
1059 |
*/
|
1060 |
|
1061 |
// locks out multiple calls to gdp_lib_init
|
1062 |
static EP_THR_MUTEX GdpInitMutex EP_THR_MUTEX_INITIALIZER;
|
1063 |
int _GdpInitState;
|
1064 |
|
1065 |
EP_STAT |
1066 |
gdp_lib_init(const char *progname, const char *myname, uint32_t flags) |
1067 |
{ |
1068 |
EP_STAT estat = EP_STAT_OK; |
1069 |
const char *phase = NULL; |
1070 |
|
1071 |
ep_dbg_cprintf(Dbg, 4, "_gdp_lib_init(%s)\n", |
1072 |
myname == NULL ? "NULL" : myname); |
1073 |
|
1074 |
// need to initialize libep before using mutexes
|
1075 |
phase = "ep_lib_init";
|
1076 |
estat = ep_lib_init(EP_LIB_USEPTHREADS); |
1077 |
EP_STAT_CHECK(estat, goto fail0);
|
1078 |
|
1079 |
ep_thr_mutex_lock(&GdpInitMutex); |
1080 |
if (_GdpInitState >= GDP_INIT_LIB)
|
1081 |
goto done;
|
1082 |
|
1083 |
gdp_init_phase_0(progname, flags); |
1084 |
|
1085 |
// initialize external -> internal name mapping
|
1086 |
if (!EP_UT_BITSET(GDP_INIT_NO_HONGDS, flags))
|
1087 |
{ |
1088 |
if (ep_adm_getboolparam("swarm.gdp.hongd.optional", false)) |
1089 |
flags |= GDP_INIT_OPT_HONGDS; |
1090 |
phase = "_gdp_name_init";
|
1091 |
estat = _gdp_name_init(NULL);
|
1092 |
if (EP_UT_BITSET(GDP_INIT_OPT_HONGDS, flags))
|
1093 |
estat = EP_STAT_OK; |
1094 |
EP_STAT_CHECK(estat, goto fail0);
|
1095 |
} |
1096 |
|
1097 |
if (ep_dbg_test(EvlibDbg, 80)) |
1098 |
{ |
1099 |
// according to the book...
|
1100 |
//event_enable_debug_logging(EVENT_DBG_ALL);
|
1101 |
// according to the code...
|
1102 |
event_enable_debug_mode(); |
1103 |
} |
1104 |
|
1105 |
// arrange to call atexit(3) functions on SIGTERM
|
1106 |
if (ep_adm_getboolparam("swarm.gdp.catch.sigint", true)) |
1107 |
(void) signal(SIGINT, exit_on_signal);
|
1108 |
if (ep_adm_getboolparam("swarm.gdp.catch.sigterm", true)) |
1109 |
(void) signal(SIGTERM, exit_on_signal);
|
1110 |
|
1111 |
// get assertion behavior information
|
1112 |
// [DEPRECATED: use libep.assert.allabort]
|
1113 |
EpAssertAllAbort = ep_adm_getboolparam("swarm.gdp.debug.assert.allabort",
|
1114 |
EpAssertAllAbort); |
1115 |
|
1116 |
// check to see if commands/responses should be run in threads
|
1117 |
_GdpRunCmdInThread = ep_adm_getboolparam("swarm.gdp.command.runinthread",
|
1118 |
true);
|
1119 |
_GdpRunRespInThread = ep_adm_getboolparam("swarm.gdp.response.runinthread",
|
1120 |
false);
|
1121 |
|
1122 |
// figure out or generate our name (for routing)
|
1123 |
if (myname == NULL && progname != NULL) |
1124 |
{ |
1125 |
char argname[100]; |
1126 |
|
1127 |
snprintf(argname, sizeof argname, "swarm.%s.gdpname", progname); |
1128 |
myname = ep_adm_getstrparam(argname, NULL);
|
1129 |
} |
1130 |
|
1131 |
if (myname != NULL) |
1132 |
{ |
1133 |
gdp_pname_t pname; |
1134 |
|
1135 |
estat = gdp_parse_name(myname, _GdpMyRoutingName); |
1136 |
ep_dbg_cprintf(Dbg, 9, "Setting my name:\n\t%s\n\t%s\n", |
1137 |
myname, gdp_printable_name(_GdpMyRoutingName, pname)); |
1138 |
if (EP_STAT_ISFAIL(estat))
|
1139 |
myname = NULL;
|
1140 |
} |
1141 |
|
1142 |
if (!gdp_name_is_valid(_GdpMyRoutingName))
|
1143 |
_gdp_newname(_GdpMyRoutingName, NULL);
|
1144 |
|
1145 |
// avoid running as root if possible (and another user specified)
|
1146 |
if (progname != NULL) |
1147 |
{ |
1148 |
char argname[100]; |
1149 |
const char *logfac; |
1150 |
|
1151 |
if (getuid() == 0) |
1152 |
{ |
1153 |
snprintf(argname, sizeof argname, "swarm.%s.runasuser", progname); |
1154 |
_gdp_run_as(ep_adm_getstrparam(argname, NULL));
|
1155 |
} |
1156 |
|
1157 |
// allow log facilities on a per-app basis
|
1158 |
snprintf(argname, sizeof argname, "swarm.%s.syslog.facility", progname); |
1159 |
logfac = ep_adm_getstrparam(argname, NULL);
|
1160 |
if (logfac == NULL) |
1161 |
logfac = ep_adm_getstrparam("swarm.gdp.syslog.facility", "local4"); |
1162 |
ep_log_init(progname, ep_syslog_fac_from_name(logfac), stderr); |
1163 |
} |
1164 |
|
1165 |
if (getuid() == 0) |
1166 |
_gdp_run_as(ep_adm_getstrparam("swarm.gdp.runasuser", NULL)); |
1167 |
|
1168 |
if (ep_dbg_test(Dbg, 1)) |
1169 |
{ |
1170 |
gdp_pname_t pname; |
1171 |
|
1172 |
ep_dbg_printf("My GDP routing name = %s\n",
|
1173 |
gdp_printable_name(_GdpMyRoutingName, pname)); |
1174 |
} |
1175 |
|
1176 |
// initialize the GOB cache. In theory this "cannot fail"
|
1177 |
phase = "_gdp_gob_cache_init";
|
1178 |
estat = _gdp_gob_cache_init(); |
1179 |
EP_STAT_CHECK(estat, goto fail0);
|
1180 |
|
1181 |
// tell the event library that we're using pthreads
|
1182 |
if (evthread_use_pthreads() < 0) |
1183 |
return init_error("cannot use pthreads", "gdp_lib_init"); |
1184 |
if (ep_dbg_test(DbgEvLock, 90)) |
1185 |
{ |
1186 |
evthread_enable_lock_debuging(); |
1187 |
} |
1188 |
|
1189 |
// use our debugging printer
|
1190 |
event_set_log_callback(evlib_log_cb); |
1191 |
|
1192 |
// set up the event base
|
1193 |
if (_GdpIoEventBase == NULL) |
1194 |
{ |
1195 |
// Initialize for I/O events
|
1196 |
struct event_config *ev_cfg = event_config_new();
|
1197 |
|
1198 |
phase = "event_base_new_with_config";
|
1199 |
event_config_require_features(ev_cfg, 0);
|
1200 |
_GdpIoEventBase = event_base_new_with_config(ev_cfg); |
1201 |
if (_GdpIoEventBase == NULL) |
1202 |
estat = init_error("could not create event base", "gdp_lib_init"); |
1203 |
event_config_free(ev_cfg); |
1204 |
EP_STAT_CHECK(estat, goto fail0);
|
1205 |
|
1206 |
// add a debugging signal to print out some internal data structures
|
1207 |
#ifdef SIGINFO
|
1208 |
event_add(evsignal_new(_GdpIoEventBase, SIGINFO, siginfo, NULL), NULL); |
1209 |
#endif
|
1210 |
event_add(evsignal_new(_GdpIoEventBase, SIGUSR1, siginfo, NULL), NULL); |
1211 |
} |
1212 |
|
1213 |
phase = "_gdp_chan_init";
|
1214 |
estat = _gdp_chan_init(_GdpIoEventBase, NULL);
|
1215 |
EP_STAT_CHECK(estat, goto fail0);
|
1216 |
|
1217 |
phase = NULL;
|
1218 |
|
1219 |
done:
|
1220 |
fail0:
|
1221 |
if (ep_dbg_test(Dbg, EP_STAT_ISOK(estat) ? 8 : 1)) |
1222 |
{ |
1223 |
char ebuf[200]; |
1224 |
ep_stat_tostr(estat, ebuf, sizeof ebuf);
|
1225 |
|
1226 |
if (phase == NULL) |
1227 |
ep_dbg_printf("gdp_lib_init: %s\n", ebuf);
|
1228 |
else
|
1229 |
ep_dbg_printf("gdp_lib_init: %s: %s\n", phase, ebuf);
|
1230 |
} |
1231 |
|
1232 |
_GdpInitState = GDP_INIT_LIB; |
1233 |
ep_thr_mutex_unlock(&GdpInitMutex); |
1234 |
return estat;
|
1235 |
} |
1236 |
|
1237 |
|
1238 |
EP_STAT |
1239 |
_gdp_evloop_init(void)
|
1240 |
{ |
1241 |
EP_STAT estat; |
1242 |
|
1243 |
// set up synchronization for event loop thread startup
|
1244 |
ep_thr_mutex_lock(&GdpIoEventLoopRunningMutex); |
1245 |
|
1246 |
if (!GdpIoEventLoopRunning)
|
1247 |
{ |
1248 |
// create a thread to run the event loop
|
1249 |
estat = _gdp_start_event_loop_thread(&_GdpIoEventLoopThread); |
1250 |
} |
1251 |
|
1252 |
while (!GdpIoEventLoopRunning)
|
1253 |
ep_thr_cond_wait(&GdpIoEventLoopRunningCond, |
1254 |
&GdpIoEventLoopRunningMutex, NULL);
|
1255 |
ep_thr_mutex_unlock(&GdpIoEventLoopRunningMutex); |
1256 |
|
1257 |
return estat;
|
1258 |
} |
1259 |
|
1260 |
|
1261 |
/*
|
1262 |
*/
|
1263 |
|
1264 |
|
1265 |
/*
|
1266 |
** Data Ready (Receive) callback
|
1267 |
**
|
1268 |
** Called whenever there is input from the channel.
|
1269 |
** It is up to this routine to actually read the data from the
|
1270 |
** chan level buffer into active memory.
|
1271 |
*/
|
1272 |
|
1273 |
EP_STAT |
1274 |
_gdp_io_recv( |
1275 |
gdp_chan_t *chan, |
1276 |
gdp_name_t src, |
1277 |
gdp_name_t dst, |
1278 |
gdp_seqno_t seqno, |
1279 |
gdp_buf_t *payload_buf, |
1280 |
size_t payload_len) |
1281 |
{ |
1282 |
EP_STAT estat; |
1283 |
|
1284 |
gdp_pdu_t *pdu = _gdp_pdu_new(NULL, src, dst, seqno);
|
1285 |
estat = _gdp_pdu_in(pdu, payload_buf, payload_len, chan); |
1286 |
EP_STAT_CHECK(estat, goto fail0);
|
1287 |
|
1288 |
_gdp_pdu_process(pdu, chan); |
1289 |
// _gdp_pdu_process frees pdu, possibly in a thread
|
1290 |
|
1291 |
fail0:
|
1292 |
{ |
1293 |
char ebuf[100]; |
1294 |
ep_dbg_cprintf(Dbg, EP_STAT_ISOK(estat) ? 21 : 3, |
1295 |
"_gdp_io_recv: %s\n",
|
1296 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
1297 |
} |
1298 |
return estat;
|
1299 |
} |
1300 |
|
1301 |
|
1302 |
/*
|
1303 |
** Router Event callback
|
1304 |
**
|
1305 |
** Called when the router has something to signal to the higher level.
|
1306 |
*/
|
1307 |
|
1308 |
EP_STAT |
1309 |
_gdp_router_event( |
1310 |
gdp_chan_t *chan, |
1311 |
gdp_name_t src, |
1312 |
gdp_name_t dst, |
1313 |
size_t payload_len, |
1314 |
EP_STAT estat) |
1315 |
{ |
1316 |
// fake up a PDU for the router event
|
1317 |
gdp_cmd_t cmd = (gdp_cmd_t) 0;
|
1318 |
|
1319 |
if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_NOROUTE))
|
1320 |
cmd = GDP_NAK_R_NOROUTE; |
1321 |
else
|
1322 |
goto fail0;
|
1323 |
|
1324 |
{ |
1325 |
//XXX wildcard seqno?
|
1326 |
GdpMessage *msg = _gdp_msg_new(cmd, GDP_PDU_ANY_RID, GDP_PDU_NO_L5SEQNO); |
1327 |
gdp_pdu_t *pdu = _gdp_pdu_new(msg, src, dst, GDP_SEQNO_NONE); |
1328 |
|
1329 |
if (msg->cmd != 0) |
1330 |
_gdp_pdu_process(pdu, chan); |
1331 |
else
|
1332 |
_gdp_pdu_free(&pdu); |
1333 |
} |
1334 |
|
1335 |
fail0:
|
1336 |
if (ep_dbg_test(Dbg, 23)) |
1337 |
{ |
1338 |
char ebuf[100]; |
1339 |
ep_dbg_printf("_gdp_router_event: %s\n",
|
1340 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
1341 |
} |
1342 |
return estat;
|
1343 |
} |
1344 |
|
1345 |
|
1346 |
|
1347 |
/*
|
1348 |
** Data I/O Event callback
|
1349 |
**
|
1350 |
** Typically connects, disconnects, and errors
|
1351 |
**
|
1352 |
** Following is a list of actions that should be undertaken in
|
1353 |
** response to various events:
|
1354 |
**
|
1355 |
** Event Client Action Gdplogd Action
|
1356 |
**
|
1357 |
** connection established advertise one advertise all
|
1358 |
** re-subscribe all
|
1359 |
**
|
1360 |
** connection lost [1] retry open retry open
|
1361 |
**
|
1362 |
** data available process command/ack process command/ack
|
1363 |
**
|
1364 |
** write complete anything needed? anything needed?
|
1365 |
**
|
1366 |
** advertise timeout re-advertise me re-advertise all
|
1367 |
**
|
1368 |
** connection close withdraw me withdraw all
|
1369 |
**
|
1370 |
** [1] Should be handled automatically by the channel layer, but should
|
1371 |
** generate a "connection established" event.
|
1372 |
*/
|
1373 |
|
1374 |
EP_STAT |
1375 |
_gdp_io_event( |
1376 |
gdp_chan_t *chan, |
1377 |
uint32_t what) |
1378 |
{ |
1379 |
EP_STAT estat = EP_STAT_OK; |
1380 |
|
1381 |
if (EP_UT_BITSET(BEV_EVENT_CONNECTED, what))
|
1382 |
{ |
1383 |
// connection up; do advertising and resend subscriptions (if any)
|
1384 |
gdp_chan_x_t *cx = _gdp_chan_get_cdata(chan); |
1385 |
if (cx->connect_cb != NULL) |
1386 |
estat = (*cx->connect_cb)(chan); |
1387 |
} |
1388 |
return estat;
|
1389 |
} |