Project

General

Profile

Statistics
| Branch: | Revision:

gdp-if / tensorflow / gdpfs.cc @ master

History | View | Annotate | Download (33.6 KB)

1
/* vim: set ai sw=4 sts=4 ts=4 :*/
2

    
3
#include "gdpfs.h"
4

    
5
#include <assert.h>
6
#include <time.h>
7
#include <iostream>
8
#include <cstdint>
9
#include <map>
10
#include <vector>
11
#include <string>
12

    
13
#include "GDPfs.pb.h"
14

    
15
static EP_DBG Dbg = EP_DBG_INIT("gdpfs.main", "GDP FileSystem");
16
static gdp_iomode_t GLOBAL_IOMODE;
17
static const char *LogPrefixDef = "tensorflow";
18

    
19
static gdp_recno_t RawAppend(
20
                                                gdp_gin_t *gin,
21
                                                const std::string& s,
22
                                                bool async = false);
23

    
24
// get time since epoch in ns
25
uint64_t TimeNS()
26
{
27
        struct timeval t;
28
        gettimeofday(&t, NULL);
29
        return (uint64_t) 1000*(t.tv_sec*1000000 + t.tv_usec);
30
}
31

    
32
// returns whether a given i/o mode is good enough for write ops
33
bool __writable_mode(gdp_iomode_t m)
34
{
35
        return (((uint32_t) m & (uint32_t) GDP_MODE_AO) != 0);
36
}
37

    
38
// Initializes the library. "mode" provides a general global I/O mode,
39
// invidual usage can be more restrictive. i.e. memory-mapped files are
40
// always in RO mode, even if the initialization mode allows writing.
41
GdpfsStatus GDPfsInit(gdp_iomode_t mode, const char* debug_setting)
42
{
43
        srand (time(NULL));
44

    
45
        ep_dbg_cprintf(Dbg, 2, "Setting global I/O mode to: %d\n", mode);
46
        GLOBAL_IOMODE = mode;
47
        ep_dbg_cprintf(Dbg, 2, "Done reading the configuration\n");
48

    
49
        return kSuccess;
50
}
51

    
52

    
53
// Initialize a GDPfs-based filesystem
54
// Since there is no root directory to used as an anchor object this
55
//   has to stick to low level primitives, hence the code duplication
56
//   (from GDPDir::NewEntry and GDPDir::AddEntry).
57
//   XXX This is a prime target for refactoring. XXX
58
// NOTE WELL: This does not update the cache, so you cannot combine it
59
//   with other operations, that is, the process must exit without using
60
//   the filesystem.
61
static GdpfsStatus RawCreateLog(std::string logname, gdp_gin_t **ginp);
62

    
63
GdpfsStatus GDPfsMkfs(const std::string& rootdir)
64
{
65
        GdpfsStatus gstat;
66
        gdp_gin_t *gin;
67

    
68
        ep_dbg_cprintf(Dbg, 10, "Initializing filesystem %s\n", rootdir.c_str());
69
        GDPfsInit(GDP_MODE_RA, NULL);
70

    
71
        // actually create the physical log
72
        gstat = RawCreateLog(rootdir, &gin);
73
        if (gstat != kSuccess)
74
                return gstat;
75

    
76
        // create a GDPfsMsg that we'll append to the log
77
        ep_dbg_cprintf(Dbg, 15, "Adding [%s] to the log\n", rootdir.c_str());
78

    
79
        GDPfs::GDPfsMsg m;
80
        GDPfs::GDPfsMeta *meta;
81

    
82
        meta = m.mutable_meta();
83
        meta->set_type(GDPfs::DIR);
84

    
85
        m.set_time_ns(TimeNS());
86

    
87
        // actually append it to the log
88
        std::string s;
89
        m.SerializeToString(&s);
90
        RawAppend(gin, s);
91
        gdp_gin_close(gin);
92
}
93

    
94

    
95
// Create a null terminated string of length "len" (including the null)
96
// and store it at provided memory location. If "cheat" is true, then
97
// a deterministic string is generated instead of truly random string,
98
// usually resulting in much faster creation.
99
// Assumes that "s" can hold "len" number of bytes.
100
void RandData(char *s, const int len, bool cheat)
101
{
102
        static const char alphanum[] =
103
            "0123456789"
104
                "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
105
        "abcdefghijklmnopqrstuvwxyz";
106

    
107
        if (cheat)
108
        {
109
                memset(s, 88, len-1);
110
        }
111
        else
112
        {
113
                for (int i = 0; i < len; ++i)
114
                        s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
115
        }
116
    s[len] = 0;
117
}
118

    
119

    
120
// Returns the full path (fname) split into individual components.
121
// Uses "://" as a delimiter for scheme, and "/" as a delimiter for
122
// path components. An example:
123
// "gdp://x/y/z" => "scheme" = "gdp", "parts"={"x", "y", "z"}
124
void SplitPath(const std::string& fname, std::string* scheme,
125
                                std::vector<std::string> *p)
126
{
127
        ep_dbg_cprintf(Dbg, 53, "SplitPath(%s)\n", fname.c_str());
128

    
129
        std::size_t s;
130
        s = fname.find("://");
131

    
132
        if (s != std::string::npos)
133
        {
134
                scheme->assign(fname.substr(0, s));
135
                // to account for the length of '://'
136
                s = s+3;
137
        }
138
        else
139
                s = 0;
140

    
141
        char *tmp = new char[fname.length()-s+1];
142
        strncpy(tmp, fname.c_str()+s, (fname.length()-s+1));
143

    
144
        p->clear();
145
        char *tok;
146
        std::string str;
147

    
148
        tok = strtok(tmp, "/");
149
        while (tok != NULL)
150
        {
151
                str.clear();
152
                str.assign(tok);
153
                p->push_back(str);
154
                tok = strtok(NULL, "/");
155
        }
156

    
157
        // just debugging output below.
158
        if (ep_dbg_test(Dbg, 53))
159
        {
160
                std::vector<std::string>::iterator it;
161

    
162
                ep_dbg_printf("SplitPath(%s) => scheme: [%s]\n",
163
                                                fname.c_str(), scheme->c_str());
164
                for (it=p->begin(); it!=p->end(); it++)
165
                        ep_dbg_printf("=> '%s' ", it->c_str());
166

    
167
                ep_dbg_printf("\n");
168
        }
169
}
170

    
171

    
172
// Takes in a full path (including "gdp://") and returns the
173
// name of the directory log in "topdir" and everything else
174
// in "remaining". As a side effect, performs sanitization of
175
// the path (duplicated '//', etc).
176
// e.g. "gdp://x/y/z" => "topdir" = "x", "remaining" = "y/z"
177
// Returns error when the path can not be parsed as a gdp path.
178
GdpfsStatus ParsePath(const std::string& fname, std::string* topdir,
179
                                                std::string* remaining)
