Project

General

Profile

Statistics
| Branch: | Revision:

gdp-if / i3mote / CoAP-gateway.py @ master

History | View | Annotate | Download (13.8 KB)

1 a483abfd Nitesh Mor
#!/usr/bin/env python
2
3
## A script that:
4
# - finds the physical sensors and their corresponding IPv6 addresses
5
#   connected to the current mesh.
6
# - Sets up named pipes (to be used as output files) for each of the
7
#   physical sensors
8
# - Starts up off-the-shelf `coap-client` binary with appropriate
9
#   arguments (especially important to include observe)
10
# - Starts up translator script that takes the raw COAP messages and
11
#   converts them to JSON string (based on the data structure
12
13
import urllib2
14
import json
15
import os
16
import tempfile
17
import time
18
import struct
19
import sys
20
import gdp
21
import subprocess
22
import threading
23 4e8ff471 Nitesh Mor
import argparse
24 a483abfd Nitesh Mor
25 ed12745d Nitesh Mor
26 a483abfd Nitesh Mor
COAP_BINARY="NOT_SET"
27 ed12745d Nitesh Mor
GW_CONTROL="NOT SET"
28 84a1c502 Nitesh Mor
FORMAT = ">hHHIhhhhhhBBHHHHHHHHHHHHHH"
29 a483abfd Nitesh Mor
BUFSIZE = struct.calcsize(FORMAT)
30
31 ed12745d Nitesh Mor
START_TIME = None          ## global start of the program
32
## timing parameters
33 ee53651c Nitesh Mor
COAP_SUBSCRIPTION = 900    ## Max time we subscribe for
34 ed12745d Nitesh Mor
COAP_TIMEOUT = 60          ## If we don't hear anything for
35
                           ## a subscription request, we restart.
36
POLLING_IVAL = 0.5         ## frequency of checking for new data
37 18d00142 Nitesh Mor
NET_POLLING_IVAL = 150     ## How often to check for network  
38 ed12745d Nitesh Mor
39 d1fc74d6 Nitesh Mor
40
##### Some helper functions #########
41
42
def cleanup_file(filename):
43
    """ equivalent to 'rm -f filename' """
44
    try:
45
        os.unlink(filename)
46
    except OSError as e:
47
        pass
48
49
50 ed12745d Nitesh Mor
class CoAPStats(object):
51
    """ Keeping statistics about a CoAP run (timing information is 
52
        accurate within 1 second)
53
    """
54
55 d1fc74d6 Nitesh Mor
    def __init__(self, logfile):
56
        """ initialize with a log-file to dump stats to """
57
        self.fh = open(logfile, "a")
58
59 ed12745d Nitesh Mor
        self.start_time = time.time()   ## subscription sent
60 d1fc74d6 Nitesh Mor
        self.fh.write("s:%d, t:[" % int(self.start_time-START_TIME))
61
        self.fh.flush()
62
63
        self.data_times = []            ## data received at these times
64 ed12745d Nitesh Mor
        self.end_time = None
65
66
    def data_recv(self):
67
        """ new data received on this subscription """
68 d1fc74d6 Nitesh Mor
        t = time.time()
69
        self.data_times.append(t)
70
        self.fh.write("%d, " % int(t-self.start_time))
71
        self.fh.flush()
72 ed12745d Nitesh Mor
73
    def terminate(self):
74
        """ subscription terminated """
75
        self.end_time = time.time()
76 8a883f0a Nitesh Mor
        self.fh.write("], d:%d\n" % int(self.end_time-self.start_time))
77 d1fc74d6 Nitesh Mor
        self.fh.flush()
78
        self.fh.close()
79 ed12745d Nitesh Mor
80
    def get_start(self):
81
        return self.start_time
82
83
    def last_activity(self):
84
        """ last activity on this subscription """
85
        if len(self.data_times)==0:
86
            return self.start_time
87
        else:
88
            return self.data_times[-1]
89
90
    def __str__(self):
91
        duration = -1 if (self.end_time is None) \
92
                        else (self.end_time-self.start_time)
93
        t = str([int(x-self.start_time) for x in self.data_times])
94
        return "s:%d, d:%d, t:%s" % \
95
                     (int(self.start_time-START_TIME), duration, t)
96
97
    def __repr__(self):
