Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / gdplogd / logd_proto.c @ master

History | View | Annotate | Download (34.6 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 : */
2

    
3
/*
4
**        ----- BEGIN LICENSE BLOCK -----
5
**        GDPLOGD: Log Daemon for the Global Data Plane
6
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
7
**
8
**        Copyright (c) 2015-2019, Regents of the University of California.
9
**        All rights reserved.
10
**
11
**        Permission is hereby granted, without written agreement and without
12
**        license or royalty fees, to use, copy, modify, and distribute this
13
**        software and its documentation for any purpose, provided that the above
14
**        copyright notice and the following two paragraphs appear in all copies
15
**        of this software.
16
**
17
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
18
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
19
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
20
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
21
**
22
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
23
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
25
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
26
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
27
**        OR MODIFICATIONS.
28
**        ----- END LICENSE BLOCK -----
29
*/
30

    
31
#include "logd.h"
32
#include "logd_admin.h"
33
#include "logd_pubsub.h"
34

    
35
#include <gdp/gdp_chan.h>                // for PUT64
36
#include <gdp/gdp_md.h>
37
#include <gdp/gdp_priv.h>
38

    
39
#include <ep/ep_assert.h>
40

    
41
// debug flag used for the protocol tracing done throughout this file
static EP_DBG	Dbg =
		EP_DBG_INIT("gdplogd.proto", "GDP Log Daemon protocol");

// debug flag tested by verify_req_signature to let requests through
static EP_DBG	DbgSignatureOverride =
		EP_DBG_INIT("gdplogd.sig.override", "Request signature check override");
44

    
45
/*
**  GET_PAYLOAD --- fetch the typed body of a command PDU
**
**		req is the request, where is the name of the GdpMessage
**		member holding the body, and bodycase is the suffix of the
**		matching GDP_MESSAGE__BODY_xxx constant.
**
**		The caller must have a variable named `payload` of the
**		appropriate pointer type in scope; it is assigned msg->where.
**
**		This macro returns from the *calling function*:
**		  - EP_STAT_ASSERT_ABORT if req, req->cpdu, or req->cpdu->msg
**			is NULL;
**		  - GDP_STAT_PROTOCOL_FAIL if the body is of the wrong type.
**
**		Wrapped in do { } while (0) so that "GET_PAYLOAD(...);" is
**		exactly one statement, even in an unbraced if/else.
*/

#define GET_PAYLOAD(req, where, bodycase) \
			do \
			{ \
				if (!EP_ASSERT((req) != NULL) || \
						!EP_ASSERT((req)->cpdu != NULL) || \
						!EP_ASSERT((req)->cpdu->msg != NULL)) \
					return EP_STAT_ASSERT_ABORT; \
				GdpMessage *msg = (req)->cpdu->msg; \
				if (msg->body_case != \
						GDP_MESSAGE__BODY_ ## bodycase) \
				{ \
					ep_dbg_cprintf(Dbg, 1, \
							"%s: wrong payload type %d (expected %d)\n", \
							#where, \
							msg->body_case, \
							GDP_MESSAGE__BODY_ ## bodycase); \
					return GDP_STAT_PROTOCOL_FAIL; \
				} \
				payload = msg->where; \
			} while (0)
64

    
65

    
66
/*
67
**        Stub
68
*/
69

    
70
EP_STAT
71
implement_me(const char *s)
72
{
73
        ep_app_error("Not implemented: %s", s);
74
        return GDP_STAT_NOT_IMPLEMENTED;
75
}
76

    
77

    
78
#if 0	//TODO
/*
**  GET_STARTING_POINT_BY_xxx --- get the starting point for a read or subscribe
**
**		The result is stored in req->nextrec.
*/

static EP_STAT
get_starting_point_by_recno(gdp_req_t *req, gdp_recno_t recno)
{
	// non-positive record numbers are taken relative to the end
	if (recno <= 0)
	{
		recno += req->gob->nrecs + 1;

		// can't read before the beginning
		if (recno <= 0)
			recno = 1;
	}

	req->nextrec = recno;
	return EP_STAT_OK;
}

static EP_STAT
get_starting_point_by_ts(gdp_req_t *req, GdpTimestamp *ts)
{
	// lookup by timestamp has not been written yet
	return implement_me("get_starting_point_by_ts");
}
#endif	//TODO
108

    
109

    
110
/***********************************************************************
111
**  GDP command implementations
112
**
113
**                Each of these takes a request as the argument.
114
**
115
**                These routines should set req->rpdu->cmd to the "ACK" reply
116
**                code, which will be used if the command succeeds (i.e.,
117
**                returns EP_STAT_OK).  Otherwise the return status is decoded
118
**                to produce a NAK code.  A specific NAK code can be sent
119
**                using GDP_STAT_FROM_NAK(nak).
120
**
121
***********************************************************************/
122

    
123

    
124
// print trace info about a command
//		When ep_dbg_test(Dbg, 20) is true, prints
//		"<command name> [<command number>]: " followed by the formatted
//		msg and a newline.  The debug file is locked around the three
//		prints so the pieces of one trace line stay together.
//		At least one argument must follow msg.
//		Wrapped in do { } while (0) so that "CMD_TRACE(...);" is exactly
//		one statement and cannot capture a following else.
#define CMD_TRACE(cmd, msg, ...) \
			do \
			{ \
				if (ep_dbg_test(Dbg, 20)) \
				{ \
					flockfile(ep_dbg_getfile()); \
					ep_dbg_printf("%s [%d]: ", \
							_gdp_proto_cmd_name(cmd), (cmd)); \
					ep_dbg_printf(msg, __VA_ARGS__); \
					ep_dbg_printf("\n"); \
					funlockfile(ep_dbg_getfile()); \
				} \
			} while (0)
134

    
135

    
136
/*
137
**  CMD_PING --- just return an OK response to indicate that we are alive.
138
**
139
**                If this is addressed to a GOB (instead of the daemon itself),
140
**                it is really a test to see if the subscription is still alive.
141
*/
142

    
143
EP_STAT
144
cmd_ping(gdp_req_t *req)
145
{
146
        gdp_gob_t *gob;
147
        EP_STAT estat = EP_STAT_OK;
148

    
149
        if (GDP_NAME_SAME(req->cpdu->dst, _GdpMyRoutingName))
150
                goto done;
151

    
152
        estat = _gdp_gob_cache_get(req->rpdu->dst, GGCF_NOCREATE, &gob);
153
        if (EP_STAT_ISOK(estat))
154
        {
155
                // We know about the GOB.  How about the subscription?
156
                gdp_req_t *sub;
157

    
158
                LIST_FOREACH(sub, &req->gob->reqs, goblist)
159
                {
160
                        if (GDP_NAME_SAME(sub->rpdu->dst, req->rpdu->dst) &&
161
                                        EP_UT_BITSET(GDP_REQ_SRV_SUBSCR, sub->flags))
162
                        {
163
                                // Yes, we have a subscription!
164
                                goto done;
165
                        }
166
                }
167
        }
168

    
169
        estat =  GDP_STAT_NAK_NOTFOUND;
170

    
171
done:
172
        if (EP_STAT_ISOK(estat))
173
                return _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
174
        return _gdp_req_nak_resp(req, GDP_NAK_S_LOST_SUBSCR,
175
                                        "cmd_ping: lost subscription",
176
                                        estat);
177
}
178

    
179

    
180
/*
181
**  CMD_CREATE --- create new GOB.
182
**
183
**                A bit unusual in that the PDU is addressed to the daemon,
184
**                not the log; the log name is in the payload.  However, we
185
**                respond using the name of the new log rather than the
186
**                daemon.
187
*/
188

    
189

    
190
EP_STAT
191
cmd_create(gdp_req_t *req)
192
{
193
        EP_STAT estat;
194
        gdp_gob_t *gob = NULL;
195
        gdp_md_t *gmd;
196
        gdp_name_t gobname;
197

    
198
        if (!GDP_NAME_SAME(req->cpdu->dst, _GdpMyRoutingName))
199
        {
200
                // this is directed to a GOB, not to the daemon
201
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
202
                                                "cmd_create: log name required",
203
                                                GDP_STAT_NAK_CONFLICT);
204
        }
205

    
206
        GdpMessage__CmdCreate *payload;
207
        GET_PAYLOAD(req, cmd_create, CMD_CREATE);
208

    
209
        //XXX for now, insist that the client specify the internal name;
210
        //XXX ultimately this should be ILLEGAL: name is hash of metadata
211
        if (payload->logname.len != sizeof gobname ||
212
                        !gdp_name_is_valid(payload->logname.data))
213
        {
214
                // bad log name
215
                if (ep_dbg_test(Dbg, 2))
216
                {
217
                        ep_dbg_printf("cmd_create: improper log name, len %zd (expected %zd)\n",
218
                                                payload->logname.len, sizeof gobname);
219
                        ep_dbg_printf("\tpname %s\n", payload->logname.data);
220
                }
221
                estat = _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
222
                                                "cmd_create: improper log name",
223
                                                GDP_STAT_GDP_NAME_INVALID);
224
                goto fail0;
225
        }
226
        memcpy(gobname, payload->logname.data, sizeof gobname);
227

    
228
        // make sure we aren't creating a log with our name
229
        if (GDP_NAME_SAME(gobname, _GdpMyRoutingName))
230
        {
231
                estat = _gdp_req_nak_resp(req, GDP_NAK_C_FORBIDDEN,
232
                                "cmd_create: cannot create a log with same name as logd",
233
                                GDP_STAT_GDP_NAME_INVALID);
234
                goto fail0;
235
        }
236

    
237
        {
238
                gdp_pname_t pbuf;
239
                ep_dbg_cprintf(Dbg, 14, "cmd_create: creating GOB %s\n",
240
                                gdp_printable_name(gobname, pbuf));
241
        }
242

    
243
        // get the memory space for the GOB itself
244
        estat = gob_alloc(gobname, GDP_MODE_AO, &gob);
245
        EP_STAT_CHECK(estat, goto fail0);
246

    
247
        // collect metadata
248
        if (payload->metadata->data.len == 0)
249
        {
250
                estat = _gdp_req_nak_resp(req, GDP_NAK_C_BADOPT,
251
                                                "cmd_create: metadata required",
252
                                                GDP_STAT_METADATA_REQUIRED);
253
                goto fail0;
254
        }
255
        gmd = _gdp_md_deserialize(payload->metadata->data.data,
256
                                                        payload->metadata->data.len);
257

    
258
        // have to get lock ordering right here.
259
        // safe because no one else can have a handle on this req.
260
        req->gob = gob;                        // for debugging
261
        _gdp_req_unlock(req);
262
        _gdp_gob_lock(gob);
263
        _gdp_req_lock(req);
264

    
265
        // do the physical create
266
        gob->gob_md = gmd;
267
        estat = gob->x->physimpl->create(gob, gob->gob_md);
268
        if (!EP_STAT_ISOK(estat))
269
        {
270
                estat = _gdp_req_nak_resp(req, 0,
271
                                                "cmd_create: physical create failure",
272
                                                estat);
273
                goto fail1;
274
        }
275

    
276
        // cache the open GOB Handle for possible future use
277
        EP_ASSERT(gdp_name_is_valid(gob->name));
278
        gob->flags |= GOBF_DEFER_FREE;
279
        gob->flags &= ~GOBF_PENDING;
280
        _gdp_gob_cache_add(gob);
281

    
282
        // advertise this new GOB
283
        logd_advertise_one(req->chan, gob->name, GDP_CMD_ADVERTISE);
284

    
285
        // pass any creation info back to the caller
286
        // (none at this point)
287

    
288
fail1:
289
fail0:
290
        if (EP_STAT_ISOK(estat))
291
        {
292
                _gdp_req_ack_resp(req, GDP_ACK_CREATED);
293
        }
294
        char ebuf[60];
295
        if (gob != NULL)
296
        {
297
                admin_post_stats(ADMIN_LOG_EXIST, "log-create",
298
                                "log-name", gob->pname,
299
                                "status", ep_stat_tostr(estat, ebuf, sizeof ebuf),
300
                                NULL, NULL);
301
        }
302

    
303
        ep_dbg_cprintf(Dbg, 9, "<<< cmd_create(%s): %s\n",
304
                                gob != NULL ? gob->pname : "NULL",
305
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
306

    
307
        return estat;
308
}
309

    
310

    
311
/*
312
**  CMD_OPEN --- open for read-only, append-only, or read-append
313
**
314
**                From the point of view of gdplogd these are the same command.
315
*/
316

    
317
EP_STAT
318
cmd_open(gdp_req_t *req)
319
{
320
        EP_STAT estat = EP_STAT_OK;
321
        gdp_gob_t *gob = NULL;
322

    
323
        // see if we already know about this GOB
324
        estat = get_open_handle(req);
325
        if (!EP_STAT_ISOK(estat))
326
        {
327
                estat = _gdp_req_nak_resp(req, 0,
328
                                                        "cmd_open: could not open GOB", estat);
329
                return estat;
330
        }
331

    
332
        // set up the response
333
        _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
334
        GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success;
335

    
336
        gob = req->gob;
337
        gob->flags |= GOBF_DEFER_FREE;
338
        if (gob->gob_md != NULL)
339
        {
340
                // send metadata as response payload
341
                uint8_t *obuf;
342
                size_t olen;
343
                olen = _gdp_md_serialize(gob->gob_md, &obuf);
344
                resp->metadata.data = obuf;
345
                resp->metadata.len = olen;
346
                resp->has_metadata = true;
347
        }
348
        resp->recno = gob->nrecs;
349
        resp->has_recno = true;
350

    
351
        // post statistics for monitoring
352
        {
353
                char ebuf[60];
354

    
355
                admin_post_stats(ADMIN_LOG_OPEN, "log-open",
356
                                "log-name", gob->pname,
357
                                "status", ep_stat_tostr(estat, ebuf, sizeof ebuf),
358
                                NULL, NULL);
359
        }
360

    
361
        if (ep_dbg_test(Dbg, 10))
362
        {
363
                char ebuf[100];
364
                ep_dbg_printf("<<< cmd_open(%s): gob %p nrecs %" PRIgdp_recno ": %s\n",
365
                                        gob->pname, gob, gob->nrecs,
366
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
367
        }
368

    
369
        return estat;
370
}
371

    
372

    
373
/*
374
**  CMD_CLOSE --- close an open GOB
375
**
376
**                This really doesn't do much except terminate subscriptions.  We
377
**                let the usual cache reclaim algorithm do the actual close.
378
*/
379

    
380
EP_STAT
381
cmd_close(gdp_req_t *req)
382
{
383
        EP_STAT estat = EP_STAT_OK;
384

    
385
        // a bit wierd to open the GOB only to close it again....
386
        estat = get_open_handle(req);
387
        if (!EP_STAT_ISOK(estat))
388
        {
389
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
390
                                                        "cmd_close: GOB not open", estat);
391
        }
392

    
393
        // set up the response
394
        _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
395
        GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success;
396

    
397
        //return number of records
398
        resp->recno = req->gob->nrecs;
399

    
400
        // remove any subscriptions
401
        sub_end_all_subscriptions(req->gob, req->cpdu->src, GDP_PDU_NO_RID);
402

    
403
        if (ep_dbg_test(Dbg, 10))
404
        {
405
                char ebuf[100];
406
                ep_dbg_printf("<<< cmd_close(%s): %s\n", req->gob->pname,
407
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
408
        }
409

    
410
        return estat;
411
}
412

    
413

    
414
/*
415
**  CMD_DELETE --- delete a GOB
416
**
417
**                This also does all the close actions.
418
*/
419

    
420
static EP_STAT
421
verify_req_signature(gdp_req_t *req)
422
{
423
        //TODO: IMPLEMENT_ME XXX
424
        //return GDP_STAT_NAK_UNAUTH;                                //DEBUG
425
        if (ep_dbg_test(DbgSignatureOverride, 101))
426
                return EP_STAT_OK;
427
        else
428
                return GDP_STAT_NOT_IMPLEMENTED;
429
}
430

    
431
EP_STAT
432
cmd_delete(gdp_req_t *req)
433
{
434
        EP_STAT estat = EP_STAT_OK;
435

    
436
        // a bit wierd to open the GOB only to close it again....
437
        estat = get_open_handle(req);
438
        if (!EP_STAT_ISOK(estat))
439
        {
440
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
441
                                                        "cmd_delete: GOB not open", estat);
442
        }
443

    
444
        estat = verify_req_signature(req);
445
        if (!EP_STAT_ISOK(estat))
446
        {
447
                return _gdp_req_nak_resp(req, GDP_NAK_C_UNAUTH,
448
                                                        "cmd_delete: signature failure", estat);
449
        }
450

    
451
        // set up the response
452
        _gdp_req_ack_resp(req, GDP_ACK_DELETED);
453
        GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success;
454

    
455
        // return number of records
456
        resp->recno = req->gob->nrecs;
457

    
458
        // remove log advertisement
459
        logd_advertise_one(req->chan, req->gob->name, GDP_CMD_WITHDRAW);
460

    
461
        // remove any subscriptions
462
        sub_end_all_subscriptions(req->gob, req->cpdu->src, GDP_PDU_NO_RID);
463

    
464
        // we will force a close and delete now
465
        req->gob->freefunc = NULL;
466
        gob_delete(req->gob);
467

    
468
        if (ep_dbg_test(Dbg, 10))
469
        {
470
                char ebuf[100];
471
                ep_dbg_printf("<<< cmd_delete: %s\n",
472
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
473
        }
474

    
475
        return estat;
476
}
477

    
478

    
479
static void
480
make_read_acknak_pdu(gdp_req_t *req, EP_STAT estat)
481
{
482
        if (EP_STAT_ISOK(estat))
483
        {
484
                // OK, the next record exists: send it
485
                _gdp_req_ack_resp(req, GDP_ACK_CONTENT);
486
                GdpMessage__AckContent *resp = req->rpdu->msg->ack_content;
487
                resp->dl->n_d = 1;                //FIXME: should handle multiples
488
                EP_ASSERT(resp->dl->d == NULL);
489
                GdpDatum *pbd;
490
                resp->dl->d = ep_mem_malloc(resp->dl->n_d * sizeof pbd);
491
                resp->dl->d[0] = pbd = ep_mem_malloc(sizeof *pbd);
492
                gdp_datum__init(pbd);
493
        }
494
        else if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_NOTFOUND))
495
        {
496
                // no results found matching query
497
                _gdp_req_ack_resp(req, GDP_NAK_C_NOTFOUND);
498
                GdpMessage__NakGeneric *resp = req->rpdu->msg->nak;
499
                resp->ep_stat = EP_STAT_TO_INT(estat);
500
                resp->has_ep_stat = true;
501
                resp->recno = req->nextrec;
502
                resp->has_recno = true;
503
        }
504
        else if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_LOST_SUBSCR) ||
505
                        EP_STAT_IS_SAME(estat, GDP_STAT_ACK_END_OF_RESULTS))
