All you need to know about pipelines pt 1

Source

Let's get to the code! We will start where all pipelines start: with the Source element. Since this will be the first element we implement, we need to learn a bit more about how Membrane Framework elements should be implemented and about some of the concepts associated with them. The first thing you need to be aware of is that Membrane.Element describes a specific behavior, based on the OTP GenServer behavior. Our process keeps a state, which is updated in callbacks. We only need to provide implementations of some callbacks to make our element act in the desired way. The set of callbacks that can be implemented depends on the type of the element, and we will get familiar with them while implementing each element. However, each callback is required to return a tuple of a specific form: {actions, state}. The first element is a list of actions to be performed (possibly empty), and the second is the updated state, which will later be passed to the next invoked callback. Take your time and read about the possible actions that can be requested when returning from a callback. Their usage is crucial for the pipeline to work.
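To make the {actions, state} contract concrete, here is a toy model in plain Elixir, with no Membrane dependency. ToyElement, ToyCounter, ToyRunner and the notify: action are hypothetical names invented for this sketch. Membrane runs the real callbacks inside the element process and interprets real actions itself, so this only illustrates how the returned state is threaded from one callback into the next.

```elixir
# Hypothetical behaviour standing in for an element's callbacks (not Membrane's API).
defmodule ToyElement do
  @callback handle_init(options :: map()) :: {list(), map()}
  @callback handle_event(event :: term(), state :: map()) :: {list(), map()}
end

defmodule ToyCounter do
  @behaviour ToyElement

  @impl true
  def handle_init(options), do: {[], %{count: options.start}}

  @impl true
  def handle_event(:tick, state) do
    new_state = %{state | count: state.count + 1}
    # Request a (hypothetical) action and hand back the updated state.
    {[notify: new_state.count], new_state}
  end
end

defmodule ToyRunner do
  # Calls handle_init/1, then feeds every event to handle_event/2, passing the
  # state returned by one callback into the next and collecting all actions.
  def run(module, options, events) do
    {init_actions, state} = module.handle_init(options)

    Enum.reduce(events, {init_actions, state}, fn event, {acc, state} ->
      {actions, new_state} = module.handle_event(event, state)
      {acc ++ actions, new_state}
    end)
  end
end
```

Calling ToyRunner.run(ToyCounter, %{start: 0}, [:tick, :tick]) returns {[notify: 1, notify: 2], %{count: 2}}: each callback sees the state returned by the previous one.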

As you can tell from the structure of the project, all the elements will be put in the lib/elements directory. Therefore, that is where source.ex, containing the definition of the Basic.Elements.Source module, should be placed.

What makes our module a Membrane Framework element?

Let's start by specifying that our module will implement the Membrane.Source behavior, and by aliasing the modules which will be used later in the module's code:

lib/elements/source.ex

defmodule Basic.Elements.Source do
  use Membrane.Source

  alias Membrane.Buffer
  alias Basic.Formats.Packet

  ...
end

Pads and options

Next, we will make use of the macros defined in the Membrane.Source module:

lib/elements/source.ex

defmodule Basic.Elements.Source do
  ...

  def_options location: [
                spec: String.t(),
                description: "Path to the file"
              ]

  def_output_pad :output,
    accepted_format: %Packet{type: :custom_packets},
    flow_control: :manual

  ...
end

The first macro, def_options, allows us to define the parameters which are expected to be passed while instantiating the element. The parameters will be passed as an automatically generated structure, %Basic.Elements.Source{}. In our case, we will have a :location field inside that structure. This parameter will hold the path to the file containing the input packets. Later on, while instantiating the Source element, we will be able to write:

%Basic.Elements.Source{location: "input.A.txt"}

and the :location option will be passed during the construction of the element.
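Roughly speaking, the generated options structure behaves like an ordinary Elixir struct. The sketch below writes a comparable struct by hand with defstruct. ToySourceOptions is a hypothetical name, and the @enforce_keys line, which makes :location mandatory, is an assumption made for illustration rather than a description of exactly what def_options generates.

```elixir
# Hand-written, hypothetical analogue of the options struct for the Source.
defmodule ToySourceOptions do
  # Assumption for this sketch: :location has no default, so require it.
  @enforce_keys [:location]
  defstruct [:location]
end
```

With this definition, %ToySourceOptions{location: "input.A.txt"}.location is "input.A.txt", while struct!(ToySourceOptions, []) raises an ArgumentError because the enforced :location key is missing.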

The second macro, def_output_pad, lets us define the output pad. The pad name will be :output (which is the default name for an output pad). The second argument of the macro is a keyword list describing the pad. Its :accepted_format entry specifies the format of the data sent through the pad: as the code states, we want to send data in the Basic.Formats.Packet format. What's more, the :flow_control entry specifies that the :output pad will work in :manual mode. You can read more on pad specification here.

Initialization of the element

Let's define our first callback! Why not start with handle_init/2, which gets called once the element is created?

lib/elements/source.ex

defmodule Basic.Elements.Source do
  ...

  @impl true
  def handle_init(_context, options) do
    {[],
     %{
       location: options.location,
       content: nil
     }}
  end

  ...
end

As said before, handle_init/2 receives a structure with the previously defined parameters as an argument. All we need to do there is initialize the state. Our state will be a map, and for now we will put two things in it: the location (a path to the input file) and the content, where we will hold the packets read from the file that haven't been sent yet. The recommended approach is to keep handle_init/2 lean and defer any heavier initialization to handle_setup/2, which runs automatically afterwards, if defined. In the next section we'll talk about more involved initialization and resources managed by our element.

TIP

You might also wonder what the purpose of the @impl true attribute, placed just above the function definition, is. It is simply a way to tell the compiler that the function defined below is meant to be a callback implementation. If we misspell the function name (or provide the wrong number of arguments), the compiler will warn us at compile time.

Preparing our element

In the handle_setup/2 callback, we'd like to read the contents of the input file, split them into packets (one per line) and save them in our state as content.

lib/elements/source.ex

defmodule Basic.Elements.Source do
  ...

  @impl true
  def handle_setup(_context, state) do
    # Read the whole input file and split it into packets, one per line
    content =
      File.read!(state.location)
      |> String.split("\n")

    new_state = %{state | content: content}
    {[], new_state}
  end

  ...
end
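The sketch below models handle_init/2 and handle_setup/2 as plain functions so they can be tried without Membrane. ToySourceState, init/1 and setup/1 are hypothetical names; the file handling is the same File.read!/1 and String.split/2 pipeline as in the element.

```elixir
# Plain-Elixir model of the Source's init and setup steps (hypothetical module).
defmodule ToySourceState do
  # Like handle_init/2: only build the initial state, no I/O yet.
  def init(%{location: location}) do
    {[], %{location: location, content: nil}}
  end

  # Like handle_setup/2: load the file and store one packet per line.
  def setup(state) do
    content =
      state.location
      |> File.read!()
      |> String.split("\n")

    {[], %{state | content: content}}
  end
end
```

For a file containing "first\nsecond\n", setup/1 produces the content ["first", "second", ""]: a trailing newline yields an empty last packet. If that is undesirable, String.split(text, "\n", trim: true) drops empty strings.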

When the setup is complete, the element goes into the :playing state. It can then demand buffers from previous elements and send its :stream_format to subsequent elements. Since we are implementing a source, we do not have any previous element to demand from, but we can specify the format. We can do this, for example, in handle_playing/2:

lib/elements/source.ex

defmodule Basic.Elements.Source do
  ...

  @impl true
  def handle_playing(_context, state) do
    {[stream_format: {:output, %Packet{type: :custom_packets}}], state}
  end

  ...
end

The :stream_format action means that we want to transmit information about the format of the stream through the output pad to the next element in the pipeline. In chapter 4 you will find out more about stream formats and learn why it is required to do so.

Demands

Before going any further, let's stop for a moment and talk about demands. Do you remember that the :output pad is working in :manual mode? That means that the succeeding element has to ask the Source element for data, and our element has to take care of keeping that data in some kind of buffer until it is requested. Once the succeeding element requests data, the handle_demand/5 callback will be invoked, so it is a good idea to define it:

lib/elements/source.ex

defmodule Basic.Elements.Source do
  ...

  @impl true
  def handle_demand(:output, _size, :buffers, _context, state) do
    if state.content == [] do
      # No packets left: signal the end of the stream
      {[end_of_stream: :output], state}
    else
      # Send the first remaining packet and keep the rest for later
      [first_packet | rest] = state.content
      new_state = %{state | content: rest}

      actions = [
        buffer: {:output, %Buffer{payload: first_packet}},
        redemand: :output
      ]

      {actions, new_state}
    end
  end

  ...
end

The callback's body describes the situation in which some buffers were requested. We first check whether there are any packets left in the list stored in the element's state. If that list is empty, we return an end_of_stream action, indicating that no more buffers will be sent through the :output pad, so there is no point in requesting more.

However, if the content list of packets is non-empty, we take the head of that list and store the remaining tail in the state of the element. Then we define the actions we want to take: we want to return a buffer with the head packet from the original list. We make use of the buffer: action and specify that we want to transmit the %Buffer structure through the :output pad. Note the fields available in the %Buffer structure. In our case, we use only the :payload field, which, according to the documentation, can be of any type. However, in almost all cases you will need to send binary data within this field. Any structured data (such as timestamps) should be passed in the other fields of %Buffer (for example :pts, :dts and :metadata), which are designed especially for such cases.
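Because handle_demand/5 is an ordinary function of its arguments, its logic can be checked in isolation. The sketch below expresses the same logic with pattern matching in function heads instead of if. ToyBuffer is a hypothetical stand-in for %Membrane.Buffer{} (only :payload is modeled) and ToyDemand is an illustrative module name, so this runs without Membrane.

```elixir
# Hypothetical stand-in for %Membrane.Buffer{}; only :payload is modeled.
defmodule ToyBuffer do
  defstruct payload: nil
end

defmodule ToyDemand do
  # No packets left: tell the next element that the stream has ended.
  def handle_demand(:output, _size, :buffers, _context, %{content: []} = state) do
    {[end_of_stream: :output], state}
  end

  # Send the head packet and redemand so any remaining demand is served.
  def handle_demand(:output, _size, :buffers, _context, %{content: [first | rest]} = state) do
    actions = [
      buffer: {:output, %ToyBuffer{payload: first}},
      redemand: :output
    ]

    {actions, %{state | content: rest}}
  end
end
```

Matching on %{content: []} and %{content: [first | rest]} in separate clauses is a common Elixir alternative to the if/else in the element; both versions return the same actions.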

Then there's the other action that is taken: the :redemand action, queued to take place on the :output pad. This action will simply invoke the handle_demand/5 callback once again (as long as there is still outstanding demand), which is helpful when the whole demand cannot be fulfilled in the single handle_demand invocation we are currently processing. The great thing here is that the size of the demand will be determined automatically by the element, and we do not need to specify it in any way. In the context of sources, redemanding helps us simplify the logic of the handle_demand callback, since all we need to do in that callback is supply a single piece of data and, in case this is not enough, take a :redemand action so that the callback is invoked once again. As you will see later, redemanding is even more powerful in the context of filter elements.
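To see why returning one buffer per call plus :redemand is enough, here is a toy driver that serves a demand of a given size by calling a handle_demand-like function repeatedly. This is a simplified, hypothetical model, not Membrane's internal scheduling: ToyRedemandLoop and serve/3 are invented names, and the buffer: actions carry raw payloads instead of %Membrane.Buffer{} structs.

```elixir
defmodule ToyRedemandLoop do
  # Serves a demand of `demand` buffers. step_fun plays the role of
  # handle_demand: it takes the state and returns {actions, new_state}.
  # Returns {payloads_sent_in_order, final_state}.
  def serve(step_fun, state, demand), do: serve(step_fun, state, demand, [])

  # Demand fully satisfied: do not call step_fun again.
  defp serve(_step_fun, state, 0, sent), do: {Enum.reverse(sent), state}

  defp serve(step_fun, state, demand, sent) do
    {actions, new_state} = step_fun.(state)

    # Payloads of every buffer: action returned by this call.
    new_payloads = for {:buffer, {:output, payload}} <- actions, do: payload
    # `sent` is kept newest-first, so prepend the new payloads reversed.
    sent = Enum.reverse(new_payloads, sent)
    remaining = max(demand - length(new_payloads), 0)

    if Keyword.get(actions, :redemand) == :output do
      # Like :redemand: call step_fun again (stops once remaining is 0).
      serve(step_fun, new_state, remaining, sent)
    else
      # No redemand requested (e.g. end_of_stream): stop here.
      {Enum.reverse(sent), new_state}
    end
  end
end
```

With a step function that emits one packet and a redemand per call, serving a demand of 2 from ["a", "b", "c"] sends ["a", "b"] and leaves ["c"] in the state, while a demand of 10 sends all three packets and stops at the end_of_stream action.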

TIP

Membrane also supports :auto demand mode, which should cover 90% of use cases.

You can learn more about demand modes here.

In the next chapter we will explore what stream formats are in Membrane.
