Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / apps / gdp-writer.c @ master

History | View | Annotate | Download (9.78 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 : */
2

    
3
/*
4
**  GDP-WRITER --- writes records to a log
5
**
6
**                This reads the records one line at a time from standard input
7
**                and assumes they are text, but there is no text requirement
8
**                implied by the GDP.
9
**
10
**        ----- BEGIN LICENSE BLOCK -----
11
**        Applications for the Global Data Plane
12
**        From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
13
**
14
**        Copyright (c) 2015-2019, Regents of the University of California.
15
**        All rights reserved.
16
**
17
**        Permission is hereby granted, without written agreement and without
18
**        license or royalty fees, to use, copy, modify, and distribute this
19
**        software and its documentation for any purpose, provided that the above
20
**        copyright notice and the following two paragraphs appear in all copies
21
**        of this software.
22
**
23
**        IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
24
**        SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
25
**        PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
26
**        EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
**
28
**        REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
29
**        LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
30
**        FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
31
**        IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
32
**        OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
33
**        OR MODIFICATIONS.
34
**        ----- END LICENSE BLOCK -----
35
*/
36

    
37
#include <ep/ep.h>
38
#include <ep/ep_app.h>
39
#include <ep/ep_dbg.h>
40
#include <ep/ep_hexdump.h>
41
#include <ep/ep_string.h>
42
#include <gdp/gdp.h>
43

    
44
#include <unistd.h>
45
#include <errno.h>
46
#include <fcntl.h>
47
#include <getopt.h>
48
#include <string.h>
49
#include <sysexits.h>
50
#include <sys/stat.h>
51

    
52
bool        AsyncIo = false;                // use asynchronous I/O
53
bool        Quiet = false;                        // be silent (no chatty messages)
54
bool        Hexdump = false;                // echo input in hex instead of ASCII
55
bool        KeepGoing = false;                // keep going on append errors
56

    
57
static EP_DBG        Dbg = EP_DBG_INIT("gdp-writer", "gdp-writer");
58

    
59
/*
60
**  DO_LOG --- log a timestamp (for performance checking).
61
*/
62

    
63
FILE        *LogFile;
64

    
65
void
66
do_log(const char *tag)
67
{
68
        struct timeval tv;
69

    
70
        if (LogFile == NULL)
71
                return;
72
        gettimeofday(&tv, NULL);
73
        fprintf(LogFile, "%s %ld.%06ld\n", tag, tv.tv_sec, (long) tv.tv_usec);
74
}
75

    
76
#define LOG(tag)        { if (LogFile != NULL) do_log(tag); }
77

    
78

    
79
static const char        *EventTypes[] =
80
{
81
        "Free (internal use)",
82
        "Data",
83
        "End of Subscription",
84
        "Shutdown",
85
        "Asynchronous Status",
86
};
87

    
88
void
89
showstat(gdp_event_t *gev)
90
{
91
        unsigned int evtype = gdp_event_gettype(gev);
92
        EP_STAT estat = gdp_event_getstat(gev);
93
        gdp_datum_t *d = gdp_event_getdatum(gev);
94
        char ebuf[100];
95
        char tbuf[20];
96
        const char *evname;
97

    
98
        if (evtype >= sizeof EventTypes / sizeof EventTypes[0])
99
        {
100
                snprintf(tbuf, sizeof tbuf, "%u", evtype);
101
                evname = tbuf;
102
        }
103
        else
104
        {
105
                evname = EventTypes[evtype];
106
        }
107

    
108
        printf("Asynchronous event type %s:\n"
109
                        "\trecno %" PRIgdp_recno ", stat %s\n",
110
                        evname,
111
                        gdp_datum_getrecno(d),
112
                        ep_stat_tostr(estat, ebuf, sizeof ebuf));
113

    
114
        gdp_event_free(gev);
115
}
116

    
117

    
118
EP_STAT
119
write_record(gdp_datum_t *datum, gdp_gin_t *gin)
120
{
121
        EP_STAT estat;
122

    
123
        // echo the input for that warm fuzzy feeling
124
        if (!Quiet)
125
        {
126
                gdp_buf_t *dbuf = gdp_datum_getbuf(datum);
127
                int l = gdp_buf_getlength(dbuf);
128
                unsigned char *buf = gdp_buf_getptr(dbuf, l);
129

    
130
                if (!Hexdump)
131
                        fprintf(stdout, "Got input %s%.*s%s\n",
132
                                        EpChar->lquote, l, buf, EpChar->rquote);
133
                else
134
                        ep_hexdump(buf, l, stdout, EP_HEXDUMP_ASCII, 0);
135
        }
136

    
137
        if (ep_dbg_test(Dbg, 60))
138
                gdp_datum_print(datum, ep_dbg_getfile(), GDP_DATUM_PRDEBUG);
139

    
140
        // then send the buffer to the GDP
141
        LOG("W");
142
        if (AsyncIo)
143
        {
144
                estat = gdp_gin_append_async(gin, 1, &datum, NULL, showstat, NULL);
145
                EP_STAT_CHECK(estat, return estat);
146

    
147
                // return value will be printed asynchronously
148
        }
149
        else
150
        {
151
                estat = gdp_gin_append(gin, datum, NULL);
152

    
153
                if (EP_STAT_ISOK(estat))
154
                {
155
                        // print the return value (shows the record number assigned)
156
                        if (!Quiet)
157
                                gdp_datum_print(datum, stdout, GDP_DATUM_PRMETAONLY);
158
                }
159
                else if (!Quiet)
160
                {
161
                        char ebuf[100];
162
                        ep_app_error("Append error: %s",
163
                                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
164
                }
165
        }
166
        return estat;
167
}
168

    
169

    
170
EP_STAT
171
signkey_cb(
172
                gdp_name_t gname,
173
                void *udata,
174
                EP_CRYPTO_KEY **skeyp)
175
{
176
        FILE *fp;
177
        EP_CRYPTO_KEY *skey;
178
        const char *signing_key_file = (const char *) udata;
179

    
180
        ep_dbg_cprintf(Dbg, 1, "signkey_cb(%s)\n", signing_key_file);
181

    
182
        fp = fopen(signing_key_file, "r");
183
        if (fp == NULL)
184
        {
185
                ep_app_error("cannot open signing key file %s", signing_key_file);
186
                return ep_stat_from_errno(errno);
187
        }
188

    
189
        skey = ep_crypto_key_read_fp(fp, signing_key_file,
190
                        EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET);
191
        if (skey == NULL)
192
        {
193
                ep_app_error("cannot read signing key file %s", signing_key_file);
194
                return ep_stat_from_errno(errno);
195
        }
196

    
197
        *skeyp = skey;
198
        return EP_STAT_OK;
199
}
200

    
201

    
202
void
203
usage(void)
204
{
205
        fprintf(stderr,
206
                        "Usage: %s [-1] [-a] [-D dbgspec] [-G router_addr] [-K key_file]\n"
207
                        "\t[-L log_file] [-q] [-S] log_name\n"
208
                        "    -1  write all input as one record\n"
209
                        "    -a  use asynchronous I/O\n"
210
                        "    -D  set debugging flags\n"
211
                        "    -G  IP host to contact for gdp_router\n"
212
                        "    -i  ignore append errors\n"
213
                        "    -K  signing key file\n"
214
                        "    -L  set logging file name (for debugging)\n"
215
                        "    -q  run quietly (no non-error output)\n"
216
                        "    -S  continue even if signing key cannot be found\n",
217
                        ep_app_getprogname());
218
        exit(EX_USAGE);
219
}
220

    
221

    
222
int
223
main(int argc, char **argv)
224
{
225
        gdp_gin_t *gin;
226
        gdp_name_t gdpiname;
227
        int opt;
228
        EP_STAT estat;
229
        char *gdpd_addr = NULL;
230
        bool show_usage = false;
231
        bool one_record = false;
232
        bool allow_no_signing_key = false;
233
        char *log_file_name = NULL;
234
        char *signing_key_file = NULL;
235
        gdp_open_info_t *info;
236

    
237
        // collect command-line arguments
238
        while ((opt = getopt(argc, argv, "1aD:G:iK:L:qS")) > 0)
239
        {
240
                switch (opt)
241
                {
242
                 case '1':
243
                        one_record = true;
244
                        Hexdump = true;
245
                        break;
246

    
247
                 case 'a':
248
                        AsyncIo = true;
249
                        break;
250

    
251
                 case 'D':
252
                        ep_dbg_set(optarg);
253
                        break;
254

    
255
                 case 'G':
256
                        gdpd_addr = optarg;
257
                        break;
258

    
259
                 case 'i':
260
                        KeepGoing = true;
261
                        break;
262

    
263
                 case 'K':
264
                        signing_key_file = optarg;
265
                        break;
266

    
267
                 case 'L':
268
                        log_file_name = optarg;
269
                        break;
270

    
271
                 case 'q':
272
                        Quiet = true;
273
                        break;
274

    
275
                 case 'S':
276
                        allow_no_signing_key = true;
277
                        break;
278

    
279
                 default:
280
                        show_usage = true;
281
                        break;
282
                }
283
        }
284
        argc -= optind;
285
        argv += optind;
286

    
287
        if (show_usage || argc != 1)
288
                usage();
289

    
290
        if (log_file_name != NULL)
291
        {
292
                // open a log file (for timing measurements)
293
                LogFile = fopen(log_file_name, "a");
294
                if (LogFile == NULL)
295
                        ep_app_error("Cannot open log file %s: %s",
296
                                        log_file_name, strerror(errno));
297
                else
298
                        setlinebuf(LogFile);
299
        }
300

    
301
        // initialize the GDP library
302
        estat = gdp_init(gdpd_addr);
303
        if (!EP_STAT_ISOK(estat))
304
        {
305
                ep_app_error("GDP Initialization failed");
306
                goto fail0;
307
        }
308

    
309
        // allow thread to settle to avoid interspersed debug output
310
        ep_time_nanosleep(INT64_C(100000000));
311

    
312
        // set up any open information
313
        info = gdp_open_info_new();
314

    
315
        if (signing_key_file != NULL)
316
        {
317
                gdp_open_info_set_signkey_cb(info, signkey_cb, signing_key_file);
318

    
319
#if 0        // old code: keep as an example of gdp_open_info_set_signing_key
320
                FILE *fp;
321
                EP_CRYPTO_KEY *skey;
322

323
                fp = fopen(signing_key_file, "r");
324
                if (fp == NULL)
325
                {
326
                        ep_app_error("cannot open signing key file %s", signing_key_file);
327
                        goto fail1;
328
                }
329

330
                skey = ep_crypto_key_read_fp(fp, signing_key_file,
331
                                EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET);
332
                if (skey == NULL)
333
                {
334
                        ep_app_error("cannot read signing key file %s", signing_key_file);
335
                        goto fail1;
336
                }
337

338
                estat = gdp_open_info_set_signing_key(info, skey);
339
                EP_STAT_CHECK(estat, goto fail1);
340
#endif
341
        }
342

    
343
        if (allow_no_signing_key)
344
                estat = gdp_open_info_set_no_skey_nonfatal(info, true);
345

    
346
        // open a GDP object with the provided name
347
        estat = gdp_parse_name(argv[0], gdpiname);
348
        if (EP_STAT_ISFAIL(estat))
349
                goto fail1;
350
        else
351
        {
352
                estat = gdp_gin_open(gdpiname, GDP_MODE_AO, info, &gin);
353
                if (EP_STAT_ISFAIL(estat))
354
                        goto fail1;
355
        }
356

    
357
        if (!Quiet)
358
        {
359
                gdp_pname_t pname;
360

    
361
                // dump the internal version of the GDP object to facilitate testing
362
                printf("GDPname: %s (%" PRIu64 " recs)\n",
363
                                gdp_printable_name(*gdp_gin_getname(gin), pname),
364
                                gdp_gin_getnrecs(gin));
365

    
366
                // OK, ready to go!
367
                fprintf(stdout, "\nStarting to read input\n");
368
        }
369

    
370
        {
371
                // we need a place to buffer the input
372
                gdp_datum_t *datum = gdp_datum_new();
373

    
374
                if (one_record)
375
                {
376
                        // read the entire stdin into a single datum
377
                        char buf[8 * 1024];
378
                        int l;
379

    
380
                        while ((l = fread(buf, 1, sizeof buf, stdin)) > 0)
381
                                gdp_buf_write(gdp_datum_getbuf(datum), buf, l);
382

    
383
                        estat = write_record(datum, gin);
384
                }
385
                else
386
                {
387
                        // write lines into multiple datums
388
                        char buf[200];
389

    
390
                        while (fgets(buf, sizeof buf, stdin) != NULL)
391
                        {
392
                                // strip off newlines
393
                                char *p = strchr(buf, '\n');
394
                                if (p != NULL)
395
                                        *p++ = '\0';
396

    
397
                                // first copy the text buffer into the datum buffer
398
                                gdp_datum_reset(datum);
399
                                gdp_buf_write(gdp_datum_getbuf(datum), buf, strlen(buf));
400

    
401
                                // write the record to the log
402
                                estat = write_record(datum, gin);
403
                                if (!EP_STAT_ISOK(estat) && !KeepGoing)
404
                                        break;
405
                        }
406
                }
407

    
408
                // OK, all done.  Free our resources and exit
409
                gdp_datum_free(datum);
410
        }
411

    
412
        // give a chance to collect async results
413
        if (AsyncIo)
414
                sleep(1);
415

    
416
        // tell the GDP that we are done
417
        gdp_gin_close(gin);
418

    
419
fail1:
420
        if (info != NULL)
421
                gdp_open_info_free(info);
422

    
423
fail0:
424
        ;                        // avoid compiler error
425
        int exitstat;
426

    
427
        if (EP_STAT_ISOK(estat))
428
                exitstat = EX_OK;
429
        else if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_NOROUTE))
430
                exitstat = EX_CANTCREAT;
431
        else if (EP_STAT_ISABORT(estat))
432
                exitstat = EX_SOFTWARE;
433
        else
434
                exitstat = EX_UNAVAILABLE;
435

    
436
        // OK status can have values; hide that from the user
437
        if (EP_STAT_ISOK(estat))
438
                estat = EP_STAT_OK;
439
        if (!EP_STAT_ISOK(estat))
440
                ep_app_message(estat, "exiting with status");
441
        else if (!Quiet)
442
                fprintf(stderr, "Exiting with status OK\n");
443
        return exitstat;
444
}