Project

General

Profile

stuck-sub.c

Nitesh Mor, 08/31/2016 01:04 PM

Download (4.2 KB)

 
1
/* pthread_setname_np() / pthread_getname_np() are GNU extensions in glibc. */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include <limits.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <gdp/gdp.h>
8

    
9
/*
 * Serializes output to stdout across threads (taken by localPrint).
 * Statically initialized: the mutex is locked without any prior call to
 * pthread_mutex_init(), so it must be valid from program start.
 */
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* Reference time set by main(); localPrint prints seconds elapsed since it. */
time_t start;
11
/*
 * Argument bundle handed to appendThread: pthread_create() forwards only a
 * single pointer, so the per-thread settings travel together in this struct.
 */
struct ArgStruct {
    char *logname;      /* name of the log the thread appends to */
    int   sleepBefore;  /* seconds to sleep before the first append */
    int   numRecords;   /* how many records the thread appends */
};
17

    
18
void
19
localPrint(const char *s, ...)
20
{
21
    char tmpStr[100];
22
    va_list fmt;
23
    va_start(fmt, s);
24
    pthread_t cur_thread = pthread_self();
25
    pthread_getname_np(cur_thread, tmpStr, 100);
26

    
27
    pthread_mutex_lock(&lock);
28
    printf("%9.6f ## %s] ", (double)(time(NULL)-start), tmpStr);
29
    vprintf(s, fmt);
30
    fflush(stdout);
31
    pthread_mutex_unlock(&lock);
32

    
33
    va_end(fmt);
34
}
35

    
36
void
37
*appendThread(void *arguments)
38
{
39

    
40
    // do this, because we can only pass one argument to pthread_create
41
    struct ArgStruct *args = (struct ArgStruct *)arguments;
42
    char *logname = args->logname;
43
    int sleepBefore = args->sleepBefore;
44
    int numRecords = args->numRecords;
45

    
46
    // rest of the logic
47
    int i;
48
    char threadName[100];
49
    char data[100];
50
    gdp_gcl_t *gcl;
51
    gdp_name_t gcliname;
52
    EP_STAT estat;
53

    
54
    pthread_t cur_thread = pthread_self();
55
    pthread_getname_np(cur_thread, threadName, 100);
56

    
57
    // open the GCL
58
    gdp_parse_name(logname, gcliname);
59
    // no error checks for now...
60
    estat = gdp_gcl_open(gcliname, GDP_MODE_AO, NULL, &gcl);
61

    
62
    gdp_datum_t *datum = gdp_datum_new();
63

    
64

    
65
    sleep(sleepBefore);
66
    for (i=0; i<numRecords; i++)
67
    {
68
        sleep(1);
69
        sprintf(data, "\"%s-%s-%d\"", logname, threadName, i);
70
        localPrint("Appending %s to %s\n", data, logname);
71

    
72
        // do the actual append
73
        gdp_buf_write(gdp_datum_getbuf(datum), data, strlen(data));
74
        gdp_gcl_append(gcl, datum);
75
    }
76

    
77
    gdp_datum_free(datum);
78
    return NULL;
79
}
80

    
81
void
82
*subscribeThread(void *arguments)
83
{
84
    char *logname = (char *) arguments;
85

    
86
    char data[100];
87
    gdp_gcl_t *gcl;
88
    gdp_name_t gcliname;
89
    EP_STAT estat;
90

    
91
    // open the GCL
92
    gdp_parse_name(logname, gcliname);
93
    // no error checks for now...
94
    estat = gdp_gcl_open(gcliname, GDP_MODE_RO, NULL, &gcl);
95

    
96
    estat = gdp_gcl_subscribe(gcl, 0, 0, NULL, NULL, NULL);
97

    
98
    for (;;)
99
    {
100
        // get next event
101
        gdp_event_t *gev = gdp_event_next(gcl, 0);
102

    
103
        gdp_datum_t *datum = gdp_event_getdatum(gev);
104
        gdp_buf_read(gdp_datum_getbuf(datum), data, 100);        
105

    
106
        localPrint("Got data: %s on %s\n", data, logname);
107
        gdp_event_free(gev);
108
    }
109
    return NULL;
110
}
111

    
112
int
113
main(int argc, char *argv[])
114
{
115
    if (argc<5)
116
    {
117
        printf("Usage: %s numA numS delay numRec\n", argv[0]);
118
        return -1;
119
    }
120

    
121
    int i, numA, numS, delay, numR;
122
    char tmpStr[100];
123

    
124
    numA = (int) strtol(argv[1], NULL, 10);
125
    numS = (int) strtol(argv[2], NULL, 10);
126
    delay = (int) strtol(argv[3], NULL, 10);
127
    numR = (int) strtol(argv[4], NULL, 10);
128

    
129
    pthread_t *appendTIDs = (pthread_t *)malloc(sizeof(pthread_t)*numA);
130

    
131
    gdp_init(NULL);
132
    start = time(NULL);
133
    pthread_setname_np(pthread_self(), "MainThread");
134

    
135
    // create append threads
136
    for (i=0; i<numA; i++)
137
    {
138
        localPrint("Starting AThread%d\n", i+1);
139
        char *logname = malloc(sizeof(char)*100);
140
        sprintf(logname, "log%d", i+1);
141
        char *threadName = malloc(sizeof(char)*100);
142
        sprintf(threadName, "AThread%d", i+1);
143

    
144
        // create a structure to hold the arguments
145
        struct ArgStruct *args = malloc(sizeof (struct ArgStruct));
146
        args->logname = logname;
147
        args->sleepBefore = delay*(i+1);
148
        args->numRecords = numR;
149

    
150
        // do the actual thread creation
151
        pthread_create(&appendTIDs[i], NULL, appendThread, args);
152
        pthread_setname_np(appendTIDs[i], threadName);
153
    }
154

    
155
    // create subscription threads
156
    for (i=0; i<numS; i++)
157
    {
158
        pthread_t t;
159
        localPrint("Starting SThread%d\n", i+1);
160
        char *logname = malloc(sizeof(char)*100);
161
        sprintf(logname, "log%d", i+1);
162
        char *threadName = malloc(sizeof(char)*100);
163
        sprintf(threadName, "SThread%d", i+1);
164

    
165
        pthread_create(&t, NULL, subscribeThread, logname);
166
        pthread_setname_np(t, threadName);
167
    }
168

    
169
    // join on append threads
170
    for (i=0; i<numA; i++)
171
    {
172
        pthread_join(appendTIDs[i], NULL);
173
    }
174
    return 0;
175
}