From ac004e9d840ff58173d34f6cfbc044e9b726ad18 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Fri, 8 May 2026 14:19:05 -0400 Subject: [PATCH] fix backpressure --- .../src/services/transcoding.service.spec.ts | 107 +++++++++++++++++- server/src/services/transcoding.service.ts | 5 + 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/server/src/services/transcoding.service.spec.ts b/server/src/services/transcoding.service.spec.ts index 4629033711..18c45e1b0e 100644 --- a/server/src/services/transcoding.service.spec.ts +++ b/server/src/services/transcoding.service.spec.ts @@ -1,4 +1,10 @@ -import { HLS_CLEANUP_INTERVAL_MS, HLS_INACTIVITY_TIMEOUT_MS, HLS_LEASE_DURATION_MS } from 'src/constants'; +import { + HLS_BACKPRESSURE_PAUSE_SEGMENTS, + HLS_BACKPRESSURE_RESUME_SEGMENTS, + HLS_CLEANUP_INTERVAL_MS, + HLS_INACTIVITY_TIMEOUT_MS, + HLS_LEASE_DURATION_MS, +} from 'src/constants'; import { TranscodingService } from 'src/services/transcoding.service'; import { VIDEO_STREAM_SESSION_PK_CONSTRAINT } from 'src/utils/database'; import { eiffelTower, train, waterfall } from 'test/fixtures/media.stub'; @@ -161,6 +167,105 @@ describe(TranscodingService.name, () => { }); }); + describe('backpressure', () => { + let proc: ReturnType; + + beforeEach(async () => { + proc = mockSpawn(0, '', ''); + mocks.process.spawn.mockReturnValue(proc); + + await sut.onSessionRequest({ sessionId, assetId, ownerId }); + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 0 }); + }); + + const completeSegment = (index: number) => { + const listener = vi.mocked(mocks.storage.watchDir).mock.lastCall?.[1]; + expect(listener).toBeDefined(); + listener!('rename', `seg_${index}.m4s`); + }; + + it('pauses the transcode once the lead exceeds HLS_BACKPRESSURE_PAUSE_SEGMENTS', async () => { + completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1); + + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + + expect(proc.kill).toHaveBeenCalledWith('SIGSTOP'); + }); + + it('does not pause when the lead equals the pause threshold', async () => { + completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS); + + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + + expect(proc.kill).not.toHaveBeenCalled(); + }); + + it('resumes once the lead drops below HLS_BACKPRESSURE_RESUME_SEGMENTS', async () => { + completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1); + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + expect(proc.kill).toHaveBeenCalledWith('SIGSTOP'); + vi.mocked(proc.kill).mockClear(); + + const requested = HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1 - (HLS_BACKPRESSURE_RESUME_SEGMENTS - 1); + await sut.onHeartbeat({ sessionId, segmentIndex: requested }); + + expect(proc.kill).toHaveBeenCalledWith('SIGCONT'); + }); + + it('stays paused while the lead is in the dead-band', async () => { + completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1); + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + vi.mocked(proc.kill).mockClear(); + + const requested = HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1 - HLS_BACKPRESSURE_RESUME_SEGMENTS; + await sut.onHeartbeat({ sessionId, segmentIndex: requested }); + + expect(proc.kill).not.toHaveBeenCalled(); + }); + + it('is a no-op when no segment has completed yet', async () => { + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + + expect(proc.kill).not.toHaveBeenCalled(); + }); + + it('is a no-op when the heartbeat omits segmentIndex', async () => { + completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1); + + await sut.onHeartbeat({ sessionId }); + + expect(proc.kill).not.toHaveBeenCalled(); + }); + + it('resumes the paused transcode when the client requests the next in-range segment', async () => { + completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1); + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + expect(proc.kill).toHaveBeenCalledWith('SIGSTOP'); + vi.mocked(proc.kill).mockClear(); + + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 1 }); + + expect(proc.kill).toHaveBeenCalledWith('SIGCONT'); + expect(mocks.process.spawn).toHaveBeenCalledTimes(1); + }); + + it('does not re-pause a freshly spawned transcode after a seek-driven restart', async () => { + const newProc = mockSpawn(0, '', ''); + mocks.process.spawn.mockReturnValueOnce(newProc); + + completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1); + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + expect(proc.kill).toHaveBeenCalledWith('SIGSTOP'); + + await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 1, segmentIndex: 0 }); + vi.mocked(newProc.kill).mockClear(); + + await sut.onHeartbeat({ sessionId, segmentIndex: 0 }); + + expect(newProc.kill).not.toHaveBeenCalled(); + }); + }); + describe('inactivity sweeper', () => { it('reaps a session whose last activity exceeds the inactivity timeout', async () => { vi.useFakeTimers(); diff --git a/server/src/services/transcoding.service.ts b/server/src/services/transcoding.service.ts index df6f5df8b8..926462dda7 100644 --- a/server/src/services/transcoding.service.ts +++ b/server/src/services/transcoding.service.ts @@ -158,6 +158,7 @@ export class TranscodingService extends BaseService { return; } const lead = session.lastCompletedSegment - session.lastClientRequestedSegment; + this.logger.debug(`Session ${session.id} lead is ${lead} segments`); if (!session.paused && lead > HLS_BACKPRESSURE_PAUSE_SEGMENTS) { this.pauseTranscode(session); } else if (session.paused && lead < HLS_BACKPRESSURE_RESUME_SEGMENTS) { @@ -278,6 +279,7 @@ export class TranscodingService extends BaseService { variantIndex, segmentIndex, }); + this.applyBackpressure(session); }); watcher.on('error', (error) => { this.logger.error(`watcher error for ${variantDir}: ${error}`); @@ -315,6 +317,7 @@ export class TranscodingService extends BaseService { this.resumeTranscode(session); session.process.kill(); session.process = null; + session.lastCompletedSegment = null; } private pauseTranscode(session: Session) { @@ -323,6 +326,7 @@ export class TranscodingService extends BaseService { } session.process.kill('SIGSTOP'); session.paused = true; + this.logger.debug(`Paused transcoding for session ${session.id}`); } private resumeTranscode(session: Session) { @@ -331,6 +335,7 @@ export class TranscodingService extends BaseService { } session.process.kill('SIGCONT'); session.paused = false; + this.logger.debug(`Resumed transcoding for session ${session.id}`); } private async removeSessionDir(session: { ownerId: string; id: string }) {