From 2bbeac885e60262b852e1c0cf4c144d681b16f1c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 19 Aug 2024 17:29:09 -0400 Subject: [PATCH] Fix double EOS with video-only SDK src (#759) * only log mute status of subscribed tracks * fix double eos --- pkg/gstreamer/callbacks.go | 17 +++++++++++++++++ pkg/pipeline/controller.go | 16 +++++++++++++--- pkg/pipeline/source/sdk.go | 14 ++++++++++++-- pkg/pipeline/source/sdk/appwriter.go | 1 + test/track_composite.go | 7 +++++++ 5 files changed, 50 insertions(+), 5 deletions(-) diff --git a/pkg/gstreamer/callbacks.go b/pkg/gstreamer/callbacks.go index c91af663..1b96f2e7 100644 --- a/pkg/gstreamer/callbacks.go +++ b/pkg/gstreamer/callbacks.go @@ -37,6 +37,7 @@ type Callbacks struct { onTrackMuted []func(string) onTrackUnmuted []func(string) onTrackRemoved []func(string) + onEOSSent func() // internal addBin func(bin *gst.Bin) @@ -140,3 +141,19 @@ func (c *Callbacks) OnTrackRemoved(trackID string) { f(trackID) } } + +func (c *Callbacks) SetOnEOSSent(f func()) { + c.mu.Lock() + c.onEOSSent = f + c.mu.Unlock() +} + +func (c *Callbacks) OnEOSSent() { + c.mu.RLock() + onEOSSent := c.onEOSSent + c.mu.RUnlock() + + if onEOSSent != nil { + onEOSSent() + } +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 78cb6954..13d9d1ac 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -81,6 +81,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc. monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId), } c.callbacks.SetOnError(c.OnError) + c.callbacks.SetOnEOSSent(c.onEOSSent) // initialize gst go func() { @@ -360,6 +361,15 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st return c.streamBin.RemoveStream(stream) } +func (c *Controller) onEOSSent() { + // for video-only track/track composite, EOS might have already + // made it through the pipeline by the time endRecording is closed + if c.SourceType == types.SourceTypeSDK && !c.AudioEnabled { + // this will not actually send a second EOS, but will make sure everything is in the correct state + c.SendEOS(context.Background(), "source closed") + } +} + func (c *Controller) SendEOS(ctx context.Context, reason string) { ctx, span := tracer.Start(ctx, "Pipeline.SendEOS") defer span.End() @@ -399,11 +409,11 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) { } func (c *Controller) sendEOS() { + c.eosTimer = time.AfterFunc(time.Second*30, func() { + c.OnError(errors.ErrPipelineFrozen) + }) go func() { logger.Debugw("sending EOS") - c.eosTimer = time.AfterFunc(time.Second*30, func() { - c.OnError(errors.ErrPipelineFrozen) - }) c.p.SendEOS() }() } diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index c438a6b5..9e6451bc 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -535,11 +535,21 @@ func shouldSubscribe(pub lksdk.TrackPublication) bool { } func (s *SDKSource) onTrackMuted(pub lksdk.TrackPublication, _ lksdk.Participant) { - logger.Debugw("track muted", "trackID", pub.SID()) + s.mu.RLock() + _, ok := s.writers[pub.SID()] + s.mu.RUnlock() + if ok { + logger.Debugw("track muted", "trackID", pub.SID()) + } } func (s *SDKSource) onTrackUnmuted(pub lksdk.TrackPublication, _ lksdk.Participant) { - logger.Debugw("track unmuted", "trackID", pub.SID()) + s.mu.RLock() + _, ok := s.writers[pub.SID()] + s.mu.RUnlock() + if ok { + logger.Debugw("track unmuted", "trackID", pub.SID()) + } } func (s *SDKSource) onTrackUnsubscribed(_ *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) { diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index d5d021f2..b03b7ad7 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -155,6 +155,7 @@ func (w *AppWriter) start() { // clean up if w.playing.IsBroken() { + w.callbacks.OnEOSSent() if flow := w.src.EndStream(); flow != gst.FlowOK && flow != gst.FlowFlushing { w.logger.Errorw("unexpected flow return", nil, "flowReturn", flow.String()) } diff --git a/test/track_composite.go b/test/track_composite.go index 835f19f3..a4bf8e13 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -71,6 +71,13 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) { videoCodec: types.MimeTypeH264, filename: "tc_{room_name}_h264_{time}.mp4", }, + { + name: "File/VideoOnly", + fileType: livekit.EncodedFileType_MP4, + videoCodec: types.MimeTypeH264, + filename: "tc_{room_name}_video_{time}.mp4", + videoOnly: true, + }, } { r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { var aID, vID string