Project

General

Profile

Statistics
| Branch: | Revision:

gdp-if / i3mote / CoAP-gateway.py @ master

History | View | Annotate | Download (13.8 KB)

1
#!/usr/bin/env python
2

    
3
## A script that:
4
# - finds the physical sensors and their corresponding IPv6 addresses
5
#   connected to the current mesh.
6
# - Sets up named pipes (to be used as output files) for each of the
7
#   physical sensors
8
# - Starts up off-the-shelf `coap-client` binary with appropriate
9
#   arguments (especially important to include observe)
10
# - Starts up translator script that takes the raw COAP messages and
11
#   converts them to JSON string (based on the data structure
12

    
13
import urllib2
14
import json
15
import os
16
import tempfile
17
import time
18
import struct
19
import sys
20
import gdp
21
import subprocess
22
import threading
23
import argparse
24

    
25

    
26
COAP_BINARY="NOT_SET"
27
GW_CONTROL="NOT SET"
28
FORMAT = ">hHHIhhhhhhBBHHHHHHHHHHHHHH"
29
BUFSIZE = struct.calcsize(FORMAT)
30

    
31
START_TIME = None          ## global start of the program
32
## timing parameters
33
COAP_SUBSCRIPTION = 900    ## Max time we subscribe for
34
COAP_TIMEOUT = 60          ## If we don't hear anything for
35
                           ## a subscription request, we restart.
36
POLLING_IVAL = 0.5         ## frequency of checking for new data
37
NET_POLLING_IVAL = 150     ## How often to check for network  
38

    
39

    
40
##### Some helper functions #########
41

    
42
def cleanup_file(filename):
    """ Remove filename, silently ignoring a missing file
        (the equivalent of 'rm -f filename').
    """
    try:
        os.unlink(filename)
    except OSError:
        # File did not exist (or could not be removed) -- best effort.
        pass
48

    
49

    
50
class CoAPStats(object):
    """ Track statistics for a single CoAP subscription run.

        Timing information is accurate to within 1 second.  Events are
        streamed to a log file as they happen, and the packet arrival
        times are also kept in memory for later inspection.
    """

    def __init__(self, logfile):
        """ Open logfile for appending and record the subscription start. """
        self.fh = open(logfile, "a")

        # Time at which the subscription request was sent
        self.start_time = time.time()
        self.fh.write("s:%d, t:[" % int(self.start_time-START_TIME))
        self.fh.flush()

        # Arrival time of every data packet seen on this subscription
        self.data_times = []
        self.end_time = None

    def data_recv(self):
        """ Record that one data packet arrived on this subscription. """
        now = time.time()
        self.data_times.append(now)
        self.fh.write("%d, " % int(now-self.start_time))
        self.fh.flush()

    def terminate(self):
        """ Mark the subscription as finished and close the log file. """
        self.end_time = time.time()
        self.fh.write("], d:%d\n" % int(self.end_time-self.start_time))
        self.fh.flush()
        self.fh.close()

    def get_start(self):
        """ Return the subscription start time (seconds since epoch). """
        return self.start_time

    def last_activity(self):
        """ Return the time of the most recent event: the last data
            packet if any arrived, otherwise the subscription start.
        """
        if self.data_times:
            return self.data_times[-1]
        return self.start_time

    def __str__(self):
        if self.end_time is None:
            duration = -1
        else:
            duration = self.end_time-self.start_time
        offsets = str([int(t-self.start_time) for t in self.data_times])
        return "s:%d, d:%d, t:%s" % \
                     (int(self.start_time-START_TIME), duration, offsets)

    def __repr__(self):
        return self.__str__()
