mirror of
https://github.com/immich-app/immich.git
synced 2026-05-18 03:10:24 +03:00
chore!: queue endpoint migration
This commit is contained in:
@@ -1,33 +1,15 @@
|
||||
import { Body, Controller, Get, HttpCode, HttpStatus, Param, Post, Put } from '@nestjs/common';
|
||||
import { Body, Controller, HttpCode, HttpStatus, Post } from '@nestjs/common';
|
||||
import { ApiTags } from '@nestjs/swagger';
|
||||
import { Endpoint, HistoryBuilder } from 'src/decorators';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import { JobCreateDto } from 'src/dtos/job.dto';
|
||||
import { QueueResponseLegacyDto, QueuesResponseLegacyDto } from 'src/dtos/queue-legacy.dto';
|
||||
import { QueueCommandDto, QueueNameParamDto } from 'src/dtos/queue.dto';
|
||||
import { ApiTag, Permission } from 'src/enum';
|
||||
import { Auth, Authenticated } from 'src/middleware/auth.guard';
|
||||
import { Authenticated } from 'src/middleware/auth.guard';
|
||||
import { JobService } from 'src/services/job.service';
|
||||
import { QueueService } from 'src/services/queue.service';
|
||||
|
||||
@ApiTags(ApiTag.Jobs)
|
||||
@Controller('jobs')
|
||||
export class JobController {
|
||||
constructor(
|
||||
private service: JobService,
|
||||
private queueService: QueueService,
|
||||
) {}
|
||||
|
||||
@Get()
|
||||
@Authenticated({ permission: Permission.JobRead, admin: true })
|
||||
@Endpoint({
|
||||
summary: 'Retrieve queue counts and status',
|
||||
description: 'Retrieve the counts of the current queue, as well as the current status.',
|
||||
history: new HistoryBuilder().added('v1').beta('v1').stable('v2').deprecated('v2.4.0'),
|
||||
})
|
||||
getQueuesLegacy(@Auth() auth: AuthDto): Promise<QueuesResponseLegacyDto> {
|
||||
return this.queueService.getAllLegacy(auth);
|
||||
}
|
||||
constructor(private service: JobService) {}
|
||||
|
||||
@Post()
|
||||
@Authenticated({ permission: Permission.JobCreate, admin: true })
|
||||
@@ -41,19 +23,4 @@ export class JobController {
|
||||
createJob(@Body() dto: JobCreateDto): Promise<void> {
|
||||
return this.service.create(dto);
|
||||
}
|
||||
|
||||
@Put(':name')
|
||||
@Authenticated({ permission: Permission.JobCreate, admin: true })
|
||||
@Endpoint({
|
||||
summary: 'Run jobs',
|
||||
description:
|
||||
'Queue all assets for a specific job type. Defaults to only queueing assets that have not yet been processed, but the force command can be used to re-process all assets.',
|
||||
history: new HistoryBuilder().added('v1').beta('v1').stable('v2').deprecated('v2.4.0'),
|
||||
})
|
||||
runQueueCommandLegacy(
|
||||
@Param() { name }: QueueNameParamDto,
|
||||
@Body() dto: QueueCommandDto,
|
||||
): Promise<QueueResponseLegacyDto> {
|
||||
return this.queueService.runCommandLegacy(name, dto);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
import { createZodDto } from 'nestjs-zod';
|
||||
import { QueueResponseDto, QueueStatisticsSchema } from 'src/dtos/queue.dto';
|
||||
import { QueueName } from 'src/enum';
|
||||
import z from 'zod';
|
||||
|
||||
const QueueStatusLegacySchema = z
|
||||
.object({
|
||||
isActive: z.boolean().describe('Whether the queue is currently active (has running jobs)'),
|
||||
isPaused: z.boolean().describe('Whether the queue is paused'),
|
||||
})
|
||||
.meta({ id: 'QueueStatusLegacyDto' });
|
||||
|
||||
const QueueResponseLegacySchema = z
|
||||
.object({
|
||||
queueStatus: QueueStatusLegacySchema,
|
||||
jobCounts: QueueStatisticsSchema,
|
||||
})
|
||||
.meta({ id: 'QueueResponseLegacyDto' });
|
||||
|
||||
const QueuesResponseLegacySchema = z
|
||||
.object({
|
||||
[QueueName.ThumbnailGeneration]: QueueResponseLegacySchema,
|
||||
[QueueName.MetadataExtraction]: QueueResponseLegacySchema,
|
||||
[QueueName.VideoConversion]: QueueResponseLegacySchema,
|
||||
[QueueName.SmartSearch]: QueueResponseLegacySchema,
|
||||
[QueueName.StorageTemplateMigration]: QueueResponseLegacySchema,
|
||||
[QueueName.Migration]: QueueResponseLegacySchema,
|
||||
[QueueName.BackgroundTask]: QueueResponseLegacySchema,
|
||||
[QueueName.Search]: QueueResponseLegacySchema,
|
||||
[QueueName.DuplicateDetection]: QueueResponseLegacySchema,
|
||||
[QueueName.FaceDetection]: QueueResponseLegacySchema,
|
||||
[QueueName.FacialRecognition]: QueueResponseLegacySchema,
|
||||
[QueueName.Sidecar]: QueueResponseLegacySchema,
|
||||
[QueueName.Library]: QueueResponseLegacySchema,
|
||||
[QueueName.Notification]: QueueResponseLegacySchema,
|
||||
[QueueName.BackupDatabase]: QueueResponseLegacySchema,
|
||||
[QueueName.Ocr]: QueueResponseLegacySchema,
|
||||
[QueueName.Workflow]: QueueResponseLegacySchema,
|
||||
[QueueName.Editor]: QueueResponseLegacySchema,
|
||||
})
|
||||
.meta({ id: 'QueuesResponseLegacyDto' });
|
||||
|
||||
export class QueueResponseLegacyDto extends createZodDto(QueueResponseLegacySchema) {}
|
||||
export class QueuesResponseLegacyDto extends createZodDto(QueuesResponseLegacySchema) {}
|
||||
|
||||
export const mapQueueLegacy = (response: QueueResponseDto): QueueResponseLegacyDto => {
|
||||
return {
|
||||
queueStatus: {
|
||||
isPaused: response.isPaused,
|
||||
isActive: response.statistics.active > 0,
|
||||
},
|
||||
jobCounts: response.statistics,
|
||||
};
|
||||
};
|
||||
|
||||
export const mapQueuesLegacy = (responses: QueueResponseDto[]): QueuesResponseLegacyDto => {
|
||||
const legacy = new QueuesResponseLegacyDto();
|
||||
|
||||
for (const response of responses) {
|
||||
legacy[response.name] = mapQueueLegacy(response);
|
||||
}
|
||||
|
||||
return legacy;
|
||||
};
|
||||
@@ -826,23 +826,6 @@ export enum JobName {
|
||||
|
||||
export const JobNameSchema = z.enum(JobName).describe('Job name').meta({ id: 'JobName' });
|
||||
|
||||
export enum QueueCommand {
|
||||
Start = 'start',
|
||||
/** @deprecated Use `updateQueue` instead */
|
||||
Pause = 'pause',
|
||||
/** @deprecated Use `updateQueue` instead */
|
||||
Resume = 'resume',
|
||||
/** @deprecated Use `emptyQueue` instead */
|
||||
Empty = 'empty',
|
||||
/** @deprecated Use `emptyQueue` instead */
|
||||
ClearFailed = 'clear-failed',
|
||||
}
|
||||
|
||||
export const QueueCommandSchema = z
|
||||
.enum(QueueCommand)
|
||||
.describe('Queue command to execute')
|
||||
.meta({ id: 'QueueCommand' });
|
||||
|
||||
export enum JobStatus {
|
||||
Success = 'success',
|
||||
Failed = 'failed',
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import { BadRequestException } from '@nestjs/common';
|
||||
import { defaults, SystemConfig } from 'src/config';
|
||||
import { ImmichWorker, JobName, QueueCommand, QueueName } from 'src/enum';
|
||||
import { ImmichWorker, JobName, QueueName } from 'src/enum';
|
||||
import { QueueService } from 'src/services/queue.service';
|
||||
import { factory } from 'test/small.factory';
|
||||
import { newTestService, ServiceMocks } from 'test/utils';
|
||||
|
||||
describe(QueueService.name, () => {
|
||||
@@ -50,170 +48,4 @@ describe(QueueService.name, () => {
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getAllJobStatus', () => {
|
||||
it('should get all job statuses', async () => {
|
||||
const stats = factory.queueStatistics({ active: 1 });
|
||||
const expected = { jobCounts: stats, queueStatus: { isActive: true, isPaused: true } };
|
||||
|
||||
mocks.job.getJobCounts.mockResolvedValue(stats);
|
||||
mocks.job.isPaused.mockResolvedValue(true);
|
||||
|
||||
await expect(sut.getAllLegacy(factory.auth())).resolves.toEqual({
|
||||
[QueueName.BackgroundTask]: expected,
|
||||
[QueueName.DuplicateDetection]: expected,
|
||||
[QueueName.SmartSearch]: expected,
|
||||
[QueueName.MetadataExtraction]: expected,
|
||||
[QueueName.Search]: expected,
|
||||
[QueueName.StorageTemplateMigration]: expected,
|
||||
[QueueName.Migration]: expected,
|
||||
[QueueName.ThumbnailGeneration]: expected,
|
||||
[QueueName.VideoConversion]: expected,
|
||||
[QueueName.FaceDetection]: expected,
|
||||
[QueueName.FacialRecognition]: expected,
|
||||
[QueueName.Sidecar]: expected,
|
||||
[QueueName.Library]: expected,
|
||||
[QueueName.Notification]: expected,
|
||||
[QueueName.BackupDatabase]: expected,
|
||||
[QueueName.Ocr]: expected,
|
||||
[QueueName.Workflow]: expected,
|
||||
[QueueName.Editor]: expected,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('handleCommand', () => {
|
||||
it('should handle a pause command', async () => {
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false });
|
||||
|
||||
expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction);
|
||||
});
|
||||
|
||||
it('should handle a resume command', async () => {
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false });
|
||||
|
||||
expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction);
|
||||
});
|
||||
|
||||
it('should handle an empty command', async () => {
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false });
|
||||
|
||||
expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction);
|
||||
});
|
||||
|
||||
it('should not start a job that is already running', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(true);
|
||||
|
||||
await expect(
|
||||
sut.runCommandLegacy(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }),
|
||||
).rejects.toBeInstanceOf(BadRequestException);
|
||||
|
||||
expect(mocks.job.queue).not.toHaveBeenCalled();
|
||||
expect(mocks.job.queueAll).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should handle a start video conversion command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.VideoConversion, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start storage template migration command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration });
|
||||
});
|
||||
|
||||
it('should handle a start smart search command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.SmartSearch, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start metadata extraction command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({
|
||||
name: JobName.AssetExtractMetadataQueueAll,
|
||||
data: { force: false },
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle a start sidecar command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.Sidecar, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start thumbnail generation command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({
|
||||
name: JobName.AssetGenerateThumbnailsQueueAll,
|
||||
data: { force: false },
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle a start face detection command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.FaceDetection, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start facial recognition command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start backup database command', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
|
||||
|
||||
await sut.runCommandLegacy(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false });
|
||||
|
||||
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should throw a bad request when an invalid queue is used', async () => {
|
||||
mocks.job.isActive.mockResolvedValue(false);
|
||||
|
||||
await expect(
|
||||
sut.runCommandLegacy(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }),
|
||||
).rejects.toBeInstanceOf(BadRequestException);
|
||||
|
||||
expect(mocks.job.queue).not.toHaveBeenCalled();
|
||||
expect(mocks.job.queueAll).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,12 +2,6 @@ import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { SystemConfig } from 'src/config';
|
||||
import { OnEvent } from 'src/decorators';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import {
|
||||
mapQueueLegacy,
|
||||
mapQueuesLegacy,
|
||||
QueueResponseLegacyDto,
|
||||
QueuesResponseLegacyDto,
|
||||
} from 'src/dtos/queue-legacy.dto';
|
||||
import {
|
||||
QueueCommandDto,
|
||||
QueueDeleteDto,
|
||||
@@ -23,7 +17,6 @@ import {
|
||||
ImmichWorker,
|
||||
JobName,
|
||||
QueueCleanType,
|
||||
QueueCommand,
|
||||
QueueName,
|
||||
} from 'src/enum';
|
||||
import { ArgOf } from 'src/repositories/event.repository';
|
||||
@@ -99,51 +92,10 @@ export class QueueService extends BaseService {
|
||||
this.services = services;
|
||||
}
|
||||
|
||||
async runCommandLegacy(name: QueueName, dto: QueueCommandDto): Promise<QueueResponseLegacyDto> {
|
||||
this.logger.debug(`Handling command: queue=${name},command=${dto.command},force=${dto.force}`);
|
||||
|
||||
switch (dto.command) {
|
||||
case QueueCommand.Start: {
|
||||
await this.start(name, dto);
|
||||
break;
|
||||
}
|
||||
|
||||
case QueueCommand.Pause: {
|
||||
await this.jobRepository.pause(name);
|
||||
break;
|
||||
}
|
||||
|
||||
case QueueCommand.Resume: {
|
||||
await this.jobRepository.resume(name);
|
||||
break;
|
||||
}
|
||||
|
||||
case QueueCommand.Empty: {
|
||||
await this.jobRepository.empty(name);
|
||||
break;
|
||||
}
|
||||
|
||||
case QueueCommand.ClearFailed: {
|
||||
const failedJobs = await this.jobRepository.clear(name, QueueCleanType.Failed);
|
||||
this.logger.debug(`Cleared failed jobs: ${failedJobs}`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const response = await this.getByName(name);
|
||||
|
||||
return mapQueueLegacy(response);
|
||||
}
|
||||
|
||||
async getAll(_auth: AuthDto): Promise<QueueResponseDto[]> {
|
||||
return Promise.all(Object.values(QueueName).map((name) => this.getByName(name)));
|
||||
}
|
||||
|
||||
async getAllLegacy(auth: AuthDto): Promise<QueuesResponseLegacyDto> {
|
||||
const responses = await this.getAll(auth);
|
||||
return mapQueuesLegacy(responses);
|
||||
}
|
||||
|
||||
get(auth: AuthDto, name: QueueName): Promise<QueueResponseDto> {
|
||||
return this.getByName(name);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { AuthApiKey, AuthSharedLink, AuthUser, Exif, Library, UserAdmin } from 'src/database';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import { QueueStatisticsDto } from 'src/dtos/queue.dto';
|
||||
import { AssetFileType, Permission, UserStatus } from 'src/enum';
|
||||
import { v4, v7 } from 'uuid';
|
||||
|
||||
@@ -100,16 +99,6 @@ const authUserFactory = (authUser: Partial<AuthUser> = {}) => {
|
||||
return { id, isAdmin, name, email, quotaUsageInBytes, quotaSizeInBytes };
|
||||
};
|
||||
|
||||
const queueStatisticsFactory = (dto?: Partial<QueueStatisticsDto>) => ({
|
||||
active: 0,
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
delayed: 0,
|
||||
waiting: 0,
|
||||
paused: 0,
|
||||
...dto,
|
||||
});
|
||||
|
||||
const userAdminFactory = (user: Partial<UserAdmin> = {}) => {
|
||||
const {
|
||||
id = newUuid(),
|
||||
@@ -236,7 +225,6 @@ export const factory = {
|
||||
assetOcr: assetOcrFactory,
|
||||
auth: authFactory,
|
||||
library: libraryFactory,
|
||||
queueStatistics: queueStatisticsFactory,
|
||||
versionHistory: versionHistoryFactory,
|
||||
jobAssets: {
|
||||
sidecarWrite: assetSidecarWriteFactory,
|
||||
|
||||
Reference in New Issue
Block a user