Refactor subscriptions. Fix join.

This commit is contained in:
haad 2016-03-07 16:48:49 +01:00
parent cf2fefd553
commit 514f914b0f
6 changed files with 58 additions and 15 deletions

View File

@ -22,8 +22,8 @@ class OrbitClient {
if(subscribe === undefined) subscribe = true;
this.db.use(channel, this.user, password);
this.db.events.on('write', this._onWrite.bind(this));
this.db.events.on('sync', this._onSync.bind(this));
this.db.events[channel].on('write', this._onWrite.bind(this));
this.db.events[channel].on('sync', this._onSync.bind(this));
if(subscribe)
this._pubsub.subscribe(channel, password, async((channel, message) => this.db.sync(channel, message)));
@ -38,6 +38,8 @@ class OrbitClient {
const items = this._iterator(channel, password, { key: key }).collect();
return items[0] ? items[0].value : null;
},
subscribe: () => {
},
leave: () => this._pubsub.unsubscribe(channel)
}
}
@ -50,8 +52,8 @@ class OrbitClient {
}
_onWrite(channel, hash) {
this._pubsub.publish(channel, hash)
// this.events.emit('data', channel, hash);
this._pubsub.publish(channel, hash);
this.events.emit('data', channel, hash);
}
_onSync(channel, hash) {

View File

@ -13,21 +13,27 @@ class OrbitDB {
constructor(ipfs) {
this._ipfs = ipfs;
this._logs = {};
this.events = new EventEmitter();
this.events = {};
}
/* Public methods */
use(channel, user, password) {
this.user = user;
this._logs[channel] = new OrbitList(this._ipfs, this.user.username);
this.events[channel] = new EventEmitter();
}
sync(channel, hash) {
// console.log("--> Head:", hash, this._logs[channel] !== undefined)
// console.log("--> Head:", hash)
if(hash && this._logs[channel]) {
const oldCount = this._logs[channel].items.length;
const other = await(OrbitList.fromIpfsHash(this._ipfs, hash));
await(this._logs[channel].join(other));
this.events.emit('sync', channel, 'empty');
// Only emit the event if something was added
const joinedCount = (this._logs[channel].items.length - oldCount);
if(joinedCount > 0)
this.events[channel].emit('sync', channel, 'empty');
}
}
@ -110,7 +116,7 @@ class OrbitDB {
// Write an op to the db
_write(channel, password, operation, key, value, data) {
const hash = await(Operation.create(this._ipfs, this._logs[channel], this.user, operation, key, value));
this.events.emit('write', channel, hash);
this.events[channel].emit('write', channel, hash);
return key;
}
}

View File

@ -18,7 +18,7 @@ class Pubsub {
this._socket.on('disconnect', (socket) => console.log(`Disconnected from http://${host}:${port}`));
this._socket.on('error', (e) => console.log('Pubsub socket error:', e));
this._socket.on('message', this._handleMessage.bind(this));
this._socket.on('latest', this._handleMessage.bind(this));
this._socket.on('subscribed', this._handleMessage.bind(this));
});
}
@ -30,7 +30,7 @@ class Pubsub {
subscribe(hash, password, callback) {
if(!this._subscriptions[hash]) {
this._subscriptions[hash] = { callback: callback };
this._socket.emit('subscribe', { channel: hash });
this._socket.emit('subscribe', { channel: hash }); // calls back with 'latest' message
}
}

View File

@ -1,5 +1,6 @@
'use strict';
const _ = require('lodash');
const Lazy = require('lazy.js');
const Node = require('./Node');
@ -23,9 +24,14 @@ class List {
join(other) {
this.seq = (other.seq && other.seq > this.seq ? other.seq : this.seq) + 1;
this.ver = 0;
const current = Lazy(this._currentBatch).difference(this._items);
const others = Lazy(other.items).difference(this._items);
const final = current.union(others);
// TODO: figure out how to do real equality check with Lazy.js
// const current = Lazy(this._currentBatch).difference(this._items);
// const others = Lazy(other.items).difference(current.toA);
// const final = current.union(others).uniq((f) => f.compactId);
const current = _.differenceWith(this._currentBatch, this._items, Node.equals);
const others = _.differenceWith(other.items, this._items, Node.equals);
const final = _.unionWith(current, others, Node.equals);
this._items = Lazy(this._items).concat(final).toArray();
this._currentBatch = [];
}

View File

@ -30,7 +30,7 @@ class OrbitList extends List {
join(other) {
super.join(other);
this._fetchHistory(other.items);
await(this._fetchHistory(other.items));
}
/* Private methods */
@ -44,7 +44,8 @@ class OrbitList extends List {
.map((f) => this._insert(f)) // Insert to the list
.take(MaxHistory) // How many items from the history we should fetch
.toArray();
// console.log("--> Fetched", res.length, "items from the history\n");
// console.log("--> Fetched", res.length, "items from the history");
return res;
}
// Fetch items in the linked list recursively

View File

@ -286,6 +286,34 @@ describe('OrbitList', async(function() {
done();
}));
it('joins unique items', async((done) => {
const list1 = new List(ipfs, 'A');
const list2 = new List(ipfs, 'B');
list1.add("helloA1")
list1.add("helloA2")
list2.add("helloB1")
list2.add("helloB2")
list1.join(list2);
list1.add("helloA3")
list1.join(list2);
list1.join(list2);
list1.add("helloA4")
list1.add("helloA5")
const lastItem = list1.items[list1.items.length - 1];
assert.equal(list1.items.length, 7);
assert.equal(lastItem.next.length, 2);
assert.equal(lastItem.next[0].compactId, 'A.3.0');
assert.equal(lastItem.next[0].hash, 'QmRnSuNkgqVFMDwdCNMQ83CR3SAWEkAms3zJyP6Pw9bkx4');
assert.equal(lastItem.next[1].compactId, 'B.0.1');
assert.equal(lastItem.next[1].hash, 'QmVmkwMoz4vnvHQwvFwqaoWCrjonsPpyJ6i436Zajht5ao');
done();
}));
it('finds the next head when adding a new element', async((done) => {
const list1 = new List(ipfs, 'A');
list1.add("helloA1")