99

    
100

    
101
class Node(object):
    """
    Represents a physical sensor node on the mesh.

    Owns a background thread that keeps a CoAP observe-subscription open
    to the node's IPv6 address (via the external `coap-client` binary)
    and appends each parsed sensor record, as a JSON string, to a GDP
    log named "<prefix>.<eui64>".
    """

    def __init__(self, nodeid, _eui64, _ipv6, prefix):
        """
        nodeid : the node's '_id' as reported by the gateway node list
        _eui64 : unicode EUI64 with '-' separators
        _ipv6  : unicode IPv6 address of the node
        prefix : GDP log-name prefix (log is "<prefix>.<eui64>")
        """

        ## Node specific information.  Python-2 idiom: .encode() forces
        ## unicode -> str; str.translate(None, '-') deletes the dashes.
        self.nodeid = nodeid
        self.eui64 = _eui64.encode('ascii', 'ignore').translate(None, '-')
        self.ipv6 = _ipv6.encode('ascii', 'ignore')

        ## GDP log this node's records are appended to
        self.prefix = prefix
        self.logname = "%s.%s" % (self.prefix, self.eui64)

        ## Output of coap binary goes here (a regular file in the temp
        ## directory, despite the 'fifo' name)
        self.fifo = os.path.join(tempfile.gettempdir(),
                                    "i3mesh-" + self.eui64)

        # Subscription statistics go to
        self.stat_file = os.path.join(tempfile.gettempdir(),
                                self.eui64 + "-subscription_stats.log")

        print ">>> Node (%s), \n\tIPV6 (%s), \n\ttmpfile (%s), " \
                    "\n\tGDP log(%s), \n\tSubscription statistics(%s)" %\
                         (self.eui64, self.ipv6, self.fifo,
                                self.logname, self.stat_file)

        ## basic cleanup: remove stale files from an earlier run
        cleanup_file(self.fifo)
        cleanup_file(self.stat_file)

        ## statistics for restarts, etc: one CoAPStats per subscription
        self.coap_stats = []


    def start(self):
        """ Start the subscription for this node """
        # Variable to indicate whether thread is alive or not; it also
        # doubles as the stop flag polled by the processing thread.
        self.alive = True

        ## Start the thread that maintains communication with this node
        self.thr = threading.Thread(target=self.__processing_thread)
        self.thr.start()


    def stop(self):
        """ stop the subscription and cleanup; blocks (by polling)
            until the processing thread has exited """
        self.alive = False
        while self.thr.is_alive():
            time.sleep(POLLING_IVAL)
        assert self.thr.is_alive()==False


    def __del__(self):
        ## NOTE(review): assumes start() has been called at least once;
        ## otherwise self.thr/self.alive do not exist and stop() raises.
        self.stop()


    def __processing_thread(self):
        """
        A thread that maintains CoAP subscription to this node and pushes
        this to GDP.

        Each iteration of the outer loop is one coap-client run; the
        inner loop polls the output file for complete BUFSIZE records.
        """
        
        ##Get COAP data from the specified ipv6 address, which represents
        ##    a sensor with EUI64. Write this to a temp file at the
        ##    specified location.  '-B'/'-s' bound the subscription to
        ##    COAP_SUBSCRIPTION seconds; '-o' is the output file.
        coap_args = [COAP_BINARY, '-v', '1',
                            '-B', str(COAP_SUBSCRIPTION),
                            '-s', str(COAP_SUBSCRIPTION),
                            '-N', '-o', self.fifo, '-m', 'get',
                            "coap://[%s]/sensors" % self.ipv6]

        ## GDP log handle (append-only mode)
        __loghandle = gdp.GDP_GCL(gdp.GDP_NAME(self.logname), gdp.GDP_MODE_AO)

        while self.alive:
            ## One iteration corresponds to one coap subscription

            time.sleep(POLLING_IVAL)

            ## for self.fifo: handle and read offset of the next
            ## unconsumed record
            __fh = None
            __seekptr = 0

            ## start the actual process, with stdout/stderr captured
            ## in per-node temp files
            __outf = os.path.join(tempfile.gettempdir(), "%s.out" % self.ipv6)
            __errf = os.path.join(tempfile.gettempdir(), "%s.err" % self.ipv6)
            __out = open(__outf, "w")
            __err = open(__errf, "w")
            coap_process = subprocess.Popen(coap_args, stdout=__out,
                                                        stderr=__err)
            __cur_stat = CoAPStats(self.stat_file)

            while self.alive and (coap_process.poll() is None):

                time.sleep(POLLING_IVAL)

                ## If nothing has arrived for COAP_TIMEOUT seconds,
                ## assume the subscription is dead and restart it.
                last_activity = __cur_stat.last_activity()
                if time.time()-last_activity>=COAP_TIMEOUT:
                    ## Get out of inner loop and do cleanup
                    break

                ## attempt to open file, if we haven't already
                ## (coap-client creates it on first data)
                if __fh is None:
                    try:
                        __fh = open(self.fifo)
                    except IOError as e:
                        ## no data yet, continue with next
                        ##  iteration of inner loop
                        continue

                ## Read data from the temp file written to by CoAP,
                ## parse it to JSON and append it to a GDP log 
                ## (name derived from parameters).
                ## Write everything that hasn't been written yet
                assert __fh is not None
                assert __seekptr%BUFSIZE == 0
        
                __fh.seek(__seekptr)
        
                # ugly: re-read from the absolute offset each pass
                data = __fh.read(BUFSIZE)
                if len(data)<BUFSIZE:
                    ## not enough data, continue next iteration
                    ##  of inner loop
                    continue

                __cur_stat.data_recv()
                json_string = self.parseCOAP(data[:BUFSIZE])
                try:
                    __loghandle.append({'data': json_string})
                except gdp.MISC.EP_STAT_Exception as e:
                    ## GDP append failed; give up on this node and let
                    ## the supervisor loop in main() restart it.
                    print "Log %s, error %s" % (self.logname, e)
                    print "Killing (and hopefully restarting) this node"
                    self.alive = False
                    break
                __seekptr += BUFSIZE

                ## End of inner loop
        
            ## Make sure we kill the coap process
            if coap_process.poll() is None:
                coap_process.kill()

            __cur_stat.terminate()
            self.coap_stats.append(__cur_stat)

            if __fh is not None:
                __fh.close()

            ## close the file handles
            __out.close()
            __err.close()

            cleanup_file(self.fifo)

            ## End of outer loop


    @staticmethod
    def parseCOAP(buf):
        """
        Read a TI COAP message (one BUFSIZE-byte packed record) and
        convert it to JSON string
        """

        # This is from `app.js` provided by TI
        assert len(buf) == BUFSIZE

        ## FORMAT unpacks into 26 fields (indices 0-25); most fields
        ## are fixed-point with two decimals, hence the /100.0.
        ## NOTE(review): "eh1" reuses parsed[13] (same as "eh") and
        ## parsed[25] is never used -- this looks like an off-by-one
        ## carried over from TI's app.js; confirm against the firmware
        ## record layout before changing it.
        parsed = struct.unpack(FORMAT, buf)
        message = { "tamb"              : parsed[0]/100.0,
                    "rhum"              : parsed[1]/100.0,
                    "lux"               : parsed[2]/100.0,
                    "press"             : parsed[3]/100.0,
                    "gyrox"             : parsed[4]/100.0,
                    "gyroy"             : parsed[5]/100.0,
                    "gyroz"             : parsed[6]/100.0,
                    "accelx"            : parsed[7]/100.0,
                    "accely"            : parsed[8]/100.0,
                    "accelz"            : parsed[9]/100.0,
                    "led"               : parsed[10],
                    "channel"           : parsed[11],
                    "bat"               : parsed[12]/100.0,
                    "eh"                : parsed[13]/100.0,
                    "eh1"               : parsed[13]/100.0,
                    "cc2650_active"     : parsed[14]/100.0,
                    "cc2650_sleep"      : parsed[15]/100.0,
                    "rf_tx"             : parsed[16]/100.0,
                    "rf_rx"             : parsed[17]/100.0,
                    "ssm_active"        : parsed[18]/100.0,
                    "ssm_sleep"         : parsed[19]/100.0,
                    "gpsen_active"      : parsed[20]/100.0,
                    "gpsen_sleep"       : parsed[21]/100.0,
                    "msp432_active"     : parsed[22]/100.0,
                    "msp432_sleep"      : parsed[23]/100.0,
                    "others"            : parsed[24]/100.0 }
        return json.dumps(message)
