Move exchanging heads logic to its own function

Use latest store modules
Add a set of tests for checking database replication status
Fix tests as per new replication status api
Speed up and improve tests
This commit is contained in:
haad 2018-03-30 12:38:55 +02:00
parent 3349dd9b0b
commit 0558c32343
6 changed files with 578 additions and 497 deletions

100
package-lock.json generated
View File

@ -6158,9 +6158,9 @@
"dev": true
},
"ipfs-log": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz",
"integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==",
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.1.0.tgz",
"integrity": "sha512-teHxC0mUA9Jffq5nDzephsxsdx+RKB7pLhR+G7xtXsNLAYdUvrCFgItz8BRunNZOAE009u6QRyr6LJHF9F3rnQ==",
"requires": {
"p-map": "1.2.0",
"p-whilst": "1.0.0"
@ -6177,9 +6177,9 @@
}
},
"ipfs-pubsub-1on1": {
"version": "0.0.2",
"resolved": "https://registry.npmjs.org/ipfs-pubsub-1on1/-/ipfs-pubsub-1on1-0.0.2.tgz",
"integrity": "sha512-Y3FYTvEy9MJWl6e2tM8tQgqR6SPFE30DjAcUrB6AJ2Ha1oGZl1aVVhuXZWSEVYIGpF0546dklOTfhvIGB5CUBQ==",
"version": "0.0.3",
"resolved": "https://registry.npmjs.org/ipfs-pubsub-1on1/-/ipfs-pubsub-1on1-0.0.3.tgz",
"integrity": "sha512-sQvcv/gnLCEVDqGEE3/blrm4v5wwzTYKtC1LENrnW6NmHG6Cz+chaOTeRLR/rUhjGu32slL+4nTsPDtrX446GQ==",
"requires": {
"safe-buffer": "5.1.1"
}
@ -7280,7 +7280,7 @@
"keypair": "1.0.1",
"libp2p-crypto-secp256k1": "0.2.2",
"multihashing-async": "0.4.8",
"node-forge": "0.7.4",
"node-forge": "0.7.5",
"pem-jwk": "1.5.1",
"protons": "1.0.1",
"rsa-pem-to-jwk": "1.1.3",
@ -8489,9 +8489,9 @@
}
},
"node-forge": {
"version": "0.7.4",
"resolved": "https://registry.npmjs.org/node-forge/-/node-forge-0.7.4.tgz",
"integrity": "sha512-8Df0906+tq/omxuCZD6PqhPaQDYuyJ1d+VITgxoIA8zvQd1ru+nMJcDChHH324MWitIgbVkAkQoGEEVJNpn/PA==",
"version": "0.7.5",
"resolved": "https://registry.npmjs.org/node-forge/-/node-forge-0.7.5.tgz",
"integrity": "sha512-MmbQJ2MTESTjt3Gi/3yG1wGpIMhUfcIypUCGtTizFR9IiccFwxSpfp0vtIZlkFclEqERemxfnSdZEMR9VqqEFQ==",
"dev": true
},
"node-libs-browser": {
@ -8752,12 +8752,12 @@
}
},
"orbit-db-counterstore": {
"version": "1.3.1",
"resolved": "https://registry.npmjs.org/orbit-db-counterstore/-/orbit-db-counterstore-1.3.1.tgz",
"integrity": "sha512-ZrSv5dTOslPSkMFOzyGdr9aotioGH6Gg86ULWswF54//uD5p9034Eo0s2L5ez9KRqr196vFo+fx23uU3QAEV5A==",
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/orbit-db-counterstore/-/orbit-db-counterstore-1.4.0.tgz",
"integrity": "sha512-L7GBp1q1LawWi398wHqMgN6LwdGssB1GomwsSCeSHTMO0iPPUNe7S+lsAuJ+ZWc07qH0wvHnWkd6CC2JJaLOIA==",
"requires": {
"crdts": "0.1.5",
"orbit-db-store": "2.4.0"
"orbit-db-store": "2.5.0"
}
},
"orbit-db-docstore": {
@ -8769,6 +8769,26 @@
"p-map": "1.1.1"
},
"dependencies": {
"ipfs-log": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz",
"integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==",
"requires": {
"p-map": "1.1.1",
"p-whilst": "1.0.0"
}
},
"orbit-db-store": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz",
"integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==",
"requires": {
"ipfs-log": "4.0.6",
"logplease": "1.2.14",
"p-each-series": "1.0.0",
"readable-stream": "2.3.5"
}
},
"p-map": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/p-map/-/p-map-1.1.1.tgz",
@ -8782,6 +8802,28 @@
"integrity": "sha512-e9/AFS7U6i4hcm35x2CmTyNhVNUfjd4CWgdT6nyF4UIjE4vJvvYmj1qZJx+L54HCqoLKhJuOs/yhmQIhH6wQEA==",
"requires": {
"orbit-db-store": "2.4.0"
},
"dependencies": {
"ipfs-log": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz",
"integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==",
"requires": {
"p-map": "1.2.0",
"p-whilst": "1.0.0"
}
},
"orbit-db-store": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz",
"integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==",
"requires": {
"ipfs-log": "4.0.6",
"logplease": "1.2.14",
"p-each-series": "1.0.0",
"readable-stream": "2.3.5"
}
}
}
},
"orbit-db-feedstore": {
@ -8808,6 +8850,28 @@
"integrity": "sha512-4jB123qGQvfPVdF+3cv0gKYr1gcygPWqqO4/hr+9eiAL1NMRAdzo0gCMafOMDv9dWk21ppKKKcgugWBNNtZWyg==",
"requires": {
"orbit-db-store": "2.4.0"
},
"dependencies": {
"ipfs-log": {
"version": "4.0.6",
"resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz",
"integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==",
"requires": {
"p-map": "1.2.0",
"p-whilst": "1.0.0"
}
},
"orbit-db-store": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz",
"integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==",
"requires": {
"ipfs-log": "4.0.6",
"logplease": "1.2.14",
"p-each-series": "1.0.0",
"readable-stream": "2.3.5"
}
}
}
},
"orbit-db-pubsub": {
@ -8820,11 +8884,11 @@
}
},
"orbit-db-store": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz",
"integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==",
"version": "2.5.0",
"resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.5.0.tgz",
"integrity": "sha512-vKHRVNpKcyNrvMt0NSOU/jjzIcIBVk4sbYS9g7RdnJBWx3D5mDCh8xDzzkq3Yv5UZSQWDM4oiXK0h08hz9kYJQ==",
"requires": {
"ipfs-log": "4.0.6",
"ipfs-log": "4.1.0",
"logplease": "1.2.14",
"p-each-series": "1.0.0",
"readable-stream": "2.3.5"

View File

@ -13,16 +13,17 @@
},
"main": "src/OrbitDB.js",
"dependencies": {
"ipfs-pubsub-1on1": "~0.0.2",
"ipfs-pubsub-1on1": "~0.0.3",
"logplease": "^1.2.14",
"multihashes": "^0.4.12",
"orbit-db-cache": "~0.2.2",
"orbit-db-counterstore": "~1.3.1",
"orbit-db-counterstore": "~1.4.0",
"orbit-db-docstore": "~1.3.1",
"orbit-db-eventstore": "~1.3.1",
"orbit-db-feedstore": "~1.3.1",
"orbit-db-keystore": "~0.1.0",
"orbit-db-kvstore": "~1.3.1",
"orbit-db-store": "~2.5.0",
"orbit-db-pubsub": "~0.5.1"
},
"devDependencies": {

View File

@ -7,16 +7,16 @@ const KeyValueStore = require('orbit-db-kvstore')
const CounterStore = require('orbit-db-counterstore')
const DocumentStore = require('orbit-db-docstore')
const Pubsub = require('orbit-db-pubsub')
const Channel = require('ipfs-pubsub-1on1')
const Cache = require('orbit-db-cache')
const Keystore = require('orbit-db-keystore')
const AccessController = require('./ipfs-access-controller')
const OrbitDBAddress = require('./orbit-db-address')
const createDBManifest = require('./db-manifest')
const exchangeHeads = require('./exchange-heads')
const Logger = require('logplease')
const logger = Logger.create("orbit-db")
Logger.setLogLevel('NONE')
Logger.setLogLevel('ERROR')
// Mapping for 'database type' -> Class
let databaseTypes = {
@ -87,11 +87,14 @@ class OrbitDB {
delete this.stores[db.address.toString()]
}
// Close all direct connections to peers
Object.keys(this._directConnections).forEach(e => {
// Close a direct connection and remove it from internal state
const removeDirectConnect = e => {
this._directConnections[e].close()
delete this._directConnections[e]
})
}
// Close all direct connections to peers
Object.keys(this._directConnections).forEach(removeDirectConnect)
// Disconnect from pubsub
if (this._pubsub)
@ -136,6 +139,9 @@ class OrbitDB {
const addr = address.toString()
this.stores[addr] = store
// Subscribe to pubsub to get updates from peers,
// this is what hooks us into the message propagation layer
// and the p2p network
if(opts.replicate && this._pubsub)
this._pubsub.subscribe(addr, this._onMessage.bind(this), this._onPeerConnected.bind(this))
@ -153,44 +159,35 @@ class OrbitDB {
const store = this.stores[address]
try {
logger.debug(`Received ${heads.length} heads for '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2))
if (store)
if (store && heads && heads.length > 0) {
await store.sync(heads)
}
} catch (e) {
logger.error(e)
}
}
// Callback for when a peer connected to a database
async _onPeerConnected (address, peer, room) {
async _onPeerConnected (address, peer) {
logger.debug(`New peer '${peer}' connected to '${address}'`)
const store = this.stores[address]
if (store) {
// Create a direct channel to the connected peer
let channel = this._directConnections[peer]
if (!channel) {
try {
logger.debug(`Create a channel`)
channel = await Channel.open(this._ipfs, peer)
channel.on('message', (message) => this._onMessage(address, JSON.parse(message.data)))
this._directConnections[peer] = channel
logger.debug(`Channel created`)
} catch (e) {
console.error(e)
logger.error(e)
}
}
// Send the newly connected peer our latest heads
let heads = store._oplog.heads
if (heads.length > 0) {
// Wait for the direct channel to be fully connected
await channel.connect()
logger.debug(`Send latest heads of '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2))
channel.send(JSON.stringify(heads))
}
store.events.emit('peer', peer)
} else {
logger.error(`Database '${address}' is not open!`)
}
const getStore = address => this.stores[address]
const getDirectConnection = peer => this._directConnections[peer]
const onChannelCreated = channel => this._directConnections[channel._receiverID] = channel
const onMessage = (address, heads) => this._onMessage(address, heads)
const channel = await exchangeHeads(
this._ipfs,
address,
peer,
getStore,
getDirectConnection,
onMessage,
onChannelCreated
)
if (getStore(address))
getStore(address).events.emit('peer', peer)
}
// Callback when database was closed
@ -314,7 +311,7 @@ class OrbitDB {
// If we want to try and open the database local-only, throw an error
// if we don't have the database locally
if (options.localOnly && !haveDB) {
logger.error(`Database '${dbAddress}' doesn't exist!`)
logger.warn(`Database '${dbAddress}' doesn't exist!`)
throw new Error(`Database '${dbAddress}' doesn't exist!`)
}

45
src/exchange-heads.js Normal file
View File

@ -0,0 +1,45 @@
'use strict'
const Channel = require('ipfs-pubsub-1on1')
const Logger = require('logplease')
const logger = Logger.create("exchange-heads", { color: Logger.Colors.Yellow })
Logger.setLogLevel('ERROR')
const getHeadsForDatabase = store => (store && store._oplog) ? store._oplog.heads : []
const exchangeHeads = async (ipfs, address, peer, getStore, getDirectConnection, onMessage, onChannelCreated) => {
const _handleMessage = message => {
const msg = JSON.parse(message.data)
const { address, heads } = msg
onMessage(address, heads)
}
let channel = getDirectConnection(peer)
if (!channel) {
try {
logger.debug(`Create a channel to ${peer}`)
channel = await Channel.open(ipfs, peer)
channel.on('message', _handleMessage)
logger.debug(`Channel created to ${peer}`)
onChannelCreated(channel)
} catch (e) {
logger.error(e)
}
}
// Wait for the direct channel to be fully connected
await channel.connect()
logger.debug(`Connected to ${peer}`)
// Send the heads if we have any
const heads = getHeadsForDatabase(getStore(address))
logger.debug(`Send latest heads of '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2))
if (heads) {
channel.send(JSON.stringify({ address: address, heads: heads }))
}
return channel
}
module.exports = exchangeHeads

View File

@ -4,12 +4,16 @@ const assert = require('assert')
const mapSeries = require('p-each-series')
const rmrf = require('rimraf')
const OrbitDB = require('../src/OrbitDB')
const config = require('./utils/config')
const startIpfs = require('./utils/start-ipfs')
const stopIpfs = require('./utils/stop-ipfs')
const testAPIs = require('./utils/test-apis')
const connectPeers = require('./utils/connect-peers')
const waitForPeers = require('./utils/wait-for-peers')
// Include test utilities
const {
config,
startIpfs,
stopIpfs,
connectPeers,
waitForPeers,
testAPIs,
} = require('./utils')
const dbPath1 = './orbitdb/tests/multiple-databases/1'
const dbPath2 = './orbitdb/tests/multiple-databases/2'

View File

@ -23,52 +23,246 @@ const ipfsPath2 = './orbitdb/tests/replication/2/ipfs'
Object.keys(testAPIs).forEach(API => {
describe(`orbit-db - Replication (${API})`, function() {
this.timeout(config.timeout * 2)
this.timeout(config.timeout)
let ipfsd1, ipfsd2, ipfs1, ipfs2
let orbitdb1, orbitdb2, db1, db2
let id1, id2
describe('two peers', function() {
let timer
let options
let timer
let options
before(async () => {
config.daemon1.repo = ipfsPath1
config.daemon2.repo = ipfsPath2
rmrf.sync(config.daemon1.repo)
rmrf.sync(config.daemon2.repo)
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
ipfsd2 = await startIpfs(API, config.daemon2)
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Use memory store for quicker tests
const memstore = new MemStore()
ipfs1.object.put = memstore.put.bind(memstore)
ipfs1.object.get = memstore.get.bind(memstore)
ipfs2.object.put = memstore.put.bind(memstore)
ipfs2.object.get = memstore.get.bind(memstore)
// Connect the peers manually to speed up test times
await connectPeers(ipfs1, ipfs2)
before(async () => {
config.daemon1.repo = ipfsPath1
config.daemon2.repo = ipfsPath2
rmrf.sync(config.daemon1.repo)
rmrf.sync(config.daemon2.repo)
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
ipfsd2 = await startIpfs(API, config.daemon2)
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Use memory store for quicker tests
const memstore = new MemStore()
ipfs1.object.put = memstore.put.bind(memstore)
ipfs1.object.get = memstore.get.bind(memstore)
ipfs2.object.put = memstore.put.bind(memstore)
ipfs2.object.get = memstore.get.bind(memstore)
// Connect the peers manually to speed up test times
await connectPeers(ipfs1, ipfs2)
})
after(async () => {
if (ipfsd1)
await stopIpfs(ipfsd1)
if (ipfsd2)
await stopIpfs(ipfsd2)
})
beforeEach(async () => {
clearInterval(timer)
orbitdb1 = new OrbitDB(ipfs1, dbPath1)
orbitdb2 = new OrbitDB(ipfs2, dbPath2)
options = {
// Set write access for both clients
write: [
orbitdb1.key.getPublic('hex'),
orbitdb2.key.getPublic('hex')
],
}
options = Object.assign({}, options, { path: dbPath1 })
db1 = await orbitdb1.eventlog('replication-tests', options)
})
afterEach(async () => {
clearInterval(timer)
options = {}
if (db1)
await db1.drop()
if (db2)
await db2.drop()
if(orbitdb1)
await orbitdb1.stop()
if(orbitdb2)
await orbitdb2.stop()
})
it('replicates database of 1 entry', async () => {
// Set 'sync' flag on. It'll prevent creating a new local database and rather
// fetch the database from the network
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
await db1.add('hello')
return new Promise(resolve => {
setTimeout(() => {
const items = db2.iterator().collect()
assert.equal(items.length, 1)
assert.equal(items[0].payload.value, 'hello')
resolve()
}, 500)
})
})
it('replicates database of 100 entries', async () => {
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
const entryCount = 100
const entryArr = []
for (let i = 0; i < entryCount; i ++)
entryArr.push(i)
return new Promise(async (resolve, reject) => {
try {
const add = i => db1.add('hello' + i)
await mapSeries(entryArr, add)
} catch (e) {
reject(e)
}
timer = setInterval(() => {
const items = db2.iterator({ limit: -1 }).collect()
if (items.length === entryCount) {
clearInterval(timer)
assert.equal(items.length, entryCount)
assert.equal(items[0].payload.value, 'hello0')
assert.equal(items[items.length - 1].payload.value, 'hello99')
resolve()
}
}, 100)
})
})
it('emits correct replication info', async () => {
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 }
let events = []
let expectedEventCount = 99
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
events.push({
event: 'replicate',
count: eventCount['replicate'],
entry: entry,
})
})
after(async () => {
if (ipfsd1)
await stopIpfs(ipfsd1)
if (ipfsd2)
await stopIpfs(ipfsd2)
db2.events.on('replicate.progress', (address, hash, entry, progress, total) => {
eventCount['replicate.progress'] ++
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
entry: entry ,
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
})
beforeEach(async () => {
clearInterval(timer)
db2.events.on('replicated', (address) => {
eventCount['replicated'] ++
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
// Resolve with a little timeout to make sure we
// don't receive more than one event
setTimeout(() => {
finished = db2.iterator({ limit: -1 }).collect().length === expectedEventCount
}, 200)
})
orbitdb1 = new OrbitDB(ipfs1, dbPath1)
orbitdb2 = new OrbitDB(ipfs2, dbPath2)
return new Promise((resolve, reject) => {
try {
timer = setInterval(() => {
if (finished) {
clearInterval(timer)
options = {
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount)
assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateEvents[0].entry.clock.time, 1)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateProgressEvents[0].entry.clock.time, 1)
assert.equal(replicateProgressEvents[0].replicationInfo.max >= 1, true)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[0].replicationInfo.max >= 1, true)
assert.equal(replicatedEvents[0].replicationInfo.progress >= 1, true)
resolve()
}
}, 100)
} catch (e) {
reject(e)
}
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
adds.push(i)
}
mapSeries(adds, i => db1.add('hello ' + i))
})
})
it('emits correct replication info on fresh replication', async () => {
return new Promise(async (resolve, reject) => {
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 }
let events = []
let expectedEventCount = 512
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
adds.push(i)
}
const add = async (i) => {
process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount + " ")
await db1.add('hello ' + i)
}
await mapSeries(adds, add)
console.log()
// Open second instance again
options = {
path: dbPath2,
overwrite: true,
sync: true,
// Set write access for both clients
write: [
orbitdb1.key.getPublic('hex'),
@ -76,88 +270,11 @@ Object.keys(testAPIs).forEach(API => {
],
}
options = Object.assign({}, options, { path: dbPath1 })
db1 = await orbitdb1.eventlog('replication-tests', options)
})
afterEach(async () => {
clearInterval(timer)
options = {}
if (db1)
await db1.drop()
if (db2)
await db2.drop()
if(orbitdb1)
await orbitdb1.stop()
if(orbitdb2)
await orbitdb2.stop()
})
it('replicates database of 1 entry', async () => {
// Set 'sync' flag on. It'll prevent creating a new local database and rather
// fetch the database from the network
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
await db1.add('hello')
return new Promise(resolve => {
setTimeout(() => {
const items = db2.iterator().collect()
assert.equal(items.length, 1)
assert.equal(items[0].payload.value, 'hello')
resolve()
}, 1000)
})
})
it('replicates database of 100 entries', async () => {
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
const entryCount = 100
const entryArr = []
for (let i = 0; i < entryCount; i ++)
entryArr.push(i)
return new Promise(async (resolve, reject) => {
try {
await mapSeries(entryArr, (i) => db1.add('hello' + i))
} catch (e) {
reject(e)
}
timer = setInterval(() => {
const items = db2.iterator({ limit: -1 }).collect()
if (items.length === entryCount) {
clearInterval(timer)
assert.equal(items.length, entryCount)
assert.equal(items[0].payload.value, 'hello0')
assert.equal(items[items.length - 1].payload.value, 'hello99')
resolve()
}
}, 1000)
})
})
it('emits correct replication info', async () => {
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 }
let events = []
let expectedEventCount = 99
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'replicate',
count: eventCount['replicate'],
@ -165,8 +282,10 @@ Object.keys(testAPIs).forEach(API => {
})
})
db2.events.on('replicate.progress', (address, hash, entry, progress) => {
db2.events.on('replicate.progress', (address, hash, entry) => {
eventCount['replicate.progress'] ++
// console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
// assert.equal(db2.replicationStatus.progress, eventCount['replicate.progress'])
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
@ -174,354 +293,205 @@ Object.keys(testAPIs).forEach(API => {
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
have: db2.replicationStatus.have,
},
})
})
db2.events.on('replicated', (address) => {
eventCount['replicated'] ++
db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length
// console.log("[replicated]", '#' + eventCount['replicated'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|")
assert.equal(db2.replicationStatus.progress, eventCount['replicated'])
assert.equal(db2.replicationStatus.max, expectedEventCount)
// Test the replicator state
assert.equal(db2._loader.tasksRequested >= db2.replicationStatus.progress, true)
assert.equal(db2._loader.tasksQueued <= db2.options.referenceCount, true)
assert.equal(db2.options.referenceCount, 64)
assert.equal(db2._loader.tasksRunning, 0)
assert.equal(db2._loader.tasksFinished, db2.replicationStatus.progress)
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
have: db2.replicationStatus.have,
},
})
// Resolve with a little timeout to make sure we
// don't receive more than one event
setTimeout(() => {
finished = db2.iterator({ limit: -1 }).collect().length === expectedEventCount
}, 500)
})
return new Promise((resolve, reject) => {
try {
timer = setInterval(() => {
if (finished) {
clearInterval(timer)
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount)
assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateEvents[0].entry.clock.time, 1)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateProgressEvents[0].entry.clock.time, 1)
assert.equal(replicateProgressEvents[0].replicationInfo.max, 1)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[0].replicationInfo.max, 1)
assert.equal(replicatedEvents[0].replicationInfo.progress, 1)
resolve()
}
}, 100)
} catch (e) {
reject(e)
}
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
adds.push(i)
}
mapSeries(adds, i => db1.add('hello ' + i))
})
})
it('emits correct replication info on fresh replication', async () => {
return new Promise(async (resolve, reject) => {
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 }
let events = []
let expectedEventCount = 512
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
adds.push(i)
}
const add = async (i) => {
process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount)
await db1.add('hello ' + i)
}
await mapSeries(adds, add)
console.log()
// Open second instance again
options = {
path: dbPath2,
overwrite: true,
sync: true,
// Set write access for both clients
write: [
orbitdb1.key.getPublic('hex'),
orbitdb2.key.getPublic('hex')
],
}
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
let current = 0
let total = 0
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
total = db2.replicationStatus.max
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'replicate',
count: eventCount['replicate'],
entry: entry,
})
})
db2.events.on('replicate.progress', (address, hash, entry) => {
eventCount['replicate.progress'] ++
current = db2.replicationStatus.progress
// console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
// assert.equal(db2.replicationStatus.progress, eventCount['replicate.progress'])
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
entry: entry ,
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
have: db2.replicationStatus.have,
},
})
})
db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length
current = db2.replicationStatus.progress
// console.log("[replicated]", '#' + eventCount['replicated'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|", db2._loader._stats.a, db2._loader._stats.b, db2._loader._stats.c, db2._loader._stats.d)
assert.equal(current, eventCount['replicated'])
assert.equal(total, expectedEventCount)
// Test the replicator state
assert.equal(db2._loader.tasksRequested >= current, true)
assert.equal(db2._loader.tasksQueued <= db2.options.referenceCount, true)
assert.equal(db2.options.referenceCount, 64)
assert.equal(db2._loader.tasksRunning, 0)
assert.equal(db2._loader.tasksFinished, current)
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
have: db2.replicationStatus.have,
},
})
// Resolve with a little timeout to make sure we
// don't receive more than one event
setTimeout( async () => {
// console.log(eventCount['replicate.progress'], expectedEventCount)
if (eventCount['replicate.progress'] === expectedEventCount) {
finished = true
}
}, 500)
})
const st = new Date().getTime()
timer = setInterval(async () => {
if (finished) {
clearInterval(timer)
// await db2.close()
const et = new Date().getTime()
console.log("Duration:", et - st, "ms")
try {
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount)
assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateEvents[0].entry.clock.time, expectedEventCount)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateProgressEvents[0].entry.clock.time, expectedEventCount)
assert.equal(replicateProgressEvents[0].replicationInfo.max, expectedEventCount)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[0].replicationInfo.max, expectedEventCount)
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount)
resolve()
} catch (e) {
reject(e)
}
setTimeout( async () => {
// console.log(eventCount['replicate.progress'], expectedEventCount)
if (eventCount['replicate.progress'] === expectedEventCount) {
finished = true
}
}, 100)
})
const st = new Date().getTime()
timer = setInterval(async () => {
if (finished) {
clearInterval(timer)
const et = new Date().getTime()
console.log("Duration:", et - st, "ms")
try {
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount)
assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateEvents[0].entry.clock.time, expectedEventCount)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(replicateProgressEvents[0].entry.clock.time, expectedEventCount)
assert.equal(replicateProgressEvents[0].replicationInfo.max, expectedEventCount)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[0].replicationInfo.max, expectedEventCount)
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount)
resolve()
} catch (e) {
reject(e)
}
}
}, 100)
})
})
it('emits correct replication info in two-way replication', async () => {
return new Promise(async (resolve, reject) => {
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 }
let events = []
let expectedEventCount = 100
it('emits correct replication info in two-way replication', async () => {
return new Promise(async (resolve, reject) => {
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 }
let events = []
let expectedEventCount = 100
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
adds.push(i)
}
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
adds.push(i)
}
const add = async (i) => {
// process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount)
await Promise.all([db1.add('hello-1-' + i), db2.add('hello-2-' + i)])
}
const add = async (i) => {
// process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount)
await Promise.all([db1.add('hello-1-' + i), db2.add('hello-2-' + i)])
}
// Open second instance again
let options = {
path: dbPath2,
overwrite: true,
sync: true,
// Set write access for both clients
write: [
orbitdb1.key.getPublic('hex'),
orbitdb2.key.getPublic('hex')
],
}
// Open second instance again
let options = {
path: dbPath2,
overwrite: true,
sync: true,
// Set write access for both clients
write: [
orbitdb1.key.getPublic('hex'),
orbitdb2.key.getPublic('hex')
],
}
// if (db2) {
// await db2.drop()
// }
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
let current = 0
let total = 0
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
current = db2.replicationStatus.progress
total = db2.replicationStatus.max
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'replicate',
count: eventCount['replicate'],
entry: entry,
})
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'replicate',
count: eventCount['replicate'],
entry: entry,
})
let prevProgress = 0
db2.events.on('replicate.progress', (address, hash, entry) => {
eventCount['replicate.progress'] ++
current = db2.replicationStatus.progress
total = db2.replicationStatus.max
// console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
// assert.equal(current, total)
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
entry: entry ,
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
have: db2.replicationStatus.have,
},
})
})
db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length
current = db2.replicationStatus.progress
total = db2.replicationStatus.max
const values = db2.iterator({limit: -1}).collect()
// console.log(current, "/", total, "/", values.length)
// console.log("[replicated]", '#' + eventCount['replicated'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|", db2._loader._stats.a, db2._loader._stats.b, db2._loader._stats.c, db2._loader._stats.d)
assert.equal(current <= total, true)
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
have: db2.replicationStatus.have,
},
})
if (db2.replicationStatus.max >= expectedEventCount * 2
&& db2.replicationStatus.progress >= expectedEventCount * 2)
finished = true
})
const st = new Date().getTime()
try {
await mapSeries(adds, add)
timer = setInterval(() => {
if (finished) {
clearInterval(timer)
const et = new Date().getTime()
console.log("Duration:", et - st, "ms")
// console.log(eventCount['replicate'])
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['replicated'], expectedEventCount)
const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].entry.clock.time, expectedEventCount)
assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.max, expectedEventCount * 2)
assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.progress, expectedEventCount * 2)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount * 2)
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.max, expectedEventCount * 2)
const values1 = db1.iterator({limit: -1}).collect()
const values2 = db2.iterator({limit: -1}).collect()
assert.deepEqual(values1, values2)
// Test the replicator state
assert.equal(db1._loader.tasksRequested, expectedEventCount)
assert.equal(db1._loader.tasksQueued, 0)
assert.equal(db1._loader.tasksRunning, 0)
assert.equal(db1._loader.tasksFinished, expectedEventCount)
assert.equal(db2._loader.tasksRequested, expectedEventCount)
assert.equal(db2._loader.tasksQueued, 0)
assert.equal(db2._loader.tasksRunning, 0)
assert.equal(db2._loader.tasksFinished, expectedEventCount)
resolve()
}
}, 100)
} catch (e) {
reject(e)
}
})
let prevProgress = 0
db2.events.on('replicate.progress', (address, hash, entry) => {
eventCount['replicate.progress'] ++
// console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
// assert.equal(current, total)
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
entry: entry ,
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
})
db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length
const values = db2.iterator({limit: -1}).collect()
// console.log("[replicated]", '#' + eventCount['replicated'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|", db2._loader._stats.a, db2._loader._stats.b, db2._loader._stats.c, db2._loader._stats.d)
assert.equal(db2.replicationStatus.progress <= db2.replicationStatus.max, true)
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
if (db2.replicationStatus.max >= expectedEventCount * 2
&& db2.replicationStatus.progress >= expectedEventCount * 2)
finished = true
})
const st = new Date().getTime()
try {
await mapSeries(adds, add)
timer = setInterval(() => {
if (finished) {
clearInterval(timer)
const et = new Date().getTime()
// console.log("Duration:", et - st, "ms")
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['replicated'], expectedEventCount)
const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].entry.clock.time, expectedEventCount)
assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.max, expectedEventCount * 2)
assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.progress, expectedEventCount * 2)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount * 2)
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.max, expectedEventCount * 2)
const values1 = db1.iterator({limit: -1}).collect()
const values2 = db2.iterator({limit: -1}).collect()
assert.deepEqual(values1, values2)
// Test the replicator state
assert.equal(db1._loader.tasksRequested, expectedEventCount)
assert.equal(db1._loader.tasksQueued, 0)
assert.equal(db1._loader.tasksRunning, 0)
assert.equal(db1._loader.tasksFinished, expectedEventCount)
assert.equal(db2._loader.tasksRequested, expectedEventCount)
assert.equal(db2._loader.tasksQueued, 0)
assert.equal(db2._loader.tasksRunning, 0)
assert.equal(db2._loader.tasksFinished, expectedEventCount)
resolve()
}
}, 100)
} catch (e) {
reject(e)
}
})
})
})