gdp-if / websocket-subscriber / gdp-ws.py @ master
History | View | Annotate | Download (5.12 KB)
1 | e1535212 | Nitesh Mor | #!/usr/bin/env python
|
---|---|---|---|
2 | |||
3 | import sys |
||
4 | import gdp |
||
5 | import traceback |
||
6 | 69489e35 | Nitesh Mor | import threading |
7 | e1535212 | Nitesh Mor | import json |
8 | import argparse |
||
9 | from twisted.python import log |
||
10 | d9b9a6c3 | Nitesh Mor | from twisted.internet import reactor, ssl |
11 | |||
12 | e1535212 | Nitesh Mor | |
13 | from autobahn.twisted.websocket import WebSocketServerProtocol, \ |
||
14 | 4e0b39be | Nitesh Mor | WebSocketServerFactory
|
15 | e1535212 | Nitesh Mor | |
16 | |||
17 | class GDPSubscriptionProtocol(WebSocketServerProtocol): |
||
18 | |||
19 | def __init__(self, *args): |
||
20 | WebSocketServerProtocol.__init__(self, *args)
|
||
21 | self.conn_active = False |
||
22 | self.remote = None |
||
23 | |||
24 | def onConnect(self, request): |
||
25 | print "[%s] client connected." % request.peer |
||
26 | self.remote = request.peer
|
||
27 | |||
28 | def onOpen(self): |
||
29 | print "[%s] connection open." % self.remote |
||
30 | self.conn_active = True |
||
31 | |||
32 | def onMessage(self, payload, isBinary): |
||
33 | print "[%s] received message: %s" % (self.remote, payload) |
||
34 | reactor.callInThread(self.__onMessage, payload, isBinary)
|
||
35 | |||
36 | def onClose(self, wasClean, code, reason): |
||
37 | print "[%s] connection closed." % (self.remote) |
||
38 | self.conn_active = False |
||
39 | |||
40 | def __onMessage(self, payload, isBinary): |
||
41 | """
|
||
42 | 4e0b39be | Nitesh Mor | Runs in a non-reactor thread.
|
43 | e1535212 | Nitesh Mor | payload is a dictionary to specify subscription parameters:
|
44 | {"logname": <>, # required
|
||
45 | "startrec": <>, # optional
|
||
46 | "numrec": <>, # optional
|
||
47 | }
|
||
48 | """
|
||
49 | |||
50 | ## timeout of 100 ms
|
||
51 | ev_timeout = {"tv_sec": 0, "tv_nsec": 100 * 10**6, "tv_accuracy": 0.0} |
||
52 | 4e0b39be | Nitesh Mor | |
53 | e1535212 | Nitesh Mor | try:
|
54 | 69489e35 | Nitesh Mor | thread_ = threading.current_thread().getName() |
55 | e1535212 | Nitesh Mor | payload_dict = json.loads(payload) |
56 | logname = payload_dict["logname"]
|
||
57 | startrec = payload_dict.get("startrec", 0) |
||
58 | numrec = payload_dict.get("numrec", 1) |
||
59 | 4e0b39be | Nitesh Mor | |
60 | 69489e35 | Nitesh Mor | print "[%s][%s] Subscribing to log=%s, startrec=%d, numrec=%d" % \ |
61 | (thread_, self.remote, logname, startrec, numrec)
|
||
62 | e1535212 | Nitesh Mor | |
63 | 4e0b39be | Nitesh Mor | lh = gdp.GDP_GIN(gdp.GDP_NAME(logname), gdp.GDP_MODE_RO) |
64 | lh.subscribe_by_recno(startrec, numrec, None)
|
||
65 | e1535212 | Nitesh Mor | |
66 | while self.conn_active: |
||
67 | 4e0b39be | Nitesh Mor | |
68 | e1535212 | Nitesh Mor | event = None
|
69 | while event is None and self.conn_active: |
||
70 | 4e0b39be | Nitesh Mor | event = lh.get_next_event(ev_timeout) |
71 | |||
72 | e1535212 | Nitesh Mor | if not self.conn_active: |
73 | break
|
||
74 | 4e0b39be | Nitesh Mor | |
75 | # Create a dictionary that we send to the client
|
||
76 | __evdict = {} |
||
77 | gin_handle = event["gin"]
|
||
78 | event_ep_stat = event["stat"]
|
||
79 | assert lh == gin_handle
|
||
80 | datum = event["datum"]
|
||
81 | |||
82 | __evdict["logname"] = logname
|
||
83 | __evdict["ep_stat"] = (event_ep_stat.code,
|
||
84 | e1535212 | Nitesh Mor | gdp.MISC.ep_stat_tostr(event_ep_stat)) |
85 | 4e0b39be | Nitesh Mor | __evdict["type"] = event["type"] |
86 | __evdict["datum"] = {"recno": datum["recno"], |
||
87 | "data": datum["buf"].peek(), |
||
88 | "ts": datum["ts"]} |
||
89 | |||
90 | 76ddcf59 | Nitesh Mor | # send the event to client, but remove binary signature first
|
91 | ## this will have problems with binary data in log, but there
|
||
92 | ## is no better alternative if we want to stick to JSON
|
||
93 | 4e0b39be | Nitesh Mor | __evstr = json.dumps(__evdict) |
94 | print "[%s] sending event: %s" % (self.remote, __evstr) |
||
95 | reactor.callFromThread(self.sendMessage, __evstr, False) |
||
96 | |||
97 | if event["type"] == gdp.GDP_EVENT_DONE: |
||
98 | e1535212 | Nitesh Mor | break
|
99 | |||
100 | ## How did we reach here.
|
||
101 | reason = "subcription completed" if self.conn_active \ |
||
102 | else "client left" |
||
103 | print "[%s] finished, because %s" % (self.remote, reason) |
||
104 | ca607e6c | Nitesh Mor | if not self.conn_active: |
105 | print "[%s] terminating subscription" % self.remote |
||
106 | lh.unsubscribe() |
||
107 | e1535212 | Nitesh Mor | |
108 | except Exception as e: |
||
109 | # send any errors back to the client
|
||
110 | error_string = "Exception: %s\n\n" % str(e) |
||
111 | error_string += traceback.format_exc() |
||
112 | print "[%s] %s" % (self.remote, error_string) |
||
113 | reactor.callFromThread(self.sendMessage, error_string, False) |
||
114 | |||
115 | |||
116 | |||
117 | if __name__ == "__main__": |
||
118 | |||
119 | parser = argparse.ArgumentParser() |
||
120 | parser.add_argument("-p", "--port", type=int, default=9007, |
||
121 | help="TCP port to serve requests on, default=9007")
|
||
122 | d9b9a6c3 | Nitesh Mor | parser.add_argument("-s", "--secure", action='store_true', |
123 | help="Enable wss (listens on ws-port + 1)")
|
||
124 | parser.add_argument("--key", type=str, |
||
125 | help="Private key file for secure websockets")
|
||
126 | parser.add_argument("--cert", type=str, |
||
127 | help="Certificate file for secure websockets")
|
||
128 | e1535212 | Nitesh Mor | |
129 | # Get the actual arguments
|
||
130 | args = parser.parse_args() |
||
131 | |||
132 | # gdp.gdp_init()
|
||
133 | # gdp.dbg_set("*=20")
|
||
134 | |||
135 | log.startLogging(sys.stderr) |
||
136 | d9b9a6c3 | Nitesh Mor | |
137 | e1535212 | Nitesh Mor | # Set up argument parsing code
|
138 | factory = WebSocketServerFactory() |
||
139 | factory.protocol = GDPSubscriptionProtocol |
||
140 | |||
141 | reactor.listenTCP(args.port, factory) |
||
142 | d9b9a6c3 | Nitesh Mor | |
143 | if args.secure:
|
||
144 | contextFactory = ssl.DefaultOpenSSLContextFactory(args.key, args.cert) |
||
145 | reactor.listenSSL(args.port+1, factory, contextFactory)
|
||
146 | |||
147 | e1535212 | Nitesh Mor | reactor.run() |