From 67fd17984ed161d5420da718f326ea022c7de26f Mon Sep 17 00:00:00 2001 From: haad Date: Tue, 19 Jan 2016 11:43:09 +0800 Subject: [PATCH] First implementation of KV-store (WIP) --- HashCacheItem.js | 19 ++++-- OrbitClient.js | 126 ++++++++++++++++++++++++------------- README.md | 24 +++---- examples/readMessages.js | 14 ++++- examples/writeMessages.js | 28 +++++---- test/orbit-client-tests.js | 12 ++-- 6 files changed, 145 insertions(+), 78 deletions(-) diff --git a/HashCacheItem.js b/HashCacheItem.js index aef3d83..23d215f 100644 --- a/HashCacheItem.js +++ b/HashCacheItem.js @@ -1,11 +1,12 @@ 'use strict'; -let encryption = require('./Encryption'); +const encryption = require('./Encryption'); -let HashCacheOps = { +const HashCacheOps = { Add: "ADD", + Put: "PUT", Delete: "DELETE" -} +}; class HashCacheItem { constructor(operation, sequenceNumber, targetHash, metaInfo) { @@ -31,8 +32,16 @@ class EncryptedHashCacheItem extends HashCacheItem { } } +class KeyedEncryptedHashCacheItem extends EncryptedHashCacheItem { + constructor(operation, key, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt) { + super(operation, sequenceNumber, targetHash, metaInfo, publicKey, privateKey, salt); + this.key = key; + } +} + module.exports = { HashCacheOps: HashCacheOps, HashCacheItem: HashCacheItem, - EncryptedHashCacheItem: EncryptedHashCacheItem -}; \ No newline at end of file + EncryptedHashCacheItem: EncryptedHashCacheItem, + KeyedEncryptedHashCacheItem: KeyedEncryptedHashCacheItem +}; diff --git a/OrbitClient.js b/OrbitClient.js index b8573c5..c4cf521 100644 --- a/OrbitClient.js +++ b/OrbitClient.js @@ -6,6 +6,7 @@ var ipfsDaemon = require('./ipfs-daemon'); var ipfsAPI = require('./ipfs-api-promised'); var HashCache = require('./HashCacheClient'); var HashCacheItem = require('./HashCacheItem').EncryptedHashCacheItem; +var KeyedHashCacheItem = require('./HashCacheItem').KeyedEncryptedHashCacheItem; var HashCacheOps = require('./HashCacheItem').HashCacheOps; var MetaInfo = require('./MetaInfo'); var ItemTypes = require('./ItemTypes'); @@ -29,13 +30,24 @@ class OrbitClient { return { info: (options) => this._info(hash, password), iterator: (options) => this._iterator(hash, password, options), - put: (text, options) => { + add: (text, options) => { + // TODO: create updateChannelSequence(), move the update to send() and remove() this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1; return this._send(hash, password, text, options); }, - remove: (targetHash) => { + put: (key, data, options) => { this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1; - return this._remove(hash, password, targetHash); + return this._put(hash, password, key, data, options); + }, + get: (key, options) => { + options = options ? Object.assign(options, { key: key }) : { key: key } + // console.log(JSON.stringify(this._iterator(hash, password, options).collect())); + const items = this._iterator(hash, password, options).collect(); + return items[0] ? items[0].item.Payload : null; + }, + remove: (options) => { + this.sequences[hash] = !this.sequences[hash] ? this._getChannelSequence(hash, password) : this.sequences[hash] + 1; + return this._remove(hash, password, options); }, delete: () => this._delete(hash, password), setMode: (mode) => this._setMode(hash, password, mode) @@ -52,6 +64,29 @@ class OrbitClient { return seq; } + _iterator(channel, password, options) { + const messages = this._getMessages(channel, password, options); + + // Iterator interface implementation + let currentIndex = 0; + let iterator = { + [Symbol.iterator]() { + return this; + }, + next: () => { + let item = { value: null, done: true }; + if(currentIndex < messages.length) { + item = { value: messages[currentIndex], done: false }; + currentIndex ++; + } + return item; + }, + collect: () => messages + } + + return iterator; + } + _getMessages(channel, password, options) { var messages = []; @@ -64,6 +99,7 @@ class OrbitClient { var lt = options.lt ? options.lt : null; var lte = options.lte ? options.lte : null; var reverse = options.reverse ? options.reverse : false; + var key = options.key ? options.key : null; var startFromHash; if(lt || lte) { @@ -77,7 +113,7 @@ class OrbitClient { if(startFromHash) { // Get messages - messages = this._fetchRecursive(startFromHash, password, limit, gte ? gte : gt); + messages = this._fetchRecursive(startFromHash, password, limit, gte ? gte : gt, 0, [], key); // Slice the array var startIndex = 0; @@ -97,50 +133,28 @@ class OrbitClient { return messages; } - _iterator(channel, password, options) { - var messages = this._getMessages(channel, password, options); - - // Iterator interface implementation - var currentIndex = 0; - let iterator = { - [Symbol.iterator]() { - return this; - }, - next: () => { - var item = { value: null, done: true }; - if(currentIndex < messages.length) { - item = { value: messages[currentIndex], done: false }; - currentIndex ++; - } - return item; - }, - collect: () => messages - } - - return iterator; - } - _fetchOne(hash, password) { - var item = null; + let item = null; if(hash) { item = await (ipfsAPI.getObject(this.ipfs, hash)); let data = JSON.parse(item.Data); - // verify + // verify signature const verified = Encryption.verify(data.target, data.pubkey, data.sig, data.seq, password); if(!verified) throw "Item '" + hash + "' has the wrong signature" - // decrypt + // decrypt data structure const targetDec = Encryption.decrypt(data.target, privkey, 'TODO: pubkey'); const metaDec = Encryption.decrypt(data.meta, privkey, 'TODO: pubkey'); - data.target = targetDec; - data.meta = JSON.parse(metaDec); + data.target = targetDec; + data.meta = JSON.parse(metaDec); - if(data.op === HashCacheOps.Add) { - const payload = await (ipfsAPI.getObject(this.ipfs, data.target)); + // fetch and decrypt content + if(data.op === HashCacheOps.Add || data.op === HashCacheOps.Put) { + const payload = await (ipfsAPI.getObject(this.ipfs, data.target)); const contentEnc = JSON.parse(payload.Data)["content"]; const contentDec = Encryption.decrypt(contentEnc, privkey, 'TODO: pubkey'); - item.Payload = contentDec; + item.Payload = contentDec; } item.Data = data; @@ -157,7 +171,7 @@ class OrbitClient { return contains; } - _fetchRecursive(hash, password, amount, last, currentDepth, deleted) { + _fetchRecursive(hash, password, amount, last, currentDepth, deleted, key) { var res = []; var deletedItems = deleted ? deleted : []; @@ -165,21 +179,32 @@ class OrbitClient { var message = await (this._fetchOne(hash, password)); + // console.log(message); + if(message.Data.op === HashCacheOps.Add && !this._contains(deletedItems, hash)) { res.push({ hash: hash, item: message }); currentDepth ++; + } else if(message.Data.op === HashCacheOps.Put && !this._contains(deletedItems, message.Data.key)) { + if(!key || key && key === message.Data.key) { + res.push({ hash: hash, item: message }); + currentDepth ++; + deletedItems.push(message.Data.key); + } } else if(message.Data.op === HashCacheOps.Delete) { deletedItems.push(message.Data.target); } - if((last && hash === last)) + if(key && message.Data.key === key) + return res; + + if(last && hash === last) return res; if(!last && amount > -1 && currentDepth >= amount) return res; if(message && message.Links[0]) { - var next = this._fetchRecursive(message.Links[0].Hash, password, amount, last, currentDepth, deletedItems); + var next = this._fetchRecursive(message.Links[0].Hash, password, amount, last, currentDepth, deletedItems, key); res = res.concat(next); } @@ -192,11 +217,16 @@ class OrbitClient { return await (ipfsAPI.putObject(this.ipfs, JSON.stringify(post))); } - _createMessage(channel, password, target, operation) { + _createMessage(channel, password, key, target, operation, options) { var seq = this.sequences[channel]; var size = -1; var metaInfo = new MetaInfo(ItemTypes.Message, size, new Date().getTime()); - var hcItem = new HashCacheItem(operation, seq, target, metaInfo, pubkey, privkey, password); + var hcItem; + if(operation === HashCacheOps.Put) + hcItem = new KeyedHashCacheItem(operation, key, seq, target, metaInfo, pubkey, privkey, password); + else + hcItem = new HashCacheItem(operation, seq, target, metaInfo, pubkey, privkey, password); + var item = await (ipfsAPI.putObject(this.ipfs, JSON.stringify(hcItem))); var newHead = { Hash: item.Hash }; @@ -213,13 +243,23 @@ class OrbitClient { _send(channel, password, text, options) { // TODO: check options for what type to publish as (text, snippet, file, etc.) var post = this._publish(text); - var message = this._createMessage(channel, password, post.Hash, HashCacheOps.Add); + var message = this._createMessage(channel, password, null, post.Hash, HashCacheOps.Add); await(this.client.linkedList(channel, password).add(message.Hash)); return message.Hash; } - _remove(channel, password, target) { - var message = this._createMessage(channel, password, target, HashCacheOps.Delete); + // WIP + _put(channel, password, key, data, options) { + // TODO: options + var post = this._publish(data); + var message = this._createMessage(channel, password, key, post.Hash, HashCacheOps.Put); + await(this.client.linkedList(channel, password).add(message.Hash)); + return message.Hash; + } + + _remove(channel, password, options) { + const target = options.key ? options.key : (options.hash ? options.hash : null); + const message = this._createMessage(channel, password, null, target, HashCacheOps.Delete); await(this.client.linkedList(channel, password).add(message.Hash)) return message.Hash; } diff --git a/README.md b/README.md index 11b7e77..9b59489 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,11 @@ orbit-server uses linked lists on top of IPFS. orbit-server not *yet* released, channel(name, password) - .put(text) + .add(data: String) - .remove(hash) + .put(key, data: String) + + .remove(hash or key) .iterator([options]) @@ -40,18 +42,18 @@ var host = 'localhost:3006'; // orbit-server address async(() => { // Connect - var orbit = OrbitClient.connect(host, username, password); // OrbitClient + const orbit = OrbitClient.connect(host, username, password); // OrbitClient - var channelName = 'hello-world'; + const channelName = 'hello-world'; // Send a message - var head = orbit.channel(channelName).send('hello'); // + const head = orbit.channel(channelName).send('hello'); // // Delete a message orbit.channel(channelName).remove(head); // Iterator options - var options = { limit: -1 }; // fetch all messages, default is 1 + const options = { limit: -1 }; // fetch all messages, default is 1 // { // gt: , // gte: , @@ -62,8 +64,8 @@ async(() => { // } // Get messages - var iter = orbit.channel(channelName).iterator(options); // Symbol.iterator - var next = iter.next(); // { value: , done: false|true} + const iter = orbit.channel(channelName).iterator(options); // Symbol.iterator + const next = iter.next(); // { value: , done: false|true} // OR: // var all = iter.collect(); // returns all elements as an array @@ -75,14 +77,14 @@ async(() => { orbit.channel(channelName).remove(next.value.hash); // remove first element iterator returns // Set modes - var password = 'hello'; - var channelModes; + const password = 'hello'; + const channelModes; channelModes = orbit.channel(channel).setMode({ mode: "+r", params: { password: password } }); // { modes: { r: { password: 'hello' } } } channelModes = orbit.channel(channel, password).setMode({ mode: "+w", params: { ops: [orbit.user.id] } }); // { modes: { ... } } channelModes = orbit.channel(channel, password).setMode({ mode: "-r" }); // { modes: { ... } } channelModes = orbit.channel(channel, '').setMode({ mode: "-w" }); // { modes: {} } // Delete channel - var result = orbit.channel(channelName, channelPwd).delete(); // true | false + const result = orbit.channel(channelName, channelPwd).delete(); // true | false })(); ``` diff --git a/examples/readMessages.js b/examples/readMessages.js index 9c3330e..c63e244 100644 --- a/examples/readMessages.js +++ b/examples/readMessages.js @@ -27,13 +27,25 @@ let run = (async(() => { // Get all messages var iter = orbit.channel(channel, '').iterator(options); - for(let i of iter) { + console.log(i.item.Data.key); console.log(i.item.Data.seq, i.item.Data.op, i.hash, "ts: " + i.item.Data.meta.ts, i.item.Payload); } console.log("Fetch messages took " + timer.stop() + "ms"); + console.log("-------- KV store -------") + orbit.channel(channel, '').put("key3", "this is the value you're looking for222"); + var val = orbit.channel(channel, '').get("key3"); + console.log("key3:", val); + + orbit.channel(channel, '').put("key4", "this will be deleted"); + var val2 = orbit.channel(channel, '').get("key4"); + console.log("key4:", val2); + orbit.channel(channel, '').remove({ key: "key4" }); + val2 = orbit.channel(channel, '').get("key4"); + console.log("key4:", val2); + } catch(e) { console.error("error:", e); console.error(e.stack); diff --git a/examples/writeMessages.js b/examples/writeMessages.js index 52b14e2..c4cfdd5 100644 --- a/examples/writeMessages.js +++ b/examples/writeMessages.js @@ -19,34 +19,38 @@ let run = (async(() => { var result = orbit.channel(channel, '').delete(); // Add the first message and delete it immediately - orbit.channel(channel, '').put("hello world!"); - var e = orbit.channel(channel, '').iterator({ limit: -1 }).collect()[0].hash; - orbit.channel(channel, '').remove(e); + // orbit.channel(channel, '').put("hello world!"); + // var e = orbit.channel(channel, '').iterator({ limit: -1 }).collect()[0].hash; + // orbit.channel(channel, '').remove(e); + orbit.channel(channel, '').put("key two", "hello world!!!"); - var messages = 100; + var messages = 10; var i = 1; while(i <= messages) { var timer = new Timer(true); // Send a message // var head = orbit.channel(channel, '').send(JSON.stringify({ omg: "hello" })); - var head = orbit.channel(channel, '').put("hello world " + i); + var head = orbit.channel(channel, '').put("key one", "hello world " + i); console.log(i, head, timer.stop() + "ms"); if(i === 4) { console.log("remove", head); - orbit.channel(channel, '').remove(head); + // orbit.channel(channel, '').remove(head); } i ++; } var items = orbit.channel(channel, '').iterator({ limit: -1 }).collect(); - orbit.channel(channel, '').remove(items[2].hash); // 97 - orbit.channel(channel, '').remove(items[3].hash); // 96 - orbit.channel(channel, '').remove(items[66].hash); // 34 - orbit.channel(channel, '').remove(items[items.length - 10].hash); // 11 - orbit.channel(channel, '').remove(items[items.length - 9].hash); // 10 - orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9 + // console.log(items); + var e = orbit.channel(channel, '').iterator({ limit: -1 }).collect(); + orbit.channel(channel, '').remove({ key: "key one" }); + // orbit.channel(channel, '').remove(items[2].hash); // 97 + // orbit.channel(channel, '').remove(items[3].hash); // 96 + // orbit.channel(channel, '').remove(items[66].hash); // 34 + // orbit.channel(channel, '').remove(items[items.length - 10].hash); // 11 + // orbit.channel(channel, '').remove(items[items.length - 9].hash); // 10 + // orbit.channel(channel, '').remove(items[items.length - 8].hash); // 9 } catch(e) { console.error("error:", e); diff --git a/test/orbit-client-tests.js b/test/orbit-client-tests.js index 2c7fc0f..84d6755 100644 --- a/test/orbit-client-tests.js +++ b/test/orbit-client-tests.js @@ -99,7 +99,7 @@ describe('Orbit Client', () => { })); it('gets channel info on an existing channel', async((done) => { - var msg = orbit.channel(channel, '').put('hello'); + var msg = orbit.channel(channel, '').add('hello'); var info = orbit.channel(channel, '').info(); assert.notEqual(info, null); assert.equal(info.head, msg); @@ -157,7 +157,7 @@ describe('Orbit Client', () => { describe('Insert', function() { it('adds an item to an empty channel', async((done) => { try { - head = orbit.channel(channel, '').put('hello'); + head = orbit.channel(channel, '').add('hello'); assert.notEqual(head, null); assert.equal(head.startsWith('Qm'), true); assert.equal(head.length, 46); @@ -169,7 +169,7 @@ describe('Orbit Client', () => { it('adds a new item to a channel with one item', async((done) => { try { - second = orbit.channel(channel, '').put('hello'); + second = orbit.channel(channel, '').add('hello'); assert.notEqual(second, null); assert.notEqual(second, head); assert.equal(second.startsWith('Qm'), true); @@ -183,7 +183,7 @@ describe('Orbit Client', () => { it('adds five items', async((done) => { for(var i = 0; i < 5; i ++) { try { - var s = orbit.channel(channel, '').put('hello'); + var s = orbit.channel(channel, '').add('hello'); assert.notEqual(s, null); assert.equal(s.startsWith('Qm'), true); assert.equal(s.length, 46); @@ -198,7 +198,7 @@ describe('Orbit Client', () => { try { var msg = new Buffer(512); msg.fill('a') - var s = orbit.channel(channel, '').put(msg.toString()); + var s = orbit.channel(channel, '').add(msg.toString()); assert.notEqual(s, null); assert.equal(s.startsWith('Qm'), true); assert.equal(s.length, 46); @@ -219,7 +219,7 @@ describe('Orbit Client', () => { var result = orbit.channel(channel, '').delete(); var iter = orbit.channel(channel, '').iterator(); for(var i = 0; i < itemCount; i ++) { - var s = orbit.channel(channel, '').put('hello' + i); + var s = orbit.channel(channel, '').add('hello' + i); items.push(s); } resolve();