Promises and resolving them

Speaking in terms of promises, I want a server which creates a promise, returns it to a client, and resolves it sometime in the future; the client would await/yield the result.
General schema:

# GenServer
def handle_call(:get_promise, from, promises) do
  promise = Task.??? # can't say what the task should return right now; its value must be filled in later by this module
  
  {:reply, promise, [promise | promises], {:continue, :resolve_promise}}
end

def handle_continue(:resolve_promise, promises) do
  ... # resolve promises now or maybe later
end

# Client
promise = Server.get_promise()
Task.await(promise)

The thing is, the Task creator cannot provide a function returning the value, because the value will only be available sometime in the future (e.g. the result is the response of an external API call).

I’m struggling to do this with Task and Task.Supervisor, and it looks like I’m missing something. Or is Task missing some of Promises’ flexibility?
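The closest I can get is a task whose function simply blocks in a receive until someone sends it the value, roughly like this (just a sketch - the resolving send would have to come from the server, and only the process that created the task may await it, which is exactly my problem):

promise = Task.async(fn ->
  receive do
    {:resolve, value} -> value
  end
end)

# whoever knows promise.pid can later "resolve" it:
send(promise.pid, {:resolve, 42})

Task.await(promise) #=> 42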

I came up with some ideas which I don’t like:

  • Actually do the “promise” resolving inside the Task and send some message to from. Bad idea, because users of the module would have to change their own code to be able to use the module.
  • Same, but instead of sending a message to the client, return the client a function that creates the task, so the task will be linked to the client process. Am I headed for monitoring hell with this?
  • Save the resolved values into the state and wait for them inside the Task:
    def handle_call(:get_promise, from, responses_map) do
      promise_uuid = get_uuid_somewhere()
      promise = Task.Supervisor.async_nolink(MySup, fn ->
        # somehow wait for a value in responses_map under key promise_uuid; I guess the task would need to be a module to get a `while` loop
      end)
    
      {:reply, promise, responses_map, {:continue, {:resolve_promise, promise_uuid}}}
    end
    
    This looks bad too, because it is verbose and smells of reinventing the wheel.

Also, I cannot google why a Task has to be owned by the calling process and cannot be awaited by just anyone (or why ownership cannot be transferred or set upon creation).

Also, I cannot google why a Task has to be owned by the calling process

A task is an independent process which can return the computed result back to the “owner” process that spawned it.

async wraps the given function so that its return value is sent back as a message - so the result is sent back to the “owner”.

By using start/3 or start_link/3 instead of async, the “owner” is indicating that it doesn’t want a result sent to it. However, the function passed to start or start_link could contain code that sends the result to another process entirely.

The only problem is that the task wouldn’t be linked to the receiver process - so the receiver wouldn’t know whether the task died, and the task would continue to run even if the receiver process died.
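For illustration, a rough sketch of that (the receiver pid and the “work” are stand-ins):

def start_work(receiver_pid, data) do
  # start/3 and start_link/3 don't send the result anywhere by themselves,
  # but the function we pass can send it to any process we like
  Task.start_link(fn ->
    result = {:done, data}                    # stand-in for the real work
    send(receiver_pid, {:work_done, result})
  end)
end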


await/2 is simply a selective receive for the final message from the task.
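Roughly, it boils down to something like this (simplified - the real await/2 also handles the task’s :DOWN message and exits with a more descriptive reason on timeout):

def await_like(%Task{ref: ref}, timeout) do
  receive do
    {^ref, result} -> result   # the task sends {ref, result} to its owner
  after
    timeout -> exit(:timeout)
  end
end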

Typically processes don’t await results - they simply process the message when it arrives in the process mailbox. In a GenServer the process mailbox is managed by the OTP behaviour, so a task reply would be fed to the handle_info callback.

Can you elaborate on your overall goal? This seems like a bit of an XY problem, since normally concepts like Promises are used in languages to avoid blocking, whereas there are different approaches you’d want to take in Elixir.


I have an external API with rate limits. I want to make a client for that API (the “server” in my terms) which respects the limits, while the rest of the app does not need to know about them. So, if the server is asked for data from the external API when the limits are exhausted, it can postpone the actual call for some time. I want this delay to be transparent to the server’s users. Ideally I do not want to bring in heavy machinery like message brokers, because in JS or Java my case is super simple to do, exactly the way explained.

We may approach this question from another side. Promises are widely adopted in many languages, including the most popular one, and the concept (and maybe the standard) is familiar to many developers coming from other languages. Does it make sense to move people from Promises to Elixir’s basic concurrency? Would Task be good for that?

For me, Task is a bit confusing so far because it can be awaited only by the caller process - I cannot find a reason not to allow ownership transfer. And it breaks cases that are simple in other languages, like the one explained.

so the result is sent back to the “owner”.

Yes, but I cannot find a way to change the owner to another process, so that one process could create Tasks for another process to await.

await/2 is simply a selective receive for the final message from the task.

Thanks, I think I should look at the Elixir sources more.

:wave:

I think it’s because it’s linked to the calling process so as to keep the sequential semantics such that if the task dies, the caller dies with it. In theory you might be able to transfer a task by unlinking it from the caller, and then linking it to some other process. You’d also need to take care of monitors.


As for your actual problem, I’d probably try a GenServer as the API wrapper which uses an initial :noreply with a subsequent GenServer.reply/2 to achieve something like

defmodule API do
  use GenServer

  defstruct inflight_requests: %{}

  @type t :: %__MODULE__{
    inflight_requests: %{reference() => GenServer.from()}
  }

  @doc false
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    {:ok, %__MODULE__{}}
  end

  def request(something) do
    GenServer.call(__MODULE__, {:request, something}, :infinity)
  end

  @impl true
  def handle_call({:request, something}, from, %__MODULE__{inflight_requests: inflight_requests} = state) do
    # or use *_nolink if you don't want to crash this process if the task dies
    task = Task.async(fn -> request(something) end, timeout: :infinity)
    {:noreply, %{state | inflight_requests: Map.put(inflight_requests, task.ref, from)}}
  end

  @impl true
  def handle_info({ref, result}, state) do
    # demonitor the completed task and flush its :DOWN message,
    # otherwise it would arrive here later and match no clause
    Process.demonitor(ref, [:flush])

    inflight_requests =
      case Map.pop(state.inflight_requests, ref) do
        {nil, inflight_requests} -> 
          inflight_requests
      
        {from, inflight_requests} ->
          GenServer.reply(from, result)
          inflight_requests
      end
    
    {:noreply, %{state | inflight_requests: inflight_requests}}
  end
end

The interaction pattern you may be looking for:

GenServer A       GenServer B

call/3 -------> handle_call/3 # A requests work from B
  <----------------------     # B acknowledges request to A in return message (result value of call/3)

handle_cast/2 <------- cast/2 # B casts result of work back to A
                              # A processes result in handle_cast

i.e. processes typically don’t “wait” for something to happen - instead they process messages as they arrive and just don’t do anything when there aren’t any messages to process.
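A minimal sketch of that pattern, assuming B does the work after replying via handle_continue/2 (the “work” itself and the way A stores the result are stand-ins):

defmodule B do
  use GenServer

  def init(state), do: {:ok, state}

  # A requests work; B only acknowledges and does the work afterwards
  def handle_call({:work, args}, {caller, _tag}, state) do
    {:reply, :accepted, state, {:continue, {:work, args, caller}}}
  end

  def handle_continue({:work, args, caller}, state) do
    result = {:done, args}                     # stand-in for the real work
    GenServer.cast(caller, {:result, result})  # B casts the result back to A
    {:noreply, state}
  end
end

defmodule A do
  use GenServer

  def init(state), do: {:ok, state}

  # A processes the result whenever it arrives; it never blocks waiting for it
  def handle_cast({:result, result}, state) do
    {:noreply, Map.put(state, :last_result, result)}
  end
end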

Also have a look at this topic:


Right, this is entirely possible to do without using promises. Just block the client. From the client side it can look like:

result = SomeAPI.some_call(data) # this will block until the API is available

This is to say, it looks exactly like any normal function call. @idi527’s post has a few issues I think. Task.async doesn’t take a timeout option, that’s just for Task.await. It also doesn’t appear to meaningfully rate limit, since all requests are just spawned in a new task and allowed to run concurrently.

The essence of the idea is there though, and there are libraries out there that abstract this idea.

The fundamental idea though is that the some_call function ultimately boils down to:

def some_call(data) do
  send(some_process, {:request, data})
  receive do
    {:reply, result} -> result
  end
end

You send the request data to a process that regulates your request rate, and then you block the client by waiting with receive until you get a reply. Promises are required in other languages because receive constructs don’t exist, or would block everything. Fortunately we don’t have to worry about that in Elixir.
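For completeness, the regulating process could be as simple as a receive loop like this (a sketch - here the request also carries the caller’s pid, the “rate limiting” is just a sleep, and the API call is a stand-in):

defmodule RateLimiter do
  # crude sketch: handle at most one request per interval_ms
  def loop(interval_ms) do
    receive do
      {:request, from, data} ->
        Process.sleep(interval_ms)               # wait out the rate limit
        send(from, {:reply, call_api(data)})     # unblock the waiting client
        loop(interval_ms)
    end
  end

  defp call_api(data), do: {:ok, data}           # stand-in for the real API call
end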


Hmmm, I had the impression that the owner and the linked process are different things.


That is similar to my current implementation.

GenServer.reply(from, result)

For this to work, the client needs to receive the result, right? If so, calling the GenServer concurrently would not work, because you don’t know which result corresponds to which call.


Also, I don’t like that approach much because it is much bigger and more complex than plain old stupid promises.

So yes, that approach does the job; I’m looking for a better solution: shorter, not breaking concurrency in the client, not breaking anything else I don’t see yet.

Wouldn’t “send-receive” make the client unable to call the server concurrently? The client would need to block for every result separately.

Usually you spawn a new process for every web request so that one process being blocked does not affect the others. That way you can write simpler-to-understand blocking code.

So, I wrap the server’s calls in Tasks rather than creating Tasks inside the server?

No, it just waits for the function call to finish.

Well, I won’t comment further as I only know how web servers like Cowboy do it. But in any case you should be able to formulate it so that the calling side just gets a blocking function call, without the need for promises. Like Ecto, for example: when you call some DB function, it has a certain number of connections and waits until one is ready, then waits for the DB call to finish, all transparently to the caller.


Oh. I totally misunderstood :noreply.

I guess I’ll take a look at how they do it there.

I’m working on something with a similar implementation. I’m using a GenServer to handle caller requests and centralize them (b/c I want to force delays between requests, etc.).

Let’s say we have a caller process, the GenServer, and the API.

From the caller’s point of view, it just calls MyModule.request(...) and gets a reference() back. Then, the client waits for the actual value with a selective receive:

ref = MyModule.request(...)

receive do
  {^ref, result} -> # use the result
end

request will make a call to the GenServer, which looks something like (going from memory):

def handle_call({:request, opts}, {pid, _ref}, state) do
  ref = make_ref()

  %Task{ref: task_ref} = Task.async(fn -> function_that_calls_the_api_and_does_the_work(opts) end)

  {:reply, ref, %{state | tasks: Map.put(state.tasks, task_ref, {ref, pid})}}
end

Then (still in the GenServer), I have a handle_info to forward the task result to the caller once it’s finished:

def handle_info({task_ref, result}, state) do
  # demonitor the finished task and flush its :DOWN message
  Process.demonitor(task_ref, [:flush])

  {ref, pid} = Map.get(state.tasks, task_ref)
  send(pid, {ref, result})

  {:noreply, %{state | tasks: Map.delete(state.tasks, task_ref)}}
end

The advantage of doing it this way as opposed to GenServer.reply is that there’s no need to handle timeouts for long-running tasks.
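And if the caller does want a timeout, it can simply add one to its own receive:

receive do
  {^ref, result} -> result
after
  30_000 -> {:error, :timeout}   # pick whatever timeout makes sense
end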
