Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / gdp / gdp_subscr.c @ master

History | View | Annotate | Download (9.8 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 :*/
2

    
3
/*
4
**  ----- BEGIN LICENSE BLOCK -----
5
**        GDP: Global Data Plane Support Library
6
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
7
**
8
**        Copyright (c) 2015-2019, Regents of the University of California.
9
**        All rights reserved.
10
**
11
**        Permission is hereby granted, without written agreement and without
12
**        license or royalty fees, to use, copy, modify, and distribute this
13
**        software and its documentation for any purpose, provided that the above
14
**        copyright notice and the following two paragraphs appear in all copies
15
**        of this software.
16
**
17
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
18
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
19
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
20
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
21
**
22
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
23
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
25
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
26
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
27
**        OR MODIFICATIONS.
28
**  ----- END LICENSE BLOCK -----
29
*/
30

    
31
#include "gdp.h"
32
#include "gdp_chan.h"
33
#include "gdp_event.h"
34
#include "gdp_priv.h"
35
#include "gdp.pb-c.h"
36

    
37
#include <ep/ep.h>
38
#include <ep/ep_app.h>
39
#include <ep/ep_dbg.h>
40
#include <ep/ep_log.h>
41

    
42
#include <string.h>
43
#include <sys/errno.h>
44

    
45
static EP_DBG        Dbg = EP_DBG_INIT("gdp.subscr", "GDP subscriptions");
46

    
47

    
48
static bool                        SubscriptionThreadRunning;
49
static EP_THR                SubscriptionThreadId;
50
EP_THR_MUTEX                _GdpSubscriptionMutex        EP_THR_MUTEX_INITIALIZER;
51
struct req_head                _GdpSubscriptionRequests;
52

    
53

    
54
/*
55
**  Re-subscribe to a GCL
56
*/
57

    
58
static EP_STAT
59
subscr_resub(gdp_req_t *req)
60
{
61
        EP_STAT estat;
62

    
63
        ep_dbg_cprintf(Dbg, 39, "subscr_resub: refreshing req@%p\n", req);
64
        EP_ASSERT_ELSE(req != NULL, return EP_STAT_ASSERT_ABORT);
65
        EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex);
66
        EP_ASSERT_ELSE(req->gin != NULL, return EP_STAT_ASSERT_ABORT);
67
        EP_ASSERT_ELSE(req->cpdu != NULL, return EP_STAT_ASSERT_ABORT);
68

    
69
        req->state = GDP_REQ_ACTIVE;
70

    
71
        // payload should already be set up
72
        memcpy(req->cpdu->dst, req->gob->name, sizeof req->cpdu->dst);
73
        memcpy(req->cpdu->src, _GdpMyRoutingName, sizeof req->cpdu->src);
74
        {
75
                GDP_MSG_CHECK(req->cpdu, return EP_STAT_ASSERT_ABORT);
76
                gdp_msg_t *msg = req->cpdu->msg;
77
                EP_ASSERT_ELSE(msg->cmd == GDP_CMD_SUBSCRIBE_BY_RECNO,
78
                                                return EP_STAT_ASSERT_ABORT);
79
                GdpMessage__CmdSubscribeByRecno *payload =
80
                                                msg->cmd_subscribe_by_recno;
81
                payload->has_start = true;
82
                payload->start = req->gob->nrecs + 1;
83
                payload->has_nrecs = true;
84
                payload->nrecs = req->numrecs;
85
        }
86

    
87
        estat = _gdp_invoke(req);
88

    
89
        if (ep_dbg_test(Dbg, EP_STAT_ISOK(estat) ? 20 : 1))
90
        {
91
                char ebuf[200];
92

    
93
                ep_dbg_printf("subscr_resub(%s) ->\n\t%s\n",
94
                                req->gob == NULL ? "(no gob)" : req->gob->pname,
95
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
96
        }
97

    
98
        req->state = GDP_REQ_IDLE;
99
        // req->rpdu might be NULL if _gdp_invoke failed
100
        if (req->rpdu != NULL)
101
        {
102
                gdp_message__free_unpacked(req->rpdu->msg, NULL);
103
                req->rpdu->msg = NULL;
104
        }
105
        if (EP_STAT_ISOK(estat))
106
                ep_time_now(&req->sub_ts);
107

    
108
        return estat;
109
}
110

    
111

    
112
/*
113
**  Periodically ping all open subscriptions to make sure they are
114
**  still happy.
115
*/
116

    
117
static void *
118
subscr_poker_thread(void *chan_)
119
{
120
        gdp_chan_t *chan = chan_;
121
        gdp_chan_x_t *chanx = _gdp_chan_get_cdata(chan);
122
        long timeout = ep_adm_getlongparam("swarm.gdp.subscr.timeout",
123
                                                        GDP_SUBSCR_TIMEOUT_DEF);
124
        long delta_poke = ep_adm_getlongparam("swarm.gdp.subscr.refresh",
125
                                                        timeout / 3);
126

    
127
        if (timeout < delta_poke)
128
                ep_app_warn("swarm.gdp.subscr.timeout < swarm.gdp.subscr.refresh"
129
                                        " (%ld < %ld)",
130
                                        timeout, delta_poke);
131
        ep_dbg_cprintf(Dbg, 10,
132
                        "Starting subscription poker thread, delta_poke = %ld\n",
133
                        delta_poke);
134

    
135
        // loop forever poking subscriptions
136
        for (;;)
137
        {
138
                EP_STAT estat;
139
                gdp_req_t *req;
140
                gdp_req_t *nextreq;
141
                EP_TIME_SPEC now;
142
                EP_TIME_SPEC t_poke;        // poke if older than this
143

    
144
                // wait for a while to avoid hogging CPU
145
                ep_time_nanosleep(delta_poke SECONDS / 10);
146
                ep_dbg_cprintf(Dbg, 40, "\nsubscr_poker_thread: poking\n");
147

    
148
                ep_time_now(&now);
149
                ep_time_from_nsec(-delta_poke SECONDS, &t_poke);
150
                ep_time_add_delta(&now, &t_poke);
151

    
152
                // do loop is in case _gdp_req_lock fails
153
                do
154
                {
155
                        estat = EP_STAT_OK;
156
                        for (req = LIST_FIRST(&chanx->reqs); req != NULL; req = nextreq)
157
                        {
158
                                estat = _gdp_req_lock(req);
159
                                EP_STAT_CHECK(estat, break);
160

    
161
                                nextreq = LIST_NEXT(req, chanlist);
162
                                if (ep_dbg_test(Dbg, 51))
163
                                {
164
                                        char tbuf[60];
165

    
166
                                        ep_time_format(&now, tbuf, sizeof tbuf, EP_TIME_FMT_HUMAN);
167
                                        ep_dbg_printf("subscr_poker_thread: at %s checking ", tbuf);
168
                                        _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
169
                                }
170

    
171
                                gdp_gob_t *gob = req->gob;
172
                                _gdp_req_unlock(req);        // GOBs need to be locked before reqs
173

    
174
                                // lock GOB, then req, then validate req
175
                                if (ep_thr_mutex_trylock(&gob->mutex) != 0)
176
                                {
177
                                        // not an error ... we'll get this one next time through
178
                                        ep_dbg_cprintf(Dbg, 51,
179
                                                        "   ... gob->mutex trylock failed (%s)\n",
180
                                                        strerror(errno));
181
                                        continue;
182
                                }
183
                                gob->flags |= GOBF_ISLOCKED;
184
                                _gdp_req_lock(req);
185

    
186
                                if (!EP_UT_BITSET(GDP_REQ_CLT_SUBSCR, req->flags))
187
                                {
188
                                        // not a subscription: skip this entry
189
                                        ep_dbg_cprintf(Dbg, 51, "   ... not client subscription\n");
190
                                }
191
                                else if (ep_time_before(&t_poke, &req->sub_ts))
192
                                {
193
                                        // we've seen activity recently, no need to poke
194
                                        ep_dbg_cprintf(Dbg, 51, "   ... not yet\n");
195
                                }
196
                                else
197
                                {
198
                                        // sub_ts <= t_poke: refresh this subscription
199
                                        ep_dbg_cprintf(Dbg, 51, "   ... subscr_resub\n");
200
                                        (void) subscr_resub(req);
201
                                }
202

    
203
                                // if _gdp_invoke failed, try again at the next poke interval
204
                                _gdp_req_unlock(req);
205
                                _gdp_gob_unlock(gob);
206
                        }
207
                } while (!EP_STAT_ISOK(estat));
208
        }
209

    
210
        // not reached; keep gcc happy
211
        ep_log(EP_STAT_SEVERE, "subscr_poker_thread: fell out of loop");
212
        return NULL;
213
}
214

    
215

    
216
/*
217
**        _GDP_GIN_SUBSCRIBE_BY_RECNO --- subscribe to a GCL
218
**
219
**                This also implements multiread.
220
*/
221

    
222
EP_STAT
223
_gdp_gin_subscribe(gdp_gin_t *gin,
224
                gdp_cmd_t cmd,
225
                gdp_recno_t start,
226
                int32_t numrecs,
227
                gdp_sub_qos_t *qos,
228
                gdp_event_cbfunc_t cbfunc,
229
                void *cbarg)
230
{
231
        EP_STAT estat = EP_STAT_OK;
232
        gdp_req_t *req;
233

    
234
        // create the subscribe request
235
        estat = _gdp_req_new(cmd, gin->gob, _GdpChannel, NULL,
236
                        GDP_REQ_PERSIST | GDP_REQ_CLT_SUBSCR | GDP_REQ_ALLOC_RID,
237
                        &req);
238
        EP_STAT_CHECK(estat, goto fail0);
239

    
240
        // add start and stop parameters to PDU
241
        req->gin = gin;
242

    
243
        EP_ASSERT_ELSE(req != NULL, return EP_STAT_ASSERT_ABORT);
244
        EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex);
245
        EP_ASSERT_ELSE(req->gin != NULL, return EP_STAT_ASSERT_ABORT);
246
        EP_ASSERT_ELSE(req->cpdu != NULL, return EP_STAT_ASSERT_ABORT);
247

    
248
        errno = 0;                                // avoid spurious messages
249

    
250
        {
251
                gdp_msg_t *msg = req->cpdu->msg;
252
                EP_ASSERT_ELSE(msg != NULL, return EP_STAT_ASSERT_ABORT);
253
                GdpMessage__CmdSubscribeByRecno *payload =
254
                                                msg->cmd_subscribe_by_recno;
255
                if (start != GDP_PDU_NO_RECNO)
256
                {
257
                        payload->has_start = true;
258
                        payload->start = start;
259
                }
260
                if (numrecs > 0)
261
                {
262
                        payload->has_nrecs = true;
263
                        payload->nrecs = numrecs;
264
                }
265
        }
266

    
267
        // arrange for responses to appear as events or callbacks
268
        _gdp_event_setcb(req, cbfunc, cbarg);
269

    
270
        // issue the subscription --- no data returned
271
        estat = _gdp_invoke(req);
272
        EP_ASSERT(req->state == GDP_REQ_ACTIVE);
273

    
274
        if (!EP_STAT_ISOK(estat))
275
        {
276
                _gdp_req_free(&req);
277
                goto fail0;
278
        }
279

    
280
        // at this point remaining results will be asynchronous
281
        req->flags |= GDP_REQ_ASYNCIO;
282

    
283
        // now waiting for other events; go ahead and unlock
284
        ep_time_now(&req->sub_ts);
285
        req->state = GDP_REQ_IDLE;
286
        if (req->rpdu != NULL)
287
                _gdp_pdu_free(&req->rpdu);
288
        ep_thr_cond_signal(&req->cond);
289
        _gdp_req_unlock(req);
290

    
291
        // the req is still on the channel list
292

    
293
        // start a subscription poker thread if needed (not for multiread)
294
        if (cmd == GDP_CMD_SUBSCRIBE_BY_RECNO)
295
        {
296
                long poke = ep_adm_getlongparam("swarm.gdp.subscr.pokeintvl", 60L);
297

    
298
                ep_thr_mutex_lock(&_GdpSubscriptionMutex);
299
                if (poke > 0 && !SubscriptionThreadRunning)
300
                {
301
                        SubscriptionThreadRunning = true;
302
                        int istat = ep_thr_spawn(&SubscriptionThreadId,
303
                                                                subscr_poker_thread, req->chan);
304
                        if (istat != 0)
305
                        {
306
                                EP_STAT spawn_stat = ep_stat_from_errno(istat);
307
                                ep_log(spawn_stat, "_gdp_gin_subscribe: thread spawn failure");
308
                        }
309
                }
310
                ep_thr_mutex_unlock(&_GdpSubscriptionMutex);
311
        }
312

    
313
fail0:
314
        return estat;
315
}
316

    
317
EP_STAT
318
_gdp_gin_unsubscribe(gdp_gin_t *gin,
319
                gdp_event_cbfunc_t cbfunc,
320
                void *cbarg,
321
                uint32_t reqflags)
