Refactor stores to work with OperationsLog and ipfs-log

This commit is contained in:
haad 2016-05-06 12:20:11 +02:00
parent 180affa688
commit 3ccab7b17d
10 changed files with 191 additions and 110 deletions

View File

@ -4,20 +4,27 @@ const ipfsd = require('ipfsd-ctl');
const OrbitDB = require('../src/OrbitDB'); const OrbitDB = require('../src/OrbitDB');
const Timer = require('./Timer'); const Timer = require('./Timer');
// usage: benchmark.js <host> <username> <channel>; // usage: benchmark.js <network hash> <username> <channel>;
// orbit-server // orbit-server
const host = process.argv[2] ? process.argv[2] : 'localhost' // const network = 'QmRB8x6aErtKTFHDNRiViixSKYwW1DbfcvJHaZy1hnRzLM'; // dev server
const port = 3333; const network = 'QmYPobvobKsyoCKTw476yTui611XABf927KxUPCf4gRLRr'; // 'localhost:3333'
const username = process.argv[3] ? process.argv[3] : 'testrunner'; const username = process.argv[2] ? process.argv[2] : 'testrunner';
const password = ''; const password = '';
const channelName = process.argv[4] ? process.argv[4] : 'c1'; const channelName = process.argv[3] ? process.argv[3] : 'c1';
const startIpfs = () => { const startIpfs = () => {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
ipfsd.disposableApi((err, ipfs) => { // ipfsd.disposableApi((err, ipfs) => {
if(err) console.error(err); // if(err) console.error(err);
resolve(ipfs); // resolve(ipfs);
// });
ipfsd.local((err, node) => {
if(err) reject(err);
node.startDaemon((err, ipfs) => {
if(err) reject(err);
resolve(ipfs);
});
}); });
}); });
}; };
@ -41,7 +48,7 @@ let run = (() => {
// Connect // Connect
console.log(`Connecting...`) console.log(`Connecting...`)
startIpfs() startIpfs()
.then((ipfs) => OrbitDB.connect(host, port, username, password, ipfs)) .then((ipfs) => OrbitDB.connect(network, username, password, ipfs))
.then((orbit) => orbit.eventlog(channelName)) .then((orbit) => orbit.eventlog(channelName))
.then(queryLoop) .then(queryLoop)
.then(() => { .then(() => {

View File

@ -19,7 +19,7 @@ class OrbitDB {
eventlog(dbname, options) { eventlog(dbname, options) {
if(!options) options = { subscribe: true }; if(!options) options = { subscribe: true };
const store = new EventStore(this._ipfs, dbname, options); const store = new EventStore(this._ipfs, this.user.username, dbname, options);
return this._subscribe(store, dbname, options.subscribe) return this._subscribe(store, dbname, options.subscribe)
.then(() => this.stores[dbname] = store) .then(() => this.stores[dbname] = store)
.then(() => store); .then(() => store);
@ -27,7 +27,7 @@ class OrbitDB {
kvstore(dbname, options) { kvstore(dbname, options) {
if(!options) options = { subscribe: true }; if(!options) options = { subscribe: true };
const store = new KeyValueStore(this._ipfs, dbname, options); const store = new KeyValueStore(this._ipfs, this.user.username, dbname, options);
return this._subscribe(store, dbname, options.subscribe) return this._subscribe(store, dbname, options.subscribe)
.then(() => this.stores[dbname] = store) .then(() => this.stores[dbname] = store)
.then(() => store); .then(() => store);
@ -35,7 +35,7 @@ class OrbitDB {
counter(dbname, options) { counter(dbname, options) {
if(!options) options = { subscribe: true }; if(!options) options = { subscribe: true };
const store = new CounterStore(this._ipfs, dbname, options); const store = new CounterStore(this._ipfs, this.user.username, dbname, options);
return this._subscribe(store, dbname, options.subscribe) return this._subscribe(store, dbname, options.subscribe)
.then(() => this.stores[dbname] = store) .then(() => this.stores[dbname] = store)
.then(() => store); .then(() => store);
@ -70,6 +70,7 @@ class OrbitDB {
} }
_onWrite(dbname, hash) { _onWrite(dbname, hash) {
if(!hash) throw new Error("Hash can't be null!");
this._pubsub.publish(dbname, hash); this._pubsub.publish(dbname, hash);
this.events.emit('data', dbname, hash); this.events.emit('data', dbname, hash);
} }

View File

@ -3,66 +3,82 @@
const Log = require('ipfs-log'); const Log = require('ipfs-log');
const Cache = require('./Cache'); const Cache = require('./Cache');
class OperationsLog { class OperationsLog extends Log {
constructor(ipfs, dbname, opts) { constructor(ipfs, id, name, opts) {
this.dbname = dbname; super(ipfs, id, name, opts)
this.options = opts || { cacheFile: null }; // this.name = name;
if(!opts) opts = {};
if(!opts.cacheFile) Object.assign(opts, { cacheFile: null });
this.options = opts;
this._lastWrite = null; this._lastWrite = null;
this._ipfs = ipfs; Cache.loadCache(this.options.cacheFile);
this._log = null; // this._ipfs = ipfs;
// this._log = null;
} }
get ops() { // get items() {
return this._log.items; // return this._log.items;
} // }
addOperation(operation, key, value) { // add(operation, key, value) {
const entry = { add(entry) {
op: operation, // const entry = {
key: key, // op: operation,
value: value, // key: key,
meta: { // value: value,
ts: new Date().getTime() // meta: {
} // ts: new Date().getTime()
}; // }
// };
let node, logHash; let node, logHash;
return this._log.add(entry) return super.add(entry)
.then((op) => node = op) .then((op) => node = op)
.then(() => Object.assign(node.payload, { hash: node.hash })) .then(() => Object.assign(node.payload, { hash: node.hash }))
.then(() => Log.getIpfsHash(this._ipfs, this._log)) .then(() => Log.getIpfsHash(this._ipfs, this))
.then((hash) => logHash = hash) .then((hash) => logHash = hash)
.then(() => this._lastWrite = logHash) .then(() => this._lastWrite = logHash)
.then(() => Cache.set(this.dbname, logHash)) .then(() => Cache.set(this.name, logHash))
.then(() => { .then(() => {
return { operation: node.payload, log: logHash }; return node.payload;
}) })
} }
load(id) { load() {
return Log.create(this._ipfs, id) // this._log = new Log(this._ipfs, this.id, this.dbname, options)
.then((log) => this._log = log) // return Log.create(this._ipfs, id)
.then(() => Cache.loadCache(this.options.cacheFile)) // .then((log) => this._log = log)
.then(() => this.merge(Cache.get(this.dbname))) // .then(() => Cache.loadCache(this.options.cacheFile))
// .then(() => this.merge(Cache.get(this.name)))
// Cache.loadCache(this.options.cacheFile);
// console.log("THIS", this.name, Cache.get(this.name));
const cached = Cache.get(this.name);
if(cached) {
return this.options.Log.fromIpfsHash(this._ipfs, cached)
.then((log) => this.join(log));
}
return Promise.resolve([]);
} }
merge(hash) { join(other) {
if(!hash || hash === this._lastWrite || !this._log) // if(!hash || hash === this._lastWrite)
return Promise.resolve([]); // return Promise.resolve([]);
const oldCount = this._log.items.length; const oldCount = this.items.length;
let newItems = []; let newItems = [];
return Log.fromIpfsHash(this._ipfs, hash) // return Log.fromIpfsHash(this._ipfs, hash)
.then((other) => this._log.join(other)) // .then((other) => super.join(other))
// console.log("OTHER", other)
return super.join(other)
.then((merged) => newItems = merged) .then((merged) => newItems = merged)
.then(() => Cache.set(this.dbname, hash)) .then(() => Log.getIpfsHash(this._ipfs, this))
.then((hash) => Cache.set(this.name, hash))
.then(() => newItems.forEach((f) => Object.assign(f.payload, { hash: f.hash }))) .then(() => newItems.forEach((f) => Object.assign(f.payload, { hash: f.hash })))
.then(() => newItems.map((f) => f.payload)) .then(() => newItems.map((f) => f.payload))
} }
delete() {
this._log.clear();
}
} }
module.exports = OperationsLog; module.exports = OperationsLog;

View File

@ -1,7 +1,8 @@
'use strict'; 'use strict';
class DefaultIndex { class DefaultIndex {
constructor() { constructor(id) {
this.id = id;
this._index = []; this._index = [];
} }
@ -9,7 +10,7 @@ class DefaultIndex {
return this._index; return this._index;
} }
updateIndex(oplog) { updateIndex(oplog, entries) {
this._index = oplog.ops this._index = oplog.ops
} }
} }

View File

@ -5,19 +5,25 @@ const OperationsLog = require('../oplog/OperationsLog');
const DefaultIndex = require('./DefaultIndex'); const DefaultIndex = require('./DefaultIndex');
class Store { class Store {
constructor(ipfs, dbname, options) { constructor(ipfs, id, dbname, options) {
this.id = id;
this.dbname = dbname; this.dbname = dbname;
this.events = new EventEmitter(); this.events = new EventEmitter();
this.options = options || {};
this._index = new DefaultIndex(); if(!options) options = {};
if(!options.Index) Object.assign(options, { Index: DefaultIndex });
if(!options.Log) Object.assign(options, { Log: OperationsLog });
this.options = options;
this._index = new this.options.Index(this.id);
this._oplog = null; this._oplog = null;
this._ipfs = ipfs; this._ipfs = ipfs;
} }
use(id) { use() {
this.events.emit('load', this.dbname); this.events.emit('load', this.dbname);
this._oplog = new OperationsLog(this._ipfs, this.dbname, this.options); this._oplog = new this.options.Log(this._ipfs, this.id, this.dbname, this.options);
return this._oplog.load(id) return this._oplog.load()
.then((merged) => this._index.updateIndex(this._oplog, merged)) .then((merged) => this._index.updateIndex(this._oplog, merged))
.then(() => this.events.emit('readable', this.dbname)) .then(() => this.events.emit('readable', this.dbname))
.then(() => this.events); .then(() => this.events);
@ -33,7 +39,8 @@ class Store {
let newItems; let newItems;
this.events.emit('load', this.dbname); this.events.emit('load', this.dbname);
return this._oplog.merge(hash) return this.options.Log.fromIpfsHash(this._ipfs, hash)
.then((log) => this._oplog.join(log))
.then((merged) => newItems = merged) .then((merged) => newItems = merged)
.then(() => this._index.updateIndex(this._oplog, newItems)) .then(() => this._index.updateIndex(this._oplog, newItems))
.then(() => { .then(() => {
@ -44,18 +51,22 @@ class Store {
} }
delete() { delete() {
this._index = new this.options.Index(this.id);
if(this._oplog) if(this._oplog)
this._oplog.delete(); this._oplog.clear();
} }
_addOperation(type, key, data) { // _addOperation(type, key, data) {
let result; _addOperation(data) {
let result, logHash;
if(this._oplog) { if(this._oplog) {
return this._oplog.addOperation(type, key, data) return this._oplog.add(data)
.then((op) => result = op) .then((op) => result = op)
.then(() => this._index.updateIndex(this._oplog, [result.operation])) .then(() => this.options.Log.getIpfsHash(this._ipfs, this._oplog))
.then(() => this.events.emit('data', this.dbname, result.log)) .then((hash) => logHash = hash)
.then(() => result.operation.hash); .then(() => this._index.updateIndex(this._oplog, [result]))
.then(() => this.events.emit('data', this.dbname, logHash))
.then(() => result.hash);
} }
} }
} }

View File

@ -3,11 +3,7 @@
const Counter = require('../../crdts/GCounter'); const Counter = require('../../crdts/GCounter');
class CounterIndex { class CounterIndex {
constructor() { constructor(id) {
this._counter = null;
}
createCounter(id) {
this._counter = new Counter(id); this._counter = new Counter(id);
} }
@ -15,9 +11,13 @@ class CounterIndex {
return this._counter; return this._counter;
} }
updateIndex(oplog, updated) { updateIndex(oplog, added) {
// console.log("ADDED", added)
if(this._counter) { if(this._counter) {
updated.filter((f) => f && f.op === 'COUNTER') // added.filter((f) => f && f.payload.op === 'COUNTER')
// .map((f) => Counter.from(f.payload.value))
// .forEach((f) => this._counter.merge(f))
added.filter((f) => f && f.op === 'COUNTER')
.map((f) => Counter.from(f.value)) .map((f) => Counter.from(f.value))
.forEach((f) => this._counter.merge(f)) .forEach((f) => this._counter.merge(f))
} }

View File

@ -1,22 +1,14 @@
'use strict'; 'use strict';
const Log = require('ipfs-log')
const Store = require('../Store'); const Store = require('../Store');
const CounterIndex = require('./CounterIndex'); const CounterIndex = require('./CounterIndex');
class CounterStore extends Store { class CounterStore extends Store {
constructor(ipfs, dbname, options) { constructor(ipfs, id, dbname, options) {
super(ipfs, dbname, options) // Object.assign(options, { Index: CounterIndex, Log: Log });
this._index = new CounterIndex(); Object.assign(options || {}, { Index: CounterIndex });
} super(ipfs, id, dbname, options)
use(id) {
this._index.createCounter(id);
return super.use(id);
}
delete() {
super.delete();
this._index = new CounterIndex();
} }
value() { value() {
@ -27,7 +19,15 @@ class CounterStore extends Store {
const counter = this._index.get(); const counter = this._index.get();
if(counter) { if(counter) {
counter.increment(amount); counter.increment(amount);
return this._addOperation('COUNTER', null, counter.payload); const operation = {
op: 'COUNTER',
key: null,
value: counter.payload,
meta: {
ts: new Date().getTime()
}
};
return this._addOperation(operation);
} }
} }
} }

View File

@ -1,26 +1,49 @@
'use strict'; 'use strict';
const Lazy = require('lazy.js'); const Lazy = require('lazy.js');
const Store = require('../Store'); const Store = require('../Store');
const EventLogIndex = require('./EventIndex'); const EventIndex = require('./EventIndex');
class EventStore extends Store { class EventStore extends Store {
constructor(ipfs, dbname, options) { constructor(ipfs, id, dbname, options) {
super(ipfs, dbname, options) Object.assign(options || {}, { Index: EventIndex });
this._index = new EventLogIndex(); super(ipfs, id, dbname, options)
} }
delete(dbname) { // constructor(ipfs, id, dbname, options) {
super.delete(); // super(ipfs, dbname, options)
this._index = new EventLogIndex(); // this._index = new EventLogIndex();
} // }
// delete(dbname) {
// super.delete();
// this._index = new EventLogIndex();
// }
add(data) { add(data) {
return this._addOperation('ADD', null, data); const operation = {
op: 'ADD',
key: null,
value: data,
meta: {
ts: new Date().getTime()
}
};
return this._addOperation(operation);
// return this._addOperation('ADD', null, data);
} }
remove(hash) { remove(hash) {
return this._addOperation('DEL', null, hash); const operation = {
op: 'DEL',
key: null,
value: hash,
meta: {
ts: new Date().getTime()
}
};
return this._addOperation(operation);
// return this._addOperation('DEL', null, hash);
} }
iterator(options) { iterator(options) {

View File

@ -1,18 +1,22 @@
'use strict'; 'use strict';
const Store = require('../Store'); const Store = require('../Store');
const KVIndex = require('./KeyValueIndex'); const KeyValueIndex = require('./KeyValueIndex');
class KeyValueStore extends Store { class KeyValueStore extends Store {
constructor(ipfs, dbname, options) { constructor(ipfs, id, dbname, options) {
super(ipfs, dbname, options) Object.assign(options || {}, { Index: KeyValueIndex });
this._index = new KVIndex(); super(ipfs, id, dbname, options)
} }
// constructor(ipfs, dbname, options) {
// super(ipfs, dbname, options)
// this._index = new KVIndex();
// }
delete() { // delete() {
super.delete(); // super.delete();
this._index = new KVIndex(); // this._index = new KVIndex();
} // }
get(key) { get(key) {
return this._index.get(key); return this._index.get(key);
@ -23,11 +27,29 @@ class KeyValueStore extends Store {
} }
put(key, data) { put(key, data) {
return this._addOperation('PUT', key, data); const operation = {
op: 'PUT',
key: key,
value: data,
meta: {
ts: new Date().getTime()
}
};
return this._addOperation(operation);
// return this._addOperation('PUT', key, data);
} }
del(key) { del(key) {
return this._addOperation('DEL', key); const operation = {
op: 'DEL',
key: key,
value: null,
meta: {
ts: new Date().getTime()
}
};
return this._addOperation(operation);
// return this._addOperation('DEL', key);
} }
} }

View File

@ -8,7 +8,7 @@ const OrbitDB = require('../src/OrbitDB');
const OrbitServer = require('orbit-server/src/server'); const OrbitServer = require('orbit-server/src/server');
// Mute logging // Mute logging
require('logplease').setLogLevel('ERROR'); // require('logplease').setLogLevel('ERROR');
const network = 'QmYPobvobKsyoCKTw476yTui611XABf927KxUPCf4gRLRr'; // network.json const network = 'QmYPobvobKsyoCKTw476yTui611XABf927KxUPCf4gRLRr'; // network.json
const username = 'testrunner'; const username = 'testrunner';