mirror of
https://github.com/immich-app/immich.git
synced 2026-05-18 03:10:24 +03:00
fix(server): library import batch size (#27595)
* lower batch size * update test
This commit is contained in:
@@ -73,7 +73,8 @@ export function Chunked(
|
||||
const originalMethod = descriptor.value;
|
||||
const parameterIndex = options.paramIndex ?? 0;
|
||||
const chunkSize = options.chunkSize || DATABASE_PARAMETER_CHUNK_SIZE;
|
||||
descriptor.value = async function (...arguments_: any[]) {
|
||||
const mergeFn = options.mergeFn;
|
||||
descriptor.value = function (...arguments_: any[]) {
|
||||
const argument = arguments_[parameterIndex];
|
||||
|
||||
// Early return if argument length is less than or equal to the chunk size.
|
||||
@@ -81,27 +82,27 @@ export function Chunked(
|
||||
(Array.isArray(argument) && argument.length <= chunkSize) ||
|
||||
(argument instanceof Set && argument.size <= chunkSize)
|
||||
) {
|
||||
return await originalMethod.apply(this, arguments_);
|
||||
return originalMethod.apply(this, arguments_);
|
||||
}
|
||||
|
||||
return Promise.all(
|
||||
chunks(argument, chunkSize).map(async (chunk) => {
|
||||
return await Reflect.apply(originalMethod, this, [
|
||||
chunks(argument, chunkSize).map((chunk) => {
|
||||
return Reflect.apply(originalMethod, this, [
|
||||
...arguments_.slice(0, parameterIndex),
|
||||
chunk,
|
||||
...arguments_.slice(parameterIndex + 1),
|
||||
]);
|
||||
}),
|
||||
).then((results) => (options.mergeFn ? options.mergeFn(results) : results));
|
||||
).then((results) => (mergeFn ? mergeFn(results) : results));
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
export function ChunkedArray(options?: { paramIndex?: number }): MethodDecorator {
|
||||
export function ChunkedArray(options?: { paramIndex?: number; chunkSize?: number }): MethodDecorator {
|
||||
return Chunked({ ...options, mergeFn: _.flatten });
|
||||
}
|
||||
|
||||
export function ChunkedSet(options?: { paramIndex?: number }): MethodDecorator {
|
||||
export function ChunkedSet(options?: { paramIndex?: number; chunkSize?: number }): MethodDecorator {
|
||||
return Chunked({ ...options, mergeFn: (args: Set<any>[]) => setUnion(...args) });
|
||||
}
|
||||
|
||||
|
||||
@@ -380,8 +380,10 @@ export class AssetRepository {
|
||||
return this.db.insertInto('asset').values(asset).returningAll().executeTakeFirstOrThrow();
|
||||
}
|
||||
|
||||
createAll(assets: Insertable<AssetTable>[]) {
|
||||
return this.db.insertInto('asset').values(assets).returningAll().execute();
|
||||
@ChunkedArray({ chunkSize: 4000 })
|
||||
async createAll(assets: Insertable<AssetTable>[]) {
|
||||
const ids = await this.db.insertInto('asset').values(assets).returning('id').execute();
|
||||
return ids.map(({ id }) => id);
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID, { year: 2000, day: 1, month: 1 }] })
|
||||
|
||||
@@ -560,7 +560,7 @@ describe(LibraryService.name, () => {
|
||||
paths: ['/data/user1/photo.jpg'],
|
||||
};
|
||||
|
||||
mocks.asset.createAll.mockResolvedValue([asset]);
|
||||
mocks.asset.createAll.mockResolvedValue([asset.id]);
|
||||
mocks.library.get.mockResolvedValue(library);
|
||||
|
||||
await expect(sut.handleSyncFiles(mockLibraryJob)).resolves.toBe(JobStatus.Success);
|
||||
|
||||
@@ -266,13 +266,7 @@ export class LibraryService extends BaseService {
|
||||
),
|
||||
);
|
||||
|
||||
const assetIds: string[] = [];
|
||||
|
||||
for (let i = 0; i < assetImports.length; i += 5000) {
|
||||
// Chunk the imports to avoid the postgres limit of max parameters at once
|
||||
const chunk = assetImports.slice(i, i + 5000);
|
||||
await this.assetRepository.createAll(chunk).then((assets) => assetIds.push(...assets.map((asset) => asset.id)));
|
||||
}
|
||||
const assetIds = await this.assetRepository.createAll(assetImports);
|
||||
|
||||
const progressMessage =
|
||||
job.progressCounter && job.totalAssets
|
||||
|
||||
Reference in New Issue
Block a user