Task.await terminates GenServer because of timeout. How to fix?

Why don’t you use the :httpc library from Erlang?

iex(1)> :inets.start
:ok
iex(2)> :httpc.request(:get, {'http://localhost/index', []}, [timeout: :timer.seconds(1)], [])
{:error, :timeout}

(Here http://localhost/index sleeps for 6 seconds before responding.)

:httpc also has options for making async requests and notifying the caller:

iex(1)> :inets.start
:ok
iex(2)> pid = self()
#PID<0.84.0>
iex(3)> :httpc.request(:get, {'http://localhost/index', []}, [], [sync: false, receiver: pid])
{:ok, #Reference<0.2595546524.2142502913.38782>}
iex(4)> flush() 
:ok
iex(5)> flush() #after 6 seconds
{:http,
 {#Reference<0.2595546524.2142502913.38782>,
  {{'HTTP/1.1', 200, 'OK'},
   [{'connection', 'Keep-Alive'}, {'date', 'Wed, 14 Jun 2017 10:48:44 GMT'},
    {'server', 'Apache/2.4.25 (Ubuntu)'}, {'content-length', '9'},
    {'content-type', 'text/html; charset=UTF-8'},
    {'keep-alive', 'timeout=5, max=100'}], "index.php"}}}
:ok

Make an async request and store the reference in the state. :httpc will notify the gen_server with a {:http, {ref, result}} message, which arrives in handle_info/2 (not handle_call) when the request is finished. After some time you can call the gen_server to get the response, so the problem with HTTP timeouts is resolved.
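To make that concrete, here is a minimal sketch of such a GenServer. The module name AsyncFetcher and its fetch/2 and result/2 functions are made up for illustration, and it assumes :inets has already been started as in the sessions above:

```elixir
defmodule AsyncFetcher do
  use GenServer

  ## client interface (hypothetical names)
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Starts the request and returns {:ok, ref} right away; it never waits for the response.
  def fetch(pid, url), do: GenServer.call(pid, {:fetch, url})

  # Returns :pending, {:done, result} or :unknown for the given reference.
  def result(pid, ref), do: GenServer.call(pid, {:result, ref})

  ## callbacks
  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:fetch, url}, _from, state) do
    # sync: false makes :httpc return at once. The receiver defaults to the
    # calling process, which here is the GenServer itself.
    case :httpc.request(:get, {String.to_charlist(url), []}, [], [sync: false]) do
      {:ok, ref} -> {:reply, {:ok, ref}, Map.put(state, ref, :pending)}
      {:error, reason} -> {:reply, {:error, reason}, state}
    end
  end

  def handle_call({:result, ref}, _from, state) do
    {:reply, Map.get(state, ref, :unknown), state}
  end

  @impl true
  def handle_info({:http, {ref, result}}, state) do
    # The result is either the response tuple or {:error, reason}.
    {:noreply, Map.put(state, ref, {:done, result})}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```

Both calls return immediately, so the caller never runs into the GenServer.call timeout no matter how slow the HTTP server is.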

If you want to know how to manage exits from Task.Supervisor.async_nolink, for example, you can take a look at this topic.

Hope this helps :slight_smile:


Sometimes it takes some digging to figure out how things work - for example HTTPoison is based on hackney, while HTTPotion is based on ibrowse. HTTPoison’s options are documented and found in the code here. The ones that stick out are:

  • :timeout - timeout to establish a connection, in milliseconds. Default is 8000
  • :recv_timeout - timeout used when receiving a connection. Default is 5000
  • :stream_to - a PID to stream the response to

An asynchronous example is found here.

The following code demonstrates the beginnings of using HTTPoison’s asynchronous functionality within a GenServer (without using Task):

defmodule Poison do
  use GenServer

  defp handle_response(ref, response) do
    IO.puts "Received all response parts for request #{inspect ref}"
    IO.inspect response
  end

  defp handle_request_error(details) do
    IO.puts "A request failed with reason: #{inspect details.reason}"
  end

  defp async_request(response_map, timeout, recv_timeout) do
    url = "http://httparrot.herokuapp.com/get"
    body = ""
    headers = []
    options = [stream_to: self(), timeout: timeout, recv_timeout: recv_timeout]
    case HTTPoison.request :get, url, body, headers, options do
      {:ok, result} ->
        Map.put response_map, result.id, [] # start collecting a new response
      {:error, details} ->
        handle_request_error details
        response_map
    end
  end

  defp attach_response(response_map, response),
    do: Map.update! response_map, response.id, &([response | &1])

  def response_complete(response_map, ref) do
    case Map.get response_map, ref, :none do
      :none ->
        {response_map, :none}
      parts_in_reverse ->
        response_parts = Enum.reverse parts_in_reverse
        new_map = Map.delete response_map, ref
        {new_map, response_parts}
    end
  end

  defp response_error(response_map, error_msg) do
    new_map = Map.delete response_map, error_msg.id
    IO.puts "Request #{inspect error_msg.id} resulted in an error response with reason: #{inspect error_msg.reason}"
    new_map
  end

  ## callbacks: message handlers
  def handle_cast(:regular, state) do
    new_state = async_request state, 8000, 5000 # defaults
    {:noreply, new_state}
  end
  def handle_cast(:short_connect, state) do
    new_state = async_request state, 2, 5000
    {:noreply, new_state}
  end
  def handle_cast(:short_receive, state) do
    new_state = async_request state, 8000, 5
    {:noreply, new_state}
  end

  def handle_info(%HTTPoison.AsyncStatus{} = msg, state) do
    new_state = attach_response state, msg
    {:noreply, new_state}
  end
  def handle_info(%HTTPoison.AsyncHeaders{} = msg, state) do
    new_state = attach_response state, msg
    {:noreply, new_state}
  end
  def handle_info(%HTTPoison.AsyncChunk{} = msg, state) do
    new_state = attach_response state, msg
    {:noreply, new_state}
  end
  def handle_info(%HTTPoison.AsyncEnd{} = msg, state) do
    {new_state, response} = response_complete state, msg.id
    handle_response msg.id, response
    {:noreply, new_state}
  end
  def handle_info(%HTTPoison.Error{} = msg, state) do
    new_state = response_error state, msg
    {:noreply, new_state}
  end

  ## callbacks: lifecycle
  def init(_args) do
    {:ok, %{}} # use map to collect the various parts of the response
  end

  def terminate(_reason, _state) do
    :ok
  end

  ## public interface
  def start_link,
    do: GenServer.start_link __MODULE__, []

  def stop(pid),
    do: GenServer.stop pid

  ## client interface
  def short_connect(pid),
    do: GenServer.cast pid, :short_connect

  def short_receive(pid),
    do: GenServer.cast pid, :short_receive

  def regular(pid),
    do: GenServer.cast pid, :regular

end


$ iex -S mix
iex(1)> {:ok,pid} = Poison.start_link
{:ok, #PID<0.398.0>}
iex(2)> Poison.short_connect pid     
:ok      
A request failed with reason: :connect_timeout
iex(3)> Poison.short_receive pid
:ok      
Request #Reference<0.0.6.4279> resulted in an error response with reason: {:closed, :timeout}
iex(4)> Poison.regular pid      
:ok      
Received all response parts for request #Reference<0.0.6.4284>
[%HTTPoison.AsyncStatus{code: 200, id: #Reference<0.0.6.4284>},
 %HTTPoison.AsyncHeaders{headers: [{"Connection", "keep-alive"},
   {"Server", "Cowboy"}, {"Date", "Thu, 15 Jun 2017 03:04:49 GMT"},
   {"Content-Length", "493"}, {"Content-Type", "application/json"},
   {"Via", "1.1 vegur"}], id: #Reference<0.0.6.4284>},
 %HTTPoison.AsyncChunk{chunk: "{\n  \"args\": {},\n  \"headers\": {\n    \"host\": \"httparrot.herokuapp.com\",\n    \"connection\": \"close\",\n    \"user-agent\": \"hackney/1.8.6\",\n    \"x-request-id\": \"db9e3183-036b-455f-bb9c-2c0dc379ce9a\",\n    \"x-forwarded-for\": \"72.39.127.107\",\n    \"x-forwarded-proto\": \"http\",\n    \"x-forwarded-port\": \"80\",\n    \"via\": \"1.1 vegur\",\n    \"connect-time\": \"0\",\n    \"x-request-start\": \"1497495889950\",\n    \"total-route-time\": \"0\"\n  },\n  \"url\": \"http://httparrot.herokuapp.com/get\",\n  \"origin\": \"10.171.119.12\"\n}",
  id: #Reference<0.0.6.4284>}]
iex(5)> Poison.stop pid
:ok      
iex(6)>  

Now one interesting thing to note is that the error for the connection timeout (as opposed to the receive timeout) is reported as a return value of request. This suggests that request actually blocks until it has established a connection - it is only from that point on that asynchronous operation begins.

It is therefore in your best interest to make the :timeout value much smaller than the default of 8000 (8 secs). The :recv_timeout can be longer as this is entirely handled by hackney (HTTPoison) and won’t lock up your GenServer.

Here’s a parallel crawl function. It’s called from a GenServer process and handles timeouts. You can do something similar:

@doc """
Crawl a list of URLs and return responses.
"""
@spec crawl(list()) :: list()
def crawl(list) do
  list
  |> Enum.map(&Task.Supervisor.async(TaskSupervisor, fn() -> visit(&1) end))
  |> Enum.map(fn(task_id) ->
    try do
      Task.await(task_id, 18_000)
    catch _reason, _info ->
      {:timeout, "Process reached timeout while making an HTTP request"}
    end
  end)
end
  1. Because of Enum.map, in the worst case it will block the control flow for 18 seconds before returning a value. How is that better?
  2. How will try … catch prevent it from sending the :exit message?

The control flow of this process is “visit N URLs every X seconds”. Crawling the list takes only as long as the slowest URL, so there is no unnecessary blocking, unless I have misunderstood your situation.

After the timeout the task will error out. This error can be caught, but you can also trap the exit message. In any case this would be the exception rather than the rule; normally you’d get an error response after 5-10 seconds.
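For what it’s worth, one way to get the same “all URLs in parallel, give up on the slow ones” behaviour without try/catch is Task.async_stream with on_timeout: :kill_task. This is only a sketch: the Crawler module name is made up, and visit is passed in as a one-argument function standing in for the visit/1 used above.

```elixir
defmodule Crawler do
  @timeout_reply {:timeout, "Process reached timeout while making an HTTP request"}

  # `visit` is a hypothetical one-argument function standing in for visit/1.
  def crawl(urls, visit, timeout \\ 18_000) do
    urls
    |> Task.async_stream(visit,
      timeout: timeout,
      # kill a task that exceeds the timeout instead of exiting the caller
      on_timeout: :kill_task,
      # start every URL at once, like the Enum.map version does
      max_concurrency: max(length(urls), 1)
    )
    |> Enum.map(fn
      {:ok, response} -> response
      {:exit, :timeout} -> @timeout_reply
    end)
  end
end
```

Results come back in input order, and the whole call takes roughly as long as the slowest URL or the timeout, whichever is shorter. The tasks are still linked to the caller, so a visit that crashes will still take the caller down with it.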

Task.await will not raise an error; it calls exit/1, which is not catchable:

iex(2)> try do
...(2)>   exit("foo")
...(2)> catch
...(2)>   _ -> :catched
...(2)> end
** (exit) "foo"

The current thread will be blocked. What’s not clear?

Well, you have to wait somewhere, or you wouldn’t have chosen a call in the first place, would you?

Actually exit/1 can be caught:

iex(1)> try do
...(1)>   exit("foo")
...(1)> catch 
...(1)>   :exit, _ -> :caught
...(1)> end
:caught

However, your objection still stands:

  • the timeout doesn’t have anything to do with the Task, which just keeps running - it is the await/2 process that times out
  • the termination of the await/2 process emits an exit signal to all linked processes. That exit signal cannot be “caught” - the exit signal will terminate any linked processes which aren’t trapping exits and those which do trap exits will get an :EXIT message in their mailbox.
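A way to sidestep both points is to not link the task and to not use await at all. The following is a rough sketch using Task.Supervisor.async_nolink with Task.yield and Task.shutdown; the YieldDemo module and its run/3 function are made-up names, and sup is assumed to be an already running Task.Supervisor:

```elixir
defmodule YieldDemo do
  # Runs `fun` in a task that is NOT linked to the caller and waits at most
  # `timeout` milliseconds for it.
  # Returns {:ok, result}, {:exit, reason} or :timeout.
  def run(sup, fun, timeout) do
    task = Task.Supervisor.async_nolink(sup, fun)

    # Task.yield/2 returns nil on timeout instead of exiting the caller.
    # Task.shutdown/2 then stops the task, and still returns the reply
    # if one arrived in the meantime.
    case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:exit, reason}
      nil -> :timeout
    end
  end
end
```

Here a timeout is just a return value: the caller does not exit, and the task does not keep running in the background.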


  1. Kernel.exit/1 can only be caught inside the process that invokes it - if it is allowed to escape the process it turns into an exit signal - at which point it is too late to catch it anywhere.

  2. Process.exit/2 is an entirely different animal. While exit/1 terminates the process that invokes it, exit/2 sends an exit signal to the specified process usually with the intent to “tell that process to terminate”. So exit/2 cannot be caught under any circumstances (but it can be trapped - unless the reason is :kill).
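Point 1 can be seen in a small sketch (ExitDemo is a made-up module name). The exit/1 happens inside a linked child process, so it escapes that process and reaches the parent as an exit signal; the parent only survives because it traps exits:

```elixir
defmodule ExitDemo do
  def trapped_reason do
    # Trap exits so the signal becomes a message instead of killing us.
    previous = Process.flag(:trap_exit, true)

    # exit/1 is not caught inside the child, so it turns into an exit signal.
    pid = spawn_link(fn -> exit(:boom) end)

    result =
      receive do
        {:EXIT, ^pid, reason} -> {:trapped, reason}
      after
        1_000 -> :nothing_received
      end

    # Restore the previous trap_exit setting.
    Process.flag(:trap_exit, previous)
    result
  end
end
```

Calling ExitDemo.trapped_reason() returns {:trapped, :boom}. Wrapping the spawn_link in try/catch would not change anything, because the exit is raised in a different process.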


iex(2)> try do
...(2)>   Process.exit(self(),"foo")
...(2)> catch                       
...(2)>   :exit, _ -> :caught       
...(2)> end                         
** (EXIT from #PID<0.87.0>) "foo"


iex(1)> Process.flag :trap_exit, true 
false
iex(2)> try do                       
...(2)>   Process.exit(self(),"foo") 
...(2)> catch                        
...(2)>   :exit, _ -> :caught        
...(2)> end                          
true
iex(3)> flush()
{:EXIT, #PID<0.103.0>, "foo"}
:ok

NobbZ said: “Well, somewhere you have to wait”

No. Why?

Your original plan does the work in a handle_call/3, so the caller will wait for an answer until that GenServer.call times out or it gets a reply. So you have to wait.

Yes, but that doesn’t mean that a caller has to wait for a result.

Of course it does. The caller of GenServer.call, has to wait for a result at least as long as the timeout specifies.

No, it doesn’t.

Please explain to me why you think that the caller does not need to wait for a result. GenServer.call is a synchronous operation.

If you don’t want the caller to wait, you can use GenServer.cast to start async requests and query the GenServer for the result whenever you want.
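Something along these lines, as a sketch only. JobServer, run/2 and last_result/1 are made-up names; the work runs in an unlinked task, and the task’s reply arrives in handle_info/2:

```elixir
defmodule JobServer do
  use GenServer

  ## client interface (hypothetical names)
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  # Fire and forget: returns :ok immediately.
  def run(pid, fun), do: GenServer.cast(pid, {:run, fun})
  # Returns :none, :pending, {:ok, result} or {:error, reason} without waiting for the job.
  def last_result(pid), do: GenServer.call(pid, :last_result)

  ## callbacks
  @impl true
  def init(:ok) do
    {:ok, sup} = Task.Supervisor.start_link()
    {:ok, %{sup: sup, task_ref: nil, result: :none}}
  end

  @impl true
  def handle_cast({:run, fun}, state) do
    # async_nolink: a crashing job will not take the GenServer down.
    task = Task.Supervisor.async_nolink(state.sup, fun)
    {:noreply, %{state | task_ref: task.ref, result: :pending}}
  end

  @impl true
  def handle_call(:last_result, _from, state), do: {:reply, state.result, state}

  @impl true
  def handle_info({ref, result}, %{task_ref: ref} = state) when is_reference(ref) do
    # The task finished; drop its monitor and any pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | task_ref: nil, result: {:ok, result}}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{task_ref: ref} = state) do
    # The task crashed before replying.
    {:noreply, %{state | task_ref: nil, result: {:error, reason}}}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end
```

JobServer.run(pid, fn -> 21 * 2 end) returns at once, and a later JobServer.last_result(pid) gives {:ok, 42} once the task is done. To keep the sketch short it only tracks the most recent job.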

But cast doesn’t even guarantee that the GenServer will receive that information.

Neither does call technically. ^.^

The GenServer call is just a GenServer cast with a receive for the result immediately after it (and a different ID atom of course). :slight_smile:

Since the documentation for cast states that the GenServer might not receive the message, but says nothing of the kind for call, I have to assume that a call does give us that guarantee…


Documentation should be fixed then. ^.^

Doing a GenServer.call is like doing:

ref = GenServer.cast(blah blah blah)
receive do
  {:special, :cast, :response, ^ref, result} -> result
after 5000 -> raise puke
end

And so forth. It can entirely fail, and that is why the timeout exists too, not just if the remote process does not return in time, but if something else happened, like the GenServer crashed before you could send the cast to it, or if it is on another node that just had a netsplit. :slight_smile:
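If you want to play with that, here is a runnable approximation. ManualCall and Echo are made-up modules, and the {:"$gen_call", {pid, ref}, request} message shape is an implementation detail of gen_server that I am assuming here, not something to rely on in real code:

```elixir
defmodule ManualCall do
  # Hand-rolled approximation of GenServer.call/3: send, then wait for the reply.
  def call(server, request, timeout \\ 5_000) do
    # The monitor tells us if the server is dead or dies while we wait.
    ref = Process.monitor(server)
    send(server, {:"$gen_call", {self(), ref}, request})

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      {:DOWN, ^ref, :process, _pid, reason} ->
        {:error, {:down, reason}}
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        {:error, :timeout}
    end
  end
end

defmodule Echo do
  use GenServer
  def init(arg), do: {:ok, arg}
  def handle_call(request, _from, state), do: {:reply, {:echo, request}, state}
end
```

With {:ok, pid} = GenServer.start_link(Echo, nil), ManualCall.call(pid, :hi) returns {:ok, {:echo, :hi}}. Against a pid that is already dead, the send itself succeeds silently and the only thing that comes back is the monitor’s :DOWN message with reason :noproc. The real GenServer.call exits in those failure cases instead of returning an error tuple.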

You can never assume that a message will absolutely, positively reach its intended process; that is just the way reality works. The BEAM handles this via timeouts, monitors, links and other measures, unlike almost every other language, which just prays that it works and dies horribly when it does not. :slight_smile:
