WIP commit

This commit is contained in:
haad 2016-04-26 12:45:29 +02:00
parent 11fd73ab18
commit abe957890d
13 changed files with 443 additions and 357 deletions

84
examples/benchmark-kv.js Normal file
View File

@ -0,0 +1,84 @@
'use strict';
const await = require('asyncawait/await');
const async = require('asyncawait/async');
const ipfsd = require('ipfsd-ctl');
const OrbitDB = require('../src/Client');
const Timer = require('./Timer');
// usage: benchmark.js <host> <username> <channel>;
// orbit-server
const host = process.argv[2] ? process.argv[2] : 'localhost'
const port = 3333;
const username = process.argv[3] ? process.argv[3] : 'testrunner';
const password = '';
const channelName = process.argv[4] ? process.argv[4] : 'c1';
const startIpfs = () => {
return new Promise((resolve, reject) => {
ipfsd.disposableApi((err, ipfs) => {
if(err) console.error(err);
resolve(ipfs);
});
});
};
let run = (async(() => {
try {
// Connect
const ipfs = await(startIpfs());
const orbit = await(OrbitDB.connect(host, port, username, password, ipfs));
const db = await(orbit.kvstore(channelName));
// Metrics
let totalQueries = 0;
let seconds = 0;
let queriesPerSecond = 0;
let lastTenSeconds = 0;
// Metrics output
setInterval(() => {
seconds ++;
if(seconds % 10 === 0) {
console.log(`--> Average of ${lastTenSeconds/10} q/s in the last 10 seconds`)
if(lastTenSeconds === 0)
throw new Error("Problems!");
lastTenSeconds = 0
}
console.log(`${queriesPerSecond} queries per second, ${totalQueries} queries in ${seconds} seconds`)
queriesPerSecond = 0;
}, 1000);
const query = async(() => {
// let timer = new Timer();
// timer.start();
try {
await(db.put("keyA", username + totalQueries));
// console.log(`${timer.stop(true)} ms`);
totalQueries ++;
lastTenSeconds ++;
queriesPerSecond ++;
} catch(e) {
console.log(e);
}
process.nextTick(query);
});
query();
} catch(e) {
console.error("error:", e);
console.error(e.stack);
process.exit(1);
}
}))();
module.exports = run;

View File

@ -3,7 +3,6 @@
const EventEmitter = require('events').EventEmitter;
const logger = require('logplease').create("orbit-db.Client");
const PubSub = require('./PubSub');
const OrbitDB = require('./OrbitDB');
const CounterDB = require('./db/CounterDB');
const KeyValueDB = require('./db/KeyValueDB');
const EventLogDB = require('./db/EventLogDB');
@ -15,10 +14,22 @@ class Client {
this.user = null;
this.network = null;
this.events = new EventEmitter();
this.options = options || {};
this.eventlogDB = new EventLogDB(this._ipfs, this.options);
this.counterDB = new CounterDB(this._ipfs, this.options);
this.keyvalueDB = new KeyValueDB(this._ipfs, this.options);
this.eventlogDB = new EventLogDB(this._ipfs, options);
this.counterDB = new CounterDB(this._ipfs, options);
this.keyvalueDB = new KeyValueDB(this._ipfs, options);
}
eventlog(dbname, subscribe) {
const db = this.eventlogDB;
const api = {
iterator: (options) => db.iterator(dbname, options),
add: (data) => db.add(dbname, data),
del: (hash) => db.remove(dbname, hash),
delete: () => db.delete(dbname),
close: () => this._pubsub.unsubscribe(dbname)
}
return this._subscribe(db, dbname, subscribe).then(() => api);
}
kvstore(dbname, subscribe) {
@ -48,19 +59,6 @@ class Client {
return this._subscribe(db, dbname, subscribe).then(() => api);
}
eventlog(dbname, subscribe) {
const db = this.eventlogDB;
const api = {
iterator: (options) => db.iterator(dbname, options),
add: (data) => db.add(dbname, data),
del: (hash) => db.remove(dbname, hash),
delete: () => db.delete(dbname),
close: () => this._pubsub.unsubscribe(dbname)
}
return this._subscribe(db, dbname, subscribe).then(() => api);
}
disconnect() {
this._pubsub.disconnect();
this._store = {};
@ -71,7 +69,7 @@ class Client {
_subscribe(db, dbname, subscribe, callback) {
if(subscribe === undefined) subscribe = true;
return db.use(dbname, this.user).then(() => {
return db.use(dbname, this.user.username).then(() => {
db.events[dbname].on('write', this._onWrite.bind(this));
db.events[dbname].on('sync', this._onSync.bind(this));
db.events[dbname].on('load', this._onLoad.bind(this));
@ -85,17 +83,22 @@ class Client {
}
_onMessage(channel, message) {
[this.eventlogDB, this.counterDB, this.keyvalueDB].forEach((db) => db.sync(channel, message))
// this.db.sync(channel, message);
// this.counterDB.sync(channel, message);
console.log("<--", channel, message)
this.eventlogDB.sync(channel, message);
this.counterDB.sync(channel, message).catch((e) => {
logger.error(e.stack);
})
this.keyvalueDB.sync(channel, message);
}
_onWrite(channel, hash) {
console.log("-->", channel, hash)
this._pubsub.publish(channel, hash);
this.events.emit('data', channel, hash);
}
_onSync(channel, hash) {
console.log("synced", channel, hash)
this.events.emit('data', channel, hash);
}

View File

@ -5,38 +5,68 @@ const OrbitDB = require('./OrbitDB');
const OpTypes = require('./Operation').Types;
const Counter = require('./GCounter');
class CounterIndex {
constructor() {
this._index = {};
}
createCounter(key, id) {
this._index[key] = new Counter(id);
}
get(key) {
return this._index[key];
}
updateIndex(oplog) {
console.log("UPDATE IDNEX!", JSON.stringify(oplog.ops, null, 2));
const counter = this._index[oplog.dbname];
if(counter) {
Lazy(oplog.ops)
.map((f) => Counter.from(f.value))
.each((f) => counter.merge(f))
this._index[oplog.dbname] = counter;
}
}
}
class CounterDB extends OrbitDB {
constructor(ipfs, options) {
super(ipfs, options)
this._counters = {};
// this._counters = {};
this._index = new CounterIndex();
}
use(dbname, user) {
this._counters[dbname] = new Counter(user.username);
return super.use(dbname, user);
use(dbname, id) {
// this._counters[dbname] = new Counter(id);
this._index.createCounter(dbname, id);
return super.use(dbname, id);
}
sync(dbname, hash) {
const counter = this._counters[dbname];
if(counter) {
return super.sync(dbname, hash).then((oplog) => {
return Lazy(oplog.ops)
.map((f) => Counter.from(f.value))
.map((f) => counter.merge(f))
.toArray();
});
}
}
// sync(dbname, hash) {
// const counter = this._counters[dbname];
// if(counter) {
// return super.sync(dbname, hash).then((oplog) => {
// console.log("OPFS", oplog)
// return Lazy(oplog.ops)
// .map((f) => Counter.from(f.value))
// .map((f) => counter.merge(f))
// .toArray();
// });
// }
// }
query(dbname) {
return this._counters[dbname].value;
// return this._counters[dbname].value;
return this._index.get(dbname).value;
}
inc(dbname, amount) {
const counter = this._counters[dbname];
const counter = this._index.get(dbname);
if(counter) {
counter.increment(amount);
return this._write(dbname, '', OpTypes.Inc, null, counter.payload);
return this._addOperation(dbname, OpTypes.Inc, null, counter.payload);
}
}
}

View File

@ -1,61 +1,31 @@
'use strict';
const Lazy = require('lazy.js');
const OrbitDB = require('./OrbitDB');
const OpTypes = require('./Operation').Types;
const GSet = require('./GSet');
const Lazy = require('lazy.js');
const OrbitDB = require('./OrbitDB');
const OpTypes = require('./Operation').Types;
const EventLogIndex = require('./EventLogIndex');
class EventLogDB extends OrbitDB {
constructor(ipfs, options) {
super(ipfs, options)
this._set = null;
// this._counters = {};
this._index = new EventLogIndex();
}
use(name, user) {
this._set = new GSet(user.username);
return super.use(name, user);
}
sync(dbname, hash) {
return super.sync(dbname, hash).then((oplog) => {
return Lazy(oplog.ops)
.map((f) => GSet.from(f.value))
.map((f) => this._set.merge(f))
.toArray();
});
delete(dbname) {
super.delete(dbname);
this._index = new EventLogIndex();
}
add(dbname, data) {
const oplog = this._oplogs[dbname];
if(oplog) {
return oplog.addOperation(dbname, OpTypes.Add, null, data).then((result) => {
this.events[dbname].emit('write', dbname, result.hash);
this._set.add(result.op.hash);
// console.log("OP", result)
return result.op.hash;
});
}
return;
return this._addOperation(dbname, OpTypes.Add, null, data);
}
remove(dbname, hash) {
const oplog = this._oplogs[dbname];
if(oplog) {
return oplog.addOperation(dbname, OpTypes.Delete, hash).then((result) => {
this.events[dbname].emit('write', dbname, result.hash);
this._set.remove(hash);
// console.log("OP", result)
return result.op.hash;
});
}
return;
return this._addOperation(dbname, OpTypes.Delete, hash);
}
iterator(dbname, options) {
const messages = this.query(dbname, options);
const messages = this._query(dbname, options);
let currentIndex = 0;
let iterator = {
[Symbol.iterator]() {
@ -75,34 +45,28 @@ class EventLogDB extends OrbitDB {
return iterator;
}
query(dbname, opts) {
_query(dbname, opts) {
if(!opts) opts = {};
const oplog = this._oplogs[dbname];
const amount = opts.limit ? (opts.limit > -1 ? opts.limit : oplog.ops.length) : 1; // Return 1 if no limit is provided
const amount = opts.limit ? (opts.limit > -1 ? opts.limit : this._index.get().length) : 1; // Return 1 if no limit is provided
let result = [];
if(opts.gt || opts.gte) {
// Greater than case
console.log("2")
result = this._read(this._set.value, opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false)
result = this._read(this._index.get().reverse(), opts.gt ? opts.gt : opts.gte, amount, opts.gte ? opts.gte : false)
} else {
// Lower than and lastN case, search latest first by reversing the sequence
result = this._read(this._set.value.reverse(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse()
result = this._read(this._index.get(), opts.lt ? opts.lt : opts.lte, amount, opts.lte || !opts.lt).reverse()
}
if(opts.reverse) result.reverse();
let res = result.toArray();
// const removed = this._itemsR.find((e) => e === item);
res = oplog.ops.filter((f) => res.find((e) => e === f.hash))
// console.log("RSULT", res)
return res;
return result.toArray();
}
_read(ops, key, amount, inclusive) {
// console.log("KET", key, amount, inclusive)
return Lazy(ops)
.skipWhile((f) => key && f !== key) // Drop elements until we have the first one requested
.skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested
.drop(inclusive ? 0 : 1) // Drop the 'gt/lt' item, include 'gte/lte' item
.take(amount);
}

36
src/db/EventLogIndex.js Normal file
View File

@ -0,0 +1,36 @@
'use strict';
const Lazy = require('lazy.js');
const OpTypes = require('./Operation').Types;
class EventLogIndex {
constructor() {
this._index = [];
}
get() {
return this._index;
}
updateIndex(oplog) {
let handled = [];
const _createLWWSet = (item) => {
if(Lazy(handled).indexOf(item.key) === -1) {
handled.push(item.key);
if(OpTypes.isInsert(item.op))
return item;
}
return null;
};
const items = Lazy(oplog.ops.reverse())
.map(_createLWWSet) // Return items as LWW (ignore values after the first found)
.compact() // Remove nulls
// .take(oplog.ops.length)
.toArray();
this._index = items;
}
}
module.exports = EventLogIndex;

View File

@ -32,6 +32,7 @@ class GCounter {
}
merge(other) {
console.log("MERGE", other, this)
Object.keys(other._counters).forEach((f) => {
this._counters[f] = Math.max(this._counters[f] ? this._counters[f] : 0, other._counters[f]);
});

View File

@ -18,20 +18,16 @@ class GSet {
}
get value() {
// console.log("AAA", this._added, this._removed)
return Object.keys(this._added).map((f) => {
const removed = this._removed[f];
// console.log("--", removed, this._added[f]);
if(!removed || (removed && removed.ts < this._added[f].ts)) {
return f;
}
return Object.keys(this._added)
.map((f) => {
const removed = this._removed[f];
if(!removed || (removed && removed.ts < this._added[f].ts)) {
return f;
}
return null;
}).filter((f) => f !== null)
.map((f) => {
console.log("f", f)
return f;
});
return null;
})
.filter((f) => f !== null)
}
compare(other) {

35
src/db/KVIndex.js Normal file
View File

@ -0,0 +1,35 @@
'use strict';
const Lazy = require('lazy.js');
const OpTypes = require('./Operation').Types;
class KVIndex {
constructor() {
this._index = {};
}
get(key) {
return this._index[key];
}
updateIndex(oplog) {
let handled = [];
const _createLWWSet = (item) => {
if(Lazy(handled).indexOf(item.key) === -1) {
handled.push(item.key);
if(OpTypes.isInsert(item.op))
return item;
}
return null;
};
this._index = {};
Lazy(oplog.ops.reverse())
.map(_createLWWSet)
.compact()
// .take(oplog.ops.length)
.each((f) => this._index[f.key] = f.value);
}
}
module.exports = KVIndex;

View File

@ -1,98 +1,34 @@
'use strict';
const Lazy = require('lazy.js');
const OrbitDB = require('./OrbitDB');
const OpTypes = require('./Operation').Types;
const GSet = require('./GSet');
const KVIndex = require('./KVIndex');
class KeyValueDB extends OrbitDB {
constructor(ipfs, options) {
super(ipfs, options)
// this._set = null;
this._index = new KVIndex();
}
use(name, user) {
// this._set = new GSet(user.username);
return super.use(name, user);
delete(dbname) {
super.delete(dbname);
this._index = new KVIndex();
}
sync(dbname, hash) {
return super.sync(dbname, hash).then((oplog) => {
return Lazy(oplog.ops)
// .map((f) => GSet.from(f.value))
// .map((f) => this._set.merge(f))
.toArray();
});
}
put(dbname, key, data) {
// set.add(data);
const oplog = this._oplogs[dbname];
if(oplog) {
return oplog.addOperation(dbname, OpTypes.Put, key, data).then((result) => {
this.events[dbname].emit('write', dbname, result.hash);
// console.log("OP", result);
// this._set.add(result.op.hash, result.op.meta.ts);
return result.op.hash;
});
}
// return this._write(dbname, '', OpTypes.Put, key, data).then((op) => {
// console.log("OP", op);
// // this._set.add(op);
// })
get(dbname, key) {
return this._index.get(key);
}
set(dbname, key, data) {
this.put(dbname, key, data);
}
put(dbname, key, data) {
return this._addOperation(dbname, OpTypes.Put, key, data);
}
del(dbname, key) {
const oplog = this._oplogs[dbname];
if(oplog) {
return oplog.addOperation(dbname, OpTypes.Delete, key).then((result) => {
// console.log("OP", op);
return result.op.hash;
});
}
}
get(dbname, key) {
if(!key)
return;
const oplog = this._oplogs[dbname];
// console.log("INIT", JSON.stringify(this._set.value, null, 2), oplog.ops)
const items = oplog.ops.filter((f) => f.key === key)
console.log("ITEM", items, key)
let result = this._read(oplog.ops.reverse(), key, 1, true).toArray()[0];
// result = this._read(operations.reverse(), opts.key, 1, true).map((f) => f.value);
// let result = this._read(this._set.value, key).toArray()[0];
// let result = this._read(this._set.value, key).toArray()[0];
console.log("RSULT", result)
// result = oplog.ops.find((e) => e.hash === result).value;
return result ? result.value : null;
}
_read(ops, key) {
let handled = [];
const _createLWWSet = (item) => {
if(Lazy(handled).indexOf(item.key) === -1) {
handled.push(item.key);
if(OpTypes.isInsert(item.op))
return item;
}
return null;
};
// Find the items from the sequence (list of operations)
return Lazy(ops)
.skipWhile((f) => key && f.key !== key) // Drop elements until we have the first one requested
.map(_createLWWSet) // Return items as LWW (ignore values after the first found)
.compact() // Remove nulls
.take(1);
// return Lazy(ops)
// .skipWhile((f) => key && f !== key) // Drop elements until we have the first one requested
// .take(1);
return this._addOperation(dbname, OpTypes.Delete, key);
}
}

View File

@ -5,12 +5,8 @@ const Log = require('ipfs-log');
const Cache = require('../Cache');
const DBOperation = require('./Operation');
/*
Load, cache and index operations log
*/
class OperationsLog {
constructor(ipfs, dbname, opts) {
constructor(ipfs, dbname, events, opts) {
this.dbname = dbname;
this.options = opts || { cacheFile: null };
this.id = null;
@ -18,15 +14,19 @@ class OperationsLog {
this._ipfs = ipfs;
this._log = null;
this._cached = {};
this.events = events;
}
get ops() {
return Lazy(this._log.items).map((f) => this._cached[f.payload]).toArray();
return Lazy(this._log.items)
.map((f) => this._cached[f.payload])
.toArray();
}
create(user) {
this.id = user.username;
return Log.create(this._ipfs, this.id)
create(id) {
this.events.emit('load', this);
this.id = id;
return Log.create(this._ipfs, id)
.then((log) => this._log = log)
.then(() => {
if(this.options.cacheFile)
@ -35,11 +35,14 @@ class OperationsLog {
return;
})
.then(() => {
if(this.options.cacheFile)
if(this.options.cacheFile) {
console.log("from cache", this.dbname)
return this.sync(Cache.get(this.dbname))
}
return;
});
})
.then(() => this)
}
delete() {
@ -47,52 +50,54 @@ class OperationsLog {
}
sync(hash) {
// console.log("--> Head2:", hash, this.lastWrite)
console.log("0", hash, this.lastWrite)
if(!hash || hash === this.lastWrite || !this._log)
return Promise.resolve();
this.events.emit('load', this.dbname);
console.log("1")
const oldCount = this._log.items.length;
return Log.fromIpfsHash(this._ipfs, hash)
.then((other) => this._log.join(other))
.then((merged) => {
console.log("2")
if(this._log.items.length - oldCount === 0)
return;
return this._cacheInMemory(this._log);
})
.then(() => Cache.set(this.id, hash));
.then(() => {
console.log("3")
Cache.set(this.dbname, hash)
this.events.emit('sync', this.dbname, hash)
return this;
})
// .then(() => this.events.emit('sync', this.dbname, hash))
// .then(() => this)
}
addOperation(dbname, operation, key, value) {
addOperation(operation, key, value) {
let post;
return DBOperation.create(this._ipfs, this._log, this.user, operation, key, value)
// .then((op) => {
// post = op.Post;
// return log.add(op.Hash);
// })
// .then((node) => resolve({ node: node, op: post }))
.then((result) => {
// console.log("res1", result)
return this._log.add(result.Hash).then((node) => {
return { node: node, op: result.Post };
});
})
.then((result) => {
// console.log("res2", result)
this._cachePayload(result.node.payload, result.op);
return result;
}).then((result) => {
return Log.getIpfsHash(this._ipfs, this._log)
.then((listHash) => {
this.lastWrite = listHash;
Cache.set(this.dbname, listHash);
// this.events[dbname].emit('write', this.dbname, listHash);
return { hash: listHash, op: result.op };
});
}).then((result) => {
return result;
});
})
.then((result) => {
return Log.getIpfsHash(this._ipfs, this._log).then((hash) => {
this.lastWrite = hash;
Cache.set(this.dbname, hash);
console.log("----------------- write ------------------", this.id, hash)
this.events.emit('write', this.dbname, hash);
return result.op.hash;
});
})
}
_cacheInMemory(log) {

View File

@ -1,40 +1,44 @@
'use strict';
const Lazy = require('lazy.js');
const EventEmitter = require('events').EventEmitter;
const Log = require('ipfs-log');
const OperationsLog = require('./OperationsLog');
class OrbitDB {
constructor(ipfs, options) {
this._ipfs = ipfs;
this.options = options || {};
this.events = {};
this._index = null;
this._oplogs = {};
this.events = {};
this.options = options || {};
}
use(dbname, user) {
this.user = user;
use(dbname, id) {
this.events[dbname] = new EventEmitter();
this._oplogs[dbname] = new OperationsLog(this._ipfs, dbname);
this.events[dbname].emit('load');
return this._oplogs[dbname].create(user)
const oplog = new OperationsLog(this._ipfs, dbname, this.events[dbname], this.options);
return oplog.create(id)
.then(() => {
if(this._index)
this._index.updateIndex(oplog);
this._oplogs[dbname] = oplog;
return this;
});
}
sync(dbname, hash) {
// console.log("--> Head:", hash)
const oplog = this._oplogs[dbname];
if(oplog) {
this.events[dbname].emit('load');
if(hash && oplog) {
console.log("sync", dbname, hash, oplog.id)
return oplog.sync(hash)
.then(() => this.events[dbname].emit('sync'))
.then(() => oplog);
.then((result) => {
console.log("synced", dbname, hash, oplog.id)
console.log("res", result)
if(this._index)
this._index.updateIndex(oplog);
return this;
});
}
return Promise.resolve();
}
query(dbname) {
return Promise.resolve(this);
}
delete(dbname) {
@ -42,27 +46,16 @@ class OrbitDB {
this._oplogs[dbname].delete();
}
_write(dbname, password, operation, key, value) {
_addOperation(dbname, type, key, data) {
const oplog = this._oplogs[dbname];
const log = oplog._log;
return DBOperation.create(this._ipfs, log, this.user, operation, key, value)
.then((result) => {
// console.log("res", result)
return log.add(result.Hash);
})
.then((result) => {
// console.log("res", result)
oplog._cachePayload(result.node.payload, result.op);
return result;
}).then((result) => {
return Log.getIpfsHash(this._ipfs, log)
.then((listHash) => {
oplog.lastWrite = listHash;
Cache.set(dbname, listHash);
this.events[dbname].emit('write', dbname, listHash);
return result;
});
}).then((result) => result.node.payload);
if(oplog) {
return oplog.addOperation(type, key, data)
.then((result) => {
if(this._index)
this._index.updateIndex(oplog);
return result;
});
}
}
}

View File

@ -191,7 +191,6 @@ describe('Orbit Client', function() {
const head = await(db.add('hello1'));
const delop = await(db.del(head));
const items = db.iterator().collect();
console.log(items);
assert.equal(delop.startsWith('Qm'), true);
assert.equal(items.length, 0);
done();

View File

@ -1,115 +1,119 @@
// 'use strict';
'use strict';
// const assert = require('assert');
// const Promise = require('bluebird');
// const rimraf = require('rimraf')
// const ipfsd = require('ipfsd-ctl');
// const OrbitClient = require('../src/Client');
// const OrbitServer = require('orbit-server/src/server');
const assert = require('assert');
const Promise = require('bluebird');
const rimraf = require('rimraf')
const ipfsd = require('ipfsd-ctl');
const OrbitClient = require('../src/Client');
const OrbitServer = require('orbit-server/src/server');
// // Mute logging
// require('logplease').setLogLevel('ERROR');
// Mute logging
require('logplease').setLogLevel('ERROR');
// const username = 'testrunner';
// const username2 = 'rennurtset';
const username = 'testrunner';
const username2 = 'rennurtset';
// const ipfsPath = '/tmp/orbittests';
const ipfsPath = '/tmp/orbittests';
// const startIpfs = () => {
// return new Promise((resolve, reject) => {
// // ipfsd.local(ipfsPath, (err, node) => {
// // if(err) reject(err);
// // node.startDaemon((err, ipfs) => {
// // if(err) reject(err);
// // resolve(ipfs);
// // });
// // });
// OrbitServer.start();
// ipfsd.disposableApi((err, ipfs) => {
// if(err) reject(err);
// resolve(ipfs);
// });
// });
// };
const startIpfs = () => {
return new Promise((resolve, reject) => {
// ipfsd.local(ipfsPath, (err, node) => {
// if(err) reject(err);
// node.startDaemon((err, ipfs) => {
// if(err) reject(err);
// resolve(ipfs);
// });
// });
OrbitServer.start();
ipfsd.disposableApi((err, ipfs) => {
if(err) reject(err);
resolve(ipfs);
});
});
};
// describe('Orbit Client', function() {
// this.timeout(20000);
describe('Orbit Client', function() {
this.timeout(20000);
// let ipfs, client1, client2;
let ipfs, client1, client2;
// before((done) => {
// rimraf.sync('./orbit-db-cache.json')
// startIpfs().then((res) => {
// ipfs = res;
// Promise.map([username, username2], (login) => {
// return OrbitClient.connect('localhost', 3333, login, '', ipfs, { allowOffline: false, cacheFile: './orbit-db-cache.json' });
// }).then((clients) => {
// client1 = clients[0];
// client2 = clients[1];
// done();
// }).catch((e) => {
// console.log(e.stack);
// assert.equal(e, null);
// });
// });
// });
before((done) => {
rimraf.sync('./orbit-db-cache.json')
startIpfs().then((res) => {
ipfs = res;
Promise.map([username, username2], (login) => {
return OrbitClient.connect('localhost', 3333, login, '', ipfs, { allowOffline: false, cacheFile: './orbit-db-cache.json' });
}).then((clients) => {
client1 = clients[0];
client2 = clients[1];
done();
}).catch((e) => {
console.log(e.stack);
assert.equal(e, null);
});
});
});
// after((done) => {
// if(client1) client1.disconnect();
// if(client2) client2.disconnect();
// rimraf('./orbit-db-cache.json', done)
// });
after((done) => {
if(client1) client1.disconnect();
if(client2) client2.disconnect();
rimraf('./orbit-db-cache.json', done)
});
// describe('counters', function() {
// it('increases a counter value', (done) => {
// client1.counter('counter test', false).then((counter) => {
// Promise.map([13, 1], (f) => counter.inc(f), { concurrency: 1 }).then(() => {
// assert.equal(counter.value(), 14);
// done();
// }).catch((e) => {
// console.error(e.stack);
// assert.equal(null, e);
// done();
// });
// }).catch((e) => {
// console.error(e.stack);
// assert.equal(' ', e.message);
// done();
// });
// });
describe('counters', function() {
it('increases a counter value', (done) => {
client1.counter('counter test', false).then((counter) => {
Promise.map([13, 1], (f) => counter.inc(f), { concurrency: 1 }).then(() => {
assert.equal(counter.value(), 14);
done();
}).catch((e) => {
console.error(e.stack);
assert.equal(null, e);
done();
});
}).catch((e) => {
console.error(e.stack);
assert.equal(' ', e.message);
done();
});
});
// it('creates a new counter from cached data', function(done) {
// client1.counter('counter test', false).then((counter) => {
// assert.equal(counter.value(), 14);
// done();
// }).catch((e) => {
// console.error(e.stack);
// assert.equal(' ', e.message);
// done();
// });
// });
it('creates a new counter from cached data', function(done) {
client1.counter('counter test', false).then((counter) => {
console.log("COUNTER", counter)
assert.equal(counter.value(), 14);
done();
}).catch((e) => {
console.error(e.stack);
assert.equal(' ', e.message);
done();
});
});
// it('syncs counters', (done) => {
// const name = new Date().getTime();
// Promise.all([client1.counter(name), client2.counter(name)]).then((counters) => {
// const res1 = Promise.map([13, 10], (f) => counters[1].inc(f), { concurrency: 1 });
// const res2 = Promise.map([2, 5], (f) => counters[0].inc(f), { concurrency: 1 })
// Promise.all([res1, res2]).then((res) => {
// setTimeout(() => {
// assert.equal(counters[0].value(), 30);
// assert.equal(counters[1].value(), 30);
// done();
// }, 1000)
// }).catch((e) => {
// console.log(e);
// assert(e);
// done();
// });
// }).catch((e) => {
// console.log(e);
// assert(e);
// done();
// });
// });
// });
// });
it('syncs counters', (done) => {
const name = new Date().getTime();
Promise.all([client1.counter(name), client2.counter(name)]).then((counters) => {
// Promise.all([client1.counter(name)]).then((counters) => {
const res1 = Promise.map([13, 10], (f) => counters[0].inc(f), { concurrency: 1 });
const res2 = Promise.map([2, 5], (f) => counters[1].inc(f), { concurrency: 1 })
Promise.all([res1, res2]).then((res) => {
setTimeout(() => {
assert.equal(counters[0].value(), 30);
assert.equal(counters[1].value(), 30);
done();
}, 1000)
}).catch((e) => {
console.log(e);
assert(e);
done();
});
}).catch((e) => {
console.log(e);
assert(e);
done();
});
});
});
});