Project

General

Profile

stuck-sub.py

A script to reproduce stuck subscriptions - Nitesh Mor, 08/30/2016 10:19 AM

Download (2.79 KB)

 
1
#!/usr/bin/env python
2

    
3
import gdp
4
import sys
5
import threading
6
import time
7
import pprint
8

    
9
# Assumes that logs are already created, and are named as follows:
10
#   PREFIXlog1, PREFIXlog2, PREFIXlog3...
11

    
12
# Module-wide state shared by every thread.

# Common prefix of all log names; the per-log suffix ("log1", "log2", ...)
# is appended to it when a log is opened.
PREFIX = "edu.eecs.berkeley.mor.aug30.threading-test."

# Placeholders only: main() assigns the real values before any thread starts.
start = None    # reference time for the elapsed-seconds column in localPrint()
lock = None     # serialises the output written by localPrint()
16

    
17
def localPrint(s):
    """Print `s` on stdout, tagged with elapsed time and thread name.

    Each line has the form "<seconds since start> ## <thread name>] <s>".
    Output is serialised through the module-level `lock` so that lines
    written by concurrent threads do not interleave. Both `lock` and
    `start` must have been assigned (main() does this) before the first
    call.
    """
    # `with` guarantees the lock is released even if formatting or writing
    # raises; otherwise one failure would block every other thread's output.
    with lock:
        # The timestamp is taken while holding the lock, so printed times
        # never go backwards from one output line to the next.
        # A single parenthesised argument prints identically on Python 2
        # and Python 3.
        print("%9.6f ## %s] %s" % ((time.time() - start),
                                   threading.current_thread().name, s))
        sys.stdout.flush()
25

    
26
def sleep(t):
    """Block the calling thread for `t` seconds.

    Thin wrapper around time.sleep(); the commented-out trace line below
    can be re-enabled to log every pause.
    """
    # localPrint("Sleeping for %i" % t)
    time.sleep(t)
29

    
30
def appendThread(logname, sleepBefore, numRecords):
    """Append `numRecords` records to the log named PREFIX + `logname`.

    logname     -- log suffix, e.g. "log1"; the full name is PREFIX + logname
    sleepBefore -- seconds to wait after opening the log and before the
                   first append
    numRecords  -- number of records to append; there is a one-second pause
                   before each of them

    Each record is the dict {"data": '"<logname>-<thread name>-<index>"'}
    (the double quotes are part of the payload). The log is opened with
    gdp.GDP_MODE_AO -- presumably append-only; confirm against the GDP
    bindings.
    """
    handle = gdp.GDP_GCL(gdp.GDP_NAME(PREFIX + logname), gdp.GDP_MODE_AO)
    localPrint(str(handle))
    sleep(sleepBefore)

    # The thread name is not changed during the loop, so look it up once.
    threadName = threading.current_thread().name

    # range() instead of xrange(): the latter does not exist on Python 3.
    for i in range(numRecords):
        sleep(1)
        data = "\"%s-%s-%d\"" % (logname, threadName, i)
        localPrint("Appending %s to %s" % (data, logname))
        handle.append({"data": data})
41

    
42

    
43
def subscribeThread(logname):
    """Subscribe to the log named PREFIX + `logname` and print what arrives.

    Opens the log with gdp.GDP_MODE_RO, subscribes, and then loops forever
    printing the "data" field of each event's datum. The function never
    returns on its own.
    """
    fullName = PREFIX + logname
    handle = gdp.GDP_GCL(gdp.GDP_NAME(fullName), gdp.GDP_MODE_RO)
    localPrint(str(handle))

    # NOTE(review): the meaning of the (0, 0, None) arguments is defined by
    # the GDP bindings and is not visible here -- confirm before changing.
    handle.subscribe(0, 0, None)

    while True:
        # NOTE(review): None is presumably "no timeout" -- confirm.
        ev = handle.get_next_event(None)
        payload = ev["datum"]["data"]
        localPrint("Got data: %s on %s" % (payload, logname))
        # pprint.pprint(ev)
52

    
53
def main(numA, numS, delay, numR):
    """Initialise GDP, then launch the appender and subscriber threads.

    numA  -- number of appender threads; thread k (1-based) is named
             "AThread<k>" and appends to "log<k>"
    numS  -- number of subscriber threads; thread k (1-based) is named
             "SThread<k>" and subscribes to "log<k>"
    delay -- base start-up delay in seconds; appender k waits delay * k
             before its first append
    numR  -- number of records each appender writes
    """
    global start, lock

    gdp.gdp_init()
    # gdp.dbg_set("*=15")

    # Shared state read by localPrint(); it has to be in place before the
    # first localPrint() call below.
    start = time.time()
    lock = threading.Lock()

    # Appenders are non-daemon threads, so the process stays alive until
    # every one of them has finished.
    for n in range(1, numA + 1):
        localPrint("Starting AThread%d" % n)
        appender = threading.Thread(target=appendThread,
                                    name="AThread" + str(n),
                                    args=("log" + str(n), delay * n, numR))
        appender.daemon = False
        appender.start()

    # Subscribers loop forever; marking them as daemon threads lets the
    # process exit once only they remain.
    for n in range(1, numS + 1):
        localPrint("Starting SThread%d" % n)
        subscriber = threading.Thread(target=subscribeThread,
                                      name="SThread" + str(n),
                                      args=("log" + str(n),))
        subscriber.daemon = True
        subscriber.start()
80

    
81

    
82

    
83
# Help text printed after the one-line usage summary when too few
# command-line arguments are given. Note that main() scales `delay` by the
# 1-based thread index, so each append thread waits a different time.
USAGE = """

    Runs `numA` threads appending data to `numA` logs (one log per
    thread). Also starts `numS` subscriber threads (again, one log
    per thread). `delay` and `numRec` control the execution pattern.
    `delay` is the delay in seconds before starting to append data
    in an append thread, `numRec` is the number of records that are
    appended in each thread (one second delay between consecutive
    appends within a thread)

    Assumes that the logs are pre-created

"""
96
 
97
if __name__ == "__main__":
    # Four positional arguments are required: numA numS delay numRec.
    # Anything beyond the fourth is ignored.
    if len(sys.argv) < 5:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Usage: %s numA numS delay numRec" % sys.argv[0])
        print(USAGE)
        sys.exit(1)
    # A non-integer argument raises ValueError from int().
    main(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]), int(sys.argv[4]))