506
        {
507
                // found some results, but no more to come
508
                _gdp_req_ack_resp(req, GDP_ACK_END_OF_RESULTS);
509
                GdpMessage__AckEndOfResults *resp = req->rpdu->msg->ack_end_of_results;
510
                resp->ep_stat = EP_STAT_TO_INT(estat);
511
                resp->has_ep_stat = true;
512
                resp->nresults = req->s_results;
513
                resp->has_nresults = true;
514
        }
515
        else
516
        {
517
                // some other failure
518
                _gdp_req_ack_resp(req, GDP_NAK_S_INTERNAL);
519
                GdpMessage__NakGeneric *resp = req->rpdu->msg->nak;
520
                resp->ep_stat = EP_STAT_TO_INT(estat);
521
                resp->has_ep_stat = true;
522
                resp->recno = req->nextrec;
523
                resp->has_recno = true;
524
        }
525
        if (ep_dbg_test(Dbg, 37))
526
        {
527
                char ebuf[100];
528

    
529
                ep_dbg_printf("make_read_acknak_pdu(%s): %d\n",
530
                                ep_stat_tostr(estat, ebuf, sizeof ebuf),
531
                                req->rpdu->msg->body_case);
532
        }
533
}
534

    
535
static EP_STAT
536
send_read_result(EP_STAT estat, gdp_datum_t *datum, gdp_result_ctx_t *cb_ctx)
537
{
538
        gdp_req_t *req = (gdp_req_t *) cb_ctx;
539

    
540
        make_read_acknak_pdu(req, estat);
541
        if (EP_STAT_ISOK(estat))
542
                _gdp_datum_to_pb(datum, req->rpdu->msg,
543
                                        req->rpdu->msg->ack_content->dl->d[0]);
544
        req->stat = estat = _gdp_pdu_out(req->rpdu, req->chan);
545
        if (EP_STAT_ISOK(estat))
546
                req->s_results++;
547
        return estat;
548
}
549

    
550
EP_STAT
551
cmd_read_by_recno(gdp_req_t *req)
552
{
553
        EP_STAT estat;
554

    
555
        estat = get_open_handle(req);
556
        if (!EP_STAT_ISOK(estat))
557
        {
558
                return _gdp_req_nak_resp(req, 0,
559
                                                        "cmd_read_by_recno: GOB open failure", estat);
560
        }
561

    
562
        GdpMessage__CmdReadByRecno *payload;
563
        GET_PAYLOAD(req, cmd_read_by_recno, CMD_READ_BY_RECNO);
564
        req->numrecs = payload->nrecs;
565
        if (req->numrecs < 0)
566
                req->numrecs = UINT32_MAX;
567

    
568
        req->nextrec = payload->recno;
569
        if (req->nextrec < 0)
570
        {
571
                req->nextrec += req->gob->nrecs + 1;
572
                if (req->nextrec <= 0)
573
                        req->nextrec = 1;
574
        }
575
        req->s_results = 0;
576

    
577
        estat = req->gob->x->physimpl->read_by_recno(req->gob,
578
                                                                req->nextrec, req->numrecs,
579
                                                                send_read_result, req);
580
        // if successful, data will have already been returned
581
        if (EP_STAT_ISOK(estat))
582
                return GDP_STAT_RESPONSE_SENT;
583
        make_read_acknak_pdu(req, estat);
584
        return estat;
585
}
586

    
587
EP_STAT
588
cmd_read_by_ts(gdp_req_t *req)
589
{
590
        return implement_me("cmd_read_by_ts");
591
        EP_STAT estat;
592

    
593
        estat = get_open_handle(req);
594
        if (!EP_STAT_ISOK(estat))
595
        {
596
                return _gdp_req_nak_resp(req, 0,
597
                                                        "cmd_read_by_ts: GOB open failure", estat);
598
        }
599

    
600
        GdpMessage__CmdReadByTs *payload;
601
        GET_PAYLOAD(req, cmd_read_by_ts, CMD_READ_BY_TS);
602
        req->numrecs = payload->nrecs;
603
        if (req->numrecs < 0)
604
                req->numrecs = UINT32_MAX;
605
        req->s_results = 0;
606

    
607
        EP_TIME_SPEC ts;
608
        _gdp_timestamp_from_pb(&ts, payload->timestamp);
609
        estat = req->gob->x->physimpl->read_by_timestamp(req->gob,
610
                                                                &ts, req->numrecs,
611
                                                                send_read_result, req);
612
        if (EP_STAT_ISOK(estat))
613
                return GDP_STAT_RESPONSE_SENT;
614
        make_read_acknak_pdu(req, estat);
615
        return estat;
616
}
617

    
618

    
619
/*
620
**  CMD_APPEND --- append a datum to a GOB
621
**
622
**                This will have side effects if there are subscriptions pending.
623
*/
624

    
625
EP_STAT
626
cmd_append(gdp_req_t *req)
627
{
628
        EP_STAT estat;
629
        const char *where = NULL;
630

    
631
        estat = get_open_handle(req);
632
        if (!EP_STAT_ISOK(estat))
633
        {
634
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
635
                                                        "cmd_append: GOB not open", estat);
636
        }
637

    
638
        // get the payload, which may contain multiple records
639
        GdpMessage__CmdAppend *payload;
640
        GET_PAYLOAD(req, cmd_append, CMD_APPEND);
641
        if (payload->dl->n_d <= 0)
642
        {
643
                // no data included in APPEND command
644
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
645
                                                        "cmd_append: no data", GDP_STAT_NAK_BADREQ);
646
        }
647

    
648
        // verify that the records in this payload link to each other
649
        int rx;
650
        GdpDatum *pbd;
651
        for (rx = 0; rx < payload->dl->n_d; rx++)
652
        {
653
                pbd = payload->dl->d[rx++];
654

    
655
                // FIXME: check hash of previous record matches prevhash in this record
656

    
657
                CMD_TRACE(req->cpdu->msg->cmd, "%s %" PRIgdp_recno,
658
                                req->gob->pname, pbd->recno);
659

    
660
                // validate record number
661
                if (pbd->recno <= 0)
662
                {
663
                        estat = _gdp_req_nak_resp(req, GDP_NAK_C_BADOPT,
664
                                                        "cmd_append: recno must be > 0",
665
                                                        GDP_STAT_NAK_BADOPT);
666
                        goto fail0;
667
                }
668

    
669
                if (pbd->recno != (gdp_recno_t) (req->gob->nrecs + 1))
670
                {
671
                        bool random_order_ok = EP_UT_BITSET(FORGIVE_LOG_GAPS, GdplogdForgive) &&
672
                                                                EP_UT_BITSET(FORGIVE_LOG_DUPS, GdplogdForgive);
673

    
674
                        // replay or missing a record
675
                        ep_dbg_cprintf(Dbg, random_order_ok ? 29 : 9,
676
                                                        "cmd_append: record out of sequence: got %"
677
                                                        PRIgdp_recno ", expected %" PRIgdp_recno "\n"
678
                                                        "\ton log %s\n",
679
                                                        pbd->recno, req->gob->nrecs + 1,
680
                                                        req->gob->pname);
681

    
682
                        if (pbd->recno <= (gdp_recno_t) req->gob->nrecs)
683
                        {
684
                                // may be a duplicate append, or just filling in a gap
685
                                // (should probably see if duplicates are the same data)
686

    
687
                                if (!EP_UT_BITSET(FORGIVE_LOG_DUPS, GdplogdForgive) &&
688
                                        req->gob->x->physimpl->recno_exists(
689
                                                                                req->gob, pbd->recno))
690
                                {
691
                                        char mbuf[100];
692
                                        snprintf(mbuf, sizeof mbuf,
693
                                                        "cmd_append: duplicate record number %" PRIgdp_recno,
694
                                                        pbd->recno);
695
                                        estat = _gdp_req_nak_resp(req, GDP_NAK_C_CONFLICT,
696
                                                        mbuf, GDP_STAT_RECORD_DUPLICATED);
697
                                        goto fail0;
698
                                }
699
                        }
700
                        else if (pbd->recno > (gdp_recno_t) (req->gob->nrecs + 1) &&
701
                                        !EP_UT_BITSET(FORGIVE_LOG_GAPS, GdplogdForgive))
702
                        {
703
                                // gap in record numbers
704
                                char mbuf[100];
705
                                snprintf(mbuf, sizeof mbuf,
706
                                                "cmd_append: record number %" PRIgdp_recno " missing",
707
                                                pbd->recno);
708
                                estat = _gdp_req_nak_resp(req, GDP_NAK_C_FORBIDDEN,
709
                                                mbuf, GDP_STAT_RECNO_SEQ_ERROR);
710
                                goto fail0;
711
                        }
712
                }
713
        }
