Project

General

Profile

sub.py

Nitesh Mor, 11/01/2016 10:38 AM

Download (2.66 KB)

 
1
#!/usr/bin/env python
2

    
3
import gdp
4
import sys
5
import threading
6
import time
7
import pprint
8

    
9
# Assumes that the log named on the command line has already been created.
10
# Shared by every thread; both are assigned in main() before any thread starts.
lock = None   # replaced by a threading.Lock that serialises localPrint() output
start = None  # replaced by the time.time() value taken when main() begins
12

    
13
def localPrint(s):
    """Print `s` to stdout, prefixed with elapsed time and thread name.

    Each line has the form ``<seconds since start> ## <thread name>] <s>``.
    Printing is done while holding the module-level `lock`, and stdout is
    flushed after every line.

    The module-level `lock` and `start` must have been assigned (main()
    does this) before the first call.

    Args:
        s: The message to print; it is formatted with ``%s``.
    """
    # `with` guarantees the lock is released even if formatting or printing
    # raises; a bare acquire()/release() pair would leave it held forever
    # and block every other thread's output.
    with lock:
        # A parenthesised single-argument print behaves the same as the
        # print statement on Python 2.
        print("%9.6f ## %s] %s" % ((time.time() - start),
                                   threading.current_thread().getName(), s))
        sys.stdout.flush()
21

    
22
def sleep(t):
    """Pause the calling thread for `t` seconds (thin wrapper over time.sleep)."""
    # To trace every pause, enable:
    # localPrint("Sleeping for %i" % t)
    time.sleep(t)
25

    
26
def appendThread(logname, sleepBefore, numRecords):
    """Append `numRecords` records to the log `logname`.

    Opens the log with gdp.GDP_MODE_AO, prints the resulting handle, waits
    `sleepBefore` seconds, and then appends the records. Before each append
    the thread sleeps one second. Every record is a dict whose "data" value
    is the double-quoted string ``<logname>-<thread name>-<index>``.

    Args:
        logname: Name of the log to append to.
        sleepBefore: Seconds to wait before the first append.
        numRecords: Number of records to append.
    """
    writer = gdp.GDP_GCL(gdp.GDP_NAME(logname), gdp.GDP_MODE_AO)
    localPrint(str(writer))

    sleep(sleepBefore)

    # The thread's name does not change while it runs, so look it up once.
    threadName = threading.current_thread().getName()
    for seq in xrange(numRecords):
        sleep(1)
        payload = "\"%s-%s-%d\"" % (logname, threadName, seq)
        localPrint("Appending %s to %s" % (payload, logname))
        writer.append({"data": payload})
37

    
38

    
39
def subscribeThread(logname):
    """Subscribe to the log `logname` and print the data of each event.

    Opens the log with gdp.GDP_MODE_RO, prints the resulting handle,
    subscribes, and then loops forever fetching events and printing the
    ``event["datum"]["data"]`` field of each one. This function never
    returns.

    Args:
        logname: Name of the log to subscribe to.
    """
    reader = gdp.GDP_GCL(gdp.GDP_NAME(logname), gdp.GDP_MODE_RO)
    localPrint(str(reader))

    # NOTE(review): arguments are presumably (start record, record count,
    # timeout) -- confirm against the gdp Python bindings.
    reader.subscribe(0, 0, None)

    while True:
        evt = reader.get_next_event(None)
        payload = evt["datum"]["data"]
        localPrint("Got data: %s on %s" % (payload, logname))
        # To inspect the whole event: pprint.pprint(evt)
48

    
49
def main(logname, numS, delay, numR):
    """Start one appender thread and `numS` subscriber threads on `logname`.

    Initialises gdp, sets the module-level `start` timestamp and `lock`
    used by localPrint(), and then launches the threads. The appender is a
    non-daemon thread named "AThread"; the subscribers are daemon threads
    named "SThread1" .. "SThread<numS>". The function returns as soon as
    all threads have been started.

    Args:
        logname: Name of the log to append to and subscribe to.
        numS: Number of subscriber threads to start.
        delay: Seconds the appender waits before its first append.
        numR: Number of records the appender writes.
    """
    global start, lock

    gdp.gdp_init()
    # gdp.dbg_set("gdp.event=85")

    # Shared state must be in place before the first localPrint() call.
    start = time.time()
    lock = threading.Lock()

    localPrint("logname: %s" % logname)

    # A single thread performs all appends.
    localPrint("Starting Append Thread")
    appender = threading.Thread(target=appendThread, name="AThread",
                                args=(logname, delay, numR))
    appender.daemon = False
    appender.start()

    # Any number of threads may subscribe to the same log.
    for idx in range(1, numS + 1):
        localPrint("Starting SThread%d" % idx)
        subscriber = threading.Thread(target=subscribeThread,
                                      name="SThread" + str(idx),
                                      args=(logname,))
        subscriber.daemon = True
        subscriber.start()
76

    
77

    
78

    
79
# Help text printed after the one-line usage summary when too few
# command-line arguments are supplied.
USAGE = """

    * Runs a single thread appending data to `logname`.

    * Also starts `numS` subscriber threads, all subscribed simultaneously
    to the same log.

    * `delay` and `numRec` control the execution pattern.
    `delay` is the delay in seconds before starting to append data
    in an append thread, `numRec` is the number of records that are
    appended in each thread (one second delay between consecutive
    appends within a thread)

    Assumes that the log is precreated.

"""
95
 
96
if __name__ == "__main__":
    # Four positional arguments are required after the script name; any
    # extra arguments are ignored.
    if len(sys.argv) < 5:
        # A parenthesised single-argument print behaves the same as the
        # print statement on Python 2.
        print("Usage: %s logname numS delay numRec" % sys.argv[0])
        print(USAGE)
        sys.exit(1)
    main(sys.argv[1], int(sys.argv[2]), int(sys.argv[3]), int(sys.argv[4]))