gdp / apps / gdp-writer.c @ master
History | View | Annotate | Download (9.78 KB)
1 |
/* vim: set ai sw=4 sts=4 ts=4 : */
|
---|---|
2 |
|
3 |
/*
|
4 |
** GDP-WRITER --- writes records to a log
|
5 |
**
|
6 |
** This reads the records one line at a time from standard input
|
7 |
** and assumes they are text, but there is no text requirement
|
8 |
** implied by the GDP.
|
9 |
**
|
10 |
** ----- BEGIN LICENSE BLOCK -----
|
11 |
** Applications for the Global Data Plane
|
12 |
** From the Ubiquitous Swarm Lab, 490 Cory Hall, U.C. Berkeley.
|
13 |
**
|
14 |
** Copyright (c) 2015-2019, Regents of the University of California.
|
15 |
** All rights reserved.
|
16 |
**
|
17 |
** Permission is hereby granted, without written agreement and without
|
18 |
** license or royalty fees, to use, copy, modify, and distribute this
|
19 |
** software and its documentation for any purpose, provided that the above
|
20 |
** copyright notice and the following two paragraphs appear in all copies
|
21 |
** of this software.
|
22 |
**
|
23 |
** IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
|
24 |
** SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
|
25 |
** PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
|
26 |
** EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
27 |
**
|
28 |
** REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
|
29 |
** LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
30 |
** FOR A PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION,
|
31 |
** IF ANY, PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO
|
32 |
** OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
|
33 |
** OR MODIFICATIONS.
|
34 |
** ----- END LICENSE BLOCK -----
|
35 |
*/
|
36 |
|
37 |
#include <ep/ep.h> |
38 |
#include <ep/ep_app.h> |
39 |
#include <ep/ep_dbg.h> |
40 |
#include <ep/ep_hexdump.h> |
41 |
#include <ep/ep_string.h> |
42 |
#include <gdp/gdp.h> |
43 |
|
44 |
#include <unistd.h> |
45 |
#include <errno.h> |
46 |
#include <fcntl.h> |
47 |
#include <getopt.h> |
48 |
#include <string.h> |
49 |
#include <sysexits.h> |
50 |
#include <sys/stat.h> |
51 |
|
52 |
bool AsyncIo = false; // use asynchronous I/O |
53 |
bool Quiet = false; // be silent (no chatty messages) |
54 |
bool Hexdump = false; // echo input in hex instead of ASCII |
55 |
bool KeepGoing = false; // keep going on append errors |
56 |
|
57 |
static EP_DBG Dbg = EP_DBG_INIT("gdp-writer", "gdp-writer"); |
58 |
|
59 |
/*
|
60 |
** DO_LOG --- log a timestamp (for performance checking).
|
61 |
*/
|
62 |
|
63 |
FILE *LogFile; |
64 |
|
65 |
void
|
66 |
do_log(const char *tag) |
67 |
{ |
68 |
struct timeval tv;
|
69 |
|
70 |
if (LogFile == NULL) |
71 |
return;
|
72 |
gettimeofday(&tv, NULL);
|
73 |
fprintf(LogFile, "%s %ld.%06ld\n", tag, tv.tv_sec, (long) tv.tv_usec); |
74 |
} |
75 |
|
76 |
#define LOG(tag) { if (LogFile != NULL) do_log(tag); } |
77 |
|
78 |
|
79 |
static const char *EventTypes[] = |
80 |
{ |
81 |
"Free (internal use)",
|
82 |
"Data",
|
83 |
"End of Subscription",
|
84 |
"Shutdown",
|
85 |
"Asynchronous Status",
|
86 |
}; |
87 |
|
88 |
void
|
89 |
showstat(gdp_event_t *gev) |
90 |
{ |
91 |
unsigned int evtype = gdp_event_gettype(gev); |
92 |
EP_STAT estat = gdp_event_getstat(gev); |
93 |
gdp_datum_t *d = gdp_event_getdatum(gev); |
94 |
char ebuf[100]; |
95 |
char tbuf[20]; |
96 |
const char *evname; |
97 |
|
98 |
if (evtype >= sizeof EventTypes / sizeof EventTypes[0]) |
99 |
{ |
100 |
snprintf(tbuf, sizeof tbuf, "%u", evtype); |
101 |
evname = tbuf; |
102 |
} |
103 |
else
|
104 |
{ |
105 |
evname = EventTypes[evtype]; |
106 |
} |
107 |
|
108 |
printf("Asynchronous event type %s:\n"
|
109 |
"\trecno %" PRIgdp_recno ", stat %s\n", |
110 |
evname, |
111 |
gdp_datum_getrecno(d), |
112 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
113 |
|
114 |
gdp_event_free(gev); |
115 |
} |
116 |
|
117 |
|
118 |
EP_STAT |
119 |
write_record(gdp_datum_t *datum, gdp_gin_t *gin) |
120 |
{ |
121 |
EP_STAT estat; |
122 |
|
123 |
// echo the input for that warm fuzzy feeling
|
124 |
if (!Quiet)
|
125 |
{ |
126 |
gdp_buf_t *dbuf = gdp_datum_getbuf(datum); |
127 |
int l = gdp_buf_getlength(dbuf);
|
128 |
unsigned char *buf = gdp_buf_getptr(dbuf, l); |
129 |
|
130 |
if (!Hexdump)
|
131 |
fprintf(stdout, "Got input %s%.*s%s\n",
|
132 |
EpChar->lquote, l, buf, EpChar->rquote); |
133 |
else
|
134 |
ep_hexdump(buf, l, stdout, EP_HEXDUMP_ASCII, 0);
|
135 |
} |
136 |
|
137 |
if (ep_dbg_test(Dbg, 60)) |
138 |
gdp_datum_print(datum, ep_dbg_getfile(), GDP_DATUM_PRDEBUG); |
139 |
|
140 |
// then send the buffer to the GDP
|
141 |
LOG("W");
|
142 |
if (AsyncIo)
|
143 |
{ |
144 |
estat = gdp_gin_append_async(gin, 1, &datum, NULL, showstat, NULL); |
145 |
EP_STAT_CHECK(estat, return estat);
|
146 |
|
147 |
// return value will be printed asynchronously
|
148 |
} |
149 |
else
|
150 |
{ |
151 |
estat = gdp_gin_append(gin, datum, NULL);
|
152 |
|
153 |
if (EP_STAT_ISOK(estat))
|
154 |
{ |
155 |
// print the return value (shows the record number assigned)
|
156 |
if (!Quiet)
|
157 |
gdp_datum_print(datum, stdout, GDP_DATUM_PRMETAONLY); |
158 |
} |
159 |
else if (!Quiet) |
160 |
{ |
161 |
char ebuf[100]; |
162 |
ep_app_error("Append error: %s",
|
163 |
ep_stat_tostr(estat, ebuf, sizeof ebuf));
|
164 |
} |
165 |
} |
166 |
return estat;
|
167 |
} |
168 |
|
169 |
|
170 |
EP_STAT |
171 |
signkey_cb( |
172 |
gdp_name_t gname, |
173 |
void *udata,
|
174 |
EP_CRYPTO_KEY **skeyp) |
175 |
{ |
176 |
FILE *fp; |
177 |
EP_CRYPTO_KEY *skey; |
178 |
const char *signing_key_file = (const char *) udata; |
179 |
|
180 |
ep_dbg_cprintf(Dbg, 1, "signkey_cb(%s)\n", signing_key_file); |
181 |
|
182 |
fp = fopen(signing_key_file, "r");
|
183 |
if (fp == NULL) |
184 |
{ |
185 |
ep_app_error("cannot open signing key file %s", signing_key_file);
|
186 |
return ep_stat_from_errno(errno);
|
187 |
} |
188 |
|
189 |
skey = ep_crypto_key_read_fp(fp, signing_key_file, |
190 |
EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET); |
191 |
if (skey == NULL) |
192 |
{ |
193 |
ep_app_error("cannot read signing key file %s", signing_key_file);
|
194 |
return ep_stat_from_errno(errno);
|
195 |
} |
196 |
|
197 |
*skeyp = skey; |
198 |
return EP_STAT_OK;
|
199 |
} |
200 |
|
201 |
|
202 |
void
|
203 |
usage(void)
|
204 |
{ |
205 |
fprintf(stderr, |
206 |
"Usage: %s [-1] [-a] [-D dbgspec] [-G router_addr] [-K key_file]\n"
|
207 |
"\t[-L log_file] [-q] [-S] log_name\n"
|
208 |
" -1 write all input as one record\n"
|
209 |
" -a use asynchronous I/O\n"
|
210 |
" -D set debugging flags\n"
|
211 |
" -G IP host to contact for gdp_router\n"
|
212 |
" -i ignore append errors\n"
|
213 |
" -K signing key file\n"
|
214 |
" -L set logging file name (for debugging)\n"
|
215 |
" -q run quietly (no non-error output)\n"
|
216 |
" -S continue even if signing key cannot be found\n",
|
217 |
ep_app_getprogname()); |
218 |
exit(EX_USAGE); |
219 |
} |
220 |
|
221 |
|
222 |
int
|
223 |
main(int argc, char **argv) |
224 |
{ |
225 |
gdp_gin_t *gin; |
226 |
gdp_name_t gdpiname; |
227 |
int opt;
|
228 |
EP_STAT estat; |
229 |
char *gdpd_addr = NULL; |
230 |
bool show_usage = false; |
231 |
bool one_record = false; |
232 |
bool allow_no_signing_key = false; |
233 |
char *log_file_name = NULL; |
234 |
char *signing_key_file = NULL; |
235 |
gdp_open_info_t *info; |
236 |
|
237 |
// collect command-line arguments
|
238 |
while ((opt = getopt(argc, argv, "1aD:G:iK:L:qS")) > 0) |
239 |
{ |
240 |
switch (opt)
|
241 |
{ |
242 |
case '1': |
243 |
one_record = true;
|
244 |
Hexdump = true;
|
245 |
break;
|
246 |
|
247 |
case 'a': |
248 |
AsyncIo = true;
|
249 |
break;
|
250 |
|
251 |
case 'D': |
252 |
ep_dbg_set(optarg); |
253 |
break;
|
254 |
|
255 |
case 'G': |
256 |
gdpd_addr = optarg; |
257 |
break;
|
258 |
|
259 |
case 'i': |
260 |
KeepGoing = true;
|
261 |
break;
|
262 |
|
263 |
case 'K': |
264 |
signing_key_file = optarg; |
265 |
break;
|
266 |
|
267 |
case 'L': |
268 |
log_file_name = optarg; |
269 |
break;
|
270 |
|
271 |
case 'q': |
272 |
Quiet = true;
|
273 |
break;
|
274 |
|
275 |
case 'S': |
276 |
allow_no_signing_key = true;
|
277 |
break;
|
278 |
|
279 |
default:
|
280 |
show_usage = true;
|
281 |
break;
|
282 |
} |
283 |
} |
284 |
argc -= optind; |
285 |
argv += optind; |
286 |
|
287 |
if (show_usage || argc != 1) |
288 |
usage(); |
289 |
|
290 |
if (log_file_name != NULL) |
291 |
{ |
292 |
// open a log file (for timing measurements)
|
293 |
LogFile = fopen(log_file_name, "a");
|
294 |
if (LogFile == NULL) |
295 |
ep_app_error("Cannot open log file %s: %s",
|
296 |
log_file_name, strerror(errno)); |
297 |
else
|
298 |
setlinebuf(LogFile); |
299 |
} |
300 |
|
301 |
// initialize the GDP library
|
302 |
estat = gdp_init(gdpd_addr); |
303 |
if (!EP_STAT_ISOK(estat))
|
304 |
{ |
305 |
ep_app_error("GDP Initialization failed");
|
306 |
goto fail0;
|
307 |
} |
308 |
|
309 |
// allow thread to settle to avoid interspersed debug output
|
310 |
ep_time_nanosleep(INT64_C(100000000));
|
311 |
|
312 |
// set up any open information
|
313 |
info = gdp_open_info_new(); |
314 |
|
315 |
if (signing_key_file != NULL) |
316 |
{ |
317 |
gdp_open_info_set_signkey_cb(info, signkey_cb, signing_key_file); |
318 |
|
319 |
#if 0 // old code: keep as an example of gdp_open_info_set_signing_key
|
320 |
FILE *fp;
|
321 |
EP_CRYPTO_KEY *skey;
|
322 |
|
323 |
fp = fopen(signing_key_file, "r");
|
324 |
if (fp == NULL)
|
325 |
{
|
326 |
ep_app_error("cannot open signing key file %s", signing_key_file);
|
327 |
goto fail1;
|
328 |
}
|
329 |
|
330 |
skey = ep_crypto_key_read_fp(fp, signing_key_file,
|
331 |
EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET);
|
332 |
if (skey == NULL)
|
333 |
{
|
334 |
ep_app_error("cannot read signing key file %s", signing_key_file);
|
335 |
goto fail1;
|
336 |
}
|
337 |
|
338 |
estat = gdp_open_info_set_signing_key(info, skey);
|
339 |
EP_STAT_CHECK(estat, goto fail1);
|
340 |
#endif
|
341 |
} |
342 |
|
343 |
if (allow_no_signing_key)
|
344 |
estat = gdp_open_info_set_no_skey_nonfatal(info, true);
|
345 |
|
346 |
// open a GDP object with the provided name
|
347 |
estat = gdp_parse_name(argv[0], gdpiname);
|
348 |
if (EP_STAT_ISFAIL(estat))
|
349 |
goto fail1;
|
350 |
else
|
351 |
{ |
352 |
estat = gdp_gin_open(gdpiname, GDP_MODE_AO, info, &gin); |
353 |
if (EP_STAT_ISFAIL(estat))
|
354 |
goto fail1;
|
355 |
} |
356 |
|
357 |
if (!Quiet)
|
358 |
{ |
359 |
gdp_pname_t pname; |
360 |
|
361 |
// dump the internal version of the GDP object to facilitate testing
|
362 |
printf("GDPname: %s (%" PRIu64 " recs)\n", |
363 |
gdp_printable_name(*gdp_gin_getname(gin), pname), |
364 |
gdp_gin_getnrecs(gin)); |
365 |
|
366 |
// OK, ready to go!
|
367 |
fprintf(stdout, "\nStarting to read input\n");
|
368 |
} |
369 |
|
370 |
{ |
371 |
// we need a place to buffer the input
|
372 |
gdp_datum_t *datum = gdp_datum_new(); |
373 |
|
374 |
if (one_record)
|
375 |
{ |
376 |
// read the entire stdin into a single datum
|
377 |
char buf[8 * 1024]; |
378 |
int l;
|
379 |
|
380 |
while ((l = fread(buf, 1, sizeof buf, stdin)) > 0) |
381 |
gdp_buf_write(gdp_datum_getbuf(datum), buf, l); |
382 |
|
383 |
estat = write_record(datum, gin); |
384 |
} |
385 |
else
|
386 |
{ |
387 |
// write lines into multiple datums
|
388 |
char buf[200]; |
389 |
|
390 |
while (fgets(buf, sizeof buf, stdin) != NULL) |
391 |
{ |
392 |
// strip off newlines
|
393 |
char *p = strchr(buf, '\n'); |
394 |
if (p != NULL) |
395 |
*p++ = '\0';
|
396 |
|
397 |
// first copy the text buffer into the datum buffer
|
398 |
gdp_datum_reset(datum); |
399 |
gdp_buf_write(gdp_datum_getbuf(datum), buf, strlen(buf)); |
400 |
|
401 |
// write the record to the log
|
402 |
estat = write_record(datum, gin); |
403 |
if (!EP_STAT_ISOK(estat) && !KeepGoing)
|
404 |
break;
|
405 |
} |
406 |
} |
407 |
|
408 |
// OK, all done. Free our resources and exit
|
409 |
gdp_datum_free(datum); |
410 |
} |
411 |
|
412 |
// give a chance to collect async results
|
413 |
if (AsyncIo)
|
414 |
sleep(1);
|
415 |
|
416 |
// tell the GDP that we are done
|
417 |
gdp_gin_close(gin); |
418 |
|
419 |
fail1:
|
420 |
if (info != NULL) |
421 |
gdp_open_info_free(info); |
422 |
|
423 |
fail0:
|
424 |
; // avoid compiler error
|
425 |
int exitstat;
|
426 |
|
427 |
if (EP_STAT_ISOK(estat))
|
428 |
exitstat = EX_OK; |
429 |
else if (EP_STAT_IS_SAME(estat, GDP_STAT_NAK_NOROUTE)) |
430 |
exitstat = EX_CANTCREAT; |
431 |
else if (EP_STAT_ISABORT(estat)) |
432 |
exitstat = EX_SOFTWARE; |
433 |
else
|
434 |
exitstat = EX_UNAVAILABLE; |
435 |
|
436 |
// OK status can have values; hide that from the user
|
437 |
if (EP_STAT_ISOK(estat))
|
438 |
estat = EP_STAT_OK; |
439 |
if (!EP_STAT_ISOK(estat))
|
440 |
ep_app_message(estat, "exiting with status");
|
441 |
else if (!Quiet) |
442 |
fprintf(stderr, "Exiting with status OK\n");
|
443 |
return exitstat;
|
444 |
} |