Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / gdp / gdp_subscr.c @ master

History | View | Annotate | Download (9.8 KB)

1 1440421a Eric Allman
/* vim: set ai sw=4 sts=4 ts=4 :*/
2
3 055d3009 Eric Allman
/*
4
**  ----- BEGIN LICENSE BLOCK -----
5
**        GDP: Global Data Plane Support Library
6
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
7
**
8 c87dd166 Eric Allman
**        Copyright (c) 2015-2019, Regents of the University of California.
9 6bd5476b Eric Allman
**        All rights reserved.
10 055d3009 Eric Allman
**
11 6bd5476b Eric Allman
**        Permission is hereby granted, without written agreement and without
12
**        license or royalty fees, to use, copy, modify, and distribute this
13
**        software and its documentation for any purpose, provided that the above
14
**        copyright notice and the following two paragraphs appear in all copies
15
**        of this software.
16 055d3009 Eric Allman
**
17 6bd5476b Eric Allman
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
18
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
19
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
20
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
21 055d3009 Eric Allman
**
22 6bd5476b Eric Allman
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
23 055d3009 Eric Allman
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 6bd5476b Eric Allman
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
25
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
26
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
27
**        OR MODIFICATIONS.
28 055d3009 Eric Allman
**  ----- END LICENSE BLOCK -----
29
*/
30
31 cc4a7f8a Eric Allman
#include "gdp.h"
32
#include "gdp_chan.h"
33
#include "gdp_event.h"
34
#include "gdp_priv.h"
35 d65c01e2 Eric Allman
#include "gdp.pb-c.h"
36 cc4a7f8a Eric Allman
37 1440421a Eric Allman
#include <ep/ep.h>
38
#include <ep/ep_app.h>
39
#include <ep/ep_dbg.h>
40 e87e0a1a Eric Allman
#include <ep/ep_log.h>
41 1440421a Eric Allman
42 e87e0a1a Eric Allman
#include <string.h>
43 1440421a Eric Allman
#include <sys/errno.h>
44
45
static EP_DBG        Dbg = EP_DBG_INIT("gdp.subscr", "GDP subscriptions");
46
47 e87e0a1a Eric Allman
48 5a73c6ad Eric Allman
static bool                        SubscriptionThreadRunning;
49
static EP_THR                SubscriptionThreadId;
50 f2bf61a4 Eric Allman
EP_THR_MUTEX                _GdpSubscriptionMutex        EP_THR_MUTEX_INITIALIZER;
51 44a5b0ef Eric Allman
struct req_head                _GdpSubscriptionRequests;
52 5a73c6ad Eric Allman
53 e87e0a1a Eric Allman
54
/*
55
**  Re-subscribe to a GCL
56
*/
57
58
static EP_STAT
59
subscr_resub(gdp_req_t *req)
60
{
61
        EP_STAT estat;
62
63
        ep_dbg_cprintf(Dbg, 39, "subscr_resub: refreshing req@%p\n", req);
64 91b7142f Eric Allman
        EP_ASSERT_ELSE(req != NULL, return EP_STAT_ASSERT_ABORT);
65 2a8cda22 Eric Allman
        EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex);
66 fec93aac Eric Allman
        EP_ASSERT_ELSE(req->gin != NULL, return EP_STAT_ASSERT_ABORT);
67 2a8cda22 Eric Allman
        EP_ASSERT_ELSE(req->cpdu != NULL, return EP_STAT_ASSERT_ABORT);
68 e87e0a1a Eric Allman
69
        req->state = GDP_REQ_ACTIVE;
70 d65c01e2 Eric Allman
71
        // payload should already be set up
72 fec93aac Eric Allman
        memcpy(req->cpdu->dst, req->gob->name, sizeof req->cpdu->dst);
73 a104a10f Eric Allman
        memcpy(req->cpdu->src, _GdpMyRoutingName, sizeof req->cpdu->src);