714

    
715
        // only path to datum is via this req, so we don't have to lock it
716
        gdp_datum_t *datum = gdp_datum_new();
717
        _gdp_datum_from_pb(datum, pbd, pbd->sig);
718

    
719
        estat = _gdp_datum_vrfy_gob(datum, req->gob);
720
        where = NULL;
721
        if (EP_STAT_ISOK(estat))
722
        {
723
                // signature exists and is OK
724
        }
725
        else if (EP_STAT_IS_SAME(estat, GDP_STAT_CRYPTO_NO_PUB_KEY))
726
        {
727
                // log has no public key associated
728
                where = "no public key";
729
                if (EP_UT_BITSET(GDP_SIG_PUBKEYREQ, GdpSignatureStrictness))
730
                        goto fail1;
731
        }
732
        else if (EP_STAT_IS_SAME(estat, GDP_STAT_CRYPTO_NO_SIG))
733
        {
734
                // datum has no signature
735
                where = "missing signature";
736
                if (EP_UT_BITSET(GDP_SIG_REQUIRED, GdpSignatureStrictness))
737
                        goto fail1;
738
        }
739
        else if (EP_STAT_IS_SAME(estat, GDP_STAT_CRYPTO_VRFY_FAIL))
740
        {
741
                // signature exists but does not match
742
                where = "signature failed";
743
                if (EP_UT_BITSET(GDP_SIG_MUSTVERIFY, GdpSignatureStrictness))
744
                        goto fail1;
745
        }