299

    
300

    
301
def fetch_nodes():
    """ Fetch the node list from http://localhost/nodes (served by the
        gateway's app.js) and return it as a list of
        (_id, eui64, address) tuples.  The gateway node itself
        (_id == 1) is skipped.
        Note: the parsed JSON contains no str objects, only unicode.
    """
    raw = urllib2.urlopen("http://localhost/nodes").read()
    entries = json.loads(raw)
    result = []
    for entry in entries:
        ## _id == 1 is the gateway itself -- not a sensor node
        if entry['_id'] == 1:
            continue
        result.append((entry['_id'], entry['eui64'], entry['address']))
    return result
312

    
313

    
314
def network_working():
315
    """ Return true if the network is working properly, false otherwise """
316

    
317
    print "Checking whether gateway and app.js are working"
318
    try:
319
        l = len(fetch_nodes())
320
        assert l>=1
321
        print "Network up, %d nodes" % l
322
        return True
323
    except (AssertionError, urllib2.URLError) as e:
324
        print "Network crashed, %s" % e
325
        return False
326

    
327

    
328

    
329
def restart_gw_appjs():
330
    """ Check if the network is functioning properly. If not, restart it"""
331
    print "(Re)starting gateway"
332
    subprocess.call([GW_CONTROL, "stop"])
