Skip to content

Commit

Permalink
Fix double EOS with video-only SDK src (#759)
Browse files Browse the repository at this point in the history
* only log mute status of subscribed tracks

* fix double eos
  • Loading branch information
frostbyte73 authored Aug 19, 2024
1 parent 0fbc019 commit 2bbeac8
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 5 deletions.
17 changes: 17 additions & 0 deletions pkg/gstreamer/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Callbacks struct {
onTrackMuted []func(string)
onTrackUnmuted []func(string)
onTrackRemoved []func(string)
onEOSSent func()

// internal
addBin func(bin *gst.Bin)
Expand Down Expand Up @@ -140,3 +141,19 @@ func (c *Callbacks) OnTrackRemoved(trackID string) {
f(trackID)
}
}

func (c *Callbacks) SetOnEOSSent(f func()) {
c.mu.Lock()
c.onEOSSent = f
c.mu.Unlock()
}

func (c *Callbacks) OnEOSSent() {
c.mu.RLock()
onEOSSent := c.onEOSSent
c.mu.RUnlock()

if onEOSSent != nil {
onEOSSent()
}
}
16 changes: 13 additions & 3 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.
monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId),
}
c.callbacks.SetOnError(c.OnError)
c.callbacks.SetOnEOSSent(c.onEOSSent)

// initialize gst
go func() {
Expand Down Expand Up @@ -360,6 +361,15 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st
return c.streamBin.RemoveStream(stream)
}

func (c *Controller) onEOSSent() {
// for video-only track/track composite, EOS might have already
// made it through the pipeline by the time endRecording is closed
if c.SourceType == types.SourceTypeSDK && !c.AudioEnabled {
// this will not actually send a second EOS, but will make sure everything is in the correct state
c.SendEOS(context.Background(), "source closed")
}
}

func (c *Controller) SendEOS(ctx context.Context, reason string) {
ctx, span := tracer.Start(ctx, "Pipeline.SendEOS")
defer span.End()
Expand Down Expand Up @@ -399,11 +409,11 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {
}

func (c *Controller) sendEOS() {
c.eosTimer = time.AfterFunc(time.Second*30, func() {
c.OnError(errors.ErrPipelineFrozen)
})
go func() {
logger.Debugw("sending EOS")
c.eosTimer = time.AfterFunc(time.Second*30, func() {
c.OnError(errors.ErrPipelineFrozen)
})
c.p.SendEOS()
}()
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,11 +535,21 @@ func shouldSubscribe(pub lksdk.TrackPublication) bool {
}

func (s *SDKSource) onTrackMuted(pub lksdk.TrackPublication, _ lksdk.Participant) {
logger.Debugw("track muted", "trackID", pub.SID())
s.mu.RLock()
_, ok := s.writers[pub.SID()]
s.mu.RUnlock()
if ok {
logger.Debugw("track muted", "trackID", pub.SID())
}
}

func (s *SDKSource) onTrackUnmuted(pub lksdk.TrackPublication, _ lksdk.Participant) {
logger.Debugw("track unmuted", "trackID", pub.SID())
s.mu.RLock()
_, ok := s.writers[pub.SID()]
s.mu.RUnlock()
if ok {
logger.Debugw("track unmuted", "trackID", pub.SID())
}
}

func (s *SDKSource) onTrackUnsubscribed(_ *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) {
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/source/sdk/appwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (w *AppWriter) start() {

// clean up
if w.playing.IsBroken() {
w.callbacks.OnEOSSent()
if flow := w.src.EndStream(); flow != gst.FlowOK && flow != gst.FlowFlushing {
w.logger.Errorw("unexpected flow return", nil, "flowReturn", flow.String())
}
Expand Down
7 changes: 7 additions & 0 deletions test/track_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) {
videoCodec: types.MimeTypeH264,
filename: "tc_{room_name}_h264_{time}.mp4",
},
{
name: "File/VideoOnly",
fileType: livekit.EncodedFileType_MP4,
videoCodec: types.MimeTypeH264,
filename: "tc_{room_name}_video_{time}.mp4",
videoOnly: true,
},
} {
r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) {
var aID, vID string
Expand Down

0 comments on commit 2bbeac8

Please sign in to comment.