Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / gdplogd / logd_pubsub.c @ master

History | View | Annotate | Download (11.5 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 : */
2

    
3
/*
4
**  Handle publish/subscribe requests
5
**
6
**        ----- BEGIN LICENSE BLOCK -----
7
**        GDPLOGD: Log Daemon for the Global Data Plane
8
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
9
**
10
**        Copyright (c) 2015-2019, Regents of the University of California.
11
**        All rights reserved.
12
**
13
**        Permission is hereby granted, without written agreement and without
14
**        license or royalty fees, to use, copy, modify, and distribute this
15
**        software and its documentation for any purpose, provided that the above
16
**        copyright notice and the following two paragraphs appear in all copies
17
**        of this software.
18
**
19
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
20
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
21
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
22
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23
**
24
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
25
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
27
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
28
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
29
**        OR MODIFICATIONS.
30
**        ----- END LICENSE BLOCK -----
31
*/
32

    
33
#include "logd.h"
34
#include "logd_pubsub.h"
35

    
36
#include <gdp/gdp_priv.h>
37
#include <gdp/gdp_chan.h>
38
#include <ep/ep.h>
39
#include <ep/ep_dbg.h>
40
#include <ep/ep_hash.h>
41

    
42
#include <sys/queue.h>
43

    
44
static EP_DBG        Dbg = EP_DBG_INIT("gdplogd.pubsub",
45
                                                                "GDP Log Daemon pub/sub handling");
46

    
47
extern EP_HASH        *_OpenGOBCache;                // associative cache
48

    
49

    
50
/*
51
**  SUB_SEND_MESSAGE_NOTIFICATION --- inform a subscriber of a new message
52
**
53
**                Assumes req is locked.
54
*/
55

    
56
EP_STAT
57
sub_send_message_notification(gdp_req_t *req)
58
{
59
        EP_STAT estat;
60

    
61
        if (ep_dbg_test(Dbg, 33))
62
        {
63
                ep_dbg_printf("sub_send_message_notification: ");
64
                _gdp_req_dump(req, NULL, GDP_PR_BASIC, 0);
65
        }
66

    
67
        // sanity checks
68
        EP_ASSERT(req->rpdu != NULL);
69
        EP_ASSERT(req->cpdu != NULL);
70
        EP_ASSERT(req->rpdu->msg != NULL);
71

    
72
        memcpy(req->rpdu->dst, req->cpdu->src, sizeof req->rpdu->dst);
73
        req->rpdu->msg->rid = req->cpdu->msg->rid;
74
        req->rpdu->msg->l5seqno = req->cpdu->msg->l5seqno;
75
        estat = _gdp_pdu_out(req->rpdu, req->chan);
76

    
77
        if (!EP_STAT_ISOK(estat))
78
        {
79
                ep_dbg_cprintf(Dbg, 1,
80
                                "sub_send_message_notification: couldn't write PDU!\n");
81
        }
82

    
83
        return estat;
84
}
85

    
86

    
87
/*
88
**  SUB_NOTIFY_ALL_SUBSCRIBERS --- send something to all interested parties
89
**
90
**                pubreq should be locked when this is called.
91
*/
92

    
93
void
94
sub_notify_all_subscribers(gdp_req_t *pubreq)
95
{
96
        gdp_req_t *req;
97
        gdp_req_t *nextreq;
98
        long timeout;
99
        EP_TIME_SPEC sub_timeout;
100

    
101
        EP_THR_MUTEX_ASSERT_ISLOCKED(&pubreq->mutex);
102
        GDP_GOB_ASSERT_ISLOCKED(pubreq->gob);
103
        EP_ASSERT_ELSE(pubreq->rpdu != NULL, return);
104
        EP_ASSERT_ELSE(pubreq->rpdu->msg != NULL, return);
105

    
106
        // set up for subscription timeout
107
        {
108
                EP_TIME_SPEC sub_delta;
109
                timeout = ep_adm_getlongparam("swarm.gdplogd.subscr.timeout", 0);
110

    
111
                if (timeout == 0)
112
                        timeout = ep_adm_getlongparam("swarm.gdp.subscr.timeout",
113
                                                                        GDP_SUBSCR_TIMEOUT_DEF);
114
                ep_time_from_nsec(-timeout SECONDS, &sub_delta);
115
                ep_time_deltanow(&sub_delta, &sub_timeout);
116
        }
117

    
118
        if (ep_dbg_test(Dbg, 32))
119
        {
120
                EP_TIME_SPEC now;
121
                char tbuf[100];
122

    
123
                ep_time_now(&now);
124
                ep_dbg_printf("sub_notify_all_subscribers(timeout=%ld, now=%s)\n",
125
                                        timeout,
126
                                        ep_time_format(&now, tbuf, sizeof tbuf, EP_TIME_FMT_HUMAN));
127
                ep_dbg_printf("%spub", _gdp_pr_indent(1));
128
                _gdp_req_dump(pubreq, ep_dbg_getfile(), GDP_PR_BASIC, 1);
129
        }
130

    
131
        pubreq->gob->flags |= GOBF_KEEPLOCKED;
132
        for (req = LIST_FIRST(&pubreq->gob->reqs); req != NULL; req = nextreq)
133
        {
134
                _gdp_req_lock(req);
135
                nextreq = LIST_NEXT(req, goblist);
136
                EP_ASSERT_ELSE(req != nextreq, break);
137

    
138
                // make sure we don't tell ourselves
139
                if (req == pubreq)
140
                {
141
                        _gdp_req_unlock(req);
142
                        continue;
143
                }
144

    
145
                if (ep_dbg_test(Dbg, 59))
146
                {
147
                        ep_dbg_printf("sub_notify_all_subscribers: checking ");
148
                        _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
149
                }
150

    
151
                // notify subscribers
152
                if (!EP_UT_BITSET(GDP_REQ_SRV_SUBSCR, req->flags))
153
                {
154
                        ep_dbg_cprintf(Dbg, 59,
155
                                        "   ... not a subscription (flags = 0x%x)\n",
156
                                        req->flags);
157
                }
158
                else if (!ep_time_before(&req->sub_ts, &sub_timeout))
159
                {
160
                        EP_STAT estat;
161
                        EP_ASSERT_ELSE(req->cpdu != NULL, continue);
162
                        EP_ASSERT_ELSE(req->cpdu->msg != NULL, continue);
163
                        gdp_pdu_t *save_pdu = req->rpdu;
164
                        req->rpdu = pubreq->rpdu;
165
                        estat = sub_send_message_notification(req);
166
                        req->rpdu = save_pdu;
167
                        if (EP_STAT_ISOK(estat))
168
                        {
169
                                // XXX: This won't really work in case of holes.
170
                                req->nextrec++;
171

    
172
                                if (req->numrecs > 0 && --req->numrecs <= 0)
173
                                        sub_end_subscription(req);
174
                        }
175
                }
176
                else
177
                {
178
                        // this subscription seems to be dead
179
                        if (ep_dbg_test(Dbg, 18))
180
                        {
181
                                char tbuf[100];
182
                                ep_time_format(&sub_timeout, tbuf, sizeof tbuf,
183
                                                        EP_TIME_FMT_HUMAN);
184
                                ep_dbg_printf("sub_notify_all_subscribers: "
185
                                                "subscription timeout (%s):\n%s",
186
                                                tbuf, _gdp_pr_indent(1));
187
                                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 1);
188
                                ep_dbg_printf("%s", _gdp_pr_indent(1));
189
                                _gdp_gob_dump(req->gob, ep_dbg_getfile(), GDP_PR_BASIC, 1);
190
                        }
191

    
192
                        // actually remove the subscription
193
                        //XXX isn't this done by _gdp_req_free???
194
                        //LIST_REMOVE(req, goblist);
195

    
196
                        EP_ASSERT(req->gob != NULL);
197
                        EP_ASSERT(EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags));
198
                        _gdp_req_free(&req);
199
                }
200
                if (req != NULL)
201
                        _gdp_req_unlock(req);
202
        }
