mirror of https://github.com/laurent22/joplin.git
Server: Ensure that server does not crash when trying to start a task that is already running
parent
d0e943630d
commit
5ed3d94faa
|
@ -1,4 +1,5 @@
|
||||||
import { TaskId, TaskState } from '../services/database/types';
|
import { TaskId, TaskState } from '../services/database/types';
|
||||||
|
import { ErrorBadRequest, ErrorCode } from '../utils/errors';
|
||||||
import BaseModel from './BaseModel';
|
import BaseModel from './BaseModel';
|
||||||
|
|
||||||
export default class TaskStateModel extends BaseModel<TaskState> {
|
export default class TaskStateModel extends BaseModel<TaskState> {
|
||||||
|
@ -32,13 +33,13 @@ export default class TaskStateModel extends BaseModel<TaskState> {
|
||||||
|
|
||||||
public async start(taskId: TaskId) {
|
public async start(taskId: TaskId) {
|
||||||
const state = await this.loadByTaskId(taskId);
|
const state = await this.loadByTaskId(taskId);
|
||||||
if (state.running) throw new Error(`Task is already running: ${taskId}`);
|
if (state.running) throw new ErrorBadRequest(`Task is already running: ${taskId}`, { code: ErrorCode.TaskAlreadyRunning });
|
||||||
await this.save({ id: state.id, running: 1 });
|
await this.save({ id: state.id, running: 1 });
|
||||||
}
|
}
|
||||||
|
|
||||||
public async stop(taskId: TaskId) {
|
public async stop(taskId: TaskId) {
|
||||||
const state = await this.loadByTaskId(taskId);
|
const state = await this.loadByTaskId(taskId);
|
||||||
if (!state.running) throw new Error(`Task is not running: ${taskId}`);
|
if (!state.running) throw new ErrorBadRequest(`Task is not running: ${taskId}`, { code: ErrorCode.TaskAlreadyRunning });
|
||||||
await this.save({ id: state.id, running: 0 });
|
await this.save({ id: state.id, running: 0 });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import config from '../config';
|
import config from '../config';
|
||||||
import { Models } from '../models/factory';
|
import { Models } from '../models/factory';
|
||||||
|
import { ErrorCode } from '../utils/errors';
|
||||||
import { afterAllTests, beforeAllDb, beforeEachDb, expectThrow, models } from '../utils/testing/testUtils';
|
import { afterAllTests, beforeAllDb, beforeEachDb, expectThrow, models } from '../utils/testing/testUtils';
|
||||||
import { Env } from '../utils/types';
|
import { Env } from '../utils/types';
|
||||||
import { TaskId } from './database/types';
|
import { TaskId } from './database/types';
|
||||||
|
@ -14,6 +15,23 @@ const newService = () => {
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const createDemoTasks = (): Task[] => {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
id: TaskId.DeleteExpiredTokens,
|
||||||
|
description: '',
|
||||||
|
run: (_models: Models) => {},
|
||||||
|
schedule: '',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: TaskId.CompressOldChanges,
|
||||||
|
description: '',
|
||||||
|
run: (_models: Models) => {},
|
||||||
|
schedule: '',
|
||||||
|
},
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
describe('TaskService', () => {
|
describe('TaskService', () => {
|
||||||
|
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
|
@ -31,98 +49,66 @@ describe('TaskService', () => {
|
||||||
test('should register a task', async () => {
|
test('should register a task', async () => {
|
||||||
const service = newService();
|
const service = newService();
|
||||||
|
|
||||||
const task: Task = {
|
const tasks = createDemoTasks();
|
||||||
id: TaskId.DeleteExpiredTokens,
|
await service.registerTasks(tasks);
|
||||||
description: '',
|
|
||||||
run: (_models: Models) => {},
|
|
||||||
schedule: '',
|
|
||||||
};
|
|
||||||
|
|
||||||
await service.registerTask(task);
|
|
||||||
|
|
||||||
expect(service.tasks[TaskId.DeleteExpiredTokens]).toBeTruthy();
|
expect(service.tasks[TaskId.DeleteExpiredTokens]).toBeTruthy();
|
||||||
await expectThrow(async () => service.registerTask(task));
|
expect(service.tasks[TaskId.CompressOldChanges]).toBeTruthy();
|
||||||
|
await expectThrow(async () => service.registerTask(tasks[0]));
|
||||||
});
|
});
|
||||||
|
|
||||||
// test('should run a task', async function() {
|
|
||||||
// const service = newService();
|
|
||||||
|
|
||||||
// let taskStarted = false;
|
|
||||||
// let waitToFinish = true;
|
|
||||||
// let finishTask = false;
|
|
||||||
// let taskHasRan = false;
|
|
||||||
|
|
||||||
// const taskId = TaskId.DeleteExpiredTokens;
|
|
||||||
|
|
||||||
// const task: Task = {
|
|
||||||
// id: taskId,
|
|
||||||
// description: '',
|
|
||||||
// run: async (_models: Models) => {
|
|
||||||
// taskStarted = true;
|
|
||||||
|
|
||||||
// const iid = setInterval(() => {
|
|
||||||
// if (waitToFinish) return;
|
|
||||||
|
|
||||||
// if (finishTask) {
|
|
||||||
// clearInterval(iid);
|
|
||||||
// taskHasRan = true;
|
|
||||||
// }
|
|
||||||
// }, 1);
|
|
||||||
// },
|
|
||||||
// schedule: '',
|
|
||||||
// };
|
|
||||||
|
|
||||||
// await service.registerTask(task);
|
|
||||||
|
|
||||||
// expect((await service.taskState(taskId)).running).toBe(0);
|
|
||||||
|
|
||||||
// const startTime = new Date();
|
|
||||||
|
|
||||||
// void service.runTask(taskId, RunType.Manual);
|
|
||||||
// while (!taskStarted) {
|
|
||||||
// await msleep(1);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// expect((await service.taskState(taskId)).running).toBe(1);
|
|
||||||
// waitToFinish = false;
|
|
||||||
|
|
||||||
// while (!taskHasRan) {
|
|
||||||
// await msleep(1);
|
|
||||||
// finishTask = true;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// expect((await service.taskState(taskId)).running).toBe(0);
|
|
||||||
|
|
||||||
// const events = await service.taskLastEvents(taskId);
|
|
||||||
// expect(events.taskStarted.created_time).toBeGreaterThanOrEqual(startTime.getTime());
|
|
||||||
// expect(events.taskCompleted.created_time).toBeGreaterThan(startTime.getTime());
|
|
||||||
// });
|
|
||||||
|
|
||||||
test('should not run if task is disabled', async () => {
|
test('should not run if task is disabled', async () => {
|
||||||
const service = newService();
|
const service = newService();
|
||||||
|
|
||||||
let taskHasRan = false;
|
let taskHasRan = false;
|
||||||
|
|
||||||
const taskId = TaskId.DeleteExpiredTokens;
|
const tasks = createDemoTasks();
|
||||||
|
tasks[0].run = async (_models: Models) => {
|
||||||
const task: Task = {
|
|
||||||
id: taskId,
|
|
||||||
description: '',
|
|
||||||
run: async (_models: Models) => {
|
|
||||||
taskHasRan = true;
|
taskHasRan = true;
|
||||||
},
|
},
|
||||||
schedule: '',
|
await service.registerTasks(tasks);
|
||||||
};
|
const taskId = tasks[0].id;
|
||||||
|
|
||||||
await service.registerTask(task);
|
|
||||||
|
|
||||||
await service.runTask(taskId, RunType.Manual);
|
await service.runTask(taskId, RunType.Manual);
|
||||||
expect(taskHasRan).toBe(true);
|
expect(taskHasRan).toBe(true);
|
||||||
|
|
||||||
taskHasRan = false;
|
taskHasRan = false;
|
||||||
await models().taskState().disable(task.id);
|
await models().taskState().disable(taskId);
|
||||||
await service.runTask(taskId, RunType.Manual);
|
await service.runTask(taskId, RunType.Manual);
|
||||||
expect(taskHasRan).toBe(false);
|
expect(taskHasRan).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('should not run if task is already running', async () => {
|
||||||
|
const service = newService();
|
||||||
|
|
||||||
|
const tasks = createDemoTasks();
|
||||||
|
await service.registerTasks(tasks);
|
||||||
|
const task = tasks[0];
|
||||||
|
|
||||||
|
const state = await models().taskState().loadByTaskId(task.id);
|
||||||
|
await models().taskState().save({ id: state.id, running: 1 });
|
||||||
|
|
||||||
|
await expectThrow(async () => service.runTask(task.id, RunType.Manual), ErrorCode.TaskAlreadyRunning);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should reset interrupted tasks', async () => {
|
||||||
|
const service = newService();
|
||||||
|
|
||||||
|
const tasks = createDemoTasks();
|
||||||
|
await service.registerTasks(tasks);
|
||||||
|
const task = tasks[0];
|
||||||
|
|
||||||
|
const state = await models().taskState().loadByTaskId(task.id);
|
||||||
|
await models().taskState().save({ id: state.id, running: 1 });
|
||||||
|
|
||||||
|
const stateBefore = await models().taskState().loadByTaskId(task.id);
|
||||||
|
|
||||||
|
await service.resetInterruptedTasks();
|
||||||
|
|
||||||
|
const stateAfter = await models().taskState().loadByTaskId(task.id);
|
||||||
|
|
||||||
|
expect(stateBefore.running).toBe(1);
|
||||||
|
expect(stateAfter.running).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
|
@ -5,7 +5,7 @@ import BaseService from './BaseService';
|
||||||
import { Event, EventType, TaskId, TaskState } from './database/types';
|
import { Event, EventType, TaskId, TaskState } from './database/types';
|
||||||
import { Services } from './types';
|
import { Services } from './types';
|
||||||
import { _ } from '@joplin/lib/locale';
|
import { _ } from '@joplin/lib/locale';
|
||||||
import { ErrorNotFound } from '../utils/errors';
|
import { ErrorCode, ErrorNotFound } from '../utils/errors';
|
||||||
import { durationToMilliseconds } from '../utils/time';
|
import { durationToMilliseconds } from '../utils/time';
|
||||||
|
|
||||||
const cron = require('node-cron');
|
const cron = require('node-cron');
|
||||||
|
@ -104,6 +104,16 @@ export default class TaskService extends BaseService {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async resetInterruptedTasks() {
|
||||||
|
const taskStates = await this.models.taskState().all();
|
||||||
|
for (const taskState of taskStates) {
|
||||||
|
if (taskState.running) {
|
||||||
|
logger.warn(`Found a task that was in running state: ${this.taskDisplayString(taskState.task_id)} - resetting it.`);
|
||||||
|
await this.models.taskState().stop(taskState.task_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private taskById(id: TaskId): Task {
|
private taskById(id: TaskId): Task {
|
||||||
if (!this.tasks_[id]) throw new Error(`No such task: ${id}`);
|
if (!this.tasks_[id]) throw new Error(`No such task: ${id}`);
|
||||||
return this.tasks_[id];
|
return this.tasks_[id];
|
||||||
|
@ -159,13 +169,28 @@ export default class TaskService extends BaseService {
|
||||||
interval = null;
|
interval = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const runTaskWithErrorChecking = async (taskId: TaskId) => {
|
||||||
|
try {
|
||||||
|
await this.runTask(taskId, RunType.Scheduled);
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === ErrorCode.TaskAlreadyRunning) {
|
||||||
|
// This is not critical but we should log a warning
|
||||||
|
// because it may mean that the interval is too tight,
|
||||||
|
// or the task is taking too long.
|
||||||
|
logger.warn(`Tried to start ${this.taskDisplayString(taskId)} but it was already running`);
|
||||||
|
} else {
|
||||||
|
logger.error(`Failed running task ${this.taskDisplayString(taskId)}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if (interval !== null) {
|
if (interval !== null) {
|
||||||
setInterval(async () => {
|
setInterval(async () => {
|
||||||
await this.runTask(Number(taskId), RunType.Scheduled);
|
await runTaskWithErrorChecking(Number(taskId));
|
||||||
}, interval);
|
}, interval);
|
||||||
} else {
|
} else {
|
||||||
cron.schedule(task.schedule, async () => {
|
cron.schedule(task.schedule, async () => {
|
||||||
await this.runTask(Number(taskId), RunType.Scheduled);
|
await runTaskWithErrorChecking(Number(taskId));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ export enum ErrorCode {
|
||||||
NoSub = 'no_sub',
|
NoSub = 'no_sub',
|
||||||
NoStripeSub = 'no_stripe_sub',
|
NoStripeSub = 'no_stripe_sub',
|
||||||
InvalidOrigin = 'invalidOrigin',
|
InvalidOrigin = 'invalidOrigin',
|
||||||
|
TaskAlreadyRunning = 'taskAlreadyRunning',
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ErrorOptions {
|
export interface ErrorOptions {
|
||||||
|
|
|
@ -104,5 +104,7 @@ export default async function(env: Env, models: Models, config: Config, services
|
||||||
|
|
||||||
await taskService.registerTasks(tasks);
|
await taskService.registerTasks(tasks);
|
||||||
|
|
||||||
|
await taskService.resetInterruptedTasks();
|
||||||
|
|
||||||
return taskService;
|
return taskService;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue