The GenServer from hell (needs some refactoring)

I’m looking into Elixir because I think it can help us make better embedded (IoT) devices.

One of the things I’m working on is a KNX-stack. KNX is an open standard for commercial and domestic building automation. [wikipedia]

KNX can communicate connectionless and connection-oriented.

The former case is simple. A frame is received by the hardware and is piped through the layers, which are just pure functions:

# main-pipe
frame |> layer0 |> layer1 |> layer2 |> layer3 |> layer4 |> application

Each layer decodes some bits (which is really great with binary-pattern-matching) and on the way a %Frame{} struct is built. If a frame should not be further processed by the pipe (eg not addressed), a drop-flag is set in the struct; the following layers match on this flag and just pass the frame on until the end of the pipe.
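A minimal sketch of the drop-flag idea, reduced to two layers. The module name, the struct fields and the device address 0xAD are assumptions made for illustration; the real %Frame{} and layers carry more than this.

```elixir
defmodule DropFlagSketch do
  # Hypothetical struct; the real %Frame{} in the stack has more fields.
  defmodule Frame do
    defstruct raw: <<>>, src: nil, dest: nil, data: <<>>, drop: false
  end

  # Assumed address of this device.
  @own_addr 0xAD

  # layer0: decode the two address bytes with binary pattern matching.
  def layer0(%Frame{raw: <<src::8, dest::8, data::bits>>} = frame),
    do: %Frame{frame | src: src, dest: dest, data: data}

  # Anything too short to decode gets the drop flag.
  def layer0(%Frame{} = frame), do: %Frame{frame | drop: true}

  # layer1: a frame that is already dropped is passed on untouched.
  def layer1(%Frame{drop: true} = frame), do: frame

  # A frame addressed to this device continues up the pipe.
  def layer1(%Frame{dest: @own_addr} = frame), do: frame

  # Everything else is flagged, so the following layers skip it.
  def layer1(%Frame{} = frame), do: %Frame{frame | drop: true}

  def run(raw), do: %Frame{raw: raw} |> layer0() |> layer1()
end
```

Every layer keeps the same one-frame-in, one-frame-out shape, which is what keeps the connectionless case a plain linear pipe.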

The pain starts with the connection-oriented frames. These have to be processed by a state-machine (SM) in layer2. When layer2 receives a connection-oriented frame it sets the frame.drop flag (so that the pipe can continue) and builds an event for the SM from the frame and dispatches it to the SM-GenServer.

Depending on the frame and the state of the SM one of twelve actions is to be performed. An action is a pure function that returns a struct:

  • new state (always)
  • an up-frame (optional, eg a data frame for an open connection is passed to the upper layer)
  • down-frame (optional, eg an ACK for a data frame)
  • timer-config (optional, eg reset the timeout when a data frame arrives)
  • defer-event (optional, the frame can’t be handled now)

  def dispatch(%Event{} = event) do
    GenServer.cast(@me, {:dispatch, event})
  end

  def handle_cast({:dispatch, %Event{} = event}, %State{} = state) do
    state
    |> handle_dispatch(event)
    |> noreply()  # noreply/1 is assumed to wrap the state as {:noreply, state}
  end

  def handle_dispatch(%State{} = state, %Event{} = event) do
    {new_state, action} = state.handler.(state, event)
    {new_state, up_frame, dn_frame, timer_config, defer} = action.(new_state, event)

    # these are just pipes similar to the 'main-pipe' (see above)
    send_frame(:up, up_frame) # layer3 |> layer4 |> application
    send_frame(:dn, dn_frame) # layer1 |> layer0 |> hardware

    new_state
  end

So the complexity suddenly explodes here: instead of a simple linear flow, layer2 may shoot frames in different directions or, even worse, may just send a frame because a timeout occurs (eg a CLOSE-frame).

The thing works, but it’s hard to test and I don’t like it. Any ideas how to improve it?

One thing I considered was to pipe the complete state with the frame. Everything inside the pipe could be pure functions (even the SM). At the end of the pipe I would not have a single frame but maybe multiple frames and maybe some commands for a separate GenServer controlling the timeouts.

For example, for an incoming connection-oriented data-frame for an existing connection I would get:

  • the decoded frame as a %Frame{}-struct
  • the request to send an ACK-frame back down
  • instructions to reset the connection-timeout-timer.

But that would mean a lot of data on the pipe…

You may try Erlang’s :gen_statem. It is a fancier :gen_server with some bells and whistles that you might need (timeouts, postponing, self-inserted events, …).

There’s an Elixir wrapper for the :gen_statem but I haven’t used that myself:

https://hexdocs.pm/gen_state_machine/GenStateMachine.html

While :gen_statem may make the SM implementation more concise, I don’t think it helps with the general problem, that I have a GenServer that has too many responsibilities, is hard to understand and hard to test.

I suspect gen_statem would help in your case mainly because of things like internal events and event postponement.

In particular, using internal events would allow you to transform code like the send_frames in handle_dispatch into a pure function that returns something like:

{:next_state, :something, data, [{:next_event, :internal, {:send_frame, :up, up_frame}}, {:next_event, :internal, {:send_frame, :dn, dn_frame}}]}

The state functions are pure functions that can return additional “actions” in the last position.

Then the handlers look like (assuming the state machine is in :state_functions mode, and the current state is something):

def something(:internal, {:send_frame, dir, frame}, data) do
...
end

Actions can also include :postpone (which holds the event until the next state change) and three different flavors of timeout.
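To make the pieces above concrete, here is a small sketch of a :gen_statem in :state_functions mode that uses internal events, :postpone and a state timeout. The module name, the states :closed and :open_idle, the event shapes and the 6000 ms timeout are hypothetical, and frames are "sent" by messaging a sink pid so the effect is observable.

```elixir
defmodule ConnSketch do
  @behaviour :gen_statem

  # `sink` is a pid that receives the frames this sketch "sends".
  def start_link(sink), do: :gen_statem.start_link(__MODULE__, sink, [])

  @impl true
  def callback_mode, do: :state_functions

  @impl true
  def init(sink), do: {:ok, :closed, %{sink: sink}}

  # State :closed. A connect opens the connection and starts the timeout.
  def closed(:cast, :connect, data) do
    {:next_state, :open_idle, data, [{:state_timeout, 6_000, :connection_timeout}]}
  end

  # Data that arrives too early is held back until the state changes.
  def closed(:cast, {:data, _payload}, _data) do
    {:keep_state_and_data, [:postpone]}
  end

  # State :open_idle. The clause only describes what should happen;
  # the sending itself is done by the internal-event clause below.
  def open_idle(:cast, {:data, payload}, data) do
    actions = [
      {:next_event, :internal, {:send_frame, :up, payload}},
      {:next_event, :internal, {:send_frame, :dn, :ack}},
      # Setting the state timeout again restarts the timer.
      {:state_timeout, 6_000, :connection_timeout}
    ]

    {:keep_state, data, actions}
  end

  def open_idle(:internal, {:send_frame, dir, frame}, %{sink: sink}) do
    send(sink, {:sent, dir, frame})
    :keep_state_and_data
  end

  def open_idle(:state_timeout, :connection_timeout, data) do
    {:next_state, :closed, data}
  end
end
```

If a data event is cast before the connect, it is postponed in :closed and retried once the machine enters :open_idle, so the sink still receives the up-frame followed by the ACK.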

I have an opinionated wrapper of :gen_statem that is designed to help you organize your state machine, and also marshalls the confusing gen_statem callbacks into something that looks more like a gen_server.

https://hexdocs.pm/state_server/StateServer.html

Of course, in the wrong hands it could merely wind up being more rope to hang oneself with, but if you are a person that likes organizational clarity, it could be useful.

I just had a quick look at :gen_statem again. Turns out I did not understand what it does when I first looked - and I still don’t, but I’ll have a closer look. Thanks for the pointer, I’ll be back.

If I understand correctly, this protocol resembles the OSI model, where multiple layers are stacked on top of each other with the following properties:

  1. Each layer receives input packets and produces decoded packets for the consumer layer (the one directly on top of it).
  2. Some layers might need to perform additional imperative logic, such as send a response to the peer layer on the other side, or defer an action for later.

Stacked behaviours

One way of implementing this would be to rely on stacked behaviours. The most foundational one, called Layer0 would receive raw bits (e.g. using gen_tcp) and invoke layer-specific callbacks, such as handle_packet which receives a decoded layer 0 packet. On top of this abstraction you can build Layer1, on top of that Layer2, and so on.

This approach allows you to test at lower levels. I’d still test as much as I can at the final level (Application), moving deeper to test the scenarios which are harder to setup at the higher level, such as lower-level timeout.

One downside is that the implementation will always be coupled to the physical transport. If layer 0 uses tcp to communicate, you’ll always need a fake tcp peer to test the logic, which could become cumbersome to implement. Another issue is that implementing 6 behaviours could lead to a lot of boilerplate. For example, to accommodate deferred actions you’ll probably need to support the handle_info callback in the corresponding layer and all the layers below it.

Functional core, imperative shell

Another option is to go for functional core, imperative shell. You can see a detailed example in my post To spawn, or not to spawn?.

Basically in this approach we distinguish between the driver and the functional logic. The driver takes care of receiving bits/bytes from the remote client and shipping the responses back. It’s also responsible for time-based logic. So that’s the imperative shell. The functional core is a pure-functional state machine which receives inputs from the driver and produces outputs. At the highest level it could look something like:

defmodule KNX do
  @spec new(...) :: t

  @spec handle_impulse(t, impulse) :: [action]
end

where types have the following meaning:

  • t - SM state represented as a map or a struct
  • impulse - external actions such as {:bytes, binary}, {:message, payload}
  • action - an imperative action that must be executed by the driver, such as {:send_bytes, binary}, {:delayed_impulse, time::pos_integer, payload}

With such a contract, the driver would be a stateful process that establishes the connection to the remote client, accepts bytes, sends impulses to the functional core (which is kept as the state of the process), and interprets the actions. An immediate benefit here is that you can test the core without needing to fake a remote TCP peer, and without needing to depend on time. For example, if the core sends {:delayed_impulse, ...} (a delayed self-send), the test can immediately pass the impulse back.
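A minimal sketch of such a core and of a time-independent test interaction. The toy logic (count received bytes, close after an idle timeout) is invented for illustration and is not KNX. Two assumptions go beyond the contract above: handle_impulse/2 returns {new_state, actions} so the updated state travels with the actions, and the driver is expected to feed a delayed payload back as {:message, payload}.

```elixir
defmodule CoreSketch do
  # Hypothetical toy core: counts received bytes and closes the
  # connection when the idle timeout fires. Not the KNX logic.
  defstruct status: :open, received: 0

  def new, do: %__MODULE__{}

  # Returns {new_state, actions}; the driver executes the actions.
  def handle_impulse(%__MODULE__{status: :open} = core, {:bytes, bytes}) do
    core = %{core | received: core.received + byte_size(bytes)}
    {core, [{:send_bytes, "ACK"}, {:delayed_impulse, 6_000, :idle_timeout}]}
  end

  # The delayed self-send comes back as an ordinary impulse.
  def handle_impulse(%__MODULE__{status: :open} = core, {:message, :idle_timeout}) do
    {%{core | status: :closed}, [{:send_bytes, "CLOSE"}]}
  end

  # A closed core ignores everything.
  def handle_impulse(%__MODULE__{status: :closed} = core, _impulse), do: {core, []}
end

# No socket and no waiting: take the delayed impulse out of the
# returned actions and pass it straight back into the core.
core = CoreSketch.new()
{core, actions} = CoreSketch.handle_impulse(core, {:bytes, <<1, 2, 3>>})
{:delayed_impulse, _ms, payload} = List.last(actions)
{_core, _close_actions} = CoreSketch.handle_impulse(core, {:message, payload})
```

The 6000 ms delay never has to elapse in a test, because the timer lives entirely in the driver.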

Inside the core module, I’d start by implementing the zero layer as two private functions:

@spec new_layer_0(...) :: layer_0_state

@spec handle_layer_0_impulse(layer_0_state, layer_0_impulse) :: [layer_0_actions]

This follows the same pattern as the high-level KNX API, and these functions would initially reside in the same module (I’d move them to a separate “sub-module” if the code grows). The impulses to layer 0 are e.g. raw bits/bytes, and the output actions are decoded packets.

Now on top of this we can build layer 1. When layer 1 receives an impulse, it needs to make a decision. If the impulse is specific to layer 1 (e.g. a delayed self-send), the layer handles it and produces output actions. Otherwise, the lower-level layer is invoked to handle the impulse.

In the latter case, layer 0 will return a set of actions. Some of these actions will be decoded packets. Layer 1 has to interpret each decoded packet and produce its own additional actions. So for example, let’s say that layer 0 returns:

[
  layer_0_action_1,
  layer_0_action_2,
  layer_0_decoded_packet_1,
  layer_0_action_3,
  ...
]

Layer 1 could produce:

[
  layer_0_action_1,
  layer_0_action_2,
  layer_1_action_1,
  layer_1_decoded_packet_1,
  layer_0_action_3,
  ...
]

So by stacking all these layers you end up with the top-level handle_impulse that will return actions required by different layers in the proper order.
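The stacking described above can be sketched with two layers. State is left out to keep the example short, and the action shapes ({:log, ...}, {:decoded_0, ...}, {:decoded_1, ...}) as well as the packet type 1 meaning "data" are assumptions made for illustration.

```elixir
defmodule StackSketch do
  # Layer 0: turns raw bytes into one driver action plus a decoded packet.
  def handle_layer_0_impulse({:bytes, <<type::8, payload::binary>> = bytes}) do
    [{:log, "received #{byte_size(bytes)} bytes"}, {:decoded_0, {type, payload}}]
  end

  # Layer 1: an impulse specific to this layer is handled here.
  def handle_layer_1_impulse({:message, :layer_1_timeout}) do
    [{:send_bytes, "CLOSE"}]
  end

  # Everything else goes down to layer 0; its actions are then interpreted
  # one by one. flat_map keeps the original order of the actions.
  def handle_layer_1_impulse(impulse) do
    impulse
    |> handle_layer_0_impulse()
    |> Enum.flat_map(&interpret_layer_0_action/1)
  end

  # A decoded data packet (type 1) yields a layer 1 action and a
  # layer 1 decoded packet.
  defp interpret_layer_0_action({:decoded_0, {1, payload}}),
    do: [{:send_bytes, "ACK"}, {:decoded_1, payload}]

  # Other packet types are silently consumed in this sketch.
  defp interpret_layer_0_action({:decoded_0, _other}), do: []

  # Plain layer 0 actions pass through unchanged.
  defp interpret_layer_0_action(action), do: [action]
end
```

Calling handle_layer_1_impulse({:bytes, <<1, "hi">>}) returns the layer 0 log action first, followed by the layer 1 ACK and the layer 1 decoded packet, which mirrors the interleaving in the lists above.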

This approach also supports testing at the lower levels. You can expose each individual layer API, e.g. as public functions marked with @doc false, or by moving each layer into its own submodule. Again, I’d test as much as I can at the highest possible level.

FWIW I used this style to implement the server-side of the PostgreSQL protocol (only 1 layer, but still), and it has its pros and cons. As said, the biggest pro is that you decouple the protocol from the imperative logic, which simplifies testing. On the flip side, any combination of imperative + conditional logic will be trickier to implement. E.g. if you need to send something to the peer and then interpret the peer’s response, you need to produce an output action, store some piece of information in your state, and then reconcile that with a response impulse. This is much trickier than plain old imperative interpret_response(send(peer, ...)), where send is a blocking operation that awaits the peer’s response.

But I think that you’ll face the same challenge in the first proposed approach (behaviour-based). In general, a straightforward imperative style is blocking, which means that you lose the ability to perform asynchronous actions (e.g. ship something else to the peer while you’re awaiting its response).

I just had a lot of fun building a toy-protocol based on your “Functional core, imperative shell” proposal. Thank you!

I took the state-machine (the functional part) and the actions from the KNX-stack and removed sequence-numbers and repeat-count (those do not affect the mechanics, they just add some complexity).

The protocol is very simple, a frame looks like this:

[source-address, destination-address, service, data]

for example

0xDEAD_BEEF

The impulse from the driver for the lowest layer (data-link, :dl) looks like this:

{:dl, :ind, <<0xDEAD_BEEF::32>>} # data-link, indication, frame

def handle({:dl, :ind, <<src::8, dest::8, data::bits>>}, %State{}),
  do: {:nl, :ind, %Frame{src: src, dest: dest, data: data}}

:dl matches on the addresses and builds a %Frame{}. network-layer (:nl) is next:

def handle({:nl, :ind, %Frame{dest: address} = frame}, %State{addr: address}),
  do: {:tl, :ind, frame}

If the destination address matches the device address (addr in %State{}), it is transport-layer’s (:tl) turn.
:tl has to extract the service. (0xBE is the data-connected service, others are ack, connect, connectionless, …)

def handle({:tl, :ind, %Frame{data: data} = frame}, %State{}) do
  {service, data} = Service.decode(data)

  case service do
    :invalid -> {:logger, :error, {:invalid_service, frame}}
    _ -> {:tlsm, :ind, %Frame{frame | service: service, data: data}}
  end
end

Next is the transport-layer-state-machine (:tlsm)

def handle({:tlsm, primitive, %Frame{service: service} = frame}, %State{} = state)
    when service in @connected_services do
  {new_state, action} = state.handler.(state, primitive, frame)
  action.(new_state, frame)
end

The SM has to be in open_idle-state and the source-address has to match the connected-address to let the data-connected service pass:

def open_idle(
     %State{c_addr: c_addr} = state, primitive, 
     %Frame{service: service, src: src}) do

  {next_handler, action} =
    case {primitive, service, src} do
      ...
      {:ind, :t_data_con, ^c_addr} -> {&Sm.open_idle/3, &Action.a02/2}
      ...
    end

  {%{state | handler: next_handler}, action}
end

The SM executes action a02:

def a02(%State{c_addr: c_addr} = state, %Frame{} = frame) do
  {
    state,
    [
      {:timer, :reset, :connection},
      {:al, :ind, frame},
      {:tl, :req, %Frame{service: :t_ack, dest: c_addr}}
    ]
  }
end

This action does not change the state (the connect-action for example would set the connected-address).
But it adds three new impulses:

  • a command for the timer to reset the connection-timeout
  • the indication is forwarded to the application-layer (:al)
  • an :ack is scheduled to be sent back

:al is boring (the real thing actually handles about 100 services)

def handle({:al, :ind, %Frame{data: data}}, %State{}),
    do: {:user, :ind, data}

The engine looks like this.
There is a reducer that takes a state, a list of impulses and a handler:

def reducer({state, impulses}, handler) do
  {state, impulses} =
    Enum.reduce(impulses, {state, []}, fn a, {state, impulses} ->
      case handler.(a, state) do
        {%State{} = state, impulse} -> {state, [impulse | impulses]}
        impulse -> {state, [impulse | impulses]}
      end
    end)

  {state, impulses |> Enum.reverse() |> List.flatten()}
end

Also there is an up() and a down() function that call each layer with the (reduced) output of the adjacent layer:

def up(state, impulses) do
  {state, impulses}
  |> reducer(&Dl.handle/2)
  |> reducer(&Nl.handle/2)
  |> reducer(&Tl.handle/2)
  |> reducer(&Tlsm.handle/2)
  |> reducer(&Al.handle/2)
  |> IO.inspect()
end
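Here is a self-contained version of the same engine with two toy layers, to show how the pieces fit together. The reducer has the same shape as above; the layers, the State fields and the addresses are made up. One detail is my reading of the code rather than something stated in the post: because each reducer calls its handler on every impulse, a layer seems to need a catch-all clause that passes foreign impulses through unchanged.

```elixir
defmodule EngineSketch do
  defmodule State do
    # Hypothetical state: own address and a counter of accepted frames.
    defstruct addr: nil, seen: 0
  end

  # A handler may return a single impulse, a list of impulses,
  # or {new_state, impulse_or_list}.
  def reducer({state, impulses}, handler) do
    {state, impulses} =
      Enum.reduce(impulses, {state, []}, fn impulse, {state, acc} ->
        case handler.(impulse, state) do
          {%State{} = state, out} -> {state, [out | acc]}
          out -> {state, [out | acc]}
        end
      end)

    {state, impulses |> Enum.reverse() |> List.flatten()}
  end

  # Toy data-link layer: decode the addresses.
  def dl({:dl, :ind, <<src::8, dest::8, data::bits>>}, %State{}),
    do: {:nl, :ind, %{src: src, dest: dest, data: data}}

  # Pass-through for impulses that are not meant for this layer.
  def dl(impulse, %State{}), do: impulse

  # Toy network layer: a frame for this device updates the state and
  # fans out into two impulses.
  def nl({:nl, :ind, %{dest: addr} = frame}, %State{addr: addr} = state) do
    {%State{state | seen: state.seen + 1},
     [{:timer, :reset, :connection}, {:user, :ind, frame.data}]}
  end

  # A frame for somebody else is dropped by returning no impulses.
  def nl({:nl, :ind, _frame}, %State{}), do: []
  def nl(impulse, %State{}), do: impulse

  def up(state, impulses) do
    {state, impulses}
    |> reducer(&dl/2)
    |> reducer(&nl/2)
  end
end
```

Returning an empty list is enough to drop a frame here, since List.flatten/1 removes it from the result.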

This call, for example:

up(
  %State{c_addr: 0xDE, addr: 0xAD, handler: &Sm.open_idle/3},
  [{:dl, :ind, <<0xDEAD_BEEF::32>>}]
)

yields these impulses:

[
   {:timer, :reset, :connection},
   {:user, :ind, <<0xEF>>},
   {:tl, :req, %Knx.Frame{data: "", dest: 222, service: :t_ack, src: nil}}
]

So now the timer has to do some work, the user has to be notified and the :tl, :req goes into down().
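A rough sketch of what the imperative shell around such a core could look like. Everything here is an assumption: the module name, the option names, the {:timer, :expired, :connection} impulse fed back on timeout, and the messages sent to the user pid. The core is injected as a function (state, impulses) -> {state, out_impulses}, so the shell does not depend on the real stack.

```elixir
defmodule ShellSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Called by whatever reads frames from the hardware.
  def frame_received(pid, bytes), do: GenServer.cast(pid, {:frame, bytes})

  @impl true
  def init(opts) do
    shell = %{
      core: Keyword.fetch!(opts, :core),
      core_state: Keyword.fetch!(opts, :core_state),
      user: Keyword.fetch!(opts, :user),
      timeout_ms: Keyword.get(opts, :timeout_ms, 6_000),
      timer: nil
    }

    {:ok, shell}
  end

  @impl true
  def handle_cast({:frame, bytes}, shell) do
    {:noreply, run(shell, [{:dl, :ind, bytes}])}
  end

  # An expired timer becomes just another impulse for the core.
  @impl true
  def handle_info({:timeout, :connection}, shell) do
    {:noreply, run(shell, [{:timer, :expired, :connection}])}
  end

  # Run the pure core, then execute the impulses it returned, in order.
  defp run(shell, impulses) do
    {core_state, out} = shell.core.(shell.core_state, impulses)
    Enum.reduce(out, %{shell | core_state: core_state}, &execute/2)
  end

  defp execute({:timer, :reset, :connection}, shell) do
    if shell.timer, do: Process.cancel_timer(shell.timer)
    ref = Process.send_after(self(), {:timeout, :connection}, shell.timeout_ms)
    %{shell | timer: ref}
  end

  defp execute({:user, :ind, data}, shell) do
    send(shell.user, {:user_ind, data})
    shell
  end

  # Stand-in for down() followed by the write to the hardware.
  defp execute({:tl, :req, frame}, shell) do
    send(shell.user, {:hw_write, frame})
    shell
  end
end
```

All side effects (timers, messages, hardware writes) end up in execute/2, and the core stays testable without starting a process.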

I really like it. The core now is pure and so super-easy to test and understand. Even the annoying things like deferred events are super simple (just keep the impulse).

If someone is interested I can push this to GitHub, but this is most likely very boring stuff for normal people.

I also have to look into :gen_statem, but I’ve had a hard time understanding it, and the “Stacked behaviours” proposal.

You need to always remember that when doing any type of server in Elixir, the server itself shouldn’t have any logic; instead it should dispatch to modules, so that you can reap the benefits that you are now aware of after following @sasajuric’s suggestion.

If you have not read it yet I would recommend you to read his book Elixir in Action.

The book is an eye opener for properly using server processes and message passing in Elixir :)

I wouldn’t go that far :) If the server is fairly small, I prefer to implement everything in the same (server) module, postponing the split until the code becomes “too large” and some clear abstractions emerge.

I do it from the start, unless I am only prototyping :)

When looking into the functional core, imperative shell pattern I found this:

All code can be classified into two distinct roles; code that does work (algorithms) and code that coordinates work (coordinators). [kbilsted/Functional-core-imperative-shell]

This puts the finger on the problem I had with my original implementation. The protocol-layers do their work, but in the middle sits this annoying randomly acting coordinator. The coordinator does not really behave better, if it is split into multiple modules. It has to be removed from the core and pushed to the boundaries (shell) of the application. (Just like I like management, sales and marketing on a different floor, or even better: building)

Refactoring the stack with the functional core, imperative shell pattern was a really pleasant experience. We have now decided to build a prototype of one of our products completely in Elixir, replacing:

  • multiple C and Cpp-modules
  • node.js/express
  • python
  • cron
  • systemd
  • homebrewed OTA-updater
  • MQTT
  • openvpn (maybe)
  • couple of bash-scripts
  • … (I’m sure I’m missing some)

Right now I don’t see what we need besides Elixir, OTP, phoenix, nerves(hub), some C for hard-realtime stuff and a couple of lines in Go to talk to the cloud.

But I’d like to also try the “Stacked behaviours” approach.

Do I understand correctly that it’s just something like

def layer_0_handle(packet) do
   packet = ...   
   layer_1_handle(packet)
end

def layer_1_handle(packet) do
   packet = ...   
   layer_2_handle(packet)
end

and so on?

Elixir cannot replace that one (systemd), to be honest. These two have different purposes in the OS lifetime and can nicely complement each other, but not replace each other.

Usually I’d agree with you here, though as Sebb mentions Nerves, it is probably some Raspberry Pi running the current stack on Raspbian.

In this scenario, the Nerves-init system is indeed the only way to go and replaces systemd.

Correct, at least that’s the plan.

Something like that, but based on behaviours. The idea was to build each layer as a behaviour, i.e. something GenServer-like. The bottom layer (0) would internally use e.g. gen_tcp or ranch (assuming tcp is the transport). An upper layer would be implemented on top of the bottom one, e.g.:

defmodule Layer1 do
  @behaviour Layer0

  # define layer1 callbacks
  @callback ...
  @callback ...
  ...

  def start_link(arg), do: Layer0.start_link(__MODULE__, arg)

  # layer0 callbacks:

  @impl Layer0
  def handle_packet(layer_0_packet, state) do
    layer_1_packet = decode(layer_0_packet)

    # fire the event to the upper layer
    state.callback.handle_packet(layer_1_packet)

    ...

    {:reply, layer_0_out_packets, new_state}
  end

  ...
end

Understood, I’ll try that out, thank you!