98
        return self.__str__()
99
100
101
class Node(object):
102 a483abfd Nitesh Mor
    """
103
    Represents a physical node.
104
    """
105
106 919a2002 Nitesh Mor
    def __init__(self, nodeid, _eui64, _ipv6, prefix):
107 a483abfd Nitesh Mor
108 d1fc74d6 Nitesh Mor
        ## Node specific information
109 919a2002 Nitesh Mor
        self.nodeid = nodeid
110 a483abfd Nitesh Mor
        self.eui64 = _eui64.encode('ascii', 'ignore').translate(None, '-')
111
        self.ipv6 = _ipv6.encode('ascii', 'ignore')
112 d1fc74d6 Nitesh Mor
113
        ## GDP log
114 a483abfd Nitesh Mor
        self.prefix = prefix
115 ad3cb99c Nitesh Mor
        self.logname = "%s.%s" % (self.prefix, self.eui64)
116
117 d1fc74d6 Nitesh Mor
        ## Output of coap binary goes here
118
        self.fifo = os.path.join(tempfile.gettempdir(),
119
                                    "i3mesh-" + self.eui64)
120
121
        # Subscription statistics go to
122
        self.stat_file = os.path.join(tempfile.gettempdir(),
123
                                self.eui64 + "-subscription_stats.log")
124 a483abfd Nitesh Mor
125 d1fc74d6 Nitesh Mor
        print ">>> Node (%s), \n\tIPV6 (%s), \n\ttmpfile (%s), " \
126
                    "\n\tGDP log(%s), \n\tSubscription statistics(%s)" %\
127
                         (self.eui64, self.ipv6, self.fifo,
128
                                self.logname, self.stat_file)
129 2c06ba68 Nitesh Mor
130 d1fc74d6 Nitesh Mor
        ## basic cleanup
131
        cleanup_file(self.fifo)
132
        cleanup_file(self.stat_file)
133
134 ed12745d Nitesh Mor
        ## statistics for restarts, etc
135 d1fc74d6 Nitesh Mor
        self.coap_stats = []
136 919a2002 Nitesh Mor
137
138 ed12745d Nitesh Mor
    def start(self):
139
        """ Start the subscription for this node """
140 18d00142 Nitesh Mor
        # Variable to indicate whether thread is alive or not
141
        self.alive = True
142
143 ed12745d Nitesh Mor
        ## Start the thread that maintains communication with this node
144
        self.thr = threading.Thread(target=self.__processing_thread)
145
        self.thr.start()
146 919a2002 Nitesh Mor
147
148 ed12745d Nitesh Mor
    def stop(self):
149
        """ stop the subscription and cleanup """
150
        self.alive = False
151
        while self.thr.is_alive():
152
            time.sleep(POLLING_IVAL)
153
        assert self.thr.is_alive()==False
154 919a2002 Nitesh Mor
155 a483abfd Nitesh Mor
156 ed12745d Nitesh Mor
    def __del__(self):
157
        self.stop()
158 a483abfd Nitesh Mor
159
160 ed12745d Nitesh Mor
    def __processing_thread(self):
161 a483abfd Nitesh Mor
        """
162 ed12745d Nitesh Mor
        A thread that maintains CoAP subscription to this node and pushes
163
        this to GDP.
164 a483abfd Nitesh Mor
        """
165 ed12745d Nitesh Mor
        
166
        ##Get COAP data from the specified ipv6 address, which represents
167
        ##    a sensor with EUI64. Write this to a temp file at the
168
        ##    specified location.
169
        coap_args = [COAP_BINARY, '-v', '1',
170
                            '-B', str(COAP_SUBSCRIPTION),
171
                            '-s', str(COAP_SUBSCRIPTION),
172 eb67dcdf Nitesh Mor
                            '-N', '-o', self.fifo, '-m', 'get',
173 a483abfd Nitesh Mor
                            "coap://[%s]/sensors" % self.ipv6]
174 ad3cb99c Nitesh Mor
175 d1fc74d6 Nitesh Mor
        ## GDP log handle
176
        __loghandle = gdp.GDP_GCL(gdp.GDP_NAME(self.logname), gdp.GDP_MODE_AO)
177
178 ed12745d Nitesh Mor
        while self.alive:
179
            ## One iteration corresponds to one coap subscription
