forked from membase/ep-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmutation_log.hh
More file actions
525 lines (417 loc) · 12.2 KB
/
mutation_log.hh
File metadata and controls
525 lines (417 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
#ifndef MUTATION_LOG_HH
#define MUTATION_LOG_HH 1
#include <vector>
#include <set>
#include <iterator>
#include <limits>
#include <algorithm>
#include <functional>
#include <numeric>
#include <iterator>
#include <exception>
#include <cstdio>
#include <fcntl.h>
#include "common.hh"
#include "atomic.hh"
#include "histo.hh"
//! A byte count of 128 MiB. How it is consumed is not visible in this header.
#define ML_BUFLEN (128 * 1024 * 1024)

//! Buffer length that LogHeaderBlock::set(const uint8_t*, size_t) insists on.
const size_t MIN_LOG_HEADER_SIZE = 4096;

//! Marker byte stored in, and checked against, every MutationLogEntry.
const uint8_t MUTATION_LOG_MAGIC = 0x45;

//! Not referenced in this header (presumably reserved header bytes -- confirm in the .cc).
const size_t HEADER_RESERVED = 4;

//! Format version a default-constructed LogHeaderBlock carries.
const uint32_t LOG_VERSION = 1;

//! Not referenced in this header (presumably the size of the entry scratch buffer -- confirm).
const size_t LOG_ENTRY_BUF_SIZE = 512;

//! Sentinel file descriptor value; MutationLog::isEnabled() is false when `file` equals it.
const int DISABLED_FD = -3;

// Bit flags combined into MutationLog's single sync configuration byte.
// The low two bits are the "sync" group, the next two the "flush" group;
// the _1/_2 suffixes match the commit1()/commit2() operations by name.
const uint8_t SYNC_COMMIT_1 = 1;
const uint8_t SYNC_COMMIT_2 = 2;
//! Mask selecting every sync bit (see MutationLog::getSyncConfig()).
const uint8_t SYNC_FULL = SYNC_COMMIT_1 | SYNC_COMMIT_2;
const uint8_t FLUSH_COMMIT_1 = 4;
const uint8_t FLUSH_COMMIT_2 = 8;
//! Mask selecting every flush bit (see MutationLog::getFlushConfig()).
const uint8_t FLUSH_FULL = FLUSH_COMMIT_1 | FLUSH_COMMIT_2;
//! Configuration with only the two COMMIT_2 bits set.
const uint8_t DEFAULT_SYNC_CONF = FLUSH_COMMIT_2 | SYNC_COMMIT_2;
/**
 * The header block representing the first 4k (or so) of a MutationLog
 * file.
 *
 * Every field is kept in network byte order (htonl on the way in, ntohl
 * on the way out), so the four members can be copied to and from a raw
 * buffer with memcpy regardless of host endianness.
 */
class LogHeaderBlock {
public:
    /**
     * Build a header carrying LOG_VERSION, a zero block size and block
     * count, and the read/write marker set to 1.
     *
     * The marker is encoded with htonl like every other field so that
     * rdwr() reports 1 on any host; storing a bare 1 would decode as
     * 0x01000000 on little-endian machines.
     */
    LogHeaderBlock()
        : _version(htonl(LOG_VERSION)),
          _blockSize(0),
          _blockCount(0),
          _rdwr(htonl(1)) {
    }

    /**
     * Record the block geometry.
     *
     * @param bs block size in bytes
     * @param bc number of blocks (defaults to 1)
     */
    void set(uint32_t bs, uint32_t bc=1) {
        _blockSize = htonl(bs);
        _blockCount = htonl(bc);
    }

    /**
     * Load all four fields from a serialized header.
     *
     * The fields are read back to back from the start of the buffer in
     * the order version, block size, block count, rdwr; the remaining
     * bytes are ignored.
     *
     * @param buf serialized header bytes
     * @param buflen length of buf; asserted to equal MIN_LOG_HEADER_SIZE
     */
    void set(const uint8_t *buf, size_t buflen) {
        assert(buflen == MIN_LOG_HEADER_SIZE);
        size_t offset(0);
        // The values stay in network order: they are copied verbatim and
        // only converted by the accessors.
        memcpy(&_version, buf + offset, sizeof(_version));
        offset += sizeof(_version);
        memcpy(&_blockSize, buf + offset, sizeof(_blockSize));
        offset += sizeof(_blockSize);
        memcpy(&_blockCount, buf + offset, sizeof(_blockCount));
        offset += sizeof(_blockCount);
        memcpy(&_rdwr, buf + offset, sizeof(_rdwr));
    }

    /** @return the log format version in host byte order. */
    uint32_t version() const {
        return ntohl(_version);
    }

    /** @return the block size in host byte order. */
    uint32_t blockSize() const {
        return ntohl(_blockSize);
    }

    /** @return the block count in host byte order. */
    uint32_t blockCount() const {
        return ntohl(_blockCount);
    }

    /** @return the read/write marker in host byte order. */
    uint32_t rdwr() const {
        return ntohl(_rdwr);
    }

    /**
     * Replace the read/write marker.
     *
     * @param nval new marker value (host byte order)
     */
    void setRdwr(uint32_t nval) {
        _rdwr = htonl(nval);
    }

private:
    // All stored in network byte order; declaration order is also the
    // serialized order used by set(const uint8_t*, size_t).
    uint32_t _version;
    uint32_t _blockSize;
    uint32_t _blockCount;
    uint32_t _rdwr;
};
/**
 * Kind of record held by a MutationLogEntry. The numeric value is what
 * gets stored in the entry's one-byte type field.
 */
typedef enum {
    ML_NEW     = 0,
    ML_DEL     = 1,
    ML_DEL_ALL = 2,
    ML_COMMIT1 = 3,
    ML_COMMIT2 = 4
} mutation_log_type_t;

//! Number of enumerators in mutation_log_type_t; used as the length of
//! the per-type counter arrays declared later in this header.
#define MUTATION_LOG_TYPES 5

//! Table of type names, defined in another translation unit
//! (presumably indexed by mutation_log_type_t -- confirm at the definition).
extern const char *mutation_log_type_names[];
/**
* An entry in the MutationLog.
*
* Instances are never allocated on their own: they are either constructed
* in place inside a caller-supplied buffer (placement new) or overlaid on
* a buffer that already contains a serialized entry. The key bytes follow
* the fixed fields directly, running past the one-element _key array into
* the rest of that buffer.
*/
class MutationLogEntry {
public:
/**
* Initialize a new entry inside the given buffer.
*
* @param buf caller-owned memory in which the entry is constructed; it
*            must be large enough for the fixed fields plus the key
*            (this is not checked here)
* @param r the rowid
* @param t the type of log entry
* @param vb the vbucket
* @param k the key (asserted to be at most 255 bytes long)
* @return a pointer to the entry, located at buf
*/
static MutationLogEntry* newEntry(uint8_t *buf,
uint64_t r, mutation_log_type_t t,
uint16_t vb, const std::string &k) {
return new (buf) MutationLogEntry(r, t, vb, k);
}
/**
* Initialize a new entry using the contents of the given buffer.
*
* Nothing is copied: the returned pointer aliases buf. Validation is
* done with assert() only (minimum length, magic byte, and that the
* whole entry including its key fits in buflen).
*
* @param buf a chunk of memory thought to contain a valid MutationLogEntry
* @param buflen the length of said buf
* @return buf reinterpreted as a MutationLogEntry
*/
static MutationLogEntry* newEntry(uint8_t *buf, size_t buflen) {
assert(buflen >= len(0));
MutationLogEntry *me = reinterpret_cast<MutationLogEntry*>(buf);
assert(me->magic == MUTATION_LOG_MAGIC);
assert(buflen >= me->len());
return me;
}
/**
* Deliberately a no-op: entries live in buffers owned by someone else,
* so there is no storage to release.
*/
void operator delete(void *) {
// Statically buffered. There is no delete.
}
/**
* The size of a MutationLogEntry, in bytes, containing a key of
* the specified length.
*
* @param klen key length in bytes
*/
static size_t len(size_t klen) {
// 13 == the exact empty record size as will be packed into
// the layout
// i.e. 8 (rowid) + 2 (vbucket) + 1 (magic) + 1 (type) + 1 (keylen).
return 13 + klen;
}
/**
* The number of bytes of the serialized form of this
* MutationLogEntry.
*/
size_t len() const {
return MutationLogEntry::len(keylen);
}
/**
* This entry's key.
*
* @return a copy of the keylen bytes starting at _key
*/
const std::string key() const {
return std::string(_key, keylen);
}
/**
* This entry's rowid.
*
* Defined out of line (presumably the inverse of the htonll() applied
* in the constructor -- confirm at the definition).
*/
uint64_t rowid() const;
/**
* This entry's vbucket.
*
* @return the vbucket id converted back to host byte order
*/
uint16_t vbucket() const {
return ntohs(_vbucket);
}
/**
* The type of this log entry.
*
* @return the raw type byte (a mutation_log_type_t value when the entry
*         was built by the constructor)
*/
uint8_t type() const {
return _type;
}
private:
friend std::ostream& operator<< (std::ostream& out,
const MutationLogEntry &e);
// Private: entries are only created through the newEntry() factories.
// Stores rowid and vbucket via htonll()/htons(), stamps the magic byte,
// and copies the key bytes to _key.
// NOTE(review): keylen is narrowed to uint8_t in the initializer list
// before the assert runs; with assertions compiled out, a key longer
// than 255 bytes would be copied in full while keylen is truncated.
MutationLogEntry(uint64_t r, mutation_log_type_t t,
uint16_t vb, const std::string &k)
: _rowid(htonll(r)), _vbucket(htons(vb)), magic(MUTATION_LOG_MAGIC),
_type(static_cast<uint8_t>(t)),
keylen(static_cast<uint8_t>(k.length())) {
assert(k.length() <= std::numeric_limits<uint8_t>::max());
memcpy(_key, k.data(), k.length());
}
// Field order and widths define the record layout that len() counts;
// do not reorder.
uint64_t _rowid;
uint16_t _vbucket;
uint8_t magic;
uint8_t _type;
uint8_t keylen;
// First byte of the key; the remaining keylen - 1 bytes sit immediately
// after it in the enclosing buffer.
char _key[1];
DISALLOW_COPY_AND_ASSIGN(MutationLogEntry);
};
// Stream output for a MutationLogEntry; defined out of line.
std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle);
/**
* The MutationLog records major key events to allow ep-engine to more
* quickly restore the server to its previous state upon restart.
*
* Only the small accessors, begin()/end() and needWriteAccess() are
* defined in this header; every other member is implemented out of line.
*/
class MutationLog {
public:
/**
* @param path path of the log file (presumably what getLogFile() reports -- confirm)
* @param bs block size in bytes (defaults to 4096)
*/
MutationLog(const std::string &path, const size_t bs=4096);
~MutationLog();
// Record-producing operations. Their names line up with the
// mutation_log_type_t values ML_NEW, ML_DEL, ML_DEL_ALL, ML_COMMIT1 and
// ML_COMMIT2; the bodies are not in this header -- confirm there.
void newItem(uint16_t vbucket, const std::string &key, uint64_t rowid);
void delItem(uint16_t vbucket, const std::string &key);
void deleteAll(uint16_t vbucket);
void commit1();
void commit2();
// Defined out of line.
void flush();
void sync();
void disable();
/**
* @return true unless the file descriptor holds the DISABLED_FD sentinel.
*/
bool isEnabled() const {
return file != DISABLED_FD;
}
/**
* @return true when the file descriptor is non-negative.
*/
bool isOpen() const {
return file >= 0;
}
/**
* @return a copy of the current header block.
*/
LogHeaderBlock header() const {
return headerBlock;
}
/**
* Replace the whole configuration byte (both the SYNC_* and the
* FLUSH_* bits).
*/
void setSyncConfig(uint8_t sconf) {
syncConfig = sconf;
}
/**
* @return only the SYNC_* bits of the configuration byte.
*/
uint8_t getSyncConfig() const {
return syncConfig & SYNC_FULL;
}
/**
* @return only the FLUSH_* bits of the configuration byte.
*/
uint8_t getFlushConfig() const {
return syncConfig & FLUSH_FULL;
}
/**
* @return the block size in bytes.
*/
size_t getBlockSize() const {
return blockSize;
}
// Defined out of line (presumably reports whether the log file is
// present on disk -- confirm).
bool exists() const;
/**
* @return the path of the log file.
*/
const std::string &getLogFile() const { return logPath; }
/**
* Open and initialize the log.
*
* This typically happens automatically.
*
* @param _readOnly when true the log is opened read only (see
*                  needWriteAccess())
*/
void open(bool _readOnly = false);
/**
* Close the log file.
*/
void close();
/**
* Reset the log.
*/
bool reset();
/**
* Replace the current log with a given log.
*/
bool replaceWith(MutationLog &mlog);
// String overloads of the configuration setters; the accepted strings
// and the meaning of the return value are defined out of line.
bool setSyncConfig(const std::string &s);
bool setFlushConfig(const std::string &s);
/**
* Reset the item type counts to the given values.
*
* This is used by the loader as part of initialization.
*
* The argument is presumably an array of MUTATION_LOG_TYPES counters,
* matching itemsLogged -- confirm at the definition.
*/
void resetCounts(size_t *);
/**
* Exception thrown upon failure to write a mutation log.
*/
class WriteException : public std::runtime_error {
public:
WriteException(const std::string &s) : std::runtime_error(s) {}
};
/**
* Exception thrown upon failure to read a mutation log.
*/
class ReadException : public std::runtime_error {
public:
ReadException(const std::string &s) : std::runtime_error(s) {}
};
/**
* ReadException specialization whose name indicates a missing log file.
*/
class FileNotFoundException : public ReadException {
public:
FileNotFoundException(const std::string &s) : ReadException(s) {}
};
/**
* Exception thrown when a CRC mismatch is read in the log.
*/
class CRCReadException : public ReadException {
public:
CRCReadException() : ReadException("CRC Mismatch") {}
};
/**
* Exception thrown when a short read occurred.
*/
class ShortReadException : public ReadException {
public:
ShortReadException() : ReadException("Short Read") {}
};
/**
* An iterator for the mutation log.
*
* A ReadException may be thrown at any point along iteration.
*
* Instances are obtained from MutationLog::begin() and
* MutationLog::end(); the constructor taking a log is private.
*
* NOTE(review): the post-increment operator returns a reference rather
* than a copy of the previous position, and the comparison operators
* are not const-qualified. Changing either requires touching the
* out-of-line definitions as well.
*/
class iterator : public std::iterator<std::input_iterator_tag,
const MutationLogEntry*> {
public:
iterator(const iterator& mit);
~iterator();
iterator& operator++();
iterator& operator++(int);
bool operator==(const iterator& rhs);
bool operator!=(const iterator& rhs);
const MutationLogEntry* operator*();
private:
friend class MutationLog;
// l is the log to walk; e marks the iterator as the end sentinel.
iterator(const MutationLog *l, bool e=false);
void nextBlock();
size_t bufferBytesRemaining();
void prepItem();
const MutationLog *log;
uint8_t *entryBuf;
uint8_t *buf;
uint8_t *p;
off_t offset;
uint16_t items;
bool isEnd;
};
/**
* An iterator pointing to the beginning of the log file.
*
* The iterator is advanced with nextBlock() once before it is returned.
*/
iterator begin() {
iterator it(iterator(this));
it.nextBlock();
return it;
}
/**
* An iterator pointing at the end of the log file.
*/
iterator end() {
return iterator(this, true);
}
//! Items logged by type.
Atomic<size_t> itemsLogged[MUTATION_LOG_TYPES];
//! Histogram of block padding sizes.
Histogram<uint32_t> paddingHisto;
//! Flush time histogram.
Histogram<hrtime_t> flushTimeHisto;
//! Sync time histogram.
Histogram<hrtime_t> syncTimeHisto;
//! Size of the log
Atomic<size_t> logSize;
private:
/**
* Guard for mutating operations.
*
* @throws WriteException when the log was opened read only
*/
void needWriteAccess(void) {
if (readOnly) {
throw WriteException("Invalid access (file opened read only)");
}
}
// Internal helpers, all defined out of line.
void writeEntry(MutationLogEntry *mle);
void writeInitialBlock();
void readInitialBlock();
void updateInitialBlock(void);
void prepareWrites();
//! Raw file descriptor (may be negative or DISABLED_FD).
int fd() const { return file; }
// Data members. Their declaration order is also their initialization
// order in the out-of-line constructor; do not reorder.
LogHeaderBlock headerBlock;
const std::string logPath;
size_t blockSize;
size_t blockPos;
int file;
uint16_t entries;
uint8_t *entryBuffer;
uint8_t *blockBuffer;
//! Combined SYNC_* and FLUSH_* flag bits.
uint8_t syncConfig;
bool readOnly;
DISALLOW_COPY_AND_ASSIGN(MutationLog);
};
/// @cond DETAILS
//! rowid, (uint8_t)mutation_log_type_t
//! Used as the mapped value of MutationLogHarvester's `loading` table.
typedef std::pair<uint64_t, uint8_t> mutation_log_event_t;
/// @endcond
/**
* MutationLogHarvester::apply callback type.
*
* Plain function pointer. The arguments are presumably (caller context,
* vbucket id, key, rowid), judging by their types and by the
* MutationLogHarvester::apply(void *arg, mlCallback) signature -- confirm
* at the call site.
*/
typedef void (*mlCallback)(void*, uint16_t, const std::string &, uint64_t);
/**
* MutationLogHarvester::apply callback type taking a batch.
*
* Receives a uint16_t (presumably the vbucket id), a mutable vector of
* (string, uint64_t) pairs (presumably key and rowid), and the caller
* context pointer -- confirm at the call site.
*/
typedef void (*mlCallbackWithQueue)(uint16_t,
std::vector<std::pair<std::string, uint64_t> > &,
void *arg);
/**
* Type for mutation log leftovers.
*
* Filled in by MutationLogHarvester::getUncommitted(). The scalar members
* have no initializers, so a default-constructed instance leaves them
* indeterminate.
*/
struct mutation_log_uncommitted_t {
std::string key;
uint64_t rowid;
mutation_log_type_t type;
uint16_t vbucket;
};
class EventuallyPersistentEngine;

