Wrap async process for GenServer.call?

I have a process, currently implemented as a GenServer, that acts as a middleman for a 3rd party service, handling inbound and outbound messages.

Responses aren’t immediate. Let’s say you’re subscribed to A, you’re now receiving A messages. You send “subscribe B”, several more A messages arrive, then “confirm subscribe B”, followed by a mix of A and B messages.

How can I take the async subscribe/confirm cycle and make it an implementation detail? I’d love for this API to be a standard GenServer.call().

From what I understand of your problem, there are two possible causes. It could be either of the following:

  1. First, after the process matches a first message, it goes through a series of further receives whose patterns differ from the first one. By the time it finishes all of those, messages matching the first pattern are already waiting in the process mailbox.
  2. Second, there is a single receive block with several patterns, and each pattern takes a long time to finish executing and send a reply back. So many messages have already piled up in the receiving process's mailbox.

To solve the first one, there is a tool called poolboy that, when configured, sets up a pool of server processes. Please see poolboy-example.
There is also a sample video on YouTube about poolboy, please see this video. It shows how the presenter uses the tool.

To solve the second one, there is an optimization pattern from ParTe; I think it is implemented in the Wrangler refactoring tool for Erlang. It suggests that after receiving a message the gen_server does some initialization and then spawns another process, which eventually sends the reply back. While the spawned process does the work, the gen_server has already returned {:noreply, state} instead of replying, so it is available to receive the next message sooner. The initiating process has to change as well: it waits for the reply from the spawned process instead of from the gen_server. That is roughly the mechanism; I would have to check the docs to give you more details.
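
As a rough illustration of that second idea, here is a minimal Elixir sketch. It assumes the spawned worker answers the original caller directly with GenServer.reply/2, which may differ from what ParTe actually generates. The module name SlowServer and the helper slow_work/1 are made up for the example.

```elixir
defmodule SlowServer do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Client API: from the outside this is an ordinary synchronous call.
  def compute(server, n), do: GenServer.call(server, {:compute, n})

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:compute, n}, from, state) do
    # Hand the slow work to a separate process so the server can
    # return to its mailbox right away.
    Task.start(fn ->
      result = slow_work(n)
      # The worker answers the original caller directly.
      GenServer.reply(from, result)
    end)

    # No reply yet; the caller stays blocked until the worker replies.
    {:noreply, state}
  end

  # Hypothetical stand-in for the expensive step.
  defp slow_work(n) do
    Process.sleep(50)
    n * 2
  end
end
```

With this in place, SlowServer.compute(pid, 21) still blocks the caller, but the server itself is free while the worker runs. Note that the worker is not linked or supervised here, so if it crashes the caller simply hits the GenServer.call timeout.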

I hope you understand the idea I presented here.


Dev.

If I understand correctly, you want the client of the subscriber GenServer to issue a call which will e.g. return :ok when the confirmation arrives? If that’s so, you could have something like this.

In handle_call you don’t reply immediately, but store the from info into the state:

def handle_call({:subscribe, topic}, from, state) do
  send_subscription_message(topic)
  {:noreply, store_subscription(state, topic, from)}
end

When the confirmation message arrives (e.g. in handle_info), you need to find the corresponding from and send the response with GenServer.reply:

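def handle_info(inbound_message, state) do
  if subscription_confirmation?(inbound_message) do
    # Look up the caller stored by handle_call and unblock it.
    from = subscriber(state, topic(inbound_message))
    GenServer.reply(from, :ok)
  else
    # ...
  end

  # handle_info must return a callback tuple, not the result of GenServer.reply/2.
  {:noreply, state}
end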
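
Putting the two callbacks together, a self-contained sketch might look like the one below. The module name Subscriber, the {:confirm_subscribe, topic} message shape, and the fake send_subscription_message/1 (which just schedules a confirmation to itself) are assumptions for illustration, not the real third-party protocol.

```elixir
defmodule Subscriber do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Blocks until the confirmation for `topic` arrives (or the call times out).
  def subscribe(server, topic), do: GenServer.call(server, {:subscribe, topic})

  @impl true
  def init(:ok), do: {:ok, %{pending: %{}}}

  @impl true
  def handle_call({:subscribe, topic}, from, state) do
    send_subscription_message(topic)
    # Remember who is waiting for this topic; do not reply yet.
    {:noreply, %{state | pending: Map.put(state.pending, topic, from)}}
  end

  @impl true
  def handle_info({:confirm_subscribe, topic}, state) do
    {from, pending} = Map.pop(state.pending, topic)
    # Unblock the waiting caller, if there is one.
    if from, do: GenServer.reply(from, :ok)
    {:noreply, %{state | pending: pending}}
  end

  def handle_info(_other_message, state) do
    # Ordinary A/B traffic would be handled here.
    {:noreply, state}
  end

  # Hypothetical stand-in for the third-party service: instead of talking
  # to a real service, schedule a fake confirmation to ourselves.
  defp send_subscription_message(topic) do
    Process.send_after(self(), {:confirm_subscribe, topic}, 50)
  end
end
```

From the client's point of view, Subscriber.subscribe(pid, "B") is now a plain call that returns :ok once the confirmation has come in. This sketch keeps one waiting caller per topic, so a second caller subscribing to the same topic before confirmation would overwrite the first.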

I’d also recommend monitoring the client process from the subscription handle_call. If the client crashes, you’ll get a :DOWN message, and when that happens you can delete all the associated data from the GenServer state.
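
A possible shape for that monitoring, as a sketch only: pending entries are keyed by the monitor reference so the :DOWN handler can drop them directly. The module name MonitoredSubscriber, the pending_count/1 helper, and the {:confirm_subscribe, topic} message are hypothetical, and sending the outbound subscribe message is left out for brevity.

```elixir
defmodule MonitoredSubscriber do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def subscribe(server, topic), do: GenServer.call(server, {:subscribe, topic})

  # Inspection helper for the example only.
  def pending_count(server), do: GenServer.call(server, :pending_count)

  @impl true
  def init(:ok), do: {:ok, %{pending: %{}}}

  @impl true
  def handle_call({:subscribe, topic}, {client_pid, _tag} = from, state) do
    # Monitor the caller so its pending entry can be cleaned up if it dies.
    ref = Process.monitor(client_pid)
    {:noreply, %{state | pending: Map.put(state.pending, ref, {topic, from})}}
  end

  def handle_call(:pending_count, _from, state) do
    {:reply, map_size(state.pending), state}
  end

  @impl true
  def handle_info({:confirm_subscribe, topic}, state) do
    {confirmed, rest} =
      Enum.split_with(state.pending, fn {_ref, {t, _from}} -> t == topic end)

    for {ref, {_topic, from}} <- confirmed do
      # The caller is about to be answered, so stop monitoring it.
      Process.demonitor(ref, [:flush])
      GenServer.reply(from, :ok)
    end

    {:noreply, %{state | pending: Map.new(rest)}}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # The client died before confirmation arrived; forget its request.
    {:noreply, %{state | pending: Map.delete(state.pending, ref)}}
  end

  def handle_info(_other_message, state), do: {:noreply, state}
end
```

Keying by monitor reference also means several clients can wait on the same topic at once, and all of them are released when the confirmation arrives.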
