1
|
#include <stdio.h>
|
2
|
#include <stdarg.h>
|
3
|
#include <time.h>
|
4
|
#include <unistd.h>
|
5
|
#include <string.h>
|
6
|
#include <gdp/gdp.h>
|
7
|
#include <pthread.h>
|
8
|
|
9
|
pthread_mutex_t lock;
|
10
|
time_t start;
|
11
|
struct ArgStruct
|
12
|
{
|
13
|
char *logname;
|
14
|
int sleepBefore;
|
15
|
int numRecords;
|
16
|
};
|
17
|
|
18
|
void
|
19
|
localPrint(const char *s, ...)
|
20
|
{
|
21
|
char tmpStr[100];
|
22
|
va_list fmt;
|
23
|
va_start(fmt, s);
|
24
|
pthread_t cur_thread = pthread_self();
|
25
|
pthread_getname_np(cur_thread, tmpStr, 100);
|
26
|
|
27
|
pthread_mutex_lock(&lock);
|
28
|
printf("%9.6f ## %s] ", (double)(time(NULL)-start), tmpStr);
|
29
|
vprintf(s, fmt);
|
30
|
fflush(stdout);
|
31
|
pthread_mutex_unlock(&lock);
|
32
|
|
33
|
va_end(fmt);
|
34
|
}
|
35
|
|
36
|
void
|
37
|
*appendThread(void *arguments)
|
38
|
{
|
39
|
|
40
|
|
41
|
struct ArgStruct *args = (struct ArgStruct *)arguments;
|
42
|
char *logname = args->logname;
|
43
|
int sleepBefore = args->sleepBefore;
|
44
|
int numRecords = args->numRecords;
|
45
|
|
46
|
|
47
|
int i;
|
48
|
char threadName[100];
|
49
|
char data[100];
|
50
|
gdp_gcl_t *gcl;
|
51
|
gdp_name_t gcliname;
|
52
|
EP_STAT estat;
|
53
|
|
54
|
pthread_t cur_thread = pthread_self();
|
55
|
pthread_getname_np(cur_thread, threadName, 100);
|
56
|
|
57
|
|
58
|
gdp_parse_name(logname, gcliname);
|
59
|
|
60
|
estat = gdp_gcl_open(gcliname, GDP_MODE_AO, NULL, &gcl);
|
61
|
|
62
|
gdp_datum_t *datum = gdp_datum_new();
|
63
|
|
64
|
|
65
|
sleep(sleepBefore);
|
66
|
for (i=0; i<numRecords; i++)
|
67
|
{
|
68
|
sleep(1);
|
69
|
sprintf(data, "\"%s-%s-%d\"", logname, threadName, i);
|
70
|
localPrint("Appending %s to %s\n", data, logname);
|
71
|
|
72
|
|
73
|
gdp_buf_write(gdp_datum_getbuf(datum), data, strlen(data));
|
74
|
gdp_gcl_append(gcl, datum);
|
75
|
}
|
76
|
|
77
|
gdp_datum_free(datum);
|
78
|
return NULL;
|
79
|
}
|
80
|
|
81
|
void
|
82
|
*subscribeThread(void *arguments)
|
83
|
{
|
84
|
char *logname = (char *) arguments;
|
85
|
|
86
|
char data[100];
|
87
|
gdp_gcl_t *gcl;
|
88
|
gdp_name_t gcliname;
|
89
|
EP_STAT estat;
|
90
|
|
91
|
|
92
|
gdp_parse_name(logname, gcliname);
|
93
|
|
94
|
estat = gdp_gcl_open(gcliname, GDP_MODE_RO, NULL, &gcl);
|
95
|
|
96
|
estat = gdp_gcl_subscribe(gcl, 0, 0, NULL, NULL, NULL);
|
97
|
|
98
|
for (;;)
|
99
|
{
|
100
|
|
101
|
gdp_event_t *gev = gdp_event_next(gcl, 0);
|
102
|
|
103
|
gdp_datum_t *datum = gdp_event_getdatum(gev);
|
104
|
gdp_buf_read(gdp_datum_getbuf(datum), data, 100);
|
105
|
|
106
|
localPrint("Got data: %s on %s\n", data, logname);
|
107
|
gdp_event_free(gev);
|
108
|
}
|
109
|
return NULL;
|
110
|
}
|
111
|
|
112
|
int
|
113
|
main(int argc, char *argv[])
|
114
|
{
|
115
|
if (argc<5)
|
116
|
{
|
117
|
printf("Usage: %s numA numS delay numRec\n", argv[0]);
|
118
|
return -1;
|
119
|
}
|
120
|
|
121
|
int i, numA, numS, delay, numR;
|
122
|
char tmpStr[100];
|
123
|
|
124
|
numA = (int) strtol(argv[1], NULL, 10);
|
125
|
numS = (int) strtol(argv[2], NULL, 10);
|
126
|
delay = (int) strtol(argv[3], NULL, 10);
|
127
|
numR = (int) strtol(argv[4], NULL, 10);
|
128
|
|
129
|
pthread_t *appendTIDs = (pthread_t *)malloc(sizeof(pthread_t)*numA);
|
130
|
|
131
|
gdp_init(NULL);
|
132
|
start = time(NULL);
|
133
|
pthread_setname_np(pthread_self(), "MainThread");
|
134
|
|
135
|
|
136
|
for (i=0; i<numA; i++)
|
137
|
{
|
138
|
localPrint("Starting AThread%d\n", i+1);
|
139
|
char *logname = malloc(sizeof(char)*100);
|
140
|
sprintf(logname, "log%d", i+1);
|
141
|
char *threadName = malloc(sizeof(char)*100);
|
142
|
sprintf(threadName, "AThread%d", i+1);
|
143
|
|
144
|
|
145
|
struct ArgStruct *args = malloc(sizeof (struct ArgStruct));
|
146
|
args->logname = logname;
|
147
|
args->sleepBefore = delay*(i+1);
|
148
|
args->numRecords = numR;
|
149
|
|
150
|
|
151
|
pthread_create(&appendTIDs[i], NULL, appendThread, args);
|
152
|
pthread_setname_np(appendTIDs[i], threadName);
|
153
|
}
|
154
|
|
155
|
|
156
|
for (i=0; i<numS; i++)
|
157
|
{
|
158
|
pthread_t t;
|
159
|
localPrint("Starting SThread%d\n", i+1);
|
160
|
char *logname = malloc(sizeof(char)*100);
|
161
|
sprintf(logname, "log%d", i+1);
|
162
|
char *threadName = malloc(sizeof(char)*100);
|
163
|
sprintf(threadName, "SThread%d", i+1);
|
164
|
|
165
|
pthread_create(&t, NULL, subscribeThread, logname);
|
166
|
pthread_setname_np(t, threadName);
|
167
|
}
|
168
|
|
169
|
|
170
|
for (i=0; i<numA; i++)
|
171
|
{
|
172
|
pthread_join(appendTIDs[i], NULL);
|
173
|
}
|
174
|
return 0;
|
175
|
}
|