gdp / gdplogd / logd_proto.c @ master
History | View | Annotate | Download (34.6 KB)
1 |
/* vim: set ai sw=4 sts=4 ts=4 : */
|
---|---|
2 |
|
3 |
/*
|
4 |
** ----- BEGIN LICENSE BLOCK -----
|
5 |
** GDPLOGD: Log Daemon for the Global Data Plane
|
6 |
** From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
|
7 |
**
|
8 |
** Copyright (c) 2015-2019, Regents of the University of California.
|
9 |
** All rights reserved.
|
10 |
**
|
11 |
** Permission is hereby granted, without written agreement and without
|
12 |
** license or royalty fees, to use, copy, modify, and distribute this
|
13 |
** software and its documentation for any purpose, provided that the above
|
14 |
** copyright notice and the following two paragraphs appear in all copies
|
15 |
** of this software.
|
16 |
**
|
17 |
** IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
|
18 |
** SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
|
19 |
** PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
|
20 |
** EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
21 |
**
|
22 |
** REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
|
23 |
** LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
24 |
** FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
|
25 |
** IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
|
26 |
** OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
|
27 |
** OR MODIFICATIONS.
|
28 |
** ----- END LICENSE BLOCK -----
|
29 |
*/
|
30 |
|
31 |
#include "logd.h" |
32 |
#include "logd_admin.h" |
33 |
#include "logd_pubsub.h" |
34 |
|
35 |
#include <gdp/gdp_chan.h> // for PUT64 |
36 |
#include <gdp/gdp_md.h> |
37 |
#include <gdp/gdp_priv.h> |
38 |
|
39 |
#include <ep/ep_assert.h> |
40 |
|
41 |
static EP_DBG Dbg = EP_DBG_INIT("gdplogd.proto", "GDP Log Daemon protocol"); |
42 |
static EP_DBG DbgSignatureOverride = EP_DBG_INIT("gdplogd.sig.override", |
43 |
"Request signature check override");
|
44 |
|
45 |
/*
**  GET_PAYLOAD --- fetch the typed command payload out of a request
**
**		Parameters:
**			req --- the request whose command PDU (req->cpdu) is examined.
**			where --- name of the GdpMessage body member to fetch; it is
**				also stringified for the debug message.
**			bodycase --- suffix of the GDP_MESSAGE__BODY_xxx constant
**				that msg->body_case must match.
**
**		On success the caller's local variable `payload` (which must
**		already be declared) is set to msg->where.
**
**		This macro RETURNS FROM THE CALLING FUNCTION with
**		EP_STAT_ASSERT_ABORT if req, req->cpdu, or req->cpdu->msg is
**		NULL, and with GDP_STAT_PROTOCOL_FAIL if the body case does not
**		match.
**
**		The body is wrapped in do { } while (0) so that the macro plus
**		its trailing semicolon forms exactly one statement (safe in an
**		unbraced if/else).
*/

