diff --git a/src/OrbitClient.js b/src/OrbitClient.js index 11c7fce..a5d1fc0 100644 --- a/src/OrbitClient.js +++ b/src/OrbitClient.js @@ -22,7 +22,8 @@ class OrbitClient { if(subscribe === undefined) subscribe = true; this.db.use(channel, this.user, password); - this.db.events.on('write', this._onUpdated.bind(this)); + this.db.events.on('write', this._onWrite.bind(this)); + this.db.events.on('sync', this._onSync.bind(this)); if(subscribe) this._pubsub.subscribe(channel, password, async((channel, message) => this.db.sync(channel, message))); @@ -48,11 +49,15 @@ class OrbitClient { this.network = null; } - _onUpdated(channel, hash) { + _onWrite(channel, hash) { this._pubsub.publish(channel, hash) this.events.emit('data', channel, hash); } + _onSync(channel, hash) { + this.events.emit('data', channel, hash); + } + _iterator(channel, password, options) { const messages = this.db.query(channel, password, options); let currentIndex = 0; diff --git a/src/OrbitDB.js b/src/OrbitDB.js index 077e582..b78ed14 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -23,10 +23,11 @@ class OrbitDB { } sync(channel, hash) { - // console.log("--> Head:", hash) + // console.log("--> Head:", hash, this._logs[channel] !== undefined) if(hash && this._logs[channel]) { - const other = OrbitList.fromIpfsHash(this._ipfs, hash); + const other = await(OrbitList.fromIpfsHash(this._ipfs, hash)); this._logs[channel].join(other); + this.events.emit('sync', channel, hash); } } diff --git a/src/PubSub.js b/src/PubSub.js index 6f8c19b..e996094 100644 --- a/src/PubSub.js +++ b/src/PubSub.js @@ -18,10 +18,7 @@ class Pubsub { this._socket.on('disconnect', (socket) => console.log(`Disconnected from http://${host}:${port}`)); this._socket.on('error', (e) => console.log('Pubsub socket error:', e)); this._socket.on('message', this._handleMessage.bind(this)); - this._socket.on('latest', (hash, message) => { - console.log(">", hash, message); - this._handleMessage(hash, message); - }); + this._socket.on('latest', this._handleMessage.bind(this)); }); } @@ -32,7 +29,7 @@ class Pubsub { subscribe(hash, password, callback) { if(!this._subscriptions[hash]) { - this._subscriptions[hash] = { head: null, callback: callback }; + this._subscriptions[hash] = { callback: callback }; this._socket.emit('subscribe', { channel: hash }); } } @@ -47,12 +44,8 @@ class Pubsub { } _handleMessage(hash, message) { - if(this._subscriptions[hash]) { - this._subscriptions[hash].head = message; - - if(this._subscriptions[hash].callback) - this._subscriptions[hash].callback(hash, message); - } + if(this._subscriptions[hash] && this._subscriptions[hash].callback) + this._subscriptions[hash].callback(hash, message); } }