Ordering Buffer
In this chapter we will deal with the next element in our pipeline - the Ordering Buffer. As stated in the chapter about the system architecture, this element is responsible for ordering the incoming packets based on their sequence id. Because the Ordering Buffer is a filter element, we need to specify both the input and the output pads:
lib/elements/ordering_buffer.ex
    defmodule Basic.Elements.OrderingBuffer do
      use Membrane.Filter

      alias Basic.Formats.Packet

      def_input_pad :input,
        flow_control: :manual,
        demand_unit: :buffers,
        accepted_format: %Packet{type: :custom_packets}

      def_output_pad :output,
        flow_control: :manual,
        accepted_format: %Packet{type: :custom_packets}

      ...
    end
Note the format specification there: we expect Basic.Formats.Packet of type :custom_packets to be sent on the input pad, and the same type of packets to be sent through the output pad.
In the next step, let's specify what we want the state of our element to look like:
lib/elements/ordering_buffer.ex
    defmodule Basic.Elements.OrderingBuffer do
      ...

      @impl true
      def handle_init(_context, _options) do
        {[],
         %{
           ordered_packets: [],
           last_sent_seq_id: 0
         }}
      end

      ...
    end
If you don't remember the purpose of the Ordering Buffer, please refer to the 2nd chapter. We need to hold a list of ordered packets, as well as the sequence id of the packet that was most recently sent through the output pad (we need to know whether any packets are missing between the last sent packet and the first packet in our ordered list).
Handling demands is quite straightforward:
lib/elements/ordering_buffer.ex
    defmodule Basic.Elements.OrderingBuffer do
      ...

      @impl true
      def handle_demand(:output, size, _unit, _context, state) do
        {[demand: {:input, size}], state}
      end

      ...
    end
We simply send a :demand on the :input pad once we receive a demand on the :output pad. One packet on input corresponds to one packet on output, so for each unit of demand on :output we request one unit of demand on :input.
Now we can move on to the main part of the Ordering Buffer implementation - the handle_buffer/4 callback.
Its purpose is to process the incoming buffer: it is called once a new buffer is available and waiting to be processed.
lib/elements/ordering_buffer.ex
    defmodule Basic.Elements.OrderingBuffer do
      ...

      @impl true
      def handle_buffer(:input, buffer, _context, state) do
        # Turn "[seq:N]rest" into {N, %Membrane.Buffer{payload: rest}}
        packet = unzip_packet(buffer.payload)
        # Add the packet to the buffered ones and keep them sorted by sequence id
        ordered_packets = [packet | state.ordered_packets] |> Enum.sort()
        state = %{state | ordered_packets: ordered_packets}
        # Sequence id of the first (lowest) buffered packet
        [{last_seq_id, _} | _] = ordered_packets
        ...
      end

      defp unzip_packet(packet) do
        regex = ~r/^\[seq\:(?<seq_id>\d+)\](?<data>.*)$/
        %{"data" => data, "seq_id" => seq_id} = Regex.named_captures(regex, packet)
        {String.to_integer(seq_id), %Membrane.Buffer{payload: data}}
      end

      ...
    end
First, we take advantage of the Regex module available in Elixir. With Regex.named_captures/2 we can access the values of the named captures defined within the regex description.
Do you remember what our packet looks like?
[seq:7][frameid:2][timestamp:3]data
Above you can see an example packet. We need to fetch the value of the sequence id (in our case it is equal to 7) and get the rest of the packet. Therefore, we have defined the regex description as:
~r/^\[seq\:(?<seq_id>\d+)\](?<data>.*)$/
TIP - How to read the regex?
- ~r/.../ stands for the sigil_r/1 sigil
- ^ matches the beginning of the input
- \[ stands for the opening square bracket ('['); the backslash is required to escape it, since a plain '[' has a special meaning in regex syntax (it opens a character class)
- seq is the sequence of 's', 'e', 'q' characters (our regex description has to match the header of the packet)
- \: stands for the ':' character (the escape is harmless, although outside constructs such as (?:...) a colon has no special meaning in regex syntax)
- (?<seq_id>\d+) defines a named capture: later on, once we use Regex.named_captures/2, we will retrieve a map with the "seq_id" key whose value is the string matched by the \d+ sub-pattern (one or more decimal digits). In general, a named capture has the structure (?<key>regex), where regex is a regex description.
- \] is the escaped closing square bracket character
- (?<data>.*) is a named capture that stores the string matched by .* (any character except a newline, zero or more times) under the "data" key
- $ matches the end of the input
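Applying Regex.named_captures/2 with that regex description to the example packet should return the following map (note that both values are strings; the sequence id is converted to an integer only later, with String.to_integer/1):
%{"seq_id" => "7", "data" => "[frameid:2][timestamp:3]data"}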
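A quick way to check this is to run the regex in IEx or as a plain .exs script. The snippet below is a small sketch that needs no Membrane dependency: it parses the example packet and shows that the captured sequence id stays a string until String.to_integer/1 converts it, just as unzip_packet/1 does.

```elixir
# Plain Elixir, no Membrane dependency needed.
regex = ~r/^\[seq\:(?<seq_id>\d+)\](?<data>.*)$/
packet = "[seq:7][frameid:2][timestamp:3]data"

# A map of strings: %{"data" => "[frameid:2][timestamp:3]data", "seq_id" => "7"}
captures = Regex.named_captures(regex, packet)

# The captured sequence id is a string, so it still has to be converted.
seq_id = String.to_integer(captures["seq_id"])
IO.inspect({seq_id, captures["data"]})
# prints {7, "[frameid:2][timestamp:3]data"}

# A string that does not match the pattern yields nil, so the map pattern
# match in unzip_packet/1 would raise a MatchError for such a payload.
nil = Regex.named_captures(regex, "garbage")
```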
Once we unzip the header of the packet in the handle_buffer/4 callback, we can put the incoming packet in the ordered_packets list and sort that list. Since the elements of this list are tuples whose first element is the sequence id (a unique value), and tuples are compared element by element, the list will be sorted by sequence id.
We also get the sequence id of the first element in the updated ordered_packets list, that is, the lowest sequence id currently buffered (even though the variable is named last_seq_id).
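To see why sorting works, here is a minimal sketch in which plain strings stand in for the %Membrane.Buffer{} payloads (an assumption made only so it runs without Membrane). Erlang's term ordering compares tuples of the same size element by element, so the integer sequence id decides the order.

```elixir
# Packets are {seq_id, payload} tuples; strings replace %Membrane.Buffer{} here.
ordered_packets = [{3, "c"}, {1, "a"}]
incoming = {2, "b"}

# Same steps as in handle_buffer/4: prepend the new packet, then sort.
ordered_packets = [incoming | ordered_packets] |> Enum.sort()
IO.inspect(ordered_packets)
# prints [{1, "a"}, {2, "b"}, {3, "c"}]

# The head of the sorted list holds the smallest sequence id.
[{first_seq_id, _} | _] = ordered_packets
IO.inspect(first_seq_id)
# prints 1
```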
Here is the rest of the handle_buffer/4 definition:
lib/elements/ordering_buffer.ex
    defmodule Basic.Elements.OrderingBuffer do
      ...

      def handle_buffer(:input, buffer, _context, state) do
        ...

        if state.last_sent_seq_id + 1 == last_seq_id do
          {ready_packets_sequence, ordered_packets_left} =
            get_ready_packets_sequence(ordered_packets, [])

          {last_sent_seq_id, _} = List.last(ready_packets_sequence)

          state = %{
            state
            | ordered_packets: ordered_packets_left,
              last_sent_seq_id: last_sent_seq_id
          }

          ready_buffers = Enum.map(ready_packets_sequence, &elem(&1, 1))
          {[buffer: {:output, ready_buffers}], state}
        else
          {[redemand: :output], state}
        end
      end

      ...
    end
We need to distinguish between two situations: either the currently processed packet has the sequence id directly following that of the last sent packet, or some packets, with sequence ids between the last sent one and the current one, have not been delivered to us yet. In the second case, we should store the packet and wait for the next packets to arrive. We will accomplish that using the redemand mechanism, which will be explained in detail in the next chapter.
In the first situation, however, we need to get the sequence of ready packets, that is, a contiguous batch of packets taken from :ordered_packets. This can be done in the following way:
lib/elements/ordering_buffer.ex
    defmodule Basic.Elements.OrderingBuffer do
      ...

      defp get_ready_packets_sequence([first_packet | ordered_rest], []) do
        get_ready_packets_sequence(ordered_rest, [first_packet])
      end

      defp get_ready_packets_sequence(
             [next_seq = {next_id, _} | ordered_rest],
             [{last_id, _} | _] = ready_sequence
           )
           when next_id == last_id + 1 do
        get_ready_packets_sequence(ordered_rest, [next_seq | ready_sequence])
      end

      defp get_ready_packets_sequence(ordered_packets, ready_sequence) do
        {Enum.reverse(ready_sequence), ordered_packets}
      end

      ...
    end
Note the order of the definitions, since we are taking advantage of the pattern matching mechanism!
The algorithm implemented in the snippet above is simple: we recursively take the next packet out of the :ordered_packets list until it becomes empty or there is a missing packet between the last taken packet and the next packet in the list (that is, until the next_id == last_id + 1 guard no longer holds).
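The recursion can be tried out on sample data. In the sketch below, the three clauses are copied into a hypothetical standalone module, ReadySequenceSketch, and made public so they can be called directly; payloads are plain strings rather than buffers.

```elixir
defmodule ReadySequenceSketch do
  # Same clauses as get_ready_packets_sequence/2 above, but public (def).
  # First call: move the head packet into the (reversed) ready sequence.
  def get_ready_packets_sequence([first_packet | ordered_rest], []) do
    get_ready_packets_sequence(ordered_rest, [first_packet])
  end

  # Next packet directly follows the last taken one: keep taking.
  def get_ready_packets_sequence(
        [next_seq = {next_id, _} | ordered_rest],
        [{last_id, _} | _] = ready_sequence
      )
      when next_id == last_id + 1 do
    get_ready_packets_sequence(ordered_rest, [next_seq | ready_sequence])
  end

  # Empty list or a gap: return the ready batch in order plus the leftovers.
  def get_ready_packets_sequence(ordered_packets, ready_sequence) do
    {Enum.reverse(ready_sequence), ordered_packets}
  end
end

# 4 and 5 follow 3, but 6 is missing, so 7 stays buffered.
result =
  ReadySequenceSketch.get_ready_packets_sequence(
    [{3, "c"}, {4, "d"}, {5, "e"}, {7, "g"}],
    []
  )

IO.inspect(result)
# prints {[{3, "c"}, {4, "d"}, {5, "e"}], [{7, "g"}]}
```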
Once we have a contiguous batch of packets, we can update the state (both :ordered_packets and :last_sent_seq_id need to be updated) and output the ready packets by returning the :buffer action.
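To see both branches of handle_buffer/4 working together, here is a Membrane-free sketch. The module OrderingSketch and its functions init/0, push/2 and take_ready/2 are hypothetical names introduced for illustration: push/2 returns the packets that would be sent instead of a :buffer action, and an empty list where the real element returns :redemand.

```elixir
defmodule OrderingSketch do
  # Hypothetical model of the element's state; packets are {seq_id, payload}.
  def init, do: %{ordered_packets: [], last_sent_seq_id: 0}

  # Mirrors handle_buffer/4; returns {packets_to_send, new_state}.
  def push(packet, state) do
    ordered_packets = Enum.sort([packet | state.ordered_packets])
    state = %{state | ordered_packets: ordered_packets}
    [{first_seq_id, _} | _] = ordered_packets

    if state.last_sent_seq_id + 1 == first_seq_id do
      {ready, left} = take_ready(ordered_packets, [])
      {last_sent_seq_id, _} = List.last(ready)
      {ready, %{state | ordered_packets: left, last_sent_seq_id: last_sent_seq_id}}
    else
      # Gap before the first buffered packet: send nothing and wait
      # (the real element returns a :redemand action here).
      {[], state}
    end
  end

  # Same recursion as get_ready_packets_sequence/2.
  defp take_ready([first | rest], []), do: take_ready(rest, [first])

  defp take_ready([{next_id, _} = next | rest], [{last_id, _} | _] = ready)
       when next_id == last_id + 1 do
    take_ready(rest, [next | ready])
  end

  defp take_ready(rest, ready), do: {Enum.reverse(ready), rest}
end

# Packets arrive out of order: 2, 1, 4, 3.
{sent, state} =
  Enum.reduce([{2, "b"}, {1, "a"}, {4, "d"}, {3, "c"}], {[], OrderingSketch.init()}, fn packet,
                                                                                       {sent, state} ->
    {ready, state} = OrderingSketch.push(packet, state)
    {sent ++ ready, state}
  end)

IO.inspect(sent)
# prints [{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}]
IO.inspect(state.last_sent_seq_id)
# prints 4
```

Packet 2 is held until 1 arrives, and packet 4 is held until 3 arrives, so the output order matches the sequence ids.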
Test the OrderingBuffer:
mix test test/elements/ordering_buffer_test.exs
Now the OrderingBuffer element is ready. Before we implement the next element, let us introduce the concept of redemands.