First version of pubsub communication
This commit is contained in:
parent
8b905c2bda
commit
bfb7dfc311
@ -41,8 +41,8 @@ let run = (async(() => {
|
||||
});
|
||||
console.log(JSON.stringify(items, null, 2));
|
||||
|
||||
console.log("--> remove", hash1);
|
||||
orbit.channel(c1).remove({ key: hash1 });
|
||||
// console.log("--> remove", hash1);
|
||||
// orbit.channel(c1).remove({ key: hash1 });
|
||||
|
||||
items = orbit.channel(c1).iterator({ limit: -1 }).collect();
|
||||
items = items.map((e) => {
|
||||
@ -50,15 +50,21 @@ let run = (async(() => {
|
||||
});
|
||||
console.log(JSON.stringify(items, null, 2));
|
||||
|
||||
setInterval(async(() => {
|
||||
orbit.channel(c1).add("hello at " + new Date().getTime());
|
||||
}), 1234);
|
||||
/*
|
||||
// You can also get the event based on its hash
|
||||
var value = orbit.channel(c1).get(hash2);
|
||||
console.log("key:", hash2, "value:", value);
|
||||
*/
|
||||
// console.log("--> remove", hash2);
|
||||
// orbit.channel(c1).remove({ key: hash2 });
|
||||
|
||||
console.log("--> remove", hash2);
|
||||
orbit.channel(c1).remove({ key: hash2 });
|
||||
// items = orbit.channel(c1).iterator({ limit: -1 }).collect();
|
||||
// console.log(JSON.stringify(items, null, 2));
|
||||
|
||||
items = orbit.channel(c1).iterator({ limit: -1 }).collect();
|
||||
console.log(JSON.stringify(items, null, 2));
|
||||
// process.exit(0);
|
||||
|
||||
} catch(e) {
|
||||
console.error("error:", e);
|
||||
|
@ -27,6 +27,16 @@ class OrbitClient {
|
||||
|
||||
channel(hash, password) {
|
||||
if(password === undefined) password = '';
|
||||
|
||||
this._pubsub.subscribe(hash, password, (channel, message) => {
|
||||
const m = this._getMessages(hash, password, { gte: message });
|
||||
m.forEach((e) => {
|
||||
const userData = await(ipfsAPI.getObject(this.ipfs, e.item.meta.from))
|
||||
const user = JSON.parse(userData.Data)["user"];
|
||||
console.log(`${user}>`, e.item.Payload, `(op: ${e.item.op}, ${e.item.key})`);
|
||||
});
|
||||
});
|
||||
|
||||
return {
|
||||
info: (options) => this._info(hash, password),
|
||||
delete: () => this._deleteChannel(hash, password),
|
||||
@ -80,11 +90,13 @@ class OrbitClient {
|
||||
const key = options.key ? options.key : null;
|
||||
|
||||
let startFromHash;
|
||||
if(lt || lte) {
|
||||
if(lte || lt) {
|
||||
startFromHash = lte ? lte : lt;
|
||||
} else if (gte || gt) {
|
||||
startFromHash = gte ? gte : gt;
|
||||
} else {
|
||||
// var channel = await (this.client.linkedList(channel, password).head());
|
||||
var channel = PubSub.latest(channel);
|
||||
var channel = this._pubsub.latest(channel);
|
||||
startFromHash = channel.head ? channel.head : null;
|
||||
}
|
||||
|
||||
@ -128,7 +140,7 @@ class OrbitClient {
|
||||
// Get the current channel head and bump the sequence number
|
||||
let seq = 0;
|
||||
// const currentHead = await(this.client.linkedList(channel, password).head())
|
||||
const currentHead = PubSub.latest(channel);
|
||||
const currentHead = this._pubsub.latest(channel);
|
||||
if(currentHead.head) {
|
||||
const headItem = await (ipfsAPI.getObject(this.ipfs, currentHead.head));
|
||||
seq = JSON.parse(headItem.Data)["seq"] + 1;
|
||||
@ -136,7 +148,7 @@ class OrbitClient {
|
||||
|
||||
// Create meta info
|
||||
const size = -1;
|
||||
const metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime());
|
||||
const metaInfo = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime());
|
||||
|
||||
// Create the hash cache item
|
||||
const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, null, pubkey, privkey, password);
|
||||
@ -174,13 +186,13 @@ class OrbitClient {
|
||||
_createOperation(channel, password, operation, key, value) {
|
||||
const message = this._createMessage(channel, password, operation, key, value);
|
||||
// await(this.client.linkedList(channel, password).add(message.Hash));
|
||||
PubSub.publish(channel, message.Hash)
|
||||
this._pubsub.publish(channel, message.Hash)
|
||||
return message.Hash;
|
||||
}
|
||||
|
||||
_deleteChannel(channel, password) {
|
||||
// await(this.client.linkedList(channel, password).delete());
|
||||
PubSub.delete(channel);
|
||||
this._pubsub.delete(channel);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -196,12 +208,14 @@ class OrbitClient {
|
||||
|
||||
_info(channel, password) {
|
||||
// return await(this.client.linkedList(channel, password).head());
|
||||
var l = PubSub.latest(channel);
|
||||
var l = this._pubsub.latest(channel);
|
||||
return l;
|
||||
}
|
||||
|
||||
_connect(host, username, password) {
|
||||
this.client = await(HashCache.connect(host, username, password));
|
||||
this._pubsub = new PubSub(host, username, password);
|
||||
// this.client = await(HashCache.connect(host, username, password));
|
||||
this.client = this._pubsub._client;
|
||||
this.user = this.client.info.user;
|
||||
this.network = {
|
||||
id: this.client.info.networkId,
|
||||
|
@ -1,39 +1,68 @@
|
||||
'use strict';
|
||||
|
||||
let messages = {};
|
||||
var async = require('asyncawait/async');
|
||||
var await = require('asyncawait/await');
|
||||
var HashCache = require('./HashCacheClient');
|
||||
|
||||
class PubSub {
|
||||
constructor() {
|
||||
constructor(host, username, password) {
|
||||
this._subscriptions = [];
|
||||
this._messages = {};
|
||||
this._client = await(HashCache.connect(host, username, password));
|
||||
|
||||
// Poll for the new head
|
||||
setInterval(async(() => {
|
||||
Object.keys(this._subscriptions).forEach(this._poll.bind(this));
|
||||
}), 500);
|
||||
}
|
||||
|
||||
static latest(hash) {
|
||||
return { head: messages[hash] && messages[hash].length > 0 ? messages[hash][messages[hash].length - 1] : null, modes: {} };
|
||||
_poll(hash) {
|
||||
const currentHead = this._subscriptions[hash].head;
|
||||
const channel = await(this._client.linkedList(hash, this._subscriptions[hash].password).head());
|
||||
const newHead = channel.head;
|
||||
if(currentHead !== newHead) {
|
||||
// console.log("NEW HEAD!", newHead);
|
||||
|
||||
this._subscriptions[hash].head = newHead;
|
||||
|
||||
if(!this._messages[hash])
|
||||
this._messages[hash] = [];
|
||||
|
||||
this._messages[hash].push(newHead);
|
||||
|
||||
if(this._subscriptions[hash].callback)
|
||||
this._subscriptions[hash].callback(hash, newHead);
|
||||
}
|
||||
}
|
||||
|
||||
static publish(hash, message) {
|
||||
if(!messages[hash]) messages[hash] = [];
|
||||
messages[hash].push(message);
|
||||
subscribe(channel, password, callback) {
|
||||
if(!this._subscriptions[channel] || this._subscriptions[channel].password !== password) {
|
||||
console.log("SUBSCRIBE:", channel);
|
||||
this._subscriptions[channel] = {
|
||||
channel: channel,
|
||||
password: password,
|
||||
head: null,
|
||||
callback: callback
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
static delete(hash) {
|
||||
messages[hash] = [];
|
||||
unsubscribe(channel) {
|
||||
delete this._subscriptions[channel];
|
||||
delete this._messages[channel];
|
||||
}
|
||||
|
||||
onNewMessage(channel, message) {
|
||||
/*
|
||||
// From orbit-server:
|
||||
var hash = req.params.hash;
|
||||
var head = req.body.head;
|
||||
if(!head) throw "Invalid request";
|
||||
var user = authorize(req, res);
|
||||
var channel = await(Database.getChannel(hash));
|
||||
channel.authenticateRead(req.body.password);
|
||||
var uid = await (ipfsAPI.putObject(ipfs, JSON.stringify(user.get())));
|
||||
channel.authenticateWrite(uid.Hash);
|
||||
await(verifyMessage(head, channel));
|
||||
await(channel.updateHead(head))
|
||||
*/
|
||||
publish(hash, message) {
|
||||
if(!this._messages[hash]) this._messages[hash] = [];
|
||||
await(this._client.linkedList(hash, this._subscriptions[hash].password).add(message));
|
||||
}
|
||||
|
||||
latest(hash) {
|
||||
return { head: this._messages[hash] && this._messages[hash].length > 0 ? this._messages[hash][this._messages[hash].length - 1] : null, modes: {} };
|
||||
}
|
||||
|
||||
delete(hash) {
|
||||
this._messages[hash] = [];
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user