Implementing first element
Implementing a new element is very similar to declaring a new pipeline. All you have to do is create a new module and implement some callbacks.
The first decision to make is to specify what kind of element we are going to implement: source
, sink
or filter
. In this chapter, we will implement a very simple filter that counts received buffers and passes them to the next element. It will build simple statistics and send them to Pipeline via Membrane.Notification
mechanism.
Source code for this filter can be found in the membrane demo repository.
Base module
To indicate the choice of implementing 'filter' we have to add the following line to our module:
use Membrane.Filter
This macro forces us to invoke some macros or implement some methods in our module:
def_options
macro - macro that defines known options for the element type. It automatically generates the appropriate struct.def_input_pad
anddef_output_pad
- macros that define the pads of the element- callbacks
handle_init
,handle_demand
andhandle_process
- callbacks that are invoked when initializing the element, handling incoming demand or buffer
Options
Our element will have only one option - time interval, telling how often statistics should be sent and zeroed. Its definition will look like the following:
def_options interval: [
type: :integer,
default: 1000,
description: "Amount of the time in milliseconds, telling how often statistics should be sent and zeroed"
]
Pads
It is very common for the filter to declare two pads that are called :input
and :output
def_input_pad :input,
availability: :always,
mode: :pull,
demand_unit: :bytes,
caps: :any,
options: [
divisor: [
type: :integer,
default: 1,
description: "Number by which the counter will be divided before sending notification"
]
]
def_output_pad :output,
availability: :always,
mode: :pull,
caps: :any
In above definition, availability :always
means that pad of this element is always available.
:pull
mode means that this element sends buffers to the next element only when they are demanded. When a demand is received on an output pad, the handle_demand
callback is invoked with total demand for the pad and a unit - either :bytes
or :buffers
.
For input pads in :pull
mode one more entry has to be provided - :demand_unit
. It determines in which unit the demand is sent to the upstream element and can be set, as mentioned, to :bytes
or :buffers
.
The next element in the keyword list represents the capabilities (caps) of the pad. :any
means that any type of buffer can be passed on this pad. If you want to restrict the types of data allowed on this pad you can define caps specifications as described in docs for module Membrane.Caps.Matcher
.
Last entry for input pad defines an option for this pad added just for demonstration purpose.
The options for pads are defined just like element's options in def_options
macro.
handle_init/1
In handle_init
we initialize the internal state of the element. As the first argument of callback, options will be received. In our state, we will create variable for counting buffers and timer. Timer won't be initialized at this point, we will wait for handle_prepared_to_playing/2
callback, which informs that pipeline is in the :playing
state.
@impl true
def handle_init(%__MODULE__{interval: interval}) do
state = %{
interval: interval,
counter: 0,
timer: nil
}
{:ok, state}
end
handle_prepared_to_playing/2
Now it is a time to start the timer that will send :tick
messages to our element when it should flush results and reset the counter. We should also store the created timer, to be able to release it when the pipeline will stop processing data (in handle_prepared_to_stopped/2
).
@impl true
def handle_prepared_to_playing(_ctx, state) do
{:ok, timer} = :timer.send_interval(state.interval, :tick)
{:ok, %{state | timer: timer}}
end
handle_demand/5
Since both of our pads work in :pull
mode, we have to handle incoming demands. In our case this task is very simple - we have to just redirect incoming demands to the previous element.
For that purpose, we will return an action as an additional term in the output tuple: {{:ok, action_list}, new_state}
.
Actions are generally speaking the activity that we request the element to perform. The example actions are: sending buffer/event/new_caps on some pad, sending messages to the pipeline or - like in our case - sending demand through some input pad.
Actions are always the entries in the keyword list, where the key is an atom indicating the action name and the value contains the parameters of the action. In this case, it is a tuple with the pad name at first position and size of the demand on the second position.
More information on how the demands work can be found in a separate chapter
@impl true
def handle_demand(:output, size, :bytes, _context, state) do
{{:ok, demand: {:input, size}}, state}
end
handle_process/4
Incoming buffers are processed in this callback. We will update our counter and pass the buffer to the :output
pad.
Overriding handle_process
callback means that we want to receive only one buffer at a time. However, keep in mind that very often buffers are delivered to the element in groups. It is also possible to override handle_process_list
callback, which by default performs action :split
that in turn invokes handle_process
callback for each buffer from the group.
@impl true
def handle_process(:input, %Membrane.Buffer{} = buffer, _context, state) do
new_state = %{state | counter: state.counter + 1}
{{:ok, buffer: {:output, buffer}}, new_state}
end
handle_other/3
All messages sent to the element's process that were not recognized as internal membrane messages (like buffers, caps, events or notifications) are handled in handle_other/3
.
We will receive our ticks here, so we will know that we should zero the counter. It is also a good place to send a notification to the pipeline with the statistics.
This also presents how pad options can be accessed via context.
@impl true
def handle_other(:tick, ctx, state) do
# create the term to send
notification = {
:counter,
div(state.counter, ctx.pads.input.options.divisor)
}
# reset the timer
new_state = %{state | counter: 0}
{{:ok, notify: notification}, new_state}
end
handle_prepared_to_stopped/2
Last but not least there is the handle_prepared_to_stopped/2
callback. It is the place to stop the timer and clean up.
@impl true
def handle_prepared_to_stopped(_ctx, state) do
{:ok, :cancel} = :timer.cancel(state.timer)
{:ok, %{state | counter: 0, timer: nil}}
end
Summary
The complete code of our element can look like this:
defmodule Your.Module.Element do
use Membrane.Element.Base.Filter
def_options interval: [
type: :integer,
default: 1000,
description:
"Amount of the time in milliseconds, telling how often statistics should be sent and zeroed"
]
def_input_pad :input,
availability: :always,
mode: :pull,
demand_unit: :bytes,
caps: :any,
options: [
divisor: [
type: :integer,
default: 1,
description: "Number by which the counter will be divided before sending notification"
]
]
def_output_pad :output,
availability: :always,
mode: :pull,
caps: :any
@impl true
def handle_init(%__MODULE{interval: interval}) do
state = %{
interval: interval,
counter: 0,
timer: nil
}
{:ok, state}
end
@impl true
def handle_prepared_to_stopped(_ctx, state) do
{:ok, :cancel} = :timer.cancel(state.timer)
{:ok, %{state | counter: 0, timer: nil}}
end
@impl true
def handle_prepared_to_playing(_ctx, state) do
{:ok, timer} = :timer.send_interval(state.interval, :tick)
{:ok, %{state | timer: timer}}
end
@impl true
def handle_demand(:output, size, :bytes, _context, state) do
{{:ok, demand: {:input, size}}, state}
end
@impl true
def handle_process(:input, %Membrane.Buffer{} = buffer, _context, state) do
new_state = %{state | counter: state.counter + 1}
{{:ok, buffer: {:output, buffer}}, new_state}
end
@impl true
def handle_other(:tick, ctx, state) do
# create the term to send
notification = {
:counter,
div(state.counter, ctx.pads.input.options.divisor)
}
# reset the timer
new_state = %{state | counter: 0}
{{:ok, notify: notification}, new_state}
end
end