First implementation of KV-store (WIP)

This commit is contained in:
haad 2016-01-19 11:43:09 +08:00
parent 2a3dbff5ac
commit 67fd17984e
6 changed files with 145 additions and 78 deletions

View File

@ -1,11 +1,12 @@
'use strict';
let encryption = require('./Encryption');
const encryption = require('./Encryption');
let HashCacheOps = {
const HashCacheOps = {
Add: "ADD",
Put: "PUT",
Delete: "DELETE"
}
};
class HashCacheItem {
constructor(operation, sequenceNumber, targetHash, metaInfo) {
@ -31,8 +32,16 @@ class EncryptedHashCacheItem extends HashCacheItem {
}
}
class KeyedEncryptedHashCacheItem extends EncryptedHashCacheItem {
constructor(operation, key, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) {
super(operation, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt);
this.key = key;
}
}
module.exports = {
HashCacheOps: HashCacheOps,
HashCacheItem: HashCacheItem,
EncryptedHashCacheItem: EncryptedHashCacheItem
};
EncryptedHashCacheItem: EncryptedHashCacheItem,
KeyedEncryptedHashCacheItem: KeyedEncryptedHashCacheItem
};

View File

@ -6,6 +6,7 @@ var ipfsDaemon = require('./ipfs-daemon');
var ipfsAPI = require('./ipfs-api-promised');
var HashCache = require('./HashCacheClient');
var HashCacheItem = require('./HashCacheItem').EncryptedHashCacheItem;
var KeyedHashCacheItem = require('./HashCacheItem').KeyedEncryptedHashCacheItem;
var HashCacheOps = require('./HashCacheItem').HashCacheOps;
var MetaInfo = require('./MetaInfo');
var ItemTypes = require('./ItemTypes');
@ -29,13 +30,24 @@ class OrbitClient {
return {
info: (options) => this._info(hash, password),
iterator: (options) => this._iterator(hash, password, options),
put: (text, options) => {
add: (text, options) => {
// TODO: create updateChannelSequence(), move the update to send() and remove()
this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1;
return this._send(hash, password, text, options);
},
remove: (targetHash) => {
put: (key, data, options) => {
this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1;
return this._remove(hash, password, targetHash);
return this._put(hash, password, key, data, options);
},
get: (key, options) => {
options = options ? Object.assign(options, { key: key }) : { key: key }
// console.log(JSON.stringify(this._iterator(hash, password, options).collect()));
const items = this._iterator(hash, password, options).collect();
return items[0] ? items[0].item.Payload : null;
},
remove: (options) => {
this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1;
return this._remove(hash, password, options);
},
delete: () => this._delete(hash, password),
setMode: (mode) => this._setMode(hash, password, mode)
@ -52,6 +64,29 @@ class OrbitClient {
return seq;
}
_iterator(channel, password, options) {
const messages = this._getMessages(channel, password, options);
// Iterator interface implementation
let currentIndex = 0;
let iterator = {
[Symbol.iterator]() {
return this;
},
next: () => {
let item = { value: null, done: true };
if(currentIndex < messages.length) {
item = { value: messages[currentIndex], done: false };
currentIndex ++;
}
return item;
},
collect: () => messages
}
return iterator;
}
_getMessages(channel, password, options) {
var messages = [];
@ -64,6 +99,7 @@ class OrbitClient {
var lt = options.lt ? options.lt : null;
var lte = options.lte ? options.lte : null;
var reverse = options.reverse ? options.reverse : false;
var key = options.key ? options.key : null;
var startFromHash;
if(lt || lte) {
@ -77,7 +113,7 @@ class OrbitClient {
if(startFromHash) {
// Get messages
messages = this._fetchRecursive(startFromHash, password, limit, gte ? gte : gt);
messages = this._fetchRecursive(startFromHash, password, limit, gte ? gte : gt, 0, [], key);
// Slice the array
var startIndex = 0;
@ -97,50 +133,28 @@ class OrbitClient {
return messages;
}
_iterator(channel, password, options) {
var messages = this._getMessages(channel, password, options);
// Iterator interface implementation
var currentIndex = 0;
let iterator = {
[Symbol.iterator]() {
return this;
},
next: () => {
var item = { value: null, done: true };
if(currentIndex < messages.length) {
item = { value: messages[currentIndex], done: false };
currentIndex ++;
}
return item;
},
collect: () => messages
}
return iterator;
}
_fetchOne(hash, password) {
var item = null;
let item = null;
if(hash) {
item = await (ipfsAPI.getObject(this.ipfs, hash));
let data = JSON.parse(item.Data);
// verify
// verify signature
const verified = Encryption.verify(data.target, data.pubkey, data.sig, data.seq, password);
if(!verified) throw "Item '" + hash + "' has the wrong signature"
// decrypt
// decrypt data structure
const targetDec = Encryption.decrypt(data.target, privkey, 'TODO: pubkey');
const metaDec = Encryption.decrypt(data.meta, privkey, 'TODO: pubkey');
data.target = targetDec;
data.meta = JSON.parse(metaDec);
data.target = targetDec;
data.meta = JSON.parse(metaDec);
if(data.op === HashCacheOps.Add) {
const payload = await (ipfsAPI.getObject(this.ipfs, data.target));
// fetch and decrypt content
if(data.op === HashCacheOps.Add || data.op === HashCacheOps.Put) {
const payload = await (ipfsAPI.getObject(this.ipfs, data.target));
const contentEnc = JSON.parse(payload.Data)["content"];
const contentDec = Encryption.decrypt(contentEnc, privkey, 'TODO: pubkey');
item.Payload = contentDec;
item.Payload = contentDec;
}
item.Data = data;
@ -157,7 +171,7 @@ class OrbitClient {
return contains;
}
_fetchRecursive(hash, password, amount, last, currentDepth, deleted) {
_fetchRecursive(hash, password, amount, last, currentDepth, deleted, key) {
var res = [];
var deletedItems = deleted ? deleted : [];
@ -165,21 +179,32 @@ class OrbitClient {
var message = await (this._fetchOne(hash, password));
// console.log(message);
if(message.Data.op === HashCacheOps.Add && !this._contains(deletedItems, hash)) {
res.push({ hash: hash, item: message });
currentDepth ++;
} else if(message.Data.op === HashCacheOps.Put && !this._contains(deletedItems, message.Data.key)) {
if(!key || key && key === message.Data.key) {
res.push({ hash: hash, item: message });
currentDepth ++;
deletedItems.push(message.Data.key);
}
} else if(message.Data.op === HashCacheOps.Delete) {
deletedItems.push(message.Data.target);
}
if((last && hash === last))
if(key && message.Data.key === key)
return res;
if(last && hash === last)
return res;
if(!last && amount > -1 && currentDepth >= amount)
return res;
if(message && message.Links[0]) {
var next = this._fetchRecursive(message.Links[0].Hash, password, amount, last, currentDepth, deletedItems);
var next = this._fetchRecursive(message.Links[0].Hash, password, amount, last, currentDepth, deletedItems, key);
res = res.concat(next);
}
@ -192,11 +217,16 @@ class OrbitClient {
return await (ipfsAPI.putObject(this.ipfs, JSON.stringify(post)));
}
_createMessage(channel, password, target, operation) {
_createMessage(channel, password, key, target, operation, options) {
var seq = this.sequences[channel];
var size = -1;
var metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime());
var hcItem = new HashCacheItem(operation, seq, target, metaInfo, pubkey, privkey, password);
var hcItem;
if(operation === HashCacheOps.Put)
hcItem = new KeyedHashCacheItem(operation, key, seq, target, metaInfo, pubkey, privkey, password);
else
hcItem = new HashCacheItem(operation, seq, target, metaInfo, pubkey, privkey, password);
var item = await (ipfsAPI.putObject(this.ipfs, JSON.stringify(hcItem)));
var newHead = { Hash: item.Hash };
@ -213,13 +243,23 @@ class OrbitClient {
_send(channel, password, text, options) {
// TODO: check options for what type to publish as (text, snippet, file, etc.)
var post = this._publish(text);
var message = this._createMessage(channel, password, post.Hash, HashCacheOps.Add);
var message = this._createMessage(channel, password, null, post.Hash, HashCacheOps.Add);
await(this.client.linkedList(channel, password).add(message.Hash));
return message.Hash;
}
_remove(channel, password, target) {
var message = this._createMessage(channel, password, target, HashCacheOps.Delete);
// WIP
_put(channel, password, key, data, options) {
// TODO: options
var post = this._publish(data);
var message = this._createMessage(channel, password, key, post.Hash, HashCacheOps.Put);
await(this.client.linkedList(channel, password).add(message.Hash));
return message.Hash;
}
_remove(channel, password, options) {
const target = options.key ? options.key : (options.hash ? options.hash : null);
const message = this._createMessage(channel, password, null, target, HashCacheOps.Delete);
await(this.client.linkedList(channel, password).add(message.Hash))
return message.Hash;
}

View File

@ -21,9 +21,11 @@ orbit-server uses linked lists on top of IPFS. orbit-server not *yet* released,
channel(name, password)
.put(text)
.add(data: String)
.remove(hash)
.put(key, data: String)
.remove(hash or key)
.iterator([options])
@ -40,18 +42,18 @@ var host = 'localhost:3006'; // orbit-server address
async(() => {
// Connect
var orbit = OrbitClient.connect(host, username, password); // OrbitClient
const orbit = OrbitClient.connect(host, username, password); // OrbitClient
var channelName = 'hello-world';
const channelName = 'hello-world';
// Send a message
var head = orbit.channel(channelName).send('hello'); // <ipfs-hash>
const head = orbit.channel(channelName).send('hello'); // <ipfs-hash>
// Delete a message
orbit.channel(channelName).remove(head);
// Iterator options
var options = { limit: -1 }; // fetch all messages, default is 1
const options = { limit: -1 }; // fetch all messages, default is 1
// {
// gt: <hash>,
// gte: <hash>,
@ -62,8 +64,8 @@ async(() => {
// }
// Get messages
var iter = orbit.channel(channelName).iterator(options); // Symbol.iterator
var next = iter.next(); // { value: <item>, done: false|true}
const iter = orbit.channel(channelName).iterator(options); // Symbol.iterator
const next = iter.next(); // { value: <item>, done: false|true}
// OR:
// var all = iter.collect(); // returns all elements as an array
@ -75,14 +77,14 @@ async(() => {
orbit.channel(channelName).remove(next.value.hash); // remove first element iterator returns
// Set modes
var password = 'hello';
var channelModes;
const password = 'hello';
const channelModes;
channelModes = orbit.channel(channel).setMode({ mode: "+r", params: { password: password } }); // { modes: { r: { password: 'hello' } } }
channelModes = orbit.channel(channel, password).setMode({ mode: "+w", params: { ops: [orbit.user.id] } }); // { modes: { ... } }
channelModes = orbit.channel(channel, password).setMode({ mode: "-r" }); // { modes: { ... } }
channelModes = orbit.channel(channel, '').setMode({ mode: "-w" }); // { modes: {} }
// Delete channel
var result = orbit.channel(channelName, channelPwd).delete(); // true | false
const result = orbit.channel(channelName, channelPwd).delete(); // true | false
})();
```

View File

@ -27,13 +27,25 @@ let run = (async(() => {
// Get all messages
var iter = orbit.channel(channel, '').iterator(options);
for(let i of iter) {
console.log(i.item.Data.key);
console.log(i.item.Data.seq, i.item.Data.op, i.hash, "ts: " + i.item.Data.meta.ts, i.item.Payload);
}
console.log("Fetch messages took " + timer.stop() + "ms");
console.log("-------- KV store -------")
orbit.channel(channel, '').put("key3", "this is the value you're looking for222");
var val = orbit.channel(channel, '').get("key3");
console.log("key3:", val);
orbit.channel(channel, '').put("key4", "this will be deleted");
var val2 = orbit.channel(channel, '').get("key4");
console.log("key4:", val2);
orbit.channel(channel, '').remove({ key: "key4" });
val2 = orbit.channel(channel, '').get("key4");
console.log("key4:", val2);
} catch(e) {
console.error("error:", e);
console.error(e.stack);

View File

@ -19,34 +19,38 @@ let run = (async(() => {
var result = orbit.channel(channel, '').delete();
// Add the first message and delete it immediately
orbit.channel(channel, '').put("hello world!");
var e = orbit.channel(channel, '').iterator({ limit: -1 }).collect()[0].hash;
orbit.channel(channel, '').remove(e);
// orbit.channel(channel, '').put("hello world!");
// var e = orbit.channel(channel, '').iterator({ limit: -1 }).collect()[0].hash;
// orbit.channel(channel, '').remove(e);
orbit.channel(channel, '').put("key two", "hello world!!!");
var messages = 100;
var messages = 10;
var i = 1;
while(i <= messages) {
var timer = new Timer(true);
// Send a message
// var head = orbit.channel(channel, '').send(JSON.stringify({ omg: "hello" }));
var head = orbit.channel(channel, '').put("hello world " + i);
var head = orbit.channel(channel, '').put("key one", "hello world " + i);
console.log(i, head, timer.stop() + "ms");
if(i === 4) {
console.log("remove", head);
orbit.channel(channel, '').remove(head);
// orbit.channel(channel, '').remove(head);
}
i ++;
}
var items = orbit.channel(channel, '').iterator({ limit: -1 }).collect();
orbit.channel(channel, '').remove(items[2].hash); // 97
orbit.channel(channel, '').remove(items[3].hash); // 96
orbit.channel(channel, '').remove(items[66].hash); // 34
orbit.channel(channel, '').remove(items[items.length - 10].hash); // 11
orbit.channel(channel, '').remove(items[items.length - 9].hash); // 10
orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9
// console.log(items);
var e = orbit.channel(channel, '').iterator({ limit: -1 }).collect();
orbit.channel(channel, '').remove({ key: "key one" });
// orbit.channel(channel, '').remove(items[2].hash); // 97
// orbit.channel(channel, '').remove(items[3].hash); // 96
// orbit.channel(channel, '').remove(items[66].hash); // 34
// orbit.channel(channel, '').remove(items[items.length - 10].hash); // 11
// orbit.channel(channel, '').remove(items[items.length - 9].hash); // 10
// orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9
} catch(e) {
console.error("error:", e);

View File

@ -99,7 +99,7 @@ describe('Orbit Client', () => {
}));
it('gets channel info on an existing channel', async((done) => {
var msg = orbit.channel(channel, '').put('hello');
var msg = orbit.channel(channel, '').add('hello');
var info = orbit.channel(channel, '').info();
assert.notEqual(info, null);
assert.equal(info.head, msg);
@ -157,7 +157,7 @@ describe('Orbit Client', () => {
describe('Insert', function() {
it('adds an item to an empty channel', async((done) => {
try {
head = orbit.channel(channel, '').put('hello');
head = orbit.channel(channel, '').add('hello');
assert.notEqual(head, null);
assert.equal(head.startsWith('Qm'), true);
assert.equal(head.length, 46);
@ -169,7 +169,7 @@ describe('Orbit Client', () => {
it('adds a new item to a channel with one item', async((done) => {
try {
second = orbit.channel(channel, '').put('hello');
second = orbit.channel(channel, '').add('hello');
assert.notEqual(second, null);
assert.notEqual(second, head);
assert.equal(second.startsWith('Qm'), true);
@ -183,7 +183,7 @@ describe('Orbit Client', () => {
it('adds five items', async((done) => {
for(var i = 0; i < 5; i ++) {
try {
var s = orbit.channel(channel, '').put('hello');
var s = orbit.channel(channel, '').add('hello');
assert.notEqual(s, null);
assert.equal(s.startsWith('Qm'), true);
assert.equal(s.length, 46);
@ -198,7 +198,7 @@ describe('Orbit Client', () => {
try {
var msg = new Buffer(512);
msg.fill('a')
var s = orbit.channel(channel, '').put(msg.toString());
var s = orbit.channel(channel, '').add(msg.toString());
assert.notEqual(s, null);
assert.equal(s.startsWith('Qm'), true);
assert.equal(s.length, 46);
@ -219,7 +219,7 @@ describe('Orbit Client', () => {
var result = orbit.channel(channel, '').delete();
var iter = orbit.channel(channel, '').iterator();
for(var i = 0; i < itemCount; i ++) {
var s = orbit.channel(channel, '').put('hello' + i);
var s = orbit.channel(channel, '').add('hello' + i);
items.push(s);
}
resolve();