203
        pubreq->gob->flags &= ~GOBF_KEEPLOCKED;
204
}
205

    
206

    
207
/*
208
**  SUB_END_SUBSCRIPTION --- terminate a subscription
209
**
210
**                req and req->gob should be locked when this is called.
211
*/
212

    
213
void
214
sub_end_subscription(gdp_req_t *req)
215
{
216

    
217
        EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex);
218
        GDP_GOB_ASSERT_ISLOCKED(req->gob);
219

    
220
        // make it not persistent and not a subscription
221
        req->flags &= ~(GDP_REQ_PERSIST | GDP_REQ_SRV_SUBSCR);
222

    
223
        // remove the request from the work list
224
        if (EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
225
        {
226
                gdp_gob_t *gob = req->gob;
227
                LIST_REMOVE(req, goblist);
228
                req->flags &= ~GDP_REQ_ON_GOB_LIST;
229
                EP_ASSERT(gob->refcnt > 1);
230
                _gdp_gob_decref(&gob, true);
231
        }
232

    
233
        // make sure we have a response message available
234
        if (req->rpdu == NULL || req->rpdu->msg == NULL)
235
        {
236
                GdpMessage *msg = ep_mem_malloc(sizeof *msg);
237
                gdp_message__init(msg);
238
                if (req->rpdu == NULL)
239
                        req->rpdu = _gdp_pdu_new(msg, req->cpdu->dst, req->cpdu->src,
240
                                                                        GDP_SEQNO_NONE);
241
                req->rpdu->msg = msg;
242
        }
243

    
244
        // send an "end of subscription" event
245
        req->rpdu->msg->rid = req->cpdu->msg->rid;
246
        req->rpdu->msg->cmd = GDP_ACK_END_OF_RESULTS;
247

    
248
        if (ep_dbg_test(Dbg, 39))
249
        {
250
                ep_dbg_printf("sub_end_subscription removing:\n  ");
251
                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
252
        }
253

    
254
        (void) _gdp_pdu_out(req->rpdu, req->chan);
255
}
256

    
257

    
258
/*
259
**  Unsubscribe all requests for a given gob and destination.
260
**  Can also optionally select a particular request id.
261
*/
262

    
263
EP_STAT
264
sub_end_all_subscriptions(
265
                gdp_gob_t *gob,
266
                gdp_name_t dest,
267
                gdp_rid_t rid)