180 ad3cb99c Nitesh Mor
181 2620d21d Nitesh Mor
            time.sleep(POLLING_IVAL)
182
183 d1fc74d6 Nitesh Mor
            ## for self.fifo
184
            __fh = None
185
            __seekptr = 0
186 ad3cb99c Nitesh Mor
187 ed12745d Nitesh Mor
            ## start the actual process
188 4e8ff471 Nitesh Mor
            __outf = os.path.join(tempfile.gettempdir(), "%s.out" % self.ipv6)
189
            __errf = os.path.join(tempfile.gettempdir(), "%s.err" % self.ipv6)
190
            __out = open(__outf, "w")
191
            __err = open(__errf, "w")
192
            coap_process = subprocess.Popen(coap_args, stdout=__out,
193
                                                        stderr=__err)
194 d1fc74d6 Nitesh Mor
            __cur_stat = CoAPStats(self.stat_file)
195 ad3cb99c Nitesh Mor
196 a560430a Nitesh Mor
            while self.alive and (coap_process.poll() is None):
197 ad3cb99c Nitesh Mor
198 ed12745d Nitesh Mor
                time.sleep(POLLING_IVAL)
199 ad3cb99c Nitesh Mor
200 d1fc74d6 Nitesh Mor
                last_activity = __cur_stat.last_activity()
201 ed12745d Nitesh Mor
                if time.time()-last_activity>=COAP_TIMEOUT:
202 d1fc74d6 Nitesh Mor
                    ## Get out of inner loop and do cleanup
203 ed12745d Nitesh Mor
                    break
204 ad3cb99c Nitesh Mor
205 ed12745d Nitesh Mor
                ## attempt to open file, if we haven't already
206 d1fc74d6 Nitesh Mor
                if __fh is None:
207 ed12745d Nitesh Mor
                    try:
208 d1fc74d6 Nitesh Mor
                        __fh = open(self.fifo)
209 ed12745d Nitesh Mor
                    except IOError as e:
210 d1fc74d6 Nitesh Mor
                        ## no data yet, continue with next
211
                        ##  iteration of inner loop
212 ed12745d Nitesh Mor
                        continue
213 eb67dcdf Nitesh Mor
214 ed12745d Nitesh Mor
                ## Read data from the temp file written to by CoAP,
215
                ## parse it to JSON and append it to a GDP log 
216
                ## (name derived from parameters).
217
                ## Write everything that hasn't been written yet
218 d1fc74d6 Nitesh Mor
                assert __fh is not None
219
                assert __seekptr%BUFSIZE == 0
220 ed12745d Nitesh Mor
        
221 d1fc74d6 Nitesh Mor
                __fh.seek(__seekptr)
222 ed12745d Nitesh Mor
        
223
                # ugly
224 d1fc74d6 Nitesh Mor
                data = __fh.read(BUFSIZE)
225 ed12745d Nitesh Mor
                if len(data)<BUFSIZE:
226 d1fc74d6 Nitesh Mor
                    ## not enough data, continue next iteration
227
                    ##  of inner loop
228 ed12745d Nitesh Mor
                    continue
229
230 d1fc74d6 Nitesh Mor
                __cur_stat.data_recv()
231 ed12745d Nitesh Mor
                json_string = self.parseCOAP(data[:BUFSIZE])
232 18d00142 Nitesh Mor
                try:
233
                    __loghandle.append({'data': json_string})
234
                except gdp.MISC.EP_STAT_Exception as e:
235
                    print "Log %s, error %s" % (self.logname, e)
236
                    print "Killing (and hopefully restarting) this node"
237
                    self.alive = False
238
                    break
239 d1fc74d6 Nitesh Mor
                __seekptr += BUFSIZE
240
241
                ## End of inner loop
242 ed12745d Nitesh Mor
        
243
            ## Make sure we kill the coap process
244 2620d21d Nitesh Mor
            if coap_process.poll() is None:
245 ed12745d Nitesh Mor
                coap_process.kill()
246
247 d1fc74d6 Nitesh Mor
            __cur_stat.terminate()
248
            self.coap_stats.append(__cur_stat)
249
250
            if __fh is not None:
251
                __fh.close()
252
253 4e8ff471 Nitesh Mor
            ## close the file handles
254
            __out.close()
