Fix fast restream in saved permanent live

This commit is contained in:
Chocobozzz 2022-06-23 10:29:43 +02:00
parent 50341c8fe9
commit 53023be33a
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
9 changed files with 182 additions and 65 deletions
server
shared/server-commands

View File

@ -865,7 +865,7 @@ if (isTestInstance() === true) {
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
VIDEO_LIVE.CLEANUP_DELAY = 5000
VIDEO_LIVE.CLEANUP_DELAY = getIntEnv('PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY') ?? 5000
VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY = 2
VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY = 1
VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION = 1
@ -1174,3 +1174,9 @@ function buildLanguages () {
function generateContentHash () {
return randomBytes(20).toString('hex')
}
function getIntEnv (path: string) {
if (process.env[path]) return parseInt(process.env[path])
return undefined
}

View File

@ -4,7 +4,7 @@ import { join } from 'path'
import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
import { cleanupNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
import { cleanupUnsavedNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
import {
generateHLSMasterPlaylistFilename,
generateHlsSha256SegmentsFilename,
@ -22,15 +22,17 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
import { logger } from '../../../helpers/logger'
import { logger, loggerTagsFactory } from '../../../helpers/logger'
const lTags = loggerTagsFactory('live', 'job')
async function processVideoLiveEnding (job: Job) {
const payload = job.data as VideoLiveEndingPayload
logger.info('Processing video live ending for %s.', payload.videoId, { payload })
logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() })
function logError () {
logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId)
logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags())
}
const liveVideo = await VideoModel.load(payload.videoId)
@ -73,8 +75,6 @@ async function saveReplayToExternalVideo (options: {
}) {
const { liveVideo, liveSession, publishedAt, replayDirectory } = options
await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
const video = new VideoModel({
name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`,
isLive: false,
@ -243,7 +243,7 @@ async function cleanupLiveAndFederate (options: {
if (live.permanentLive) {
await cleanupPermanentLive(video, streamingPlaylist)
} else {
await cleanupNormalLive(video, streamingPlaylist)
await cleanupUnsavedNormalLive(video, streamingPlaylist)
}
}

View File

@ -10,20 +10,20 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
return 'concat-' + num[1] + '.ts'
}
async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
const hlsDirectory = getLiveDirectory(video)
await cleanupTMPLiveFiles(hlsDirectory)
if (streamingPlaylist) await streamingPlaylist.destroy()
await streamingPlaylist.destroy()
}
async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
const hlsDirectory = getLiveDirectory(video)
await remove(hlsDirectory)
if (streamingPlaylist) await streamingPlaylist.destroy()
await streamingPlaylist.destroy()
}
async function cleanupTMPLiveFiles (hlsDirectory: string) {
@ -49,7 +49,7 @@ async function cleanupTMPLiveFiles (hlsDirectory: string) {
export {
cleanupPermanentLive,
cleanupNormalLive,
cleanupUnsavedNormalLive,
cleanupTMPLiveFiles,
buildConcatenatedName
}

View File

@ -1,4 +1,5 @@
import './live-constraints'
import './live-fast-restream'
import './live-socket-messages'
import './live-permanent'
import './live-rtmps'

View File

@ -0,0 +1,128 @@
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import 'mocha'
import * as chai from 'chai'
import { wait } from '@shared/core-utils'
import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models'
import {
cleanupTests,
createSingleServer,
makeRawRequest,
PeerTubeServer,
setAccessTokensToServers,
setDefaultVideoChannel,
stopFfmpeg,
waitJobs
} from '@shared/server-commands'
const expect = chai.expect
describe('Fast restream in live', function () {
let server: PeerTubeServer
async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) {
const attributes: LiveVideoCreate = {
channelId: server.store.channel.id,
privacy: VideoPrivacy.PUBLIC,
name: 'my super live',
saveReplay: options.replay,
permanentLive: options.permanent
}
const { uuid } = await server.live.create({ fields: attributes })
return uuid
}
async function fastRestreamWrapper ({ replay }: { replay: boolean }) {
const liveVideoUUID = await createLiveWrapper({ permanent: true, replay })
await waitJobs([ server ])
const rtmpOptions = {
videoId: liveVideoUUID,
copyCodecs: true,
fixtureName: 'video_short.mp4'
}
// Streaming session #1
let ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
await server.live.waitUntilPublished({ videoId: liveVideoUUID })
await stopFfmpeg(ffmpegCommand)
await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
// Streaming session #2
ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
await server.live.waitUntilSegmentGeneration({ videoUUID: liveVideoUUID, segment: 0, playlistNumber: 0, totalSessions: 2 })
return { ffmpegCommand, liveVideoUUID }
}
async function ensureLastLiveWorks (liveId: string) {
// Equivalent to PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY
for (let i = 0; i < 100; i++) {
const video = await server.videos.get({ id: liveId })
expect(video.streamingPlaylists).to.have.lengthOf(1)
await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200)
await wait(100)
}
}
async function runTest (replay: boolean) {
const { ffmpegCommand, liveVideoUUID } = await fastRestreamWrapper({ replay })
await ensureLastLiveWorks(liveVideoUUID)
await stopFfmpeg(ffmpegCommand)
await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
// Wait for replays
await waitJobs([ server ])
const { total, data: sessions } = await server.live.listSessions({ videoId: liveVideoUUID })
expect(total).to.equal(2)
expect(sessions).to.have.lengthOf(2)
for (const session of sessions) {
expect(session.error).to.be.null
if (replay) {
expect(session.replayVideo).to.exist
await server.videos.get({ id: session.replayVideo.uuid })
} else {
expect(session.replayVideo).to.not.exist
}
}
}
before(async function () {
this.timeout(120000)
const env = { 'PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY': '10000' }
server = await createSingleServer(1, {}, { env })
// Get the access tokens
await setAccessTokensToServers([ server ])
await setDefaultVideoChannel([ server ])
await server.config.enableMinimumTranscoding(false, true)
await server.config.enableLive({ allowReplay: true, transcoding: true, resolutions: 'min' })
})
it('Should correctly fast reastream in a permanent live with and without save replay', async function () {
this.timeout(240000)
// A test can take a long time, so prefer to run them in parallel
await Promise.all([
runTest(true),
runTest(false)
])
})
after(async function () {
await cleanupTests([ server ])
})
})

View File

@ -12,7 +12,6 @@ import {
createMultipleServers,
doubleFollow,
findExternalSavedVideo,
makeRawRequest,
PeerTubeServer,
setAccessTokensToServers,
setDefaultVideoChannel,
@ -442,46 +441,6 @@ describe('Save replay setting', function () {
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
it('Should correctly save replays with multiple sessions', async function () {
this.timeout(120000)
liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
await waitJobs(servers)
// Streaming session #1
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await stopFfmpeg(ffmpegCommand)
await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID })
// Streaming session #2
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await wait(5000)
const video = await servers[0].videos.get({ id: liveVideoUUID })
expect(video.streamingPlaylists).to.have.lengthOf(1)
await makeRawRequest(video.streamingPlaylists[0].playlistUrl)
await stopFfmpeg(ffmpegCommand)
await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID)
// Wait for replays
await waitJobs(servers)
const { total, data: sessions } = await servers[0].live.listSessions({ videoId: liveVideoUUID })
expect(total).to.equal(2)
expect(sessions).to.have.lengthOf(2)
for (const session of sessions) {
expect(session.error).to.be.null
expect(session.replayVideo).to.exist
await servers[0].videos.get({ id: session.replayVideo.uuid })
}
})
})
after(async function () {

View File

@ -395,7 +395,7 @@ describe('Test live', function () {
for (let i = 0; i < resolutions.length; i++) {
const segmentNum = 3
const segmentName = `${i}-00000${segmentNum}.ts`
await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, resolution: i, segment: segmentNum })
await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
const subPlaylist = await servers[0].streamingPlaylists.get({
url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8`
@ -628,9 +628,9 @@ describe('Test live', function () {
commands[0].waitUntilPublished({ videoId: liveVideoReplayId })
])
await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, resolution: 0, segment: 2 })
await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, resolution: 0, segment: 2 })
await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, resolution: 0, segment: 2 })
await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, playlistNumber: 0, segment: 2 })
await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, playlistNumber: 0, segment: 2 })
await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, playlistNumber: 0, segment: 2 })
{
const video = await servers[0].videos.get({ id: permanentLiveVideoReplayId })

View File

@ -39,15 +39,18 @@ export class ConfigCommand extends AbstractCommand {
enableLive (options: {
allowReplay?: boolean
transcoding?: boolean
resolutions?: 'min' | 'max' // Default max
} = {}) {
const { allowReplay, transcoding, resolutions = 'max' } = options
return this.updateExistingSubConfig({
newConfig: {
live: {
enabled: true,
allowReplay: options.allowReplay ?? true,
allowReplay: allowReplay ?? true,
transcoding: {
enabled: options.transcoding ?? true,
resolutions: ConfigCommand.getCustomConfigResolutions(true)
enabled: transcoding ?? true,
resolutions: ConfigCommand.getCustomConfigResolutions(resolutions === 'max')
}
}
}

View File

@ -154,13 +154,33 @@ export class LiveCommand extends AbstractCommand {
waitUntilSegmentGeneration (options: OverrideCommandOptions & {
videoUUID: string
resolution: number
playlistNumber: number
segment: number
totalSessions?: number
}) {
const { playlistNumber, segment, videoUUID, totalSessions = 1 } = options
const segmentName = `${playlistNumber}-00000${segment}.ts`
return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false)
}
getSegment (options: OverrideCommandOptions & {
videoUUID: string
playlistNumber: number
segment: number
}) {
const { resolution, segment, videoUUID } = options
const segmentName = `${resolution}-00000${segment}.ts`
const { playlistNumber, segment, videoUUID } = options
return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false)
const segmentName = `${playlistNumber}-00000${segment}.ts`
const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}`
return this.getRawRequest({
...options,
url,
implicitToken: false,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
async waitUntilReplacedByReplay (options: OverrideCommandOptions & {