322
{
323
        EP_STAT estat;
324
        gdp_req_t *req;
325
        gdp_req_t *sub, *next_sub;
326

    
327
        if (!GDP_GIN_ASSERT_ISLOCKED(gin))
328
                return EP_STAT_ASSERT_ABORT;
329

    
330
        ep_dbg_cprintf(Dbg, 1, "_gdp_gin_unsubscribe(%s) cbfunc=%p cbarg=%p\n",
331
                        gin->gob->pname, cbfunc, cbarg);
332

    
333
        estat = _gdp_req_new(GDP_CMD_UNSUBSCRIBE, gin->gob, _GdpChannel, NULL,
334
                                                reqflags, &req);
335
        EP_STAT_CHECK(estat, goto fail0);
336
        req->gin = gin;
337

    
338
        GDP_MSG_CHECK(req->cpdu, return EP_STAT_ASSERT_ABORT);
339

    
340
        for (sub = LIST_FIRST(&gin->gob->reqs); sub != NULL; sub = next_sub)
341
        {
342
                _gdp_req_lock(sub);
343
                ep_dbg_cprintf(Dbg, 1, "... comparing to cbfunc=%p cbarg=%p\n",
344
                                sub->sub_cbfunc, sub->sub_cbarg);
345

    
346
                next_sub = LIST_NEXT(req, goblist);
347
                if (req->gin != gin ||
348
                                (cbfunc != NULL && cbfunc != sub->sub_cbfunc) ||
349
                                (cbarg != NULL && cbarg != sub->sub_cbarg))
350
                {
351
                        // this is not the subscription you are looking for
352
                        ep_dbg_cprintf(Dbg, 1, "... no match\n");
353
                        _gdp_req_unlock(sub);
354
                        continue;
355
                }
356

    
357
                GDP_MSG_CHECK(sub->cpdu, continue);
358
                ep_dbg_cprintf(Dbg, 1, "... deleting rid %" PRIgdp_rid "\n",
359
                                        sub->cpdu->msg->rid);
360
                req->cpdu->msg->rid = sub->cpdu->msg->rid;
361

    
362
                estat = _gdp_invoke(req);
363
                EP_STAT_CHECK(estat, continue);
364

    
365
                GDP_MSG_CHECK(req->rpdu, return EP_STAT_ASSERT_ABORT);
366

    
367
                _gdp_req_free(&sub);
368
        }
369

    
370
        _gdp_req_free(&req);
371

    
372
fail0:
373
        return estat;
374
}