From 60d4def0c8b6b54bf209eb5e922c2d1c51d111b4 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Mon, 4 May 2026 09:34:00 -0400 Subject: [PATCH] transcoding service --- server/src/constants.ts | 2 +- server/src/cores/storage.core.ts | 44 ++ server/src/enum.ts | 3 + server/src/repositories/process.repository.ts | 6 +- server/src/repositories/storage.repository.ts | 12 +- server/src/services/index.ts | 2 + server/src/services/media.service.ts | 28 +- server/src/services/queue.service.spec.ts | 1 + server/src/services/queue.service.ts | 1 + .../src/services/transcoding.service.spec.ts | 207 +++++++++ server/src/services/transcoding.service.ts | 393 ++++++++++++++++++ server/src/types.ts | 30 +- server/src/utils/database.ts | 9 +- server/src/utils/media.ts | 324 +++++++++------ .../repositories/storage.repository.mock.ts | 1 + server/test/utils.ts | 9 +- 16 files changed, 893 insertions(+), 179 deletions(-) create mode 100644 server/src/services/transcoding.service.spec.ts create mode 100644 server/src/services/transcoding.service.ts diff --git a/server/src/constants.ts b/server/src/constants.ts index 54fce33e95..90c06d0073 100644 --- a/server/src/constants.ts +++ b/server/src/constants.ts @@ -222,7 +222,7 @@ export const SUPPORTED_HWA_CODECS: Record { + const [dri, mali] = await Promise.all([this.getDevices(), this.hasMaliOpenCL()]); + return { dri, mali }; + } + private savePath(pathType: PathType, id: string, newPath: string) { switch (pathType) { case AssetPathType.Original: { @@ -330,4 +352,26 @@ export class StorageCore { static getTempPathInDir(dir: string): string { return join(dir, `${randomUUID()}.tmp`); } + + private async getDevices() { + try { + return await this.storageRepository.readdir('/dev/dri'); + } catch { + this.logger.debug('No devices found in /dev/dri.'); + return []; + } + } + + private async hasMaliOpenCL() { + try { + const [maliIcdStat, maliDeviceStat] = await Promise.all([ + this.storageRepository.stat('/etc/OpenCL/vendors/mali.icd'), + this.storageRepository.stat('/dev/mali0'), + ]); + return maliIcdStat.isFile() && maliDeviceStat.isCharacterDevice(); + } catch { + this.logger.debug('OpenCL not available for transcoding, so RKMPP acceleration will use CPU tonemapping'); + return false; + } + } } diff --git a/server/src/enum.ts b/server/src/enum.ts index 28aca072ef..8d0bb9a844 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -814,6 +814,8 @@ export enum JobName { LibrarySyncFiles = 'LibrarySyncFiles', LibraryScanQueueAll = 'LibraryScanQueueAll', + HlsSessionCleanup = 'HlsSessionCleanup', + MemoryCleanup = 'MemoryCleanup', MemoryGenerate = 'MemoryGenerate', @@ -906,6 +908,7 @@ export enum DatabaseLock { MaintenanceOperation = 621, MemoryCreation = 777, VersionCheck = 800, + HlsSessionCleanup = 850, } export enum MaintenanceAction { diff --git a/server/src/repositories/process.repository.ts b/server/src/repositories/process.repository.ts index 9d8cac1f40..928531408f 100644 --- a/server/src/repositories/process.repository.ts +++ b/server/src/repositories/process.repository.ts @@ -1,12 +1,10 @@ import { Injectable } from '@nestjs/common'; -import { ChildProcessWithoutNullStreams, fork, spawn, SpawnOptionsWithoutStdio } from 'node:child_process'; +import { fork, spawn, SpawnOptionsWithoutStdio } from 'node:child_process'; import { Duplex } from 'node:stream'; @Injectable() export class ProcessRepository { - spawn(command: string, args?: readonly string[], options?: SpawnOptionsWithoutStdio): ChildProcessWithoutNullStreams { - return spawn(command, args, options); - } + spawn = spawn; spawnDuplexStream(command: string, args?: readonly string[], options?: SpawnOptionsWithoutStdio): Duplex { let stdinClosed = false; diff --git a/server/src/repositories/storage.repository.ts b/server/src/repositories/storage.repository.ts index c7ba4ab6cc..18331b7d0d 100644 --- a/server/src/repositories/storage.repository.ts +++ b/server/src/repositories/storage.repository.ts @@ -2,7 +2,15 @@ import { Injectable } from '@nestjs/common'; import archiver from 'archiver'; import chokidar, { ChokidarOptions } from 'chokidar'; import { escapePath, glob, globStream } from 'fast-glob'; -import { constants, createReadStream, createWriteStream, existsSync, mkdirSync, ReadOptionsWithBuffer } from 'node:fs'; +import { + constants, + createReadStream, + createWriteStream, + existsSync, + mkdirSync, + ReadOptionsWithBuffer, + watch, +} from 'node:fs'; import fs from 'node:fs/promises'; import path from 'node:path'; import { PassThrough, Readable, Writable } from 'node:stream'; @@ -258,6 +266,8 @@ export class StorageRepository { return () => watcher.close(); } + watchDir = watch; // Native fs.watch without chokidar overhead + private asGlob(pathToCrawl: string): string { const escapedPath = escapePath(pathToCrawl).replaceAll('"', '["]').replaceAll("'", "[']").replaceAll('`', '[`]'); const extensions = `*{${mimeTypes.getSupportedFileExtensions().join(',')}}`; diff --git a/server/src/services/index.ts b/server/src/services/index.ts index 6fa4883c83..4af781d607 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -40,6 +40,7 @@ import { SystemMetadataService } from 'src/services/system-metadata.service'; import { TagService } from 'src/services/tag.service'; import { TelemetryService } from 'src/services/telemetry.service'; import { TimelineService } from 'src/services/timeline.service'; +import { TranscodingService } from 'src/services/transcoding.service'; import { TrashService } from 'src/services/trash.service'; import { UserAdminService } from 'src/services/user-admin.service'; import { UserService } from 'src/services/user.service'; @@ -90,6 +91,7 @@ export const services = [ TagService, TelemetryService, TimelineService, + TranscodingService, TrashService, UserAdminService, UserService, diff --git a/server/src/services/media.service.ts b/server/src/services/media.service.ts index 0b8a1f6702..a73eb3e22e 100644 --- a/server/src/services/media.service.ts +++ b/server/src/services/media.service.ts @@ -13,6 +13,7 @@ import { AudioCodec, Colorspace, ImageFormat, + ImmichWorker, JobName, JobStatus, QueueName, @@ -60,10 +61,9 @@ type ThumbnailAsset = NonNullable= targetSize; } - private async getDevices() { - try { - return await this.storageRepository.readdir('/dev/dri'); - } catch { - this.logger.debug('No devices found in /dev/dri.'); - return []; - } - } - - private async hasMaliOpenCL() { - try { - const [maliIcdStat, maliDeviceStat] = await Promise.all([ - this.storageRepository.stat('/etc/OpenCL/vendors/mali.icd'), - this.storageRepository.stat('/dev/mali0'), - ]); - return maliIcdStat.isFile() && maliDeviceStat.isCharacterDevice(); - } catch { - this.logger.debug('OpenCL not available for transcoding, so RKMPP acceleration will use CPU tonemapping'); - return false; - } - } - private async syncFiles( oldFiles: (AssetFile & { isProgressive: boolean; isTransparent: boolean })[], newFiles: UpsertFileOptions[], diff --git a/server/src/services/queue.service.spec.ts b/server/src/services/queue.service.spec.ts index d4c425e8bd..48c61c0951 100644 --- a/server/src/services/queue.service.spec.ts +++ b/server/src/services/queue.service.spec.ts @@ -41,6 +41,7 @@ describe(QueueService.name, () => { { name: JobName.PersonCleanup }, { name: JobName.MemoryCleanup }, { name: JobName.SessionCleanup }, + { name: JobName.HlsSessionCleanup }, { name: JobName.AuditTableCleanup }, { name: JobName.MemoryGenerate }, { name: JobName.UserSyncUsage }, diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index ba6f4c5f3b..d11c9180b2 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -269,6 +269,7 @@ export class QueueService extends BaseService { { name: JobName.PersonCleanup }, { name: JobName.MemoryCleanup }, { name: JobName.SessionCleanup }, + { name: JobName.HlsSessionCleanup }, { name: JobName.AuditTableCleanup }, ); } diff --git a/server/src/services/transcoding.service.spec.ts b/server/src/services/transcoding.service.spec.ts new file mode 100644 index 0000000000..d374ae0914 --- /dev/null +++ b/server/src/services/transcoding.service.spec.ts @@ -0,0 +1,207 @@ +import { HLS_CLEANUP_INTERVAL_MS, HLS_INACTIVITY_TIMEOUT_MS, HLS_LEASE_DURATION_MS } from 'src/constants'; +import { TranscodingService } from 'src/services/transcoding.service'; +import { VIDEO_STREAM_SESSION_PK_CONSTRAINT } from 'src/utils/database'; +import { eiffelTower } from 'test/fixtures/media.stub'; +import { mockSpawn, newTestService, ServiceMocks } from 'test/utils'; +import { vi } from 'vitest'; + +describe(TranscodingService.name, () => { + let sut: TranscodingService; + let mocks: ServiceMocks; + + const sessionId = 'session-1'; + const assetId = 'asset-1'; + const ownerId = 'user-1'; + + beforeEach(() => { + ({ sut, mocks } = newTestService(TranscodingService)); + mocks.systemMetadata.get.mockResolvedValue({ ffmpeg: { realtime: { enabled: true } } }); + mocks.videoStream.getForTranscoding.mockResolvedValue(eiffelTower); + }); + + describe('onSessionRequest', () => { + it('creates the session row and emits HlsSessionResult on success', async () => { + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + + expect(mocks.videoStream.createSession).toHaveBeenCalledWith({ + id: sessionId, + assetId, + expiresAt: expect.any(Date), + }); + expect(mocks.websocket.serverSend).toHaveBeenCalledWith('HlsSessionResult', { sessionId }); + }); + + it('treats a primary-key conflict as a no-op for replay tolerance', async () => { + mocks.videoStream.createSession.mockRejectedValue({ constraint_name: VIDEO_STREAM_SESSION_PK_CONSTRAINT }); + + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + + expect(mocks.websocket.serverSend).not.toHaveBeenCalled(); + }); + + it('emits HlsSessionResult with an error on other DB failures', async () => { + mocks.videoStream.createSession.mockRejectedValue(new Error('database is down')); + + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + + expect(mocks.websocket.serverSend).toHaveBeenCalledWith('HlsSessionResult', { + sessionId, + error: 'Failed to create HLS session', + }); + }); + }); + + describe('onSessionEnd', () => { + it('removes the session, kills the transcode, and deletes the dir + DB row', async () => { + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + const process = mockSpawn(0, '', ''); + mocks.process.spawn.mockReturnValue(process); + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 0 }); + + await sut.onSessionEnd({ sessionId }); + + expect(process.kill).toHaveBeenCalled(); + expect(mocks.storage.unlinkDir).toHaveBeenCalled(); + expect(mocks.videoStream.deleteSession).toHaveBeenCalledWith(sessionId); + }); + + it('is a no-op when the session is unknown', async () => { + await sut.onSessionEnd({ sessionId: 'never-created' }); + + expect(mocks.videoStream.deleteSession).not.toHaveBeenCalled(); + expect(mocks.storage.unlinkDir).not.toHaveBeenCalled(); + }); + }); + + describe('onHeartbeat', () => { + it('extends the DB lease when remaining time falls below half', async () => { + vi.useFakeTimers(); + try { + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + vi.setSystemTime(Date.now() + HLS_LEASE_DURATION_MS / 2 + 1); + + await sut.onHeartbeat({ sessionId }); + + expect(mocks.videoStream.extendSession).toHaveBeenCalledWith(sessionId, expect.any(Date)); + } finally { + vi.useRealTimers(); + } + }); + + it('does not extend the lease while it is still fresh', async () => { + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + + await sut.onHeartbeat({ sessionId }); + + expect(mocks.videoStream.extendSession).not.toHaveBeenCalled(); + }); + + it('is a no-op when the session is unknown', async () => { + await sut.onHeartbeat({ sessionId: 'never-created' }); + + expect(mocks.videoStream.extendSession).not.toHaveBeenCalled(); + }); + }); + + describe('onSegmentRequest', () => { + beforeEach(async () => { + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + mocks.websocket.serverSend.mockClear(); + }); + + it('spawns FFmpeg on the first request', async () => { + mocks.process.spawn.mockReturnValue(mockSpawn(0, '', '')); + + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 0 }); + + expect(mocks.process.spawn).toHaveBeenCalledTimes(1); + expect(mocks.process.spawn).toHaveBeenCalledWith('ffmpeg', expect.any(Array), expect.any(Object)); + }); + + it('kills and respawns when the variant changes', async () => { + const first = mockSpawn(0, '', ''); + const second = mockSpawn(0, '', ''); + mocks.process.spawn.mockReturnValueOnce(first).mockReturnValueOnce(second); + + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 0 }); + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 1, segmentIndex: 0 }); + + expect(first.kill).toHaveBeenCalled(); + expect(mocks.process.spawn).toHaveBeenCalledTimes(2); + }); + + it('kills and respawns when seeking before the start segment', async () => { + const first = mockSpawn(0, '', ''); + const second = mockSpawn(0, '', ''); + mocks.process.spawn.mockReturnValueOnce(first).mockReturnValueOnce(second); + + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 5 }); + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 2 }); + + expect(first.kill).toHaveBeenCalled(); + expect(mocks.process.spawn).toHaveBeenCalledTimes(2); + }); + + it('kills and respawns when the requested segment is too far ahead', async () => { + const first = mockSpawn(0, '', ''); + const second = mockSpawn(0, '', ''); + mocks.process.spawn.mockReturnValueOnce(first).mockReturnValueOnce(second); + + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 0 }); + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 5 }); + + expect(first.kill).toHaveBeenCalled(); + expect(mocks.process.spawn).toHaveBeenCalledTimes(2); + }); + + it('does not spawn when the session is unknown', async () => { + await sut.onSegmentRequest({ sessionId: 'never-created', assetId, variantIndex: 0, segmentIndex: 0 }); + + expect(mocks.process.spawn).not.toHaveBeenCalled(); + }); + }); + + describe('inactivity sweeper', () => { + it('reaps a session whose last activity exceeds the inactivity timeout', async () => { + vi.useFakeTimers(); + try { + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + mocks.websocket.serverSend.mockClear(); + await vi.advanceTimersByTimeAsync(HLS_INACTIVITY_TIMEOUT_MS + HLS_CLEANUP_INTERVAL_MS); + + expect(mocks.websocket.serverSend).toHaveBeenCalledWith('HlsSessionEnd', { sessionId }); + expect(mocks.videoStream.deleteSession).toHaveBeenCalledWith(sessionId); + } finally { + vi.useRealTimers(); + } + }); + }); + + describe('onShutdown', () => { + it('ends every active session', async () => { + await sut.onSessionRequest({ sessionId: 'session-a', assetId, ownerId }); + await sut.onSessionRequest({ sessionId: 'session-b', assetId, ownerId }); + + await sut.onShutdown(); + + expect(mocks.videoStream.deleteSession).toHaveBeenCalledWith('session-a'); + expect(mocks.videoStream.deleteSession).toHaveBeenCalledWith('session-b'); + }); + }); + + describe('onHlsSessionCleanup', () => { + it('reaps DB-expired sessions under a database lock', async () => { + mocks.database.withLock.mockImplementation(async (_, fn) => fn()); + mocks.videoStream.getExpiredSessions.mockResolvedValue([ + { id: 'expired-1', ownerId: 'user-a' }, + { id: 'expired-2', ownerId: 'user-b' }, + ]); + + await sut.onHlsSessionCleanup(); + + expect(mocks.videoStream.deleteSession).toHaveBeenCalledWith('expired-1'); + expect(mocks.videoStream.deleteSession).toHaveBeenCalledWith('expired-2'); + expect(mocks.storage.unlinkDir).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/server/src/services/transcoding.service.ts b/server/src/services/transcoding.service.ts new file mode 100644 index 0000000000..5e3a625289 --- /dev/null +++ b/server/src/services/transcoding.service.ts @@ -0,0 +1,393 @@ +import { Injectable } from '@nestjs/common'; +import { ChildProcess } from 'node:child_process'; +import { join } from 'node:path'; +import { + HLS_BACKPRESSURE_PAUSE_SEGMENTS, + HLS_BACKPRESSURE_RESUME_SEGMENTS, + HLS_CLEANUP_INTERVAL_MS, + HLS_INACTIVITY_TIMEOUT_MS, + HLS_LEASE_DURATION_MS, + HLS_SEGMENT_DURATION, + HLS_VARIANTS, + SEGMENT_FILENAME_REGEX, +} from 'src/constants'; +import { StorageCore } from 'src/cores/storage.core'; +import { OnEvent, OnJob } from 'src/decorators'; +import { DatabaseLock, ImmichWorker, JobName, QueueName, TranscodeTarget } from 'src/enum'; +import { ArgOf } from 'src/repositories/event.repository'; +import { BaseService } from 'src/services/base.service'; +import { VideoInterfaces } from 'src/types'; +import { isVideoStreamSessionPkConstraint } from 'src/utils/database'; +import { BaseConfig } from 'src/utils/media'; + +type Session = { + assetId: string; + expiresAt: Date; + id: string; + lastActivityTime: Date; + lastClientRequestedSegment: number | null; + lastCompletedSegment: number | null; + ownerId: string; + paused: boolean; + process: ChildProcess | null; + startSegment: number | null; + variantIndex: number | null; +}; + +@Injectable() +export class TranscodingService extends BaseService { + private sessions = new Map(); + private videoInterfaces: VideoInterfaces = { dri: [], mali: false }; + private cleanupInterval: NodeJS.Timeout | null = null; + + @OnEvent({ name: 'AppBootstrap', workers: [ImmichWorker.Microservices] }) + async onBootstrap() { + const [videoInterfaces] = await Promise.all([this.storageCore.getVideoInterfaces(), this.removeExpiredSessions()]); + this.videoInterfaces = videoInterfaces; + } + + @OnEvent({ name: 'AppShutdown', workers: [ImmichWorker.Microservices] }) + onShutdown() { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + return Promise.all([...this.sessions.values()].map(({ id }) => this.onSessionEnd({ sessionId: id }))); + } + + @OnJob({ name: JobName.HlsSessionCleanup, queue: QueueName.BackgroundTask }) + onHlsSessionCleanup() { + return this.removeExpiredSessions(); + } + + @OnEvent({ name: 'HlsSessionRequest', server: true, workers: [ImmichWorker.Microservices] }) + async onSessionRequest({ assetId, sessionId, ownerId }: ArgOf<'HlsSessionRequest'>) { + try { + const expiresAt = new Date(Date.now() + HLS_LEASE_DURATION_MS); + await this.videoStreamRepository.createSession({ id: sessionId, assetId, expiresAt }); + this.sessions.set(sessionId, { + assetId, + expiresAt, + id: sessionId, + lastActivityTime: new Date(), + lastClientRequestedSegment: null, + lastCompletedSegment: null, + ownerId, + paused: false, + process: null, + startSegment: null, + variantIndex: null, + }); + this.cleanupInterval ??= setInterval(() => void this.removeInactiveSessions(), HLS_CLEANUP_INTERVAL_MS); + this.websocketRepository.serverSend('HlsSessionResult', { sessionId }); + } catch (error) { + if (!isVideoStreamSessionPkConstraint(error)) { + this.logger.error(`Failed to create HLS session ${sessionId}: ${error}`); + this.websocketRepository.serverSend('HlsSessionResult', { sessionId, error: 'Failed to create HLS session' }); + } + } + } + + @OnEvent({ name: 'HlsSessionEnd', server: true, workers: [ImmichWorker.Microservices] }) + async onSessionEnd({ sessionId }: ArgOf<'HlsSessionEnd'>) { + const session = this.sessions.get(sessionId); + if (!session) { + return; + } + this.sessions.delete(sessionId); + if (this.cleanupInterval && this.sessions.size === 0) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + this.stopTranscode(session); + await this.removeSessionDir(session); + await this.videoStreamRepository.deleteSession(sessionId); + } + + @OnEvent({ name: 'HlsHeartbeat', server: true, workers: [ImmichWorker.Microservices] }) + async onHeartbeat({ sessionId, segmentIndex }: ArgOf<'HlsHeartbeat'>) { + const session = this.sessions.get(sessionId); + if (!session) { + return; + } + + session.lastActivityTime = new Date(); + + if (segmentIndex !== undefined) { + session.lastClientRequestedSegment = segmentIndex; + this.applyBackpressure(session); + } + + const remaining = session.expiresAt.getTime() - Date.now(); + if (remaining < HLS_LEASE_DURATION_MS / 2) { + session.expiresAt = new Date(Date.now() + HLS_LEASE_DURATION_MS); + await this.videoStreamRepository.extendSession(sessionId, session.expiresAt); + } + } + + @OnEvent({ name: 'HlsSegmentRequest', server: true, workers: [ImmichWorker.Microservices] }) + async onSegmentRequest({ sessionId, variantIndex, segmentIndex }: ArgOf<'HlsSegmentRequest'>) { + const session = this.sessions.get(sessionId); + if (!session) { + return; + } + + session.variantIndex ??= variantIndex; + session.startSegment ??= segmentIndex; + const curSegment = session.lastCompletedSegment === null ? session.startSegment : session.lastCompletedSegment + 1; + const needsRestart = + session.variantIndex !== variantIndex || segmentIndex < session.startSegment || segmentIndex > curSegment + 1; + if (needsRestart) { + this.stopTranscode(session); + session.variantIndex = variantIndex; + session.startSegment = segmentIndex; + } else if (session.process) { + this.resumeTranscode(session); + return; + } + + const process = await this.startTranscode(session, variantIndex, segmentIndex); + if (process) { + session.process = process; + } + } + + private applyBackpressure(session: Session) { + if (session.lastCompletedSegment === null || session.lastClientRequestedSegment === null) { + return; + } + const lead = session.lastCompletedSegment - session.lastClientRequestedSegment; + if (!session.paused && lead > HLS_BACKPRESSURE_PAUSE_SEGMENTS) { + this.pauseTranscode(session); + } else if (session.paused && lead < HLS_BACKPRESSURE_RESUME_SEGMENTS) { + this.resumeTranscode(session); + } + } + + private async startTranscode(session: Session, variantIndex: number, startSegment: number) { + const t0 = performance.now(); + const { ffmpeg } = await this.getConfig({ withCache: true }); + const t1 = performance.now(); + + const asset = await this.videoStreamRepository.getForTranscoding(session.assetId); + const t2 = performance.now(); + if (!asset) { + this.logger.error(`Asset ${session.assetId} not found for HLS transcoding`); + return; + } + + if (session.variantIndex !== variantIndex || session.startSegment !== startSegment) { + return; + } + + const variant = HLS_VARIANTS[variantIndex]; + if (!variant) { + this.logger.error(`Variant ${variantIndex} out of range for asset ${session.assetId}`); + await this.failSession(session, `Invalid variant index ${variantIndex}`); + return; + } + + const variantDir = StorageCore.getHlsVariantFolder({ + ownerId: session.ownerId, + sessionId: session.id, + variantIndex, + }); + this.storageRepository.mkdirSync(variantDir); + + // Encoder runs at fps = packetCount × timeBase / totalDuration with + // gop = ceil(SEGMENT_DURATION × fps). To start segment K's content at + // exactly cfr slot K × gop, seek to the midpoint between slots K×gop−1 and + // K×gop. accurate_seek's "discard < target" then keeps the source frame + // that quantizes to slot K×gop and discards the one quantizing to K×gop−1. + const fps = (asset.packets.packetCount * asset.videoStream.timeBase) / asset.packets.totalDuration; + const gop = Math.ceil(HLS_SEGMENT_DURATION * fps); + const seekSeconds = startSegment > 0 ? (startSegment * gop - 0.5) / fps : 0; + + let config; + try { + config = BaseConfig.create( + { + ...ffmpeg, + targetVideoCodec: variant.codec, + targetResolution: String(variant.resolution), + maxBitrate: `${Math.round(variant.bitrate / 1000)}k`, + gopSize: gop, + }, + this.videoInterfaces, + { strictGop: true, lowLatency: true }, + ); + } catch (error: any) { + this.logger.error( + `Failed to create transcode config for variant ${variantIndex} asset ${session.assetId}: ${error?.message ?? error}`, + ); + await this.failSession(session, `Failed to start transcode: ${error?.message ?? 'unknown error'}`); + return; + } + const t3 = performance.now(); + const args = config.getHlsCommand( + { + initFilename: 'init.mp4', + inputPath: asset.originalPath, + packetCount: asset.packets.packetCount, + playlistFilename: join(variantDir, 'playlist.m3u8'), + seekSeconds, + segmentDuration: HLS_SEGMENT_DURATION, + segmentFilename: join(variantDir, 'seg_%d.m4s'), + startSegment, + target: TranscodeTarget.All, + timeBase: asset.videoStream.timeBase, + totalDuration: asset.packets.totalDuration, + }, + asset.videoStream, + asset.audioStream ?? undefined, + ); + const t4 = performance.now(); + this.logger.log( + `Starting HLS transcode for asset ${session.assetId} variant ${variantIndex} with command: ffmpeg ${args.join(' ')}`, + ); + const process = this.processRepository.spawn('ffmpeg', args, { stdio: ['ignore', 'ignore', 'pipe'] }); + const t5 = performance.now(); + this.attachProcessHandlers(process, session, variantIndex, t5); + this.logger.log( + `[TIMING] startTranscode session=${session.id} variant=${variantIndex} startSegment=${startSegment} ` + + `getConfig=${(t1 - t0).toFixed(1)}ms getAsset=${(t2 - t1).toFixed(1)}ms ` + + `config=${(t3 - t2).toFixed(1)}ms args=${(t4 - t3).toFixed(1)}ms ` + + `spawn=${(t5 - t4).toFixed(1)}ms total=${(t5 - t0).toFixed(1)}ms`, + ); + return process; + } + + private failSession(session: Session, error: string) { + this.websocketRepository.serverSend('HlsSessionResult', { sessionId: session.id, error }); + return this.onSessionEnd({ sessionId: session.id }); + } + + private attachProcessHandlers(process: ChildProcess, session: Session, variantIndex: number, spawnTime: number) { + let stderr = ''; + let firstSegmentLogged = false; + const variantDir = StorageCore.getHlsVariantFolder({ + ownerId: session.ownerId, + sessionId: session.id, + variantIndex, + }); + + // hlsenc writes each segment as `seg_K.m4s.tmp` then renames to + // `seg_K.m4s`. The rename event fires the moment the renamed file is + // observable — the only signal we need to tell the API worker the + // segment is ready to serve. + const watcher = this.storageRepository.watchDir(variantDir, (eventType, filename) => { + if (eventType !== 'rename' || !filename || session.process !== process) { + return; + } + const match = SEGMENT_FILENAME_REGEX.exec(filename); + if (!match) { + return; + } + const segmentIndex = Number.parseInt(match[1]); + if (!firstSegmentLogged) { + firstSegmentLogged = true; + this.logger.log( + `[TIMING] firstSegmentVisible session=${session.id} variant=${variantIndex} ` + + `segmentIndex=${segmentIndex} sinceSpawn=${(performance.now() - spawnTime).toFixed(1)}ms`, + ); + } + session.lastCompletedSegment = segmentIndex; + this.websocketRepository.serverSend('HlsSegmentResult', { + sessionId: session.id, + variantIndex, + segmentIndex, + }); + }); + watcher.on('error', (error) => { + this.logger.error(`watcher error for ${variantDir}: ${error}`); + }); + + process.stderr!.on('data', (chunk: Buffer) => { + if (session.process !== process) { + return; + } + stderr += chunk.toString(); + }); + + process.on('exit', (code) => { + watcher.close(); + if (session.process !== process || session.variantIndex !== variantIndex) { + return; + } + session.paused = false; + session.process = null; + if (code) { + this.logger.error( + `FFmpeg exited with code ${code} for variant ${variantIndex} asset ${session.assetId}\n${stderr}`, + ); + void this.failSession(session, `Transcoding process exited unexpectedly with code ${code}`).catch((error) => + this.logger.error(`Failed to end session ${session.id} after ffmpeg exit: ${error}`), + ); + } + }); + } + + private stopTranscode(session: Session) { + if (!session.process) { + return; + } + this.resumeTranscode(session); + session.process.kill(); + session.process = null; + } + + private pauseTranscode(session: Session) { + if (session.paused || !session.process) { + return; + } + session.process.kill('SIGSTOP'); + session.paused = true; + } + + private resumeTranscode(session: Session) { + if (!session.paused || !session.process) { + return; + } + session.process.kill('SIGCONT'); + session.paused = false; + } + + private async removeSessionDir(session: { ownerId: string; id: string }) { + const dir = StorageCore.getHlsSessionFolder({ ownerId: session.ownerId, sessionId: session.id }); + try { + await this.storageRepository.unlinkDir(dir, { recursive: true, force: true }); + } catch (error) { + if ((error as NodeJS.ErrnoException)?.code !== 'ENOENT') { + throw error; + } + this.logger.warn(`Session dir ${dir} does not exist.`); + } + } + + private removeInactiveSessions() { + const cutoff = Date.now() - HLS_INACTIVITY_TIMEOUT_MS; + const inactiveSessions = [...this.sessions.values()].filter((s) => s.lastActivityTime.getTime() < cutoff); + return Promise.all( + inactiveSessions.map(async (session) => { + try { + this.websocketRepository.serverSend('HlsSessionEnd', { sessionId: session.id }); + await this.onSessionEnd({ sessionId: session.id }); + } catch (error) { + this.logger.error(`Failed to sweep inactive HLS session ${session.id}: ${error}`); + } + }), + ); + } + + private removeExpiredSessions() { + return this.databaseRepository.withLock(DatabaseLock.HlsSessionCleanup, async () => { + const expiredSessions = await this.videoStreamRepository.getExpiredSessions(); + await Promise.all( + expiredSessions.map(async (session) => { + await this.removeSessionDir(session); + await this.videoStreamRepository.deleteSession(session.id); + }), + ); + }); + } +} diff --git a/server/src/types.ts b/server/src/types.ts index aa6bb820cc..06f9c97989 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -29,7 +29,6 @@ import { SystemMetadataKey, TranscodeTarget, UserMetadataKey, - VideoCodec, } from 'src/enum'; export type DeepPartial = @@ -160,6 +159,25 @@ export interface TranscodeCommand { }; } +export interface VideoTuning { + strictGop: boolean; + lowLatency: boolean; +} + +export interface HlsCommandOptions { + initFilename: string; + inputPath: string; + packetCount: number; + playlistFilename: string; + seekSeconds?: number; + segmentDuration: number; + segmentFilename: string; + startSegment: number; + target: TranscodeTarget; + timeBase: number; + totalDuration: number; +} + export interface BitrateDistribution { max: number; target: number; @@ -175,14 +193,11 @@ export interface ImageBuffer { export interface VideoCodecSWConfig { getCommand( target: TranscodeTarget, - videoStream: VideoStreamInfo, - audioStream?: AudioStreamInfo, + video: VideoStreamInfo, + audio?: AudioStreamInfo, format?: VideoFormat, ): TranscodeCommand; -} - -export interface VideoCodecHWConfig extends VideoCodecSWConfig { - getSupportedCodecs(): Array; + getHlsCommand(options: HlsCommandOptions, video: VideoStreamInfo, audio?: AudioStreamInfo): string[]; } export interface ProbeOptions { @@ -380,6 +395,7 @@ export type JobItem = // Cleanup | { name: JobName.SessionCleanup; data?: IBaseJob } + | { name: JobName.HlsSessionCleanup; data?: IBaseJob } // Tags | { name: JobName.TagCleanup; data?: IBaseJob } diff --git a/server/src/utils/database.ts b/server/src/utils/database.ts index bc530f2b03..4b2c27e477 100644 --- a/server/src/utils/database.ts +++ b/server/src/utils/database.ts @@ -71,10 +71,13 @@ export const removeUndefinedKeys = (update: T, template: unkno }; export const ASSET_CHECKSUM_CONSTRAINT = 'UQ_assets_owner_checksum'; +export const VIDEO_STREAM_SESSION_PK_CONSTRAINT = 'video_stream_session_pkey'; -export const isAssetChecksumConstraint = (error: unknown) => { - return (error as PostgresError)?.constraint_name === 'UQ_assets_owner_checksum'; -}; +export const isAssetChecksumConstraint = (error: unknown) => + (error as PostgresError)?.constraint_name === ASSET_CHECKSUM_CONSTRAINT; + +export const isVideoStreamSessionPkConstraint = (error: unknown) => + (error as PostgresError)?.constraint_name === VIDEO_STREAM_SESSION_PK_CONSTRAINT; export function withDefaultVisibility(qb: SelectQueryBuilder) { return qb.where('asset.visibility', 'in', [sql.lit(AssetVisibility.Archive), sql.lit(AssetVisibility.Timeline)]); diff --git a/server/src/utils/media.ts b/server/src/utils/media.ts index 49e11edab7..5ebae2e2e3 100644 --- a/server/src/utils/media.ts +++ b/server/src/utils/media.ts @@ -1,4 +1,4 @@ -import { AUDIO_ENCODER } from 'src/constants'; +import { AUDIO_ENCODER, SUPPORTED_HWA_CODECS } from 'src/constants'; import { SystemConfigFFmpegDto } from 'src/dtos/system-config.dto'; import { ColorMatrix, @@ -13,38 +13,56 @@ import { import { AudioStreamInfo, BitrateDistribution, + HlsCommandOptions, TranscodeCommand, - VideoCodecHWConfig, VideoCodecSWConfig, VideoFormat, VideoInterfaces, VideoStreamInfo, + VideoTuning, } from 'src/types'; +export const isVideoRotated = (videoStream: VideoStreamInfo): boolean => Math.abs(videoStream.rotation) === 90; + +export const isVideoVertical = (videoStream: VideoStreamInfo): boolean => + videoStream.height > videoStream.width || isVideoRotated(videoStream); + +export const getOutputSize = (videoStream: VideoStreamInfo, targetRes: number) => { + const factor = Math.max(videoStream.height, videoStream.width) / Math.min(videoStream.height, videoStream.width); + let larger = Math.round(targetRes * factor); + if (larger % 2 !== 0) { + larger -= 1; + } + return isVideoVertical(videoStream) ? { width: targetRes, height: larger } : { width: larger, height: targetRes }; +}; + export class BaseConfig implements VideoCodecSWConfig { readonly presets = ['veryslow', 'slower', 'slow', 'medium', 'fast', 'faster', 'veryfast', 'superfast', 'ultrafast']; - protected constructor(protected config: SystemConfigFFmpegDto) {} + protected constructor( + protected config: SystemConfigFFmpegDto, + protected tune: VideoTuning = { strictGop: false, lowLatency: false }, + ) {} - static create(config: SystemConfigFFmpegDto, interfaces: VideoInterfaces): VideoCodecSWConfig { + static create(config: SystemConfigFFmpegDto, interfaces: VideoInterfaces, tune?: VideoTuning) { if (config.accel === TranscodeHardwareAcceleration.Disabled) { - return this.getSWCodecConfig(config); + return this.getSWCodecConfig(config, tune); } - return this.getHWCodecConfig(config, interfaces); + return this.getHWCodecConfig(config, interfaces, tune); } - private static getSWCodecConfig(config: SystemConfigFFmpegDto) { + private static getSWCodecConfig(config: SystemConfigFFmpegDto, tune?: VideoTuning): VideoCodecSWConfig { switch (config.targetVideoCodec) { case VideoCodec.H264: { - return new H264Config(config); + return new H264Config(config, tune); } case VideoCodec.Hevc: { - return new HEVCConfig(config); + return new HEVCConfig(config, tune); } case VideoCodec.Vp9: { - return new VP9Config(config); + return new VP9Config(config, tune); } case VideoCodec.Av1: { - return new AV1Config(config); + return new AV1Config(config, tune); } default: { throw new Error(`Codec '${config.targetVideoCodec}' is unsupported`); @@ -52,72 +70,126 @@ export class BaseConfig implements VideoCodecSWConfig { } } - private static getHWCodecConfig(config: SystemConfigFFmpegDto, interfaces: VideoInterfaces) { - let handler: VideoCodecHWConfig; + private static getHWCodecConfig(config: SystemConfigFFmpegDto, interfaces: VideoInterfaces, tune?: VideoTuning) { + if (!SUPPORTED_HWA_CODECS[config.accel].includes(config.targetVideoCodec)) { + throw new Error( + `${config.accel.toUpperCase()} acceleration does not support codec '${config.targetVideoCodec.toUpperCase()}'. Supported codecs: ${SUPPORTED_HWA_CODECS[config.accel]}`, + ); + } + + let handler: VideoCodecSWConfig; switch (config.accel) { case TranscodeHardwareAcceleration.Nvenc: { handler = config.accelDecode - ? new NvencHwDecodeConfig(config, interfaces) - : new NvencSwDecodeConfig(config, interfaces); + ? new NvencHwDecodeConfig(config, interfaces, tune) + : new NvencSwDecodeConfig(config, interfaces, tune); break; } case TranscodeHardwareAcceleration.Qsv: { handler = config.accelDecode - ? new QsvHwDecodeConfig(config, interfaces) - : new QsvSwDecodeConfig(config, interfaces); + ? new QsvHwDecodeConfig(config, interfaces, tune) + : new QsvSwDecodeConfig(config, interfaces, tune); break; } case TranscodeHardwareAcceleration.Vaapi: { handler = config.accelDecode - ? new VaapiHwDecodeConfig(config, interfaces) - : new VaapiSwDecodeConfig(config, interfaces); + ? new VaapiHwDecodeConfig(config, interfaces, tune) + : new VaapiSwDecodeConfig(config, interfaces, tune); break; } case TranscodeHardwareAcceleration.Rkmpp: { handler = config.accelDecode - ? new RkmppHwDecodeConfig(config, interfaces) - : new RkmppSwDecodeConfig(config, interfaces); + ? new RkmppHwDecodeConfig(config, interfaces, tune) + : new RkmppSwDecodeConfig(config, interfaces, tune); break; } default: { throw new Error(`${config.accel.toUpperCase()} acceleration is unsupported`); } } - if (!handler.getSupportedCodecs().includes(config.targetVideoCodec)) { - throw new Error( - `${config.accel.toUpperCase()} acceleration does not support codec '${config.targetVideoCodec.toUpperCase()}'. Supported codecs: ${handler.getSupportedCodecs()}`, - ); - } return handler; } - getCommand( - target: TranscodeTarget, - videoStream: VideoStreamInfo, - audioStream?: AudioStreamInfo, - format?: VideoFormat, - ) { + getCommand(target: TranscodeTarget, video: VideoStreamInfo, audio?: AudioStreamInfo, format?: VideoFormat) { const options = { - inputOptions: this.getBaseInputOptions(videoStream, format), - outputOptions: [...this.getBaseOutputOptions(target, videoStream, audioStream), '-v', 'verbose'], + inputOptions: this.getBaseInputOptions(video, format), + outputOptions: [ + ...this.getBaseOutputOptions(target, video, audio), + ...this.getPresetOptions(), + ...this.getBitrateOptions(), + ...this.getEncoderOptions(), + '-movflags', + 'faststart', + '-fps_mode', + 'passthrough', + '-v', + 'verbose', + ], twoPass: this.eligibleForTwoPass(), - progress: { frameCount: videoStream.frameCount, percentInterval: 5 }, + progress: { frameCount: video.frameCount, percentInterval: 5 }, } as TranscodeCommand; if ([TranscodeTarget.All, TranscodeTarget.Video].includes(target)) { - const filters = this.getFilterOptions(videoStream); + const filters = this.getFilterOptions(video); if (filters.length > 0) { options.outputOptions.push('-vf', filters.join(',')); } } - options.outputOptions.push( + return options; + } + + getHlsCommand(options: HlsCommandOptions, video: VideoStreamInfo, audio?: AudioStreamInfo) { + const args: string[] = this.getBaseInputOptions(video); + if (options.seekSeconds) { + args.push('-ss', String(options.seekSeconds)); + } + args.push( + '-nostdin', + '-nostats', + '-i', + options.inputPath, + ...this.getBaseOutputOptions(options.target, video, audio), ...this.getPresetOptions(), - ...this.getOutputThreadOptions(), ...this.getBitrateOptions(), + ...this.getEncoderOptions(), + // -start_at_zero only applies meaningfully when seekSeconds is 0. + // Combined with -ss N -copyts, it would shift the first frame to t=0 + // and break tfdt alignment with the playlist. + ...(options.seekSeconds ? [] : ['-start_at_zero']), + '-copyts', + '-r', + `${options.packetCount * options.timeBase}/${options.totalDuration}`, + '-avoid_negative_ts', + 'disabled', + '-f', + 'hls', + '-hls_time', + String(options.segmentDuration), + '-hls_list_size', + '0', + '-hls_segment_type', + 'fmp4', + '-hls_fmp4_init_filename', + options.initFilename, + '-hls_segment_options', + 'movflags=+frag_discont', + '-hls_flags', + 'temp_file', + '-hls_segment_filename', + options.segmentFilename, + '-start_number', + String(options.startSegment), ); - return options; + if ([TranscodeTarget.All, TranscodeTarget.Video].includes(options.target)) { + const filters = this.getFilterOptions(video); + if (filters.length > 0) { + args.push('-vf', filters.join(',')); + } + } + args.push(options.playlistFilename); + return args; } // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -129,23 +201,7 @@ export class BaseConfig implements VideoCodecSWConfig { const videoCodec = [TranscodeTarget.All, TranscodeTarget.Video].includes(target) ? this.getVideoCodec() : 'copy'; const audioCodec = [TranscodeTarget.All, TranscodeTarget.Audio].includes(target) ? this.getAudioEncoder() : 'copy'; - const options = [ - '-c:v', - videoCodec, - '-c:a', - audioCodec, - // Makes a second pass moving the moov atom to the - // beginning of the file for improved playback speed. - '-movflags', - 'faststart', - '-fps_mode', - 'passthrough', - '-map', - `0:${videoStream.index}`, - '-map_metadata', - '-1', - ]; - + const options = ['-c:v', videoCodec, '-c:a', audioCodec, '-map', `0:${videoStream.index}`, '-map_metadata', '-1']; if (audioStream) { options.push('-map', `0:${audioStream.index}`); } @@ -157,18 +213,22 @@ export class BaseConfig implements VideoCodecSWConfig { } if (this.getGopSize() > 0) { options.push('-g', `${this.getGopSize()}`); + if (this.tune.strictGop) { + options.push('-keyint_min', `${this.getGopSize()}`); + } } - - if ( - this.config.targetVideoCodec === VideoCodec.Hevc && - (videoCodec !== 'copy' || videoStream.codecName === 'hevc') - ) { + const isHvc = (videoCodec === 'copy' ? videoStream.codecName : videoCodec) === VideoCodec.Hevc; + if (isHvc) { options.push('-tag:v', 'hvc1'); } return options; } + getEncoderOptions(): string[] { + return []; + } + getFilterOptions(videoStream: VideoStreamInfo) { const options = []; if (this.shouldScale(videoStream)) { @@ -272,25 +332,7 @@ export class BaseConfig implements VideoCodecSWConfig { getScaling(videoStream: VideoStreamInfo, mult = 2) { const targetResolution = this.getTargetResolution(videoStream); - return this.isVideoVertical(videoStream) ? `${targetResolution}:-${mult}` : `-${mult}:${targetResolution}`; - } - - getSize(videoStream: VideoStreamInfo) { - const smaller = this.getTargetResolution(videoStream); - const factor = Math.max(videoStream.height, videoStream.width) / Math.min(videoStream.height, videoStream.width); - let larger = Math.round(smaller * factor); - if (larger % 2 !== 0) { - larger -= 1; - } - return this.isVideoVertical(videoStream) ? { width: smaller, height: larger } : { width: larger, height: smaller }; - } - - isVideoRotated(videoStream: VideoStreamInfo) { - return Math.abs(videoStream.rotation) === 90; - } - - isVideoVertical(videoStream: VideoStreamInfo) { - return videoStream.height > videoStream.width || this.isVideoRotated(videoStream); + return isVideoVertical(videoStream) ? `${targetResolution}:-${mult}` : `-${mult}:${targetResolution}`; } isBitrateConstrained() { @@ -353,23 +395,18 @@ export class BaseConfig implements VideoCodecSWConfig { } } -export class BaseHWConfig extends BaseConfig implements VideoCodecHWConfig { +export class BaseHWConfig extends BaseConfig { protected device: string; - protected interfaces: VideoInterfaces; constructor( protected config: SystemConfigFFmpegDto, - interfaces: VideoInterfaces, + protected interfaces: VideoInterfaces, + tune?: VideoTuning, ) { - super(config); - this.interfaces = interfaces; + super(config, tune); this.device = this.getDevice(interfaces); } - getSupportedCodecs() { - return [VideoCodec.H264, VideoCodec.Hevc]; - } - validateDevices(devices: string[]) { if (devices.length === 0) { throw new Error('No /dev/dri devices found. If using Docker, make sure at least one /dev/dri device is mounted'); @@ -474,24 +511,32 @@ export class ThumbnailConfig extends BaseConfig { } export class H264Config extends BaseConfig { - getOutputThreadOptions() { - const options = super.getOutputThreadOptions(); - if (this.config.threads === 1) { - options.push('-x264-params', 'frame-threads=1:pools=none'); + getEncoderOptions(): string[] { + const out = this.getOutputThreadOptions(); + if (this.tune.strictGop) { + out.push('-sc_threshold:v', '0'); } - - return options; + if (this.config.threads === 1) { + out.push('-x264-params', 'frame-threads=1:pools=none'); + } + return out; } } export class HEVCConfig extends BaseConfig { - getOutputThreadOptions() { - const options = super.getOutputThreadOptions(); - if (this.config.threads === 1) { - options.push('-x265-params', 'frame-threads=1:pools=none'); + getEncoderOptions(): string[] { + const out: string[] = this.getOutputThreadOptions(); + const params: string[] = []; + if (this.tune.strictGop) { + params.push('no-scenecut=1', 'no-open-gop=1'); } - - return options; + if (this.config.threads === 1) { + params.push('frame-threads=1', 'pools=none'); + } + if (params.length > 0) { + out.push('-x265-params', params.join(':')); + } + return out; } } @@ -520,8 +565,8 @@ export class VP9Config extends BaseConfig { return [`-${this.useCQP() ? 'q:v' : 'crf'}`, `${this.config.crf}`, '-b:v', `${bitrates.max}${bitrates.unit}`]; } - getOutputThreadOptions() { - return ['-row-mt', '1', ...super.getOutputThreadOptions()]; + getEncoderOptions(): string[] { + return ['-row-mt', '1', ...this.getOutputThreadOptions()]; } eligibleForTwoPass() { @@ -543,23 +588,22 @@ export class AV1Config extends BaseConfig { } getBitrateOptions() { - const options = ['-crf', `${this.config.crf}`]; - const bitrates = this.getBitrateDistribution(); - const svtparams = []; - if (this.config.threads > 0) { - svtparams.push(`lp=${this.config.threads}`); - } - if (bitrates.max > 0) { - svtparams.push(`mbr=${bitrates.max}${bitrates.unit}`); - } - if (svtparams.length > 0) { - options.push('-svtav1-params', svtparams.join(':')); - } - return options; + return ['-crf', `${this.config.crf}`]; } - getOutputThreadOptions() { - return []; // Already set above with svtav1-params + getEncoderOptions(): string[] { + const params: string[] = []; + if (this.tune.lowLatency) { + params.push('hierarchical-levels=3', 'lookahead=0', 'enable-tf=0'); + } + if (this.config.threads > 0) { + params.push(`lp=${this.config.threads}`); + } + const bitrates = this.getBitrateDistribution(); + if (bitrates.max > 0) { + params.push(`mbr=${bitrates.max}${bitrates.unit}`); + } + return params.length > 0 ? ['-svtav1-params', params.join(':')] : []; } eligibleForTwoPass() { @@ -572,10 +616,6 @@ export class NvencSwDecodeConfig extends BaseHWConfig { return '0'; } - getSupportedCodecs() { - return [VideoCodec.H264, VideoCodec.Hevc, VideoCodec.Av1]; - } - getBaseInputOptions() { return ['-init_hw_device', `cuda=cuda:${this.device}`, '-filter_hw_device', 'cuda']; } @@ -652,6 +692,14 @@ export class NvencSwDecodeConfig extends BaseHWConfig { return []; } + getEncoderOptions(): string[] { + const out = this.getOutputThreadOptions(); + if (this.tune.strictGop) { + out.push('-forced-idr', '1'); + } + return out; + } + getRefs() { const bframes = this.getBFrames(); if (bframes > 0 && bframes < 3 && this.config.refs < 3) { @@ -703,8 +751,8 @@ export class NvencHwDecodeConfig extends NvencSwDecodeConfig { return ['-threads', '1']; } - getOutputThreadOptions() { - return []; + getEncoderOptions(): string[] { + return this.tune.strictGop ? ['-forced-idr', '1'] : []; } } @@ -749,10 +797,6 @@ export class QsvSwDecodeConfig extends BaseHWConfig { return options; } - getSupportedCodecs() { - return [VideoCodec.H264, VideoCodec.Hevc, VideoCodec.Vp9, VideoCodec.Av1]; - } - // recommended from https://github.com/intel/media-delivery/blob/master/doc/benchmarks/intel-iris-xe-max-graphics/intel-iris-xe-max-graphics.md getBFrames() { if (this.config.bframes < 0) { @@ -775,6 +819,14 @@ export class QsvSwDecodeConfig extends BaseHWConfig { getScaling(videoStream: VideoStreamInfo): string { return super.getScaling(videoStream, 1); } + + getEncoderOptions(): string[] { + const out = this.getOutputThreadOptions(); + if (this.tune.strictGop) { + out.push('-idr_interval', '0'); + } + return out; + } } export class QsvHwDecodeConfig extends QsvSwDecodeConfig { @@ -888,13 +940,17 @@ export class VaapiSwDecodeConfig extends BaseHWConfig { return options; } - getSupportedCodecs() { - return [VideoCodec.H264, VideoCodec.Hevc, VideoCodec.Vp9, VideoCodec.Av1]; - } - useCQP() { return this.config.cqMode !== CQMode.Icq || this.config.targetVideoCodec === VideoCodec.Vp9; } + + getEncoderOptions(): string[] { + const out = this.getOutputThreadOptions(); + if (this.tune.strictGop) { + out.push('-idr_interval', '0'); + } + return out; + } } export class VaapiHwDecodeConfig extends VaapiSwDecodeConfig { @@ -988,10 +1044,6 @@ export class RkmppSwDecodeConfig extends BaseHWConfig { return ['-rc_mode', 'CQP', '-qp_init', `${this.config.crf}`]; } - getSupportedCodecs() { - return [VideoCodec.H264, VideoCodec.Hevc]; - } - getVideoCodec(): string { return `${this.config.targetVideoCodec}_rkmpp`; } diff --git a/server/test/repositories/storage.repository.mock.ts b/server/test/repositories/storage.repository.mock.ts index 85c72b6c10..f323527df8 100644 --- a/server/test/repositories/storage.repository.mock.ts +++ b/server/test/repositories/storage.repository.mock.ts @@ -74,5 +74,6 @@ export const newStorageRepositoryMock = (): Mocked ({ close: vitest.fn(), on: vitest.fn() })), }; }; diff --git a/server/test/utils.ts b/server/test/utils.ts index 791a457783..d949428a88 100644 --- a/server/test/utils.ts +++ b/server/test/utils.ts @@ -181,7 +181,11 @@ export const automock = ( const mocks: Mock[] = []; const instance = new Dependency(...args); - for (const property of Object.getOwnPropertyNames(Dependency.prototype)) { + const propertyNames = new Set([ + ...Object.getOwnPropertyNames(Dependency.prototype), + ...Object.getOwnPropertyNames(instance), + ]); + for (const property of propertyNames) { if (property === 'constructor') { continue; } @@ -346,7 +350,7 @@ export const getMocks = () => { trash: automock(TrashRepository), user: automock(UserRepository, { strict: false }), versionHistory: automock(VersionHistoryRepository), - videoStream: automock(VideoStreamRepository), + videoStream: automock(VideoStreamRepository, { strict: false }), view: automock(ViewRepository), // eslint-disable-next-line no-sparse-arrays websocket: automock(WebsocketRepository, { args: [, loggerMock], strict: false }), @@ -500,6 +504,7 @@ export const mockSpawn = vitest.fn((exitCode: number, stdout: string, stderr: st callback(exitCode); } }), + kill: vitest.fn(), } as unknown as ChildProcessWithoutNullStreams; });