268
{
269
        EP_STAT estat;
270
        gdp_req_t *req;
271
        gdp_req_t *nextreq;
272

    
273
        if (ep_dbg_test(Dbg, 29))
274
        {
275
                gdp_pname_t dst_p;
276
                ep_dbg_printf("sub_end_all_subscriptions: rid %" PRIgdp_rid " dst %s\n    ",
277
                                rid, gdp_printable_name(dest, dst_p));
278
                _gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 1);
279
        }
280

    
281
        GDP_GOB_ASSERT_ISLOCKED(gob);
282
        if (EP_UT_BITSET(GOBF_KEEPLOCKED, gob->flags) && ep_dbg_test(Dbg, 1))
283
                ep_dbg_printf("sub_end_all_subscriptions: GOBF_KEEPLOCKED on entry\n");
284
        gob->flags |= GOBF_KEEPLOCKED;
285

    
286
        do
287
        {
288
                estat = EP_STAT_OK;
289
                for (req = LIST_FIRST(&gob->reqs); req != NULL; req = nextreq)
290
                {
291
                        estat = _gdp_req_lock(req);
292
                        EP_STAT_CHECK(estat, break);
293
                        nextreq = LIST_NEXT(req, goblist);
294
                        if (!GDP_NAME_SAME(req->cpdu->dst, dest) ||
295
                                        (rid != GDP_PDU_NO_RID && rid != req->cpdu->msg->rid) ||
296
                                        !EP_ASSERT(req->gob == gob))
297
                        {
298
                                _gdp_req_unlock(req);
299
                                continue;
300
                        }
301

    
302
                        // remove subscription for this destination (but keep GOB locked)
303
                        if (ep_dbg_test(Dbg, 39))
304
                        {
305
                                ep_dbg_printf("sub_end_all_subscriptions removing ");
306
                                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
307
                        }
308
                        LIST_REMOVE(req, goblist);
309
                        req->flags &= ~GDP_REQ_ON_GOB_LIST;
310
                        _gdp_gob_decref(&req->gob, false);
311
                        _gdp_req_free(&req);
312
                }
313
        } while (!EP_STAT_ISOK(estat));
314
        gob->flags &= ~GOBF_KEEPLOCKED;
315
        return estat;
