How to make request per second without waiting for previous one to complete in GenServer

Problem: I am trying to make an HTTP request per second. I have written a gen server and starting it using DynamicSupervisor. But now I am self-invoking it and it’s not being invoked per second but wait for the methods to be completed in between the call. My only use case is to send each request per second and capture the time when it sent and also the response. I am not bothered by the time it will take to complete. my use is only to send a request per second and move on, to next second.

this is my genserver.

defmodule Recording.Worker do
  use GenServer
  require Logger

  def start_link(opts) do
    {id, opts} = Map.pop!(opts, :id)
    GenServer.start_link(__MODULE__, opts, name: id)
  end

  def init(state) do
    schedule_fetch_call(state.sleep)
    {:ok, state}
  end

  def handle_info(:jpeg_fetch, state) do
    {for_jpeg_bank, running} =
      make_jpeg_request(state.camera)
      |> running_map()

    IO.inspect("request")
    IO.inspect(DateTime.utc_now())
    put_it_in_jpeg_bank(for_jpeg_bank, state.camera.name)
    schedule_fetch_call(state.sleep)
    {:noreply, Map.put(state, :running, running)}
  end

  def get_state(pid) do
    GenServer.call(pid, :get)
  end

  def handle_call(:get, _from, state),
    do: {:reply, state, state}

  defp schedule_fetch_call(sleep),
    do: Process.send_after(self(), :jpeg_fetch, sleep)

  defp make_jpeg_request(camera) do
    headers = get_request_headers(camera.auth, camera.username, camera.password)
    requested_at = DateTime.utc_now()
    Everjamer.request(:get, camera.url, headers)
    |> get_body_size(requested_at)
  end

  defp get_body_size({:ok, %Finch.Response{body: body, headers: headers, status: 200}}, requested_at) do
    IO.inspect(headers)
    {body, "9", requested_at}
  end

  defp get_body_size(_error, requested_at), do: {:failed, requested_at}

  defp running_map({body, file_size, requested_at}),
    do:
      {%{datetime: requested_at, image: body, file_size: file_size},
       %{datetime: requested_at}}

  defp running_map({:failed, requested_at}), do: {%{}, %{datetime: requested_at}}

  defp get_request_headers("true", username, password),
    do: [{"Authorization", "Basic #{Base.encode64("#{username}:#{password}")}"}]

  defp get_request_headers(_, _username, _password), do: []

  defp put_it_in_jpeg_bank(state, process) do
    String.to_atom("storage_#{process}")
    |> Process.whereis()
    |> JpegBank.add(state)
  end
end

GenServer with 1 HTTP request per second

This is related to the above question, So I have posted the link.

I have made such GenServer worker.

this my whole GenServer

defmodule Recording.Worker do
  use GenServer
  require Logger

  def start_link(opts) do
    {id, opts} = Map.pop!(opts, :id)
    GenServer.start_link(__MODULE__, opts, name: id)
  end

  def init(state) do
    schedule_fetch_call(state.sleep)
    {:ok, state}
  end

  def handle_info(:jpeg_fetch, state) do
    {for_jpeg_bank, running} =
      make_jpeg_request(state.camera)
      |> running_map()

    IO.inspect("request")
    IO.inspect(DateTime.utc_now())
    put_it_in_jpeg_bank(for_jpeg_bank, state.camera.name)
    schedule_fetch_call(state.sleep)
    {:noreply, Map.put(state, :running, running)}
  end

  def get_state(pid) do
    GenServer.call(pid, :get)
  end

  def handle_call(:get, _from, state),
    do: {:reply, state, state}

  defp schedule_fetch_call(sleep),
    do: Process.send_after(self(), :jpeg_fetch, sleep)

  defp make_jpeg_request(camera) do
    headers = get_request_headers(camera.auth, camera.username, camera.password)
    requested_at = DateTime.utc_now()
    Everjamer.request(:get, camera.url, headers)
    |> get_body_size(requested_at)
  end

  defp get_body_size({:ok, %Finch.Response{body: body, headers: headers, status: 200}}, requested_at) do
    {body, "9", requested_at}
  end

  defp get_body_size(_error, requested_at), do: {:failed, requested_at}

  defp running_map({body, file_size, requested_at}),
    do:
      {%{datetime: requested_at, image: body, file_size: file_size},
       %{datetime: requested_at}}

  defp running_map({:failed, requested_at}), do: {%{}, %{datetime: requested_at}}

  defp get_request_headers("true", username, password),
    do: [{"Authorization", "Basic #{Base.encode64("#{username}:#{password}")}"}]

  defp get_request_headers(_, _username, _password), do: []

  defp put_it_in_jpeg_bank(state, process) do
    String.to_atom("storage_#{process}")
    |> Process.whereis()
    |> JpegBank.add(state)
  end
end

I am trying to make an HTTP request per second. even while using GenSever and starting it with a DynamicSupervisor such as

General.Supervisor.start_child(Recording.Worker, %{id: String.to_atom(detailed_camera.name), camera: detailed_camera, sleep: detailed_camera.sleep})

this part

  def handle_info(:jpeg_fetch, state) do
    {for_jpeg_bank, running} =
      make_jpeg_request(state.camera)
      |> running_map()

    IO.inspect("request")
    IO.inspect(DateTime.utc_now())
    put_it_in_jpeg_bank(for_jpeg_bank, state.camera.name)
    schedule_fetch_call(state.sleep)
    {:noreply, Map.put(state, :running, running)}
  end

still waiting for the previous request to complete and it becomes (the time request took to get complete) / per request not request per second.

the results of IO.inspect are such as

"request"
~U[2020-12-30 05:27:21.466262Z]
"request"
~U[2020-12-30 05:27:24.184548Z]
"request"
~U[2020-12-30 05:27:26.967173Z]

"request"
~U[2020-12-30 05:27:29.831532Z]

is there any way possible in Elixir that without using spawn, handle_info just keep running without waiting for the previous request or previous method to complete?

or is this not a GenServer use case? Should I use any other tool such as GenStage or Flow for this?

PS: I am not bothered about how much time the process will take to complete, I am only bothered the time when the request starts, it should start each second. I have tried putting schedule_fetch_call(state.sleep) in the first line of handle_info but it wont work either.

I have been juggling around with this problem but I am not getting any solution on this. Is there any way in Elixir to run a method each second?

1 Like

The example I provide you with 5 days ago does exactly what you need.

  1. It spawns the worker each second…
  2. each of those workers do their job concurrently and stop (terminate themselves)

Please let us know if something is not clear.

2 Likes

Thanks for the reply I ended up with something like this.

image

starter.ex

defmodule Recording.WorkerStarter do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def init([state]) do
    IO.inspect(state)
    schedule_fetch_call(state.sleep)
    {:ok, state}
  end

  def handle_info(:request, state) do
    IO.inspect(state)
    schedule_fetch_call(state.sleep)
    DynamicSupervisor.start_child(General.Supervisor, {Recording.Worker, [state]})
    {:noreply, state}
  end

  defp schedule_fetch_call(sleep) do
    IO.inspect(sleep)
    Process.send_after(self(), :request, sleep)
  end
end

worker.ex

defmodule Recording.Worker do
  use GenServer

  def start_link(state),
    do: GenServer.start_link(__MODULE__, state)

  def init(state) do
    perform_work(state)
    {:stop, :normal}
  end

  defp perform_work(state), do: IO.inspect(state)
end

and supervisor.ex

defmodule Recording.Supervisor do
  use Supervisor

  def start_link(_state),
    do: Supervisor.start_link(__MODULE__, nil, name: __MODULE__)

  @impl Supervisor
  def init(nil) do
    children = [
      {DynamicSupervisor, strategy: :one_for_one, name: General.Supervisor},
      {Recording.WorkerStarter, [%{sleep: 1_000}]}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

and I had added {Recording.Supervisor, []} in my application.ex start/2 's children.

my question now is: in supervisor.ex I am starting a worker such as {Recording.WorkerStarter, [%{sleep: 1_000}]}

this is fine, I can get all the WorkerStarters here start them, but when a user add an entity and you want to start a worker, what would be the way to start it then? in short how you can another child to an already init
Supervisor.init(children, strategy: :one_for_one) supervisor?

I have followed your previous answer and made my confusions clear, thank you.

I now have these 3 modules.

starter.ex

defmodule Recording.WorkerStarter do
  use GenServer

  def start_link(opts) do
    IO.inspect("Started Recording.WorkerStarter")
    {id, opts} = Map.pop!(opts, :id)
    GenServer.start_link(__MODULE__, opts, name: id)
  end

  def init(state) do
    schedule_fetch_call(state.sleep)
    {:ok, state}
  end

  def handle_info(:request, state) do
    schedule_fetch_call(state.sleep)
    DynamicSupervisor.start_child(General.Supervisor, {Recording.Worker, state})
    {:noreply, state}
  end

  defp schedule_fetch_call(sleep) do
    Process.send_after(self(), :request, sleep)
  end
end

worker.ex as

defmodule Recording.Worker do
  use GenServer

  def start_link(state),
    do: GenServer.start_link(__MODULE__, state)

  def init(state) do
    IO.inspect(state)
    perform_work(state)
    {:stop, :normal}
  end

  defp perform_work(state) do
    IO.inspect(state)
  end
end

and a general supervisor as

defmodule General.Supervisor do
  use DynamicSupervisor

  def start_link(opts) do
    DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def start_child(module, opts),
    do: DynamicSupervisor.start_child(__MODULE__, {module, opts})

  @impl DynamicSupervisor
  def init(opts),
    do: DynamicSupervisor.init(strategy: :one_for_one, extra_arguments: [opts])
end

this works fine.

can you tell me how I can terminate a Recording.WorkerStarter

I tried this



iex(9)> DynamicSupervisor.terminate_child(General.Supervisor, Process.whereis(:junaid))

** (exit) exited in: GenServer.call(GeneralSupervisor, {:terminate_child, #PID<0.521.0>}, :infinity) 
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (elixir 1.11.1) lib/gen_server.ex:1017: GenServer.call/3

I can see the worker I started as General.Supervisor.start_child(Recording.WorkerStarter, %{id: String.to_atom("junaid"), sleep: 1_000}) is running but what its not being terminated?

# worker.ex
def init(state) do
  IO.inspect(state)
  perform_work(state)
  {:stop, :normal}
end

Just in case, it looks like you want to perform all work in init and then stop it. That’s usually not the best way to do it, because init/1 function is blocking. (I guess that’s the reason why in handle_info you first schedule_fetch_call and then start worker to not get blocked by worker)

Back to the question.
if the process is started normally under DynamicSupervisor it looks odd to me why it can’t find it to terminate. :thinking:

as per our discussion ?

defmodule Recording.WorkerStarter do
  use GenServer

  def start_link(opts) do
    IO.inspect("Started Recording.WorkerStarter")
    {id, opts} = Map.pop!(opts, :id)
    GenServer.start_link(__MODULE__, opts, name: id)
  end

  def init(state) do
    schedule_fetch_call(state.sleep)
    {:ok, state}
  end

  def handle_info(:request, state) do
    schedule_fetch_call(state.sleep)
    ping_time_state = Map.put(state, :running, %{datetime: DateTime.utc_now()}) |> IO.inspect
    DynamicSupervisor.start_child(General.Supervisor, {Recording.Worker, ping_time_state})
    {:noreply, ping_time_state}
  end

  def update_state(pid, state) do
    GenServer.cast(pid, {:update_state, state})
  end

  def handle_cast({:update_state, state}, _old_state) do
    {:noreply, state}
  end

  def get_state(pid) do
    GenServer.call(pid, :get)
  end

  def handle_call(:get, _from, state),
    do: {:reply, state, state}

  defp schedule_fetch_call(sleep) do
    Process.send_after(self(), :request, sleep)
  end
end

this part

DynamicSupervisor.start_child(General.Supervisor, {Recording.Worker, ping_time_state})

will just run the code 1time per second and in worker

it will perform work here

  def init(state) do
    IO.inspect state
    # perform_work(state)
    {:stop, :normal}
  end

unfortunately its not working as its expected. the Recording.WorkerStarter is still waiting for

DynamicSupervisor.start_child(General.Supervisor, {Recording.Worker, state})

to get complete. and not spawning it at all.

~U[2020-12-30 13:38:39.838205Z]
~U[2020-12-30 13:38:42.584918Z]
~U[2020-12-30 13:38:44.995407Z]
~U[2020-12-30 13:38:47.297593Z]
~U[2020-12-30 13:38:49.668983Z]

this is IO.inspect of ping_time_state = Map.put(state, :running, %{datetime: DateTime.utc_now()})

datetime. any clue?

Thank you I have figured it out. your last snippet helped a lot thank you