How to write a caching server in Elixir

There is an oracle on the moon who can give profound answers to all questions. The oracle is all-knowing, but requires a minute or two to ponder each question before answering. Luckily he has 4 brains, so he can ponder 4 questions simultaneously. Unfortunately the oracle has a very bad memory, so if you ask the same question over and over again, he will have to think about it every time.

How do we write an OracleServer in Elixir that can query the oracle for answers? It needs to do the following:

  • allow multiple people to ask questions at once, because the oracle has 4 brains
  • cache all the answers, to avoid asking repeat questions
  • if person A asks a question, and person B asks the same question while the oracle is still pondering it, OracleServer should avoid sending the same question to the oracle again, and instead inform both A and B when the answer comes back
  • the answer is not always 42

Obviously it’s a thinly disguised way to describe a well-known problem. I am familiar with solutions in other languages, but am having trouble figuring out how to do it in Elixir.


Welcome to Elixir, @highmountaintea.

Cachex can help you add caching to your app.

One thing I did not understand from your question: do you want to write this as a learning exercise, or use it in a project?

GenServers and Agents can hold state or data in memory. They are the best fit if you want to implement a cache yourself as a learning exercise.
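As a rough illustration of the Agent approach, here is a minimal memoizing cache. The module name AgentCache and its fetch/3 function are hypothetical, made up for this sketch. Note that on its own it does not meet the third requirement: the slow computation runs in the caller's process, so two callers missing on the same question at the same time will both compute it.

```elixir
defmodule AgentCache do
  # Hypothetical module: a minimal in-memory cache backed by an Agent.
  use Agent

  def start_link(opts \\ []) do
    Agent.start_link(fn -> %{} end, opts)
  end

  # Returns the cached value for key, or computes it with fun, stores it
  # and returns it. fun runs in the caller's process, so a slow fun never
  # blocks the Agent, but concurrent misses on the same key each call fun.
  def fetch(agent, key, fun) do
    case Agent.get(agent, &Map.fetch(&1, key)) do
      {:ok, value} ->
        value

      :error ->
        value = fun.(key)
        Agent.update(agent, &Map.put(&1, key, value))
        value
    end
  end
end
```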

Cachex can be used directly as the cache in your project if you are just looking for a cache library, e.g. to fetch some config from a source, store it in memory, refresh it every n seconds, etc.

Hi Kartheek. It’s not meant for an exercise or an existing project. I am more interested in understanding how things are done in Elixir.

I did consider using Agent for this particular problem, but I am not sure if it would really satisfy my conditions as set above.

I took a look at Cachex, which you mentioned, but can you give an example of how Cachex would handle the situation I described above? If person A asks a question, and person B asks the same question while the oracle is still pondering it, OracleServer should avoid sending the same question to the oracle again, and instead inform both A and B when the answer comes back.

You need to keep track of the question and who has asked it. When you have the answer, you send it to the callers you kept track of. When a GenServer receives a call, you have the return address (the from argument of handle_call/3). Keep track of it.

awaiting = %{"Why?" => [#PID<0.90.0>, #PID<0.765.0>]}
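A minimal sketch of that bookkeeping, assuming the waiters are stored as whatever the server wants to reply to (for example the from values received in handle_call/3). The Awaiting module and its add/3 and pop/2 functions are hypothetical. The first caller for a question tells the server it must start the oracle query; later callers are just appended; when the answer arrives, all waiters are removed at once.

```elixir
defmodule Awaiting do
  # Hypothetical helper: tracks who is waiting for which question.
  # `awaiting` maps a question to a list of callers (e.g. GenServer `from` values).

  # Adds a caller. Returns {:first, map} if nobody was waiting yet (so the
  # oracle must be queried), or {:joined, map} if a query is already in flight.
  def add(awaiting, question, caller) do
    case Map.fetch(awaiting, question) do
      {:ok, callers} -> {:joined, Map.put(awaiting, question, [caller | callers])}
      :error -> {:first, Map.put(awaiting, question, [caller])}
    end
  end

  # Removes and returns every caller waiting for the question, oldest first.
  def pop(awaiting, question) do
    {callers, rest} = Map.pop(awaiting, question, [])
    {Enum.reverse(callers), rest}
  end
end
```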

Do you want all the questions & answers kept in memory, on disk or in a db?

You’re going to need a worker pool. Do you want to write that too or use a library?


Is this the same thing you are looking for?
https://hexdocs.pm/cachex/reactive-warming.html#courier

As of v3, fallbacks changed quite significantly to provide the guarantee that only a single fallback will fire for a given key, even if more processes ask for the same key before the fallback is complete. The internal Courier service will queue these requests up, then resolve them all with the results retrieved by the first. This ensures that you don’t have stray processes calling for the same thing (which is especially bad if they’re talking to a database, etc.). You can think of this as a per-key queue at a high level, with a short circuit involved to avoid executing too often.

The new Courier service in Cachex v3 will actually queue the second and third calls to fire after the first one, rather than firing them all at once. What’s even better: the moment the first call resolves, the second and third will immediately resolve with the same results. This ensures that your fallback only fires a single time, regardless of the number of processes awaiting the result. This change in behaviour means that the code above would result in "key" having a single value of 1 as the second and third never fire. Although this results in a behaviour change above, it should basically never affect you in the same way as the code above is deliberately designed to highlight the changes.


Cachex takes care of these things automatically.


Hi cmo, thanks for your answer. So when the answer is available, what mechanism should I use to send the answer back to the requester(s)? I think I can send a plain message, but is there a more OTP way of doing it?

Also, if we are waiting for the answer inside the OracleServer, wouldn't that block it and prevent others from asking questions simultaneously?

Hi Kartheek, I am still trying to digest the fallback mechanism fully, but I think it is what I am looking for.

It’s nice to have a caching library that handles it automatically.

Forgot to answer your other questions…

Do you want all the questions & answers kept in memory, on disk or in a db? Memory.

You’re going to need a worker pool. Do you want to write that too or use a library? I would prefer to use a library.

OK, well, you’ll probably want to use ETS to cache your answers. As for pools, there are a few worker pool libraries; Poolboy is a pretty common choice.
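For the in-memory answer store, a minimal ETS sketch might look like the following. The AnswerTable module and the :oracle_answers table name are hypothetical. A :public table with read_concurrency: true lets callers read cached answers directly, without going through a GenServer; keep in mind that an ETS table is owned by, and disappears with, the process that creates it.

```elixir
defmodule AnswerTable do
  # Hypothetical module: caches oracle answers in a named ETS table.
  @table :oracle_answers

  # Creates the table; the calling process becomes its owner.
  def create do
    :ets.new(@table, [:set, :public, :named_table, read_concurrency: true])
  end

  # Returns {:ok, answer} on a hit, :error on a miss.
  def lookup(question) do
    case :ets.lookup(@table, question) do
      [{^question, answer}] -> {:ok, answer}
      [] -> :error
    end
  end

  # Stores (or overwrites) the answer for a question.
  def store(question, answer) do
    true = :ets.insert(@table, {question, answer})
    :ok
  end
end
```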

There’s no problem with just sending plain messages.
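Plain messages work, but OTP also provides GenServer.reply/2 for exactly this: handle_call/3 can save its from argument and return {:noreply, state}, and the server replies later. Meanwhile the server stays free to handle other calls, while each caller simply stays blocked in its own GenServer.call/2. A minimal sketch; the DeferredReply module and the :answer_ready message are hypothetical stand-ins for the oracle's answer arriving.

```elixir
defmodule DeferredReply do
  # Hypothetical example: replies to calls only when a later message arrives.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, nil)

  def ask(pid), do: GenServer.call(pid, :ask)

  # Simulates the answer arriving from somewhere else.
  def answer(pid, value), do: send(pid, {:answer_ready, value})

  @impl true
  def init(nil), do: {:ok, []}

  # Don't reply yet: remember who asked and return right away,
  # so the server can accept more calls in the meantime.
  @impl true
  def handle_call(:ask, from, waiting), do: {:noreply, [from | waiting]}

  # Reply to everyone who asked, then forget them.
  @impl true
  def handle_info({:answer_ready, value}, waiting) do
    Enum.each(waiting, fn caller -> GenServer.reply(caller, value) end)
    {:noreply, []}
  end
end
```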

Elixir in Action is super helpful in exposing you to all this sort of stuff.

Ha, this question did spring to mind while I was reading Elixir in Action. I remembered that each handle_call is blocking, so I was trying to figure out how to design a non-blocking server that also handles caching.

Maybe I should re-read chapter 7, and some of it might become clearer.
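One way to design the non-blocking caching server asked about above is sketched below, under several assumptions: the module name OracleCache is hypothetical, the oracle is passed in as a one-argument function, answers live in a plain map in the server state (rather than ETS or Cachex), and nothing limits how many questions are pondered at once. The slow query runs in a Task.async/1 process; the first asker triggers it, later askers of the same question are only queued, and everyone is answered with GenServer.reply/2 when the task's {ref, answer} message arrives.

```elixir
defmodule OracleCache do
  # Hypothetical sketch: a caching server that never blocks on the oracle
  # and sends each distinct question to the oracle only once.
  use GenServer

  # `oracle` is any one-argument function standing in for the slow oracle.
  def start_link(oracle) when is_function(oracle, 1) do
    GenServer.start_link(__MODULE__, oracle)
  end

  # Callers may wait as long as the oracle takes, hence :infinity by default.
  def ask(server, question, timeout \\ :infinity) do
    GenServer.call(server, {:ask, question}, timeout)
  end

  @impl true
  def init(oracle) do
    # answers: question => answer (the cache)
    # waiting: question => callers waiting for that answer
    # tasks:   task ref => question currently being pondered
    {:ok, %{oracle: oracle, answers: %{}, waiting: %{}, tasks: %{}}}
  end

  @impl true
  def handle_call({:ask, question}, from, state) do
    cond do
      Map.has_key?(state.answers, question) ->
        # Cache hit: answer immediately.
        {:reply, Map.fetch!(state.answers, question), state}

      Map.has_key?(state.waiting, question) ->
        # Already being pondered: queue this caller, don't ask again.
        {:noreply, update_in(state.waiting[question], fn callers -> [from | callers] end)}

      true ->
        # First asker: ponder in a separate process so this server stays free.
        oracle = state.oracle
        task = Task.async(fn -> oracle.(question) end)

        state = %{
          state
          | waiting: Map.put(state.waiting, question, [from]),
            tasks: Map.put(state.tasks, task.ref, question)
        }

        {:noreply, state}
    end
  end

  # Task.async/1 delivers its result to this process as {ref, result}.
  @impl true
  def handle_info({ref, answer}, state) when is_reference(ref) do
    # The task is done: drop its monitor and any pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {question, tasks} = Map.pop(state.tasks, ref)
    {callers, waiting} = Map.pop(state.waiting, question, [])

    # Everyone who asked this question gets the same answer.
    Enum.each(callers, fn caller -> GenServer.reply(caller, answer) end)

    answers = Map.put(state.answers, question, answer)
    {:noreply, %{state | answers: answers, waiting: waiting, tasks: tasks}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```

Because Task.async/1 links the task to the server, an oracle function that raises will also take the server down; Task.Supervisor.async_nolink/3 is the usual way to isolate such failures.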

See how he keeps track of it in this Port example.


Cachex has get, put, fetch, clear, etc.

  • get returns the value if it is present in the cache, and nil on a cache miss
  • fetch executes the fallback function on a cache miss

For example, let’s use a fallback function that returns its argument after a delay of 5 seconds.

iex(1)> import Cachex.Spec
iex(2)> Cachex.start_link(:my_cache, [ fallback: fallback(default: fn x -> :timer.sleep(5000)
...(2)> x end) ]) 
iex(3)> Cachex.get(:my_cache, 1) # get returns nil because there is no entry yet
{:ok, nil}

On a cache miss, fetch invokes the fallback function; the fallback runs and its value is put into the cache after the 5-second delay.

iex(4)> Cachex.fetch(:my_cache, 1) # 5 sec delay due to timer.sleep in fallback function
{:commit, 1}
iex(5)> Cachex.get(:my_cache, 1) # get will return value as it exists in cache
{:ok, 1}

Now fetch returns the value from the cache without executing the fallback function:

iex(6)> Cachex.fetch(:my_cache, 1) # returns immediately, fallback not invoked
{:ok, 1}

Thanks for your detailed explanations. Yes, this will work for the scenario I am describing.


I gave this some more thought. I presume the easiest way is to hold a map of worker processes, where each worker process answers only one question. When multiple clients ask the same question, they queue up on the same worker process, so the same question won’t be asked multiple times. Once the answer arrives, it is cached in the worker, so all queued and subsequent questions are answered from the cache.

OracleWorker:

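defmodule OracleWorker do
  use GenServer

  # Simulates the slow oracle: 30 seconds of pondering, then an answer.
  defp query_oracle(question) do
    Process.sleep(30_000)
    String.length(question)
  end

  def start(question) do
    GenServer.start(__MODULE__, question)
  end

  # The oracle takes far longer than GenServer.call's default 5-second
  # timeout, so wait for the answer indefinitely.
  def ask(pid) do
    GenServer.call(pid, :ask, :infinity)
  end

  @impl true
  def init(question) do
    {:ok, {question, nil}}
  end

  # Calls are handled one at a time: callers arriving while the oracle is
  # pondering wait in this process's mailbox, then get the cached answer.
  @impl true
  def handle_call(:ask, _from, {question, nil}) do
    answer = query_oracle(question)
    {:reply, answer, {question, answer}}
  end

  def handle_call(:ask, _from, {_question, answer} = state) do
    {:reply, answer, state}
  end
end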

OracleServer:

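defmodule OracleServer do
  use GenServer

  def start do
    GenServer.start(__MODULE__, nil)
  end

  defp get_worker(pid, question) do
    GenServer.call(pid, {:get_worker, question})
  end

  # Only the quick worker lookup goes through OracleServer; the slow call
  # to the worker happens in the caller's process, so OracleServer never
  # blocks on the oracle.
  def ask(pid, question) do
    worker = get_worker(pid, question)
    OracleWorker.ask(worker)
  end

  @impl true
  def init(_) do
    {:ok, %{}}
  end

  # Maps each question to the worker process responsible for it,
  # starting a new worker the first time a question is asked.
  @impl true
  def handle_call({:get_worker, question}, _from, workers) do
    case Map.fetch(workers, question) do
      {:ok, worker} ->
        {:reply, worker, workers}

      :error ->
        {:ok, new_worker} = OracleWorker.start(question)
        {:reply, new_worker, Map.put(workers, question, new_worker)}
    end
  end
end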

This means each question uses one process, so memory use grows with the number of distinct questions asked.

For the worker pool portion (since you need to limit the number of workers explicitly) I would probably use a worker pool library. I don’t have direct experience with one but I think two of the most popular are:

You could also look into Broadway or GenStage (although GenStage is a little low level and a bit tricky to use).

For the solution I mentioned just now, a pool would not work, because I need to keep all worker processes alive to cache the answers.

Each worker caches one answer, so we would lose cached answers if worker processes are allowed to terminate.

Ah, I meant using the pool in combination with Cachex, not in the worker processes, although I now realize that was not very clear. The reason a pool is important with Cachex for your use case is that if you’re using the fallback function, you could have more than 4 fallback functions running at the same time when you fetch more than 4 keys at once. By using a pool you can explicitly limit the number of fallback functions that are accessing the oracle on the moon at any one time.
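If you would rather not pull in a pool library, the part of a pool that matters here, capping the number of concurrent oracle queries, can be sketched as a small counting-semaphore GenServer. BrainLimiter and run/2 are hypothetical names, not part of Cachex or any pool library; the idea is that a Cachex fallback (or any other code that queries the oracle) would wrap its call in BrainLimiter.run/2, with the limit set to 4 for the oracle's four brains.

```elixir
defmodule BrainLimiter do
  # Hypothetical sketch: lets at most `max` callers run a function at once
  # (e.g. max = 4 for the oracle's four brains); extra callers queue up.
  use GenServer

  def start_link(max) when is_integer(max) and max > 0 do
    GenServer.start_link(__MODULE__, max)
  end

  # Runs fun in the caller's process once a slot is free,
  # and gives the slot back afterwards, even if fun raises.
  def run(limiter, fun) do
    :ok = GenServer.call(limiter, :acquire, :infinity)

    try do
      fun.()
    after
      GenServer.cast(limiter, :release)
    end
  end

  @impl true
  def init(max), do: {:ok, %{free: max, queue: :queue.new()}}

  @impl true
  def handle_call(:acquire, _from, %{free: free} = state) when free > 0 do
    {:reply, :ok, %{state | free: free - 1}}
  end

  def handle_call(:acquire, from, state) do
    # No free slot: park the caller until someone releases one.
    {:noreply, %{state | queue: :queue.in(from, state.queue)}}
  end

  @impl true
  def handle_cast(:release, state) do
    case :queue.out(state.queue) do
      # Hand the released slot straight to the longest-waiting caller.
      {{:value, next}, queue} ->
        GenServer.reply(next, :ok)
        {:noreply, %{state | queue: queue}}

      {:empty, _queue} ->
        {:noreply, %{state | free: state.free + 1}}
    end
  end
end
```

Waiting callers are parked with {:noreply, state} and released in FIFO order. The sketch does not monitor slot holders, so a process killed while holding a slot leaks it; handling that kind of failure is part of what an established pool library does for you.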


Ah, got it. Thanks for the tip. I will keep it in mind when I use Cachex.