180
{
181
        uint32_t i=0;
182
        std::string scheme;
183
        std::vector<std::string> parts;
184

    
185
        topdir->clear();
186
        remaining->clear();
187

    
188
        SplitPath(fname, &scheme, &parts);
189

    
190
        if (scheme != "gdp" || parts.size() == 0)
191
        {
192
                ep_dbg_cprintf(Dbg, 2, "Invalid GDP path: %s\n", fname.c_str());
193
                return kInvalidArg;
194
        }
195
        
196
        topdir->assign(parts[0]);
197
        for (i=1; i<parts.size(); i++)
198
        {
199
                remaining->append(parts[i]);
200
                if (i!=parts.size()-1)
201
                        remaining->append("/");
202
        }
203

    
204
        ep_dbg_cprintf(Dbg, 43, "ParsePath(%s) => %s, %s\n",
205
                                                        fname.c_str(), topdir->c_str(),
206
                                                        remaining->c_str());
207
        return kSuccess;
208
}
209

    
210
// Splits the name into dirname and basename (the last part
211
// of a name. Input name may or may not have the protocol
212
// (gdp://) included. However, dirname never includes this
213
// protocol name.
214
// e.g., "gdp://x/y/z" => dirname = "x/y", basename = "z".
215
void BaseDirName(const std::string& fname, std::string* dirname,
216
                                        std::string* basename)
217
{
218
        uint32_t i;
219
        std::string scheme;
220
        std::vector<std::string> parts;
221

    
222
        dirname->clear();
223
        basename->clear();
224

    
225
        SplitPath(fname, &scheme, &parts);
226

    
227
        for (i=0; i<parts.size()-1; i++){
228
                dirname->append(parts[i]);
229
                if (i!=parts.size()-2)
230
                        dirname->append("/");
231
        }
232
        basename->assign(parts[parts.size()-1]);
233

    
234
        ep_dbg_cprintf(Dbg, 42, "BaseDirName(%s) => %s, %s\n",
235
                                                        fname.c_str(), dirname->c_str(),
236
                                                        basename->c_str());
237
}
238

    
239
// Returns whether the path is a valid file/directory name.
240
// At the moment, simply checks for presence of a '/' in the name.
241
// XXX the checking should be more extensive and thorough
242
bool NameValid(const std::string& path)
243
{
244
        std::size_t found = path.find("/");
245
        return (found == std::string::npos);
246
}
247

    
248
// Do a low (GDP) level log creation.  The (human-oriented) log name
249
// must already be created.
250
static GdpfsStatus RawCreateLog(std::string logname, gdp_gin_t **ginp)
251
{
252
        EP_STAT estat;
253
        gdp_md_t *gmd;
254
        gdp_name_t log_internal;
255
        // sig-key management. We don't create keys, however.
256
        bool usesig;
257
        const char *keyfile = NULL;
258
        EP_CRYPTO_KEY *key = NULL;
259
        gdp_create_info_t *gci = gdp_create_info_new();
260
        const char *logname_c = NULL;
261

    
262
        if (logname.length() > 0)
263
        {
264
                logname_c = logname.c_str();
265
                estat = gdp_parse_name(logname_c, log_internal);
266
                if (!EP_STAT_ISOK(estat))
267
                        return kGdpNameError;
268
        }
269

    
270
        ep_dbg_cprintf(Dbg, 4, "Creating log: %s\n", logname_c);
271

    
272
        gmd = gdp_md_new(0);
273

    
274
        // read a signature key if we are supposed to use signatures
275
        usesig = ep_adm_getboolparam("swarm.gdpfs.usesig", false);
276

    
277
        if (usesig)
278
        {
279

    
280
                // read the key in "key"
281
                keyfile = ep_adm_getstrparam("swarm.gdpfs.keyfile", NULL);
282
                if (keyfile == NULL)
283
                {
284
                        ep_app_error("No keyfile provided.");
285
                        exit(70);        // EX_SOFTWARE XXX fix this
286
                }
287

    
288
                // read the provided keyfile and provide to create info
289
                key = ep_crypto_key_read_file(keyfile, EP_CRYPTO_KEYFORM_PEM,
290
                                                        EP_CRYPTO_F_SECRET);
291
                if (key == NULL)
292
                {
293
                        ep_app_error("Cannot read keyfile %s", keyfile);
294
                        exit(70);        // EX_SOFTWARE XXX fix this
295
                }
296
                estat = gdp_create_info_set_owner_key(gci, key, "sha256");
297
                if (!EP_STAT_ISOK(estat))
298
                {
299
                        ep_app_error("Could not set owner key");
300
                        exit(70);        // EX_SOFTWARE XXX fix this
301
                }
302
        }
303
        else
304
        {
305
                estat = gdp_create_info_new_owner_key(gci, NULL, "none",
306
                                0, NULL, "none");
307
                if (!EP_STAT_ISOK(estat))
308
                {
309
                        ep_app_error("Could not set null owner key for %s",
310
                                        logname_c);
311
                        exit(70);        // EX_SOFTWARE XXX fix this
312
                }
313
        }
314

    
315
        estat = gdp_gin_create(gci, logname_c, ginp);
316
        gdp_create_info_free(&gci);
317

    
318
        if (EP_STAT_ISOK(estat))
319
        {
320
                ep_dbg_cprintf(Dbg, 4, "Successfully created log\n");
321
                return kSuccess;
322
        }
323
        else
324
        {
325
                ep_app_message(estat, "Error creating log %s", logname_c);
326
                return kGdpErrorCreate;
327
        }
328
}
329

    
330
// Create a new log with a random name and return the full name in
331
// `logname` and the open GIN in `ginp`.
332
GdpfsStatus CreateLog(std::string *logname, gdp_gin_t **ginp)
333
{
334

    
335
        assert (__writable_mode(GLOBAL_IOMODE));
336

    
337
#if 0
338
        const char *prefix;
339
        char suffix[12];
340

341
        prefix = ep_adm_getstrparam("swarm.gdpfs.logprefix", NULL);
342
        if (prefix == NULL || prefix[0] == '\0')
343
                prefix = getenv("GDP_NAME_ROOT");
344
        if (prefix == NULL || prefix[0] == '\0')
345
                prefix = LogPrefixDef;
346
        RandData(suffix, sizeof suffix);
347

348
        char generated_name[strlen(prefix) + sizeof suffix + 2];
349
        snprintf(generated_name, sizeof generated_name, "%s.%s", prefix, suffix);
350
#endif
351

    
352
        // passing a null string creates an anonymous log
353
        std::string nullname;
354
        gdp_gin_t *gin;
355
        GdpfsStatus kstat = RawCreateLog(nullname, &gin);
356
        if (kstat == kSuccess)
357
        {
358
                gdp_pname_t pname;
359
                gdp_printable_name(*gdp_gin_getname(gin), pname);
360
                logname->assign(pname);
361
                if (ginp != NULL)
362
                        *ginp = gin;
363
                else
364
                        gdp_gin_close(gin);
365
        }
366
        return kstat;
367
}
368

    
369
// Parsing utility function. Given a string s, parse this as
370
// an GDPfsMsg set with GDPfsFchunk. Returns offset,
371
// length and actual data (as a string).
372
GdpfsStatus ParseMsgFchunk(const std::string& s, size_t *offset,
373
                                                        size_t *len, std::string *d)
