Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / gdp / gdp_api.c @ master

History | View | Annotate | Download (24.8 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 :*/
2

    
3
/*
4
**        This implements the GDP API for C-based applications.
5
**
6
**  With the exception of the name manipulation (parsing,
7
**  printing, etc.) most of these are basically just translation
8
**  routines, converting the API calls into requests and handing
9
**  them on; the hard work is done in gdp_gob_ops.c and gdp_proto.c.
10
**
11
**        TODO In the future this may need to be extended to have knowledge
12
**                 of TSN/AVB, but for now we don't worry about that.
13
**
14
**        ----- BEGIN LICENSE BLOCK -----
15
**        GDP: Global Data Plane Support Library
16
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
17
**
18
**        Copyright (c) 2015-2019, Regents of the University of California.
19
**        All rights reserved.
20
**
21
**        Permission is hereby granted, without written agreement and without
22
**        license or royalty fees, to use, copy, modify, and distribute this
23
**        software and its documentation for any purpose, provided that the above
24
**        copyright notice and the following two paragraphs appear in all copies
25
**        of this software.
26
**
27
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
28
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
29
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
30
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
**
32
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
33
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
34
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
35
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
36
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
37
**        OR MODIFICATIONS.
38
**        ----- END LICENSE BLOCK -----
39
*/
40

    
41
#include "gdp.h"
42
#include "gdp_chan.h"
43
#include "gdp_event.h"
44
#include "gdp_md.h"
45
#include "gdp_stat.h"
46
#include "gdp_priv.h"
47

    
48
#include <ep/ep_app.h>
49
#include <ep/ep_b64.h>
50
#include <ep/ep_dbg.h>
51
#include <ep/ep_funclist.h>
52
#include <ep/ep_prflags.h>
53
#include <ep/ep_string.h>
54

    
55
#include <openssl/sha.h>
56

    
57
#include <errno.h>
58
#include <string.h>
59

    
60
static EP_DBG        Dbg = EP_DBG_INIT("gdp.api", "C API for GDP");
61

    
62
static EP_THR_MUTEX                GinFreeListMutex        EP_THR_MUTEX_INITIALIZER;
63
SLIST_HEAD(gin_list_head, gdp_gin)
64
                                                GinFreeList                        = SLIST_HEAD_INITIALIZER(GinFreeList);
65

    
66

    
67
/*
68
**  Mutex around open operations
69
*/
70

    
71
static EP_THR_MUTEX                OpenMutex                EP_THR_MUTEX_INITIALIZER;
72

    
73

    
74
/*
75
**  Simplify debugging
76
*/
77

    
78
static void
79
prstat(EP_STAT estat, const gdp_gin_t *gin, const char *where)
80
{
81
        int dbglev = 2;
82
        char ebuf[100];
83

    
84
        if (EP_STAT_ISOK(estat))
85
                dbglev = 39;
86
        else if (EP_STAT_ISWARN(estat))
87
                dbglev = 11;
88
        if (gin == NULL || gin->gob == NULL)
89
        {
90
                ep_dbg_cprintf(Dbg, dbglev, "<<< %s: %s\n",
91
                                where, ep_stat_tostr(estat, ebuf, sizeof ebuf));
92
        }
93
        else
94
        {
95
                ep_dbg_cprintf(Dbg, dbglev, "<<< %s(%s): %s\n",
96
                                where, gin->gob->pname,
97
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
98
        }
99
}
100

    
101

    
102
/*
103
**  Generic GDP Object Instance (GIN) code.
104
*/
105

    
106
static EP_STAT
107
bad_gin(const gdp_gin_t *gin, const char *where, const char *xtra)
108
{
109
        EP_STAT estat;
110
        char wbuf[256];
111

    
112
        if (where != NULL && xtra != NULL)
113
        {
114
                snprintf(wbuf, sizeof wbuf, "%s/%s", where, xtra);
115
                where = wbuf;
116
        }
117
        if (gin == NULL)
118
                estat = GDP_STAT_NULL_GIN;
119
        else
120
                estat = GDP_STAT_LOG_NOT_OPEN;
121
        prstat(estat, gin, where);
122
        return estat;
123
}
124

    
125
void
126
_gdp_gin_lock_trace(gdp_gin_t *gin,
127
                                        const char *file,
128
                                        int line,
129
                                        const char *id)
130
{
131
        _ep_thr_mutex_lock(&gin->mutex, file, line, id);
132
        gin->flags |= GINF_ISLOCKED;
133
}
134

    
135
void
136
_gdp_gin_unlock_trace(gdp_gin_t *gin,
137
                                        const char *file,
138
                                        int line,
139
                                        const char *id)
140
{
141
        gin->flags &= ~GINF_ISLOCKED;
142
        _ep_thr_mutex_unlock(&gin->mutex, file, line, id);
143
}
144

    
145

    
146
static EP_PRFLAGS_DESC        _GdpGinFlags[] =
147
{
148
        { GINF_INUSE,                                GINF_INUSE,                        "INUSE"                                },
149
        { GINF_ISLOCKED,                        GINF_ISLOCKED,                "ISLOCKED"                        },
150
        { GINF_SIG_VRFY,                        GINF_SIG_VRFY,                "SIG_VRFY"                        },
151
        { GINF_SIG_VRFY_REQ,                GINF_SIG_VRFY_REQ,        "SIG_VRFY_REQ"                },
152
        { 0, 0, NULL }
153
};
154

    
155
void
156
_gdp_gin_dump(
157
                const gdp_gin_t *gin,
158
                FILE *fp,
159
                int detail)
160
{
161
        int indent = 1;
162

    
163
        if (fp == NULL)
164
                fp = ep_dbg_getfile();
165
        if (detail >= GDP_PR_BASIC)
166
                fprintf(fp, "GIN@%p: ", gin);
167
        VALGRIND_HG_DISABLE_CHECKING(gin, sizeof *gin);
168
        if (gin == NULL)
169
        {
170
                fprintf(fp, "NULL\n");
171
        }
172
        else
173
        {
174
                gdp_gob_t *gob = gin->gob;
175

    
176
                if (gob == NULL)
177
                        fprintf(fp, "No GOB\n");
178
                else
179
                {
180
                        VALGRIND_HG_DISABLE_CHECKING(gob, sizeof *gob);
181
                        if (!gdp_name_is_valid(gob->name))
182
                                fprintf(fp, "no name\n");
183
                        else
184
                                fprintf(fp, "%s\n", gob->pname);
185
                        VALGRIND_HG_ENABLE_CHECKING(gob, sizeof *gob);
186
                }
187
                fprintf(fp, "%sflags = ", _gdp_pr_indent(indent));
188
                ep_prflags(gin->flags, _GdpGinFlags, fp);
189

    
190
                if (detail >= GDP_PR_BASIC)
191
                {
192
                        fprintf(fp, "\n%sgob = %p, iomode = %d",
193
                                        _gdp_pr_indent(indent), gob,  gin->iomode);
194
                        if (detail >= GDP_PR_DETAILED)
195
                        {
196
                                fprintf(fp, ", closefunc = %p\n"
197
                                                "%sapndfilter = %p, apndfpriv = %p,"
198
                                                " readfilter = %p, readfpriv = %p",
199
                                                gin->closefunc,
200
                                                _gdp_pr_indent(indent),
201
                                                gin->apndfilter, gin->apndfpriv,
202
                                                gin->readfilter, gin->readfpriv);
203
                        }
204
                }
205
                fprintf(fp, "\n");
206
        }
207
        VALGRIND_HG_ENABLE_CHECKING(gin, sizeof *gin);
208
}
209

    
210
static void
211
unlock_gin_and_gob(gdp_gin_t *gin, const char *where)
212
{
213
        _gdp_gob_unlock(gin->gob);
214
        _gdp_gin_unlock(gin);
215
}
216

    
217
static EP_STAT
218
check_and_lock_gin(gdp_gin_t *gin, const char *where)
219
{
220
        if (gin == NULL)
221
                return bad_gin(gin, where, "null gin");
222
        _gdp_gin_lock(gin);
223
        if (!EP_UT_BITSET(GINF_INUSE, gin->flags))
224
        {
225
                _gdp_gin_unlock(gin);
226
                return bad_gin(gin, where, "gin not inuse");
227
        }
228
        if (gin->gob == NULL)
229
        {
230
                _gdp_gin_unlock(gin);
231
                return bad_gin(gin, where, "null gob");
232
        }
233
        return EP_STAT_OK;
234
}
235

    
236
static EP_STAT
237
check_and_lock_gin_and_gob(gdp_gin_t *gin, const char *where)
238
{
239
        EP_STAT estat;
240

    
241
        estat = check_and_lock_gin(gin, where);
242
        EP_STAT_CHECK(estat, return estat);
243
        _gdp_gob_lock(gin->gob);
244
        if (!EP_UT_BITSET(GOBF_INUSE, gin->gob->flags))
245
                estat = bad_gin(gin, where, "gob not inuse");
246
        else if (EP_UT_BITSET(GOBF_DROPPING, gin->gob->flags))
247
                estat = bad_gin(gin, where, "gob not open");
248
        else if (gin->gob->refcnt <= 0)
249
                estat = bad_gin(gin, where, "gob bad refcnt");
250
        if (!EP_STAT_ISOK(estat))
251
                unlock_gin_and_gob(gin, "check_and_lock_gin_and_gob");
252
        return estat;
253
}
254

    
255

    
256
/*
257
**  Allocate and Free GDP Object Instance handles.
258
**
259
**                Note that the reference count on the input GOB is not
260
**                increased; it is assumed that the GIN "takes over" the
261
**                reference that is passed in.
262
*/
263

    
264
gdp_gin_t *
265
_gdp_gin_new(gdp_gob_t *gob)
266
{
267
        gdp_gin_t *gin = NULL;
268

    
269
        for (;;)
270
        {
271
                ep_thr_mutex_lock(&GinFreeListMutex);
272
                if ((gin = SLIST_FIRST(&GinFreeList)) != NULL)
273
                        SLIST_REMOVE_HEAD(&GinFreeList, next);
274
                ep_thr_mutex_unlock(&GinFreeListMutex);
275
                if (gin == NULL || !EP_UT_BITSET(GINF_INUSE, gin->flags))
276
                        break;
277

    
278
                // gin from freelist is allocated --- abandon it
279
                EP_ASSERT_PRINT("_gdp_gin_new: allocated gin %p on free list", gin);
280
        }
281

    
282
        if (gin == NULL)
283
        {
284
                gin = (gdp_gin_t *) ep_mem_zalloc(sizeof *gin);
285
                ep_thr_mutex_init(&gin->mutex, EP_THR_MUTEX_DEFAULT);
286
                ep_thr_mutex_setorder(&gin->mutex, GDP_MUTEX_LORDER_GIN);
287
        }
288

    
289
        gin->flags = GINF_INUSE;
290
        gin->iomode = GDP_MODE_ANY;
291
        gin->gob = gob;
292

    
293
        VALGRIND_HG_CLEAN_MEMORY(gin, sizeof gin);
294
        ep_dbg_cprintf(Dbg, 48, "_gdp_gin_new => %p\n", gin);
295
        return gin;
296
}
297

    
298
void
299
_gdp_gin_free(gdp_gin_t *gin)
300
{
301
        ep_dbg_cprintf(Dbg, 28, "_gdp_gin_free(%p)\n", gin);
302
        if (gin == NULL)
303
                return;
304
        if (!EP_ASSERT(EP_UT_BITSET(GINF_INUSE, gin->flags)))
305
                return;
306
        GDP_GIN_ASSERT_ISLOCKED(gin);
307

    
308
        // release resources: requests (subscriptions) and events
309
        if (gin->gob != NULL)
310
        {
311
                GDP_GOB_ASSERT_ISLOCKED(gin->gob);
312
                _gdp_req_freeall(gin->gob, gin, NULL);
313
                _gdp_gob_decref(&gin->gob, false);
314
        }
315
        _gdp_event_free_all(gin);
316

    
317
        // put gin handle on freelist
318
        ep_thr_mutex_lock(&GinFreeListMutex);
319
        gin->flags = 0;
320
        SLIST_INSERT_HEAD(&GinFreeList, gin, next);
321
        _gdp_gin_unlock(gin);
322
        ep_thr_mutex_unlock(&GinFreeListMutex);
323
}
324

    
325

    
326
/*
327
**        GDP_GIN_GETNAME --- get the name of a GDP log
328
*/
329

    
330
const gdp_name_t *
331
gdp_gin_getname(const gdp_gin_t *gin)
332
{
333
        if (!GDP_GIN_ISGOOD(gin))
334
        {
335
                (void) bad_gin(gin, "gdp_gin_getname", NULL);
336
                return NULL;
337
        }
338
        return &gin->gob->name;
339
}
340

    
341

    
342
/*
343
**  GDP_GIN_GETNRECS --- get the number of records in a GOB
344
*/
345

    
346
gdp_recno_t
347
gdp_gin_getnrecs(const gdp_gin_t *gin)
348
{
349
        if (!GDP_GIN_ISGOOD(gin))
350
        {
351
                (void) bad_gin(gin, "gdp_gin_getnrecs", NULL);
352
                return GDP_PDU_NO_RECNO;
353
        }
354
        return gin->gob->nrecs;
355
}
356

    
357

    
358
/*
359
**  GDP_GIN_PRINT --- print a GOB (for debugging)
360
*/
361

    
362
void
363
gdp_gin_print(
364
                const gdp_gin_t *gin,
365
                FILE *fp)
366
{
367
        if (!GDP_GIN_ISGOOD(gin))
368
        {
369
                (void) bad_gin(gin, "gdp_gin_print", NULL);
370
        }
371
        else
372
        {
373
                // _gdp_gob_dump handles null gob properly
374
                _gdp_gob_dump(gin->gob, fp, GDP_PR_PRETTY, 0);
375
        }
376
}
377

    
378

    
379

    
380
/*
381
**        GDP_INIT --- initialize this library
382
**
383
**                This is the normal startup for a client process.  Servers
384
**                may need to do additional steps early on, and may choose
385
**                to advertise more than their own name.
386
*/
387

    
388
static void
389
gdp_exit_debug(void)
390
{
391
        if (ep_dbg_test(Dbg, 10))
392
        {
393
                _gdp_req_pr_stats(ep_dbg_getfile());
394
                _gdp_gob_pr_stats(ep_dbg_getfile());
395
        }
396
}
397

    
398

    
399
EP_STAT
400
gdp_init(const char *router_addr)
401
{
402
        return gdp_init2(router_addr, 0);
403
}
404

    
405

    
406
EP_STAT
407
gdp_init2(const char *router_addr, uint32_t flags)
408
{
409
        gdp_chan_x_t *chanx = NULL;
410
        EP_STAT estat = EP_STAT_OK;
411
        extern EP_FUNCLIST *_GdpDumpFuncs;
412

    
413
        ep_dbg_cprintf(Dbg, 9, "gdp_init, state = %d\n", _GdpInitState);
414
        if (_GdpInitState >= GDP_INIT_COMPLETE)
415
                goto done;
416

    
417
        // set up global state, event loop, etc. (shared with gdplogd)
418
        if (_GdpInitState < GDP_INIT_LIB)
419
        {
420
                estat = gdp_lib_init(NULL, NULL, flags);
421
                EP_STAT_CHECK(estat, goto fail0);
422
        }
423

    
424
        ep_funclist_push(_GdpDumpFuncs, _gdp_event_dump_all, NULL);
425

    
426
        chanx = (gdp_chan_x_t *) ep_mem_zalloc(sizeof *chanx);
427
        LIST_INIT(&chanx->reqs);
428

    
429
        // open at least one channel to the routing subsystem
430
        _GdpChannel = NULL;
431
        estat = _gdp_chan_open(router_addr,                        // IP of router
432
                                                NULL,                                        // qos (unused as yet)
433
                                                &_gdp_io_recv,                        // receive callback
434
                                                NULL,                                        // send callback
435
                                                &_gdp_io_event,                        // close/error/eof callback
436
                                                &_gdp_router_event,                // router event callback
437
                                                &_gdp_advertise_me,                // advertise callback
438
                                                chanx,                                        // user channel data
439
                                                &_GdpChannel);                        // output: new channel
440
        EP_STAT_CHECK(estat, goto fail0);
441

    
442
        if (ep_thr_spawn(&_GdpIoEventLoopThread, &_gdp_run_event_loop, NULL) != 0)
443
        {
444
                char ebuf[100];
445

    
446
                estat = ep_stat_from_errno(errno);
447
                ep_app_severe("cannot spawn event i/o thread: %s",
448
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
449
        }
450

    
451
        // do some optional status printing on exit
452
        atexit(gdp_exit_debug);
453

    
454
        _GdpInitState = GDP_INIT_COMPLETE;
455

    
456
fail0:
457
done:
458
        if (ep_dbg_test(Dbg, 4))
459
        {
460
                char ebuf[200];
461

    
462
                ep_dbg_printf("gdp_init: %s\n",
463
                                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
464
        }
465
        return estat;
466
}
467

    
468

    
469
/*
470
**        GDP_GIN_OPEN --- open a GOB for reading or further appending
471
*/
472

    
473
EP_STAT
474
gdp_gin_open(gdp_name_t name,
475
                        gdp_iomode_t iomode,
476
                        gdp_open_info_t *open_info,
477
                        gdp_gin_t **pgin)
478
{
479
        EP_STAT estat;
480
        gdp_gob_t *gob = NULL;
481
        gdp_gin_t *gin = NULL;
482
        gdp_cmd_t cmd;
483

    
484
        if (ep_dbg_test(Dbg, 19))
485
        {
486
                gdp_pname_t pname;
487
                ep_dbg_printf("\n>>> gdp_gin_open(%s)\n",
488
                                        gdp_printable_name(name, pname));
489
        }
490
        estat = GDP_CHECK_INITIALIZED;                // make sure gdp_init is done
491
        EP_STAT_CHECK(estat, return estat);
492

    
493
        if (iomode == GDP_MODE_RO)
494
                cmd = GDP_CMD_OPEN_RO;
495
        else if (iomode == GDP_MODE_AO)
496
                cmd = GDP_CMD_OPEN_AO;
497
        else if (iomode == GDP_MODE_RA)
498
                cmd = GDP_CMD_OPEN_RA;
499
        else
500
        {
501
                // illegal I/O mode
502
                ep_app_error("gdp_gin_open: illegal mode %d", iomode);
503
                return GDP_STAT_BAD_IOMODE;
504
        }
505

    
506
        if (!gdp_name_is_valid(name))
507
        {
508
                // illegal GOB name
509
                ep_dbg_cprintf(Dbg, 6, "gdp_gin_open: null GOB name\n");
510
                return GDP_STAT_GDP_NAME_INVALID;
511
        }
512

    
513
        // lock this operation to keep the GOB cache consistent
514
        ep_thr_mutex_lock(&OpenMutex);
515

    
516
        // see if we already have this open (and initiate open if not)
517
        estat = _gdp_gob_cache_get(name, GGCF_CREATE, &gob);
518
        EP_STAT_CHECK(estat, goto fail0);
519
        EP_ASSERT(gob != NULL);
520
        GDP_GOB_ASSERT_ISLOCKED(gob);
521

    
522
        // if open is partially complete, finish the job
523
        if (EP_UT_BITSET(GOBF_PENDING, gob->flags))
524
        {
525
                estat = _gdp_gob_open(gob, cmd, open_info, _GdpChannel, 0);
526
                if (EP_STAT_ISFAIL(estat))
527
                        goto fail0;
528
        }
529
        gob->flags &= ~GOBF_PENDING;
530
        gin = _gdp_gin_new(gob);
531

    
532
        if (open_info != NULL)
533
        {
534
                if (EP_UT_BITSET(GOIF_VERIFY_PROOF, open_info->flags))
535
                {
536
                        gin->flags |= GINF_SIG_VRFY;
537
                }
538
        }
539
        _gdp_gob_unlock(gob);
540

    
541
fail0:
542
        // note that warnings are treated like success
543
        if (EP_STAT_ISFAIL(estat) && gob != NULL)
544
                _gdp_gob_free(&gob);
545
        *pgin = gin;                // might be NULL
546

    
547
        ep_thr_mutex_unlock(&OpenMutex);
548
        prstat(estat, gin, "gdp_gin_open");
549
        if (ep_dbg_test(Dbg, 10))
550
        {
551
                _gdp_gin_dump(gin, NULL, GDP_PR_DETAILED);
552
                if (ep_dbg_test(Dbg, 14))
553
                        _gdp_gob_dump(gob, NULL, GDP_PR_BASIC, 0);
554
        }
555
        return estat;
556
}
557

    
558

    
559
/*
560
**        GDP_GIN_CLOSE --- close an open GOB
561
*/
562

    
563
EP_STAT
564
gdp_gin_close(gdp_gin_t *gin)
565
{
566
        EP_STAT estat;
567

    
568
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_close");
569
        EP_STAT_CHECK(estat, return estat);
570
        ep_dbg_cprintf(Dbg, 19, "\n>>> gdp_gin_close(%s)\n", gin->gob->pname);
571
        estat = _gdp_gob_close(gin->gob, _GdpChannel, 0);
572
        _gdp_gin_free(gin);
573
        prstat(estat, gin, "gdp_gin_close");
574
        return estat;
575
}
576

    
577

    
578
/*
579
**  GDP_GIN_DELETE --- delete and close an open GOB
580
**
581
**                This is not intended to be an end-user API.  Deletion should
582
**                only be done by a system service on the basis of expiration
583
**                criteria.  This API is intended for testing.
584
*/
585

    
586
EP_STAT
587
gdp_gin_delete(gdp_gin_t *gin)
588
{
589
        EP_STAT estat;
590

    
591
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_delete");
592
        EP_STAT_CHECK(estat, return estat);
593
        ep_dbg_cprintf(Dbg, 19, "\n>>> gdp_gin_delete(%s)\n", gin->gob->pname);
594
        estat = _gdp_gob_delete(gin->gob, _GdpChannel, 0);
595
        _gdp_gin_free(gin);
596
        prstat(estat, gin, "gdp_gin_delete");
597
        return estat;
598
}
599

    
600

    
601
/*
602
**        GDP_GIN_APPEND --- append a message to a writable GOB
603
*/
604

    
605
EP_STAT
606
gdp_gin_append(gdp_gin_t *gin, gdp_datum_t *datum, gdp_hash_t *prevhash)
607
{
608
        EP_STAT estat;
609

    
610
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_append\n");
611
        if (!GDP_DATUM_ISGOOD(datum))
612
                return GDP_STAT_DATUM_REQUIRED;
613
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_append");
614
        EP_STAT_CHECK(estat, return estat);
615
        if (gin->apndfilter != NULL)
616
                estat = gin->apndfilter(datum, gin->apndfpriv);
617
        if (EP_STAT_ISOK(estat))
618
                estat = _gdp_gob_append_sync(gin->gob, 1, &datum, prevhash,
619
                                                                _GdpChannel, 0);
620
        unlock_gin_and_gob(gin, "gdp_gin_append");
621
        prstat(estat, gin, "gdp_gin_append");
622
        return estat;
623
}
624

    
625

    
626
/*
627
**  GDP_GIN_APPEND_ASYNC --- asynchronously append to a writable GOB
628
*/
629

    
630
EP_STAT
631
gdp_gin_append_async(gdp_gin_t *gin,
632
                        int ndatums,
633
                        gdp_datum_t **datums,
634
                        gdp_hash_t *prevhash,
635
                        gdp_event_cbfunc_t cbfunc,
636
                        void *udata)
637
{
638
        EP_STAT estat;
639

    
640
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_append_async\n");
641
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_append_async");
642
        EP_STAT_CHECK(estat, return estat);
643
        estat = _gdp_gob_append_async(gin->gob, gin, ndatums, datums, prevhash,
644
                                                        cbfunc, udata, _GdpChannel, 0);
645
        unlock_gin_and_gob(gin, "gdp_gin_append_async");
646
        prstat(estat, gin, "gdp_gin_append_async");
647
        return estat;
648
}
649

    
650

    
651
/*
652
**        GDP_GIN_READ_BY_RECNO --- read a message from a GOB based on recno
653
**
654
**        The data is returned through the passed-in datum.
655
**
656
**                Parameters:
657
**                        gin --- the GDP instance from which to read
658
**                        recno --- the record number to read
659
**                        datum --- the message header (to avoid dynamic memory)
660
*/
661

    
662
EP_STAT
663
gdp_gin_read_by_recno(gdp_gin_t *gin,
664
                        gdp_recno_t recno,
665
                        gdp_datum_t *datum)
666
{
667
        EP_STAT estat;
668
        uint32_t reqflags = 0;
669

    
670
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_read_by_recno (%"PRIgdp_recno ")\n",
671
                        recno);
672
        EP_ASSERT_POINTER_VALID(datum);
673
        gdp_datum_reset(datum);
674

    
675
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_read_by_recno");
676
        EP_STAT_CHECK(estat, return estat);
677

    
678
        if (EP_UT_BITSET(GINF_SIG_VRFY, gin->flags))
679
                reqflags |= GDP_REQ_VRFY_CONTENT;
680
        //XXX somehow have to convey gin->readfilter to _gdp_gob_read
681
        //XXX is there any reason not to just do it here?
682
        //XXX Answer: read_async and subscriptions
683
        estat = _gdp_gob_read_by_recno(gin->gob, recno, _GdpChannel,
684
                                                        reqflags, datum);
685
        unlock_gin_and_gob(gin, "gdp_gin_read_by_recno");
686
        prstat(estat, gin, "gdp_gin_read_by_recno");
687
        return estat;
688
}
689

    
690

    
691
/*
692
**        GDP_GIN_READ_BY_TS --- read a message from a GOB based on timestamp
693
**
694
**        The data is returned through the passed-in datum.
695
**
696
**                Parameters:
697
**                        gin --- the GDP instance from which to read
698
**                        ts --- the lowest timestamp we are interested in.  The
699
**                                result will be the lowest timestamp that is greater than
700
**                                or equal to this value.
701
**                        datum --- the message header (to avoid dynamic memory)
702
*/
703

    
704
EP_STAT
705
gdp_gin_read_by_ts(gdp_gin_t *gin,
706
                        EP_TIME_SPEC *ts,
707
                        gdp_datum_t *datum)
708
{
709
        EP_STAT estat;
710

    
711
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_read_by_ts\n");
712
        EP_ASSERT_POINTER_VALID(datum);
713
#if 0 //TODO
714
        memcpy(&datum->d->ts, ts, sizeof datum->d->ts);
715
        datum->d->recno = GDP_PDU_NO_RECNO;
716

717
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_read_by_ts");
718
        EP_STAT_CHECK(estat, return estat);
719
        //XXX somehow have to convey gin->readfilter to _gdp_gob_read
720
        //XXX is there any reason not to just do it here?
721
        //XXX Answer: read_async and subscriptions
722
        estat = _gdp_gob_read(gin->gob, datum, _GdpChannel, 0);
723
        unlock_gin_and_gob(gin, "gdp_gin_read_by_ts");
724
        prstat(estat, gin, "gdp_gin_read_by_ts");
725
#else //TODO
726
        estat = GDP_STAT_NOT_IMPLEMENTED;
727
#endif //TODO
728
        return estat;
729
}
730

    
731

    
732
/*
733
**        GDP_GIN_READ_BY_HASH --- read a message from a GOB based on record hash
734
**
735
**        The data is returned through the passed-in datum.
736
**
737
**                Parameters:
738
**                        gin --- the GDP instance from which to read
739
**                        hash --- the starting record hash
740
**                        nrecs --- the number of records to read
741
**                        datum --- the message header (to avoid dynamic memory)
742
*/
743

    
744
#if 0 //TODO
745
EP_STAT
746
gdp_gin_read_by_hash(gdp_gin_t *gin,
747
                        ep_hash_t *hash,
748
                        gdp_datum_t *datum)
749
{
750
        EP_STAT estat;
751

752
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_read_by_hash\n");
753
        EP_ASSERT_POINTER_VALID(datum);
754
        memcpy(&datum->d->ts, ts, sizeof datum->d->ts);
755
        datum->d->recno = GDP_PDU_NO_RECNO;
756

757
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_read_by_hash");
758
        EP_STAT_CHECK(estat, return estat);
759
        //XXX somehow have to convey gin->readfilter to _gdp_gob_read
760
        //XXX is there any reason not to just do it here?
761
        //XXX Answer: read_async and subscriptions
762
        estat = _gdp_gob_read(gin->gob, datum, _GdpChannel, 0);
763
        unlock_gin_and_gob(gin, "gdp_gin_read_by_hash");
764
        prstat(estat, gin, "gdp_gin_read_by_hash");
765
        return estat;
766
}
767
#endif //TODO
768

    
769

    
770
/*
771
**  GDP_GIN_READ_BY_xxx_ASYNC --- read asynchronously
772
**
773
**  Data and status are delivered as events.  These subsume the
774
**  old multiread command.
775
*/
776

    
777
EP_STAT
778
gdp_gin_read_by_recno_async(
779
                        gdp_gin_t *gin,
780
                        gdp_recno_t recno,
781
                        int32_t nrecs,
782
                        gdp_event_cbfunc_t cbfunc,
783
                        void *cbarg)
784
{
785
        EP_STAT estat;
786

    
787
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_read_by_recno_async\n");
788
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_read_by_recno_async");
789
        EP_STAT_CHECK(estat, return estat);
790
        estat = _gdp_gob_read_by_recno_async(gin->gob, gin, recno, nrecs,
791
                                                        cbfunc, cbarg, _GdpChannel);
792
        unlock_gin_and_gob(gin, "gdp_gin_read_by_recno_async");
793
        prstat(estat, gin, "gdp_gin_read_by_recno_async");
794
        return estat;
795
}
796

    
797

    
798
EP_STAT
799
gdp_gin_read_by_ts_async(
800
                        gdp_gin_t *gin,
801
                        EP_TIME_SPEC *ts,
802
                        int32_t nrecs,
803
                        gdp_event_cbfunc_t cbfunc,
804
                        void *cbarg)
805
{
806
        return GDP_STAT_NOT_IMPLEMENTED;
807
}
808

    
809

    
810
EP_STAT
811
gdp_gin_read_by_hash_async(
812
                        gdp_gin_t *gin,
813
                        uint32_t n_hashes,
814
                        gdp_hash_t **hashes,
815
                        gdp_event_cbfunc_t cbfunc,
816
                        void *cbarg)
817
{
818
        return GDP_STAT_NOT_IMPLEMENTED;
819
}
820

    
821

    
822
#if 0
823
// back compat
824
EP_STAT
825
gdp_gcl_read_async(
826
                        gdp_gin_t *gin,
827
                        gdp_recno_t recno,
828
                        int32_t nrecs,
829
                        gdp_event_cbfunc_t cbfunc,
830
                        void *cbarg)
831
{
832
        return gdp_gcl_read_by_recno_async(gin, recno, nrecs, cbfunc, cbarg);
833
}
834
#endif
835

    
836

    
837

    
838

    
839
/*
840
**        GDP_GIN_SUBSCRIBE_BY_RECNO --- subscribe starting from a record number
841
*/
842

    
843
EP_STAT
844
gdp_gin_subscribe_by_recno(gdp_gin_t *gin,
845
                gdp_recno_t start,
846
                int32_t numrecs,
847
                gdp_sub_qos_t *qos,
848
                gdp_event_cbfunc_t cbfunc,
849
                void *cbarg)
850
{
851
        EP_STAT estat;
852

    
853
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_subscribe_by_recno\n");
854
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_subscribe_by_recno");
855
        EP_STAT_CHECK(estat, return estat);
856

    
857
        estat = _gdp_gin_subscribe(gin, GDP_CMD_SUBSCRIBE_BY_RECNO, start, numrecs,
858
                                                        qos, cbfunc, cbarg);
859

    
860
        unlock_gin_and_gob(gin, "gdp_gin_subscribe_by_recno");
861
        prstat(estat, gin, "gdp_gin_subscribe_by_recno");
862
        return estat;
863
}
864

    
865

    
866
/*
867
**        GDP_GIN_SUBSCRIBE_BY_TS --- subscribe to a GOB starting from a timestamp
868
*/
869

    
870
EP_STAT
871
gdp_gin_subscribe_by_ts(gdp_gin_t *gin,
872
                EP_TIME_SPEC *start,
873
                int32_t numrecs,
874
                gdp_sub_qos_t *qos,
875
                gdp_event_cbfunc_t cbfunc,
876
                void *cbarg)
877
{
878
        EP_STAT estat;
879

    
880
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_subscribe_by_ts\n");
881
#if 0 //TODO
882
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_subscribe_by_ts");
883
        EP_STAT_CHECK(estat, return estat);
884

885
        // create the subscribe request
886
        gdp_req_t *req;
887
        estat = _gdp_req_new(GDP_CMD_SUBSCRIBE_BY_TS, gin->gob, _GdpChannel, NULL,
888
                        GDP_REQ_PERSIST | GDP_REQ_CLT_SUBSCR | GDP_REQ_ALLOC_RID,
889
                        &req);
890
        EP_STAT_CHECK(estat, goto fail0);
891

892
        // add start and stop parameters to PDU
893
        req->gin = gin;
894
        memcpy(&req->cpdu->m->b->datum->d->ts, start, sizeof req->cpdu->m->b->datum->d->ts);
895
        req->numrecs = numrecs;
896

897
        // now do the hard work
898
        estat = _gdp_gin_subscribe(req, numrecs, timeout, cbfunc, cbarg);
899
fail0:
900
        unlock_gin_and_gob(gin, "gdp_gin_subscribe_by_ts");
901
#else        //TODO
902
        estat = GDP_STAT_NOT_IMPLEMENTED;
903
#endif        //TODO
904
        prstat(estat, gin, "gdp_gin_subscribe_by_ts");
905
        return estat;
906
}
907

    
908

    
909
/*
910
**  GDP_GIN_UNSUBSCRIBE --- delete subscriptions to a named GOB
911
*/
912

    
913
EP_STAT
914
gdp_gin_unsubscribe(gdp_gin_t *gin,
915
                gdp_event_cbfunc_t cbfunc,
916
                void *cbarg)
917
{
918
        EP_STAT estat;
919

    
920
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_unsubscribe\n");
921

    
922
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_unsubscribe");
923
        EP_STAT_CHECK(estat, return estat);
924
        estat = _gdp_gin_unsubscribe(gin, cbfunc, cbarg, 0);
925
        unlock_gin_and_gob(gin, "gdp_gin_unsubscribe");
926

    
927
        prstat(estat, gin, "gdp_gin_unsubscribe");
928
        return estat;
929
}
930

    
931

    
932
/*
933
**  GDP_GIN_GETMETADATA --- return the metadata associated with a GOB
934
*/
935

    
936
EP_STAT
937
gdp_gin_getmetadata(gdp_gin_t *gin,
938
                gdp_md_t **gmdp)
939
{
940
        EP_STAT estat;
941

    
942
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_getmetadata\n");
943
        estat = check_and_lock_gin_and_gob(gin, "gdp_gin_getmetadata");
944
        EP_STAT_CHECK(estat, return estat);
945
        estat = _gdp_gob_getmetadata(gin->gob, gmdp, _GdpChannel, 0);
946
        unlock_gin_and_gob(gin, "gdp_gin_getmetadata");
947
        prstat(estat, gin, "gdp_gin_getmetadata");
948
        return estat;
949
}
950

    
951

    
952
/*
953
**  GDP_GIN_SET_APPEND_FILTER --- set the append filter function
954
*/
955

    
956
EP_STAT
957
gdp_gin_set_append_filter(gdp_gin_t *gin,
958
                EP_STAT (*appendfilter)(gdp_datum_t *, void *),
959
                void *filterdata)
960
{
961
        EP_STAT estat;
962

    
963
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_set_append_filter\n");
964
        estat = check_and_lock_gin(gin, "gdp_gin_set_append_filter");
965
        EP_STAT_CHECK(estat, return estat);
966
        gin->apndfilter = appendfilter;
967
        gin->apndfpriv = filterdata;
968
        _gdp_gin_unlock(gin);
969
        return EP_STAT_OK;
970
}
971

    
972

    
973
/*
974
**  GDP_GIN_SET_READ_FILTER --- set the read filter function
975
*/
976

    
977
EP_STAT
978
gdp_gin_set_read_filter(gdp_gin_t *gin,
979
                EP_STAT (*readfilter)(gdp_datum_t *, void *),
980
                void *filterdata)
981
{
982
        EP_STAT estat;
983

    
984
        ep_dbg_cprintf(Dbg, 39, "\n>>> gdp_gin_set_read_filter\n");
985
        estat = check_and_lock_gin(gin, "gdp_gin_set_read_filter");
986
        EP_STAT_CHECK(estat, return estat);
987
        gin->readfilter = readfilter;
988
        gin->readfpriv = filterdata;
989
        _gdp_gin_unlock(gin);
990
        return EP_STAT_OK;
991
}
992

    
993

    
994
/*
995
**  GDP Open Information handling
996
*/
997

    
998
gdp_open_info_t *
999
gdp_open_info_new(void)
1000
{
1001
        gdp_open_info_t *info;
1002

    
1003
        info = (gdp_open_info_t *) ep_mem_zalloc(sizeof*info);
1004
        return info;
1005
}
1006

    
1007
void
1008
gdp_open_info_free(gdp_open_info_t *info)
1009
{
1010
        //XXX should check for info == NULL here
1011
        if (info->signkey != NULL)
1012
                ep_crypto_key_free(info->signkey);
1013
        ep_mem_free(info);
1014
}
1015

    
1016
EP_STAT
1017
gdp_open_info_set_signing_key(gdp_open_info_t *info,
1018
                EP_CRYPTO_KEY *skey)
1019
{
1020
        //XXX should check for info == NULL here
1021
        info->signkey = skey;
1022
        return EP_STAT_OK;
1023
}
1024

    
1025
EP_STAT
1026
gdp_open_info_set_signkey_cb(
1027
                                gdp_open_info_t *info,
1028
                                EP_STAT (*signkey_cb)(
1029
                                        gdp_name_t gname,
1030
                                        void *signkey_udata,
1031
                                        EP_CRYPTO_KEY **skey),
1032
                                void *signkey_udata)
1033
{
1034
        //XXX should check for info == NULL here
1035
        info->signkey_cb = signkey_cb;
1036
        info->signkey_udata = signkey_udata;
1037
        return EP_STAT_OK;
1038
}
1039

    
1040
EP_STAT
1041
gdp_open_info_set_caching(
1042
                gdp_open_info_t *info,
1043
                bool keep_in_cache)
1044
{
1045
        //XXX should check for info == NULL here
1046
        if (keep_in_cache)
1047
                info->flags |= GOIF_KEEP_IN_CACHE;
1048
        else
1049
                info->flags &= ~GOIF_KEEP_IN_CACHE;
1050
        return EP_STAT_OK;
1051
}
1052

    
1053
EP_STAT
1054
gdp_open_info_set_vrfy(
1055
                gdp_open_info_t *info,
1056
                bool verify_proof)
1057
{
1058
        //XXX should check for info == NULL here
1059
        //XXX ideally this would check to make sure we have a public key,
1060
        //XXX        but that isn't possible yet.
1061
        if (verify_proof)
1062
                info->flags |= GOIF_VERIFY_PROOF;
1063
        else
1064
                info->flags &= ~GOIF_VERIFY_PROOF;
1065
        return EP_STAT_OK;
1066
}
1067

    
1068
EP_STAT
1069
gdp_open_info_set_no_skey_nonfatal(
1070
                gdp_open_info_t *info,
1071
                bool nonfatal)
1072
{
1073
        if (nonfatal)
1074
                info->flags |= GOIF_NO_SKEY_NONFATAL;
1075
        else
1076
                info->flags &= ~GOIF_NO_SKEY_NONFATAL;
1077
        return EP_STAT_OK;
1078
}