255
            __err.close()
256
257 d1fc74d6 Nitesh Mor
            cleanup_file(self.fifo)
258
259
            ## End of outer loop
260
261 ad3cb99c Nitesh Mor
262
    @staticmethod
263 a483abfd Nitesh Mor
    def parseCOAP(buf):
264
        """
265
        Read a TI COAP message and convert it to JSON string
266
        """
267 ad3cb99c Nitesh Mor
268 a483abfd Nitesh Mor
        # This is from `app.js` provided by TI
269
        assert len(buf) == BUFSIZE
270 ad3cb99c Nitesh Mor
271 a483abfd Nitesh Mor
        parsed = struct.unpack(FORMAT, buf)
272
        message = { "tamb"              : parsed[0]/100.0,
273
                    "rhum"              : parsed[1]/100.0,
274
                    "lux"               : parsed[2]/100.0,
275
                    "press"             : parsed[3]/100.0,
276
                    "gyrox"             : parsed[4]/100.0,
277
                    "gyroy"             : parsed[5]/100.0,
278
                    "gyroz"             : parsed[6]/100.0,
279
                    "accelx"            : parsed[7]/100.0,
280
                    "accely"            : parsed[8]/100.0,
281
                    "accelz"            : parsed[9]/100.0,
282
                    "led"               : parsed[10],
283
                    "channel"           : parsed[11],
284
                    "bat"               : parsed[12]/100.0,
285
                    "eh"                : parsed[13]/100.0,
286 84a1c502 Nitesh Mor
                    "eh1"               : parsed[13]/100.0,
287 a483abfd Nitesh Mor
                    "cc2650_active"     : parsed[14]/100.0,
288
                    "cc2650_sleep"      : parsed[15]/100.0,
289
                    "rf_tx"             : parsed[16]/100.0,
290
                    "rf_rx"             : parsed[17]/100.0,
291
                    "ssm_active"        : parsed[18]/100.0,
292
                    "ssm_sleep"         : parsed[19]/100.0,
293
                    "gpsen_active"      : parsed[20]/100.0,
294
                    "gpsen_sleep"       : parsed[21]/100.0,
295
                    "msp432_active"     : parsed[22]/100.0,
296
                    "msp432_sleep"      : parsed[23]/100.0,
297
                    "others"            : parsed[24]/100.0 }
298
        return json.dumps(message)
299 ad3cb99c Nitesh Mor
300
301 a483abfd Nitesh Mor
def fetch_nodes():
302 919a2002 Nitesh Mor
    """ Fetches the list of nodes from http://localhost/nodes.
303
        Note; ignores the gateway node
304
    """
305 a483abfd Nitesh Mor
    response = urllib2.urlopen("http://localhost/nodes")
306
    json_str = response.read()
307
    json_data = json.loads(json_str)
308
    # Note that the resulting data has no strings, it only has unicodes
309 919a2002 Nitesh Mor
    nodes = [(n['_id'], n['eui64'], n['address']) 
310
                            for n in json_data if n['_id'] != 1]
311
    return nodes
312
313
314
def network_working():
315
    """ Return true if the network is working properly, false otherwise """
316
317
    print "Checking whether gateway and app.js are working"
318
    try:
319
        l = len(fetch_nodes())
320
        assert l>=1
321
        print "Network up, %d nodes" % l
322
        return True
323
    except (AssertionError, urllib2.URLError) as e:
324
        print "Network crashed, %s" % e
325
        return False
326
327
328
329
def restart_gw_appjs():
330
    """ Check if the network is functioning properly. If not, restart it"""
331
    print "(Re)starting gateway"
332 ed12745d Nitesh Mor
    subprocess.call([GW_CONTROL, "stop"])
333
    subprocess.call([GW_CONTROL, "start"])
334
335 919a2002 Nitesh Mor
336 a483abfd Nitesh Mor
337 845e66b6 Nitesh Mor
def main(selfname, gw_control, coap, prefix):
338 a483abfd Nitesh Mor
339 ed12745d Nitesh Mor
    global GW_CONTROL
340
    GW_CONTROL = gw_control
341 a483abfd Nitesh Mor
342
    global COAP_BINARY
343
    COAP_BINARY = coap
344
345 ed12745d Nitesh Mor
    global START_TIME
