Server: Significantly improve sync performances, especially when there are many changes

pull/9094/head
Laurent Cozic 2023-10-19 17:46:58 +01:00
parent 4d1e0cc21b
commit 5986710fc0
6 changed files with 281 additions and 167 deletions

View File

@ -6,7 +6,7 @@ import { md5 } from '../utils/crypto';
import { ErrorResyncRequired } from '../utils/errors';
import { Day, formatDateTime } from '../utils/time';
import BaseModel, { SaveOptions } from './BaseModel';
import { PaginatedResults, Pagination, PaginationOrderDir } from './utils/pagination';
import { PaginatedResults } from './utils/pagination';
const logger = Logger.create('ChangeModel');
@ -88,7 +88,44 @@ export default class ChangeModel extends BaseModel<Change> {
};
}
private changesForUserQuery(userId: Uuid, count: boolean): Knex.QueryBuilder {
// private changesForUserQuery(userId: Uuid, count: boolean): Knex.QueryBuilder {
// // When need to get:
// //
// // - All the CREATE and DELETE changes associated with the user
// // - All the UPDATE changes that applies to items associated with the
// // user.
// //
// // UPDATE changes do not have the user_id set because they are specific
// // to the item, not to a particular user.
// const query = this
// .db('changes')
// .where(function() {
// void this.whereRaw('((type = ? OR type = ?) AND user_id = ?)', [ChangeType.Create, ChangeType.Delete, userId])
// // Need to use a RAW query here because Knex has a "not a
// // bug" bug that makes it go into infinite loop in some
// // contexts, possibly only when running inside Jest (didn't
// // test outside).
// // https://github.com/knex/knex/issues/1851
// .orWhereRaw('type = ? AND item_id IN (SELECT item_id FROM user_items WHERE user_id = ?)', [ChangeType.Update, userId]);
// });
// if (count) {
// void query.countDistinct('id', { as: 'total' });
// } else {
// void query.select([
// 'id',
// 'item_id',
// 'item_name',
// 'type',
// 'updated_time',
// ]);
// }
// return query;
// }
public async changesForUserQuery(userId: Uuid, fromCounter: number, limit: number, doCountQuery: boolean): Promise<Change[]> {
// When need to get:
//
// - All the CREATE and DELETE changes associated with the user
@ -98,61 +135,125 @@ export default class ChangeModel extends BaseModel<Change> {
// UPDATE changes do not have the user_id set because they are specific
// to the item, not to a particular user.
const query = this
.db('changes')
.where(function() {
void this.whereRaw('((type = ? OR type = ?) AND user_id = ?)', [ChangeType.Create, ChangeType.Delete, userId])
// Need to use a RAW query here because Knex has a "not a
// bug" bug that makes it go into infinite loop in some
// contexts, possibly only when running inside Jest (didn't
// test outside).
// https://github.com/knex/knex/issues/1851
.orWhereRaw('type = ? AND item_id IN (SELECT item_id FROM user_items WHERE user_id = ?)', [ChangeType.Update, userId]);
});
// This used to be just one query but it kept getting slower and slower
// as the `changes` table grew. So it is now split into two queries
// merged by a UNION ALL.
if (count) {
void query.countDistinct('id', { as: 'total' });
const fields = [
'id',
'item_id',
'item_name',
'type',
'updated_time',
'counter',
];
const fieldsSql = `"${fields.join('", "')}"`;
const subQuery1 = `
SELECT ${fieldsSql}
FROM "changes"
WHERE counter > ?
AND (type = ? OR type = ?)
AND user_id = ?
ORDER BY "counter" ASC
${doCountQuery ? '' : 'LIMIT ?'}
`;
const subParams1 = [
fromCounter,
ChangeType.Create,
ChangeType.Delete,
userId,
];
if (!doCountQuery) subParams1.push(limit);
const subQuery2 = `
SELECT ${fieldsSql}
FROM "changes"
WHERE counter > ?
AND type = ?
AND item_id IN (SELECT item_id FROM user_items WHERE user_id = ?)
ORDER BY "counter" ASC
${doCountQuery ? '' : 'LIMIT ?'}
`;
const subParams2 = [
fromCounter,
ChangeType.Update,
userId,
];
if (!doCountQuery) subParams2.push(limit);
let query: Knex.Raw<any> = null;
const finalParams = subParams1.concat(subParams2);
if (!doCountQuery) {
finalParams.push(limit);
query = this.db.raw(`
SELECT ${fieldsSql} FROM (${subQuery1}) as sub1
UNION ALL
SELECT ${fieldsSql} FROM (${subQuery2}) as sub2
ORDER BY counter ASC
LIMIT ?
`, finalParams);
} else {
void query.select([
'id',
'item_id',
'item_name',
'type',
'updated_time',
]);
query = this.db.raw(`
SELECT count(*) as total
FROM (
(${subQuery1})
UNION ALL
(${subQuery2})
) AS merged
`, finalParams);
}
return query;
const results = await query;
// Because it's a raw query, we need to handle the results manually:
// Postgres returns an object with a "rows" property, while SQLite
// returns the rows directly;
const output: Change[] = results.rows ? results.rows : results;
// This property is present only for the purpose of ordering the results
// and can be removed afterwards.
for (const change of output) delete change.counter;
return output;
}
public async allByUser(userId: Uuid, pagination: Pagination = null): Promise<PaginatedDeltaChanges> {
pagination = {
page: 1,
limit: 100,
order: [{ by: 'counter', dir: PaginationOrderDir.ASC }],
...pagination,
};
// public async allByUser(userId: Uuid, pagination: Pagination = null): Promise<PaginatedDeltaChanges> {
// pagination = {
// page: 1,
// limit: 100,
// order: [{ by: 'counter', dir: PaginationOrderDir.ASC }],
// ...pagination,
// };
const query = this.changesForUserQuery(userId, false);
const countQuery = this.changesForUserQuery(userId, true);
const itemCount = (await countQuery.first()).total;
// const query = this.changesForUserQuery(userId, false);
// const countQuery = this.changesForUserQuery(userId, true);
// const itemCount = (await countQuery.first()).total;
void query
.orderBy(pagination.order[0].by, pagination.order[0].dir)
.offset((pagination.page - 1) * pagination.limit)
.limit(pagination.limit) as any[];
// void query
// .orderBy(pagination.order[0].by, pagination.order[0].dir)
// .offset((pagination.page - 1) * pagination.limit)
// .limit(pagination.limit) as any[];
const changes = await query;
// const changes = await query;
return {
items: changes,
// If we have changes, we return the ID of the latest changes from which delta sync can resume.
// If there's no change, we return the previous cursor.
cursor: changes.length ? changes[changes.length - 1].id : pagination.cursor,
has_more: changes.length >= pagination.limit,
page_count: itemCount !== null ? Math.ceil(itemCount / pagination.limit) : undefined,
};
}
// return {
// items: changes,
// // If we have changes, we return the ID of the latest changes from which delta sync can resume.
// // If there's no change, we return the previous cursor.
// cursor: changes.length ? changes[changes.length - 1].id : pagination.cursor,
// has_more: changes.length >= pagination.limit,
// page_count: itemCount !== null ? Math.ceil(itemCount / pagination.limit) : undefined,
// };
// }
public async delta(userId: Uuid, pagination: ChangePagination = null): Promise<PaginatedDeltaChanges> {
pagination = {
@ -167,18 +268,12 @@ export default class ChangeModel extends BaseModel<Change> {
if (!changeAtCursor) throw new ErrorResyncRequired();
}
const query = this.changesForUserQuery(userId, false);
// If a cursor was provided, apply it to the query.
if (changeAtCursor) {
void query.where('counter', '>', changeAtCursor.counter);
}
void query
.orderBy('counter', 'asc')
.limit(pagination.limit) as any[];
const changes: Change[] = await query;
const changes = await this.changesForUserQuery(
userId,
changeAtCursor ? changeAtCursor.counter : -1,
pagination.limit,
false,
);
const items: Item[] = await this.db('items').select('id', 'jop_updated_time').whereIn('items.id', changes.map(c => c.item_id));

View File

@ -428,9 +428,14 @@ describe('UserModel', () => {
test('should throw an error if the password being saved seems to be hashed', async () => {
const passwordSimilarToHash = '$2a$10';
const error = await checkThrowAsync(async () => await models().user().save({ password: passwordSimilarToHash }));
const user = await models().user().save({
email: 'test@example.com',
password: '111111',
});
expect(error.message).toBe('Unable to save user because password already seems to be hashed. User id: undefined');
const error = await checkThrowAsync(async () => await models().user().save({ id: user.id, password: passwordSimilarToHash }));
expect(error.message).toBe(`Unable to save user because password already seems to be hashed. User id: ${user.id}`);
expect(error instanceof ErrorBadRequest).toBe(true);
});

View File

@ -640,6 +640,9 @@ export default class UserModel extends BaseModel<User> {
if (user.password) {
if (isHashedPassword(user.password)) {
if (!isNew) {
// We have this check because if an existing user is loaded,
// then saved again, the "password" field will be hashed a
// second time, and we don't want this.
throw new ErrorBadRequest(`Unable to save user because password already seems to be hashed. User id: ${user.id}`);
} else {
// OK - We allow supplying an already hashed password for

View File

@ -1,52 +1,62 @@
import { beforeAllDb, afterAllTests, beforeEachDb, createItemTree, createUserAndSession, parseHtml } from '../../utils/testing/testUtils';
import { execRequest } from '../../utils/testing/apiUtils';
// Disabled for now
describe('index_changes', () => {
beforeAll(async () => {
await beforeAllDb('index_changes');
});
afterAll(async () => {
await afterAllTests();
});
beforeEach(async () => {
await beforeEachDb();
});
test('should list changes', async () => {
const { user: user1, session: session1 } = await createUserAndSession(1, true);
const items: any = {};
for (let i = 1; i <= 150; i++) {
items[(`${i}`).padStart(32, '0')] = {};
}
await createItemTree(user1.id, '', items);
// Just some basic tests to check that we're seeing at least the first
// and last item of each page.
{
const response: string = await execRequest(session1.id, 'GET', 'changes');
const navLinks = parseHtml(response).querySelectorAll('.pagination-link');
expect(response.includes('00000000000000000000000000000150.md')).toBe(true);
expect(response.includes('00000000000000000000000000000051.md')).toBe(true);
expect(navLinks.length).toBe(2);
expect(navLinks[0].getAttribute('class')).toContain('is-current');
expect(navLinks[1].getAttribute('class')).not.toContain('is-current');
}
{
const response: string = await execRequest(session1.id, 'GET', 'changes', null, { query: { page: 2 } });
const navLinks = parseHtml(response).querySelectorAll('.pagination-link');
expect(response.includes('00000000000000000000000000000050.md')).toBe(true);
expect(response.includes('00000000000000000000000000000001.md')).toBe(true);
expect(navLinks.length).toBe(2);
expect(navLinks[0].getAttribute('class')).not.toContain('is-current');
expect(navLinks[1].getAttribute('class')).toContain('is-current');
}
it('should pass', () => {
expect(true).toBe(true);
});
});
// import { beforeAllDb, afterAllTests, beforeEachDb, createItemTree, createUserAndSession, parseHtml } from '../../utils/testing/testUtils';
// import { execRequest } from '../../utils/testing/apiUtils';
// describe('index_changes', () => {
// beforeAll(async () => {
// await beforeAllDb('index_changes');
// });
// afterAll(async () => {
// await afterAllTests();
// });
// beforeEach(async () => {
// await beforeEachDb();
// });
// test('should list changes', async () => {
// const { user: user1, session: session1 } = await createUserAndSession(1, true);
// const items: any = {};
// for (let i = 1; i <= 150; i++) {
// items[(`${i}`).padStart(32, '0')] = {};
// }
// await createItemTree(user1.id, '', items);
// // Just some basic tests to check that we're seeing at least the first
// // and last item of each page.
// {
// const response: string = await execRequest(session1.id, 'GET', 'changes');
// const navLinks = parseHtml(response).querySelectorAll('.pagination-link');
// expect(response.includes('00000000000000000000000000000150.md')).toBe(true);
// expect(response.includes('00000000000000000000000000000051.md')).toBe(true);
// expect(navLinks.length).toBe(2);
// expect(navLinks[0].getAttribute('class')).toContain('is-current');
// expect(navLinks[1].getAttribute('class')).not.toContain('is-current');
// }
// {
// const response: string = await execRequest(session1.id, 'GET', 'changes', null, { query: { page: 2 } });
// const navLinks = parseHtml(response).querySelectorAll('.pagination-link');
// expect(response.includes('00000000000000000000000000000050.md')).toBe(true);
// expect(response.includes('00000000000000000000000000000001.md')).toBe(true);
// expect(navLinks.length).toBe(2);
// expect(navLinks[0].getAttribute('class')).not.toContain('is-current');
// expect(navLinks[1].getAttribute('class')).toContain('is-current');
// }
// });
// });

View File

@ -2,69 +2,74 @@ import { SubPath } from '../../utils/routeUtils';
import Router from '../../utils/Router';
import { RouteType } from '../../utils/types';
import { AppContext } from '../../utils/types';
import { changeTypeToString } from '../../services/database/types';
import { PaginationOrderDir } from '../../models/utils/pagination';
import { formatDateTime } from '../../utils/time';
import defaultView from '../../utils/defaultView';
import { View } from '../../services/MustacheService';
import { makeTablePagination, Table, Row, makeTableView } from '../../utils/views/table';
import config, { showItemUrls } from '../../config';
// import { changeTypeToString } from '../../services/database/types';
// import { PaginationOrderDir } from '../../models/utils/pagination';
// import { formatDateTime } from '../../utils/time';
// import defaultView from '../../utils/defaultView';
// import { View } from '../../services/MustacheService';
// import { makeTablePagination, Table, Row, makeTableView } from '../../utils/views/table';
// import config, { showItemUrls } from '../../config';
import { ErrorForbidden } from '../../utils/errors';
const router = new Router(RouteType.Web);
router.get('changes', async (_path: SubPath, ctx: AppContext) => {
if (!ctx.joplin.owner.is_admin) throw new ErrorForbidden();
router.get('changes', async (_path: SubPath, _ctx: AppContext) => {
// We disable this because it is too slow to retrieve all the changes and
// could easily lock a database. If we need a way to inspect the log there
// would have to be a different, more efficient way to do it.
throw new ErrorForbidden('Disabled');
const pagination = makeTablePagination(ctx.query, 'updated_time', PaginationOrderDir.DESC);
const paginatedChanges = await ctx.joplin.models.change().allByUser(ctx.joplin.owner.id, pagination);
const items = await ctx.joplin.models.item().loadByIds(paginatedChanges.items.map(i => i.item_id), { fields: ['id'] });
// if (!ctx.joplin.owner.is_admin) throw new ErrorForbidden();
const table: Table = {
baseUrl: ctx.joplin.models.change().changeUrl(),
requestQuery: ctx.query,
pageCount: paginatedChanges.page_count,
pagination,
headers: [
{
name: 'item_name',
label: 'Name',
stretch: true,
},
{
name: 'type',
label: 'Type',
},
{
name: 'updated_time',
label: 'Timestamp',
},
],
rows: paginatedChanges.items.map(change => {
const row: Row = {
items: [
{
value: change.item_name,
stretch: true,
url: showItemUrls(config()) ? (items.find(i => i.id === change.item_id) ? ctx.joplin.models.item().itemContentUrl(change.item_id) : '') : null,
},
{
value: changeTypeToString(change.type),
},
{
value: formatDateTime(change.updated_time),
},
],
};
// const pagination = makeTablePagination(ctx.query, 'updated_time', PaginationOrderDir.DESC);
// const paginatedChanges = await ctx.joplin.models.change().allByUser(ctx.joplin.owner.id, pagination);
// const items = await ctx.joplin.models.item().loadByIds(paginatedChanges.items.map(i => i.item_id), { fields: ['id'] });
return row;
}),
};
// const table: Table = {
// baseUrl: ctx.joplin.models.change().changeUrl(),
// requestQuery: ctx.query,
// pageCount: paginatedChanges.page_count,
// pagination,
// headers: [
// {
// name: 'item_name',
// label: 'Name',
// stretch: true,
// },
// {
// name: 'type',
// label: 'Type',
// },
// {
// name: 'updated_time',
// label: 'Timestamp',
// },
// ],
// rows: paginatedChanges.items.map(change => {
// const row: Row = {
// items: [
// {
// value: change.item_name,
// stretch: true,
// url: showItemUrls(config()) ? (items.find(i => i.id === change.item_id) ? ctx.joplin.models.item().itemContentUrl(change.item_id) : '') : null,
// },
// {
// value: changeTypeToString(change.type),
// },
// {
// value: formatDateTime(change.updated_time),
// },
// ],
// };
const view: View = defaultView('changes', 'Log');
view.content.changeTable = makeTableView(table),
view.cssFiles = ['index/changes'];
return view;
// return row;
// }),
// };
// const view: View = defaultView('changes', 'Log');
// view.content.changeTable = makeTableView(table),
// view.cssFiles = ['index/changes'];
// return view;
});
export default router;

View File

@ -9,7 +9,7 @@ import { makeUrl, SubPath, UrlType } from '../utils/routeUtils';
import MarkdownIt = require('markdown-it');
import { headerAnchor } from '@joplin/renderer';
import { _ } from '@joplin/lib/locale';
import { adminDashboardUrl, adminEmailsUrl, adminTasksUrl, adminUserDeletionsUrl, adminUsersUrl, changesUrl, homeUrl, itemsUrl } from '../utils/urlUtils';
import { adminDashboardUrl, adminEmailsUrl, adminTasksUrl, adminUserDeletionsUrl, adminUsersUrl, homeUrl, itemsUrl } from '../utils/urlUtils';
import { MenuItem, setSelectedMenu } from '../utils/views/menu';
export interface RenderOptions {
@ -150,10 +150,6 @@ export default class MustacheService {
title: _('Items'),
url: itemsUrl(),
},
{
title: _('Logs'),
url: changesUrl(),
},
{
title: _('Admin'),
url: adminDashboardUrl(),