#define GET_PAYLOAD(req, where, bodycase)								\
	do																	\
	{																	\
		if (!EP_ASSERT((req) != NULL) ||								\
			!EP_ASSERT((req)->cpdu != NULL) ||							\
			!EP_ASSERT((req)->cpdu->msg != NULL))						\
			return EP_STAT_ASSERT_ABORT;								\
		GdpMessage *gp_msg_ = (req)->cpdu->msg;							\
		if (gp_msg_->body_case !=										\
				GDP_MESSAGE__BODY_ ## bodycase)							\
		{																\
			ep_dbg_cprintf(Dbg, 1,										\
				"%s: wrong payload type %d (expected %d)\n",			\
				#where,													\
				gp_msg_->body_case,										\
				GDP_MESSAGE__BODY_ ## bodycase);						\
			return GDP_STAT_PROTOCOL_FAIL;								\
		}																\
		payload = gp_msg_->where;										\
	} while (0)
64 |
|
65 |
|
66 |
/*
|
67 |
** Stub
|
68 |
*/
|
69 |
|
70 |
EP_STAT |
71 |
implement_me(const char *s) |
72 |
{ |
73 |
ep_app_error("Not implemented: %s", s);
|
74 |
return GDP_STAT_NOT_IMPLEMENTED;
|
75 |
} |
76 |
|
77 |
|
78 |
#if 0 //TODO
/*
**  GET_STARTING_POINT_BY_xxx --- get the starting point for a read or subscribe
**
**		(Currently compiled out.)
*/

static EP_STAT
get_starting_point_by_recno(gdp_req_t *req, gdp_recno_t recno)
{
	// non-positive record numbers are relative to the end of the log
	if (recno <= 0)
		recno += req->gob->nrecs + 1;

	// can't read before the beginning
	if (recno <= 0)
		recno = 1;

	req->nextrec = recno;
	return EP_STAT_OK;
}

static EP_STAT
get_starting_point_by_ts(gdp_req_t *req, GdpTimestamp *ts)
{
	// lookup by timestamp has not been written yet
	return implement_me("get_starting_point_by_ts");
}
#endif //TODO
|
108 |
|
109 |
|
110 |
/***********************************************************************
|
111 |
** GDP command implementations
|
112 |
**
|
113 |
** Each of these takes a request as the argument.
|
114 |
**
|
115 |
** These routines should set req->rpdu->cmd to the "ACK" reply
|
116 |
** code, which will be used if the command succeeds (i.e.,
|
117 |
** returns EP_STAT_OK). Otherwise the return status is decoded
|
118 |
** to produce a NAK code. A specific NAK code can be sent
|
119 |
** using GDP_STAT_FROM_NAK(nak).
|
120 |
**
|
121 |
***********************************************************************/
|
122 |
|
123 |
|
124 |
// print trace info about a command
//		Emits "<command name> [<command number>]: <formatted msg>\n" when
//		the Dbg flag is at level 20 or above.  The debug file is locked
//		with flockfile() for the duration so the three printf calls come
//		out as one unit.  `msg` is a printf format and must be followed by
//		at least one argument.
//		Wrapped in do { } while (0) so that the macro plus its trailing
//		semicolon is exactly one statement (safe in an unbraced if/else).
#define CMD_TRACE(cmd, msg, ...)											\
	do																		\
	{																		\
		if (ep_dbg_test(Dbg, 20))											\
		{																	\
			flockfile(ep_dbg_getfile());									\
			ep_dbg_printf("%s [%d]: ", _gdp_proto_cmd_name(cmd), (cmd));	\
			ep_dbg_printf(msg, __VA_ARGS__);								\
			ep_dbg_printf("\n");											\
			funlockfile(ep_dbg_getfile());									\
		}																	\
	} while (0)
134 |
|
135 |
|
136 |
/*
|
137 |
** CMD_PING --- just return an OK response to indicate that we are alive.
|
138 |
**
|
139 |
** If this is addressed to a GOB (instead of the daemon itself),
|
140 |
** it is really a test to see if the subscription is still alive.
|
141 |
*/
|
142 |
|
143 |
EP_STAT |
144 |
cmd_ping(gdp_req_t *req) |
145 |
{ |
146 |
gdp_gob_t *gob; |
147 |
EP_STAT estat = EP_STAT_OK; |
148 |
|
149 |
if (GDP_NAME_SAME(req->cpdu->dst, _GdpMyRoutingName))
|
150 |
goto done;
|
151 |
|
152 |
estat = _gdp_gob_cache_get(req->rpdu->dst, GGCF_NOCREATE, &gob); |
153 |
if (EP_STAT_ISOK(estat))
|
154 |
{ |
155 |
// We know about the GOB. How about the subscription?
|
156 |
gdp_req_t *sub; |
157 |
|
158 |
LIST_FOREACH(sub, &req->gob->reqs, goblist) |
159 |
{ |
160 |
if (GDP_NAME_SAME(sub->rpdu->dst, req->rpdu->dst) &&
|
161 |
EP_UT_BITSET(GDP_REQ_SRV_SUBSCR, sub->flags)) |
162 |
{ |
163 |
// Yes, we have a subscription!
|
164 |
goto done;
|
165 |
} |
166 |
} |
167 |
} |
168 |
|
169 |
estat = GDP_STAT_NAK_NOTFOUND; |
170 |
|
171 |
done:
|
172 |
if (EP_STAT_ISOK(estat))
|
173 |
return _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
|
174 |
return _gdp_req_nak_resp(req, GDP_NAK_S_LOST_SUBSCR,
|
175 |
"cmd_ping: lost subscription",
|
176 |
estat); |
177 |
} |
178 |
|
179 |
|
180 |
/*
|
181 |
** CMD_CREATE --- create new GOB.
|
182 |
**
|
183 |
** A bit unusual in that the PDU is addressed to the daemon,
|
184 |
** not the log; the log name is in the payload. However, we
|
185 |
** respond using the name of the new log rather than the
|
186 |
** daemon.
|
187 |
*/
|
188 |
|
189 |
|
190 |
EP_STAT |
191 |
cmd_create(gdp_req_t *req) |
192 |
{ |
193 |
EP_STAT estat; |
194 |
gdp_gob_t *gob = NULL;
|
195 |
gdp_md_t *gmd; |
196 |
gdp_name_t gobname; |
197 |
|
198 |
if (!GDP_NAME_SAME(req->cpdu->dst, _GdpMyRoutingName))
|
199 |
{ |
200 |
// this is directed to a GOB, not to the daemon
|
201 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
|
202 |
"cmd_create: log name required",
|
203 |
GDP_STAT_NAK_CONFLICT); |
204 |
} |
205 |
|
206 |
GdpMessage__CmdCreate *payload; |
207 |
GET_PAYLOAD(req, cmd_create, CMD_CREATE); |
208 |
|
209 |
//XXX for now, insist that the client specify the internal name;
|
210 |
//XXX ultimately this should be ILLEGAL: name is hash of metadata
|
211 |
if (payload->logname.len != sizeof gobname || |
212 |
!gdp_name_is_valid(payload->logname.data)) |
213 |
{ |
214 |
// bad log name
|
215 |
if (ep_dbg_test(Dbg, 2)) |
216 |
{ |
217 |
ep_dbg_printf("cmd_create: improper log name, len %zd (expected %zd)\n",
|
218 |
payload->logname.len, sizeof gobname);
|
219 |
ep_dbg_printf("\tpname %s\n", payload->logname.data);
|
220 |
} |
221 |
estat = _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ, |
222 |
"cmd_create: improper log name",
|
223 |
GDP_STAT_GDP_NAME_INVALID); |
224 |
goto fail0;
|
225 |
} |
226 |
memcpy(gobname, payload->logname.data, sizeof gobname);
|
227 |
|
228 |
// make sure we aren't creating a log with our name
|
229 |
if (GDP_NAME_SAME(gobname, _GdpMyRoutingName))
|
230 |
{ |
231 |
estat = _gdp_req_nak_resp(req, GDP_NAK_C_FORBIDDEN, |
232 |
"cmd_create: cannot create a log with same name as logd",
|
233 |
GDP_STAT_GDP_NAME_INVALID); |
234 |
goto fail0;
|
235 |
} |
236 |
|
237 |
{ |
238 |
gdp_pname_t pbuf; |
239 |
ep_dbg_cprintf(Dbg, 14, "cmd_create: creating GOB %s\n", |
240 |
gdp_printable_name(gobname, pbuf)); |
241 |
} |
242 |
|
243 |
// get the memory space for the GOB itself
|
244 |
estat = gob_alloc(gobname, GDP_MODE_AO, &gob); |
245 |
EP_STAT_CHECK(estat, goto fail0);
|
246 |
|
247 |
// collect metadata
|
248 |
if (payload->metadata->data.len == 0) |
249 |
{ |
250 |
estat = _gdp_req_nak_resp(req, GDP_NAK_C_BADOPT, |
251 |
"cmd_create: metadata required",
|
252 |
GDP_STAT_METADATA_REQUIRED); |
253 |
goto fail0;
|
254 |
} |
255 |
gmd = _gdp_md_deserialize(payload->metadata->data.data, |
256 |
payload->metadata->data.len); |
257 |
|
258 |
// have to get lock ordering right here.
|
259 |
// safe because no one else can have a handle on this req.
|
260 |
req->gob = gob; // for debugging
|
261 |
_gdp_req_unlock(req); |
262 |
_gdp_gob_lock(gob); |
263 |
_gdp_req_lock(req); |
264 |
|
265 |
// do the physical create
|
266 |
gob->gob_md = gmd; |
267 |
estat = gob->x->physimpl->create(gob, gob->gob_md); |
268 |
if (!EP_STAT_ISOK(estat))
|
269 |
{ |
270 |
estat = _gdp_req_nak_resp(req, 0,
|
271 |
"cmd_create: physical create failure",
|
272 |
estat); |
273 |
goto fail1;
|
274 |
} |
275 |
|
276 |
// cache the open GOB Handle for possible future use
|
277 |
EP_ASSERT(gdp_name_is_valid(gob->name)); |
278 |
gob->flags |= GOBF_DEFER_FREE; |
279 |
gob->flags &= ~GOBF_PENDING; |
280 |
_gdp_gob_cache_add(gob); |
281 |
|
282 |
// advertise this new GOB
|
283 |
logd_advertise_one(req->chan, gob->name, GDP_CMD_ADVERTISE); |
284 |
|
285 |
// pass any creation info back to the caller
|
286 |
// (none at this point)
|
287 |
|
288 |
fail1:
|
289 |
fail0:
|
290 |
if (EP_STAT_ISOK(estat))
|
291 |
{ |
292 |
_gdp_req_ack_resp(req, GDP_ACK_CREATED); |
293 |
} |
294 |
char ebuf[60]; |
295 |
if (gob != NULL) |
296 |
{ |
297 |
admin_post_stats(ADMIN_LOG_EXIST, "log-create",
|
298 |
"log-name", gob->pname,
|
299 |
"status", ep_stat_tostr(estat, ebuf, sizeof ebuf), |
300 |
NULL, NULL); |
301 |
} |
302 |
|
303 |
ep_dbg_cprintf(Dbg, 9, "<<< cmd_create(%s): %s\n", |
304 |
gob != NULL ? gob->pname : "NULL", |
305 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
306 |
|
307 |
return estat;
|
308 |
} |
309 |
|
310 |
|
311 |
/*
|
312 |
** CMD_OPEN --- open for read-only, append-only, or read-append
|
313 |
**
|
314 |
** From the point of view of gdplogd these are the same command.
|
315 |
*/
|
316 |
|
317 |
EP_STAT |
318 |
cmd_open(gdp_req_t *req) |
319 |
{ |
320 |
EP_STAT estat = EP_STAT_OK; |
321 |
gdp_gob_t *gob = NULL;
|
322 |
|
323 |
// see if we already know about this GOB
|
324 |
estat = get_open_handle(req); |
325 |
if (!EP_STAT_ISOK(estat))
|
326 |
{ |
327 |
estat = _gdp_req_nak_resp(req, 0,
|
328 |
"cmd_open: could not open GOB", estat);
|
329 |
return estat;
|
330 |
} |
331 |
|
332 |
// set up the response
|
333 |
_gdp_req_ack_resp(req, GDP_ACK_SUCCESS); |
334 |
GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success; |
335 |
|
336 |
gob = req->gob; |
337 |
gob->flags |= GOBF_DEFER_FREE; |
338 |
if (gob->gob_md != NULL) |
339 |
{ |
340 |
// send metadata as response payload
|
341 |
uint8_t *obuf; |
342 |
size_t olen; |
343 |
olen = _gdp_md_serialize(gob->gob_md, &obuf); |
344 |
resp->metadata.data = obuf; |
345 |
resp->metadata.len = olen; |
346 |
resp->has_metadata = true;
|
347 |
} |
348 |
resp->recno = gob->nrecs; |
349 |
resp->has_recno = true;
|
350 |
|
351 |
// post statistics for monitoring
|
352 |
{ |
353 |
char ebuf[60]; |
354 |
|
355 |
admin_post_stats(ADMIN_LOG_OPEN, "log-open",
|
356 |
"log-name", gob->pname,
|
357 |
"status", ep_stat_tostr(estat, ebuf, sizeof ebuf), |
358 |
NULL, NULL); |
359 |
} |
360 |
|
361 |
if (ep_dbg_test(Dbg, 10)) |
362 |
{ |
363 |
char ebuf[100]; |
364 |
ep_dbg_printf("<<< cmd_open(%s): gob %p nrecs %" PRIgdp_recno ": %s\n", |
365 |
gob->pname, gob, gob->nrecs, |
366 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
367 |
} |
368 |
|
369 |
return estat;
|
370 |
} |
371 |
|
372 |
|
373 |
/*
|
374 |
** CMD_CLOSE --- close an open GOB
|
375 |
**
|
376 |
** This really doesn't do much except terminate subscriptions. We
|
377 |
** let the usual cache reclaim algorithm do the actual close.
|
378 |
*/
|
379 |
|
380 |
EP_STAT |
381 |
cmd_close(gdp_req_t *req) |
382 |
{ |
383 |
EP_STAT estat = EP_STAT_OK; |
384 |
|
385 |
// a bit wierd to open the GOB only to close it again....
|
386 |
estat = get_open_handle(req); |
387 |
if (!EP_STAT_ISOK(estat))
|
388 |
{ |
389 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
|
390 |
"cmd_close: GOB not open", estat);
|
391 |
} |
392 |
|
393 |
// set up the response
|
394 |
_gdp_req_ack_resp(req, GDP_ACK_SUCCESS); |
395 |
GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success; |
396 |
|
397 |
//return number of records
|
398 |
resp->recno = req->gob->nrecs; |
399 |
|
400 |
// remove any subscriptions
|
401 |
sub_end_all_subscriptions(req->gob, req->cpdu->src, GDP_PDU_NO_RID); |
402 |
|
403 |
if (ep_dbg_test(Dbg, 10)) |
404 |
{ |
405 |
char ebuf[100]; |
406 |
ep_dbg_printf("<<< cmd_close(%s): %s\n", req->gob->pname,
|
407 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
408 |
} |
409 |
|
410 |
return estat;
|
411 |
} |
412 |
|
413 |
|
414 |
/*
|
415 |
** CMD_DELETE --- delete a GOB
|
416 |
**
|
417 |
** This also does all the close actions.
|
418 |
*/
|
419 |
|
420 |
static EP_STAT
|
421 |
verify_req_signature(gdp_req_t *req) |
422 |
{ |
423 |
//TODO: IMPLEMENT_ME XXX
|
424 |
//return GDP_STAT_NAK_UNAUTH; //DEBUG
|
425 |
if (ep_dbg_test(DbgSignatureOverride, 101)) |
426 |
return EP_STAT_OK;
|
427 |
else
|
428 |
return GDP_STAT_NOT_IMPLEMENTED;
|
429 |
} |
430 |
|
431 |
EP_STAT |
432 |
cmd_delete(gdp_req_t *req) |
433 |
{ |
434 |
EP_STAT estat = EP_STAT_OK; |
435 |
|
436 |
// a bit wierd to open the GOB only to close it again....
|
437 |
estat = get_open_handle(req); |
438 |
if (!EP_STAT_ISOK(estat))
|
439 |
{ |
440 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
|
441 |
"cmd_delete: GOB not open", estat);
|
442 |
} |
443 |
|
444 |
estat = verify_req_signature(req); |
445 |
if (!EP_STAT_ISOK(estat))
|
446 |
{ |
447 |
return _gdp_req_nak_resp(req, GDP_NAK_C_UNAUTH,
|
448 |
"cmd_delete: signature failure", estat);
|
449 |
} |
450 |
|
451 |
// set up the response
|
452 |
_gdp_req_ack_resp(req, GDP_ACK_DELETED); |
453 |
GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success; |
454 |
|
455 |
// return number of records
|
456 |
resp->recno = req->gob->nrecs; |
457 |
|
458 |
// remove log advertisement
|
459 |
logd_advertise_one(req->chan, req->gob->name, GDP_CMD_WITHDRAW); |
460 |
|
461 |
// remove any subscriptions
|
462 |
sub_end_all_subscriptions(req->gob, req->cpdu->src, GDP_PDU_NO_RID); |
463 |
|
464 |
// we will force a close and delete now
|
465 |
req->gob->freefunc = NULL;
|
466 |
gob_delete(req->gob); |
467 |
|
468 |
if (ep_dbg_test(Dbg, 10)) |
469 |
{ |
470 |
char ebuf[100]; |
471 |
ep_dbg_printf("<<< cmd_delete: %s\n",
|
472 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
473 |
} |
474 |
|
475 |
return estat;
|
476 |
} |
477 |
|
478 |
|
479 |
static void |
480 |
make_read_acknak_pdu(gdp_req_t *req, EP_STAT estat) |
481 |
{ |
482 |
if (EP_STAT_ISOK(estat))
|
483 |
{ |
484 |
// OK, the next record exists: send it
|
485 |
_gdp_req_ack_resp(req, GDP_ACK_CONTENT); |
486 |
GdpMessage__AckContent *resp = req->rpdu->msg->ack_content; |
487 |
resp->dl->n_d = 1; //FIXME: should handle multiples |
488 |
EP_ASSERT(resp->dl->d == NULL);
|
489 |
GdpDatum *pbd; |
490 |
resp->dl->d = ep_mem_malloc(resp->dl->n_d * sizeof pbd);
|
491 |
resp->dl->d[0] = pbd = ep_mem_malloc(sizeof *pbd); |
492 |
gdp_datum__init(pbd); |
493 |
} |
494 |
else if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_NOTFOUND)) |
495 |
{ |
496 |
// no results found matching query
|
497 |
_gdp_req_ack_resp(req, GDP_NAK_C_NOTFOUND); |
498 |
GdpMessage__NakGeneric *resp = req->rpdu->msg->nak; |
499 |
resp->ep_stat = EP_STAT_TO_INT(estat); |
500 |
resp->has_ep_stat = true;
|
501 |
resp->recno = req->nextrec; |
502 |
resp->has_recno = true;
|
503 |
} |
504 |
else if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_LOST_SUBSCR) || |
505 |
EP_STAT_IS_SAME(estat, GDP_STAT_ACK_END_OF_RESULTS)) |
506 |
{ |
507 |
// found some results, but no more to come
|
508 |
_gdp_req_ack_resp(req, GDP_ACK_END_OF_RESULTS); |
509 |
GdpMessage__AckEndOfResults *resp = req->rpdu->msg->ack_end_of_results; |
510 |
resp->ep_stat = EP_STAT_TO_INT(estat); |
511 |
resp->has_ep_stat = true;
|
512 |
resp->nresults = req->s_results; |
513 |
resp->has_nresults = true;
|
514 |
} |
515 |
else
|
516 |
{ |
517 |
// some other failure
|
518 |
_gdp_req_ack_resp(req, GDP_NAK_S_INTERNAL); |
519 |
GdpMessage__NakGeneric *resp = req->rpdu->msg->nak; |
520 |
resp->ep_stat = EP_STAT_TO_INT(estat); |
521 |
resp->has_ep_stat = true;
|
522 |
resp->recno = req->nextrec; |
523 |
resp->has_recno = true;
|
524 |
} |
525 |
if (ep_dbg_test(Dbg, 37)) |
526 |
{ |
527 |
char ebuf[100]; |
528 |
|
529 |
ep_dbg_printf("make_read_acknak_pdu(%s): %d\n",
|
530 |
ep_stat_tostr(estat, ebuf, sizeof ebuf),
|
531 |
req->rpdu->msg->body_case); |
532 |
} |
533 |
} |
534 |
|
535 |
static EP_STAT
|
536 |
send_read_result(EP_STAT estat, gdp_datum_t *datum, gdp_result_ctx_t *cb_ctx) |
537 |
{ |
538 |
gdp_req_t *req = (gdp_req_t *) cb_ctx; |
539 |
|
540 |
make_read_acknak_pdu(req, estat); |
541 |
if (EP_STAT_ISOK(estat))
|
542 |
_gdp_datum_to_pb(datum, req->rpdu->msg, |
543 |
req->rpdu->msg->ack_content->dl->d[0]);
|
544 |
req->stat = estat = _gdp_pdu_out(req->rpdu, req->chan); |
545 |
if (EP_STAT_ISOK(estat))
|
546 |
req->s_results++; |
547 |
return estat;
|
548 |
} |
549 |
|
550 |
EP_STAT |
551 |
cmd_read_by_recno(gdp_req_t *req) |
552 |
{ |
553 |
EP_STAT estat; |
554 |
|
555 |
estat = get_open_handle(req); |
556 |
if (!EP_STAT_ISOK(estat))
|
557 |
{ |
558 |
return _gdp_req_nak_resp(req, 0, |
559 |
"cmd_read_by_recno: GOB open failure", estat);
|
560 |
} |
561 |
|
562 |
GdpMessage__CmdReadByRecno *payload; |
563 |
GET_PAYLOAD(req, cmd_read_by_recno, CMD_READ_BY_RECNO); |
564 |
req->numrecs = payload->nrecs; |
565 |
if (req->numrecs < 0) |
566 |
req->numrecs = UINT32_MAX; |
567 |
|
568 |
req->nextrec = payload->recno; |
569 |
if (req->nextrec < 0) |
570 |
{ |
571 |
req->nextrec += req->gob->nrecs + 1;
|
572 |
if (req->nextrec <= 0) |
573 |
req->nextrec = 1;
|
574 |
} |
575 |
req->s_results = 0;
|
576 |
|
577 |
estat = req->gob->x->physimpl->read_by_recno(req->gob, |
578 |
req->nextrec, req->numrecs, |
579 |
send_read_result, req); |
580 |
// if successful, data will have already been returned
|
581 |
if (EP_STAT_ISOK(estat))
|
582 |
return GDP_STAT_RESPONSE_SENT;
|
583 |
make_read_acknak_pdu(req, estat); |
584 |
return estat;
|
585 |
} |
586 |
|
587 |
EP_STAT |
588 |
cmd_read_by_ts(gdp_req_t *req) |
589 |
{ |
590 |
return implement_me("cmd_read_by_ts"); |
591 |
EP_STAT estat; |
592 |
|
593 |
estat = get_open_handle(req); |
594 |
if (!EP_STAT_ISOK(estat))
|
595 |
{ |
596 |
return _gdp_req_nak_resp(req, 0, |
597 |
"cmd_read_by_ts: GOB open failure", estat);
|
598 |
} |
599 |
|
600 |
GdpMessage__CmdReadByTs *payload; |
601 |
GET_PAYLOAD(req, cmd_read_by_ts, CMD_READ_BY_TS); |
602 |
req->numrecs = payload->nrecs; |
603 |
if (req->numrecs < 0) |
604 |
req->numrecs = UINT32_MAX; |
605 |
req->s_results = 0;
|
606 |
|
607 |
EP_TIME_SPEC ts; |
608 |
_gdp_timestamp_from_pb(&ts, payload->timestamp); |
609 |
estat = req->gob->x->physimpl->read_by_timestamp(req->gob, |
610 |
&ts, req->numrecs, |
611 |
send_read_result, req); |
612 |
if (EP_STAT_ISOK(estat))
|
613 |
return GDP_STAT_RESPONSE_SENT;
|
614 |
make_read_acknak_pdu(req, estat); |
615 |
return estat;
|
616 |
} |
617 |
|
618 |
|
619 |
/*
|
620 |
** CMD_APPEND --- append a datum to a GOB
|
621 |
**
|
622 |
** This will have side effects if there are subscriptions pending.
|
623 |
*/
|
624 |
|
625 |
EP_STAT |
626 |
cmd_append(gdp_req_t *req) |
627 |
{ |
628 |
EP_STAT estat; |
629 |
const char *where = NULL; |
630 |
|
631 |
estat = get_open_handle(req); |
632 |
if (!EP_STAT_ISOK(estat))
|
633 |
{ |
634 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
|
635 |
"cmd_append: GOB not open", estat);
|
636 |
} |
637 |
|
638 |
// get the payload, which may contain multiple records
|
639 |
GdpMessage__CmdAppend *payload; |
640 |
GET_PAYLOAD(req, cmd_append, CMD_APPEND); |
641 |
if (payload->dl->n_d <= 0) |
642 |
{ |
643 |
// no data included in APPEND command
|
644 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
|
645 |
"cmd_append: no data", GDP_STAT_NAK_BADREQ);
|
646 |
} |
647 |
|
648 |
// verify that the records in this payload link to each other
|
649 |
int rx;
|
650 |
GdpDatum *pbd; |
651 |
for (rx = 0; rx < payload->dl->n_d; rx++) |
652 |
{ |
653 |
pbd = payload->dl->d[rx++]; |
654 |
|
655 |
// FIXME: check hash of previous record matches prevhash in this record
|
656 |
|
657 |
CMD_TRACE(req->cpdu->msg->cmd, "%s %" PRIgdp_recno,
|
658 |
req->gob->pname, pbd->recno); |
659 |
|
660 |
// validate record number
|
661 |
if (pbd->recno <= 0) |
662 |
{ |
663 |
estat = _gdp_req_nak_resp(req, GDP_NAK_C_BADOPT, |
664 |
"cmd_append: recno must be > 0",
|
665 |
GDP_STAT_NAK_BADOPT); |
666 |
goto fail0;
|
667 |
} |
668 |
|
669 |
if (pbd->recno != (gdp_recno_t) (req->gob->nrecs + 1)) |
670 |
{ |
671 |
bool random_order_ok = EP_UT_BITSET(FORGIVE_LOG_GAPS, GdplogdForgive) &&
|
672 |
EP_UT_BITSET(FORGIVE_LOG_DUPS, GdplogdForgive); |
673 |
|
674 |
// replay or missing a record
|
675 |
ep_dbg_cprintf(Dbg, random_order_ok ? 29 : 9, |
676 |
"cmd_append: record out of sequence: got %"
|
677 |
PRIgdp_recno ", expected %" PRIgdp_recno "\n" |
678 |
"\ton log %s\n",
|
679 |
pbd->recno, req->gob->nrecs + 1,
|
680 |
req->gob->pname); |
681 |
|
682 |
if (pbd->recno <= (gdp_recno_t) req->gob->nrecs)
|
683 |
{ |
684 |
// may be a duplicate append, or just filling in a gap
|
685 |
// (should probably see if duplicates are the same data)
|
686 |
|
687 |
if (!EP_UT_BITSET(FORGIVE_LOG_DUPS, GdplogdForgive) &&
|
688 |
req->gob->x->physimpl->recno_exists( |
689 |
req->gob, pbd->recno)) |
690 |
{ |
691 |
char mbuf[100]; |
692 |
snprintf(mbuf, sizeof mbuf,
|
693 |
"cmd_append: duplicate record number %" PRIgdp_recno,
|
694 |
pbd->recno); |
695 |
estat = _gdp_req_nak_resp(req, GDP_NAK_C_CONFLICT, |
696 |
mbuf, GDP_STAT_RECORD_DUPLICATED); |
697 |
goto fail0;
|
698 |
} |
699 |
} |
700 |
else if (pbd->recno > (gdp_recno_t) (req->gob->nrecs + 1) && |
701 |
!EP_UT_BITSET(FORGIVE_LOG_GAPS, GdplogdForgive)) |
702 |
{ |
703 |
// gap in record numbers
|
704 |
char mbuf[100]; |
705 |
snprintf(mbuf, sizeof mbuf,
|
706 |
"cmd_append: record number %" PRIgdp_recno " missing", |
707 |
pbd->recno); |
708 |
estat = _gdp_req_nak_resp(req, GDP_NAK_C_FORBIDDEN, |
709 |
mbuf, GDP_STAT_RECNO_SEQ_ERROR); |
710 |
goto fail0;
|
711 |
} |
712 |
} |
713 |
} |
714 |
|
715 |
// only path to datum is via this req, so we don't have to lock it
|
716 |
gdp_datum_t *datum = gdp_datum_new(); |
717 |
_gdp_datum_from_pb(datum, pbd, pbd->sig); |
718 |
|
719 |
estat = _gdp_datum_vrfy_gob(datum, req->gob); |
720 |
where = NULL;
|
721 |
if (EP_STAT_ISOK(estat))
|
722 |
{ |
723 |
// signature exists and is OK
|
724 |
} |
725 |
else if (EP_STAT_IS_SAME(estat, GDP_STAT_CRYPTO_NO_PUB_KEY)) |
726 |
{ |
727 |
// log has no public key associated
|
728 |
where = "no public key";
|
729 |
if (EP_UT_BITSET(GDP_SIG_PUBKEYREQ, GdpSignatureStrictness))
|
730 |
goto fail1;
|
731 |
} |
732 |
else if (EP_STAT_IS_SAME(estat, GDP_STAT_CRYPTO_NO_SIG)) |
733 |
{ |
734 |
// datum has no signature
|
735 |
where = "missing signature";
|
736 |
if (EP_UT_BITSET(GDP_SIG_REQUIRED, GdpSignatureStrictness))
|
737 |
goto fail1;
|
738 |
} |
739 |
else if (EP_STAT_IS_SAME(estat, GDP_STAT_CRYPTO_VRFY_FAIL)) |
740 |
{ |
741 |
// signature exists but does not match
|
742 |
where = "signature failed";
|
743 |
if (EP_UT_BITSET(GDP_SIG_MUSTVERIFY, GdpSignatureStrictness))
|
744 |
goto fail1;
|
745 |
} |
746 |
else
|
747 |
{ |
748 |
// other unknown error
|
749 |
where = "unknown verification error";
|
750 |
goto fail1;
|
751 |
} |
752 |
|
753 |
// if we got here with a failure, print it and ignore it
|
754 |
if (!EP_STAT_ISOK(estat))
|
755 |
{ |
756 |
ep_dbg_cprintf(Dbg, 51, "cmd_append: %s (warn)\n", where); |
757 |
estat = EP_STAT_OK; |
758 |
} |
759 |
|
760 |
// append records to long term storage
|
761 |
if (req->gob->x->physimpl->xact_begin != NULL) |
762 |
req->gob->x->physimpl->xact_begin(req->gob); |
763 |
for (rx = 0; rx < payload->dl->n_d; rx++) |
764 |
{ |
765 |
pbd = payload->dl->d[rx++]; |
766 |
if (payload->dl->n_d != 1) |
767 |
_gdp_datum_from_pb(datum, pbd, pbd->sig); |
768 |
// else it is already set from above
|
769 |
|
770 |
// do the physical append to disk
|
771 |
estat = req->gob->x->physimpl->append(req->gob, datum); |
772 |
if (!EP_STAT_ISOK(estat))
|
773 |
break;
|
774 |
} |
775 |
if (EP_STAT_ISOK(estat))
|
776 |
{ |
777 |
if (req->gob->x->physimpl->xact_end != NULL) |
778 |
req->gob->x->physimpl->xact_end(req->gob); |
779 |
} |
780 |
else
|
781 |
{ |
782 |
if (req->gob->x->physimpl->xact_abort != NULL) |
783 |
req->gob->x->physimpl->xact_abort(req->gob); |
784 |
} |
785 |
|
786 |
// if physical appends succeeded, notify subscribers
|
787 |
if (EP_STAT_ISOK(estat))
|
788 |
{ |
789 |
for (rx = 0; rx < payload->dl->n_d; rx++) |
790 |
{ |
791 |
pbd = payload->dl->d[rx++]; |
792 |
if (payload->dl->n_d != 1) |
793 |
_gdp_datum_from_pb(datum, pbd, pbd->sig); |
794 |
// else it is already set from above
|
795 |
|
796 |
// send the new datum to any and all subscribers
|
797 |
GdpDatum *pbd = ep_mem_malloc(sizeof *pbd);
|
798 |
gdp_datum__init(pbd); |
799 |
gdp_msg_t *msg = _gdp_msg_new(GDP_ACK_CONTENT, |
800 |
req->cpdu->msg->rid, |
801 |
req->cpdu->msg->l5seqno); |
802 |
|
803 |
_gdp_datum_to_pb(datum, msg, pbd); |
804 |
|
805 |
//FIXME: does dl ever get used or freed?
|
806 |
GdpDatumList *dl = msg->ack_content->dl; |
807 |
dl->d = ep_mem_malloc(payload->dl->n_d * sizeof pbd);
|
808 |
dl->n_d = 1;
|
809 |
dl->d[0] = pbd;
|
810 |
|
811 |
EP_ASSERT(req->rpdu == NULL);
|
812 |
req->rpdu = _gdp_pdu_new(msg, req->cpdu->dst, req->cpdu->src, |
813 |
GDP_SEQNO_NONE); |
814 |
sub_notify_all_subscribers(req); |
815 |
_gdp_pdu_free(&req->rpdu); |
816 |
} |
817 |
} |
818 |
gdp_datum_free(datum); |
819 |
|
820 |
if (EP_STAT_ISOK(estat))
|
821 |
{ |
822 |
// update the server's view of the number of records
|
823 |
req->gob->nrecs++; |
824 |
|
825 |
_gdp_req_ack_resp(req, GDP_ACK_SUCCESS); |
826 |
GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success; |
827 |
resp->recno = req->gob->nrecs; |
828 |
resp->ts = pbd->ts; |
829 |
pbd->ts = NULL; // avoid double free |
830 |
} |
831 |
else
|
832 |
{ |
833 |
estat = _gdp_req_nak_resp(req, 0,
|
834 |
"cmd_append: append failure",
|
835 |
estat); |
836 |
} |
837 |
|
838 |
if (false) |
839 |
{ |
840 |
char mbuf[150]; |
841 |
char ebuf[100]; |
842 |
fail1:
|
843 |
if (where == NULL) |
844 |
{ |
845 |
snprintf(mbuf, sizeof mbuf, "unknown error %s", |
846 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
847 |
where = mbuf; |
848 |
} |
849 |
estat = _gdp_req_nak_resp(req, GDP_NAK_C_FORBIDDEN, where, estat); |
850 |
req->rpdu->msg->nak->recno = req->gob->nrecs; |
851 |
} |
852 |
|
853 |
fail0:
|
854 |
if (ep_dbg_test(Dbg, 12)) |
855 |
{ |
856 |
char ebuf[100]; |
857 |
ep_dbg_printf("<<< cmd_append(%s): %s\n", req->gob->pname,
|
858 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
859 |
} |
860 |
|
861 |
EP_ASSERT(req->rpdu != NULL);
|
862 |
return estat;
|
863 |
} |
864 |
|
865 |
|
866 |
/*
|
867 |
** POST_SUBSCRIBE --- do subscription work after initial ACK
|
868 |
**
|
869 |
** Assuming the subscribe worked we are now going to deliver any
|
870 |
** previously existing records. Once those are all sent we can
|
871 |
** convert this to an ordinary subscription. If the subscribe
|
872 |
** request is satisified, we remove it.
|
873 |
**
|
874 |
** This code is also the core of multiread.
|
875 |
*/
|
876 |
|
877 |
void
|
878 |
post_subscribe(gdp_req_t *req) |
879 |
{ |
880 |
EP_STAT estat; |
881 |
|
882 |
EP_ASSERT_ELSE(req != NULL, return); |
883 |
EP_ASSERT_ELSE(req->state != GDP_REQ_FREE, return);
|
884 |
EP_ASSERT_ELSE(req->gob != NULL, return); |
885 |
ep_dbg_cprintf(Dbg, 38,
|
886 |
"post_subscribe: numrecs %d, nextrec = %"PRIgdp_recno
|
887 |
" gob->nrecs %"PRIgdp_recno "\n", |
888 |
req->numrecs, req->nextrec, req->gob->nrecs); |
889 |
|
890 |
if (req->numrecs <= 0) |
891 |
req->numrecs = INT32_MAX; |
892 |
|
893 |
// if data pre-exists in the GOB, return it now
|
894 |
if (req->nextrec <= req->gob->nrecs)
|
895 |
{ |
896 |
// get the existing records and return them via callback
|
897 |
estat = req->gob->x->physimpl->read_by_recno( |
898 |
req->gob, |
899 |
req->nextrec, req->numrecs, |
900 |
send_read_result, req); |
901 |
if (EP_STAT_ISOK(estat))
|
902 |
{ |
903 |
gdp_recno_t nrecs = EP_STAT_TO_INT(estat); |
904 |
req->nextrec += nrecs; |
905 |
req->numrecs -= nrecs; |
906 |
} |
907 |
} |
908 |
|
909 |
// done with response PDU
|
910 |
if (req->rpdu != NULL) |
911 |
_gdp_pdu_free(&req->rpdu); |
912 |
|
913 |
if (req->numrecs < 0 || !EP_UT_BITSET(GDP_REQ_SUBUPGRADE, req->flags)) |
914 |
{ |
915 |
// no more to read: do cleanup & send termination notice
|
916 |
sub_end_subscription(req); |
917 |
} |
918 |
else
|
919 |
{ |
920 |
if (ep_dbg_test(Dbg, 24)) |
921 |
{ |
922 |
ep_dbg_printf("post_subscribe: converting to subscription\n ");
|
923 |
_gdp_req_dump(req, NULL, GDP_PR_BASIC, 0); |
924 |
} |
925 |
req->flags |= GDP_REQ_SRV_SUBSCR; |
926 |
|
927 |
// link this request into the GOB so the subscription can be found
|
928 |
if (!EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
|
929 |
{ |
930 |
IF_LIST_CHECK_OK(&req->gob->reqs, req, goblist, gdp_req_t) |
931 |
{ |
932 |
// req->gob->refcnt already allows for this reference
|
933 |
LIST_INSERT_HEAD(&req->gob->reqs, req, goblist); |
934 |
req->flags |= GDP_REQ_ON_GOB_LIST; |
935 |
} |
936 |
} |
937 |
} |
938 |
} |
939 |
|
940 |
|
941 |
/*
|
942 |
** CMD_SUBSCRIBE --- subscribe command
|
943 |
**
|
944 |
** Arranges to return existing data (if any) after the response
|
945 |
** is sent, and non-existing data (if any) as a side-effect of
|
946 |
** append.
|
947 |
**
|
948 |
** XXX Race Condition: if records are written between the time
|
949 |
** the subscription and the completion of the first half of
|
950 |
** this process, some records may be lost. For example,
|
951 |
** if the GOB has 20 records (1-20) and you ask for 20
|
952 |
** records starting at record 11, you probably want records
|
953 |
** 11-30. But if during the return of records 11-20 another
|
954 |
** record (21) is written, then the second half of the
|
955 |
** subscription will actually return records 22-31.
|
956 |
**
|
957 |
** XXX Does not implement timeouts.
|
958 |
**
|
959 |
** XXX This assumes different commands based on the initial
|
960 |
** starting "query" (recno, timestamp, hash). Another
|
961 |
** possible approach would be a single command that would
|
962 |
** send at most one of those values. Error conditions
|
963 |
** galore, but fewer commands to handle.
|
964 |
*/
|
965 |
|
966 |
EP_STAT |
967 |
cmd_subscribe_by_recno(gdp_req_t *req) |
968 |
{ |
969 |
EP_STAT estat; |
970 |
EP_TIME_SPEC timeout; |
971 |
gdp_gob_t *gob; |
972 |
|
973 |
if (req->gob != NULL) |
974 |
GDP_GOB_ASSERT_ISLOCKED(req->gob); |
975 |
|
976 |
GdpMessage__CmdSubscribeByRecno *payload; |
977 |
GET_PAYLOAD(req, cmd_subscribe_by_recno, CMD_SUBSCRIBE_BY_RECNO); |
978 |
|
979 |
// find the GOB handle
|
980 |
estat = get_open_handle(req); |
981 |
if (!EP_STAT_ISOK(estat))
|
982 |
{ |
983 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
|
984 |
"cmd_subscribe: GOB not open", estat);
|
985 |
} |
986 |
|
987 |
gob = req->gob; |
988 |
if (!EP_ASSERT(GDP_GOB_ISGOOD(gob)))
|
989 |
{ |
990 |
ep_dbg_printf("cmd_subscribe: bad gob %p in req, flags = %x\n",
|
991 |
gob, gob == NULL ? 0 : gob->flags); |
992 |
return EP_STAT_ASSERT_ABORT;
|
993 |
} |
994 |
|
995 |
// get the additional parameters: number of records and timeout
|
996 |
req->numrecs = payload->nrecs; |
997 |
//XXX following should only be in cmd_subscribe_by_ts
|
998 |
if (payload->timeout != NULL) |
999 |
{ |
1000 |
timeout.tv_sec = payload->timeout->sec; |
1001 |
timeout.tv_nsec = payload->timeout->nsec; |
1002 |
timeout.tv_accuracy = payload->timeout->accuracy; |
1003 |
} |
1004 |
else
|
1005 |
{ |
1006 |
EP_TIME_INVALIDATE(&timeout); |
1007 |
} |
1008 |
|
1009 |
if (ep_dbg_test(Dbg, 14)) |
1010 |
{ |
1011 |
ep_dbg_printf("cmd_subscribe: first = %" PRIgdp_recno ", numrecs = %d\n ", |
1012 |
payload->start, req->numrecs); |
1013 |
_gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
|
1014 |
} |
1015 |
|
1016 |
if (req->numrecs < 0) |
1017 |
{ |
1018 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADOPT,
|
1019 |
"cmd_subscribe: numrecs cannot be negative",
|
1020 |
GDP_STAT_NAK_BADOPT); |
1021 |
} |
1022 |
|
1023 |
// get our starting point, which may be relative to the end
|
1024 |
req->nextrec = payload->start; |
1025 |
if (req->nextrec <= 0) |
1026 |
{ |
1027 |
req->nextrec += req->gob->nrecs + 1;
|
1028 |
if (req->nextrec <= 0) |
1029 |
{ |
1030 |
// can't read before the beginning
|
1031 |
req->nextrec = 1;
|
1032 |
} |
1033 |
} |
1034 |
|
1035 |
ep_dbg_cprintf(Dbg, 24,
|
1036 |
"cmd_subscribe: starting from %" PRIgdp_recno ", %d records\n", |
1037 |
req->nextrec, req->numrecs); |
1038 |
|
1039 |
// see if this is refreshing an existing subscription
|
1040 |
{ |
1041 |
gdp_req_t *r1; |
1042 |
|
1043 |
if (ep_dbg_test(Dbg, 50)) |
1044 |
{ |
1045 |
ep_dbg_printf("cmd_subscribe_by_recno: starting ");
|
1046 |
_gdp_req_dump(req, NULL, GDP_PR_BASIC, 0); |
1047 |
} |
1048 |
for (r1 = LIST_FIRST(&gob->reqs); r1 != NULL; |
1049 |
r1 = LIST_NEXT(r1, goblist)) |
1050 |
{ |
1051 |
EP_ASSERT(GDP_GOB_ISGOOD(r1->gob)); |
1052 |
EP_ASSERT(r1->gob == gob); |
1053 |
if (ep_dbg_test(Dbg, 50)) |
1054 |
{ |
1055 |
ep_dbg_printf("cmd_subscribe: comparing to ");
|
1056 |
_gdp_req_dump(r1, NULL, GDP_PR_BASIC, 0); |
1057 |
} |
1058 |
if (GDP_NAME_SAME(r1->cpdu->src, req->cpdu->src) &&
|
1059 |
r1->cpdu->msg->rid == req->cpdu->msg->rid) |
1060 |
{ |
1061 |
ep_dbg_cprintf(Dbg, 20, "cmd_subscribe: refreshing sub\n"); |
1062 |
break;
|
1063 |
} |
1064 |
} |
1065 |
if (r1 != NULL) |
1066 |
{ |
1067 |
// abandon old request, we'll overwrite it with new request
|
1068 |
// (but keep the GOB around)
|
1069 |
ep_dbg_cprintf(Dbg, 20, "cmd_subscribe: removing old request\n"); |
1070 |
LIST_REMOVE(r1, goblist); |
1071 |
r1->flags &= ~GDP_REQ_ON_GOB_LIST; |
1072 |
_gdp_req_lock(r1); |
1073 |
_gdp_req_free(&r1); |
1074 |
} |
1075 |
} |
1076 |
|
1077 |
// the _gdp_gob_decref better not have invalidated the GOB
|
1078 |
EP_ASSERT(GDP_GOB_ISGOOD(gob)); |
1079 |
|
1080 |
// mark this as persistent and upgradable
|
1081 |
req->flags |= GDP_REQ_PERSIST | GDP_REQ_SUBUPGRADE; |
1082 |
|
1083 |
// note that the subscription is active
|
1084 |
ep_time_now(&req->sub_ts); |
1085 |
|
1086 |
// if some of the records already exist, arrange to return them
|
1087 |
if (req->nextrec <= gob->nrecs)
|
1088 |
{ |
1089 |
ep_dbg_cprintf(Dbg, 24, "cmd_subscribe: doing post processing\n"); |
1090 |
req->flags &= ~GDP_REQ_SRV_SUBSCR; |
1091 |
req->postproc = &post_subscribe; |
1092 |
} |
1093 |
else
|
1094 |
{ |
1095 |
// this is a pure "future" subscription
|
1096 |
ep_dbg_cprintf(Dbg, 24, "cmd_subscribe: enabling subscription\n"); |
1097 |
req->flags |= GDP_REQ_SRV_SUBSCR; |
1098 |
|
1099 |
// link this request into the GOB so the subscription can be found
|
1100 |
if (!EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
|
1101 |
{ |
1102 |
IF_LIST_CHECK_OK(&gob->reqs, req, goblist, gdp_req_t) |
1103 |
{ |
1104 |
LIST_INSERT_HEAD(&gob->reqs, req, goblist); |
1105 |
req->flags |= GDP_REQ_ON_GOB_LIST; |
1106 |
} |
1107 |
else
|
1108 |
{ |
1109 |
estat = EP_STAT_ASSERT_ABORT; |
1110 |
} |
1111 |
} |
1112 |
} |
1113 |
|
1114 |
// we don't drop the GOB reference until the subscription is satisified
|
1115 |
|
1116 |
if (EP_STAT_ISOK(estat) && req->rpdu == NULL) |
1117 |
{ |
1118 |
_gdp_req_ack_resp(req, GDP_ACK_SUCCESS); |
1119 |
// ... if we want to fill in some info later....
|
1120 |
//GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success;
|
1121 |
} |
1122 |
|
1123 |
if (ep_dbg_test(Dbg, 10)) |
1124 |
{ |
1125 |
char ebuf[100]; |
1126 |
ep_dbg_printf("<<< cmd_subscribe_by_recno(%s): %s\n", req->gob->pname,
|
1127 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
1128 |
} |
1129 |
return estat;
|
1130 |
} |
1131 |
|
1132 |
|
1133 |
/*
|
1134 |
** CMD_MULTIREAD --- read multiple records
|
1135 |
**
|
1136 |
** Arranges to return existing data (if any) after the response
|
1137 |
** is sent. No long-term subscription will ever be created, but
|
1138 |
** much of the infrastructure is reused.
|
1139 |
*/
|
1140 |
|
1141 |
#if 0 //XXX NOT IMPLEMENTED YET
// NOTE(review): this disabled code refers to "pbd" and "payload", neither
// of which is declared in this function; it will need both (and probably a
// GET_PAYLOAD call) before it can be re-enabled.
EP_STAT
cmd_multiread(gdp_req_t *req)
{
	EP_STAT estat;

	req->rpdu->cmd = GDP_ACK_SUCCESS;

	// locate the GOB handle for this request
	estat = get_open_handle(req);
	if (!EP_STAT_ISOK(estat))
		return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
						"cmd_multiread: GOB not open", estat);

	// additional parameter: how many records the client wants
	req->numrecs = (int) gdp_buf_get_uint32(req->cpdu->datum->dbuf);

	if (ep_dbg_test(Dbg, 14))
	{
		ep_dbg_printf("cmd_multiread: first = %" PRIgdp_recno ", numrecs = %d\n ",
				pbd->recno, req->numrecs);
		_gdp_gob_dump(req->gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
	}

	// a negative count is rejected outright
	if (req->numrecs < 0)
		return GDP_STAT_NAK_BADOPT;

	// resolve the starting record, which may be relative to the end
	estat = get_starting_point_by_recno(req, payload->start);
	EP_STAT_CHECK(estat, goto fail0);

	ep_dbg_cprintf(Dbg, 24, "cmd_multiread: starting from %" PRIgdp_recno
			", %d records\n",
			req->nextrec, req->numrecs);

	if (req->nextrec > req->gob->nrecs)
	{
		// starting point is past the last record: nothing to read
		estat = GDP_STAT_NAK_NOTFOUND;
	}
	else
	{
		// some records already exist: deliver them after the response
		ep_dbg_cprintf(Dbg, 24, "cmd_multiread: doing post processing\n");
		req->postproc = &post_subscribe;

		// treat this as a "snapshot": cap the count at what exists now
		// so that records appended later are not returned
		int32_t remaining = req->gob->nrecs - req->nextrec;
		if (req->numrecs == 0 || remaining < req->numrecs)
			req->numrecs = remaining + 1;

		// the request has to survive until post-processing completes
		req->flags |= GDP_REQ_PERSIST;
	}

fail0:
	return estat;
}
#endif //XXX
|
1204 |
|
1205 |
|
1206 |
/*
|
1207 |
** CMD_UNSUBSCRIBE --- terminate a subscription
|
1208 |
*/
|
1209 |
|
1210 |
EP_STAT |
1211 |
cmd_unsubscribe(gdp_req_t *req) |
1212 |
{ |
1213 |
EP_STAT estat = EP_STAT_OK; |
1214 |
|
1215 |
estat = get_open_handle(req); |
1216 |
if (!EP_STAT_ISOK(estat))
|
1217 |
{ |
1218 |
return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
|
1219 |
"cmd_unsubscribe: GOB not open", estat);
|
1220 |
} |
1221 |
|
1222 |
// remove any subscriptions
|
1223 |
sub_end_all_subscriptions(req->gob, req->cpdu->src, req->cpdu->msg->rid); |
1224 |
|
1225 |
// send the ack
|
1226 |
_gdp_req_ack_resp(req, GDP_ACK_SUCCESS); |
1227 |
|
1228 |
if (ep_dbg_test(Dbg, 10)) |
1229 |
{ |
1230 |
char ebuf[100]; |
1231 |
ep_dbg_printf("<<< cmd_unsubscribe(%s): %s\n", req->gob->pname,
|
1232 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
1233 |
} |
1234 |
|
1235 |
return estat;
|
1236 |
} |
1237 |
|
1238 |
|
1239 |
/*
|
1240 |
** CMD_GETMETADATA --- get metadata for a GOB
|
1241 |
*/
|
1242 |
|
1243 |
EP_STAT |
1244 |
cmd_getmetadata(gdp_req_t *req) |
1245 |
{ |
1246 |
gdp_md_t *gmd; |
1247 |
EP_STAT estat; |
1248 |
|
1249 |
estat = get_open_handle(req); |
1250 |
if (!EP_STAT_ISOK(estat))
|
1251 |
{ |
1252 |
estat = _gdp_req_nak_resp(req, 0,
|
1253 |
"cmd_getmetadata: GOB open failure", estat);
|
1254 |
goto fail0;
|
1255 |
} |
1256 |
|
1257 |
// get the metadata into memory
|
1258 |
estat = req->gob->x->physimpl->getmetadata(req->gob, &gmd); |
1259 |
|
1260 |
if (EP_STAT_ISOK(estat))
|
1261 |
{ |
1262 |
// serialize it to the client
|
1263 |
uint8_t *mdbuf; |
1264 |
size_t mdlen = _gdp_md_serialize(gmd, &mdbuf); |
1265 |
_gdp_req_ack_resp(req, GDP_ACK_SUCCESS); |
1266 |
GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success; |
1267 |
resp->metadata.data = mdbuf; |
1268 |
resp->metadata.len = mdlen; |
1269 |
resp->has_metadata = true;
|
1270 |
} |
1271 |
else
|
1272 |
{ |
1273 |
_gdp_req_nak_resp(req, 0,
|
1274 |
"cannot get log metadata", estat);
|
1275 |
} |
1276 |
|
1277 |
if (ep_dbg_test(Dbg, 10)) |
1278 |
{ |
1279 |
char ebuf[100]; |
1280 |
ep_dbg_printf("<<< cmd_getmetadata(%s): %s\n", req->gob->pname,
|
1281 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
1282 |
} |
1283 |
|
1284 |
fail0:
|
1285 |
return estat;
|
1286 |
} |
1287 |
|
1288 |
|
1289 |
/*
|
1290 |
** CMD_FWD_APPEND --- forwarded APPEND command
|
1291 |
**
|
1292 |
** Used for replication. This is identical to an APPEND,
|
1293 |
** except it is addressed to an individual gdplogd rather
|
1294 |
** than to a GOB. The actual name is in the payload.
|
1295 |
** On return, the message will have a source address of the
|
1296 |
** GOB, not the gdplogd instance (i.e., we don't just do
|
1297 |
** the default swap of src and dst addresses).
|
1298 |
*/
|
1299 |
|
1300 |
#if 0 //XXX Move to Layer 4?
EP_STAT
cmd_fwd_append(gdp_req_t *req)
{
	EP_STAT estat;
	gdp_name_t gobname;

	// must be addressed to me
	if (!GDP_NAME_SAME(req->cpdu->dst, _GdpMyRoutingName))
	{
		// this is directed to a GOB, not to the daemon
		return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
							"cmd_fwd_append: log name required",
							GDP_STAT_NAK_CONFLICT);
	}

	// get the name of the GOB into current PDU
	{
		int i;
		gdp_pname_t pbuf;

		i = gdp_buf_read(req->cpdu->datum->dbuf, gobname, sizeof gobname);

		// test the sign explicitly: comparing a negative int directly
		// against sizeof would convert it to a huge unsigned value and
		// let a failed read slip through as "long enough"
		if (i < 0 || (size_t) i < sizeof req->cpdu->dst)
		{
			return _gdp_req_nak_resp(req, GDP_NAK_S_INTERNAL,
							"cmd_fwd_append: gobname required",
							GDP_STAT_GDP_NAME_INVALID);
		}

		// retarget the command at the GOB named in the payload
		memcpy(req->cpdu->dst, gobname, sizeof req->cpdu->dst);

		ep_dbg_cprintf(Dbg, 14, "cmd_fwd_append: %s\n",
				gdp_printable_name(req->cpdu->dst, pbuf));
	}

	// actually do the append
	estat = cmd_append(req);

	// make response seem to come from log
	// NOTE(review): assumes cmd_append always leaves req->rpdu non-NULL,
	// including on failure -- confirm before re-enabling
	memcpy(req->rpdu->src, gobname, sizeof req->rpdu->src);

	return estat;
}
#endif //XXX
|
1343 |
|
1344 |
|
1345 |
/**************** END OF COMMAND IMPLEMENTATIONS ****************/
|
1346 |
|
1347 |
|
1348 |
|
1349 |
/*
|
1350 |
** GDPD_PROTO_INIT --- initialize protocol module
|
1351 |
*/
|
1352 |
|
1353 |
static struct cmdfuncs CmdFuncs[] = |
1354 |
{ |
1355 |
{ GDP_CMD_PING, cmd_ping }, |
1356 |
{ GDP_CMD_CREATE, cmd_create }, |
1357 |
{ GDP_CMD_OPEN_AO, cmd_open }, |
1358 |
{ GDP_CMD_OPEN_RO, cmd_open }, |
1359 |
{ GDP_CMD_OPEN_RA, cmd_open }, |
1360 |
{ GDP_CMD_CLOSE, cmd_close }, |
1361 |
{ GDP_CMD_APPEND, cmd_append }, |
1362 |
{ GDP_CMD_READ_BY_RECNO, cmd_read_by_recno }, |
1363 |
{ GDP_CMD_READ_BY_TS, cmd_read_by_ts }, |
1364 |
// { GDP_CMD_READ_BY_HASH , cmd_read_by_hash },
|
1365 |
{ GDP_CMD_SUBSCRIBE_BY_RECNO, cmd_subscribe_by_recno }, |
1366 |
// { GDP_CMD_SUBSCRIBE_BY_TS, cmd_subscribe_by_ts },
|
1367 |
// { GDP_CMD_SUBSCRIBE_BY_HASH, cmd_subscribe_by_hash },
|
1368 |
{ GDP_CMD_GETMETADATA, cmd_getmetadata }, |
1369 |
// { GDP_CMD_NEWSEGMENT, cmd_newsegment },
|
1370 |
// { GDP_CMD_FWD_APPEND, cmd_fwd_append },
|
1371 |
{ GDP_CMD_UNSUBSCRIBE, cmd_unsubscribe }, |
1372 |
{ GDP_CMD_DELETE, cmd_delete }, |
1373 |
{ 0, NULL } |
1374 |
}; |
1375 |
|
1376 |
EP_STAT |
1377 |
gdpd_proto_init(void)
|
1378 |
{ |
1379 |
// register the commands we implement
|
1380 |
_gdp_register_cmdfuncs(CmdFuncs); |
1381 |
return EP_STAT_OK;
|
1382 |
} |