346
    START_TIME = time.time()
347
348 845e66b6 Nitesh Mor
    logfile = os.path.join(tempfile.gettempdir(), "%s.log" % selfname)
349
    lh = open(logfile, "a")
350
    lh.write(">> %s: Starting %s\n" % (time.asctime(), selfname))
351 f30c2887 Nitesh Mor
    lh.flush()
352 845e66b6 Nitesh Mor
353 a483abfd Nitesh Mor
    gdp.gdp_init()
354
355 919a2002 Nitesh Mor
    ## containers for tuples describing nodes
356
    _prev_nodes, _nodes = [], []
357
    ## The following contains 'Node' objects
358 a483abfd Nitesh Mor
    nodes = []
359
360
    while True:
361 919a2002 Nitesh Mor
362 ed12745d Nitesh Mor
        print "##############################"
363 919a2002 Nitesh Mor
        ## See if we need a network (re)start
364 d1fc74d6 Nitesh Mor
        print "Time since start: %d" % int(time.time()-START_TIME)
365 919a2002 Nitesh Mor
        network_good = network_working()
366 845e66b6 Nitesh Mor
        lh.write("## %s: Network in good order? % s\n" %
367
                                    (time.asctime(), network_good))
368 f30c2887 Nitesh Mor
        lh.flush()
369 919a2002 Nitesh Mor
370
        if not network_good:
371 845e66b6 Nitesh Mor
            lh.write("@@ %s: Restarting network\n" % time.asctime())
372 f30c2887 Nitesh Mor
            lh.flush()
373 919a2002 Nitesh Mor
            restart_gw_appjs()
374
            _prev_nodes = []
375 f5e38d29 Nitesh Mor
            for node in nodes:
376
                node.stop()
377
            nodes = []
378 919a2002 Nitesh Mor
        else:
379
            _prev_nodes = _nodes
380 ed12745d Nitesh Mor
381
        _nodes = fetch_nodes()
382 919a2002 Nitesh Mor
383
        ## figure out the nodes we need to add/remove
384
        _del = set(_prev_nodes)-set(_nodes)
385
        _new = set(_nodes)-set(_prev_nodes)
386
387
        print "Dropped nodes:", list(_del)
388
        print "New nodes:", list(_new)
389
390
        ## first get rid of dropped nodes
391 a560430a Nitesh Mor
        __tmp = []
392 919a2002 Nitesh Mor
        for n in _del:
393 ed12745d Nitesh Mor
            for node in nodes:
394
                if n[0] == node.nodeid:
395
                    node.stop()
396
                    __tmp.append(node)
397 a560430a Nitesh Mor
        for node in __tmp:
398
            nodes.remove(node)
399
        del __tmp
400 919a2002 Nitesh Mor
401 18d00142 Nitesh Mor
        ## Restart any dead nodes
402
        for node in nodes:
403
            if node.alive == False:
404
                node.start()
405
406 919a2002 Nitesh Mor
        ## Create new nodes
407
        for n in _new:
408
            node = Node(n[0], n[1], n[2], prefix)
409 ed12745d Nitesh Mor
            node.start()
410 919a2002 Nitesh Mor
            nodes.append(node)
411
412 ed12745d Nitesh Mor
        ## Do some sleeping
413
        time.sleep(NET_POLLING_IVAL)
414 a483abfd Nitesh Mor
415 f30c2887 Nitesh Mor
    lh.flush()
416 845e66b6 Nitesh Mor
    lh.close()
417
418
419 a483abfd Nitesh Mor
if __name__ == "__main__":
420
421 4e8ff471 Nitesh Mor
    parser = argparse.ArgumentParser(description="Data collection for TI "
422
                                "i3motes. Essentially a CoAP=>GDP gateway")
423
    parser.add_argument("gw_control", type=str,
424
                            help="The restart script for gateway")
425
    parser.add_argument("coap_client", type=str,
426
                            help="The libcoap binary for your platform")
427
    parser.add_argument("log_prefix", type=str,
428
                            help="log-prefix for GDP logs, e.g. "
429
                                 "edu.berkeley.eecs.swarmlab.device")
430
    args = parser.parse_args()
431
432
    selfname = sys.argv[0]
433
    main(selfname, args.gw_control, args.coap_client, args.log_prefix)