Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

gdp / test / t_pcari_1.c @ master

History | View | Annotate | Download (3.63 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 : */
2

    
3
/*
4
**  C version of PCARI "test_rw_gdp.py".
5
*/
6

    
7
#include <gdp/gdp.h>
8

    
9
#include <ep/ep_app.h>
10
#include <ep/ep_crypto.h>
11
#include <ep/ep_dbg.h>
12
#include <ep/ep_time.h>
13

    
14
#include <stdio.h>
15
#include <sysexits.h>
16
#include <unistd.h>
17

    
18
EP_TIME_SPEC                WriteInterval =                { 1, 0, 0.0 };
19
bool                                Errors =                        false;
20

    
21
void
22
usage(void)
23
{
24
        fprintf(stderr, "Usage: %s [-D dbgspec] [-G router-addr]\n"
25
                        "    [-s final-sleep-time] [-w write-interval] log1 log2\n"
26
                        "Defaults:\n"
27
                        "    final-sleep-time = 10s\n"
28
                        "    write-interval = 1s\n",
29
                        ep_app_getprogname());
30
        exit(EX_USAGE);
31
}
32

    
33
static void *
34
writer(void *_)
35
{
36
        gdp_gin_t *gin = _;
37
        static int counter = 0;
38
        EP_STAT estat;
39
        gdp_hash_t *prevhash = NULL;
40
        gdp_datum_t *datum = gdp_datum_new();
41

    
42
        for (;;)
43
        {
44
                char data[30];
45
                snprintf(data, sizeof data, "Hi %d", counter++);
46
                ep_app_info("writer: %s", data);
47

    
48
                gdp_datum_reset(datum);
49
                gdp_buf_t *dbuf = gdp_datum_getbuf(datum);
50
                gdp_buf_printf(dbuf, "%s", data);
51
                estat = gdp_gin_append(gin, datum, prevhash);
52
                if (!EP_STAT_ISOK(estat))
53
                        ep_app_message(estat, "writer append");
54
                ep_time_sleep(&WriteInterval);
55
        }
56
        return NULL;
57
}
58

    
59
static void *
60
reader(void *_)
61
{
62
        gdp_gin_t *gin = _;
63
        EP_STAT estat;
64

    
65
        estat = gdp_gin_subscribe_by_recno(gin, 0, 0, NULL, NULL, NULL);
66
        if (!EP_STAT_ISOK(estat))
67
                ep_app_message(estat, "reader subscribe");
68
        for (;;)
69
        {
70
                gdp_event_t *gev = gdp_event_next(gin, NULL);
71
                gdp_event_print(gev, stdout);
72
                gdp_event_free(gev);
73
        }
74
        return NULL;
75
}
76

    
77
static void
78
stat_check(EP_STAT estat, const char *where)
79
{
80
        if (EP_STAT_ISOK(estat))
81
                return;
82
        ep_app_message(estat, "%s", where);
83
        Errors = true;
84
}
85

    
86
int main(int argc, char **argv)
87
{
88
        EP_STAT estat;
89
        char *router_addr = NULL;
90
        int opt;
91
        EP_TIME_SPEC final_sleep_time = { 10, 0, 0.0 };
92

    
93
        while ((opt = getopt(argc, argv, "D:G:s:w:")) > 0)
94
        {
95
                switch (opt)
96
                {
97
                  case 'D':
98
                        ep_dbg_set(optarg);
99
                        break;
100

    
101
                  case 'G':
102
                        router_addr = optarg;
103
                        break;
104

    
105
                  case 's':
106
                        ep_time_parse_interval(optarg, 's', &final_sleep_time);
107
                        break;
108

    
109
                  case 'w':
110
                        ep_time_parse_interval(optarg, 's', &WriteInterval);
111
                        break;
112
                }
113
        }
114
        argc -= optind;
115
        argv += optind;
116

    
117
        if (argc != 2)
118
                usage();
119

    
120
        // initialize the GDP library
121
        estat = gdp_init(router_addr);
122
        if (!EP_STAT_ISOK(estat))
123
                ep_app_fatal("GDP Initialization failed");
124

    
125
        // open log1 for writing and spawn a writer thread
126
        ep_app_info("Opening log1w");
127
        gdp_name_t log1name;
128
        estat = gdp_parse_name(argv[0], log1name);
129
        stat_check(estat, "gdp_parse_name(log1)");
130
        gdp_gin_t *log1w;
131
        estat = gdp_gin_open(log1name, GDP_MODE_AO, NULL, &log1w);
132
        stat_check(estat, "gdp_gin_open(log1w)");
133
        if (Errors)
134
                exit(EX_UNAVAILABLE);
135

    
136
        ep_app_info("Spawning thread w1");
137
        EP_THR thr_w1;
138
        ep_thr_spawn(&thr_w1, &writer, log1w);
139

    
140
        // open log1 and log2 for reading
141
        ep_app_info("Opening log1r");
142
        gdp_gin_t *log1r;
143
        estat = gdp_gin_open(log1name, GDP_MODE_RO, NULL, &log1r);
144
        stat_check(estat, "gdp_gin_open(log1r)");
145
        ep_app_info("Opening log2r");
146
        gdp_name_t log2name;
147
        estat = gdp_parse_name(argv[1], log2name);
148
        stat_check(estat, "gdp_parse_name(log2)");
149
        gdp_gin_t *log2r;
150
        estat = gdp_gin_open(log2name, GDP_MODE_RO, NULL, &log2r);
151
        stat_check(estat, "gdp_gin_open(log2r)");
152
        if (Errors)
153
                exit(EX_UNAVAILABLE);
154

    
155
        // spawn two subscriber threads, one on each log
156
        ep_app_info("Spawning thread r2");
157
        EP_THR thr_r2;
158
        ep_thr_spawn(&thr_r2, &reader, log2r);
159
        ep_app_info("Spawning thread r1");
160
        EP_THR thr_r1;
161
        ep_thr_spawn(&thr_r1, &reader, log1r);
162

    
163
        // run threads for a while to collect results
164
        ep_app_info("Waiting for status");
165
        ep_time_sleep(&final_sleep_time);
166

    
167
        // exiting will kill off the threads
168
        ep_app_info("Exiting");
169
        exit(EX_OK);
170
}