From 0558c323430b6a60624d2d8127bbbc1a0ca2ce04 Mon Sep 17 00:00:00 2001 From: haad Date: Fri, 30 Mar 2018 12:38:55 +0200 Subject: [PATCH] Move exchanging heads logic to its own function Use latest store modules Add a set of tests for checking database replication status Fix tests as per new replication status api Speed up and improve tests --- package-lock.json | 100 +++- package.json | 5 +- src/OrbitDB.js | 69 ++- src/exchange-heads.js | 45 ++ test/multiple-databases.test.js | 16 +- test/replicate.test.js | 840 +++++++++++++++----------------- 6 files changed, 578 insertions(+), 497 deletions(-) create mode 100644 src/exchange-heads.js diff --git a/package-lock.json b/package-lock.json index 50dbaf0..e9e44ab 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6158,9 +6158,9 @@ "dev": true }, "ipfs-log": { - "version": "4.0.6", - "resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz", - "integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==", + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.1.0.tgz", + "integrity": "sha512-teHxC0mUA9Jffq5nDzephsxsdx+RKB7pLhR+G7xtXsNLAYdUvrCFgItz8BRunNZOAE009u6QRyr6LJHF9F3rnQ==", "requires": { "p-map": "1.2.0", "p-whilst": "1.0.0" @@ -6177,9 +6177,9 @@ } }, "ipfs-pubsub-1on1": { - "version": "0.0.2", - "resolved": "https://registry.npmjs.org/ipfs-pubsub-1on1/-/ipfs-pubsub-1on1-0.0.2.tgz", - "integrity": "sha512-Y3FYTvEy9MJWl6e2tM8tQgqR6SPFE30DjAcUrB6AJ2Ha1oGZl1aVVhuXZWSEVYIGpF0546dklOTfhvIGB5CUBQ==", + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/ipfs-pubsub-1on1/-/ipfs-pubsub-1on1-0.0.3.tgz", + "integrity": "sha512-sQvcv/gnLCEVDqGEE3/blrm4v5wwzTYKtC1LENrnW6NmHG6Cz+chaOTeRLR/rUhjGu32slL+4nTsPDtrX446GQ==", "requires": { "safe-buffer": "5.1.1" } @@ -7280,7 +7280,7 @@ "keypair": "1.0.1", "libp2p-crypto-secp256k1": "0.2.2", "multihashing-async": "0.4.8", - "node-forge": "0.7.4", + "node-forge": "0.7.5", "pem-jwk": "1.5.1", "protons": "1.0.1", "rsa-pem-to-jwk": "1.1.3", @@ -8489,9 +8489,9 @@ } }, "node-forge": { - "version": "0.7.4", - "resolved": "https://registry.npmjs.org/node-forge/-/node-forge-0.7.4.tgz", - "integrity": "sha512-8Df0906+tq/omxuCZD6PqhPaQDYuyJ1d+VITgxoIA8zvQd1ru+nMJcDChHH324MWitIgbVkAkQoGEEVJNpn/PA==", + "version": "0.7.5", + "resolved": "https://registry.npmjs.org/node-forge/-/node-forge-0.7.5.tgz", + "integrity": "sha512-MmbQJ2MTESTjt3Gi/3yG1wGpIMhUfcIypUCGtTizFR9IiccFwxSpfp0vtIZlkFclEqERemxfnSdZEMR9VqqEFQ==", "dev": true }, "node-libs-browser": { @@ -8752,12 +8752,12 @@ } }, "orbit-db-counterstore": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/orbit-db-counterstore/-/orbit-db-counterstore-1.3.1.tgz", - "integrity": "sha512-ZrSv5dTOslPSkMFOzyGdr9aotioGH6Gg86ULWswF54//uD5p9034Eo0s2L5ez9KRqr196vFo+fx23uU3QAEV5A==", + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/orbit-db-counterstore/-/orbit-db-counterstore-1.4.0.tgz", + "integrity": "sha512-L7GBp1q1LawWi398wHqMgN6LwdGssB1GomwsSCeSHTMO0iPPUNe7S+lsAuJ+ZWc07qH0wvHnWkd6CC2JJaLOIA==", "requires": { "crdts": "0.1.5", - "orbit-db-store": "2.4.0" + "orbit-db-store": "2.5.0" } }, "orbit-db-docstore": { @@ -8769,6 +8769,26 @@ "p-map": "1.1.1" }, "dependencies": { + "ipfs-log": { + "version": "4.0.6", + "resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz", + "integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==", + "requires": { + "p-map": "1.1.1", + "p-whilst": "1.0.0" + } + }, + "orbit-db-store": { + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz", + "integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==", + "requires": { + "ipfs-log": "4.0.6", + "logplease": "1.2.14", + "p-each-series": "1.0.0", + "readable-stream": "2.3.5" + } + }, "p-map": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/p-map/-/p-map-1.1.1.tgz", @@ -8782,6 +8802,28 @@ "integrity": "sha512-e9/AFS7U6i4hcm35x2CmTyNhVNUfjd4CWgdT6nyF4UIjE4vJvvYmj1qZJx+L54HCqoLKhJuOs/yhmQIhH6wQEA==", "requires": { "orbit-db-store": "2.4.0" + }, + "dependencies": { + "ipfs-log": { + "version": "4.0.6", + "resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz", + "integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==", + "requires": { + "p-map": "1.2.0", + "p-whilst": "1.0.0" + } + }, + "orbit-db-store": { + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz", + "integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==", + "requires": { + "ipfs-log": "4.0.6", + "logplease": "1.2.14", + "p-each-series": "1.0.0", + "readable-stream": "2.3.5" + } + } } }, "orbit-db-feedstore": { @@ -8808,6 +8850,28 @@ "integrity": "sha512-4jB123qGQvfPVdF+3cv0gKYr1gcygPWqqO4/hr+9eiAL1NMRAdzo0gCMafOMDv9dWk21ppKKKcgugWBNNtZWyg==", "requires": { "orbit-db-store": "2.4.0" + }, + "dependencies": { + "ipfs-log": { + "version": "4.0.6", + "resolved": "https://registry.npmjs.org/ipfs-log/-/ipfs-log-4.0.6.tgz", + "integrity": "sha512-LcCFq8AF8CDCKFAM78QkbH5+6VHk0sX5qu6k7/b7wVetuaTx3Xq85axfFjj7D5YeQdssBGLqex1x6EAx9MX9+Q==", + "requires": { + "p-map": "1.2.0", + "p-whilst": "1.0.0" + } + }, + "orbit-db-store": { + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz", + "integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==", + "requires": { + "ipfs-log": "4.0.6", + "logplease": "1.2.14", + "p-each-series": "1.0.0", + "readable-stream": "2.3.5" + } + } } }, "orbit-db-pubsub": { @@ -8820,11 +8884,11 @@ } }, "orbit-db-store": { - "version": "2.4.0", - "resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.4.0.tgz", - "integrity": "sha512-dIefF8jSvhwHj77YU7IR7rNoOJbUXaM6ByJmuQH05V5f6+b1Uu2vP4wm/9QQazozzU+D472fanpSC9RkBOi3bA==", + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/orbit-db-store/-/orbit-db-store-2.5.0.tgz", + "integrity": "sha512-vKHRVNpKcyNrvMt0NSOU/jjzIcIBVk4sbYS9g7RdnJBWx3D5mDCh8xDzzkq3Yv5UZSQWDM4oiXK0h08hz9kYJQ==", "requires": { - "ipfs-log": "4.0.6", + "ipfs-log": "4.1.0", "logplease": "1.2.14", "p-each-series": "1.0.0", "readable-stream": "2.3.5" diff --git a/package.json b/package.json index d97ddca..7872a0e 100644 --- a/package.json +++ b/package.json @@ -13,16 +13,17 @@ }, "main": "src/OrbitDB.js", "dependencies": { - "ipfs-pubsub-1on1": "~0.0.2", + "ipfs-pubsub-1on1": "~0.0.3", "logplease": "^1.2.14", "multihashes": "^0.4.12", "orbit-db-cache": "~0.2.2", - "orbit-db-counterstore": "~1.3.1", + "orbit-db-counterstore": "~1.4.0", "orbit-db-docstore": "~1.3.1", "orbit-db-eventstore": "~1.3.1", "orbit-db-feedstore": "~1.3.1", "orbit-db-keystore": "~0.1.0", "orbit-db-kvstore": "~1.3.1", + "orbit-db-store": "~2.5.0", "orbit-db-pubsub": "~0.5.1" }, "devDependencies": { diff --git a/src/OrbitDB.js b/src/OrbitDB.js index e6bf3ed..626c0af 100644 --- a/src/OrbitDB.js +++ b/src/OrbitDB.js @@ -7,16 +7,16 @@ const KeyValueStore = require('orbit-db-kvstore') const CounterStore = require('orbit-db-counterstore') const DocumentStore = require('orbit-db-docstore') const Pubsub = require('orbit-db-pubsub') -const Channel = require('ipfs-pubsub-1on1') const Cache = require('orbit-db-cache') const Keystore = require('orbit-db-keystore') const AccessController = require('./ipfs-access-controller') const OrbitDBAddress = require('./orbit-db-address') const createDBManifest = require('./db-manifest') +const exchangeHeads = require('./exchange-heads') const Logger = require('logplease') const logger = Logger.create("orbit-db") -Logger.setLogLevel('NONE') +Logger.setLogLevel('ERROR') // Mapping for 'database type' -> Class let databaseTypes = { @@ -87,11 +87,14 @@ class OrbitDB { delete this.stores[db.address.toString()] } - // Close all direct connections to peers - Object.keys(this._directConnections).forEach(e => { + // Close a direct connection and remove it from internal state + const removeDirectConnect = e => { this._directConnections[e].close() delete this._directConnections[e] - }) + } + + // Close all direct connections to peers + Object.keys(this._directConnections).forEach(removeDirectConnect) // Disconnect from pubsub if (this._pubsub) @@ -136,6 +139,9 @@ class OrbitDB { const addr = address.toString() this.stores[addr] = store + // Subscribe to pubsub to get updates from peers, + // this is what hooks us into the message propagation layer + // and the p2p network if(opts.replicate && this._pubsub) this._pubsub.subscribe(addr, this._onMessage.bind(this), this._onPeerConnected.bind(this)) @@ -153,44 +159,35 @@ class OrbitDB { const store = this.stores[address] try { logger.debug(`Received ${heads.length} heads for '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2)) - if (store) + if (store && heads && heads.length > 0) { await store.sync(heads) + } } catch (e) { logger.error(e) } } // Callback for when a peer connected to a database - async _onPeerConnected (address, peer, room) { + async _onPeerConnected (address, peer) { logger.debug(`New peer '${peer}' connected to '${address}'`) - const store = this.stores[address] - if (store) { - // Create a direct channel to the connected peer - let channel = this._directConnections[peer] - if (!channel) { - try { - logger.debug(`Create a channel`) - channel = await Channel.open(this._ipfs, peer) - channel.on('message', (message) => this._onMessage(address, JSON.parse(message.data))) - this._directConnections[peer] = channel - logger.debug(`Channel created`) - } catch (e) { - console.error(e) - logger.error(e) - } - } - // Send the newly connected peer our latest heads - let heads = store._oplog.heads - if (heads.length > 0) { - // Wait for the direct channel to be fully connected - await channel.connect() - logger.debug(`Send latest heads of '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2)) - channel.send(JSON.stringify(heads)) - } - store.events.emit('peer', peer) - } else { - logger.error(`Database '${address}' is not open!`) - } + + const getStore = address => this.stores[address] + const getDirectConnection = peer => this._directConnections[peer] + const onChannelCreated = channel => this._directConnections[channel._receiverID] = channel + const onMessage = (address, heads) => this._onMessage(address, heads) + + const channel = await exchangeHeads( + this._ipfs, + address, + peer, + getStore, + getDirectConnection, + onMessage, + onChannelCreated + ) + + if (getStore(address)) + getStore(address).events.emit('peer', peer) } // Callback when database was closed @@ -314,7 +311,7 @@ class OrbitDB { // If we want to try and open the database local-only, throw an error // if we don't have the database locally if (options.localOnly && !haveDB) { - logger.error(`Database '${dbAddress}' doesn't exist!`) + logger.warn(`Database '${dbAddress}' doesn't exist!`) throw new Error(`Database '${dbAddress}' doesn't exist!`) } diff --git a/src/exchange-heads.js b/src/exchange-heads.js new file mode 100644 index 0000000..7660409 --- /dev/null +++ b/src/exchange-heads.js @@ -0,0 +1,45 @@ +'use strict' + +const Channel = require('ipfs-pubsub-1on1') + +const Logger = require('logplease') +const logger = Logger.create("exchange-heads", { color: Logger.Colors.Yellow }) +Logger.setLogLevel('ERROR') + +const getHeadsForDatabase = store => (store && store._oplog) ? store._oplog.heads : [] + +const exchangeHeads = async (ipfs, address, peer, getStore, getDirectConnection, onMessage, onChannelCreated) => { + const _handleMessage = message => { + const msg = JSON.parse(message.data) + const { address, heads } = msg + onMessage(address, heads) + } + + let channel = getDirectConnection(peer) + if (!channel) { + try { + logger.debug(`Create a channel to ${peer}`) + channel = await Channel.open(ipfs, peer) + channel.on('message', _handleMessage) + logger.debug(`Channel created to ${peer}`) + onChannelCreated(channel) + } catch (e) { + logger.error(e) + } + } + + // Wait for the direct channel to be fully connected + await channel.connect() + logger.debug(`Connected to ${peer}`) + + // Send the heads if we have any + const heads = getHeadsForDatabase(getStore(address)) + logger.debug(`Send latest heads of '${address}':\n`, JSON.stringify(heads.map(e => e.hash), null, 2)) + if (heads) { + channel.send(JSON.stringify({ address: address, heads: heads })) + } + + return channel +} + +module.exports = exchangeHeads diff --git a/test/multiple-databases.test.js b/test/multiple-databases.test.js index 12a64bd..a532780 100644 --- a/test/multiple-databases.test.js +++ b/test/multiple-databases.test.js @@ -4,12 +4,16 @@ const assert = require('assert') const mapSeries = require('p-each-series') const rmrf = require('rimraf') const OrbitDB = require('../src/OrbitDB') -const config = require('./utils/config') -const startIpfs = require('./utils/start-ipfs') -const stopIpfs = require('./utils/stop-ipfs') -const testAPIs = require('./utils/test-apis') -const connectPeers = require('./utils/connect-peers') -const waitForPeers = require('./utils/wait-for-peers') + +// Include test utilities +const { + config, + startIpfs, + stopIpfs, + connectPeers, + waitForPeers, + testAPIs, +} = require('./utils') const dbPath1 = './orbitdb/tests/multiple-databases/1' const dbPath2 = './orbitdb/tests/multiple-databases/2' diff --git a/test/replicate.test.js b/test/replicate.test.js index ce62585..acf63cf 100644 --- a/test/replicate.test.js +++ b/test/replicate.test.js @@ -23,52 +23,246 @@ const ipfsPath2 = './orbitdb/tests/replication/2/ipfs' Object.keys(testAPIs).forEach(API => { describe(`orbit-db - Replication (${API})`, function() { - this.timeout(config.timeout * 2) + this.timeout(config.timeout) let ipfsd1, ipfsd2, ipfs1, ipfs2 let orbitdb1, orbitdb2, db1, db2 let id1, id2 - describe('two peers', function() { - let timer - let options + let timer + let options - before(async () => { - config.daemon1.repo = ipfsPath1 - config.daemon2.repo = ipfsPath2 - rmrf.sync(config.daemon1.repo) - rmrf.sync(config.daemon2.repo) - rmrf.sync(dbPath1) - rmrf.sync(dbPath2) - ipfsd1 = await startIpfs(API, config.daemon1) - ipfsd2 = await startIpfs(API, config.daemon2) - ipfs1 = ipfsd1.api - ipfs2 = ipfsd2.api - // Use memory store for quicker tests - const memstore = new MemStore() - ipfs1.object.put = memstore.put.bind(memstore) - ipfs1.object.get = memstore.get.bind(memstore) - ipfs2.object.put = memstore.put.bind(memstore) - ipfs2.object.get = memstore.get.bind(memstore) - // Connect the peers manually to speed up test times - await connectPeers(ipfs1, ipfs2) + before(async () => { + config.daemon1.repo = ipfsPath1 + config.daemon2.repo = ipfsPath2 + rmrf.sync(config.daemon1.repo) + rmrf.sync(config.daemon2.repo) + rmrf.sync(dbPath1) + rmrf.sync(dbPath2) + ipfsd1 = await startIpfs(API, config.daemon1) + ipfsd2 = await startIpfs(API, config.daemon2) + ipfs1 = ipfsd1.api + ipfs2 = ipfsd2.api + // Use memory store for quicker tests + const memstore = new MemStore() + ipfs1.object.put = memstore.put.bind(memstore) + ipfs1.object.get = memstore.get.bind(memstore) + ipfs2.object.put = memstore.put.bind(memstore) + ipfs2.object.get = memstore.get.bind(memstore) + // Connect the peers manually to speed up test times + await connectPeers(ipfs1, ipfs2) + }) + + after(async () => { + if (ipfsd1) + await stopIpfs(ipfsd1) + + if (ipfsd2) + await stopIpfs(ipfsd2) + }) + + beforeEach(async () => { + clearInterval(timer) + + orbitdb1 = new OrbitDB(ipfs1, dbPath1) + orbitdb2 = new OrbitDB(ipfs2, dbPath2) + + options = { + // Set write access for both clients + write: [ + orbitdb1.key.getPublic('hex'), + orbitdb2.key.getPublic('hex') + ], + } + + options = Object.assign({}, options, { path: dbPath1 }) + db1 = await orbitdb1.eventlog('replication-tests', options) + }) + + afterEach(async () => { + clearInterval(timer) + options = {} + + if (db1) + await db1.drop() + + if (db2) + await db2.drop() + + if(orbitdb1) + await orbitdb1.stop() + + if(orbitdb2) + await orbitdb2.stop() + }) + + it('replicates database of 1 entry', async () => { + // Set 'sync' flag on. It'll prevent creating a new local database and rather + // fetch the database from the network + options = Object.assign({}, options, { path: dbPath2, sync: true }) + db2 = await orbitdb2.eventlog(db1.address.toString(), options) + await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) + + await db1.add('hello') + return new Promise(resolve => { + setTimeout(() => { + const items = db2.iterator().collect() + assert.equal(items.length, 1) + assert.equal(items[0].payload.value, 'hello') + resolve() + }, 500) + }) + }) + + it('replicates database of 100 entries', async () => { + options = Object.assign({}, options, { path: dbPath2, sync: true }) + db2 = await orbitdb2.eventlog(db1.address.toString(), options) + await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) + + const entryCount = 100 + const entryArr = [] + + for (let i = 0; i < entryCount; i ++) + entryArr.push(i) + + return new Promise(async (resolve, reject) => { + try { + const add = i => db1.add('hello' + i) + await mapSeries(entryArr, add) + } catch (e) { + reject(e) + } + + timer = setInterval(() => { + const items = db2.iterator({ limit: -1 }).collect() + if (items.length === entryCount) { + clearInterval(timer) + assert.equal(items.length, entryCount) + assert.equal(items[0].payload.value, 'hello0') + assert.equal(items[items.length - 1].payload.value, 'hello99') + resolve() + } + }, 100) + }) + }) + + it('emits correct replication info', async () => { + options = Object.assign({}, options, { path: dbPath2, sync: true }) + db2 = await orbitdb2.eventlog(db1.address.toString(), options) + await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) + + let finished = false + let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } + let events = [] + let expectedEventCount = 99 + + db2.events.on('replicate', (address, entry) => { + eventCount['replicate'] ++ + events.push({ + event: 'replicate', + count: eventCount['replicate'], + entry: entry, + }) }) - after(async () => { - if (ipfsd1) - await stopIpfs(ipfsd1) - - if (ipfsd2) - await stopIpfs(ipfsd2) + db2.events.on('replicate.progress', (address, hash, entry, progress, total) => { + eventCount['replicate.progress'] ++ + events.push({ + event: 'replicate.progress', + count: eventCount['replicate.progress'], + entry: entry , + replicationInfo: { + max: db2.replicationStatus.max, + progress: db2.replicationStatus.progress, + }, + }) }) - beforeEach(async () => { - clearInterval(timer) + db2.events.on('replicated', (address) => { + eventCount['replicated'] ++ + events.push({ + event: 'replicated', + count: eventCount['replicate'], + replicationInfo: { + max: db2.replicationStatus.max, + progress: db2.replicationStatus.progress, + }, + }) + // Resolve with a little timeout to make sure we + // don't receive more than one event + setTimeout(() => { + finished = db2.iterator({ limit: -1 }).collect().length === expectedEventCount + }, 200) + }) - orbitdb1 = new OrbitDB(ipfs1, dbPath1) - orbitdb2 = new OrbitDB(ipfs2, dbPath2) + return new Promise((resolve, reject) => { + try { + timer = setInterval(() => { + if (finished) { + clearInterval(timer) - options = { + assert.equal(eventCount['replicate'], expectedEventCount) + assert.equal(eventCount['replicate.progress'], expectedEventCount) + + const replicateEvents = events.filter(e => e.event === 'replicate') + assert.equal(replicateEvents.length, expectedEventCount) + assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello') + assert.equal(replicateEvents[0].entry.clock.time, 1) + + const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress') + assert.equal(replicateProgressEvents.length, expectedEventCount) + assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello') + assert.equal(replicateProgressEvents[0].entry.clock.time, 1) + assert.equal(replicateProgressEvents[0].replicationInfo.max >= 1, true) + assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1) + + const replicatedEvents = events.filter(e => e.event === 'replicated') + assert.equal(replicatedEvents[0].replicationInfo.max >= 1, true) + assert.equal(replicatedEvents[0].replicationInfo.progress >= 1, true) + + resolve() + } + }, 100) + } catch (e) { + reject(e) + } + + // Trigger replication + let adds = [] + for (let i = 0; i < expectedEventCount; i ++) { + adds.push(i) + } + + mapSeries(adds, i => db1.add('hello ' + i)) + }) + }) + + it('emits correct replication info on fresh replication', async () => { + return new Promise(async (resolve, reject) => { + let finished = false + let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } + let events = [] + let expectedEventCount = 512 + + // Trigger replication + let adds = [] + for (let i = 0; i < expectedEventCount; i ++) { + adds.push(i) + } + + const add = async (i) => { + process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount + " ") + await db1.add('hello ' + i) + } + + await mapSeries(adds, add) + console.log() + + // Open second instance again + options = { + path: dbPath2, + overwrite: true, + sync: true, // Set write access for both clients write: [ orbitdb1.key.getPublic('hex'), @@ -76,88 +270,11 @@ Object.keys(testAPIs).forEach(API => { ], } - options = Object.assign({}, options, { path: dbPath1 }) - db1 = await orbitdb1.eventlog('replication-tests', options) - }) - - afterEach(async () => { - clearInterval(timer) - options = {} - - if (db1) - await db1.drop() - - if (db2) - await db2.drop() - - if(orbitdb1) - await orbitdb1.stop() - - if(orbitdb2) - await orbitdb2.stop() - }) - - it('replicates database of 1 entry', async () => { - // Set 'sync' flag on. It'll prevent creating a new local database and rather - // fetch the database from the network - options = Object.assign({}, options, { path: dbPath2, sync: true }) db2 = await orbitdb2.eventlog(db1.address.toString(), options) - await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) - - await db1.add('hello') - return new Promise(resolve => { - setTimeout(() => { - const items = db2.iterator().collect() - assert.equal(items.length, 1) - assert.equal(items[0].payload.value, 'hello') - resolve() - }, 1000) - }) - }) - - it('replicates database of 100 entries', async () => { - options = Object.assign({}, options, { path: dbPath2, sync: true }) - db2 = await orbitdb2.eventlog(db1.address.toString(), options) - await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) - - const entryCount = 100 - const entryArr = [] - - for (let i = 0; i < entryCount; i ++) - entryArr.push(i) - - return new Promise(async (resolve, reject) => { - try { - await mapSeries(entryArr, (i) => db1.add('hello' + i)) - } catch (e) { - reject(e) - } - - timer = setInterval(() => { - const items = db2.iterator({ limit: -1 }).collect() - if (items.length === entryCount) { - clearInterval(timer) - assert.equal(items.length, entryCount) - assert.equal(items[0].payload.value, 'hello0') - assert.equal(items[items.length - 1].payload.value, 'hello99') - resolve() - } - }, 1000) - }) - }) - - it('emits correct replication info', async () => { - options = Object.assign({}, options, { path: dbPath2, sync: true }) - db2 = await orbitdb2.eventlog(db1.address.toString(), options) - await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) - - let finished = false - let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } - let events = [] - let expectedEventCount = 99 db2.events.on('replicate', (address, entry) => { eventCount['replicate'] ++ + // console.log("[replicate] ", '#' + eventCount['replicate'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) events.push({ event: 'replicate', count: eventCount['replicate'], @@ -165,8 +282,10 @@ Object.keys(testAPIs).forEach(API => { }) }) - db2.events.on('replicate.progress', (address, hash, entry, progress) => { + db2.events.on('replicate.progress', (address, hash, entry) => { eventCount['replicate.progress'] ++ + // console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) + // assert.equal(db2.replicationStatus.progress, eventCount['replicate.progress']) events.push({ event: 'replicate.progress', count: eventCount['replicate.progress'], @@ -174,354 +293,205 @@ Object.keys(testAPIs).forEach(API => { replicationInfo: { max: db2.replicationStatus.max, progress: db2.replicationStatus.progress, - have: db2.replicationStatus.have, }, }) }) - db2.events.on('replicated', (address) => { - eventCount['replicated'] ++ + db2.events.on('replicated', (address, length) => { + eventCount['replicated'] += length + // console.log("[replicated]", '#' + eventCount['replicated'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|") + assert.equal(db2.replicationStatus.progress, eventCount['replicated']) + assert.equal(db2.replicationStatus.max, expectedEventCount) + + // Test the replicator state + assert.equal(db2._loader.tasksRequested >= db2.replicationStatus.progress, true) + assert.equal(db2._loader.tasksQueued <= db2.options.referenceCount, true) + assert.equal(db2.options.referenceCount, 64) + assert.equal(db2._loader.tasksRunning, 0) + assert.equal(db2._loader.tasksFinished, db2.replicationStatus.progress) + events.push({ event: 'replicated', count: eventCount['replicate'], replicationInfo: { max: db2.replicationStatus.max, progress: db2.replicationStatus.progress, - have: db2.replicationStatus.have, }, }) // Resolve with a little timeout to make sure we // don't receive more than one event - setTimeout(() => { - finished = db2.iterator({ limit: -1 }).collect().length === expectedEventCount - }, 500) - }) - - return new Promise((resolve, reject) => { - try { - timer = setInterval(() => { - if (finished) { - clearInterval(timer) - - assert.equal(eventCount['replicate'], expectedEventCount) - assert.equal(eventCount['replicate.progress'], expectedEventCount) - - const replicateEvents = events.filter(e => e.event === 'replicate') - assert.equal(replicateEvents.length, expectedEventCount) - assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello') - assert.equal(replicateEvents[0].entry.clock.time, 1) - - const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress') - assert.equal(replicateProgressEvents.length, expectedEventCount) - assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello') - assert.equal(replicateProgressEvents[0].entry.clock.time, 1) - assert.equal(replicateProgressEvents[0].replicationInfo.max, 1) - assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1) - - const replicatedEvents = events.filter(e => e.event === 'replicated') - assert.equal(replicatedEvents[0].replicationInfo.max, 1) - assert.equal(replicatedEvents[0].replicationInfo.progress, 1) - - resolve() - } - }, 100) - } catch (e) { - reject(e) - } - - // Trigger replication - let adds = [] - for (let i = 0; i < expectedEventCount; i ++) { - adds.push(i) - } - - mapSeries(adds, i => db1.add('hello ' + i)) - }) - }) - - it('emits correct replication info on fresh replication', async () => { - return new Promise(async (resolve, reject) => { - let finished = false - let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } - let events = [] - let expectedEventCount = 512 - - // Trigger replication - let adds = [] - for (let i = 0; i < expectedEventCount; i ++) { - adds.push(i) - } - - const add = async (i) => { - process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount) - await db1.add('hello ' + i) - } - - await mapSeries(adds, add) - console.log() - - // Open second instance again - options = { - path: dbPath2, - overwrite: true, - sync: true, - // Set write access for both clients - write: [ - orbitdb1.key.getPublic('hex'), - orbitdb2.key.getPublic('hex') - ], - } - - db2 = await orbitdb2.eventlog(db1.address.toString(), options) - - let current = 0 - let total = 0 - - db2.events.on('replicate', (address, entry) => { - eventCount['replicate'] ++ - total = db2.replicationStatus.max - // console.log("[replicate] ", '#' + eventCount['replicate'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) - events.push({ - event: 'replicate', - count: eventCount['replicate'], - entry: entry, - }) - }) - - db2.events.on('replicate.progress', (address, hash, entry) => { - eventCount['replicate.progress'] ++ - current = db2.replicationStatus.progress - // console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) - // assert.equal(db2.replicationStatus.progress, eventCount['replicate.progress']) - events.push({ - event: 'replicate.progress', - count: eventCount['replicate.progress'], - entry: entry , - replicationInfo: { - max: db2.replicationStatus.max, - progress: db2.replicationStatus.progress, - have: db2.replicationStatus.have, - }, - }) - }) - - db2.events.on('replicated', (address, length) => { - eventCount['replicated'] += length - current = db2.replicationStatus.progress - // console.log("[replicated]", '#' + eventCount['replicated'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|", db2._loader._stats.a, db2._loader._stats.b, db2._loader._stats.c, db2._loader._stats.d) - assert.equal(current, eventCount['replicated']) - assert.equal(total, expectedEventCount) - - // Test the replicator state - assert.equal(db2._loader.tasksRequested >= current, true) - assert.equal(db2._loader.tasksQueued <= db2.options.referenceCount, true) - assert.equal(db2.options.referenceCount, 64) - assert.equal(db2._loader.tasksRunning, 0) - assert.equal(db2._loader.tasksFinished, current) - - events.push({ - event: 'replicated', - count: eventCount['replicate'], - replicationInfo: { - max: db2.replicationStatus.max, - progress: db2.replicationStatus.progress, - have: db2.replicationStatus.have, - }, - }) - // Resolve with a little timeout to make sure we - // don't receive more than one event - setTimeout( async () => { - // console.log(eventCount['replicate.progress'], expectedEventCount) - if (eventCount['replicate.progress'] === expectedEventCount) { - finished = true - } - }, 500) - }) - - const st = new Date().getTime() - timer = setInterval(async () => { - if (finished) { - clearInterval(timer) - - // await db2.close() - - const et = new Date().getTime() - console.log("Duration:", et - st, "ms") - - try { - assert.equal(eventCount['replicate'], expectedEventCount) - assert.equal(eventCount['replicate.progress'], expectedEventCount) - - const replicateEvents = events.filter(e => e.event === 'replicate') - assert.equal(replicateEvents.length, expectedEventCount) - assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello') - assert.equal(replicateEvents[0].entry.clock.time, expectedEventCount) - - const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress') - assert.equal(replicateProgressEvents.length, expectedEventCount) - assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello') - assert.equal(replicateProgressEvents[0].entry.clock.time, expectedEventCount) - assert.equal(replicateProgressEvents[0].replicationInfo.max, expectedEventCount) - assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1) - - const replicatedEvents = events.filter(e => e.event === 'replicated') - assert.equal(replicatedEvents[0].replicationInfo.max, expectedEventCount) - assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount) - - resolve() - } catch (e) { - reject(e) - } + setTimeout( async () => { + // console.log(eventCount['replicate.progress'], expectedEventCount) + if (eventCount['replicate.progress'] === expectedEventCount) { + finished = true } }, 100) }) + + const st = new Date().getTime() + timer = setInterval(async () => { + if (finished) { + clearInterval(timer) + + const et = new Date().getTime() + console.log("Duration:", et - st, "ms") + + try { + assert.equal(eventCount['replicate'], expectedEventCount) + assert.equal(eventCount['replicate.progress'], expectedEventCount) + + const replicateEvents = events.filter(e => e.event === 'replicate') + assert.equal(replicateEvents.length, expectedEventCount) + assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello') + assert.equal(replicateEvents[0].entry.clock.time, expectedEventCount) + + const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress') + assert.equal(replicateProgressEvents.length, expectedEventCount) + assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello') + assert.equal(replicateProgressEvents[0].entry.clock.time, expectedEventCount) + assert.equal(replicateProgressEvents[0].replicationInfo.max, expectedEventCount) + assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1) + + const replicatedEvents = events.filter(e => e.event === 'replicated') + assert.equal(replicatedEvents[0].replicationInfo.max, expectedEventCount) + assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount) + + resolve() + } catch (e) { + reject(e) + } + } + }, 100) }) + }) - it('emits correct replication info in two-way replication', async () => { - return new Promise(async (resolve, reject) => { - let finished = false - let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } - let events = [] - let expectedEventCount = 100 + it('emits correct replication info in two-way replication', async () => { + return new Promise(async (resolve, reject) => { + let finished = false + let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0 } + let events = [] + let expectedEventCount = 100 - // Trigger replication - let adds = [] - for (let i = 0; i < expectedEventCount; i ++) { - adds.push(i) - } + // Trigger replication + let adds = [] + for (let i = 0; i < expectedEventCount; i ++) { + adds.push(i) + } - const add = async (i) => { - // process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount) - await Promise.all([db1.add('hello-1-' + i), db2.add('hello-2-' + i)]) - } + const add = async (i) => { + // process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount) + await Promise.all([db1.add('hello-1-' + i), db2.add('hello-2-' + i)]) + } - // Open second instance again - let options = { - path: dbPath2, - overwrite: true, - sync: true, - // Set write access for both clients - write: [ - orbitdb1.key.getPublic('hex'), - orbitdb2.key.getPublic('hex') - ], - } + // Open second instance again + let options = { + path: dbPath2, + overwrite: true, + sync: true, + // Set write access for both clients + write: [ + orbitdb1.key.getPublic('hex'), + orbitdb2.key.getPublic('hex') + ], + } - // if (db2) { - // await db2.drop() - // } + db2 = await orbitdb2.eventlog(db1.address.toString(), options) + await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) - db2 = await orbitdb2.eventlog(db1.address.toString(), options) - await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString()) - - let current = 0 - let total = 0 - - db2.events.on('replicate', (address, entry) => { - eventCount['replicate'] ++ - current = db2.replicationStatus.progress - total = db2.replicationStatus.max - // console.log("[replicate] ", '#' + eventCount['replicate'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) - events.push({ - event: 'replicate', - count: eventCount['replicate'], - entry: entry, - }) + db2.events.on('replicate', (address, entry) => { + eventCount['replicate'] ++ + // console.log("[replicate] ", '#' + eventCount['replicate'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) + events.push({ + event: 'replicate', + count: eventCount['replicate'], + entry: entry, }) - - let prevProgress = 0 - db2.events.on('replicate.progress', (address, hash, entry) => { - eventCount['replicate.progress'] ++ - current = db2.replicationStatus.progress - total = db2.replicationStatus.max - // console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) - // assert.equal(current, total) - events.push({ - event: 'replicate.progress', - count: eventCount['replicate.progress'], - entry: entry , - replicationInfo: { - max: db2.replicationStatus.max, - progress: db2.replicationStatus.progress, - have: db2.replicationStatus.have, - }, - }) - }) - - db2.events.on('replicated', (address, length) => { - eventCount['replicated'] += length - current = db2.replicationStatus.progress - total = db2.replicationStatus.max - const values = db2.iterator({limit: -1}).collect() - // console.log(current, "/", total, "/", values.length) - // console.log("[replicated]", '#' + eventCount['replicated'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|", db2._loader._stats.a, db2._loader._stats.b, db2._loader._stats.c, db2._loader._stats.d) - assert.equal(current <= total, true) - events.push({ - event: 'replicated', - count: eventCount['replicate'], - replicationInfo: { - max: db2.replicationStatus.max, - progress: db2.replicationStatus.progress, - have: db2.replicationStatus.have, - }, - }) - - if (db2.replicationStatus.max >= expectedEventCount * 2 - && db2.replicationStatus.progress >= expectedEventCount * 2) - finished = true - }) - - const st = new Date().getTime() - - try { - await mapSeries(adds, add) - - timer = setInterval(() => { - if (finished) { - clearInterval(timer) - - const et = new Date().getTime() - console.log("Duration:", et - st, "ms") - - // console.log(eventCount['replicate']) - assert.equal(eventCount['replicate'], expectedEventCount) - assert.equal(eventCount['replicate.progress'], expectedEventCount) - assert.equal(eventCount['replicated'], expectedEventCount) - - const replicateEvents = events.filter(e => e.event === 'replicate') - assert.equal(replicateEvents.length, expectedEventCount) - - const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress') - assert.equal(replicateProgressEvents.length, expectedEventCount) - assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].entry.clock.time, expectedEventCount) - assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.max, expectedEventCount * 2) - assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.progress, expectedEventCount * 2) - - const replicatedEvents = events.filter(e => e.event === 'replicated') - assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount * 2) - assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.max, expectedEventCount * 2) - - const values1 = db1.iterator({limit: -1}).collect() - const values2 = db2.iterator({limit: -1}).collect() - assert.deepEqual(values1, values2) - - // Test the replicator state - assert.equal(db1._loader.tasksRequested, expectedEventCount) - assert.equal(db1._loader.tasksQueued, 0) - assert.equal(db1._loader.tasksRunning, 0) - assert.equal(db1._loader.tasksFinished, expectedEventCount) - assert.equal(db2._loader.tasksRequested, expectedEventCount) - assert.equal(db2._loader.tasksQueued, 0) - assert.equal(db2._loader.tasksRunning, 0) - assert.equal(db2._loader.tasksFinished, expectedEventCount) - - resolve() - } - }, 100) - } catch (e) { - reject(e) - } }) + + let prevProgress = 0 + db2.events.on('replicate.progress', (address, hash, entry) => { + eventCount['replicate.progress'] ++ + // console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished) + // assert.equal(current, total) + events.push({ + event: 'replicate.progress', + count: eventCount['replicate.progress'], + entry: entry , + replicationInfo: { + max: db2.replicationStatus.max, + progress: db2.replicationStatus.progress, + }, + }) + }) + + db2.events.on('replicated', (address, length) => { + eventCount['replicated'] += length + const values = db2.iterator({limit: -1}).collect() + // console.log("[replicated]", '#' + eventCount['replicated'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|", db2._loader._stats.a, db2._loader._stats.b, db2._loader._stats.c, db2._loader._stats.d) + assert.equal(db2.replicationStatus.progress <= db2.replicationStatus.max, true) + events.push({ + event: 'replicated', + count: eventCount['replicate'], + replicationInfo: { + max: db2.replicationStatus.max, + progress: db2.replicationStatus.progress, + }, + }) + + if (db2.replicationStatus.max >= expectedEventCount * 2 + && db2.replicationStatus.progress >= expectedEventCount * 2) + finished = true + }) + + const st = new Date().getTime() + + try { + await mapSeries(adds, add) + + timer = setInterval(() => { + if (finished) { + clearInterval(timer) + + const et = new Date().getTime() + // console.log("Duration:", et - st, "ms") + + assert.equal(eventCount['replicate'], expectedEventCount) + assert.equal(eventCount['replicate.progress'], expectedEventCount) + assert.equal(eventCount['replicated'], expectedEventCount) + + const replicateEvents = events.filter(e => e.event === 'replicate') + assert.equal(replicateEvents.length, expectedEventCount) + + const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress') + assert.equal(replicateProgressEvents.length, expectedEventCount) + assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].entry.clock.time, expectedEventCount) + assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.max, expectedEventCount * 2) + assert.equal(replicateProgressEvents[replicateProgressEvents.length - 1].replicationInfo.progress, expectedEventCount * 2) + + const replicatedEvents = events.filter(e => e.event === 'replicated') + assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount * 2) + assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.max, expectedEventCount * 2) + + const values1 = db1.iterator({limit: -1}).collect() + const values2 = db2.iterator({limit: -1}).collect() + assert.deepEqual(values1, values2) + + // Test the replicator state + assert.equal(db1._loader.tasksRequested, expectedEventCount) + assert.equal(db1._loader.tasksQueued, 0) + assert.equal(db1._loader.tasksRunning, 0) + assert.equal(db1._loader.tasksFinished, expectedEventCount) + assert.equal(db2._loader.tasksRequested, expectedEventCount) + assert.equal(db2._loader.tasksQueued, 0) + assert.equal(db2._loader.tasksRunning, 0) + assert.equal(db2._loader.tasksFinished, expectedEventCount) + + resolve() + } + }, 100) + } catch (e) { + reject(e) + } }) }) })