1
|
|
2
|
|
3
|
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <ep/ep_app.h>
#include <ep/ep_dbg.h>
#include <gdp/gdp.h>
|
12
|
|
13
|
/*
 * SET_THREAD_NAME(n): give the calling thread the name `n`.
 *
 * The two branches differ only in the pthread_setname_np() call shape:
 * the __APPLE__ branch passes just the name, the other branch passes
 * pthread_self() followed by the name.
 */
#if !__APPLE__
#define SET_THREAD_NAME(n) pthread_setname_np(pthread_self(), (n))
#else
#define SET_THREAD_NAME(n) pthread_setname_np((n))
#endif
|
18
|
|
19
|
/*
 * Serializes the stdout writes done by localPrint().  Initialized
 * statically because nothing in this file calls pthread_mutex_init() on it.
 */
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* Reference time for the elapsed-seconds prefix in localPrint(); set in main(). */
time_t start;
|
21
|
/*
 * Per-thread parameters handed to appendThread() / subscribeThread()
 * through the void * argument of pthread_create().
 */
struct ArgStruct {
	char	*logname;	/* external name of the log the thread opens */
	int	sleepBefore;	/* seconds appendThread() sleeps before its first record */
	int	numRecords;	/* number of records appendThread() appends */
	char	*threadName;	/* name the thread assigns to itself */
};
|
28
|
|
29
|
void
|
30
|
localPrint(const char *s, ...)
|
31
|
{
|
32
|
char tmpStr[100];
|
33
|
va_list fmt;
|
34
|
va_start(fmt, s);
|
35
|
pthread_t cur_thread = pthread_self();
|
36
|
pthread_getname_np(cur_thread, tmpStr, 100);
|
37
|
|
38
|
pthread_mutex_lock(&lock);
|
39
|
printf("%9.6f ## %s] ", (double)(time(NULL)-start), tmpStr);
|
40
|
vprintf(s, fmt);
|
41
|
fflush(stdout);
|
42
|
pthread_mutex_unlock(&lock);
|
43
|
|
44
|
va_end(fmt);
|
45
|
}
|
46
|
|
47
|
void
|
48
|
*appendThread(void *arguments)
|
49
|
{
|
50
|
|
51
|
|
52
|
struct ArgStruct *args = (struct ArgStruct *)arguments;
|
53
|
char *logname = args->logname;
|
54
|
int sleepBefore = args->sleepBefore;
|
55
|
int numRecords = args->numRecords;
|
56
|
char *threadName = args->threadName;
|
57
|
|
58
|
|
59
|
int i;
|
60
|
char data[100];
|
61
|
gdp_gcl_t *gcl;
|
62
|
gdp_name_t gcliname;
|
63
|
EP_STAT estat;
|
64
|
|
65
|
SET_THREAD_NAME(threadName);
|
66
|
|
67
|
|
68
|
gdp_parse_name(logname, gcliname);
|
69
|
|
70
|
estat = gdp_gcl_open(gcliname, GDP_MODE_AO, NULL, &gcl);
|
71
|
ep_app_message(estat, "%s: gdp_gcl_open", threadName);
|
72
|
|
73
|
gdp_datum_t *datum = gdp_datum_new();
|
74
|
|
75
|
|
76
|
sleep(sleepBefore);
|
77
|
for (i=0; i<numRecords; i++)
|
78
|
{
|
79
|
sleep(1);
|
80
|
sprintf(data, "\"%s-%s-%d\"", logname, threadName, i);
|
81
|
localPrint("Appending %s to %s\n", data, logname);
|
82
|
|
83
|
|
84
|
gdp_buf_write(gdp_datum_getbuf(datum), data, strlen(data));
|
85
|
estat = gdp_gcl_append(gcl, datum);
|
86
|
ep_app_message(estat, "%s: gdp_gcl_append", threadName);
|
87
|
}
|
88
|
|
89
|
gdp_datum_free(datum);
|
90
|
return NULL;
|
91
|
}
|
92
|
|
93
|
void
|
94
|
*subscribeThread(void *arguments)
|
95
|
{
|
96
|
struct ArgStruct *args = arguments;
|
97
|
char *logname = args->logname;
|
98
|
|
99
|
char data[100];
|
100
|
gdp_gcl_t *gcl;
|
101
|
gdp_name_t gcliname;
|
102
|
EP_STAT estat;
|
103
|
|
104
|
logname = args->logname;
|
105
|
SET_THREAD_NAME(args->threadName);
|
106
|
|
107
|
|
108
|
gdp_parse_name(logname, gcliname);
|
109
|
|
110
|
estat = gdp_gcl_open(gcliname, GDP_MODE_RO, NULL, &gcl);
|
111
|
ep_app_message(estat, "%s: gdp_gcl_open", args->threadName);
|
112
|
|
113
|
estat = gdp_gcl_subscribe(gcl, 0, 0, NULL, NULL, NULL);
|
114
|
ep_app_message(estat, "%s: gdp_gcl_subscribe", args->threadName);
|
115
|
|
116
|
for (;;)
|
117
|
{
|
118
|
|
119
|
gdp_event_t *gev = gdp_event_next(gcl, 0);
|
120
|
|
121
|
gdp_datum_t *datum = gdp_event_getdatum(gev);
|
122
|
gdp_buf_read(gdp_datum_getbuf(datum), data, 100);
|
123
|
|
124
|
localPrint("Got data: %s on %s\n", data, logname);
|
125
|
gdp_event_free(gev);
|
126
|
}
|
127
|
return NULL;
|
128
|
}
|
129
|
|
130
|
int
|
131
|
main(int argc, char *argv[])
|
132
|
{
|
133
|
int opt;
|
134
|
while ((opt = getopt(argc, argv, "D:")) > 0)
|
135
|
{
|
136
|
switch (opt)
|
137
|
{
|
138
|
case 'D':
|
139
|
ep_dbg_set(optarg);
|
140
|
break;
|
141
|
}
|
142
|
}
|
143
|
argc -= optind;
|
144
|
argv += optind;
|
145
|
|
146
|
if (argc != 4)
|
147
|
{
|
148
|
printf("Usage: %s [-Ddbg] numA numS delay numRec\n", argv[0]);
|
149
|
return -1;
|
150
|
}
|
151
|
|
152
|
int i, numA, numS, delay, numR;
|
153
|
|
154
|
|
155
|
numA = (int) strtol(argv[0], NULL, 10);
|
156
|
numS = (int) strtol(argv[1], NULL, 10);
|
157
|
delay = (int) strtol(argv[2], NULL, 10);
|
158
|
numR = (int) strtol(argv[3], NULL, 10);
|
159
|
|
160
|
pthread_t *appendTIDs = (pthread_t *)malloc(sizeof(pthread_t)*numA);
|
161
|
|
162
|
gdp_init(NULL);
|
163
|
start = time(NULL);
|
164
|
SET_THREAD_NAME("MainThread");
|
165
|
|
166
|
|
167
|
for (i=0; i<numA; i++)
|
168
|
{
|
169
|
localPrint("Starting AThread%d\n", i+1);
|
170
|
char *logname = malloc(sizeof(char)*100);
|
171
|
sprintf(logname, "log%d", i+1);
|
172
|
char *threadName = malloc(sizeof(char)*100);
|
173
|
sprintf(threadName, "AThread%d", i+1);
|
174
|
|
175
|
|
176
|
struct ArgStruct *args = malloc(sizeof (struct ArgStruct));
|
177
|
args->logname = logname;
|
178
|
args->sleepBefore = delay*(i+1);
|
179
|
args->numRecords = numR;
|
180
|
args->threadName = threadName;
|
181
|
|
182
|
|
183
|
pthread_create(&appendTIDs[i], NULL, appendThread, args);
|
184
|
}
|
185
|
|
186
|
|
187
|
for (i=0; i<numS; i++)
|
188
|
{
|
189
|
pthread_t t;
|
190
|
localPrint("Starting SThread%d\n", i+1);
|
191
|
char *logname = malloc(sizeof(char)*100);
|
192
|
sprintf(logname, "log%d", i+1);
|
193
|
char *threadName = malloc(sizeof(char)*100);
|
194
|
sprintf(threadName, "SThread%d", i+1);
|
195
|
|
196
|
|
197
|
struct ArgStruct *args = malloc(sizeof (struct ArgStruct));
|
198
|
args->logname = logname;
|
199
|
args->threadName = threadName;
|
200
|
|
201
|
pthread_create(&t, NULL, subscribeThread, args);
|
202
|
}
|
203
|
|
204
|
|
205
|
for (i=0; i<numA; i++)
|
206
|
{
|
207
|
pthread_join(appendTIDs[i], NULL);
|
208
|
}
|
209
|
return 0;
|
210
|
}
|