74 d65c01e2 Eric Allman
        {
75
                GDP_MSG_CHECK(req->cpdu, return EP_STAT_ASSERT_ABORT);
76
                gdp_msg_t *msg = req->cpdu->msg;
77
                EP_ASSERT_ELSE(msg->cmd == GDP_CMD_SUBSCRIBE_BY_RECNO,
78
                                                return EP_STAT_ASSERT_ABORT);
79 4a89fdda Eric Allman
                GdpMessage__CmdSubscribeByRecno *payload =
80
                                                msg->cmd_subscribe_by_recno;
81 d65c01e2 Eric Allman
                payload->has_start = true;
82
                payload->start = req->gob->nrecs + 1;
83
                payload->has_nrecs = true;
84
                payload->nrecs = req->numrecs;
85
        }
86 e87e0a1a Eric Allman
87
        estat = _gdp_invoke(req);
88
89
        if (ep_dbg_test(Dbg, EP_STAT_ISOK(estat) ? 20 : 1))
90
        {
91
                char ebuf[200];
92
93
                ep_dbg_printf("subscr_resub(%s) ->\n\t%s\n",
94 fec93aac Eric Allman
                                req->gob == NULL ? "(no gob)" : req->gob->pname,
95 e87e0a1a Eric Allman
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
96
        }
97
98
        req->state = GDP_REQ_IDLE;
99 e97eeee0 Eric Allman
        // req->rpdu might be NULL if _gdp_invoke failed
100
        if (req->rpdu != NULL)
101
        {
102 d65c01e2 Eric Allman
                gdp_message__free_unpacked(req->rpdu->msg, NULL);
103
                req->rpdu->msg = NULL;
104 e97eeee0 Eric Allman
        }
105 94e663bc Eric Allman
        if (EP_STAT_ISOK(estat))
106
                ep_time_now(&req->sub_ts);
107 e87e0a1a Eric Allman
108
        return estat;
109
}
110
111
112
/*
113
**  Periodically ping all open subscriptions to make sure they are
114
**  still happy.
115
*/
116
117
static void *
118 f3b89a60 Eric Allman
subscr_poker_thread(void *chan_)
119 e87e0a1a Eric Allman
{
120 f3b89a60 Eric Allman
        gdp_chan_t *chan = chan_;
121
        gdp_chan_x_t *chanx = _gdp_chan_get_cdata(chan);
122 55987b4d Eric Allman
        long timeout = ep_adm_getlongparam("swarm.gdp.subscr.timeout",
123 b068acef Eric Allman
                                                        GDP_SUBSCR_TIMEOUT_DEF);
124 55987b4d Eric Allman
        long delta_poke = ep_adm_getlongparam("swarm.gdp.subscr.refresh",
125
                                                        timeout / 3);
126
127
        if (timeout < delta_poke)
128
                ep_app_warn("swarm.gdp.subscr.timeout < swarm.gdp.subscr.refresh"
129
                                        " (%ld < %ld)",
130
                                        timeout, delta_poke);
131
        ep_dbg_cprintf(Dbg, 10,
132
                        "Starting subscription poker thread, delta_poke = %ld\n",
133
                        delta_poke);
134 e87e0a1a Eric Allman
135
        // loop forever poking subscriptions
136
        for (;;)
137
        {
138 18624361 Eric Allman
                EP_STAT estat;
139 e87e0a1a Eric Allman
                gdp_req_t *req;
140
                gdp_req_t *nextreq;
141
                EP_TIME_SPEC now;
142
                EP_TIME_SPEC t_poke;        // poke if older than this
143
144
                // wait for a while to avoid hogging CPU
145 94e663bc Eric Allman
                ep_time_nanosleep(delta_poke SECONDS / 10);
146 e87e0a1a Eric Allman
                ep_dbg_cprintf(Dbg, 40, "\nsubscr_poker_thread: poking\n");
147
148
                ep_time_now(&now);
149 02de7463 Eric Allman
                ep_time_from_nsec(-delta_poke SECONDS, &t_poke);
150 e87e0a1a Eric Allman
                ep_time_add_delta(&now, &t_poke);
151
152 18624361 Eric Allman
                // do loop is in case _gdp_req_lock fails
153
                do
154 e87e0a1a Eric Allman
                {
155 18624361 Eric Allman
                        estat = EP_STAT_OK;
156 f3b89a60 Eric Allman
                        for (req = LIST_FIRST(&chanx->reqs); req != NULL; req = nextreq)
157 e87e0a1a Eric Allman
                        {
158 18624361 Eric Allman
                                estat = _gdp_req_lock(req);
159
                                EP_STAT_CHECK(estat, break);
160
161
                                nextreq = LIST_NEXT(req, chanlist);
162
                                if (ep_dbg_test(Dbg, 51))
163
                                {
164
                                        char tbuf[60];
165
166
                                        ep_time_format(&now, tbuf, sizeof tbuf, EP_TIME_FMT_HUMAN);
167
                                        ep_dbg_printf("subscr_poker_thread: at %s checking ", tbuf);
168 65f23ae5 Eric Allman
                                        _gdp_req_dump(req, ep_dbg_getfile(), GDP_PR_BASIC, 0);
169 18624361 Eric Allman
                                }
170
171 fec93aac Eric Allman
                                gdp_gob_t *gob = req->gob;
172
                                _gdp_req_unlock(req);        // GOBs need to be locked before reqs
173 6dfc87d5 Eric Allman
174 fec93aac Eric Allman
                                // lock GOB, then req, then validate req
175
                                if (ep_thr_mutex_trylock(&gob->mutex) != 0)
176 f3b89a60 Eric Allman
                                {
177 5aae0d62 Eric Allman
                                        // not an error ... we'll get this one next time through
178 f3b89a60 Eric Allman
                                        ep_dbg_cprintf(Dbg, 51,
179
                                                        "   ... gob->mutex trylock failed (%s)\n",
180
                                                        strerror(errno));
181 6dfc87d5 Eric Allman
                                        continue;
182 f3b89a60 Eric Allman
                                }
183 eae1d3ec Eric Allman
                                gob->flags |= GOBF_ISLOCKED;
184 6dfc87d5 Eric Allman
                                _gdp_req_lock(req);
185
186 18624361 Eric Allman
                                if (!EP_UT_BITSET(GDP_REQ_CLT_SUBSCR, req->flags))
187
                                {
188
                                        // not a subscription: skip this entry
189 f3b89a60 Eric Allman
                                        ep_dbg_cprintf(Dbg, 51, "   ... not client subscription\n");
190 18624361 Eric Allman
                                }
191 94e663bc Eric Allman
                                else if (ep_time_before(&t_poke, &req->sub_ts))
192 18624361 Eric Allman
                                {
193
                                        // we've seen activity recently, no need to poke
194 f3b89a60 Eric Allman
                                        ep_dbg_cprintf(Dbg, 51, "   ... not yet\n");
195 18624361 Eric Allman
                                }
196
                                else
197
                                {
198 94e663bc Eric Allman
                                        // sub_ts <= t_poke: refresh this subscription
199 f3b89a60 Eric Allman
                                        ep_dbg_cprintf(Dbg, 51, "   ... subscr_resub\n");
200 18624361 Eric Allman
                                        (void) subscr_resub(req);
201
                                }
202
203
                                // if _gdp_invoke failed, try again at the next poke interval
204
                                _gdp_req_unlock(req);
205 fec93aac Eric Allman
                                _gdp_gob_unlock(gob);
206 e87e0a1a Eric Allman
                        }
207 fec93aac Eric Allman
                } while (!EP_STAT_ISOK(estat));
208 e87e0a1a Eric Allman
        }
209 52792e40 Eric Allman
210
        // not reached; keep gcc happy
211
        ep_log(EP_STAT_SEVERE, "subscr_poker_thread: fell out of loop");
212
        return NULL;
213 e87e0a1a Eric Allman
}
214
215
216 1440421a Eric Allman
/*
217 eae1d3ec Eric Allman
**        _GDP_GIN_SUBSCRIBE_BY_RECNO --- subscribe to a GCL
218 1440421a Eric Allman
**
219 d2647d1f Eric Allman
**                This also implements multiread.
220 1440421a Eric Allman
*/
221
222
EP_STAT
223 eae1d3ec Eric Allman
_gdp_gin_subscribe(gdp_gin_t *gin,
224 f67d0170 Eric Allman
                gdp_cmd_t cmd,
225 d65c01e2 Eric Allman
                gdp_recno_t start,
226 1440421a Eric Allman
                int32_t numrecs,
227 10125206 Eric Allman
                gdp_sub_qos_t *qos,
228 728e4394 Eric Allman
                gdp_event_cbfunc_t cbfunc,
229 d2647d1f Eric Allman
                void *cbarg)