316
}
317

    
318

    
319
/*
320
**  SUB_RECLAIM_RESOURCES --- remove any expired subscriptions
321
**
322
**                This is a bit tricky to get lock ordering correct.  The
323
**                obvious implementation is to loop through the channel
324
**                list, but when you try to lock a GOB or a request you
325
**                have a lock ordering problem (the channel is quite low
326
**                in the locking hierarchy).  Instead you run through
327
**                the GOB hash table.
328
*/
329

    
330
// helper (does most of the work)
331
static void
332
gob_reclaim_subscriptions(gdp_gob_t *gob)
333
{
334
        int istat;
335
        gdp_req_t *req;
336
        gdp_req_t *nextreq;
337
        EP_TIME_SPEC sub_timeout;
338

    
339
        // just in case
340
        if (gob == NULL)
341
                return;
342

    
343
        {
344
                EP_TIME_SPEC sub_delta;
345
                long timeout = ep_adm_getlongparam("swarm.gdp.subscr.timeout",
346
                                                                GDP_SUBSCR_TIMEOUT_DEF);
347

    
348
                ep_time_from_nsec(-timeout SECONDS, &sub_delta);
349
                ep_time_deltanow(&sub_delta, &sub_timeout);
350
                ep_dbg_cprintf(Dbg, 39,
351
                                "gob_reclaim_subscriptions: GOB = %p, refcnt = %d, timeout = %ld\n",
352
                                gob, gob->refcnt, timeout);
353
        }
354

    
355
        // don't even try locked GOBs
356
        // first check is to avoid extraneous errors
357
        if (EP_UT_BITSET(GOBF_ISLOCKED, gob->flags))
358
        {
359
                ep_dbg_cprintf(Dbg, 39, " ... skipping locked GOB\n");
360
                return;
361
        }
362
        istat = ep_thr_mutex_trylock(&gob->mutex);
363
        if (istat != 0)
364
        {
365
                if (ep_dbg_test(Dbg, 21))
366
                {
367
                        ep_dbg_printf("gob_reclaim_subscriptions: gob already locked:\n    ");
368
                        _gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
369
                }
370
                return;
371
        }
372
        gob->flags |= GOBF_ISLOCKED;        // if trylock succeeded
373

    
374
        nextreq = LIST_FIRST(&gob->reqs);
375
        while ((req = nextreq) != NULL)
376
        {
377
                if (ep_dbg_test(Dbg, 59))
378
                {
379
                        ep_dbg_printf("gob_reclaim_subscriptions: checking ");
380
                        _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
381
                }
382

    
383
                // now that GOB is locked, we lock the request
384
                istat = ep_thr_mutex_trylock(&req->mutex);
385
                if (istat != 0)                // checking on status of req lock attempt
386
                {
387
                        // already locked
388
                        if (ep_dbg_test(Dbg, 41))
389
                        {
390
                                ep_dbg_printf("gob_reclaim_subscriptions: req already locked:\n    ");
391
                                _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
392
                        }
393
                        _gdp_gob_unlock(req->gob);
394
                        continue;
395
                }
396

    
397
                // get next request while locked and do sanity checks
398
                nextreq = LIST_NEXT(req, goblist);
399
                if (!EP_ASSERT(req != nextreq) || !EP_ASSERT(req->gob == gob))
400
                {
401
                        _gdp_gob_unlock(req->gob);
402
                        break;
403
                }
404

    
405

    
406
                if (!EP_UT_BITSET(GDP_REQ_SRV_SUBSCR, req->flags))
407
                {
408
                        ep_dbg_cprintf(Dbg, 59, "   ... not a subscription (flags = 0x%x)\n",
409
                                        req->flags);
410
                }
411
                else if (ep_time_before(&req->sub_ts, &sub_timeout))
412
                {
413
                        // this subscription seems to be dead
414
                        if (ep_dbg_test(Dbg, 18))
415
                        {
416
                                ep_dbg_printf("    ...  subscription timeout: ");
417
                                _gdp_gob_dump(req->gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
418
                        }
419

    
420
                        // have to manually remove req from lists to avoid lock inversion
421
                        if (EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
422
                        {
423
                                // gob is already locked
424
                                LIST_REMOVE(req, goblist);
425
                        }
426
                        if (EP_UT_BITSET(GDP_REQ_ON_CHAN_LIST, req->flags))
427
                        {
428
                                LIST_REMOVE(req, chanlist);                        // chan already locked
429
                        }
430
                        req->flags &= ~(GDP_REQ_ON_GOB_LIST | GDP_REQ_ON_CHAN_LIST);
431
                        _gdp_gob_decref(&req->gob, true);
432
                        _gdp_req_free(&req);
433
                }
434
                else if (ep_dbg_test(Dbg, 59))
435
                {
436
                        ep_dbg_printf("    ... not yet time\n");
437
                }
438

    
439
                if (req != NULL)
440
                        _gdp_req_unlock(req);
441
        }
442

    
443
        if (gob != NULL)
444
                _gdp_gob_unlock(gob);
445
}
446

    
447
void
448
sub_reclaim_resources(gdp_chan_t *chan)
449
{
450
        _gdp_gob_cache_foreach(gob_reclaim_subscriptions);
451
}