333
    subprocess.call([GW_CONTROL, "start"])
334

    
335

    
336

    
337
def main(selfname, gw_control, coap, prefix):
    """ Supervision loop: configure the module globals, then forever
        (re)check the mesh, restart the gateway when it looks dead, and
        keep one running 'Node' worker per physical node.

        selfname   -- name used for the supervisor's own log file
        gw_control -- gateway start/stop script (stored in GW_CONTROL)
        coap       -- path to coap-client binary (stored in COAP_BINARY)
        prefix     -- GDP log-name prefix handed to each Node
    """

    global GW_CONTROL
    GW_CONTROL = gw_control

    global COAP_BINARY
    COAP_BINARY = coap

    global START_TIME
    START_TIME = time.time()

    ## Supervisor's own append-mode log file in the temp directory
    logfile = os.path.join(tempfile.gettempdir(), "%s.log" % selfname)
    lh = open(logfile, "a")
    lh.write(">> %s: Starting %s\n" % (time.asctime(), selfname))
    lh.flush()

    gdp.gdp_init()

    ## containers for tuples describing nodes (as returned by
    ## fetch_nodes: (_id, eui64, address))
    _prev_nodes, _nodes = [], []
    ## The following contains 'Node' objects
    nodes = []

    while True:

        print "##############################"
        ## See if we need a network (re)start
        print "Time since start: %d" % int(time.time()-START_TIME)
        network_good = network_working()
        lh.write("## %s: Network in good order? % s\n" %
                                    (time.asctime(), network_good))
        lh.flush()

        if not network_good:
            ## Bounce the gateway and tear down all workers; they get
            ## recreated below as "new" nodes.
            lh.write("@@ %s: Restarting network\n" % time.asctime())
            lh.flush()
            restart_gw_appjs()
            _prev_nodes = []
            for node in nodes:
                node.stop()
            nodes = []
        else:
            _prev_nodes = _nodes

        _nodes = fetch_nodes()

        ## figure out the nodes we need to add/remove, by set
        ## difference on the (id, eui64, address) tuples
        _del = set(_prev_nodes)-set(_nodes)
        _new = set(_nodes)-set(_prev_nodes)

        print "Dropped nodes:", list(_del)
        print "New nodes:", list(_new)

        ## first get rid of dropped nodes (collected in __tmp so we
        ## never remove from 'nodes' while iterating it)
        __tmp = []
        for n in _del:
            for node in nodes:
                if n[0] == node.nodeid:
                    node.stop()
                    __tmp.append(node)
        for node in __tmp:
            nodes.remove(node)
        del __tmp

        ## Restart any dead nodes (a Node sets alive=False when its
        ## GDP append fails)
        for node in nodes:
            if node.alive == False:
                node.start()

        ## Create new nodes
        for n in _new:
            node = Node(n[0], n[1], n[2], prefix)
            node.start()
            nodes.append(node)

        ## Do some sleeping before the next network check
        time.sleep(NET_POLLING_IVAL)

    ## NOTE(review): unreachable -- the `while True` above never exits.
    lh.flush()
    lh.close()
417

    
418

    
419
if __name__ == "__main__":

    ## Three positional arguments: gateway control script, coap-client
    ## binary, and the GDP log-name prefix.
    arg_parser = argparse.ArgumentParser(description="Data collection for TI "
                                "i3motes. Essentially a CoAP=>GDP gateway")
    for _name, _help in (
            ("gw_control", "The restart script for gateway"),
            ("coap_client", "The libcoap binary for your platform"),
            ("log_prefix", "log-prefix for GDP logs, e.g. "
                           "edu.berkeley.eecs.swarmlab.device")):
        arg_parser.add_argument(_name, type=str, help=_help)
    parsed = arg_parser.parse_args()

    main(sys.argv[0], parsed.gw_control, parsed.coap_client,
         parsed.log_prefix)
    main(selfname, args.gw_control, args.coap_client, args.log_prefix)