230 1440421a Eric Allman
{
231
        EP_STAT estat = EP_STAT_OK;
232 d65c01e2 Eric Allman
        gdp_req_t *req;
233
234
        // create the subscribe request
235
        estat = _gdp_req_new(cmd, gin->gob, _GdpChannel, NULL,
236
                        GDP_REQ_PERSIST | GDP_REQ_CLT_SUBSCR | GDP_REQ_ALLOC_RID,
237
                        &req);
238
        EP_STAT_CHECK(estat, goto fail0);
239
240
        // add start and stop parameters to PDU
241
        req->gin = gin;
242
243
        EP_ASSERT_ELSE(req != NULL, return EP_STAT_ASSERT_ABORT);
244
        EP_THR_MUTEX_ASSERT_ISLOCKED(&req->mutex);
245
        EP_ASSERT_ELSE(req->gin != NULL, return EP_STAT_ASSERT_ABORT);
246
        EP_ASSERT_ELSE(req->cpdu != NULL, return EP_STAT_ASSERT_ABORT);
247 1440421a Eric Allman
248
        errno = 0;                                // avoid spurious messages
249
250 d65c01e2 Eric Allman
        {
251
                gdp_msg_t *msg = req->cpdu->msg;
252
                EP_ASSERT_ELSE(msg != NULL, return EP_STAT_ASSERT_ABORT);
253 4a89fdda Eric Allman
                GdpMessage__CmdSubscribeByRecno *payload =
254
                                                msg->cmd_subscribe_by_recno;
255 d65c01e2 Eric Allman
                if (start != GDP_PDU_NO_RECNO)
256
                {
257
                        payload->has_start = true;
258
                        payload->start = start;
259
                }
260
                if (numrecs > 0)
261
                {
262
                        payload->has_nrecs = true;
263
                        payload->nrecs = numrecs;
264
                }
265
        }
266 1440421a Eric Allman
267 7c79c6bc Eric Allman
        // arrange for responses to appear as events or callbacks
268 e2682ba9 Eric Allman
        _gdp_event_setcb(req, cbfunc, cbarg);
269 7c79c6bc Eric Allman
270 1440421a Eric Allman
        // issue the subscription --- no data returned
271
        estat = _gdp_invoke(req);
272 0d975e56 Eric Allman
        EP_ASSERT(req->state == GDP_REQ_ACTIVE);
273 1440421a Eric Allman
274
        if (!EP_STAT_ISOK(estat))
275
        {
276 283110f2 Eric Allman
                _gdp_req_free(&req);
277 94e663bc Eric Allman
                goto fail0;
278 1440421a Eric Allman
        }
279 4d6ff457 Eric Allman
280 94e663bc Eric Allman
        // at this point remaining results will be asynchronous
281
        req->flags |= GDP_REQ_ASYNCIO;
282 1440421a Eric Allman
283 94e663bc Eric Allman
        // now waiting for other events; go ahead and unlock
284 9b4c4283 Eric Allman
        ep_time_now(&req->sub_ts);
285 94e663bc Eric Allman
        req->state = GDP_REQ_IDLE;
286
        if (req->rpdu != NULL)
287
                _gdp_pdu_free(&req->rpdu);
288
        ep_thr_cond_signal(&req->cond);
289
        _gdp_req_unlock(req);
290 1440421a Eric Allman
291 94e663bc Eric Allman
        // the req is still on the channel list
292 786df979 Eric Allman
293 94e663bc Eric Allman
        // start a subscription poker thread if needed (not for multiread)
294
        if (cmd == GDP_CMD_SUBSCRIBE_BY_RECNO)
295
        {
296
                long poke = ep_adm_getlongparam("swarm.gdp.subscr.pokeintvl", 60L);
297
298
                ep_thr_mutex_lock(&_GdpSubscriptionMutex);
299
                if (poke > 0 && !SubscriptionThreadRunning)
300
                {
301
                        SubscriptionThreadRunning = true;
302
                        int istat = ep_thr_spawn(&SubscriptionThreadId,
303
                                                                subscr_poker_thread, req->chan);
304
                        if (istat != 0)
305 786df979 Eric Allman
                        {
306 94e663bc Eric Allman
                                EP_STAT spawn_stat = ep_stat_from_errno(istat);
307
                                ep_log(spawn_stat, "_gdp_gin_subscribe: thread spawn failure");
308 e87e0a1a Eric Allman
                        }
309 1440421a Eric Allman
                }
310 94e663bc Eric Allman
                ep_thr_mutex_unlock(&_GdpSubscriptionMutex);
311 1440421a Eric Allman
        }
312 e87e0a1a Eric Allman
313 d65c01e2 Eric Allman
fail0:
314 e87e0a1a Eric Allman
        return estat;
315 1440421a Eric Allman
}
316 9c515e64 Eric Allman
317
EP_STAT
318 eae1d3ec Eric Allman
_gdp_gin_unsubscribe(gdp_gin_t *gin,
319 9c515e64 Eric Allman
                gdp_event_cbfunc_t cbfunc,
320
                void *cbarg,
321
                uint32_t reqflags)
