When is assert_end_of_stream supposed to work?

I'm posting this because I sometimes don't understand how it behaves. Consider a pipeline test such as this one:

 @tag :tmp_dir
 test "pipeline starts", %{tmp_dir: tmp_dir} do
   master_playlist_uri = URI.new!("test/data/stream.m3u8")
   reader = HLS.FS.OS.new()
   rtmp_output_url = "rtmp://localhost:4000/live"
   output_path = Path.join([tmp_dir, "output.mp4"])

   links = [
     child({:endpoint, "st"}, %Studio.StreamingEndpoint{
       owner_pid: self(),
       rtc_engine: nil,
       rtmp_output_url: rtmp_output_url,
       master_playlist_uri: master_playlist_uri,
       reader: reader
     })
   ]

   pid = Membrane.Testing.Pipeline.start_link_supervised!(spec: links)

   ffmpeg =
     Task.async(fn ->
       # We do not want ffmpeg to block us.
       start_ffmpeg(rtmp_output_url, output_path)
     end)

   assert_end_of_stream(pid, :sink, :audio, @timeout)
   assert_end_of_stream(pid, :sink, :video, @timeout)
   assert :ok == Task.await(ffmpeg)
 end

I would expect these assertions to pass as soon as the bin receives EOS on both the audio and video pads. Instead, no such messages are received. Note that the StreamingEndpoint is a Bin, and the Sink we're talking about here is a Bin as well, but its video and audio pads are exposed in the StreamingEndpoint's links, so in theory it isn't a nested thing. What is supposed to happen? 😄
