diff --git a/examples/benchmark.js b/examples/benchmark.js index 44b2c01..633f9a3 100644 --- a/examples/benchmark.js +++ b/examples/benchmark.js @@ -4,20 +4,27 @@ const ipfsd = require('ipfsd-ctl'); const OrbitDB = require('../src/OrbitDB'); const Timer = require('./Timer'); -// usage: benchmark.js ; +// usage: benchmark.js ; // orbit-server -const host = process.argv[2] ? process.argv[2] : 'localhost' -const port = 3333; -const username = process.argv[3] ? process.argv[3] : 'testrunner'; +// const network = 'QmRB8x6aErtKTFHDNRiViixSKYwW1DbfcvJHaZy1hnRzLM'; // dev server +const network = 'QmYPobvobKsyoCKTw476yTui611XABf927KxUPCf4gRLRr'; // 'localhost:3333' +const username = process.argv[2] ? process.argv[2] : 'testrunner'; const password = ''; -const channelName = process.argv[4] ? process.argv[4] : 'c1'; +const channelName = process.argv[3] ? process.argv[3] : 'c1'; const startIpfs = () => { return new Promise((resolve, reject) => { - ipfsd.disposableApi((err, ipfs) => { - if(err) console.error(err); - resolve(ipfs); + // ipfsd.disposableApi((err, ipfs) => { + // if(err) console.error(err); + // resolve(ipfs); + // }); + ipfsd.local((err, node) => { + if(err) reject(err); + node.startDaemon((err, ipfs) => { + if(err) reject(err); + resolve(ipfs); + }); }); }); }; @@ -41,7 +48,7 @@ let run = (() => { // Connect console.log(`Connecting...`) startIpfs() - .then((ipfs) => OrbitDB.connect(host, port, username, password, ipfs)) + .then((ipfs) => OrbitDB.connect(network, username, password, ipfs)) .then((orbit) => orbit.eventlog(channelName)) .then(queryLoop) .then(() => { diff --git a/src/OrbitDB.js b/src/OrbitDB.js index ba21cd3..464f27b 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -19,7 +19,7 @@ class OrbitDB { eventlog(dbname, options) { if(!options) options = { subscribe: true }; - const store = new EventStore(this._ipfs, dbname, options); + const store = new EventStore(this._ipfs, this.user.username, dbname, options); return this._subscribe(store, dbname, options.subscribe) .then(() => this.stores[dbname] = store) .then(() => store); @@ -27,7 +27,7 @@ class OrbitDB { kvstore(dbname, options) { if(!options) options = { subscribe: true }; - const store = new KeyValueStore(this._ipfs, dbname, options); + const store = new KeyValueStore(this._ipfs, this.user.username, dbname, options); return this._subscribe(store, dbname, options.subscribe) .then(() => this.stores[dbname] = store) .then(() => store); @@ -35,7 +35,7 @@ class OrbitDB { counter(dbname, options) { if(!options) options = { subscribe: true }; - const store = new CounterStore(this._ipfs, dbname, options); + const store = new CounterStore(this._ipfs, this.user.username, dbname, options); return this._subscribe(store, dbname, options.subscribe) .then(() => this.stores[dbname] = store) .then(() => store); @@ -70,6 +70,7 @@ class OrbitDB { } _onWrite(dbname, hash) { + if(!hash) throw new Error("Hash can't be null!"); this._pubsub.publish(dbname, hash); this.events.emit('data', dbname, hash); } diff --git a/src/oplog/OperationsLog.js b/src/oplog/OperationsLog.js index 979eaf1..c3aa2fa 100644 --- a/src/oplog/OperationsLog.js +++ b/src/oplog/OperationsLog.js @@ -3,66 +3,82 @@ const Log = require('ipfs-log'); const Cache = require('./Cache'); -class OperationsLog { - constructor(ipfs, dbname, opts) { - this.dbname = dbname; - this.options = opts || { cacheFile: null }; +class OperationsLog extends Log { + constructor(ipfs, id, name, opts) { + super(ipfs, id, name, opts) + // this.name = name; + if(!opts) opts = {}; + if(!opts.cacheFile) Object.assign(opts, { cacheFile: null }); + this.options = opts; + this._lastWrite = null; - this._ipfs = ipfs; - this._log = null; + Cache.loadCache(this.options.cacheFile); + // this._ipfs = ipfs; + // this._log = null; } - get ops() { - return this._log.items; - } + // get items() { + // return this._log.items; + // } - addOperation(operation, key, value) { - const entry = { - op: operation, - key: key, - value: value, - meta: { - ts: new Date().getTime() - } - }; + // add(operation, key, value) { + add(entry) { + // const entry = { + // op: operation, + // key: key, + // value: value, + // meta: { + // ts: new Date().getTime() + // } + // }; let node, logHash; - return this._log.add(entry) + return super.add(entry) .then((op) => node = op) .then(() => Object.assign(node.payload, { hash: node.hash })) - .then(() => Log.getIpfsHash(this._ipfs, this._log)) + .then(() => Log.getIpfsHash(this._ipfs, this)) .then((hash) => logHash = hash) .then(() => this._lastWrite = logHash) - .then(() => Cache.set(this.dbname, logHash)) + .then(() => Cache.set(this.name, logHash)) .then(() => { - return { operation: node.payload, log: logHash }; + return node.payload; }) } - load(id) { - return Log.create(this._ipfs, id) - .then((log) => this._log = log) - .then(() => Cache.loadCache(this.options.cacheFile)) - .then(() => this.merge(Cache.get(this.dbname))) + load() { + // this._log = new Log(this._ipfs, this.id, this.dbname, options) + // return Log.create(this._ipfs, id) + // .then((log) => this._log = log) + // .then(() => Cache.loadCache(this.options.cacheFile)) + // .then(() => this.merge(Cache.get(this.name))) + // Cache.loadCache(this.options.cacheFile); + // console.log("THIS", this.name, Cache.get(this.name)); + const cached = Cache.get(this.name); + if(cached) { + return this.options.Log.fromIpfsHash(this._ipfs, cached) + .then((log) => this.join(log)); + } + + return Promise.resolve([]); } - merge(hash) { - if(!hash || hash === this._lastWrite || !this._log) - return Promise.resolve([]); + join(other) { + // if(!hash || hash === this._lastWrite) + // return Promise.resolve([]); - const oldCount = this._log.items.length; + const oldCount = this.items.length; let newItems = []; - return Log.fromIpfsHash(this._ipfs, hash) - .then((other) => this._log.join(other)) + // return Log.fromIpfsHash(this._ipfs, hash) + // .then((other) => super.join(other)) + // console.log("OTHER", other) + return super.join(other) .then((merged) => newItems = merged) - .then(() => Cache.set(this.dbname, hash)) + .then(() => Log.getIpfsHash(this._ipfs, this)) + .then((hash) => Cache.set(this.name, hash)) .then(() => newItems.forEach((f) => Object.assign(f.payload, { hash: f.hash }))) .then(() => newItems.map((f) => f.payload)) } - delete() { - this._log.clear(); - } } module.exports = OperationsLog; diff --git a/src/stores/DefaultIndex.js b/src/stores/DefaultIndex.js index 9a0f32b..55742f3 100644 --- a/src/stores/DefaultIndex.js +++ b/src/stores/DefaultIndex.js @@ -1,7 +1,8 @@ 'use strict'; class DefaultIndex { - constructor() { + constructor(id) { + this.id = id; this._index = []; } @@ -9,7 +10,7 @@ class DefaultIndex { return this._index; } - updateIndex(oplog) { + updateIndex(oplog, entries) { this._index = oplog.ops } } diff --git a/src/stores/Store.js b/src/stores/Store.js index ce8df12..434cb48 100644 --- a/src/stores/Store.js +++ b/src/stores/Store.js @@ -5,19 +5,25 @@ const OperationsLog = require('../oplog/OperationsLog'); const DefaultIndex = require('./DefaultIndex'); class Store { - constructor(ipfs, dbname, options) { + constructor(ipfs, id, dbname, options) { + this.id = id; this.dbname = dbname; this.events = new EventEmitter(); - this.options = options || {}; - this._index = new DefaultIndex(); + + if(!options) options = {}; + if(!options.Index) Object.assign(options, { Index: DefaultIndex }); + if(!options.Log) Object.assign(options, { Log: OperationsLog }); + + this.options = options; + this._index = new this.options.Index(this.id); this._oplog = null; this._ipfs = ipfs; } - use(id) { + use() { this.events.emit('load', this.dbname); - this._oplog = new OperationsLog(this._ipfs, this.dbname, this.options); - return this._oplog.load(id) + this._oplog = new this.options.Log(this._ipfs, this.id, this.dbname, this.options); + return this._oplog.load() .then((merged) => this._index.updateIndex(this._oplog, merged)) .then(() => this.events.emit('readable', this.dbname)) .then(() => this.events); @@ -33,7 +39,8 @@ class Store { let newItems; this.events.emit('load', this.dbname); - return this._oplog.merge(hash) + return this.options.Log.fromIpfsHash(this._ipfs, hash) + .then((log) => this._oplog.join(log)) .then((merged) => newItems = merged) .then(() => this._index.updateIndex(this._oplog, newItems)) .then(() => { @@ -44,18 +51,22 @@ class Store { } delete() { + this._index = new this.options.Index(this.id); if(this._oplog) - this._oplog.delete(); + this._oplog.clear(); } - _addOperation(type, key, data) { - let result; + // _addOperation(type, key, data) { + _addOperation(data) { + let result, logHash; if(this._oplog) { - return this._oplog.addOperation(type, key, data) + return this._oplog.add(data) .then((op) => result = op) - .then(() => this._index.updateIndex(this._oplog, [result.operation])) - .then(() => this.events.emit('data', this.dbname, result.log)) - .then(() => result.operation.hash); + .then(() => this.options.Log.getIpfsHash(this._ipfs, this._oplog)) + .then((hash) => logHash = hash) + .then(() => this._index.updateIndex(this._oplog, [result])) + .then(() => this.events.emit('data', this.dbname, logHash)) + .then(() => result.hash); } } } diff --git a/src/stores/counters/CounterIndex.js b/src/stores/counters/CounterIndex.js index ba5959d..259d555 100644 --- a/src/stores/counters/CounterIndex.js +++ b/src/stores/counters/CounterIndex.js @@ -3,11 +3,7 @@ const Counter = require('../../crdts/GCounter'); class CounterIndex { - constructor() { - this._counter = null; - } - - createCounter(id) { + constructor(id) { this._counter = new Counter(id); } @@ -15,9 +11,13 @@ class CounterIndex { return this._counter; } - updateIndex(oplog, updated) { + updateIndex(oplog, added) { + // console.log("ADDED", added) if(this._counter) { - updated.filter((f) => f && f.op === 'COUNTER') + // added.filter((f) => f && f.payload.op === 'COUNTER') + // .map((f) => Counter.from(f.payload.value)) + // .forEach((f) => this._counter.merge(f)) + added.filter((f) => f && f.op === 'COUNTER') .map((f) => Counter.from(f.value)) .forEach((f) => this._counter.merge(f)) } diff --git a/src/stores/counters/CounterStore.js b/src/stores/counters/CounterStore.js index c0c0291..539032f 100644 --- a/src/stores/counters/CounterStore.js +++ b/src/stores/counters/CounterStore.js @@ -1,22 +1,14 @@ 'use strict'; +const Log = require('ipfs-log') const Store = require('../Store'); const CounterIndex = require('./CounterIndex'); class CounterStore extends Store { - constructor(ipfs, dbname, options) { - super(ipfs, dbname, options) - this._index = new CounterIndex(); - } - - use(id) { - this._index.createCounter(id); - return super.use(id); - } - - delete() { - super.delete(); - this._index = new CounterIndex(); + constructor(ipfs, id, dbname, options) { + // Object.assign(options, { Index: CounterIndex, Log: Log }); + Object.assign(options || {}, { Index: CounterIndex }); + super(ipfs, id, dbname, options) } value() { @@ -27,7 +19,15 @@ class CounterStore extends Store { const counter = this._index.get(); if(counter) { counter.increment(amount); - return this._addOperation('COUNTER', null, counter.payload); + const operation = { + op: 'COUNTER', + key: null, + value: counter.payload, + meta: { + ts: new Date().getTime() + } + }; + return this._addOperation(operation); } } } diff --git a/src/stores/eventlog/EventStore.js b/src/stores/eventlog/EventStore.js index 8a4192b..9f9806d 100644 --- a/src/stores/eventlog/EventStore.js +++ b/src/stores/eventlog/EventStore.js @@ -1,26 +1,49 @@ 'use strict'; -const Lazy = require('lazy.js'); -const Store = require('../Store'); -const EventLogIndex = require('./EventIndex'); +const Lazy = require('lazy.js'); +const Store = require('../Store'); +const EventIndex = require('./EventIndex'); class EventStore extends Store { - constructor(ipfs, dbname, options) { - super(ipfs, dbname, options) - this._index = new EventLogIndex(); + constructor(ipfs, id, dbname, options) { + Object.assign(options || {}, { Index: EventIndex }); + super(ipfs, id, dbname, options) } - delete(dbname) { - super.delete(); - this._index = new EventLogIndex(); - } + // constructor(ipfs, id, dbname, options) { + // super(ipfs, dbname, options) + // this._index = new EventLogIndex(); + // } + + // delete(dbname) { + // super.delete(); + // this._index = new EventLogIndex(); + // } add(data) { - return this._addOperation('ADD', null, data); + const operation = { + op: 'ADD', + key: null, + value: data, + meta: { + ts: new Date().getTime() + } + }; + return this._addOperation(operation); + // return this._addOperation('ADD', null, data); } remove(hash) { - return this._addOperation('DEL', null, hash); + const operation = { + op: 'DEL', + key: null, + value: hash, + meta: { + ts: new Date().getTime() + } + }; + return this._addOperation(operation); + // return this._addOperation('DEL', null, hash); } iterator(options) { diff --git a/src/stores/kvstore/KeyValueStore.js b/src/stores/kvstore/KeyValueStore.js index 17091b6..328feae 100644 --- a/src/stores/kvstore/KeyValueStore.js +++ b/src/stores/kvstore/KeyValueStore.js @@ -1,18 +1,22 @@ 'use strict'; -const Store = require('../Store'); -const KVIndex = require('./KeyValueIndex'); +const Store = require('../Store'); +const KeyValueIndex = require('./KeyValueIndex'); class KeyValueStore extends Store { - constructor(ipfs, dbname, options) { - super(ipfs, dbname, options) - this._index = new KVIndex(); + constructor(ipfs, id, dbname, options) { + Object.assign(options || {}, { Index: KeyValueIndex }); + super(ipfs, id, dbname, options) } + // constructor(ipfs, dbname, options) { + // super(ipfs, dbname, options) + // this._index = new KVIndex(); + // } - delete() { - super.delete(); - this._index = new KVIndex(); - } + // delete() { + // super.delete(); + // this._index = new KVIndex(); + // } get(key) { return this._index.get(key); @@ -23,11 +27,29 @@ class KeyValueStore extends Store { } put(key, data) { - return this._addOperation('PUT', key, data); + const operation = { + op: 'PUT', + key: key, + value: data, + meta: { + ts: new Date().getTime() + } + }; + return this._addOperation(operation); + // return this._addOperation('PUT', key, data); } del(key) { - return this._addOperation('DEL', key); + const operation = { + op: 'DEL', + key: key, + value: null, + meta: { + ts: new Date().getTime() + } + }; + return this._addOperation(operation); + // return this._addOperation('DEL', key); } } diff --git a/test/counterdb.test.js b/test/counterdb.test.js index 24354d1..d794ef4 100644 --- a/test/counterdb.test.js +++ b/test/counterdb.test.js @@ -8,7 +8,7 @@ const OrbitDB = require('../src/OrbitDB'); const OrbitServer = require('orbit-server/src/server'); // Mute logging -require('logplease').setLogLevel('ERROR'); +// require('logplease').setLogLevel('ERROR'); const network = 'QmYPobvobKsyoCKTw476yTui611XABf927KxUPCf4gRLRr'; // network.json const username = 'testrunner';