Project

General

Profile

stuck-sub.c

Eric Allman, 09/04/2016 04:09 PM

Download (5.02 KB)

 
1
/* vim: set ai sw=4 sts=4 ts=4 : */
2

    
3
#include <stdio.h>
4
#include <stdarg.h>
5
#include <time.h>
6
#include <unistd.h>
7
#include <string.h>
8
#include <ep/ep_app.h>
9
#include <ep/ep_dbg.h>
10
#include <gdp/gdp.h>
11
#include <pthread.h>
12

    
13
#if __APPLE__
14
#define SET_THREAD_NAME(n)        pthread_setname_np(n)
15
#else
16
#define SET_THREAD_NAME(n)        pthread_setname_np(pthread_self(), n)
17
#endif
18

    
19
pthread_mutex_t lock;
20
time_t start;
21
struct ArgStruct
22
{
23
    char *logname;
24
    int sleepBefore;
25
    int numRecords;
26
    char *threadName;
27
};
28

    
29
void
30
localPrint(const char *s, ...)
31
{
32
    char tmpStr[100];
33
    va_list fmt;
34
    va_start(fmt, s);
35
    pthread_t cur_thread = pthread_self();
36
    pthread_getname_np(cur_thread, tmpStr, 100);
37

    
38
    pthread_mutex_lock(&lock);
39
    printf("%9.6f ## %s] ", (double)(time(NULL)-start), tmpStr);
40
    vprintf(s, fmt);
41
    fflush(stdout);
42
    pthread_mutex_unlock(&lock);
43

    
44
    va_end(fmt);
45
}
46

    
47
void
48
*appendThread(void *arguments)
49
{
50

    
51
    // do this, because we can only pass one argument to pthread_create
52
    struct ArgStruct *args = (struct ArgStruct *)arguments;
53
    char *logname = args->logname;
54
    int sleepBefore = args->sleepBefore;
55
    int numRecords = args->numRecords;
56
    char *threadName = args->threadName;
57

    
58
    // rest of the logic
59
    int i;
60
    char data[100];
61
    gdp_gcl_t *gcl;
62
    gdp_name_t gcliname;
63
    EP_STAT estat;
64

    
65
    SET_THREAD_NAME(threadName);
66

    
67
    // open the GCL
68
    gdp_parse_name(logname, gcliname);
69
    // no error checks for now...
70
    estat = gdp_gcl_open(gcliname, GDP_MODE_AO, NULL, &gcl);
71
        ep_app_message(estat, "%s: gdp_gcl_open", threadName);
72

    
73
    gdp_datum_t *datum = gdp_datum_new();
74

    
75

    
76
    sleep(sleepBefore);
77
    for (i=0; i<numRecords; i++)
78
    {
79
        sleep(1);
80
        sprintf(data, "\"%s-%s-%d\"", logname, threadName, i);
81
        localPrint("Appending %s to %s\n", data, logname);
82

    
83
        // do the actual append
84
        gdp_buf_write(gdp_datum_getbuf(datum), data, strlen(data));
85
        estat = gdp_gcl_append(gcl, datum);
86
                ep_app_message(estat, "%s: gdp_gcl_append", threadName);
87
    }
88

    
89
    gdp_datum_free(datum);
90
    return NULL;
91
}
92

    
93
void
94
*subscribeThread(void *arguments)
95
{
96
    struct ArgStruct *args = arguments;
97
    char *logname = args->logname;
98

    
99
    char data[100];
100
    gdp_gcl_t *gcl;
101
    gdp_name_t gcliname;
102
    EP_STAT estat;
103

    
104
    logname = args->logname;
105
    SET_THREAD_NAME(args->threadName);
106

    
107
    // open the GCL
108
    gdp_parse_name(logname, gcliname);
109
    // no error checks for now...
110
    estat = gdp_gcl_open(gcliname, GDP_MODE_RO, NULL, &gcl);
111
        ep_app_message(estat, "%s: gdp_gcl_open", args->threadName);
112

    
113
    estat = gdp_gcl_subscribe(gcl, 0, 0, NULL, NULL, NULL);
114
        ep_app_message(estat, "%s: gdp_gcl_subscribe", args->threadName);
115

    
116
    for (;;)
117
    {
118
        // get next event
119
        gdp_event_t *gev = gdp_event_next(gcl, 0);
120

    
121
        gdp_datum_t *datum = gdp_event_getdatum(gev);
122
        gdp_buf_read(gdp_datum_getbuf(datum), data, 100);        
123

    
124
        localPrint("Got data: %s on %s\n", data, logname);
125
        gdp_event_free(gev);
126
    }
127
    return NULL;
128
}
129

    
130
int
131
main(int argc, char *argv[])
132
{
133
        int opt;
134
        while ((opt = getopt(argc, argv, "D:")) > 0)
135
        {
136
                switch (opt)
137
                {
138
                case 'D':
139
                        ep_dbg_set(optarg);
140
                        break;
141
                }
142
        }
143
        argc -= optind;
144
        argv += optind;
145

    
146
    if (argc != 4)
147
    {
148
        printf("Usage: %s [-Ddbg] numA numS delay numRec\n", argv[0]);
149
        return -1;
150
    }
151

    
152
    int i, numA, numS, delay, numR;
153
    //char tmpStr[100];
154

    
155
    numA = (int) strtol(argv[0], NULL, 10);
156
    numS = (int) strtol(argv[1], NULL, 10);
157
    delay = (int) strtol(argv[2], NULL, 10);
158
    numR = (int) strtol(argv[3], NULL, 10);
159

    
160
    pthread_t *appendTIDs = (pthread_t *)malloc(sizeof(pthread_t)*numA);
161

    
162
    gdp_init(NULL);
163
    start = time(NULL);
164
    SET_THREAD_NAME("MainThread");
165

    
166
    // create append threads
167
    for (i=0; i<numA; i++)
168
    {
169
        localPrint("Starting AThread%d\n", i+1);
170
        char *logname = malloc(sizeof(char)*100);
171
        sprintf(logname, "log%d", i+1);
172
        char *threadName = malloc(sizeof(char)*100);
173
        sprintf(threadName, "AThread%d", i+1);
174

    
175
        // create a structure to hold the arguments
176
        struct ArgStruct *args = malloc(sizeof (struct ArgStruct));
177
        args->logname = logname;
178
        args->sleepBefore = delay*(i+1);
179
        args->numRecords = numR;
180
                args->threadName = threadName;
181

    
182
        // do the actual thread creation
183
        pthread_create(&appendTIDs[i], NULL, appendThread, args);
184
    }
185

    
186
    // create subscription threads
187
    for (i=0; i<numS; i++)
188
    {
189
        pthread_t t;
190
        localPrint("Starting SThread%d\n", i+1);
191
        char *logname = malloc(sizeof(char)*100);
192
        sprintf(logname, "log%d", i+1);
193
        char *threadName = malloc(sizeof(char)*100);
194
        sprintf(threadName, "SThread%d", i+1);
195

    
196
        // create a structure to hold the arguments
197
        struct ArgStruct *args = malloc(sizeof (struct ArgStruct));
198
        args->logname = logname;
199
                args->threadName = threadName;
200

    
201
        pthread_create(&t, NULL, subscribeThread, args);
202
    }
203

    
204
    // join on append threads
205
    for (i=0; i<numA; i++)
206
    {
207
        pthread_join(appendTIDs[i], NULL);
208
    }
209
    return 0;
210
}