Sending an HTTP request each second using recurring work with GenServer

I am trying to make an HTTP request each second.

I have created this GenServer.

defmodule Recorder do
  use GenServer

  def start_link(args) do
    id = Map.get(args, :id)
    GenServer.start_link(__MODULE__, args, name: id)
  end

  def init(state) do
    schedule_fetch_call()
    {:ok, state}
  end

  def handle_info(:jpeg_fetch, state) do
    spawn(fn ->
      IO.inspect("I am being called")
      IO.inspect(DateTime.utc_now())
      Fincher.request(:get, state.url) |> IO.inspect()
    end)
    schedule_fetch_call()
    {:noreply, Map.put(state, :run, true)}
  end

  defp schedule_fetch_call do
    Process.send_after(self(), :jpeg_fetch, 1000)
  end
end

I am starting the GenServer with this state:

  defp get_children do
    Enum.map([
      %{
        id: :hdipc,
        url: "http://77.66.206.55/jpgmulreq/1/image.jpg?key=1516975535684&lq=1&COUNTER"
      }
    ], fn(camera) ->
      Supervisor.child_spec({Recorder, camera}, id: camera.id)
    end)
  end
end

Code is quite simple so I am posting everything.

Questions:

If I don’t spawn each request, it doesn’t fire every second as it is supposed to. Without spawn, how can I still make it run each second?

Suppose it runs for 10 seconds (in practice it will run indefinitely).

Each second it fires an HTTP call, which of course takes time to complete. What happens to the next second? Does it mean all the following ticks are lost until the request completes?

Is there any way to make a request each second and also handle back pressure?

If I understand correctly, what you want is to run supervised tasks from your Recorder. If tasks are not enough, take a look at DynamicSupervisor, under which you can start child workers dynamically.

There is a nice write-up by @SophieDeBenedetto: How We Used Elixir’s GenServers + Dynamic Supervisors to Build Concurrent, Fault Tolerant Workflows (pay attention to the “Dynamic” StudentRepoSupervisor).
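For the "supervised tasks" option mentioned above, a minimal sketch might look like the following. It is only an illustration under stated assumptions: the module name TaskRecorder and the options :fetch_fun (a zero-arity function standing in for the HTTP call), :interval (milliseconds) and :task_sup (the name of a running Task.Supervisor) are hypothetical and not part of the original code. It skips a tick while the previous request is still in flight, which is one simple way to get back pressure.

```elixir
defmodule TaskRecorder do
  use GenServer
  require Logger

  # Hypothetical options for this sketch:
  #   :fetch_fun - zero-arity function standing in for the HTTP request
  #   :interval  - milliseconds between ticks
  #   :task_sup  - name of an already started Task.Supervisor
  def start_link(opts), do: GenServer.start_link(__MODULE__, Map.new(opts))

  @impl true
  def init(state) do
    schedule(state.interval)
    {:ok, Map.put(state, :task_ref, nil)}
  end

  @impl true
  def handle_info(:tick, %{task_ref: nil} = state) do
    # No request in flight: start one in a supervised, unlinked task.
    schedule(state.interval)
    task = Task.Supervisor.async_nolink(state.task_sup, state.fetch_fun)
    {:noreply, %{state | task_ref: task.ref}}
  end

  def handle_info(:tick, state) do
    # Previous request is still running: skip this tick instead of piling up.
    schedule(state.interval)
    {:noreply, state}
  end

  def handle_info({ref, result}, %{task_ref: ref} = state) do
    # The task replied; drop the monitor and any pending :DOWN message.
    Process.demonitor(ref, [:flush])
    Logger.info("fetch finished: #{inspect(result)}")
    {:noreply, %{state | task_ref: nil}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{task_ref: ref} = state) do
    # The task crashed before replying.
    Logger.warning("fetch crashed: #{inspect(reason)}")
    {:noreply, %{state | task_ref: nil}}
  end

  def handle_info(_other, state), do: {:noreply, state}

  defp schedule(interval), do: Process.send_after(self(), :tick, interval)
end
```

To try it, start a Task.Supervisor (for example with `Task.Supervisor.start_link(name: DemoTaskSup)`, where DemoTaskSup is again just a placeholder name) and then call `TaskRecorder.start_link(interval: 1000, task_sup: DemoTaskSup, fetch_fun: fn -> :ok end)`.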

Thank you very difficult article to understand.

If your GenServer is perfectly capable of handling 1 request per second, you can just move the schedule_fetch_call() to the top of handle_info/2 and remove the spawn.

If your GenServer needs more than 1 second to handle a request but you force it to send 1 request per second, the unhandled messages will pile up and eventually exhaust some resource (RAM, nofile, or Erlang ports). So, do you really need to send a request exactly once per second?
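As a rough sketch of the first suggestion (schedule at the top of handle_info/2 and drop the spawn), something like the following could work. The module name InlineRecorder and the state keys :fetch_fun (a zero-arity function standing in for the HTTP request) and :interval (milliseconds) are assumptions made for illustration only.

```elixir
defmodule InlineRecorder do
  use GenServer

  # Hypothetical state keys for this sketch:
  #   :fetch_fun - zero-arity function standing in for the HTTP request
  #   :interval  - milliseconds between requests
  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  def init(state) do
    schedule_fetch_call(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:jpeg_fetch, state) do
    # Schedule first, so the next tick is not delayed by the request itself.
    schedule_fetch_call(state.interval)

    # The request runs inside the GenServer process; no spawn.
    result = state.fetch_fun.()
    {:noreply, Map.put(state, :last_result, result)}
  end

  defp schedule_fetch_call(interval) do
    Process.send_after(self(), :jpeg_fetch, interval)
  end
end
```

If the request regularly takes longer than the interval, the :jpeg_fetch messages queue up in the mailbox, which is the pile-up described above.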

How can I make the GenServer capable of 1 request per second?

I have different use cases: this could be 1 per second, 1 per minute, 1 per 10 minutes, and so on.

But the most common case is 1 per second.

Now I really want to know how to handle those piled-up requests. I don’t want to use spawn either; I want a GenServer that runs each second and also handles the responses and errors.

For now I am just looking for a minimal example that gives me a brief idea of how to do it.

A minimal example with DynamicSupervisor might look like this:

defmodule Recorder do
  use GenServer

  def start_link(args) do
    id = Map.get(args, :id)
    GenServer.start_link(__MODULE__, args, name: id)
  end

  def init(state) do
    schedule_fetch_call()
    {:ok, state}
  end

  def handle_info(:jpeg_fetch, state) do
    # start worker under dynamic supervisor
    DynamicSupervisor.start_child(
      Recorder.ImageProcessorSupervisor,
      {Recorder.ImageProcessorWorker, state.url}
    )

    schedule_fetch_call()
    {:noreply, Map.put(state, :run, true)}
  end

  defp schedule_fetch_call do
    Process.send_after(self(), :jpeg_fetch, 1000)
  end
end

defmodule Recorder.ImageProcessorWorker do
  # We use restart: :transient because we don't want 
  # GenServer to be restarted after normal stop
  # see more https://hexdocs.pm/elixir/Supervisor.html#module-restart-values-restart
  use GenServer, restart: :transient

  def start_link(url) do
    GenServer.start_link(__MODULE__, url)
  end

  def init(url) do
    send(self(), :fetch_and_process)
    {:ok, url}
  end

  def handle_info(:fetch_and_process, url) do
    # the body of your spawn
    IO.inspect("I am being called")
    IO.inspect(DateTime.utc_now())
    result = Fincher.request(:get, url) |> IO.inspect()

    case result do
      {:error, %Mint.TransportError{reason: :closed}} ->
        # retry
        send(self(), :fetch_and_process)
        {:noreply, url}

      {:ok, _response} ->
        # do something with the response...
        # and stop worker normally
        {:stop, :normal, url}
    end
  end
end

Each second it spawns a new Recorder.ImageProcessorWorker that immediately sends itself :fetch_and_process (see the only handle_info callback).
That callback either finishes its work successfully and terminates the worker by returning {:stop, :normal, url}, or retries. (see possible return values)

And in application.ex don’t forget to register the DynamicSupervisor

children = 
  [
    ...
    {DynamicSupervisor, strategy: :one_for_one, name: Recorder.ImageProcessorSupervisor}
  ] ++ get_children() 

Thanks for the reply.

Do we still need Recorder.ImageProcessorSupervisor? What is the purpose of the supervisor here?

DynamicSupervisor is a behaviour module, like Supervisor and GenServer, so we need a running instance of it to start the workers under. In our case we name it Recorder.ImageProcessorSupervisor and register it in the supervision tree. (Please take a look at the examples in the docs; they explain it better than I ever could.)

BTW, after starting the application with iex -S mix we can run :observer.start() and, in the Applications tab, see that Recorder.ImageProcessorSupervisor is there and that its child workers start and terminate every second.

I have added {DynamicSupervisor, strategy: :one_for_one, name: Recording.Supervisor} to the application's start function:

  def start(_type, _args) do
    children = [
      {DynamicSupervisor, strategy: :one_for_one, name: Recording.Supervisor}
    ]
    opts = [strategy: :one_for_one, name: Recorder.Supervisor]
    Supervisor.start_link(children, opts)
  end

And from the terminal I run this to start a worker:

DynamicSupervisor.start_child(Recording.Supervisor, {Recording.Worker, %{id: :hdipc, url: "http://77.66.206.55/jpgmulreq/1/image.jpg?key=1516975535684&lq=1&COUNTER"}})

My question now is: how can I start the child from the application, so that it starts working with just iex -S mix?

Well, the same way as in your example, where you run the function periodically. Just add the call to your handle_info:

def handle_info(:jpeg_fetch, state) do
  DynamicSupervisor.start_child(Recording.Supervisor, {Recording.Worker, state})
  ...
end

Hi, what if we don’t want to spawn a process for each request, and instead wait for the last one to complete first?

What would need to change here?

  def handle_info(:jpeg_fetch, state) do
    # start worker under dynamic supervisor
    DynamicSupervisor.start_child(
      Recorder.ImageProcessorSupervisor,
      {Recorder.ImageProcessorWorker, state.url}
    )

    schedule_fetch_call()
    {:noreply, Map.put(state, :run, true)}
  end

Just do the work directly, instead of calling DynamicSupervisor.start_child, before scheduling the next call.
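A sketch of that idea, where the next call is only scheduled once the current request has finished, could look like this. The module name SequentialRecorder and the state keys :fetch_fun (a zero-arity stand-in for the HTTP request) and :interval (milliseconds) are hypothetical. Subtracting the elapsed time from the delay is an extra design choice, not something from the original code; it keeps the rate close to one request per interval when requests are fast.

```elixir
defmodule SequentialRecorder do
  use GenServer

  # Hypothetical state keys for this sketch:
  #   :fetch_fun - zero-arity function standing in for the HTTP request
  #   :interval  - target milliseconds between request starts
  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  def init(state) do
    # Kick off the first request right away.
    send(self(), :jpeg_fetch)
    {:ok, state}
  end

  @impl true
  def handle_info(:jpeg_fetch, state) do
    started = System.monotonic_time(:millisecond)

    # Do the work in this process and wait for it to complete.
    result = state.fetch_fun.()

    # Only now schedule the next call. If the request took longer than
    # the interval, the delay is 0 and the next request starts immediately.
    elapsed = System.monotonic_time(:millisecond) - started
    Process.send_after(self(), :jpeg_fetch, max(state.interval - elapsed, 0))

    {:noreply, Map.put(state, :last_result, result)}
  end
end
```

Because at most one :jpeg_fetch message is pending at any time, nothing piles up in the mailbox; slow requests simply lower the effective rate.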

  def handle_info(:fetch_and_process, url) do
    # the body of your spawn
    IO.inspect("I am being called")
    IO.inspect(DateTime.utc_now())
    result = Fincher.request(:get, url) |> IO.inspect()

    case result do
      {:error, %Mint.TransportError{reason: :closed}} ->
        # retry
        send(self(), :fetch_and_process)
        {:noreply, url}

      {:ok, response} ->
        # do something with the response...
        # and stop worker normally
        {:stop, :normal, url}
    end
  end

In the {:ok, response} branch we tell it to stop when the work is done, right?

The file where this work happens is a GenServer named Recording.Worker.

The whole code looks like this:

defmodule Recording.Worker do
  use GenServer, restart: :transient

  def start_link(state),
    do: GenServer.start_link(__MODULE__, state)

  def init(state) do
    send(self(), :fetch_and_process)
    {:ok, state}
  end

  def handle_info(:fetch_and_process, state) do
    case make_jpeg_request(state.camera, state.running.datetime) do
      {:failed, _requested_at} ->
        {:noreply, state}
      {body, requested_at} ->
        %{datetime: requested_at}
        |> put_it_in_jpeg_bank(state.camera.name)
        Broadcasting.any_one_wacthing?(state.camera.name)
        |> Broadcasting.stream(state.camera.name, body, requested_at)
        {:stop, :normal, state}
    end
  end

  defp make_jpeg_request(camera, requested_at) do
    headers = get_request_headers(camera.auth, camera.username, camera.password)
    Everjamer.request(:get, camera.url, headers)
    |> get_body_size(requested_at)
  end

  defp get_body_size({:ok, %Finch.Response{body: body, status: 200}}, requested_at) do
    {body, requested_at}
  end

  defp get_body_size(_error, requested_at), do: {:failed, requested_at}

  defp get_request_headers("true", username, password),
    do: [{"Authorization", "Basic #{Base.encode64("#{username}:#{password}")}"}]

  defp get_request_headers(_, _username, _password), do: []

  defp put_it_in_jpeg_bank(state, process) do
    String.to_atom("storage_#{process}")
    |> Process.whereis()
    |> JpegBank.add(state)
  end
end

But the workers are not stopping after the work completes. In Phoenix LiveDashboard they are still there, in the waiting state. Where did things go wrong?

waiting means that the process is currently in a receive and waiting for a message that matches.

And given the fact that it’s currently in :gen_server.loop, it’s in the main receive loop of the gen server.

This in turn means that either the process has not yet received your :fetch_and_process message, or handling it failed. You need to consult your logs to see which of these happened.

There is actually nothing in the logs.

Even after doing what they are supposed to do (I mean the work is done), they are still there and do not go away.

At least in the snippet you have shown, you do not do any logging, and perhaps not in the rest of your code either. Have you tried adding some logging?
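As an illustration of what such logging could look like, here is a minimal sketch of a worker that logs in every branch and stops in both. The module name LoggedWorker and the zero-arity fetch_fun (returning {:ok, body} or {:error, reason}) are assumptions for the sketch, not the real Recording.Worker. Note that a branch returning {:noreply, state}, like the {:failed, _requested_at} branch in the snippet above, keeps the process alive and waiting for the next message, so a failed request may be one explanation for workers that never disappear.

```elixir
defmodule LoggedWorker do
  use GenServer, restart: :transient
  require Logger

  # fetch_fun is a hypothetical zero-arity function standing in for the
  # HTTP request; it returns {:ok, body} or {:error, reason}.
  def start_link(fetch_fun), do: GenServer.start_link(__MODULE__, fetch_fun)

  @impl true
  def init(fetch_fun) do
    send(self(), :fetch_and_process)
    {:ok, fetch_fun}
  end

  @impl true
  def handle_info(:fetch_and_process, fetch_fun) do
    case fetch_fun.() do
      {:ok, body} ->
        Logger.info("fetched #{byte_size(body)} bytes, stopping worker")
        {:stop, :normal, fetch_fun}

      {:error, reason} ->
        Logger.warning("fetch failed: #{inspect(reason)}, stopping worker")
        # Returning {:noreply, fetch_fun} here instead would leave the
        # process alive in the GenServer receive loop ("waiting").
        {:stop, :normal, fetch_fun}
    end
  end
end
```

With a log line in each branch, the logs show whether a given worker took the success path or the failure path before it stopped.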