Refactor OrbitClient and Pubsub to use Lists

This commit is contained in:
haad 2016-02-21 17:52:20 +02:00
parent 3863150624
commit 8caf8cbb9a
11 changed files with 879 additions and 824 deletions

View File

@ -0,0 +1,52 @@
'use strict';
var async = require('asyncawait/async');
var await = require('asyncawait/await');
var OrbitClient = require('../src/OrbitClient');
var Timer = require('./Timer');
// var host = '178.62.229.175';
var host = 'localhost';
var port = 6379;
var username = 'LambOfGod';
var password = '';
let run = (async(() => {
try {
var orbit = OrbitClient.connect(host, port, username, password);
const c1 = 'c1';
const channel = orbit.channel(c1);
let count = 1;
let id = 'Log: Query '
let running = false;
setInterval(async(() => {
if(!running) {
running = true;
// let timer = new Timer(true);
channel.put("lamb", "of god" + count);
// console.log(`Query #${count} took ${timer.stop(true)} ms\n`);
let v = channel.get("lamb");
console.log("---------------------------------------------------")
console.log("Id | Seq | Ver | Data")
console.log("---------------------------------------------------")
console.log(v);
console.log("---------------------------------------------------")
running = false;
count ++;
}
}), 500);
} catch(e) {
console.error("error:", e);
console.error(e.stack);
process.exit(1);
}
}))();
module.exports = run;

View File

