This commit is contained in:
haad 2016-02-10 17:27:26 +01:00
parent 079ea9853b
commit a419b0d527
3 changed files with 10 additions and 5 deletions

View File

@ -26,10 +26,10 @@ let run = (async(() => {
let timer = new Timer(true); let timer = new Timer(true);
running = true; running = true;
// channel.add(id + count); channel.add(id + count);
console.log("Query..."); console.log("Query...");
let items = channel.iterator({ limit: 1 }).collect(); let items = channel.iterator({ limit: 3 }).collect();
console.log(`Found items ${items.length} items`); console.log(`Found items ${items.length} items`);
var g = items.filter((e) => e.item.Payload.startsWith(id)) var g = items.filter((e) => e.item.Payload.startsWith(id))

View File

@ -176,9 +176,12 @@ class OrbitClient {
_createOperation(channel, password, operation, key, value, data) { _createOperation(channel, password, operation, key, value, data) {
let message, res = false; let message, res = false;
while(!res) { while(!res) {
// console.log("posting...")
message = this._createMessage(channel, password, operation, key, value); message = this._createMessage(channel, password, operation, key, value);
res = await(this._pubsub.publish(channel, message.hash, message.seq)); res = await(this._pubsub.publish(channel, message.hash, message.seq));
// if(!res) console.log("retry")
} }
// console.log("posted")
return message.Hash; return message.Hash;
} }

View File

@ -36,8 +36,10 @@ class PubSub {
publish(hash, message, seq, callback) { publish(hash, message, seq, callback) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve }); if(this.publishQueue.length === 0)
this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq })); this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve });
this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq }));
setTimeout(() => resolve(false), 1000)
}); });
} }
@ -57,7 +59,7 @@ class PubSub {
var isNewer = seq > this._subscriptions[hash].seq; var isNewer = seq > this._subscriptions[hash].seq;
var item = this.publishQueue[this.publishQueue.length - 1]; var item = this.publishQueue[this.publishQueue.length - 1];
// console.log(".", newHead, item ? item.hash : '') // console.log(".", isNewer, newHead, item ? item.hash : '', seq, this._subscriptions[hash].seq, message)
if(item) { if(item) {
item.callback(isNewer && newHead === item.hash); item.callback(isNewer && newHead === item.hash);