Refactor indexing

This commit is contained in:
haad 2016-04-28 21:34:14 +02:00
parent 399a76928f
commit 06ee3ebf31
13 changed files with 2182 additions and 2254 deletions

4069
dist/orbitdb.min.js vendored

File diff suppressed because it is too large Load Diff

View File

@ -26521,8 +26521,8 @@
var logger = __webpack_require__(295).create("orbit-db.Client");
var PubSub = __webpack_require__(329);
var CounterStore = __webpack_require__(379);
var KeyValueStore = __webpack_require__(397);
var EventStore = __webpack_require__(399);
var KeyValueStore = __webpack_require__(396);
var EventStore = __webpack_require__(398);
var OrbitDB = function () {
function OrbitDB(ipfs, options) {
@ -26635,12 +26635,12 @@
}
}, {
key: '_subscribe',
value: function _subscribe(db, dbname, subscribe, callback) {
value: function _subscribe(store, dbname, subscribe, callback) {
var _this4 = this;
if (subscribe === undefined) subscribe = true;
return db.use(dbname, this.user.username).then(function (events) {
return store.use(dbname, this.user.username).then(function (events) {
events.on('readable', _this4._onSync.bind(_this4));
events.on('data', _this4._onWrite.bind(_this4));
events.on('load', _this4._onLoad.bind(_this4));
@ -26653,8 +26653,8 @@
}, {
key: '_onMessage',
value: function _onMessage(channel, message) {
[this.eventStore, this.kvStore, this.counterStore].forEach(function (db) {
db.sync(channel, message).catch(function (e) {
[this.eventStore, this.kvStore, this.counterStore].forEach(function (store) {
store.sync(channel, message).catch(function (e) {
return logger.error(e.stack);
});
});
@ -35404,7 +35404,6 @@
var Store = __webpack_require__(383);
var CounterIndex = __webpack_require__(393);
var OpTypes = __webpack_require__(396);
var CounterStore = function (_Store) {
(0, _inherits3.default)(CounterStore, _Store);
@ -35441,7 +35440,7 @@
var counter = this._index.get(dbname);
if (counter) {
counter.increment(amount);
return this._addOperation(dbname, OpTypes.Inc, null, counter.payload);
return this._addOperation(dbname, 'COUNTER', null, counter.payload);
}
}
}]);
@ -35591,7 +35590,7 @@
this.events[dbname] = new EventEmitter();
var oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options);
return oplog.create(id).then(function () {
return oplog.load(id).then(function () {
return _this._oplogs[dbname] = oplog;
}).then(function () {
return _this._index.updateIndex(oplog);
@ -35608,6 +35607,10 @@
if (hash && oplog) {
return oplog.merge(hash).then(function () {
return _this2._index.updateIndex(oplog);
}).then(function () {
// if(this._log.items.length - oldCount === 0)
_this2.events[dbname].emit('readable', dbname);
_this2.events[dbname].emit('data', _this2.dbname);
}).then(function () {
return _this2;
});
@ -35632,7 +35635,9 @@
return result = op;
}).then(function () {
return _this3._index.updateIndex(oplog);
}).then(function () {
})
// .then(() => this.events[dbname].emit('data', dbname))
.then(function () {
return result;
});
}
@ -35647,7 +35652,7 @@
/* 384 */
/***/ function(module, exports, __webpack_require__) {
/* WEBPACK VAR INJECTION */(function(Buffer) {'use strict';
'use strict';
var _assign = __webpack_require__(119);
@ -35657,10 +35662,6 @@
var _promise2 = _interopRequireDefault(_promise);
var _stringify = __webpack_require__(176);
var _stringify2 = _interopRequireDefault(_stringify);
var _classCallCheck2 = __webpack_require__(327);
var _classCallCheck3 = _interopRequireDefault(_classCallCheck2);
@ -35687,34 +35688,14 @@
}
(0, _createClass3.default)(OperationsLog, [{
key: 'create',
value: function create(id) {
var _this = this;
this.events.emit('load', this.dbname);
return Log.create(this._ipfs, id).then(function (log) {
return _this._log = log;
}).then(function () {
return Cache.loadCache(_this.options.cacheFile);
}).then(function () {
return _this.merge(Cache.get(_this.dbname));
}).then(function () {
return _this;
});
}
}, {
key: 'addOperation',
value: function addOperation(operation, key, value) {
var _this2 = this;
var _this = this;
var entry = {
op: operation,
key: key,
value: value,
meta: {
size: Buffer.byteLength(value ? (0, _stringify2.default)(value) : '', 'utf8'),
ts: new Date().getTime()
}
value: value
};
var opHash = void 0,
@ -35722,19 +35703,35 @@
return this._log.add(entry).then(function (op) {
return opHash = op.hash;
}).then(function () {
return Log.getIpfsHash(_this2._ipfs, _this2._log);
return Log.getIpfsHash(_this._ipfs, _this._log);
}).then(function (hash) {
return logHash = hash;
}).then(function () {
return _this2._lastWrite = logHash;
return _this._lastWrite = logHash;
}).then(function () {
return Cache.set(_this2.dbname, logHash);
return Cache.set(_this.dbname, logHash);
}).then(function () {
return _this2.events.emit('data', _this2.dbname, logHash);
return _this.events.emit('data', _this.dbname, logHash);
}).then(function () {
return opHash;
});
}
}, {
key: 'load',
value: function load(id) {
var _this2 = this;
this.events.emit('load', this.dbname);
return Log.create(this._ipfs, id).then(function (log) {
return _this2._log = log;
}).then(function () {
return Cache.loadCache(_this2.options.cacheFile);
}).then(function () {
return _this2.merge(Cache.get(_this2.dbname));
}).then(function () {
return _this2;
});
}
}, {
key: 'merge',
value: function merge(hash) {
@ -35749,9 +35746,12 @@
return _this3._log.join(other);
}).then(function () {
return Cache.set(_this3.dbname, hash);
}).then(function () {
if (_this3._log.items.length - oldCount === 0) _this3.events.emit('readable', _this3.dbname, hash);
}).then(function () {
})
// .then(() => {
// // if(this._log.items.length - oldCount === 0)
// this.events.emit('readable', this.dbname, hash)
// })
.then(function () {
return _this3;
});
}
@ -35774,7 +35774,6 @@
}();
module.exports = OperationsLog;
/* WEBPACK VAR INJECTION */}.call(exports, __webpack_require__(2).Buffer))
/***/ },
/* 385 */
@ -58763,13 +58762,13 @@
(0, _createClass3.default)(CounterIndex, [{
key: 'createCounter',
value: function createCounter(key, id) {
this._index[key] = new Counter(id);
value: function createCounter(dbname, id) {
this._index[dbname] = new Counter(id);
}
}, {
key: 'get',
value: function get(key) {
return this._index[key];
value: function get(dbname) {
return this._index[dbname];
}
}, {
key: 'updateIndex',
@ -58777,7 +58776,7 @@
var counter = this._index[oplog.dbname];
if (counter) {
oplog.ops.filter(function (f) {
return f !== undefined;
return f && f.op === 'COUNTER';
}).map(function (f) {
return Counter.from(f.value);
}).forEach(function (f) {
@ -58901,27 +58900,6 @@
/***/ },
/* 396 */
/***/ function(module, exports) {
'use strict';
var OpTypes = {
Add: "ADD",
Put: "PUT",
Delete: "DELETE",
Inc: "INC",
isInsert: function isInsert(op) {
return op === "ADD" || op === "PUT";
},
isDelete: function isDelete(op) {
return op === "DELETE";
}
};
module.exports = OpTypes;
/***/ },
/* 397 */
/***/ function(module, exports, __webpack_require__) {
'use strict';
@ -58953,8 +58931,7 @@
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
var Store = __webpack_require__(383);
var KVIndex = __webpack_require__(398);
var OpTypes = __webpack_require__(396);
var KVIndex = __webpack_require__(397);
var KeyValueStore = function (_Store) {
(0, _inherits3.default)(KeyValueStore, _Store);
@ -58987,12 +58964,12 @@
}, {
key: 'put',
value: function put(dbname, key, data) {
return this._addOperation(dbname, OpTypes.Put, key, data);
return this._addOperation(dbname, 'PUT', key, data);
}
}, {
key: 'del',
value: function del(dbname, key) {
return this._addOperation(dbname, OpTypes.Delete, key);
return this._addOperation(dbname, 'DELETE', key);
}
}]);
return KeyValueStore;
@ -59001,7 +58978,7 @@
module.exports = KeyValueStore;
/***/ },
/* 398 */
/* 397 */
/***/ function(module, exports, __webpack_require__) {
'use strict';
@ -59016,8 +58993,6 @@
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
var OpTypes = __webpack_require__(396);
var KeyValueIndex = function () {
function KeyValueIndex() {
(0, _classCallCheck3.default)(this, KeyValueIndex);
@ -59036,10 +59011,11 @@
var _this = this;
var handled = [];
var _createLWWSet = function _createLWWSet(item) {
if (handled.indexOf(item.key) === -1) {
handled.push(item.key);
if (OpTypes.isInsert(item.op)) return item;
if (item.op === 'PUT') return item;
}
return null;
};
@ -59058,16 +59034,16 @@
module.exports = KeyValueIndex;
/***/ },
/* 399 */
/* 398 */
/***/ function(module, exports, __webpack_require__) {
'use strict';
var _defineProperty2 = __webpack_require__(400);
var _defineProperty2 = __webpack_require__(399);
var _defineProperty3 = _interopRequireDefault(_defineProperty2);
var _iterator2 = __webpack_require__(401);
var _iterator2 = __webpack_require__(400);
var _iterator3 = _interopRequireDefault(_iterator2);
@ -59099,8 +59075,7 @@
var Lazy = __webpack_require__(387);
var Store = __webpack_require__(383);
var EventLogIndex = __webpack_require__(403);
var OpTypes = __webpack_require__(396);
var EventLogIndex = __webpack_require__(402);
var EventStore = function (_Store) {
(0, _inherits3.default)(EventStore, _Store);
@ -59123,12 +59098,12 @@
}, {
key: 'add',
value: function add(dbname, data) {
return this._addOperation(dbname, OpTypes.Add, null, data);
return this._addOperation(dbname, 'ADD', null, data);
}
}, {
key: 'remove',
value: function remove(dbname, hash) {
return this._addOperation(dbname, OpTypes.Delete, hash);
return this._addOperation(dbname, 'DELETE', hash);
}
}, {
key: 'iterator',
@ -59186,7 +59161,7 @@
module.exports = EventStore;
/***/ },
/* 400 */
/* 399 */
/***/ function(module, exports, __webpack_require__) {
"use strict";
@ -59215,13 +59190,13 @@
};
/***/ },
/* 401 */
/* 400 */
/***/ function(module, exports, __webpack_require__) {
module.exports = { "default": __webpack_require__(402), __esModule: true };
module.exports = { "default": __webpack_require__(401), __esModule: true };
/***/ },
/* 402 */
/* 401 */
/***/ function(module, exports, __webpack_require__) {
__webpack_require__(299);
@ -59229,7 +59204,7 @@
module.exports = __webpack_require__(136)('iterator');
/***/ },
/* 403 */
/* 402 */
/***/ function(module, exports, __webpack_require__) {
'use strict';
@ -59244,8 +59219,6 @@
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
var OpTypes = __webpack_require__(396);
var EventLogIndex = function () {
function EventLogIndex() {
(0, _classCallCheck3.default)(this, EventLogIndex);
@ -59265,7 +59238,7 @@
var _createLWWSet = function _createLWWSet(item) {
if (handled.indexOf(item.key) === -1) {
handled.push(item.key);
if (OpTypes.isInsert(item.op)) return item;
if (item.op === 'ADD') return item;
}
return null;
};

View File

@ -40,7 +40,8 @@ class OrbitDB {
get: (key) => db.get(dbname, key),
del: (key) => db.del(dbname, key),
delete: () => db.delete(dbname),
close: () => this._pubsub.unsubscribe(dbname)
close: () => this._pubsub.unsubscribe(dbname),
sync: (hash) => db.sync(dbname, hash)
}
return this._subscribe(db, dbname, subscribe).then(() => api);
@ -66,10 +67,10 @@ class OrbitDB {
this.network = null;
}
_subscribe(db, dbname, subscribe, callback) {
_subscribe(store, dbname, subscribe, callback) {
if(subscribe === undefined) subscribe = true;
return db.use(dbname, this.user.username).then((events) => {
return store.use(dbname, this.user.username).then((events) => {
events.on('readable', this._onSync.bind(this));
events.on('data', this._onWrite.bind(this));
events.on('load', this._onLoad.bind(this));
@ -82,8 +83,8 @@ class OrbitDB {
}
_onMessage(channel, message) {
[this.eventStore, this.kvStore, this.counterStore].forEach((db) => {
db.sync(channel, message).catch((e) => logger.error(e.stack));
[this.eventStore, this.kvStore, this.counterStore].forEach((store) => {
store.sync(channel, message).catch((e) => logger.error(e.stack));
})
}

View File

@ -1,12 +0,0 @@
'use strict';
const OpTypes = {
Add: "ADD",
Put: "PUT",
Delete: "DELETE",
Inc: "INC",
isInsert: (op) => op === "ADD" || op === "PUT",
isDelete: (op) => op === "DELETE"
};
module.exports = OpTypes;

View File

@ -14,60 +14,52 @@ class OperationsLog {
}
get ops() {
return this._log.items.map((f) => {
Object.assign(f.payload, { hash: f.hash });
if(f.payload.key === null)
Object.assign(f.payload, { key: f.hash });
return f.payload;
});
}
create(id) {
this.events.emit('load', this.dbname);
return Log.create(this._ipfs, id)
.then((log) => this._log = log)
.then(() => Cache.loadCache(this.options.cacheFile))
.then(() => this.merge(Cache.get(this.dbname)))
.then(() => this);
return this._log.items;
}
addOperation(operation, key, value) {
const entry = {
op: operation,
key: key,
value: value,
meta: {
size: Buffer.byteLength(value ? JSON.stringify(value) : '', 'utf8'),
ts: new Date().getTime()
}
value: value
};
let opHash, logHash;
let node, logHash;
return this._log.add(entry)
.then((op) => opHash = op.hash)
.then((op) => node = op)
.then(() => {
Object.assign(node.payload, { hash: node.hash });
if(node.payload.key === null)
Object.assign(node.payload, { key: node.hash });
})
.then(() => Log.getIpfsHash(this._ipfs, this._log))
.then((hash) => logHash = hash)
.then(() => this._lastWrite = logHash)
.then(() => Cache.set(this.dbname, logHash))
.then(() => this.events.emit('data', this.dbname, logHash))
.then(() => opHash)
.then(() => node.payload)
}
load(id) {
this.events.emit('load', this.dbname);
return Log.create(this._ipfs, id)
.then((log) => this._log = log)
.then(() => Cache.loadCache(this.options.cacheFile))
.then(() => this.merge(Cache.get(this.dbname)))
}
merge(hash) {
if(!hash || hash === this._lastWrite || !this._log)
return Promise.resolve();
return Promise.resolve([]);
this.events.emit('load', this.dbname);
const oldCount = this._log.items.length;
let newItems = [];
return Log.fromIpfsHash(this._ipfs, hash)
.then((other) => this._log.join(other))
.then((merged) => newItems = merged)
.then(() => Cache.set(this.dbname, hash))
.then(() => {
if(this._log.items.length - oldCount === 0)
this.events.emit('readable', this.dbname, hash)
})
.then(() => this)
.then(() => newItems.map((f) => f.payload))
}
delete() {

View File

@ -16,21 +16,24 @@ class Store {
use(dbname, id) {
this.events[dbname] = new EventEmitter();
const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options);
return oplog.create(id)
return oplog.load(id)
.then((merged) => this._index.updateIndex(oplog, merged))
.then(() => this._oplogs[dbname] = oplog)
.then(() => this._index.updateIndex(oplog))
.then(() => this.events[dbname]);
}
sync(dbname, hash) {
const oplog = this._oplogs[dbname];
let newItems;
if(hash && oplog) {
return oplog.merge(hash)
.then(() => this._index.updateIndex(oplog))
.then(() => this);
.then((merged) => newItems = merged)
.then(() => this._index.updateIndex(oplog, newItems))
.then(() => this.events[dbname].emit('readable', dbname))
.then(() => newItems)
}
return Promise.resolve(this);
return Promise.resolve([]);
}
delete(dbname) {
@ -44,8 +47,8 @@ class Store {
if(oplog) {
return oplog.addOperation(type, key, data)
.then((op) => result = op)
.then(() => this._index.updateIndex(oplog))
.then(() => result);
.then(() => this._index.updateIndex(oplog, [result]))
.then(() => result.hash);
}
}
}

View File

@ -7,19 +7,19 @@ class CounterIndex {
this._index = {};
}
createCounter(key, id) {
this._index[key] = new Counter(id);
createCounter(dbname, id) {
this._index[dbname] = new Counter(id);
}
get(key) {
return this._index[key];
get(dbname) {
return this._index[dbname];
}
updateIndex(oplog) {
updateIndex(oplog, updated) {
const counter = this._index[oplog.dbname];
if(counter) {
oplog.ops
.filter((f) => f !== undefined)
updated
.filter((f) => f && f.op === 'COUNTER')
.map((f) => Counter.from(f.value))
.forEach((f) => counter.merge(f))

View File

@ -2,7 +2,6 @@
const Store = require('../Store');
const CounterIndex = require('./CounterIndex');
const OpTypes = require('../../oplog/OpTypes');
class CounterStore extends Store {
constructor(ipfs, options) {
@ -28,7 +27,7 @@ class CounterStore extends Store {
const counter = this._index.get(dbname);
if(counter) {
counter.increment(amount);
return this._addOperation(dbname, OpTypes.Inc, null, counter.payload);
return this._addOperation(dbname, 'COUNTER', null, counter.payload);
}
}
}

View File

@ -1,32 +1,27 @@
'use strict';
const OpTypes = require('../../oplog/OpTypes');
class EventLogIndex {
constructor() {
this._index = [];
this._index = {};
}
get() {
return this._index;
return Object.keys(this._index).map((f) => this._index[f]);
}
updateIndex(oplog) {
updateIndex(oplog, updated) {
let handled = [];
const _createLWWSet = (item) => {
updated.forEach((item) => {
if(handled.indexOf(item.key) === -1) {
handled.push(item.key);
if(OpTypes.isInsert(item.op))
return item;
if(item.op === 'ADD') {
this._index[item.key] = item
} else if(item.op === 'DELETE') {
delete this._index[item.key];
}
}
return null;
};
this._index = oplog.ops
.reverse()
.filter((f) => f !== undefined)
.map(_createLWWSet)
.filter((f) => f !== null);
});
}
}

View File

@ -3,7 +3,6 @@
const Lazy = require('lazy.js');
const Store = require('../Store');
const EventLogIndex = require('./EventIndex');
const OpTypes = require('../../oplog/OpTypes');
class EventStore extends Store {
constructor(ipfs, options) {
@ -17,11 +16,11 @@ class EventStore extends Store {
}
add(dbname, data) {
return this._addOperation(dbname, OpTypes.Add, null, data);
return this._addOperation(dbname, 'ADD', null, data);
}
remove(dbname, hash) {
return this._addOperation(dbname, OpTypes.Delete, hash);
return this._addOperation(dbname, 'DELETE', hash);
}
iterator(dbname, options) {
@ -53,10 +52,10 @@ class EventStore extends Store {
if(opts.gt || opts.gte) {
// Greater than case
result = this._read(this._index.get().reverse(), opts.gt ? opts.gt : opts.gte, amount, opts.gte ? true : false)
result = this._read(this._index.get(), opts.gt ? opts.gt : opts.gte, amount, opts.gte ? true : false)
} else {
// Lower than and lastN case, search latest first by reversing the sequence
result = this._read(this._index.get(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse()
result = this._read(this._index.get().reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse()
}
if(opts.reverse) result.reverse();

View File

@ -1,7 +1,5 @@
'use strict';
const OpTypes = require('../../oplog/OpTypes');
class KeyValueIndex {
constructor() {
this._index = {};
@ -11,23 +9,18 @@ class KeyValueIndex {
return this._index[key];
}
updateIndex(oplog) {
updateIndex(oplog, updated) {
let handled = [];
const _createLWWSet = (item) => {
updated.reverse().forEach((item) => {
if(handled.indexOf(item.key) === -1) {
handled.push(item.key);
if(OpTypes.isInsert(item.op))
return item;
if(item.op === 'PUT')
this._index[item.key] = item.value
else if (item.op === 'DELETE')
delete this._index[item.key];
}
return null;
};
this._index = {};
oplog.ops
.reverse()
.map(_createLWWSet)
.filter((f) => f !== null)
.forEach((f) => this._index[f.key] = f.value);
});
}
}

View File

@ -2,7 +2,6 @@
const Store = require('../Store');
const KVIndex = require('./KeyValueIndex');
const OpTypes = require('../../oplog/OpTypes');
class KeyValueStore extends Store {
constructor(ipfs, options) {
@ -24,11 +23,11 @@ class KeyValueStore extends Store {
}
put(dbname, key, data) {
return this._addOperation(dbname, OpTypes.Put, key, data);
return this._addOperation(dbname, 'PUT', key, data);
}
del(dbname, key) {
return this._addOperation(dbname, OpTypes.Delete, key);
return this._addOperation(dbname, 'DELETE', key);
}
}

View File

@ -22,20 +22,20 @@ const startIpfs = () => {
if(err) console.error(err);
resolve(ipfs);
});
// ipfsd.local((err, node) => {
// if(err) reject(err);
// node.startDaemon((err, ipfs) => {
// if(err) reject(err);
// resolve(ipfs);
// });
// });
ipfsd.local((err, node) => {
if(err) reject(err);
node.startDaemon((err, ipfs) => {
if(err) reject(err);
resolve(ipfs);
});
});
});
};
describe('Orbit Client', function() {
this.timeout(30000);
let ipfs, client, db;
let ipfs, client, client2, db;
let channel = 'abcdefgh';
const cacheFile = path.join(process.cwd(), '/test', 'orbit-db-test-cache.json');
@ -45,6 +45,7 @@ describe('Orbit Client', function() {
try {
ipfs = await(startIpfs());
client = await(OrbitDB.connect('localhost', 3333, username, password, ipfs, { allowOffline: true }));
client2 = await(OrbitDB.connect('localhost', 3333, username + "2", password, ipfs, { allowOffline: true }));
} catch(e) {
console.log(e);
assert.equal(e, null);
@ -528,15 +529,27 @@ describe('Orbit Client', function() {
describe('Key-Value Store', function() {
beforeEach(async((done) => {
db = await(client.kvstore(channel, '', false));
db = await(client.kvstore(channel, false));
db.delete();
done();
}));
afterEach(() => {
afterEach((done) => {
db.delete();
db.close();
done();
});
it('put', async((done) => {
const db2 = await(client2.kvstore(channel, false));
await(db.put('key1', 'hello1'));
await(db2.put('key1', 'hello2'));
await(db.sync('QmNtELU2N3heY9cFgRuLWavgov7NTXibNyZCxcTCYjw1TM'))
const value = db.get('key1');
assert.equal(value, 'hello2');
done();
}));
it('put', async((done) => {
await(db.put('key1', 'hello!'));
const value = db.get('key1');