746
        else
747
        {
748
                // other unknown error
749
                where = "unknown verification error";
750
                goto fail1;
751
        }
752

    
753
        // if we got here with a failure, print it and ignore it
754
        if (!EP_STAT_ISOK(estat))
755
        {
756
                ep_dbg_cprintf(Dbg, 51, "cmd_append: %s (warn)\n", where);
757
                estat = EP_STAT_OK;
758
        }
759

    
760
        // append records to long term storage
761
        if (req->gob->x->physimpl->xact_begin != NULL)
762
                req->gob->x->physimpl->xact_begin(req->gob);
763
        for (rx = 0; rx < payload->dl->n_d; rx++)
764
        {
765
                pbd = payload->dl->d[rx++];
766
                if (payload->dl->n_d != 1)
767
                        _gdp_datum_from_pb(datum, pbd, pbd->sig);
768
                // else it is already set from above
769

    
770
                // do the physical append to disk
771
                estat = req->gob->x->physimpl->append(req->gob, datum);
772
                if (!EP_STAT_ISOK(estat))
773
                        break;
774
        }
775
        if (EP_STAT_ISOK(estat))
776
        {
777
                if (req->gob->x->physimpl->xact_end != NULL)
778
                        req->gob->x->physimpl->xact_end(req->gob);
779
        }
780
        else
781
        {
782
                if (req->gob->x->physimpl->xact_abort != NULL)
783
                        req->gob->x->physimpl->xact_abort(req->gob);
784
        }
785

    
786
        // if physical appends succeeded, notify subscribers
787
        if (EP_STAT_ISOK(estat))
788
        {
789
                for (rx = 0; rx < payload->dl->n_d; rx++)
790
                {
791
                        pbd = payload->dl->d[rx++];
792
                        if (payload->dl->n_d != 1)
793
                                _gdp_datum_from_pb(datum, pbd, pbd->sig);
794
                        // else it is already set from above
795

    
796
                        // send the new datum to any and all subscribers
797
                        GdpDatum *pbd = ep_mem_malloc(sizeof *pbd);
798
                        gdp_datum__init(pbd);
799
                        gdp_msg_t *msg = _gdp_msg_new(GDP_ACK_CONTENT,
800
                                                                                req->cpdu->msg->rid,
801
                                                                                req->cpdu->msg->l5seqno);
802

    
803
                        _gdp_datum_to_pb(datum, msg, pbd);
804

    
805
                        //FIXME: does dl ever get used or freed?
806
                        GdpDatumList *dl = msg->ack_content->dl;
807
                        dl->d = ep_mem_malloc(payload->dl->n_d * sizeof pbd);
808
                        dl->n_d = 1;
809
                        dl->d[0] = pbd;
810

    
811
                        EP_ASSERT(req->rpdu == NULL);
812
                        req->rpdu = _gdp_pdu_new(msg, req->cpdu->dst, req->cpdu->src,
813
                                                                        GDP_SEQNO_NONE);
814
                        sub_notify_all_subscribers(req);
815
                        _gdp_pdu_free(&req->rpdu);
816
                }
817
        }
818
        gdp_datum_free(datum);
819

    
820
        if (EP_STAT_ISOK(estat))
821
        {
822
                // update the server's view of the number of records
823
                req->gob->nrecs++;
824

    
825
                _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
826
                GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success;
827
                resp->recno = req->gob->nrecs;
828
                resp->ts = pbd->ts;
829
                pbd->ts = NULL;                // avoid double free
830
        }
831
        else
832
        {
833
                estat = _gdp_req_nak_resp(req, 0,
834
                                                "cmd_append: append failure",
835
                                                estat);
836
        }
837

    
838
        if (false)
