mirror of https://github.com/laurent22/joplin.git
All: Created ResourceFetcher class to handle resource downloads
parent
d66fa87b2b
commit
dbdd602f50
|
@ -7,6 +7,7 @@ const fs = require('fs-extra');
|
|||
const Folder = require('lib/models/Folder.js');
|
||||
const Note = require('lib/models/Note.js');
|
||||
const Resource = require('lib/models/Resource.js');
|
||||
const ResourceFetcher = require('lib/services/ResourceFetcher');
|
||||
const Tag = require('lib/models/Tag.js');
|
||||
const { Database } = require('lib/database.js');
|
||||
const Setting = require('lib/models/Setting.js');
|
||||
|
@ -872,12 +873,46 @@ describe('Synchronizer', function() {
|
|||
let allResources = await Resource.all();
|
||||
expect(allResources.length).toBe(1);
|
||||
let resource1_2 = allResources[0];
|
||||
let resourcePath1_2 = Resource.fullPath(resource1_2);
|
||||
|
||||
expect(resource1_2.id).toBe(resource1.id);
|
||||
expect(resource1_2.fetch_status).toBe(Resource.FETCH_STATUS_IDLE);
|
||||
|
||||
const fetcher = new ResourceFetcher(() => { return synchronizer().api() });
|
||||
fetcher.queueDownload(resource1_2.id);
|
||||
await fetcher.waitForAllFinished();
|
||||
|
||||
resource1_2 = await Resource.load(resource1.id);
|
||||
expect(resource1_2.fetch_status).toBe(Resource.FETCH_STATUS_DONE);
|
||||
|
||||
let resourcePath1_2 = Resource.fullPath(resource1_2);
|
||||
expect(fileContentEqual(resourcePath1, resourcePath1_2)).toBe(true);
|
||||
}));
|
||||
|
||||
it('should handle resource download errors', asyncTest(async () => {
|
||||
while (insideBeforeEach) await time.msleep(500);
|
||||
|
||||
let folder1 = await Folder.save({ title: "folder1" });
|
||||
let note1 = await Note.save({ title: 'ma note', parent_id: folder1.id });
|
||||
await shim.attachFileToNote(note1, __dirname + '/../tests/support/photo.jpg');
|
||||
let resource1 = (await Resource.all())[0];
|
||||
let resourcePath1 = Resource.fullPath(resource1);
|
||||
await synchronizer().start();
|
||||
|
||||
await switchClient(2);
|
||||
|
||||
await synchronizer().start();
|
||||
|
||||
const fetcher = new ResourceFetcher(() => { return {
|
||||
// Simulate a failed download
|
||||
get: () => { return new Promise((resolve, reject) => { reject(new Error('did not work')) }); }
|
||||
} });
|
||||
fetcher.queueResource(resource1.id);
|
||||
await fetcher.waitForAllFinished();
|
||||
|
||||
resource1 = await Resource.load(resource1.id);
|
||||
expect(resource1.fetch_status).toBe(Resource.FETCH_STATUS_ERROR);
|
||||
expect(resource1.fetch_error).toBe('did not work');
|
||||
}));
|
||||
|
||||
it('should delete resources', asyncTest(async () => {
|
||||
while (insideBeforeEach) await time.msleep(500);
|
||||
|
||||
|
@ -926,6 +961,10 @@ describe('Synchronizer', function() {
|
|||
Setting.setObjectKey('encryption.passwordCache', masterKey.id, '123456');
|
||||
await encryptionService().loadMasterKeysFromSettings();
|
||||
|
||||
const fetcher = new ResourceFetcher(() => { return synchronizer().api() });
|
||||
fetcher.queueDownload(resource1.id);
|
||||
await fetcher.waitForAllFinished();
|
||||
|
||||
let resource1_2 = (await Resource.all())[0];
|
||||
resource1_2 = await Resource.decrypt(resource1_2);
|
||||
let resourcePath1_2 = Resource.fullPath(resource1_2);
|
||||
|
|
|
@ -14,6 +14,10 @@ class BaseSyncTarget {
|
|||
return false;
|
||||
}
|
||||
|
||||
static resourceDirName() {
|
||||
return '.resource';
|
||||
}
|
||||
|
||||
option(name, defaultValue = null) {
|
||||
return this.options_ && (name in this.options_) ? this.options_[name] : defaultValue;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
const Resource = require('lib/models/Resource');
|
||||
const BaseService = require('lib/services/BaseService');
|
||||
const BaseSyncTarget = require('lib/BaseSyncTarget');
|
||||
const { Logger } = require('lib/logger.js');
|
||||
|
||||
class ResourceFetcher extends BaseService {
|
||||
|
||||
constructor(fileApi) {
|
||||
super();
|
||||
|
||||
if (typeof fileApi !== 'function') throw new Error('fileApi must be a function that returns the API');
|
||||
|
||||
this.logger_ = new Logger();
|
||||
this.fileApi_ = fileApi;
|
||||
this.queue_ = [];
|
||||
this.fetchingItems_ = {};
|
||||
this.resourceDirName_ = BaseSyncTarget.resourceDirName();
|
||||
this.queueMutex_ = new Mutex();
|
||||
this.maxDownloads_ = 3;
|
||||
}
|
||||
|
||||
setLogger(logger) {
|
||||
this.logger_ = logger;
|
||||
}
|
||||
|
||||
logger() {
|
||||
return this.logger_;
|
||||
}
|
||||
|
||||
fileApi() {
|
||||
return this.fileApi_();
|
||||
}
|
||||
|
||||
queuedItemIndex_(resourceId) {
|
||||
for (let i = 0; i < this.fetchingItems_.length; i++) {
|
||||
const item = this.fetchingItems_[i];
|
||||
if (item.id === resourceId) return i;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
queueDownload(resourceId, priority = null) {
|
||||
if (priority === null) priority = 'normal';
|
||||
|
||||
const index = this.queuedItemIndex_(resourceId);
|
||||
if (index >= 0) return;
|
||||
|
||||
const item = { id: resourceId };
|
||||
|
||||
if (priority === 'high') {
|
||||
this.queue_.splice(0, 0, item);
|
||||
} else {
|
||||
this.queue_.push(item);
|
||||
}
|
||||
|
||||
this.scheduleQueueProcess_();
|
||||
}
|
||||
|
||||
async startDownload_(resourceId) {
|
||||
if (this.fetchingItems_[resourceId]) return;
|
||||
this.fetchingItems_[resourceId] = true;
|
||||
|
||||
const resource = await Resource.load(resourceId);
|
||||
|
||||
this.fetchingItems_[resourceId] = resource;
|
||||
|
||||
const localResourceContentPath = Resource.fullPath(resource);
|
||||
const remoteResourceContentPath = this.resourceDirName_ + "/" + resource.id;
|
||||
|
||||
await Resource.save({ id: resource.id, fetch_status: Resource.FETCH_STATUS_STARTED });
|
||||
|
||||
this.fileApi().get(remoteResourceContentPath, { path: localResourceContentPath, target: "file" }).then(async () => {
|
||||
delete this.fetchingItems_[resource.id];
|
||||
await Resource.save({ id: resource.id, fetch_status: Resource.FETCH_STATUS_DONE });
|
||||
this.scheduleQueueProcess_();
|
||||
}).catch(async (error) => {
|
||||
delete this.fetchingItems_[resource.id];
|
||||
await Resource.save({ id: resource.id, fetch_status: Resource.FETCH_STATUS_ERROR, fetch_error: error.message });
|
||||
this.scheduleQueueProcess_();
|
||||
});
|
||||
}
|
||||
|
||||
processQueue_() {
|
||||
while (Object.getOwnPropertyNames(this.fetchingItems_).length < this.maxDownloads_) {
|
||||
if (!this.queue_.length) return;
|
||||
const item = this.queue_.splice(0, 1)[0];
|
||||
this.startDownload_(item.id);
|
||||
}
|
||||
}
|
||||
|
||||
async waitForAllFinished() {
|
||||
return new Promise((resolve, reject) => {
|
||||
const iid = setInterval(() => {
|
||||
if (!this.queue_.length && !Object.getOwnPropertyNames(this.fetchingItems_).length) {
|
||||
clearInterval(iid);
|
||||
resolve();
|
||||
}
|
||||
}, 100);
|
||||
});
|
||||
}
|
||||
|
||||
scheduleQueueProcess_() {
|
||||
if (this.scheduleQueueProcessIID_) {
|
||||
clearTimeout(this.scheduleQueueProcessIID_);
|
||||
this.scheduleQueueProcessIID_ = null;
|
||||
}
|
||||
|
||||
this.scheduleQueueProcessIID_ = setTimeout(() => {
|
||||
this.processQueue_();
|
||||
this.scheduleQueueProcessIID_ = null;
|
||||
}, 100);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
module.exports = ResourceFetcher;
|
|
@ -11,6 +11,7 @@ const { Logger } = require('lib/logger.js');
|
|||
const { _ } = require('lib/locale.js');
|
||||
const { shim } = require('lib/shim.js');
|
||||
const JoplinError = require('lib/JoplinError');
|
||||
const BaseSyncTarget = require('lib/BaseSyncTarget');
|
||||
|
||||
class Synchronizer {
|
||||
|
||||
|
@ -19,7 +20,7 @@ class Synchronizer {
|
|||
this.db_ = db;
|
||||
this.api_ = api;
|
||||
this.syncDirName_ = '.sync';
|
||||
this.resourceDirName_ = '.resource';
|
||||
this.resourceDirName_ = BaseSyncTarget.resourceDirName();
|
||||
this.logger_ = new Logger();
|
||||
this.appType_ = appType;
|
||||
this.cancelling_ = false;
|
||||
|
|
Loading…
Reference in New Issue