Project

General

Profile

Statistics
| Branch: | Revision:

gdp-if / websocket-subscriber / gdp-ws.py @ master

History | View | Annotate | Download (5.12 KB)

1 e1535212 Nitesh Mor
#!/usr/bin/env python
2
3
import sys
4
import gdp
5
import traceback
6 69489e35 Nitesh Mor
import threading
7 e1535212 Nitesh Mor
import json
8
import argparse
9
from twisted.python import log
10 d9b9a6c3 Nitesh Mor
from twisted.internet import reactor, ssl
11
12 e1535212 Nitesh Mor
13
from autobahn.twisted.websocket import WebSocketServerProtocol, \
14 4e0b39be Nitesh Mor
        WebSocketServerFactory
15 e1535212 Nitesh Mor
16
17
class GDPSubscriptionProtocol(WebSocketServerProtocol):
18
19
    def __init__(self, *args):
20
        WebSocketServerProtocol.__init__(self, *args)
21
        self.conn_active = False
22
        self.remote = None
23
24
    def onConnect(self, request):
25
        print "[%s] client connected." % request.peer
26
        self.remote = request.peer
27
28
    def onOpen(self):
29
        print "[%s] connection open." % self.remote
30
        self.conn_active = True
31
32
    def onMessage(self, payload, isBinary):
33
        print "[%s] received message: %s" % (self.remote, payload)
34
        reactor.callInThread(self.__onMessage, payload, isBinary)
35
36
    def onClose(self, wasClean, code, reason):
37
        print "[%s] connection closed." % (self.remote)
38
        self.conn_active = False
39
40
    def __onMessage(self, payload, isBinary):
41
        """
42 4e0b39be Nitesh Mor
        Runs in a non-reactor thread.
43 e1535212 Nitesh Mor
        payload is a dictionary to specify subscription parameters:
44
           {"logname": <>,      # required
45
            "startrec": <>,     # optional
46
            "numrec": <>,       # optional
47
           }
48
        """
49
50
        ## timeout of 100 ms
51
        ev_timeout = {"tv_sec": 0, "tv_nsec": 100 * 10**6, "tv_accuracy": 0.0}
52 4e0b39be Nitesh Mor
53 e1535212 Nitesh Mor
        try:
54 69489e35 Nitesh Mor
            thread_ = threading.current_thread().getName()
55 e1535212 Nitesh Mor
            payload_dict = json.loads(payload)
56
            logname = payload_dict["logname"]
57
            startrec = payload_dict.get("startrec", 0)
58
            numrec = payload_dict.get("numrec", 1)
59 4e0b39be Nitesh Mor
60 69489e35 Nitesh Mor
            print "[%s][%s] Subscribing to log=%s, startrec=%d, numrec=%d" % \
61
                              (thread_, self.remote, logname, startrec, numrec)
62 e1535212 Nitesh Mor
63 4e0b39be Nitesh Mor
            lh = gdp.GDP_GIN(gdp.GDP_NAME(logname), gdp.GDP_MODE_RO)
64
            lh.subscribe_by_recno(startrec, numrec, None)
65 e1535212 Nitesh Mor
66
            while self.conn_active:
67 4e0b39be Nitesh Mor
68 e1535212 Nitesh Mor
                event = None
69
                while event is None and self.conn_active:
70 4e0b39be Nitesh Mor
                    event = lh.get_next_event(ev_timeout)
71
72 e1535212 Nitesh Mor
                if not self.conn_active:
73
                    break
74 4e0b39be Nitesh Mor
75
                # Create a dictionary that we send to the client
76
                __evdict = {}
77
                gin_handle = event["gin"]
78
                event_ep_stat = event["stat"]
79
                assert lh == gin_handle
80
                datum = event["datum"]
81
82
                __evdict["logname"] = logname
83
                __evdict["ep_stat"] = (event_ep_stat.code,
84 e1535212 Nitesh Mor
                                        gdp.MISC.ep_stat_tostr(event_ep_stat))
85 4e0b39be Nitesh Mor
                __evdict["type"] = event["type"]
86
                __evdict["datum"] = {"recno": datum["recno"],
87
                                     "data": datum["buf"].peek(),
88
                                     "ts": datum["ts"]}
89
90 76ddcf59 Nitesh Mor
                # send the event to client, but remove binary signature first
91
                ## this will have problems with binary data in log, but there
92
                ## is no better alternative if we want to stick to JSON
93 4e0b39be Nitesh Mor
                __evstr = json.dumps(__evdict)
94
                print "[%s] sending event: %s" % (self.remote, __evstr)
95
                reactor.callFromThread(self.sendMessage, __evstr, False)
96
97
                if event["type"] == gdp.GDP_EVENT_DONE:
98 e1535212 Nitesh Mor
                    break
99
100
            ## How did we reach here.
101
            reason = "subcription completed" if self.conn_active \
102
                                                     else "client left"
103
            print "[%s] finished, because %s" % (self.remote, reason)
104 ca607e6c Nitesh Mor
            if not self.conn_active:
105
                print "[%s] terminating subscription" % self.remote
106
                lh.unsubscribe()
107 e1535212 Nitesh Mor
108
        except Exception as e:
109
            # send any errors back to the client
110
            error_string = "Exception: %s\n\n" % str(e)
111
            error_string += traceback.format_exc()
112
            print "[%s] %s" % (self.remote, error_string)
113
            reactor.callFromThread(self.sendMessage, error_string, False)
114
115
116
117
if __name__ == "__main__":
118
119
    parser = argparse.ArgumentParser()
120
    parser.add_argument("-p", "--port", type=int, default=9007,
121
                            help="TCP port to serve requests on, default=9007")
122 d9b9a6c3 Nitesh Mor
    parser.add_argument("-s", "--secure", action='store_true',
123
                            help="Enable wss (listens on ws-port + 1)")
124
    parser.add_argument("--key", type=str,
125
                            help="Private key file for secure websockets")
126
    parser.add_argument("--cert", type=str,
127
                            help="Certificate file for secure websockets")
128 e1535212 Nitesh Mor
129
    # Get the actual arguments
130
    args = parser.parse_args()
131
132
    # gdp.gdp_init()
133
    # gdp.dbg_set("*=20")
134
135
    log.startLogging(sys.stderr)
136 d9b9a6c3 Nitesh Mor
137 e1535212 Nitesh Mor
    # Set up argument parsing code
138
    factory = WebSocketServerFactory()
139
    factory.protocol = GDPSubscriptionProtocol
140
141
    reactor.listenTCP(args.port, factory)
142 d9b9a6c3 Nitesh Mor
143
    if args.secure:
144
        contextFactory = ssl.DefaultOpenSSLContextFactory(args.key, args.cert)
145
        reactor.listenSSL(args.port+1, factory, contextFactory)
146
147 e1535212 Nitesh Mor
    reactor.run()