fix backpressure

This commit is contained in:
mertalev
2026-05-08 14:19:05 -04:00
parent 96ba978cc8
commit ac004e9d84
2 changed files with 111 additions and 1 deletions
+106 -1
View File
@@ -1,4 +1,10 @@
import { HLS_CLEANUP_INTERVAL_MS, HLS_INACTIVITY_TIMEOUT_MS, HLS_LEASE_DURATION_MS } from 'src/constants';
import {
HLS_BACKPRESSURE_PAUSE_SEGMENTS,
HLS_BACKPRESSURE_RESUME_SEGMENTS,
HLS_CLEANUP_INTERVAL_MS,
HLS_INACTIVITY_TIMEOUT_MS,
HLS_LEASE_DURATION_MS,
} from 'src/constants';
import { TranscodingService } from 'src/services/transcoding.service';
import { VIDEO_STREAM_SESSION_PK_CONSTRAINT } from 'src/utils/database';
import { eiffelTower, train, waterfall } from 'test/fixtures/media.stub';
@@ -161,6 +167,105 @@ describe(TranscodingService.name, () => {
});
});
describe('backpressure', () => {
let proc: ReturnType<typeof mockSpawn>;
beforeEach(async () => {
proc = mockSpawn(0, '', '');
mocks.process.spawn.mockReturnValue(proc);
await sut.onSessionRequest({ sessionId, assetId, ownerId });
await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 0 });
});
const completeSegment = (index: number) => {
const listener = vi.mocked(mocks.storage.watchDir).mock.lastCall?.[1];
expect(listener).toBeDefined();
listener!('rename', `seg_${index}.m4s`);
};
it('pauses the transcode once the lead exceeds HLS_BACKPRESSURE_PAUSE_SEGMENTS', async () => {
completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1);
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
expect(proc.kill).toHaveBeenCalledWith('SIGSTOP');
});
it('does not pause when the lead equals the pause threshold', async () => {
completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS);
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
expect(proc.kill).not.toHaveBeenCalled();
});
it('resumes once the lead drops below HLS_BACKPRESSURE_RESUME_SEGMENTS', async () => {
completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1);
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
expect(proc.kill).toHaveBeenCalledWith('SIGSTOP');
vi.mocked(proc.kill).mockClear();
const requested = HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1 - (HLS_BACKPRESSURE_RESUME_SEGMENTS - 1);
await sut.onHeartbeat({ sessionId, segmentIndex: requested });
expect(proc.kill).toHaveBeenCalledWith('SIGCONT');
});
it('stays paused while the lead is in the dead-band', async () => {
completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1);
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
vi.mocked(proc.kill).mockClear();
const requested = HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1 - HLS_BACKPRESSURE_RESUME_SEGMENTS;
await sut.onHeartbeat({ sessionId, segmentIndex: requested });
expect(proc.kill).not.toHaveBeenCalled();
});
it('is a no-op when no segment has completed yet', async () => {
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
expect(proc.kill).not.toHaveBeenCalled();
});
it('is a no-op when the heartbeat omits segmentIndex', async () => {
completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1);
await sut.onHeartbeat({ sessionId });
expect(proc.kill).not.toHaveBeenCalled();
});
it('resumes the paused transcode when the client requests the next in-range segment', async () => {
completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1);
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
expect(proc.kill).toHaveBeenCalledWith('SIGSTOP');
vi.mocked(proc.kill).mockClear();
await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 0, segmentIndex: 1 });
expect(proc.kill).toHaveBeenCalledWith('SIGCONT');
expect(mocks.process.spawn).toHaveBeenCalledTimes(1);
});
it('does not re-pause a freshly spawned transcode after a seek-driven restart', async () => {
const newProc = mockSpawn(0, '', '');
mocks.process.spawn.mockReturnValueOnce(newProc);
completeSegment(HLS_BACKPRESSURE_PAUSE_SEGMENTS + 1);
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
expect(proc.kill).toHaveBeenCalledWith('SIGSTOP');
await sut.onSegmentRequest({ sessionId, assetId, variantIndex: 1, segmentIndex: 0 });
vi.mocked(newProc.kill).mockClear();
await sut.onHeartbeat({ sessionId, segmentIndex: 0 });
expect(newProc.kill).not.toHaveBeenCalled();
});
});
describe('inactivity sweeper', () => {
it('reaps a session whose last activity exceeds the inactivity timeout', async () => {
vi.useFakeTimers();
@@ -158,6 +158,7 @@ export class TranscodingService extends BaseService {
return;
}
const lead = session.lastCompletedSegment - session.lastClientRequestedSegment;
this.logger.debug(`Session ${session.id} lead is ${lead} segments`);
if (!session.paused && lead > HLS_BACKPRESSURE_PAUSE_SEGMENTS) {
this.pauseTranscode(session);
} else if (session.paused && lead < HLS_BACKPRESSURE_RESUME_SEGMENTS) {
@@ -278,6 +279,7 @@ export class TranscodingService extends BaseService {
variantIndex,
segmentIndex,
});
this.applyBackpressure(session);
});
watcher.on('error', (error) => {
this.logger.error(`watcher error for ${variantDir}: ${error}`);
@@ -315,6 +317,7 @@ export class TranscodingService extends BaseService {
this.resumeTranscode(session);
session.process.kill();
session.process = null;
session.lastCompletedSegment = null;
}
private pauseTranscode(session: Session) {
@@ -323,6 +326,7 @@ export class TranscodingService extends BaseService {
}
session.process.kill('SIGSTOP');
session.paused = true;
this.logger.debug(`Paused transcoding for session ${session.id}`);
}
private resumeTranscode(session: Session) {
@@ -331,6 +335,7 @@ export class TranscodingService extends BaseService {
}
session.process.kill('SIGCONT');
session.paused = false;
this.logger.debug(`Resumed transcoding for session ${session.id}`);
}
private async removeSessionDir(session: { ownerId: string; id: string }) {