@ -27,16 +27,18 @@ let run = (async(() => {
running = true;
// let timer = new Timer(true);
channel.add(id + count);
channel.add("Hello " + count);
// console.log(`Query #${count} took ${timer.stop(true)} ms\n`);
// console.log("Query...");
const c = channel.iterator({ limit: -1 }).collect().length;
let items = channel.iterator({ limit: 5 }).collect();
// console.log(items);
console.log("---------------------------------------------------")
console.log("Id | Seq | Ver | Data")
// console.log("Id | Seq | Ver | Data")
console.log("Key | Value")
console.log("---------------------------------------------------")
console.log(items.map((e) => `${e.id} | ${e.seq} | ${e.ver} | ${e.data}`).join("\n"));
// console.log(items.map((e) => `${e.id} | ${e.seq} | ${e.ver} | ${e.data}`).join("\n"));
console.log(items.map((e) => `${e.payload.key} | ${e.payload.value}`).join("\n"));
console.log("---------------------------------------------------")
console.log(`Found ${items.length} items from ${c}\n`);

View File

@ -16,14 +16,14 @@ let run = (async(() => {
try {
var orbit = OrbitClient.connect(host, port, username, password);
const c1 = 'c1';
let channel;
let count = 1;
let id = 'Log: Query '
let channel;
setInterval(async(() => {
if(channel) {
channel.add(id + count);
channel.add(username + " " + count);
count ++;
}
}), process.argv[3] ? process.argv[3] : 1000);

102
src/DataStore.js Normal file
View File

@ -0,0 +1,102 @@
'use strict';
const _ = require('lodash');
const async = require('asyncawait/async');
const await = require('asyncawait/await');
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const OrbitList = require('./list/OrbitList');
const HashCacheOps = require('./HashCacheOps');
const DefaultAmount = 1;
class DataStore {
constructor(id, ipfs) {
this._ipfs = ipfs;
this.list = new OrbitList(id, this._ipfs);
}
add(hash) {
this.list.add(hash);
}
join(other) {
this.list.join(other);
}
clear() {
this.list.clear();
}
get(options) {
return this._fetchRecursive(options);
}
_fetchOne(index) {
const item = this.list.items[this.list.items.length - index - 1];
if(item) {
await(item.getPayload());
const f = item.compact();
return { hash: f.data, payload: f.Payload };
}
return null;
}
_fetchRecursive(options, currentAmount, deleted, res) {
// console.log("-->")
// console.log("opts:", options, currentAmount)
const opts = {
amount: options && options.amount ? options.amount : DefaultAmount,
first: options && options.first ? options.first : null,
last: options && options.last ? options.last : null,
key: options && options.key ? options.key : null
};
let result = res ? res : [];
let handledItems = deleted ? deleted : [];
if(!currentAmount) currentAmount = 0;
const item = this._fetchOne(currentAmount);
// console.log("ITEM", item)
if(item && item.payload) {
const wasHandled = _.includes(handledItems, item.payload.key);
if((item.payload.op === HashCacheOps.Put || item.payload.op === HashCacheOps.Add) && !wasHandled) {
if((!opts.key || (opts.key && opts.key === item.payload.key)) &&
(!opts.first || (opts.first && (opts.first === item.payload.key && result.length === 0))
|| (opts.first && (opts.first !== item.payload.key && result.length > 0))))
{
// console.log("PUSH!", item, currentAmount, result.length);
result.push(item);
handledItems.push(item.payload.key);
}
} else if(item.payload.op === HashCacheOps.Delete) {
// console.log("DELETE!", item);
handledItems.push(item.payload.key);
}
currentAmount ++;
if(opts.key && item.payload.key === opts.key)
return result;
// console.log("ITEM", item.payload.key, opts.last)
if(opts.last && item.payload.key === opts.last)
return result;
if(!opts.last && opts.amount > -1 && result.length >= opts.amount)
return result;
if(currentAmount >= this.list.items.length)
return result;
// console.log("RES!", result)
result = this._fetchRecursive(opts, currentAmount, handledItems, result);
}
return result;
}
}
module.exports = DataStore;

View File

@ -2,6 +2,15 @@
const Encryption = require('orbit-common/lib/Encryption');
class OrbitDBItem {
constructor(operation, key, value, metaInfo) {
this.op = operation;
this.key = key;
this.value = value;
this.meta = metaInfo;
}
}
class HashCacheItem {
constructor(operation, key, sequenceNumber, targetHash, metaInfo, next) {
this.op = operation;
@ -55,6 +64,7 @@ class EncryptedHashCacheItem extends HashCacheItem {
}
module.exports = {
OrbitDBItem: OrbitDBItem,
HashCacheItem: HashCacheItem,
EncryptedHashCacheItem: EncryptedHashCacheItem
};

View File

@ -6,56 +6,54 @@ var Keystore = require('orbit-common/lib/Keystore');
var Encryption = require('orbit-common/lib/Encryption');
var ipfsDaemon = require('orbit-common/lib/ipfs-daemon');
var ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
var HashCache = require('./HashCacheClient');
var HashCacheItem = require('./HashCacheItem').EncryptedHashCacheItem;
var HashCacheItem = require('./HashCacheItem').HashCacheItem;
var OrbitDBItem = require('./HashCacheItem').OrbitDBItem;
var HashCacheOps = require('./HashCacheOps');
var ItemTypes = require('./ItemTypes');
var MetaInfo = require('./MetaInfo');
var Post = require('./Post');
var Aggregator = require('./Aggregator');
var PubSub = require('./PubSub');
const List = require('./list/OrbitList');
var Timer = require('../examples/Timer');
const List = require('./list/OrbitList');
const DataStore = require('./DataStore');
var pubkey = Keystore.getKeys().publicKey;
var privkey = Keystore.getKeys().privateKey;
let vvv = {};
class OrbitClient {
constructor(ipfs) {
this.ipfs = ipfs;
this._ipfs = ipfs;
this.network = {};
this.user = null;
this.list = null; // TODO move to DataStore
}
channel(hash, password) {
if(password === undefined) password = '';
this._pubsub.subscribe(hash, password, async((hash, message) => {
const other = await(List.fromIpfsHash(this.ipfs, message));
const other = await(List.fromIpfsHash(this._ipfs, message));
// console.log(">", other.id, other.seq, other.ver);
if(other.id !== this.user.username) {
let timer = new Timer(true);
this.list.join(other); // TODO: move to DataStore
console.log(`Join took ${timer.stop(true)} ms`);
// let timer = new Timer(true);
this._store.join(other);
// console.log(`Join took ${timer.stop(true)} ms`);
}
}));
return {
info: (options) => this._info(hash, password),
// info: (options) => this._info(hash, password),
delete: () => this._deleteChannel(hash, password),
iterator: (options) => this._iterator(hash, password, options),
setMode: (mode) => this._setMode(hash, password, mode),
add: (data) => this._add(hash, password, data),
//TODO: tests
del: (options) => this._remove(hash, password, options),
del: (key) => this._remove(hash, password, key),
put: (key, data) => this._put(hash, password, key, data),
get: (key, options) => {
const items = this._iterator(hash, password, { key: key }).collect();
return items[0] ? items[0].item.Payload : null;
return items[0] ? items[0].payload.value : null;
},
//TODO: tests
leave: () => this._pubsub.unsubscribe(hash)
}
}
@ -95,72 +93,45 @@ class OrbitClient {
const reverse = options.reverse ? options.reverse : false;
const key = options.key ? options.key : null;
let startFromHash;
if(lte || lt) {
startFromHash = lte ? lte : lt;
} else {
var channel = this._info(channel, password);
startFromHash = channel.head ? channel.head : null;
}
if((gt || lt) && limit > -1) limit += 1;
if(startFromHash) {
const opts = {
amount: limit,
last: gte ? gte : gt,
key: key
};
const opts = {
amount: limit,
first: lte ? lte : lt,
last: gte ? gte : gt,
key: key
};
// Get messages
// messages = Aggregator.fetchRecursive(this.ipfs, startFromHash, password, opts);
messages = this.list.items.map((f) => f.compact()); // TODO: move to DataStore
// Get messages
messages = await(this._store.get(opts));
// console.log("M", messages)
// Slice the array
let startIndex = 0;
let endIndex = messages.length;
if(limit < 0) {
endIndex = messages.length - (gt ? 1 : 0);
} else {
startIndex = Math.max(0, messages.length - limit);
endIndex = messages.length - ((gt || lt) ? 1 : 0);
}
// Remove the first/last item if greater/lesser than is set
let startIndex = lt ? 1 : 0;
let endIndex = gt ? messages.length - 1 : messages.length;
messages = messages.slice(startIndex, endIndex)
// console.log("M2", messages)
messages = messages.slice(startIndex, endIndex)
}
if(reverse) messages.reverse();
if(!reverse) messages.reverse();
return messages;
}
_publish(data) {
let post = new Post(data);
post.encrypt(privkey, pubkey);
return await (ipfsAPI.putObject(this.ipfs, JSON.stringify(post)));
// post.encrypt(privkey, pubkey);
return await (ipfsAPI.putObject(this._ipfs, JSON.stringify(post)));
}
_createMessage(channel, password, operation, key, target) {
_createMessage(channel, password, operation, key, value) {
// Create meta info
const size = -1;
const metaInfo = new MetaInfo(ItemTypes.Message, size, this.user.id, new Date().getTime());
// Get the current channel head and bump the sequence number
let seq = this._info(channel, password).seq + 1;
let head = this._info(channel, password).head;
// Create the hash cache item
const hcItem = new HashCacheItem(operation, key, seq, target, metaInfo, null, pubkey, privkey, password);
const item = new OrbitDBItem(operation, key, value, metaInfo);
// Save the item to ipfs
const data = await (ipfsAPI.putObject(this.ipfs, JSON.stringify(hcItem)));
let newHead = { Hash: data.Hash };
// If this is not the first item in the channel, patch with the previous (ie. link as next)
if(seq > 0)
newHead = await (ipfsAPI.patchObject(this.ipfs, data.Hash, head));
return { hash: newHead, seq: seq };
const data = await (ipfsAPI.putObject(this._ipfs, JSON.stringify(item)));
return data.Hash;
}
/* DB Operations */
@ -175,48 +146,46 @@ class OrbitClient {
return await(this._createOperation(channel, password, HashCacheOps.Put, key, post.Hash));
}
_remove(channel, password, options) {
const key = null;
const target = options.key ? options.key : (options.hash ? options.hash : null);
return await(this._createOperation(channel, password, HashCacheOps.Delete, key, target));
_remove(channel, password, hash) {
return await(this._createOperation(channel, password, HashCacheOps.Delete, hash, null));
}
_createOperation(channel, password, operation, key, value, data) {
let message = this._createMessage(channel, password, operation, key, value);
this.list.add(message.hash.Hash); // TODO: move to DataStore
const listHash = await(this.list.getIpfsHash());
let hash = this._createMessage(channel, password, operation, key, value);
this._store.add(hash);
const listHash = await(this._store.list.getIpfsHash());
await(this._pubsub.publish(channel, listHash));
return message.hash.Hash;
return key;
}
_deleteChannel(channel, password) {
this._pubsub.delete(channel, password);
this._store.clear();
return true;
}
_setMode(channel, password, modes) {
let m = [];
if(typeof modes !== 'Array')
m.push(modes);
else
m = modes;
// const res = await(this.client.linkedList(channel, password).setMode(m));
// return res.modes;
return { todo: 'TODO!' }
}
// _setMode(channel, password, modes) {
// let m = [];
// if(typeof modes !== 'Array')
// m.push(modes);
// else
// m = modes;
// // const res = await(this.client.linkedList(channel, password).setMode(m));
// // return res.modes;
// return { todo: 'TODO!' }
// }
_info(channel, password) {
var l = this._pubsub.latest(channel);
return l;
}
// _info(channel, password) {
// var l = this._pubsub.latest(channel);
// return l;
// }
_connect(host, port, username, password) {
return new Promise((resolve, reject) => {
this._pubsub = new PubSub(this.ipfs, host, port, username, password);
this._pubsub = new PubSub(this._ipfs, host, port, username, password);
// this.client = this._pubsub._client;
// this.user = this.client.info.user;
this.user = { id: 'hello-todo', username: username }
this.list = new List(username, this.ipfs); // TODO: move to DataStore
this._store = new DataStore(username, this._ipfs);
resolve();
// this.network = {
// id: this.client.info.networkId,

View File

@ -1,9 +1,6 @@
'use strict';
var async = require('asyncawait/async');
var await = require('asyncawait/await');
var redis = require("redis");
var Aggregator = require('./Aggregator');
const redis = require("redis");
const List = require('./list/OrbitList');
class Pubsub2 {
@ -13,17 +10,13 @@ class Pubsub2 {
this.client1 = redis.createClient({ host: host, port: port });
this.client2 = redis.createClient({ host: host, port: port });
this.client1.on("message", this._handleMessage.bind(this));
this.client1.on('connect', () => {
console.log('redis connected');
});
// this.client1.on("subscribe", (channel, count) => {
// });
// this.client1.on('connect', () => console.log('redis connected'));
// this.client1.on("subscribe", (channel, count) => console.log(`subscribed to ${channel}`));
}
subscribe(hash, password, callback) {
if(!this._subscriptions[hash] || this._subscriptions[hash].password !== password) {
this._subscriptions[hash] = {
topic: hash,
password: password,
head: null,
callback: callback
@ -46,10 +39,6 @@ class Pubsub2 {
return { head: this._subscriptions[hash] ? this._subscriptions[hash].head : null };
}
delete(hash, password) {
delete this._subscriptions[hash];
}
_handleMessage(hash, message) {
if(this._subscriptions[hash]) {
this._subscriptions[hash].head = message;
@ -60,127 +49,4 @@ class Pubsub2 {
}
}
/*
class PubSub {
constructor(ipfs, host, port, username, password, resolve) {
this.ipfs = ipfs;
this._subscriptions = {};
this.client1 = redis.createClient({ host: host, port: port });
this.client2 = redis.createClient({ host: host, port: port });
this.client3 = redis.createClient({ host: host, port: port });
this.client1.on("message", this._handleMessage.bind(this));
this.publishQueue = [];
this.client1.on('connect', function() {
console.log('redis connected');
resolve();
});
this.client1.on("subscribe", function (channel, count) {
console.log("subscribed to pubsub topic '" + channel + "' (" + count + " peers)");
});
}
subscribe(hash, password, head, callback) {
if(!this._subscriptions[hash] || this._subscriptions[hash].password !== password) {
this._subscriptions[hash] = {
topic: hash,
password: password,
head: null,
callback: callback,
seq: -1
};
// this.client3.get("orbit." + hash, (err, reply) => {
// if(reply) {
// let d = JSON.parse(reply);
// this._subscriptions[hash].seq = d.seq;
// this._subscriptions[hash].head = d.head;
// if(err) console.log(err);
// console.log(`head of '${hash}' is`, this._subscriptions[hash].head, "seq:", this._subscriptions[hash].seq);
// }
// });
this.client1.subscribe(hash);
this.client2.publish(hash, JSON.stringify({ r: "HEAD" }));
}
return new Promise((resolve, reject) => {
setTimeout(() => {
console.log("pubsub initialized")
resolve();
}, 1000);
});
}
unsubscribe(hash) {
delete this._subscriptions[hash];
this.client1.unsubscribe();
this.client2.unsubscribe();
}
publish(hash, message, seq, callback) {
return new Promise((resolve, reject) => {
if(this.publishQueue.length === 0) {
this.publishQueue.splice(0, 0, { hash: message.Hash, callback: resolve });
console.log("...")
this.client2.publish(hash, JSON.stringify({ hash: message.Hash, seq: seq }));
console.log("published")
} else {
console.log("queue full!")
resolve(false);
}
});
}
latest(hash) {
return { head: this._subscriptions[hash].head, modes: {}, seq: this._subscriptions[hash].seq };
}
delete(hash, password) {
delete this._subscriptions[hash];
}
_handleMessage(hash, event) {
if(this._subscriptions[hash]) {
var message = JSON.parse(event)
if(message.hash) {
var newHead = message.hash;
var seq = message.seq;
var isNewer = seq > this._subscriptions[hash].seq;
var item = this.publishQueue[this.publishQueue.length - 1];
// console.log(".", newHead, item ? item.hash : '', seq, this._subscriptions[hash].seq, isNewer)
if(item) {
this.publishQueue.pop();
item.callback(isNewer && newHead === item.hash);
}
if(isNewer)
this._updateSubscription(hash, newHead, seq);
} else if(message.r === 'HEAD') {
console.log("SEND HEAD!")
this.client2.publish(hash, JSON.stringify(this.latest(hash)));
} else {
console.log("GOT HEAD!", message)
var isNewer = message.seq > this._subscriptions[hash].seq;
if(isNewer) {
console.log("NEW HEAD!")
this.publishQueue.pop();
this._updateSubscription(hash, message.head, message.seq);
}
}
}
}
_updateSubscription(hash, message, seq) {
// this.client3.set("orbit." + hash, JSON.stringify({ head: message, seq: seq }));
this._subscriptions[hash].seq = seq;
this._subscriptions[hash].head = message;
if(this._subscriptions[hash].callback)
this._subscriptions[hash].callback(hash, message, seq);
}
}
*/
module.exports = Pubsub2;

View File

@ -21,6 +21,11 @@ class OrbitList extends List {
this.ver ++;
}
clear() {
this._items = [];
this._currentBatch = [];
}
getIpfsHash() {
return new Promise(async((resolve, reject) => {
const list = await(ipfsAPI.putObject(this._ipfs, JSON.stringify(this.toJson())));
@ -28,6 +33,14 @@ class OrbitList extends List {
}));
}
static fromIpfsHash(ipfs, hash) {
return new Promise(async((resolve, reject) => {
const l = await(ipfsAPI.getObject(ipfs, hash));
const list = OrbitList.fromJson(ipfs, JSON.parse(l.Data));
resolve(list);
}));
}
static fromJson(ipfs, json) {
let list = new List(json.id);
list.seq = json.seq;
@ -35,14 +48,6 @@ class OrbitList extends List {
list._items = _.uniqWith(json.items.map((f) => new Node(ipfs, f.id, f.seq, f.ver, f.data, f.next)), _.isEqual);
return list;
}
static fromIpfsHash(ipfs, hash) {
return new Promise(async((resolve, reject) => {
const l = await(ipfsAPI.getObject(ipfs, hash));
const list = OrbitList.fromJson(ipfs, JSON.parse(l.Data));
resolve(list);
}));
}
}
module.exports = OrbitList;

View File

@ -1,7 +1,8 @@
'use strict';
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const async = require('asyncawait/async');
const await = require('asyncawait/await');
const ipfsAPI = require('orbit-common/lib/ipfs-api-promised');
const Node = require('./Node');
class OrbitNode extends Node {
@ -20,8 +21,24 @@ class OrbitNode extends Node {
return "" + this.id + "." + this.seq + "." + this.ver + "." + this.hash;
}
getPayload() {
if(!this.Payload) {
return new Promise(async((resolve, reject) => {
const payload = await(ipfsAPI.getObject(this._ipfs, this.data));
this.Payload = JSON.parse(payload.Data);
if(this.Payload.value) {
const value = await(ipfsAPI.getObject(this._ipfs, this.Payload.value));
this.Payload.value = JSON.parse(value.Data)["content"];
}
resolve(this);
}));
} else {
return this;
}
}
compact() {
return { id: this.id, seq: this.seq, ver: this.ver, data: this.data, next: this.next }
return { id: this.id, seq: this.seq, ver: this.ver, data: this.data, next: this.next, Payload: this.Payload }
}
}

File diff suppressed because it is too large Load Diff

View File

@ -148,9 +148,9 @@ describe('OrbitList', async(function() {
seq: 0,
ver: 3,
items: [
{ id: 'A', seq: 0, ver: 0, data: 'hello1', next: [] },
{ id: 'A', seq: 0, ver: 1, data: 'hello2', next: ['A.0.0.QmZfdeMV77si491NPX83Q8eRYE9WNzVorHrfWJPrJ51brt'] },
{ id: 'A', seq: 0, ver: 2, data: 'hello3', next: ['A.0.1.QmbbtEWe4qHLSjtW2HkPuszFW3zfBTXBdPrkXMdbePxqfK'] }
{ id: 'A', seq: 0, ver: 0, data: 'hello1', next: [], Payload: undefined },
{ id: 'A', seq: 0, ver: 1, data: 'hello2', next: ['A.0.0.QmZfdeMV77si491NPX83Q8eRYE9WNzVorHrfWJPrJ51brt'], Payload: undefined },
{ id: 'A', seq: 0, ver: 2, data: 'hello3', next: ['A.0.1.QmbbtEWe4qHLSjtW2HkPuszFW3zfBTXBdPrkXMdbePxqfK'], Payload: undefined }
]
};
// console.log(JSON.stringify(json, null, 1))