Golang singleflight concept in Elixir

From what I’ve learned so far, if I build an API server using Elixir and Phoenix, each request against the API server is handled by an independent process.
Let’s say I have a GET API to retrieve items and it takes a long time to query from the database, so I decided to cache it. On a cache miss, the process should query the database and save the result to the cache on the way to responding to the client. To prevent a large number of concurrent requests from hitting the DB when the cache is penetrated, in Golang they would implement singleflight in the cache process. I’m curious how Elixir and Phoenix would deal with this situation.

Seems like an error-prone way to do things, but maybe I didn’t understand the context.

Depends on what kind of cache you have. Let’s say, for example, you have a cache process backed by ETS; you can literally have the following flow if the entry is not cached:

request → check cache → fetch record from DB and add it to cache

The process mailbox ensures that all messages will be processed sequentially so there is no race condition. If you want to scale this out you can create a pool of processes that would check for cache and the basic idea will still be the same.

Say the 1st process checked the cache and went to the DB, and the 2nd process came to check the cache before the 1st process could store the data in the cache. Now we have multiple processes querying the DB. That’s the scenario I’m trying to convey, sorry if my crappy English leads to unclear statements.

Maybe this diagram will make it clearer:

image

You cannot have a race condition in this situation because the cache process will always process messages sequentially; this is how processes work in Elixir.

Keep in mind that this is a simplistic approach with an inherent problem: the mailbox of the cache process can overflow. The approach you will see in the real world replaces the single cache process with a pool of processes that share the same ETS table (for example, or whatever other medium you want to cache in).
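
To make the single-process flow above concrete, here is a minimal sketch in Elixir. It is only an illustration under assumptions: `SerialCache`, `get/2` and the `loader` function (a stand-in for the slow DB query) are hypothetical names, and a plain map in the GenServer state stands in for the ETS table. Because the query runs inside `handle_call`, the DB is hit once per key, but every other caller waits in the mailbox while it runs.

```elixir
defmodule SerialCache do
  use GenServer

  # `loader` is a hypothetical 1-arity function standing in for the slow DB query.
  def start_link(loader), do: GenServer.start_link(__MODULE__, loader)

  # Every lookup goes through the cache process, so lookups are serialized.
  def get(server, key), do: GenServer.call(server, {:get, key})

  @impl true
  def init(loader), do: {:ok, %{loader: loader, cache: %{}}}

  @impl true
  def handle_call({:get, key}, _from, %{loader: loader, cache: cache} = state) do
    case cache do
      %{^key => value} ->
        # Cache hit: reply straight away.
        {:reply, value, state}

      _ ->
        # Cache miss: the query runs inside the cache process, so all other
        # callers (for any key) sit in the mailbox until it finishes.
        value = loader.(key)
        {:reply, value, %{state | cache: Map.put(cache, key, value)}}
    end
  end
end
```

Note that `GenServer.call/2` has a default timeout of 5 seconds, so with a slow enough query the callers queued behind it may time out.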

Assuming that you want several clients each making the same request to get the same response, it should be very simple to roll it yourself. The general idea would be along these lines:

%% "Cache process"
receive_loop(State) ->
    receive
         {request, Pid, Ref, Request} -> 
              receive_loop(make_request(Pid, Ref, Request, State));
         {database_response, DbRef, Result} ->
              receive_loop(handle_response(DbRef, Result, State))
    end.

make_request(Pid, Ref, Request, {Cache, In, Out}) ->
    case {Cache, In} of
        {#{ Request := Cached }, #{}} ->
            %% Cache hit, reply with what we have.
            Pid ! {reply, Ref, Cached},
            {Cache, In, Out};
        {_, #{ Request := Others }} ->
            %% This request has already been made but we haven't received a
            %% reply yet. Tell the previously made request that we want the reply,
            %% too.
            {Cache, In#{ Request => [{Pid, Ref} | Others] }, Out};
        {#{}, #{}} ->
            %% Not in cache, no outstanding request: make a new request.
            %%
            %% It is very important that this request is made asynchronously,
            %% otherwise everyone will have to wait for each request to finish
            %% which makes the whole thing pointless.
            DbRef = call_your_database_asynchronously(self(), Request),
            {Cache,
             In#{ Request => [{Pid, Ref}] },
             Out#{ DbRef => Request } }
    end.

handle_response(DbRef, Result, {Cache, In, Out}) ->
    #{ DbRef := Request } = Out,
    #{ Request := Clients } = In,
    %% Respond to all clients that made the same request, then
    %% cache the result and remove this outstanding request
    %% from the state.
    %%
    %% If you only want the "single flight" functionality, just skip
    %% the cache parts in this example.
    [Pid ! {reply, ClientRef, Result} || {Pid, ClientRef} <- Clients],
    {Cache#{ Request => Result },
     maps:remove(Request, In),
     maps:remove(DbRef, Out)}.

(Apologies for putting it in Erlang, hopefully you’ll get the gist of it though.)
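
For readers more comfortable with Elixir, here is a rough port of the same idea as a GenServer. Treat it as a sketch under assumptions rather than a drop-in implementation: `SingleFlight`, `fetch/2` and `loader` are hypothetical names, the loader stands in for the database call, and `Task.async/1` is swapped in for `call_your_database_asynchronously`. Callers asking for a key that is already in flight are parked and all answered with `GenServer.reply/2` once the single task finishes.

```elixir
defmodule SingleFlight do
  use GenServer

  # `loader` is a hypothetical 1-arity function standing in for the slow DB query.
  def start_link(loader), do: GenServer.start_link(__MODULE__, loader)

  def fetch(server, key), do: GenServer.call(server, {:fetch, key})

  @impl true
  def init(loader) do
    # cache: key => result, waiting: key => [caller], in_flight: task ref => key
    {:ok, %{loader: loader, cache: %{}, waiting: %{}, in_flight: %{}}}
  end

  @impl true
  def handle_call({:fetch, key}, from, state) do
    cond do
      Map.has_key?(state.cache, key) ->
        # Cache hit: reply with what we have.
        {:reply, Map.fetch!(state.cache, key), state}

      Map.has_key?(state.waiting, key) ->
        # Already requested, no result yet: park this caller with the others.
        {:noreply, %{state | waiting: Map.update!(state.waiting, key, &[from | &1])}}

      true ->
        # Not cached, nothing in flight: run the query in a separate task so
        # the server stays free to accept (and park) further callers.
        %Task{ref: ref} = Task.async(fn -> state.loader.(key) end)

        {:noreply,
         %{state |
           waiting: Map.put(state.waiting, key, [from]),
           in_flight: Map.put(state.in_flight, ref, key)}}
    end
  end

  @impl true
  def handle_info({ref, result}, %{in_flight: in_flight} = state)
      when is_map_key(in_flight, ref) do
    # The task replied; drop its monitor and flush any pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {key, in_flight} = Map.pop(in_flight, ref)
    {callers, waiting} = Map.pop(state.waiting, key)

    # Answer everyone who asked for this key, then cache the result.
    Enum.each(callers, &GenServer.reply(&1, result))

    {:noreply,
     %{state | cache: Map.put(state.cache, key, result), waiting: waiting, in_flight: in_flight}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```

One design caveat: `Task.async/1` links the task to the server, so a crashing loader takes the cache process down with it. A real version would also want error handling and some form of expiry.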

There is the Cachex library…

As mentioned in the previous answers, it is simple to implement:

in cache? → cache
not in cache? → query → cache the result

Cachex has got you covered as @kokolegorille mentioned.

Here’s a demo that shows you that the expensive work is done only once. You can paste this in e.g. dont_repeat_work.exs and just do elixir dont_repeat_work.exs after.

There:

Mix.install([{:cachex, "~> 3.6"}])

Cachex.start_link(name: :our_cache)

defmodule Worker do
  def expensive_work(id) do
    Cachex.fetch(:our_cache, "expensive_data", fn _key ->
      IO.puts(
        "#{inspect(NaiveDateTime.utc_now())}: doing expensive work on behalf of worker #{inspect(id)}"
      )

      :timer.sleep(1000)
      {:commit, "EXPENSIVE_VALUE"}
    end)
  end

  def use_expensive_work(id) do
    IO.puts("#{inspect(NaiveDateTime.utc_now())}: requesting access from worker #{inspect(id)}")
    value = expensive_work(id)
    IO.puts("#{inspect(NaiveDateTime.utc_now())}: worker #{id} received value: #{inspect(value)}")
  end
end

1..3
|> Task.async_stream(fn id ->
  Worker.use_expensive_work(id)
end)
|> Stream.run()

On my machine this returned:

~N[2024-04-09 11:31:13.743650]: requesting access from worker 1
~N[2024-04-09 11:31:13.743645]: requesting access from worker 2
~N[2024-04-09 11:31:13.743642]: requesting access from worker 3
~N[2024-04-09 11:31:13.746957]: doing expensive work on behalf of worker 3
~N[2024-04-09 11:31:14.748989]: worker 2 received value: {:ok, "EXPENSIVE_VALUE"}
~N[2024-04-09 11:31:14.748980]: worker 3 received value: {:commit, "EXPENSIVE_VALUE"}
~N[2024-04-09 11:31:14.749017]: worker 1 received value: {:ok, "EXPENSIVE_VALUE"}

Notice the {:commit, ...} tuple. That means the work has been done only once and the other 2 {:ok, ...} tuples mean a cache hit. The important part however is that this does NOT lead to all 3 workers concurrently doing the expensive work and all 3 writing to the cache at (nearly) the same time. Cachex took special care to prevent this.

omg, TIL about Cachex.fetch. I don’t know how many versions of this I hand-rolled over the years.

https://hexdocs.pm/cachex/Cachex.html#fetch/4

Happy to help!

One thing to watch out for with Cachex.fetch and Ecto’s sandbox - the function passed to Cachex.fetch runs in a separate process from the caller.

Wow, this is fantastic! Thanks for the demo code.
One additional question though: why was worker 3 doing the expensive work instead of worker 1? Worker 1 was the first to be invoked, so I thought it would reach the expensive work first.

Porting patterns from one language to another is really helpful.

Take a look at the timestamps. It seems the log lines are out of order with regard to time. Worker 3 requested work first.

The 3 workers were scheduled to start at the same time but the runtime chose to start worker #3 first. It is pretty normal in async runtimes for things to be out of order when jobs start. The important part is that eventually everything converges… if you write your algorithm correctly, of course.

Yeah, it’s educational. Though note that the BEAM VM has no mutexes per se; they can be emulated via various means. Cachex chose to do it with an :ets table and a Process dictionary. If you are curious, here’s the meat of the implementation: