Fix update chain

This commit is contained in:
haad 2016-03-06 12:06:25 +01:00
parent eeef376d80
commit a0c863ddc1
3 changed files with 14 additions and 15 deletions

View File

@ -22,7 +22,8 @@ class OrbitClient {
if(subscribe === undefined) subscribe = true; if(subscribe === undefined) subscribe = true;
this.db.use(channel, this.user, password); this.db.use(channel, this.user, password);
this.db.events.on('write', this._onUpdated.bind(this)); this.db.events.on('write', this._onWrite.bind(this));
this.db.events.on('sync', this._onSync.bind(this));
if(subscribe) if(subscribe)
this._pubsub.subscribe(channel, password, async((channel, message) => this.db.sync(channel, message))); this._pubsub.subscribe(channel, password, async((channel, message) => this.db.sync(channel, message)));
@ -48,11 +49,15 @@ class OrbitClient {
this.network = null; this.network = null;
} }
_onUpdated(channel, hash) { _onWrite(channel, hash) {
this._pubsub.publish(channel, hash) this._pubsub.publish(channel, hash)
this.events.emit('data', channel, hash); this.events.emit('data', channel, hash);
} }
_onSync(channel, hash) {
this.events.emit('data', channel, hash);
}
_iterator(channel, password, options) { _iterator(channel, password, options) {
const messages = this.db.query(channel, password, options); const messages = this.db.query(channel, password, options);
let currentIndex = 0; let currentIndex = 0;

View File

@ -23,10 +23,11 @@ class OrbitDB {
} }
sync(channel, hash) { sync(channel, hash) {
// console.log("--> Head:", hash) // console.log("--> Head:", hash, this._logs[channel] !== undefined)
if(hash && this._logs[channel]) { if(hash && this._logs[channel]) {
const other = OrbitList.fromIpfsHash(this._ipfs, hash); const other = await(OrbitList.fromIpfsHash(this._ipfs, hash));
this._logs[channel].join(other); this._logs[channel].join(other);
this.events.emit('sync', channel, hash);
} }
} }

View File

@ -18,10 +18,7 @@ class Pubsub {
this._socket.on('disconnect', (socket) => console.log(`Disconnected from http://${host}:${port}`)); this._socket.on('disconnect', (socket) => console.log(`Disconnected from http://${host}:${port}`));
this._socket.on('error', (e) => console.log('Pubsub socket error:', e)); this._socket.on('error', (e) => console.log('Pubsub socket error:', e));
this._socket.on('message', this._handleMessage.bind(this)); this._socket.on('message', this._handleMessage.bind(this));
this._socket.on('latest', (hash, message) => { this._socket.on('latest', this._handleMessage.bind(this));
console.log(">", hash, message);
this._handleMessage(hash, message);
});
}); });
} }
@ -32,7 +29,7 @@ class Pubsub {
subscribe(hash, password, callback) { subscribe(hash, password, callback) {
if(!this._subscriptions[hash]) { if(!this._subscriptions[hash]) {
this._subscriptions[hash] = { head: null, callback: callback }; this._subscriptions[hash] = { callback: callback };
this._socket.emit('subscribe', { channel: hash }); this._socket.emit('subscribe', { channel: hash });
} }
} }
@ -47,12 +44,8 @@ class Pubsub {
} }
_handleMessage(hash, message) { _handleMessage(hash, message) {
if(this._subscriptions[hash]) { if(this._subscriptions[hash] && this._subscriptions[hash].callback)
this._subscriptions[hash].head = message; this._subscriptions[hash].callback(hash, message);
if(this._subscriptions[hash].callback)
this._subscriptions[hash].callback(hash, message);
}
} }
} }