/**
 * Replays the contents of a MutationLog so that the state it describes
 * can be rebuilt.
 *
 * The harvester keeps a reference to the log it was given; that log must
 * outlive the harvester.
 */
class MutationLogHarvester {
public:
    /**
     * @param ml the log to read from (held by reference)
     * @param e optional engine handle; may be NULL
     */
    MutationLogHarvester(MutationLog &ml, EventuallyPersistentEngine *e = NULL)
        : mlog(ml), engine(e)
    {
        // Start every per-type counter at zero.
        const size_t slots = sizeof(itemsSeen) / sizeof(itemsSeen[0]);
        for (size_t i = 0; i < slots; ++i) {
            itemsSeen[i] = 0;
        }
    }

    /**
     * Register a vbucket id ahead of load(). Ids are collected in a set,
     * so registering the same id twice has no further effect.
     */
    void setVBucket(uint16_t vb) {
        vbid_set.insert(vb);
    }

    /**
     * Read the entries out of the log file.
     *
     * @return true if the file was clean and can likely be trusted.
     */
    bool load();

    /**
     * Hand the processed log entries to the given callback, one call per
     * entry or per batch depending on the overload.
     *
     * @param arg opaque pointer for the callback
     * @param mlc the callback to invoke
     */
    void apply(void *arg, mlCallback mlc);
    void apply(void *arg, mlCallbackWithQueue mlc);

    /**
     * Total number of entries found in the log.
     */
    size_t total();

    /**
     * Per-type entry counts.
     *
     * @return pointer to the harvester's own array of MUTATION_LOG_TYPES
     *         counters; it stays owned by the harvester
     */
    size_t *getItemsSeen() {
        return itemsSeen;
    }

    /**
     * Collect the uncommitted keys and related details from the log.
     *
     * @param uitems output vector receiving the uncommitted items
     */
    void getUncommitted(std::vector<mutation_log_uncommitted_t> &uitems);

private:
    MutationLog &mlog;
    EventuallyPersistentEngine *engine;
    //! Vbucket ids registered through setVBucket().
    std::set<uint16_t> vbid_set;
    // Two-level tables keyed by a uint16_t and then a string (presumably
    // vbucket id and key -- confirm in the .cc).
    unordered_map<uint16_t, unordered_map<std::string, uint64_t> > committed;
    unordered_map<uint16_t, unordered_map<std::string, mutation_log_event_t> > loading;
    //! Entry counts, one slot per mutation log type.
    size_t itemsSeen[MUTATION_LOG_TYPES];
};
#endif /* MUTATION_LOG_HH */