Project

General

Profile

Statistics
| Branch: | Revision:

gdp-if / websocket-subscriber / gdp-ws.py @ master

History | View | Annotate | Download (5.12 KB)

1
#!/usr/bin/env python
2

    
3
import sys
4
import gdp
5
import traceback
6
import threading
7
import json
8
import argparse
9
from twisted.python import log
10
from twisted.internet import reactor, ssl
11

    
12

    
13
from autobahn.twisted.websocket import WebSocketServerProtocol, \
14
        WebSocketServerFactory
15

    
16

    
17
class GDPSubscriptionProtocol(WebSocketServerProtocol):
18

    
19
    def __init__(self, *args):
20
        WebSocketServerProtocol.__init__(self, *args)
21
        self.conn_active = False
22
        self.remote = None
23

    
24
    def onConnect(self, request):
25
        print "[%s] client connected." % request.peer
26
        self.remote = request.peer
27

    
28
    def onOpen(self):
29
        print "[%s] connection open." % self.remote
30
        self.conn_active = True
31

    
32
    def onMessage(self, payload, isBinary):
33
        print "[%s] received message: %s" % (self.remote, payload)
34
        reactor.callInThread(self.__onMessage, payload, isBinary)
35

    
36
    def onClose(self, wasClean, code, reason):
37
        print "[%s] connection closed." % (self.remote)
38
        self.conn_active = False
39

    
40
    def __onMessage(self, payload, isBinary):
41
        """
42
        Runs in a non-reactor thread.
43
        payload is a dictionary to specify subscription parameters:
44
           {"logname": <>,      # required
45
            "startrec": <>,     # optional
46
            "numrec": <>,       # optional
47
           }
48
        """
49

    
50
        ## timeout of 100 ms
51
        ev_timeout = {"tv_sec": 0, "tv_nsec": 100 * 10**6, "tv_accuracy": 0.0}
52

    
53
        try:
54
            thread_ = threading.current_thread().getName()
55
            payload_dict = json.loads(payload)
56
            logname = payload_dict["logname"]
57
            startrec = payload_dict.get("startrec", 0)
58
            numrec = payload_dict.get("numrec", 1)
59

    
60
            print "[%s][%s] Subscribing to log=%s, startrec=%d, numrec=%d" % \
61
                              (thread_, self.remote, logname, startrec, numrec)
62

    
63
            lh = gdp.GDP_GIN(gdp.GDP_NAME(logname), gdp.GDP_MODE_RO)
64
            lh.subscribe_by_recno(startrec, numrec, None)
65

    
66
            while self.conn_active:
67

    
68
                event = None
69
                while event is None and self.conn_active:
70
                    event = lh.get_next_event(ev_timeout)
71

    
72
                if not self.conn_active:
73
                    break
74

    
75
                # Create a dictionary that we send to the client
76
                __evdict = {}
77
                gin_handle = event["gin"]
78
                event_ep_stat = event["stat"]
79
                assert lh == gin_handle
80
                datum = event["datum"]
81

    
82
                __evdict["logname"] = logname
83
                __evdict["ep_stat"] = (event_ep_stat.code,
84
                                        gdp.MISC.ep_stat_tostr(event_ep_stat))
85
                __evdict["type"] = event["type"]
86
                __evdict["datum"] = {"recno": datum["recno"],
87
                                     "data": datum["buf"].peek(),
88
                                     "ts": datum["ts"]}
89

    
90
                # send the event to client, but remove binary signature first
91
                ## this will have problems with binary data in log, but there
92
                ## is no better alternative if we want to stick to JSON
93
                __evstr = json.dumps(__evdict)
94
                print "[%s] sending event: %s" % (self.remote, __evstr)
95
                reactor.callFromThread(self.sendMessage, __evstr, False)
96

    
97
                if event["type"] == gdp.GDP_EVENT_DONE:
98
                    break
99

    
100
            ## How did we reach here.
101
            reason = "subcription completed" if self.conn_active \
102
                                                     else "client left"
103
            print "[%s] finished, because %s" % (self.remote, reason)
104
            if not self.conn_active:
105
                print "[%s] terminating subscription" % self.remote
106
                lh.unsubscribe()
107

    
108
        except Exception as e:
109
            # send any errors back to the client
110
            error_string = "Exception: %s\n\n" % str(e)
111
            error_string += traceback.format_exc()
112
            print "[%s] %s" % (self.remote, error_string)
113
            reactor.callFromThread(self.sendMessage, error_string, False)
114

    
115

    
116

    
117
if __name__ == "__main__":
118

    
119
    parser = argparse.ArgumentParser()
120
    parser.add_argument("-p", "--port", type=int, default=9007,
121
                            help="TCP port to serve requests on, default=9007")
122
    parser.add_argument("-s", "--secure", action='store_true',
123
                            help="Enable wss (listens on ws-port + 1)")
124
    parser.add_argument("--key", type=str,
125
                            help="Private key file for secure websockets")
126
    parser.add_argument("--cert", type=str,
127
                            help="Certificate file for secure websockets")
128

    
129
    # Get the actual arguments
130
    args = parser.parse_args()
131

    
132
    # gdp.gdp_init()
133
    # gdp.dbg_set("*=20")
134

    
135
    log.startLogging(sys.stderr)
136

    
137
    # Set up argument parsing code
138
    factory = WebSocketServerFactory()
139
    factory.protocol = GDPSubscriptionProtocol
140

    
141
    reactor.listenTCP(args.port, factory)
142

    
143
    if args.secure:
144
        contextFactory = ssl.DefaultOpenSSLContextFactory(args.key, args.cert)
145
        reactor.listenSSL(args.port+1, factory, contextFactory)
146

    
147
    reactor.run()