gdp-if / websocket-subscriber / gdp-ws.py @ master
History | View | Annotate | Download (5.12 KB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
|
3 |
import sys |
4 |
import gdp |
5 |
import traceback |
6 |
import threading |
7 |
import json |
8 |
import argparse |
9 |
from twisted.python import log |
10 |
from twisted.internet import reactor, ssl |
11 |
|
12 |
|
13 |
from autobahn.twisted.websocket import WebSocketServerProtocol, \ |
14 |
WebSocketServerFactory
|
15 |
|
16 |
|
17 |
class GDPSubscriptionProtocol(WebSocketServerProtocol): |
18 |
|
19 |
def __init__(self, *args): |
20 |
WebSocketServerProtocol.__init__(self, *args)
|
21 |
self.conn_active = False |
22 |
self.remote = None |
23 |
|
24 |
def onConnect(self, request): |
25 |
print "[%s] client connected." % request.peer |
26 |
self.remote = request.peer
|
27 |
|
28 |
def onOpen(self): |
29 |
print "[%s] connection open." % self.remote |
30 |
self.conn_active = True |
31 |
|
32 |
def onMessage(self, payload, isBinary): |
33 |
print "[%s] received message: %s" % (self.remote, payload) |
34 |
reactor.callInThread(self.__onMessage, payload, isBinary)
|
35 |
|
36 |
def onClose(self, wasClean, code, reason): |
37 |
print "[%s] connection closed." % (self.remote) |
38 |
self.conn_active = False |
39 |
|
40 |
def __onMessage(self, payload, isBinary): |
41 |
"""
|
42 |
Runs in a non-reactor thread.
|
43 |
payload is a dictionary to specify subscription parameters:
|
44 |
{"logname": <>, # required
|
45 |
"startrec": <>, # optional
|
46 |
"numrec": <>, # optional
|
47 |
}
|
48 |
"""
|
49 |
|
50 |
## timeout of 100 ms
|
51 |
ev_timeout = {"tv_sec": 0, "tv_nsec": 100 * 10**6, "tv_accuracy": 0.0} |
52 |
|
53 |
try:
|
54 |
thread_ = threading.current_thread().getName() |
55 |
payload_dict = json.loads(payload) |
56 |
logname = payload_dict["logname"]
|
57 |
startrec = payload_dict.get("startrec", 0) |
58 |
numrec = payload_dict.get("numrec", 1) |
59 |
|
60 |
print "[%s][%s] Subscribing to log=%s, startrec=%d, numrec=%d" % \ |
61 |
(thread_, self.remote, logname, startrec, numrec)
|
62 |
|
63 |
lh = gdp.GDP_GIN(gdp.GDP_NAME(logname), gdp.GDP_MODE_RO) |
64 |
lh.subscribe_by_recno(startrec, numrec, None)
|
65 |
|
66 |
while self.conn_active: |
67 |
|
68 |
event = None
|
69 |
while event is None and self.conn_active: |
70 |
event = lh.get_next_event(ev_timeout) |
71 |
|
72 |
if not self.conn_active: |
73 |
break
|
74 |
|
75 |
# Create a dictionary that we send to the client
|
76 |
__evdict = {} |
77 |
gin_handle = event["gin"]
|
78 |
event_ep_stat = event["stat"]
|
79 |
assert lh == gin_handle
|
80 |
datum = event["datum"]
|
81 |
|
82 |
__evdict["logname"] = logname
|
83 |
__evdict["ep_stat"] = (event_ep_stat.code,
|
84 |
gdp.MISC.ep_stat_tostr(event_ep_stat)) |
85 |
__evdict["type"] = event["type"] |
86 |
__evdict["datum"] = {"recno": datum["recno"], |
87 |
"data": datum["buf"].peek(), |
88 |
"ts": datum["ts"]} |
89 |
|
90 |
# send the event to client, but remove binary signature first
|
91 |
## this will have problems with binary data in log, but there
|
92 |
## is no better alternative if we want to stick to JSON
|
93 |
__evstr = json.dumps(__evdict) |
94 |
print "[%s] sending event: %s" % (self.remote, __evstr) |
95 |
reactor.callFromThread(self.sendMessage, __evstr, False) |
96 |
|
97 |
if event["type"] == gdp.GDP_EVENT_DONE: |
98 |
break
|
99 |
|
100 |
## How did we reach here.
|
101 |
reason = "subcription completed" if self.conn_active \ |
102 |
else "client left" |
103 |
print "[%s] finished, because %s" % (self.remote, reason) |
104 |
if not self.conn_active: |
105 |
print "[%s] terminating subscription" % self.remote |
106 |
lh.unsubscribe() |
107 |
|
108 |
except Exception as e: |
109 |
# send any errors back to the client
|
110 |
error_string = "Exception: %s\n\n" % str(e) |
111 |
error_string += traceback.format_exc() |
112 |
print "[%s] %s" % (self.remote, error_string) |
113 |
reactor.callFromThread(self.sendMessage, error_string, False) |
114 |
|
115 |
|
116 |
|
117 |
if __name__ == "__main__": |
118 |
|
119 |
parser = argparse.ArgumentParser() |
120 |
parser.add_argument("-p", "--port", type=int, default=9007, |
121 |
help="TCP port to serve requests on, default=9007")
|
122 |
parser.add_argument("-s", "--secure", action='store_true', |
123 |
help="Enable wss (listens on ws-port + 1)")
|
124 |
parser.add_argument("--key", type=str, |
125 |
help="Private key file for secure websockets")
|
126 |
parser.add_argument("--cert", type=str, |
127 |
help="Certificate file for secure websockets")
|
128 |
|
129 |
# Get the actual arguments
|
130 |
args = parser.parse_args() |
131 |
|
132 |
# gdp.gdp_init()
|
133 |
# gdp.dbg_set("*=20")
|
134 |
|
135 |
log.startLogging(sys.stderr) |
136 |
|
137 |
# Set up argument parsing code
|
138 |
factory = WebSocketServerFactory() |
139 |
factory.protocol = GDPSubscriptionProtocol |
140 |
|
141 |
reactor.listenTCP(args.port, factory) |
142 |
|
143 |
if args.secure:
|
144 |
contextFactory = ssl.DefaultOpenSSLContextFactory(args.key, args.cert) |
145 |
reactor.listenSSL(args.port+1, factory, contextFactory)
|
146 |
|
147 |
reactor.run() |