374
{
375
        GDPfs::GDPfsMsg m;
376
        bool stat;
377

    
378
        stat = m.ParseFromString(s);
379
        if (!stat) return kProtoParsingError;
380

    
381
        ep_dbg_cprintf(Dbg, 60, "Parsed message: %s\n",
382
                                                                m.DebugString().c_str());
383

    
384
        // FIXME this shouldn't be an assertion; we are reading
385
        // data potentially generated by a different program with
386
        // a different MAX_RECSIZE
387
        assert (m.fchunk().length() <= MAX_RECSIZE);
388

    
389
        *offset = (size_t) m.fchunk().offset();
390
        *len = (size_t) m.fchunk().length();
391
        d->assign(m.fchunk().data());
392

    
393
        return kSuccess;
394
}
395

    
396

    
397
/***********************************************************/
398
/***********************************************************/
399
/***********************************************************/
400

    
401
GDPFileLowLevel::GDPFileLowLevel(const std::string& rootlog, gdp_iomode_t mode)
402
{
403

    
404
        EP_STAT estat;
405
        bool usesig = false;
406
        gdp_open_info_t* info;
407

    
408
        // Let's just initialize things manually instead of
409
        // initialization lists, at least for now.
410
        logname_ = rootlog;
411
        mode_ = (gdp_iomode_t) ((uint32_t) mode & (uint32_t) GLOBAL_IOMODE);
412

    
413
        ep_dbg_cprintf(Dbg, 10, "Opening %s [mode=%d]\n", logname_.c_str(), mode_);
414
        estat = gdp_parse_name(logname_.c_str(), gobname_);
415
        if (!EP_STAT_ISOK(estat))
416
        {
417
                char ebuf[100];
418
                ep_app_fatal("cannot parse %s: %s",
419
                                logname_.c_str(),
420
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
421
        }
422
        info = gdp_open_info_new();
423

    
424
        // read a signature key if we are supposed to use signatures
425
        usesig = ep_adm_getboolparam("swarm.gdpfs.usesig", false);
426
        if (usesig && __writable_mode(mode_))
427
        {
428
                FILE *fp;
429
                EP_CRYPTO_KEY *skey = NULL;
430
                const char *keyfile = NULL;
431

    
432
                keyfile = ep_adm_getstrparam("swarm.gdpfs.keyfile", NULL);
433
                fp = fopen(keyfile, "r");
434
                if (fp == NULL)
435
                {
436
                        ep_app_error("cannot open signing key file %s", keyfile);
437
                        exit(70);        // EX_SOFTWARE XXX fix this
438
                }
439

    
440
                skey = ep_crypto_key_read_fp(fp, keyfile,
441
                                EP_CRYPTO_KEYFORM_PEM, EP_CRYPTO_F_SECRET);
442
                if (skey == NULL)
443
                {
444
                        ep_app_error("cannot read signing key file %s", keyfile);
445
                        exit(70);        // EX_SOFTWARE XXX fix this
446
                }
447

    
448
                estat = gdp_open_info_set_signing_key(info, skey);
449
        }
450

    
451
        if (usesig && !__writable_mode(mode_))
452
        {
453
                gdp_open_info_set_vrfy(info, true);
454
        }
455

    
456
        estat = gdp_gin_open(gobname_, mode_, info, &handle_);
457
        if (EP_STAT_ISFAIL(estat))
458
        {
459
                char ebuf[100];
460
                ep_app_fatal("cannot open %s: %s",
461
                                logname_.c_str(),
462
                                ep_stat_tostr(estat, ebuf, sizeof ebuf));
463
        }
464

    
465
        // initialize some useful state variables
466
        SyncLog();
467
}
468

    
469

    
470
GDPFileLowLevel::~GDPFileLowLevel()
471
{
472
        ep_dbg_cprintf(Dbg, 10, "Closing log %s\n", logname_.c_str());
473

    
474
        EP_STAT estat;
475
        std::map<gdp_recno_t, std::string*>::iterator it;
476

    
477
        estat = gdp_gin_close(handle_);
478

    
479
        // also need to free memory allocated for rec_cache
480
        for (it=rec_cache_.begin(); it!=rec_cache_.end(); it++)
481
        {
482
                ep_dbg_cprintf(Dbg, 40, "Freeing memory for rec: %ld\n", it->first);
483
                delete it->second;
484
        }
485
        ep_dbg_cprintf(Dbg, 10, "Done cleaning up memory\n");
486
}
487

    
488

    
489
// read contents of a record in string, returns read recno which may
490
// be different
491
gdp_recno_t GDPFileLowLevel::ReadRecord(gdp_recno_t recno, std::string *s)
492
{
493
        ep_dbg_cprintf(Dbg, 25, "Reading record %ld\n", recno);
494

    
495
        EP_STAT estat;
496
        gdp_buf_t *buf;
497
        size_t buflen, readbytes;
498
        gdp_recno_t _recno;
499
        gdp_datum_t *datum;
500
        char localbuf[MAX_RECSIZE];
501

    
502
        if (GetCache(recno, s)==kSuccess)
503
        {
504
                ep_dbg_cprintf(Dbg, 40, "Cache hit %ld\n", recno);
505
                // GetCache already filled "s" with data
506
                _recno = recno;
507
        }
508
        else
509
        {
510
                ep_dbg_cprintf(Dbg, 40, "Cache miss %ld\n", recno);
511
                datum = gdp_datum_new();
512
                estat = gdp_gin_read_by_recno(handle_, recno, datum);
513

    
514
                if (EP_STAT_ISOK(estat))
515
                {
516
                        buf = gdp_datum_getbuf(datum);
517
                        _recno = gdp_datum_getrecno(datum);
518
                        buflen = gdp_datum_getdlen(datum);
519

    
520
                        ep_dbg_cprintf(Dbg, 30, "Record size: %ld\n", buflen);
521

    
522
                        assert (buflen<=MAX_RECSIZE);
523
                        readbytes = gdp_buf_peek(buf, &localbuf, buflen);
524
                        assert (readbytes==buflen);
525

    
526
                        s->assign(localbuf, readbytes);
527
                        SetCache(_recno, s);
528
                }
529
                else
530
                {
531
                        ep_app_message(estat, "Error reading %ld", recno);
532
                        _recno = 0;
533
                }
534
                gdp_datum_free(datum);
535
        }
536

    
537
        // just a sanity check.
538
        if (recno > 0)
539
                assert (_recno == recno);
540

    
541
        return _recno;
542
}
543

    
544
// s should have space for at least numrec string pointers
545
// returns the number of records read.
546
int32_t GDPFileLowLevel::ReadRecordAsync(gdp_recno_t startrec,
547
                                                                                 int32_t numrec, std::string **s)
548
{
549
        ep_dbg_cprintf(Dbg, 25, "Reading record %ld[+%d]\n", startrec, numrec);
550

    
551
        EP_STAT estat;
552
        gdp_buf_t *buf;
553
        size_t buflen, readbytes;
554
        gdp_recno_t _recno;
555
        gdp_datum_t *datum;
556
        gdp_event_t *ev;
557
        int32_t ctr = 0;
558
        char localbuf[MAX_RECSIZE];
559

    
560
        // early exit if numrec == 0
561
        if (numrec <= 0)
562
                return 0;
563

    
564
        estat = gdp_gin_read_by_recno_async(handle_, startrec, numrec, NULL, NULL);
565

    
566
        if (EP_STAT_ISOK(estat))
567
        {
568
                while (true)
569
                {
570
                        // this blocks
571
                        ev = gdp_event_next(handle_, NULL);
572

    
573
                        if (gdp_event_gettype(ev) == GDP_EVENT_DONE)
574
                        {
575
                                gdp_event_free(ev);
576
                                break;
577
                        }
578

    
579
                        datum = gdp_event_getdatum(ev);
580

    
581
                        buf = gdp_datum_getbuf(datum);
582
                        _recno = gdp_datum_getrecno(datum);
583
                        buflen = gdp_datum_getdlen(datum);
584

    
585
                        ep_dbg_cprintf(Dbg, 30, "Record size: %ld\n", buflen);
586

    
587
                        assert (buflen<=MAX_RECSIZE);
588
                        readbytes = gdp_buf_peek(buf, &localbuf, buflen);
589
                        assert (readbytes==buflen);
590

    
591
                        s[ctr]->assign(localbuf, readbytes);
592
                        SetCache(_recno, s[ctr]);
593
                        ctr++;
594

    
595
                        gdp_event_free(ev);
596
                }
597
        }
598
        else
599
        {
600
                ep_app_message(estat, "Error reading %ld[+%d]", startrec, numrec);
601
        }
602

    
603
        ep_dbg_cprintf(Dbg, 25, "Done reading records\n");
604
        return ctr;
605
}
606

    
607

    
608
// Append data from the string.  This is a function, not a method, so it
609
// can be used when initializing a filesystem, taking the GIN as a parameter
610
// rather than from `GDPFileLowLevel::handle_`.
611
static gdp_recno_t RawAppend(
612
                                                gdp_gin_t *gin,
613
                                                const std::string& s,
614
                                                bool async /*=false*/)
615
{
616
        if (ep_dbg_test(Dbg, 25))
617
        {
618
                gdp_pname_t pname;
619

    
620
                ep_dbg_printf("RawAppend: %ld bytes (%s) to %s\n",
621
                        s.length(), async ? "async" : "sync",
622
                        gdp_printable_name(*gdp_gin_getname(gin), pname));
623
        }
624

    
625
        gdp_recno_t _recno;
626
        gdp_datum_t *datum;
627
        gdp_buf_t *buf;
628
        std::string *c;
629
        EP_STAT estat;
630

    
631
        datum = gdp_datum_new();
632
        buf = gdp_datum_getbuf(datum);
633
        gdp_buf_write(buf, s.data(), s.length());
634

    
635
        if (async)
636
                estat = gdp_gin_append_async(gin, 1, &datum, NULL, NULL, NULL);
637
        else
638
                estat = gdp_gin_append(gin, datum, NULL);
639

    
640
        if (!EP_STAT_ISOK(estat))
641
        {
642
                ep_app_message(estat, "Error appending data");
643
                _recno = -1;
644
        }
645
        else if (!async)
646
        {
647
                _recno = gdp_datum_getrecno(datum);
648
        }
649
        else
650
        {
651
                _recno = 0;
652
        }
653
        gdp_datum_free(datum);
654

    
655
        return _recno;
656
}
657

    
658
// Method version of the same thing.  Keeps track of the current
659
// record number.
660
GdpfsStatus GDPFileLowLevel::AppendRecord(
661
                                                                const std::string& s,
662
                                                                bool async /*=false*/)
663
{
664
        if (ep_dbg_test(Dbg, 22))
665
        {
666
                gdp_pname_t pname;
667
                ep_dbg_printf("GDPFileLowLevel::AppendRecord: %ld %s\n",
668
                                s.length(), logname_.c_str());
669
        }
670

    
671
        assert (__writable_mode(mode_));
672

    
673
        gdp_recno_t _recno = RawAppend(handle_, s, async);
674
        if (_recno == 0)
675
                _recno = maxrecs_ + 1;
676
        if (_recno < 0)
677
                return kFailure;
678
        SetCache(_recno, &s);
679
        maxrecs_ = _recno;
680
        return kSuccess;
681
}
682

    
683
void GDPFileLowLevel::SetType(GDPfs::FileType type)
684
{
685
        ep_dbg_cprintf(Dbg, 10, "SetType(%d)\n", (int)type);
686
        assert (__writable_mode(mode_));
687

    
688
        std::string s;
689
        GDPfs::GDPfsMsg m;
690
        GDPfs::GDPfsMeta *meta;
691

    
692
        meta = m.mutable_meta();
693
        meta->set_type(type);
694

    
695
        m.set_time_ns(TimeNS());
696

    
697
        m.SerializeToString(&s);
698
        AppendRecord(s);
699

    
700
        // also update the local variable type
701
        type_ = type;
702
}
703

    
704

    
705
GDPfs::FileType GDPFileLowLevel::GetType(bool sync /*=false*/)
706
{
707
        ep_dbg_cprintf(Dbg, 15, "Querying for type\n");
708
        if (sync==true)
709
                SyncLog();
710
        return type_;
711
}
712

    
713

    
714
gdp_recno_t GDPFileLowLevel::GetNumRecs(bool sync /*=false*/)
715
{
716
        if (sync == true)
717
                SyncLog();
718
        ep_dbg_cprintf(Dbg, 15, "# records: %ld\n", maxrecs_);
719
        return maxrecs_;
720
}
721

    
722
uint64_t GDPFileLowLevel::GetMTime()
723
{
724
        ep_dbg_cprintf(Dbg, 15, "GetMTime()\n");
725
        SyncLog(); // we always sync.
726
        return mtime_ns_;
727
}
728

    
729

    
730
void GDPFileLowLevel::SyncLog()
731
{
732
        ep_dbg_cprintf(Dbg, 15, "sync: checking for new data\n");
733

    
734
        gdp_recno_t r, maxrecs;
735
        std::string s;
736
        GDPfs::GDPfsMsg m;
737

    
738
        maxrecs = gdp_gin_getnrecs(handle_);
739

    
740
        if (maxrecs > maxrecs_)                // something new
741
        {
742
                ep_dbg_cprintf(Dbg, 15, "New records\n");
743

    
744
                // do we need to check for type?
745
                if (maxrecs_ == 0)        // yes; we didn't have type info already
746
                {
747
                        assert (type_ == GDPfs::UNKNOWN_TYPE);
748

    
749
                        r = ReadRecord(1, &s);
750
                        assert (r==1);
751

    
752
                        m.ParseFromString(s);
753
                        assert (m.has_meta());
754

    
755
                        type_ = m.meta().type();
756
                }
757

    
758
                {
759
                        // may as well cause cache hit, so need to worry
760
                        // about extra network overhead.
761
                        r = ReadRecord(maxrecs, &s);
762
                        m.ParseFromString(s);
763
                        mtime_ns_ = m.time_ns();
764
                }
765

    
766
                maxrecs_ = maxrecs;
767
        }
768

    
769
        // logging... nothing fancy.
770
        if ep_dbg_test(Dbg, 15)
771
        {
772
                switch (type_)
773
                {
774
                        case GDPfs::UNKNOWN_TYPE:
775
                                ep_dbg_printf("Type: UNKNOWN_TYPE\n");
776
                                break;
777
                        case GDPfs::FILE:
778
                                ep_dbg_printf("Type: FILE\n");
779
                                break;
780
                        case GDPfs::DIR:
781
                                ep_dbg_printf("Type: DIR\n");
782
                                break;
783
                        default:
784
                                ep_dbg_printf("Can't figure out type\n");
785
                }
786

    
787
                ep_dbg_printf("Last update time: %ld\n", mtime_ns_);
788
                ep_dbg_printf("Number of records: %ld\n", maxrecs_);
789
        }
790

    
791
        return;
792
}
793

    
794
// copies data from string s, and sets the cache
795
GdpfsStatus GDPFileLowLevel::SetCache(const gdp_recno_t recno,
796
                                                                                        const std::string *s)
797
{
798
        ep_dbg_cprintf(Dbg, 40, "Setting cache for %ld\n", recno);
799
        if (recno <= 0)
800
                return kFailure;
801

    
802
        std::string *c;
803
        c = new std::string;
804
        c->assign(s->data(), s->length());
805
        rec_cache_[recno] = c;
806

    
807
        return kSuccess;
808
}
809

    
810
// attempts to find recno in cache and set string with the appropriate
811
// value. Returns false if recno can not be found in the cache.
812
GdpfsStatus GDPFileLowLevel::GetCache(const gdp_recno_t recno, std::string *s)
813
{
814
        ep_dbg_cprintf(Dbg, 40, "Retrieving %ld from cache\n", recno);
815
        std::string *sptr;
816

    
817
        if (recno <= 0)
818
                return kFailure;
819

    
820
        if (rec_cache_.find(recno) != rec_cache_.end())
821
        {
822
                sptr = rec_cache_.find(recno)->second;
823
                s->assign(*sptr);
824
                return kSuccess;
825
        }
826
        return kFailure;
827
}
828

    
829

    
830
/***********************************************************/
831
/***********************************************************/
832
/***********************************************************/
833

    
834
GDPDir::GDPDir(const std::string& rootlog, gdp_iomode_t mode)
835
                                : GDPFileLowLevel(rootlog, mode)
836
{
837
        int i;
838
        std::string s, name, _logname_;
839
        GDPfs::GDPfsMsg m;
840
        GDPfs::Operation op;
841
        std::map<std::string, std::string>::iterator it;
842

    
843
        if (type_ != GDPfs::DIR)
844
        {
845
                if (__writable_mode(mode_))
846
                        SetType(GDPfs::DIR);
847
                else
848
                        ep_dbg_cprintf(Dbg, 1, "FileType isn't setup properly, "
849
                                                                        "but we are in R/O mode\n");
850
        }
851

    
852
        // just to populate our local cache with ReadRecordAsync
853
        if (maxrecs_ > 0)
854
        {
855
                int i, readrecs, numrec;
856
                std::string **sarray;
857

    
858
                numrec = maxrecs_-1;
859
                sarray = new std::string* [numrec];
860

    
861
                for (i=0; i<numrec; i++)
862
                        sarray[i] = new std::string;
863

    
864
                readrecs = ReadRecordAsync(2, numrec, sarray);
865
                assert (readrecs == numrec);
866

    
867
                for (i=0; i<numrec; i++)
868
                        delete sarray[i];
869
                delete sarray;
870
        }
871

    
872
        // populate dentries_
873
        for (i=2; i<=maxrecs_; i++)
874
        {
875
                s.clear();
876
                name.clear();
877
                _logname_.clear();
878

    
879
                ReadRecord(i, &s);
880
                m.ParseFromString(s);
881

    
882
                op = m.dentry().op();
883

    
884
                if (op == GDPfs::UNKNOWN_OP)
885
                {
886
                        ep_dbg_cprintf(Dbg, 1, "unknown op in dentry\n");
887
                }
888
                else
889
                {
890
                        name.assign(m.dentry().name());
891
                        it = dentries_.find(name);
892
                        if (op == GDPfs::DELETE)
893
                        {
894
                                if (it != dentries_.end())
895
                                {
896
                                        ep_dbg_cprintf(Dbg, 10, "Deleting entry: %s\n",
897
                                                                                                        name.c_str());
898
                                        dentries_.erase(name);
899
                                }
900
                        }
901
                        else if (op == GDPfs::ADD)
902
                        {
903
                                _logname_.assign(m.dentry().logname());
904
                                ep_dbg_cprintf(Dbg, 13, "Entry: %s => %s\n",
905
                                                                                name.c_str(), _logname_.c_str());
906
                                if (it != dentries_.end())
907
                                {
908
                                        ep_dbg_cprintf(Dbg, 15, "First deleting %s\n",
909
                                                                                                        name.c_str());
910
                                        dentries_.erase(name);
911
                                }
912
                                dentries_[name] = _logname_;
913
                        }
914
                        else
915
                                assert (false);
916
                }
917
        }
918

    
919
        ep_dbg_cprintf(Dbg, 12, "loaded %ld entries\n", dentries_.size());
920
}
921

    
922

    
923
// create a new file+log; stores the logname. If recursive==true,
924
// any non-existent paths on the way are created.
925
void GDPDir::NewFile(const std::string name, std::string *logname,
926
                                                bool recursive)
927
{
928
        ep_dbg_cprintf(Dbg, 5, "Creating new file: %s\n", name.c_str());
929
        NewEntry(name, GDPfs::FILE, logname, recursive);
930
        ep_dbg_cprintf(Dbg, 5, "Done creating new file: %s => %s\n",
931
                                                                        name.c_str(), logname->c_str());
932
}
933

    
934
// create a directory. Any non-existent parent directories
935
// are created if recursive==true
936
void GDPDir::CreateDir(const std::string name, bool recursive)
937
{
938
        ep_dbg_cprintf(Dbg, 5, "Creating new dir: %s\n", name.c_str());
939
        NewEntry(name, GDPfs::DIR, NULL, recursive);
940
        ep_dbg_cprintf(Dbg, 5, "Done creating new dir: %s\n", name.c_str());
941
}
942

    
943

    
944
// Add a new entry to the meta-log describing the item with "file
945
// system" name `name` and type `t`.  If `recursive` is set, any
946
// intermediate "directories" will be created.  If `t` is
947
// `GDPfs::FILE` it will also create a new log to hold the contents
948
// and store the name of that log in `logname`.
949
void GDPDir::NewEntry(std::string name, GDPfs::FileType t,
950
                                                std::string *logname, bool recursive)
951
{
952
        gdp_pname_t pname;
953
        ep_dbg_cprintf(Dbg, 10, "Adding entry %s to log %s\n",
954
                        name.c_str(),
955
                        gdp_printable_name(*gdp_gin_getname(handle_), pname));
956

    
957
        if (NameExists(name))
958
        {
959
                ep_dbg_cprintf(Dbg, 5, "Name [%s] exists\n", name.c_str());
960
                if (t==GDPfs::FILE)
961
                        GetEntryLogname(name, logname);
962
                return;
963
        }
964

    
965
        // first, some assertions
966
        assert (__writable_mode(mode_));
967
        assert (!NameExists(name));
968
        if (t==GDPfs::DIR)
969
                assert (logname == NULL);
970

    
971
        // declare variables
972
        std::string dirname, basename;
973

    
974
        // make sure the parent directory exists
975
        BaseDirName(name, &dirname, &basename);
976
        if ((dirname.length()!=0) && recursive)
977
        {
978
                // recursively create the path.
979
                ep_dbg_cprintf(Dbg, 10, "Creating new dir [%s]\n",
980
                                                                                        dirname.c_str());
981
                NewEntry(dirname, GDPfs::DIR, NULL, recursive);
982
        }
983

    
984
        // this should be fine now, unless we were asked
985
        // explicitly to not create the dirname
986
        assert (dirname.length()==0 || NameExists(dirname));
987

    
988
        // create a log if we are creating a new file.
989
        if (t == GDPfs::FILE)
990
        {
991
                ep_dbg_cprintf(Dbg, 10, "Creating new log\n");
992
                CreateLog(logname, NULL);
993
                // the following just initializes the file type
994
                //GDPFile f(logname, GDP_MODE_RA);
995
        }
996

    
997
        // actually add the entry to the metainfo log
998
        AddEntry(name, *logname, t);
999
}
1000

    
1001

    
1002
// Add an entry to the meta log for this filesystem that describes a new
1003
// file or directory.  The `name` is the human-oriented "file name",
1004
// `logname` is the internal log name (relevent for files only),
1005
// and `t` is the type (GDPfs::DIR or GDPfs::FILE).
1006
void GDPDir::AddEntry(std::string name, std::string logname, GDPfs::FileType t)
1007
{
1008

    
1009
        // create a GDPfsMsg that we'll append to the metadata log
1010
        if (ep_dbg_test(Dbg, 15))
1011
        {
1012
                gdp_pname_t pname;
1013
                ep_dbg_printf("Adding [%s => %s] to metalog %s\n",
1014
                                                name.c_str(), logname.c_str(),
1015
                                                gdp_printable_name(*gdp_gin_getname(handle_), pname));
1016
        }
1017

    
1018
        assert (__writable_mode(mode_));
1019

    
1020
        std::string s;
1021
        GDPfs::GDPfsMsg m;
1022
        GDPfs::GDPfsDentry *d;
1023
        std::map<std::string, std::string>::iterator it;
1024

    
1025
        d = m.mutable_dentry();
1026
        d->set_op(GDPfs::ADD);
1027
        d->set_name(name);
1028
        if (!logname.empty())
1029
                d->set_logname(logname);
1030
        d->set_type(t);
1031

    
1032
        m.set_time_ns(TimeNS());
1033
        // actually append it to the log
1034
        m.SerializeToString(&s);
1035
        AppendRecord(s);
1036

    
1037
        // also update our local state
1038
        it = dentries_.find(name);
1039
        if (it != dentries_.end())
1040
                dentries_.erase(name);
1041
        dentries_[name] = logname;
1042

    
1043
}
1044

    
1045

    
1046
// Add a "delete" entry to the master meta file.  This undoes the effect
1047
// of the corresponding AddEntry.
1048
void GDPDir::DelEntry(std::string name)
1049
{
1050

    
1051
        ep_dbg_cprintf(Dbg, 10, "Deleting entry %s\n", name.c_str());
1052
        assert (__writable_mode(mode_));
1053

    
1054
        std::string s;
1055
        GDPfs::GDPfsMsg m;
1056
        GDPfs::GDPfsDentry *d;
1057
        std::map<std::string, std::string>::iterator it;
1058

    
1059
        d = m.mutable_dentry();
1060
        d->set_op(GDPfs::DELETE);
1061
        d->set_name(name);
1062

    
1063
        m.set_time_ns(TimeNS());
1064
        m.SerializeToString(&s);
1065
        AppendRecord(s);
1066

    
1067
        // also update our local state
1068
        it = dentries_.find(name);
1069
        if (it != dentries_.end())
1070
                dentries_.erase(name);
1071
}
1072

    
1073
void GDPDir::RenameEntry(const std::string& oldname, const std::string& newname)
1074
{
1075

    
1076
        ep_dbg_cprintf(Dbg, 10, "Renaming entry %s to %s\n",
1077
                                                                oldname.c_str(), newname.c_str());
1078
        assert (__writable_mode(mode_));
1079
        assert (NameExists(oldname));
1080
        if (NameExists(newname))
1081
                ep_dbg_cprintf(Dbg, 2, "Overwriting existing name\n");
1082

    
1083
        // collect information about our source
1084
        std::string logname;
1085
        GDPfs::FileType t;
1086
        t = GetEntryType(oldname);
1087
        if (t == GDPfs::FILE)
1088
                GetEntryLogname(oldname, &logname);
1089

    
1090
        // execute the delete + add
1091
        DelEntry(oldname);
1092
        AddEntry(newname, logname, t);
1093
}
1094

    
1095
void GDPDir::GetChildren(const std::string& dirname,
1096
                                                        std::vector<std::string>* children)
1097
{
1098

    
1099
        std::string dir, base;
1100
        std::map<std::string, std::string>::iterator it;
1101

    
1102
        for (it= dentries_.begin(); it != dentries_.end(); it++)
1103
        {
1104
                BaseDirName(it->first, &dir, &base);
1105
                ep_dbg_cprintf(Dbg, 15, "comparing %s with %s\n",
1106
                                                        dir.c_str(), dirname.c_str());
1107
                if (dir.compare(dirname) == 0)
1108
                {
1109
                        ep_dbg_cprintf(Dbg, 12, "Child => %s\n", base.c_str());
1110
                        children->push_back(base);
1111
                }
1112
        }
1113
}
1114

    
1115

    
1116
void GDPDir::GetMatchingPaths(const std::string& pattern,
1117
                                                                std::vector<std::string>* results)
1118
{
1119

    
1120
        ep_dbg_cprintf(Dbg, 4, "GetMatchingPaths(%s)\n", pattern.c_str());
1121
        ep_dbg_cprintf(Dbg, 1, "Note that this isn't fully implemented yet.\n");
1122

    
1123
        if (pattern.find('*')!=std::string::npos)
1124
        {
1125
                ep_dbg_cprintf(Dbg, 1, " '*' Not yet implemented\n");
1126
                return;
1127
        }
1128
        if (pattern.find('?')!=std::string::npos)
1129
        {
1130
                ep_dbg_cprintf(Dbg, 1, " '?' Not yet implemented\n");
1131
                return;
1132
        }
1133
        if (pattern.find('[')!=std::string::npos)
1134
        {
1135
                ep_dbg_cprintf(Dbg, 1, " '[' Not yet implemented\n");
1136
                return;
1137
        }
1138
        if (pattern.find(']')!=std::string::npos)
1139
        {
1140
                ep_dbg_cprintf(Dbg, 1, " ']' Not yet implemented\n");
1141
                return;
1142
        }
1143
        if (pattern.find("\\\\")!=std::string::npos)
1144
        {
1145
                ep_dbg_cprintf(Dbg, 1, " ds Not yet implemented\n");
1146
                return;
1147
        }
1148

    
1149
        size_t pos;
1150
        std::map<std::string, std::string>::iterator it;
1151

    
1152
        for (it=dentries_.begin(); it!=dentries_.end(); it++)
1153
        {
1154
                ep_dbg_cprintf(Dbg, 30, "Matching %s against: %s\n",
1155
                                                                        pattern.c_str(), it->first.c_str());
1156
                pos = it->first.find(pattern);
1157
                if (pos == 0)
1158
                {
1159
                        ep_dbg_cprintf(Dbg, 22, "Found match %s\n", it->first.c_str());
1160
                        results->push_back(it->first);
1161
                }
1162
        }
1163

    
1164
        return;
1165

    
1166
}
1167

    
1168

    
1169
bool GDPDir::NameExists(const std::string& name)
1170
{
1171
        ep_dbg_cprintf(Dbg, 20, "NameExists(%s)\n", name.c_str());
1172
        std::map<std::string, std::string>::iterator it;
1173
        it = dentries_.find(name);
1174
        return (it != dentries_.end());
1175
}
1176

    
1177
void GDPDir::Stat(const std::string& name, GdpStat* stat)
1178
{
1179
        ep_dbg_cprintf(Dbg, 4, "Stat(%s)\n", name.c_str());
1180

    
1181
        stat->is_directory = (GetEntryType(name) == GDPfs::DIR);
1182
        if (!stat->is_directory)
1183
        {
1184
                // first get the logname
1185
                std::string logname;
1186
                GetEntryLogname(name, &logname);
1187
                GDPFile f(logname, GDP_MODE_RO);
1188

    
1189
                stat->length = (uint64_t) f.GetFileSize();
1190
                stat->mtime_nsec = f.GetMTime();
1191
        }
1192
        else
1193
        {
1194
                stat->length = 0;
1195
                stat->mtime_nsec = TimeNS();        // XXX: fix this
1196
        }
1197

    
1198
        ep_dbg_cprintf(Dbg, 4, "Stat(%s)=%ld, %ld\n", name.c_str(),
1199
                                                                        stat->length, stat->mtime_nsec);
1200
}
1201

    
1202

    
1203
// Given a gdpfs file name, return the corresponding log name.
1204
void GDPDir::GetEntryLogname(const std::string& name,
1205
                                                        std::string *logname)
1206
{
1207
        assert (NameExists(name));
1208

    
1209
        std::map<std::string, std::string>::iterator it;
1210
        it = dentries_.find(name);
1211
        assert (it != dentries_.end());
1212
        logname->assign(it->second.c_str(), it->second.length());
1213
}
1214

    
1215
GDPfs::FileType GDPDir::GetEntryType(const std::string& name)
1216
{
1217
        assert (NameExists(name));
1218

    
1219
        std::map<std::string, std::string>::iterator it;
1220
        it = dentries_.find(name);
1221
        assert (it != dentries_.end());
1222
        if (it->second.empty())
1223
                return GDPfs::DIR;        // dirs don't have a logname associated
1224
        else
1225
                return GDPfs::FILE;
1226
}
1227

    
1228
/***********************************************************/
1229
/***********************************************************/
1230
/***********************************************************/
1231

    
1232
GDPFile::GDPFile(const std::string& logname, gdp_iomode_t mode)
1233
                                : GDPFileLowLevel(logname, mode)
1234
{
1235
        if (type_ != GDPfs::FILE)
1236
        {
1237
                if (__writable_mode(mode))
1238
                        SetType(GDPfs::FILE);
1239
                else
1240
                        ep_dbg_cprintf(Dbg, 1, "FileType isn't set properly, "
1241
                                                                                "but we are in RO mode.\n");
1242
        }
1243
        filesize_ = GetFileSize(true);
1244
}
1245

    
1246
size_t GDPFile::GetFileSize(bool sync /*=true*/)
1247
{
1248
        ep_dbg_cprintf(Dbg, 15, "Querying for filesize\n");
1249

    
1250
        size_t offset, len;
1251
        std::string msg, parsed;
1252
        assert (type_ == GDPfs::FILE);
1253

    
1254
        if (sync)
1255
        {
1256
                maxrecs_ = GetNumRecs(true);
1257
                ReadRecord(maxrecs_, &msg);
1258
                ParseMsgFchunk(msg, &offset, &len, &parsed);
1259
                filesize_ = offset + len;
1260
        }
1261

    
1262
        ep_dbg_cprintf(Dbg, 15, "Filesize is %ld\n", filesize_);
1263
        return filesize_;
1264
}
1265
/***********************************************************/
1266
/***********************************************************/
1267
/***********************************************************/
1268

    
1269
GDPFileROMemMap::GDPFileROMemMap(const std::string& logname,
1270
                                                                        bool async/*=false*/)
1271
                                : GDPFile(logname, GDP_MODE_RO)
1272
{
1273
        ep_dbg_cprintf(Dbg, 4, "initializing memmap\n");
1274
        filedata_.clear();
1275
        FetchRecords(2, maxrecs_);
1276
}
1277

    
1278
// fetches records [startrec...endrec] (both ends including)
1279
void GDPFileROMemMap::FetchRecords(gdp_recno_t startrec,
1280
                                                                   gdp_recno_t endrec, bool async)
1281
{
1282

    
1283
        if (endrec < startrec)        // early exit
1284
                return;
1285

    
1286
        int i, j=0, readrecs, numrec;
1287
        size_t offset, len;
1288
        std::string s, parsed;
1289
        std::string **sarray;
1290

    
1291
        if (!async)
1292
        {
1293
                for (i=startrec; i<=endrec; i++)
1294
                {
1295
                        s.clear();
1296
                        parsed.clear();
1297
                        ReadRecord(i, &s);
1298
                        ParseMsgFchunk(s, &offset, &len, &parsed);
1299

    
1300
                        filedata_.append(parsed.c_str(), parsed.length());
1301
                }
1302
        }
1303
        else
1304
        {
1305
                // this might have problems with 0 records
1306
                numrec = endrec-startrec+1;
1307
                sarray = new std::string* [numrec];
1308

    
1309
                for (i=0; i<numrec; i++)
1310
                        sarray[i] = new std::string;
1311

    
1312
                readrecs = ReadRecordAsync(startrec, numrec, sarray);
1313
                assert (readrecs == numrec);
1314

    
1315
                for (i=0; i<readrecs; i++)
1316
                {
1317
                        ParseMsgFchunk(*sarray[i], &offset, &len, &parsed);
1318
                        filedata_.append(parsed.c_str(), parsed.length());
1319
                }
1320

    
1321
                for (i=0; i<numrec; i++)
1322
                        delete sarray[i];
1323
                delete sarray;
1324
        }
1325
}
1326

    
1327

    
1328
/***********************************************************/
1329
/***********************************************************/
1330
/***********************************************************/
1331

    
1332
size_t GDPFileRO::Read(size_t offset, size_t len, char *buf)
1333
{
1334

    
1335
        // XXX A lame attempt at tolerating situations where
1336
        // someone updates the file elsewhere.
1337
        gdp_recno_t cur_recs = maxrecs_;
1338
        SyncLog();
1339
        if (maxrecs_ > cur_recs)
1340
        {
1341
                ep_dbg_cprintf(Dbg, 5, "The file seems to have updated\n");
1342
                // need to update things?
1343
                filesize_ = GetFileSize();
1344
                FetchRecords(cur_recs+1, maxrecs_);
1345
        }
1346
        else
1347
                ep_dbg_cprintf(Dbg, 9, "Reading data; file hasn't been updated\n");
1348

    
1349
        size_t read = offset + len > filesize_ ? filesize_ - offset : len;
1350
        memcpy(buf, filedata_.c_str()+offset, read);
1351
        return read;
1352
}
1353

    
1354
/***********************************************************/
1355
/***********************************************************/
1356
/***********************************************************/
1357

    
1358

    
1359
void GDPFileWO::Append(const std::string& s, bool async/*=false*/)
1360
{
1361
        // append the data in the string
1362
        ep_dbg_cprintf(Dbg, 25, "Appending %ld bytes\n", s.length());
1363

    
1364
        std::string tmp, a;
1365
        GDPfs::GDPfsMsg m;
1366
        GDPfs::GDPfsFchunk* f;
1367
        gdp_event_t *ev;
1368
        int i, _l, num_appends=0, acks=0;
1369

    
1370
        // Split the string into smaller chunks, and call appropriate
1371
        // append (sync vs async)
1372
        for (i=0; i<s.length(); i=i+MAX_WRITESIZE)
1373
        {
1374
                tmp.assign(s, i, MAX_WRITESIZE);
1375

    
1376
                _l = tmp.length();
1377
                f = m.mutable_fchunk();
1378

    
1379
                f->set_offset((uint64_t)filesize_);
1380
                f->set_length((uint32_t)_l);
1381
                f->set_data(tmp);
1382

    
1383
                m.set_time_ns(TimeNS());
1384
                m.SerializeToString(&a);
1385
                ep_dbg_cprintf(Dbg, 60, "Appending %s\n",
1386
                                                                m.DebugString().c_str());
1387

    
1388
                // actual append call.
1389
                AppendRecord(a, async);
1390

    
1391
                num_appends++;
1392
                if (!async) acks++;
1393

    
1394
                filesize_ += _l;
1395
        }
1396

    
1397
        // if we used asynchronous version, fetch the status codes.
1398
        if (async)
1399
        {
1400
                while (acks<num_appends)
1401
                {
1402
                        ev = gdp_event_next(handle_, NULL);
1403
                        if (gdp_event_gettype(ev) == GDP_EVENT_CREATED ||
1404
                                        gdp_event_gettype(ev) == GDP_EVENT_SUCCESS)
1405
                                acks++;
1406
                        else
1407
                        {
1408
                                ep_dbg_cprintf(Dbg, 2, "Unknown event type\n");
1409
                                gdp_event_print(ev, stderr);
1410
                                break;
1411
                        }
1412
                }
1413
                assert (acks == num_appends);
1414
        }
1415
}