322
{
323
        EP_STAT estat;
324
        gdp_req_t *req;
325
        gdp_req_t *sub, *next_sub;
326
327 fec93aac Eric Allman
        if (!GDP_GIN_ASSERT_ISLOCKED(gin))
328 9c515e64 Eric Allman
                return EP_STAT_ASSERT_ABORT;
329
330 eae1d3ec Eric Allman
        ep_dbg_cprintf(Dbg, 1, "_gdp_gin_unsubscribe(%s) cbfunc=%p cbarg=%p\n",
331 fec93aac Eric Allman
                        gin->gob->pname, cbfunc, cbarg);
332 9c515e64 Eric Allman
333 fec93aac Eric Allman
        estat = _gdp_req_new(GDP_CMD_UNSUBSCRIBE, gin->gob, _GdpChannel, NULL,
334 9c515e64 Eric Allman
                                                reqflags, &req);
335
        EP_STAT_CHECK(estat, goto fail0);
336 2e65953f Eric Allman
        req->gin = gin;
337 9c515e64 Eric Allman
338 d65c01e2 Eric Allman
        GDP_MSG_CHECK(req->cpdu, return EP_STAT_ASSERT_ABORT);
339
340 fec93aac Eric Allman
        for (sub = LIST_FIRST(&gin->gob->reqs); sub != NULL; sub = next_sub)
341 9c515e64 Eric Allman
        {
342
                _gdp_req_lock(sub);
343
                ep_dbg_cprintf(Dbg, 1, "... comparing to cbfunc=%p cbarg=%p\n",
344
                                sub->sub_cbfunc, sub->sub_cbarg);
345
346 fec93aac Eric Allman
                next_sub = LIST_NEXT(req, goblist);
347
                if (req->gin != gin ||
348
                                (cbfunc != NULL && cbfunc != sub->sub_cbfunc) ||
349 9c515e64 Eric Allman
                                (cbarg != NULL && cbarg != sub->sub_cbarg))
350
                {
351
                        // this is not the subscription you are looking for
352
                        ep_dbg_cprintf(Dbg, 1, "... no match\n");
353
                        _gdp_req_unlock(sub);
354
                        continue;
355
                }
356
357 d65c01e2 Eric Allman
                GDP_MSG_CHECK(sub->cpdu, continue);
358 9c515e64 Eric Allman
                ep_dbg_cprintf(Dbg, 1, "... deleting rid %" PRIgdp_rid "\n",
359 d65c01e2 Eric Allman
                                        sub->cpdu->msg->rid);
360
                req->cpdu->msg->rid = sub->cpdu->msg->rid;
361 9c515e64 Eric Allman
362
                estat = _gdp_invoke(req);
363
                EP_STAT_CHECK(estat, continue);
364
365 d65c01e2 Eric Allman
                GDP_MSG_CHECK(req->rpdu, return EP_STAT_ASSERT_ABORT);
366
367 9c515e64 Eric Allman
                _gdp_req_free(&sub);
368
        }
369
370
        _gdp_req_free(&req);
371
372
fail0:
373
        return estat;
374
}