839
        {
840
                char mbuf[150];
841
                char ebuf[100];
842
fail1:
843
                if (where == NULL)
844
                {
845
                        snprintf(mbuf, sizeof mbuf, "unknown error %s",
846
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
847
                        where = mbuf;
848
                }
849
                estat = _gdp_req_nak_resp(req, GDP_NAK_C_FORBIDDEN, where, estat);
850
                req->rpdu->msg->nak->recno = req->gob->nrecs;
851
        }
852

    
853
fail0:
854
        if (ep_dbg_test(Dbg, 12))
855
        {
856
                char ebuf[100];
857
                ep_dbg_printf("<<< cmd_append(%s): %s\n", req->gob->pname,
858
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
859
        }
860

    
861
        EP_ASSERT(req->rpdu != NULL);
862
        return estat;
863
}
864

    
865

    
866
/*
867
**  POST_SUBSCRIBE --- do subscription work after initial ACK
868
**
869
**                Assuming the subscribe worked we are now going to deliver any
870
**                previously existing records.  Once those are all sent we can
871
**                convert this to an ordinary subscription.  If the subscribe
872
**                request is satisfied, we remove it.
873
**
874
**                This code is also the core of multiread.
875
*/
876

    
877
void
878
post_subscribe(gdp_req_t *req)
879
{
880
        EP_STAT estat;
881

    
882
        EP_ASSERT_ELSE(req != NULL, return);
883
        EP_ASSERT_ELSE(req->state != GDP_REQ_FREE, return);
884
        EP_ASSERT_ELSE(req->gob != NULL, return);
885
        ep_dbg_cprintf(Dbg, 38,
886
                        "post_subscribe: numrecs %d, nextrec = %"PRIgdp_recno
887
                        " gob->nrecs %"PRIgdp_recno "\n",
888
                        req->numrecs, req->nextrec, req->gob->nrecs);
889

    
890
        if (req->numrecs <= 0)
891
                req->numrecs = INT32_MAX;
892

    
893
        // if data pre-exists in the GOB, return it now
894
        if (req->nextrec <= req->gob->nrecs)
895
        {
896
                // get the existing records and return them via callback
897
                estat = req->gob->x->physimpl->read_by_recno(
898
                                                                        req->gob,
899
                                                                        req->nextrec, req->numrecs,
900
                                                                        send_read_result, req);
901
                if (EP_STAT_ISOK(estat))
902
                {
903
                        gdp_recno_t nrecs = EP_STAT_TO_INT(estat);
904
                        req->nextrec += nrecs;
905
                        req->numrecs -= nrecs;
906
                }
907
        }
908

    
909
        // done with response PDU
910
        if (req->rpdu != NULL)
911
                _gdp_pdu_free(&req->rpdu);
912

    
913
        if (req->numrecs < 0 || !EP_UT_BITSET(GDP_REQ_SUBUPGRADE, req->flags))
914
        {
915
                // no more to read: do cleanup & send termination notice
916
                sub_end_subscription(req);
917
        }
918
        else
919
        {
920
                if (ep_dbg_test(Dbg, 24))
921
                {
922
                        ep_dbg_printf("post_subscribe: converting to subscription\n    ");
923
                        _gdp_req_dump(req, NULL, GDP_PR_BASIC, 0);
924
                }
925
                req->flags |= GDP_REQ_SRV_SUBSCR;
926

    
927
                // link this request into the GOB so the subscription can be found
928
                if (!EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
929
                {
930
                        IF_LIST_CHECK_OK(&req->gob->reqs, req, goblist, gdp_req_t)
931
                        {
932
                                // req->gob->refcnt already allows for this reference
933
                                LIST_INSERT_HEAD(&req->gob->reqs, req, goblist);
934
                                req->flags |= GDP_REQ_ON_GOB_LIST;
935
                        }
936
                }
937
        }
938
}
939

    
940

    
941
/*
942
**  CMD_SUBSCRIBE --- subscribe command
943
**
944
**                Arranges to return existing data (if any) after the response
945
**                is sent, and non-existing data (if any) as a side-effect of
946
**                append.
947
**
948
**                XXX        Race Condition: if records are written between the time
949
**                        of the subscription and the completion of the first half of
950
**                        this process, some records may be lost.  For example,
951
**                        if the GOB has 20 records (1-20) and you ask for 20
952
**                        records starting at record 11, you probably want records
953
**                        11-30.  But if during the return of records 11-20 another
954
**                        record (21) is written, then the second half of the
955
**                        subscription will actually return records 22-31.
956
**
957
**                XXX        Does not implement timeouts.
958
**
959
**                XXX This assumes different commands based on the initial
960
**                        starting "query" (recno, timestamp, hash).  Another
961
**                        possible approach would be a single command that would
962
**                        send at most one of those values.  Error conditions
963
**                        galore, but fewer commands to handle.
964
*/
965

    
966
EP_STAT
967
cmd_subscribe_by_recno(gdp_req_t *req)
968
{
969
        EP_STAT estat;
970
        EP_TIME_SPEC timeout;
971
        gdp_gob_t *gob;
972

    
973
        if (req->gob != NULL)
974
                GDP_GOB_ASSERT_ISLOCKED(req->gob);
975

    
976
        GdpMessage__CmdSubscribeByRecno *payload;
977
        GET_PAYLOAD(req, cmd_subscribe_by_recno, CMD_SUBSCRIBE_BY_RECNO);
978

    
979
        // find the GOB handle
980
        estat = get_open_handle(req);
981
        if (!EP_STAT_ISOK(estat))
982
        {
983
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
984
                                                        "cmd_subscribe: GOB not open", estat);
985
        }
986

    
987
        gob = req->gob;
988
        if (!EP_ASSERT(GDP_GOB_ISGOOD(gob)))
989
        {
990
                ep_dbg_printf("cmd_subscribe: bad gob %p in req, flags = %x\n",
991
                                gob, gob == NULL ? 0 : gob->flags);
992
                return EP_STAT_ASSERT_ABORT;
993
        }
994

    
995
        // get the additional parameters: number of records and timeout
996
        req->numrecs = payload->nrecs;
997
        //XXX following should only be in cmd_subscribe_by_ts
998
        if (payload->timeout != NULL)
999
        {
1000
                timeout.tv_sec = payload->timeout->sec;
1001
                timeout.tv_nsec = payload->timeout->nsec;
1002
                timeout.tv_accuracy = payload->timeout->accuracy;
1003
        }
1004
        else
1005
        {
1006
                EP_TIME_INVALIDATE(&timeout);
1007
        }
1008

    
1009
        if (ep_dbg_test(Dbg, 14))
1010
        {
1011
                ep_dbg_printf("cmd_subscribe: first = %" PRIgdp_recno ", numrecs = %d\n  ",
1012
                                payload->start, req->numrecs);
1013
                _gdp_gob_dump(gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
1014
        }
1015

    
1016
        if (req->numrecs < 0)
1017
        {
1018
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADOPT,
1019
                                                        "cmd_subscribe: numrecs cannot be negative",
1020
                                                        GDP_STAT_NAK_BADOPT);
1021
        }
1022

    
1023
        // get our starting point, which may be relative to the end
1024
        req->nextrec = payload->start;
1025
        if (req->nextrec <= 0)
1026
        {
1027
                req->nextrec += req->gob->nrecs + 1;
1028
                if (req->nextrec <= 0)
1029
                {
1030
                        // can't read before the beginning
1031
                        req->nextrec = 1;
1032
                }
1033
        }
1034

    
1035
        ep_dbg_cprintf(Dbg, 24,
1036
                        "cmd_subscribe: starting from %" PRIgdp_recno ", %d records\n",
1037
                        req->nextrec, req->numrecs);
1038

    
1039
        // see if this is refreshing an existing subscription
1040
        {
1041
                gdp_req_t *r1;
1042

    
1043
                if (ep_dbg_test(Dbg, 50))
1044
                {
1045
                        ep_dbg_printf("cmd_subscribe_by_recno: starting ");
1046
                        _gdp_req_dump(req, NULL, GDP_PR_BASIC, 0);
1047
                }
1048
                for (r1 = LIST_FIRST(&gob->reqs); r1 != NULL;
1049
                                r1 = LIST_NEXT(r1, goblist))
1050
                {
1051
                        EP_ASSERT(GDP_GOB_ISGOOD(r1->gob));
1052
                        EP_ASSERT(r1->gob == gob);
1053
                        if (ep_dbg_test(Dbg, 50))
1054
                        {
1055
                                ep_dbg_printf("cmd_subscribe: comparing to ");
1056
                                _gdp_req_dump(r1, NULL, GDP_PR_BASIC, 0);
1057
                        }
1058
                        if (GDP_NAME_SAME(r1->cpdu->src, req->cpdu->src) &&
1059
                                        r1->cpdu->msg->rid == req->cpdu->msg->rid)
1060
                        {
1061
                                ep_dbg_cprintf(Dbg, 20, "cmd_subscribe: refreshing sub\n");
1062
                                break;
1063
                        }
1064
                }
1065
                if (r1 != NULL)
1066
                {
1067
                        // abandon old request, we'll overwrite it with new request
1068
                        // (but keep the GOB around)
1069
                        ep_dbg_cprintf(Dbg, 20, "cmd_subscribe: removing old request\n");
1070
                        LIST_REMOVE(r1, goblist);
1071
                        r1->flags &= ~GDP_REQ_ON_GOB_LIST;
1072
                        _gdp_req_lock(r1);
1073
                        _gdp_req_free(&r1);
1074
                }
1075
        }
1076

    
1077
        // the _gdp_gob_decref better not have invalidated the GOB
1078
        EP_ASSERT(GDP_GOB_ISGOOD(gob));
1079

    
1080
        // mark this as persistent and upgradable
1081
        req->flags |= GDP_REQ_PERSIST | GDP_REQ_SUBUPGRADE;
1082

    
1083
        // note that the subscription is active
1084
        ep_time_now(&req->sub_ts);
1085

    
1086
        // if some of the records already exist, arrange to return them
1087
        if (req->nextrec <= gob->nrecs)
1088
        {
1089
                ep_dbg_cprintf(Dbg, 24, "cmd_subscribe: doing post processing\n");
1090
                req->flags &= ~GDP_REQ_SRV_SUBSCR;
1091
                req->postproc = &post_subscribe;
1092
        }
1093
        else
1094
        {
1095
                // this is a pure "future" subscription
1096
                ep_dbg_cprintf(Dbg, 24, "cmd_subscribe: enabling subscription\n");
1097
                req->flags |= GDP_REQ_SRV_SUBSCR;
1098

    
1099
                // link this request into the GOB so the subscription can be found
1100
                if (!EP_UT_BITSET(GDP_REQ_ON_GOB_LIST, req->flags))
1101
                {
1102
                        IF_LIST_CHECK_OK(&gob->reqs, req, goblist, gdp_req_t)
1103
                        {
1104
                                LIST_INSERT_HEAD(&gob->reqs, req, goblist);
1105
                                req->flags |= GDP_REQ_ON_GOB_LIST;
1106
                        }
1107
                        else
1108
                        {
1109
                                estat = EP_STAT_ASSERT_ABORT;
1110
                        }
1111
                }
1112
        }
1113

    
1114
        // we don't drop the GOB reference until the subscription is satisified
1115

    
1116
        if (EP_STAT_ISOK(estat) && req->rpdu == NULL)
1117
        {
1118
                _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
1119
                // ... if we want to fill in some info later....
1120
                //GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success;
1121
        }
1122

    
1123
        if (ep_dbg_test(Dbg, 10))
1124
        {
1125
                char ebuf[100];
1126
                ep_dbg_printf("<<< cmd_subscribe_by_recno(%s): %s\n", req->gob->pname,
1127
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
1128
        }
1129
        return estat;
1130
}
1131

    
1132

    
1133
/*
1134
**  CMD_MULTIREAD --- read multiple records
1135
**
1136
**                Arranges to return existing data (if any) after the response
1137
**                is sent.  No long-term subscription will ever be created, but
1138
**                much of the infrastructure is reused.
1139
*/
1140

    
1141
#if 0                //XXX NOT IMPLEMENTED YET
// This function is compiled out.  As written it refers to `pbd` and
// `payload`, neither of which is declared inside the function, so it
// would need updating before the #if 0 could be removed.
EP_STAT
cmd_multiread(gdp_req_t *req)
{
        EP_STAT estat;

        req->rpdu->cmd = GDP_ACK_SUCCESS;

        // find the GOB handle
        estat = get_open_handle(req);
        if (!EP_STAT_ISOK(estat))
        {
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
                                        "cmd_multiread: GOB not open", estat);
        }

        // get the additional parameter: number of records, taken from the
        // command PDU's data buffer
        req->numrecs = (int) gdp_buf_get_uint32(req->cpdu->datum->dbuf);

        if (ep_dbg_test(Dbg, 14))
        {
                ep_dbg_printf("cmd_multiread: first = %" PRIgdp_recno ", numrecs = %d\n  ",
                                pbd->recno, req->numrecs);
                _gdp_gob_dump(req->gob, ep_dbg_getfile(), GDP_PR_BASIC, 0);
        }

        // a negative count is rejected (no NAK response is built here)
        if (req->numrecs < 0)
        {
                return GDP_STAT_NAK_BADOPT;
        }

        // get our starting point, which may be relative to the end
        estat = get_starting_point_by_recno(req, payload->start);
        EP_STAT_CHECK(estat, goto fail0);

        ep_dbg_cprintf(Dbg, 24,
                        "cmd_multiread: starting from %" PRIgdp_recno ", %d records\n",
                        req->nextrec, req->numrecs);

        if (req->nextrec > req->gob->nrecs)
        {
                // no data to read
                estat = GDP_STAT_NAK_NOTFOUND;
        }
        else
        {
                // some of the records already exist: arrange to return them
                ep_dbg_cprintf(Dbg, 24, "cmd_multiread: doing post processing\n");
                req->postproc = &post_subscribe;

                // make this a "snapshot", i.e., don't read additional records:
                // a count of zero, or one reaching past the current end, is
                // clamped to the records that exist right now
                int32_t beyond_first = req->gob->nrecs - req->nextrec;
                if (req->numrecs == 0 || beyond_first < req->numrecs)
                {
                        req->numrecs = beyond_first + 1;
                }

                // keep the request around until the post-processing is done
                req->flags |= GDP_REQ_PERSIST;
        }

fail0:
        return estat;
}
#endif //XXX
1204

    
1205

    
1206
/*
1207
**  CMD_UNSUBSCRIBE --- terminate a subscription
1208
*/
1209

    
1210
EP_STAT
1211
cmd_unsubscribe(gdp_req_t *req)
1212
{
1213
        EP_STAT estat = EP_STAT_OK;
1214

    
1215
        estat = get_open_handle(req);
1216
        if (!EP_STAT_ISOK(estat))
1217
        {
1218
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
1219
                                                        "cmd_unsubscribe: GOB not open", estat);
1220
        }
1221

    
1222
        // remove any subscriptions
1223
        sub_end_all_subscriptions(req->gob, req->cpdu->src, req->cpdu->msg->rid);
1224

    
1225
        // send the ack
1226
        _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
1227

    
1228
        if (ep_dbg_test(Dbg, 10))
1229
        {
1230
                char ebuf[100];
1231
                ep_dbg_printf("<<< cmd_unsubscribe(%s): %s\n", req->gob->pname,
1232
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
1233
        }
1234

    
1235
        return estat;
1236
}
1237

    
1238

    
1239
/*
1240
**  CMD_GETMETADATA --- get metadata for a GOB
1241
*/
1242

    
1243
EP_STAT
1244
cmd_getmetadata(gdp_req_t *req)
1245
{
1246
        gdp_md_t *gmd;
1247
        EP_STAT estat;
1248

    
1249
        estat = get_open_handle(req);
1250
        if (!EP_STAT_ISOK(estat))
1251
        {
1252
                estat = _gdp_req_nak_resp(req, 0,
1253
                                                        "cmd_getmetadata: GOB open failure", estat);
1254
                goto fail0;
1255
        }
1256

    
1257
        // get the metadata into memory
1258
        estat = req->gob->x->physimpl->getmetadata(req->gob, &gmd);
1259

    
1260
        if (EP_STAT_ISOK(estat))
1261
        {
1262
                // serialize it to the client
1263
                uint8_t *mdbuf;
1264
                size_t mdlen = _gdp_md_serialize(gmd, &mdbuf);
1265
                _gdp_req_ack_resp(req, GDP_ACK_SUCCESS);
1266
                GdpMessage__AckSuccess *resp = req->rpdu->msg->ack_success;
1267
                resp->metadata.data = mdbuf;
1268
                resp->metadata.len = mdlen;
1269
                resp->has_metadata = true;
1270
        }
1271
        else
1272
        {
1273
                _gdp_req_nak_resp(req, 0,
1274
                                        "cannot get log metadata", estat);
1275
        }
1276

    
1277
        if (ep_dbg_test(Dbg, 10))
1278
        {
1279
                char ebuf[100];
1280
                ep_dbg_printf("<<< cmd_getmetadata(%s): %s\n", req->gob->pname,
1281
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
1282
        }
1283

    
1284
fail0:
1285
        return estat;
1286
}
1287

    
1288

    
1289
/*
1290
**  CMD_FWD_APPEND --- forwarded APPEND command
1291
**
1292
**                Used for replication.  This is identical to an APPEND,
1293
**                except it is addressed to an individual gdplogd rather
1294
**                than to a GOB.  The actual name is in the payload.
1295
**                On return, the message will have a source address of the
1296
**                GOB, not the gdplogd instance (i.e., we don't just do
1297
**                the default swap of src and dst addresses).
1298
*/
1299

    
1300
#if 0        //XXX Move to Layer 4?
// This function is compiled out and is not registered in the command table.
EP_STAT
cmd_fwd_append(gdp_req_t *req)
{
        EP_STAT estat;
        gdp_name_t gobname;
        gdp_pname_t pbuf;
        int nread;

        // must be addressed to me
        // NOTE(review): the message text below says "cmd_create" --- confirm
        // whether that is intentional.
        if (!GDP_NAME_SAME(req->cpdu->dst, _GdpMyRoutingName))
        {
                // this is directed to a GOB, not to the daemon
                return _gdp_req_nak_resp(req, GDP_NAK_C_BADREQ,
                                        "cmd_create: log name required",
                                        GDP_STAT_NAK_CONFLICT);
        }

        // pull the name of the GOB out of the data buffer ...
        // NOTE(review): nread is an int compared against a size_t; verify
        // what gdp_buf_read returns on error.
        nread = gdp_buf_read(req->cpdu->datum->dbuf, gobname, sizeof gobname);
        if (nread < sizeof req->cpdu->dst)
        {
                return _gdp_req_nak_resp(req, GDP_NAK_S_INTERNAL,
                                        "cmd_fwd_append: gobname required",
                                        GDP_STAT_GDP_NAME_INVALID);
        }

        // ... and make it the destination of the current PDU
        memcpy(req->cpdu->dst, gobname, sizeof req->cpdu->dst);

        ep_dbg_cprintf(Dbg, 14, "cmd_fwd_append: %s\n",
                        gdp_printable_name(req->cpdu->dst, pbuf));

        // actually do the append
        estat = cmd_append(req);

        // make response seem to come from log
        memcpy(req->rpdu->src, gobname, sizeof req->rpdu->src);

        return estat;
}
#endif        //XXX
1343

    
1344

    
1345
/**************** END OF COMMAND IMPLEMENTATIONS ****************/
1346

    
1347

    
1348

    
1349
/*
1350
**  GDPD_PROTO_INIT --- initialize protocol module
1351
*/
1352

    
1353
static struct cmdfuncs        CmdFuncs[] =
1354
{
1355
        { GDP_CMD_PING,                                        cmd_ping                                },
1356
        { GDP_CMD_CREATE,                                cmd_create                                },
1357
        { GDP_CMD_OPEN_AO,                                cmd_open                                },
1358
        { GDP_CMD_OPEN_RO,                                cmd_open                                },
1359
        { GDP_CMD_OPEN_RA,                                cmd_open                                },
1360
        { GDP_CMD_CLOSE,                                cmd_close                                },
1361
        { GDP_CMD_APPEND,                                cmd_append                                },
1362
        { GDP_CMD_READ_BY_RECNO,                cmd_read_by_recno                },
1363
        { GDP_CMD_READ_BY_TS,                        cmd_read_by_ts                        },
1364
//        { GDP_CMD_READ_BY_HASH        ,                cmd_read_by_hash                },
1365
        { GDP_CMD_SUBSCRIBE_BY_RECNO,        cmd_subscribe_by_recno        },
1366
//        { GDP_CMD_SUBSCRIBE_BY_TS,                cmd_subscribe_by_ts                },
1367
//        { GDP_CMD_SUBSCRIBE_BY_HASH,        cmd_subscribe_by_hash        },
1368
        { GDP_CMD_GETMETADATA,                        cmd_getmetadata                        },
1369
//        { GDP_CMD_NEWSEGMENT,                        cmd_newsegment                        },
1370
//        { GDP_CMD_FWD_APPEND,                        cmd_fwd_append                        },
1371
        { GDP_CMD_UNSUBSCRIBE,                        cmd_unsubscribe                        },
1372
        { GDP_CMD_DELETE,                                cmd_delete                                },
1373
        { 0,                                                        NULL                                        }
1374
};
1375

    
1376
EP_STAT
1377
gdpd_proto_init(void)
1378
{
1379
        // register the commands we implement
1380
        _gdp_register_cmdfuncs(CmdFuncs);
1381
        return EP_STAT_OK;
1382
}