Advice on how to design this in OTP?

Hi all! I’m quite new to Elixir/OTP/the actor model so I’m having trouble deciding in my head how I’m going to go about designing certain functionality in an app that I’m working on. Basically, this portion of my application is going to receive messages containing information which needs to be written to a file, and then it should queue up these operations as they come in, and finally the queue should be consumed one-at-a-time, writing each piece of information to the file sequentially until the queue is empty.

So far, I’ve created a quick GenServer queue, but I’m having trouble figuring out the architecture from here because the messages which need to be written to the file will be coming in at any time whenever the application is running, so each time a message comes in, I would have to add it to the queue, and then do some checking to figure out if the queue is currently being consumed, and if not, tell the worker process to start consuming the queue until it is empty again.

I’m just looking for some very high-level information about how to best architect/design this kind of thing. I’m trying to keep the implementation as simple as possible, yet make sure to separate the worker process from the queue (and the server process also if it is necessary to have one). Any advice would be appreciated, I’m trying to get my hands dirty and get a handle on OTP with this project. Thanks :slight_smile:

So you only want to consume messages when there has not been a message in some period of time?

Not really, I’m not sure I explained myself well. The queue may or may not be empty at any time as messages will come from outside sources whenever they come. So, the queue could get a flood of messages for a few minutes and get quite large, or it may not receive any messages for a while and stay empty. The problem I’m having is how to design the worker process which will consume the entries in the queue sequentially. When the queue is empty, the worker process has no work, and there is nothing to be done. However, when the queue receives a message and therefore is not empty, the worker process has to be alerted that the queue is no longer empty and that it now has work to do (and it should start consuming the queue sequentially). However, while the worker process is sequentially processing queue entries, more messages may come in and therefore the queue might have new entries being added to it as the worker is processing current entries. The worker should continue consuming the queue entries until it is entirely empty, and then it should cease work (as there is no more work to do) until the queue starts receiving messages again. I hope I explained myself well enough here :stuck_out_tongue:

I’m having trouble deciding how to design this system so that when the queue receives a message, it can tell the worker process to start processing queue entries as there are now entries in the queue which need to be processed. But, if the queue receives a message and the worker process is already working, there is nothing to do since the worker will eventually handle the new entry. However, I’m not sure how to handle this detection of whether the worker process is currently working or not.

I probably don’t understand your problem completely, but maybe you can use the process’s mailbox for this?

Here’s what I mean. Say you have an application like this:

# lib/test/application.ex
defmodule Test.Application do
  @moduledoc false
  use Application

  def start(_type, _args) do
    children = [
      {Test.Worker, "somefile.dat"}
    ]

    opts = [strategy: :one_for_one, name: Test.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

# lib/test/worker.ex
defmodule Test.Worker do
  use GenServer

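  def start_link(file) do
    # the argument from the child spec is handed to init/1 unchanged
    GenServer.start_link(__MODULE__, file, name: __MODULE__)
  end

  @impl true
  def init(file) do
    {:ok, file}
  end

  # every {:write, data} message waits in the mailbox and is handled in order
  @impl true
  def handle_info({:write, data}, file) do
    # might be a bit more efficient to use :raw
    # and file descriptor with IO instead of filename ...
    File.write!(file, data, [:append])
    {:noreply, file}
  end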
end

Then starting the shell and sending some messages to the worker process will result in a new file, somefile.dat.

iex(1)> send(Test.Worker, {:write, "hello"})
{:write, "hello"}
> cat somefile.dat
hello
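
The comment in the worker mentions opening the file once with :raw instead of passing the filename on every write. A minimal sketch of that variant, assuming a hypothetical module name RawFileWriter and hypothetical write/2 and flush/1 helpers (none of these are from the code above), could look like this:

```elixir
defmodule RawFileWriter do
  # Hypothetical variant of the worker above: opens the file once in raw
  # append mode and keeps the descriptor as the GenServer state.
  use GenServer

  def start_link(path) do
    GenServer.start_link(__MODULE__, path)
  end

  # Asynchronous write; requests queue up in the process mailbox.
  def write(pid, data), do: GenServer.cast(pid, {:write, data})

  # Synchronous no-op; it returns once every message sent earlier by the
  # same caller has been handled.
  def flush(pid), do: GenServer.call(pid, :flush)

  @impl true
  def init(path) do
    # A :raw descriptor may only be used by the process that opened it,
    # which is why the file is opened here and not in start_link/1.
    case :file.open(path, [:append, :raw, :binary]) do
      {:ok, fd} -> {:ok, fd}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_cast({:write, data}, fd) do
    :ok = :file.write(fd, data)
    {:noreply, fd}
  end

  @impl true
  def handle_call(:flush, _from, fd), do: {:reply, :ok, fd}

  @impl true
  def terminate(_reason, fd), do: :file.close(fd)
end
```

The mailbox is still the queue here. The flush/1 call is only a convenience for callers that need to know their earlier writes have reached the file.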

You can also create more than one worker process and pool them together with something like :poolboy. Or you can make the processes short-lived, or :transient, such that they are only “alive” when there are messages to consume.

I’d probably just use Phoenix.PubSub to send it work.

But the proper OTP way is to just make the worker a GenServer, and make the ‘queue’ be its mailbox, just consume things as you get them. :slight_smile:

To me the worker sounds an awful lot like GenStage and Flow. The process that receives and queues the messages from the outside is a GenStage producer, and the process that is working on the messages is a GenStage consumer.

Even if you don’t actually use GenStage to implement this, you might look at the GenStage producer documentation and examples. Basically the worker issues “demand” to the producer. The producer keeps track of how much “demand” has built up and, when new work comes in, pushes that much work out to the worker(s). The workers handle the work that’s been given to them and respond by sending new “demand” to the producer.
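
To make the demand idea concrete without pulling in GenStage, here is a rough sketch of just the bookkeeping as pure functions. DemandBuffer and its push/2 and ask/2 functions are hypothetical names for illustration and are not part of GenStage's API. Both functions return the events that can be sent to the worker right now, plus the updated buffer:

```elixir
defmodule DemandBuffer do
  # Hypothetical helper, not part of GenStage: pure bookkeeping for a
  # producer that buffers events and tracks outstanding demand.
  defstruct queue: :queue.new(), demand: 0

  def new, do: %__MODULE__{}

  # New work arrived from outside: buffer it, then hand out whatever the
  # current demand allows.
  def push(%__MODULE__{} = buf, event) do
    dispatch(%{buf | queue: :queue.in(event, buf.queue)})
  end

  # A worker asked for `n` more events.
  def ask(%__MODULE__{} = buf, n) when is_integer(n) and n > 0 do
    dispatch(%{buf | demand: buf.demand + n})
  end

  # Pops events while there is both demand and buffered work.
  # Returns {events_to_send, new_buffer}.
  defp dispatch(buf, acc \\ [])

  defp dispatch(%__MODULE__{demand: 0} = buf, acc), do: {Enum.reverse(acc), buf}

  defp dispatch(%__MODULE__{} = buf, acc) do
    case :queue.out(buf.queue) do
      {{:value, event}, rest} ->
        dispatch(%{buf | queue: rest, demand: buf.demand - 1}, [event | acc])

      {:empty, _queue} ->
        {Enum.reverse(acc), buf}
    end
  end
end
```

A producer process would keep one of these structs as its state and send the returned events to the worker. The worker never has to be told that the queue is non-empty, because work only flows when it has asked for it.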

I would only use GenStage if there was a possibility of a mailbox overflow, otherwise it’s overkill.

@OvermindDL1 @idi527 Thank you both! I think I may have been slightly overthinking the architecture. I’m going to try out using the process’s mailbox as the queue and then writing to the file based on the messages there. However, I think I may want to keep the state and the worker separated so that if the worker crashes, I can start another worker with the messages to be processed. Do either of you have any advice on how to handle that?

I think Phoenix PubSub and GenStage would be overkill as I’m trying to keep this as simple as possible but I appreciate those suggestions as well :slight_smile:

You could pipe messages through a GenServer that dispatches to other GenServer workers when they request something to work on. There are lots of libraries that encapsulate this design. GenStage is even kind of one of them, so it would work for this. :slight_smile:

Another trick is to store the state in an ETS table that is owned by the supervisor of the process that is working on it. See Steve Vinoski’s blog post “Don’t Lose Your ets Tables”.
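
A minimal sketch of that idea, assuming a module-based supervisor that creates the table in its own init/1. The module name PendingWrites, the table name :pending_writes, and the put/done/pending helpers are all made up for illustration:

```elixir
defmodule PendingWrites do
  # Hypothetical module-based supervisor that also owns the ETS table.
  use Supervisor

  @table :pending_writes

  def start_link(children) when is_list(children) do
    Supervisor.start_link(__MODULE__, children, name: __MODULE__)
  end

  @impl true
  def init(children) do
    # init/1 runs inside the supervisor process, so the supervisor becomes
    # the table owner and the table outlives any crashing child.
    :ets.new(@table, [:named_table, :public, :ordered_set])
    Supervisor.init(children, strategy: :one_for_one)
  end

  # Helpers intended to be called from the worker process.
  # The table is :public, so any process may read and write it.
  def put(id, data), do: :ets.insert(@table, {id, data})
  def done(id), do: :ets.delete(@table, id)
  def pending, do: :ets.tab2list(@table)
end
```

The worker would call put/2 when it accepts a message and done/1 after the write succeeds. A restarted worker can then read pending/0 in its init/1 to pick up whatever the crashed one left behind. The table is still lost if the supervisor itself dies.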
