Run tasks with a delay

Suppose I have the following function:

def foo, do: IO.puts "Foobar"

I want to run it a minute after some work is done in my function:

def fun(x) do
  somework(x)
  # Schedule foo to run after 1 minute
end

I do not want to delay the return of fun by a minute. I want fun to return as soon as somework returns (otherwise I could just call Process.sleep). I also do not care whether foo fails.

One thing I have done is the following:

Task.start(fn ->
  Process.sleep(60_000)
  foo()
end)

But if I am not mistaken, another process is started to run the task. Having a process sitting idle for a minute seems wasteful of system resources - especially if the delay is much greater than a minute and there are many tasks being created per second.

Another idea I had was to write a GenServer to handle scheduling tasks, but I avoided this because it seemed like a common pattern and I expected there to be a “preferred” solution. Also, this sort of functionality probably belongs in Task.Supervisor rather than in a hand-written GenServer anyway.

How can I achieve this using the existing task running infrastructure?

It may be useful to know more details about your use-case specifically.

I wouldn’t worry about having too many sleeping processes. The BEAM can manage many thousands of processes as they are lighter weight than a typical OS process.

You can also use a GenServer as you said; it mostly depends on how many tasks you’re running and how they are structured. If you are going to be starting many per second and don’t care about success, I might use a GenServer simply to act as a throttle or circuit breaker later down the line, or to distribute the work.

Depending on the context, you could use Process.send_after too, if the owning process will still be around to receive the message. (You would probably use Process.send_after in your GenServer, but you could also use it in a LiveView, for example, since LiveViews are just GenServers anyway.)
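
As a rough illustration of the send_after approach, here is a minimal sketch of a GenServer that schedules a function with Process.send_after/3. The module name DelayedRunner and the function run_after/3 are made up for this example; they are not part of any library.

```elixir
defmodule DelayedRunner do
  # Hypothetical module: runs zero-arity functions after a delay.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Returns immediately; `fun` runs later inside the server process.
  def run_after(server, fun, delay_ms) when is_function(fun, 0) do
    GenServer.cast(server, {:schedule, fun, delay_ms})
  end

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast({:schedule, fun, delay_ms}, state) do
    # The runtime keeps the timer; no extra process sleeps while waiting.
    Process.send_after(self(), {:run, fun}, delay_ms)
    {:noreply, state}
  end

  @impl true
  def handle_info({:run, fun}, state) do
    # Exceptions are swallowed, since the question does not care if foo fails.
    try do
      fun.()
    rescue
      _ -> :ok
    end

    {:noreply, state}
  end
end
```

Note that the function runs inside the server, so a slow function delays every other scheduled one. Pending timers are also lost if the server restarts.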

There is a great talk by Saša Jurić about the BEAM and processes that might provide a bit of insight into the model, though it doesn’t answer your question specifically: The Soul of Erlang and Elixir • Saša Jurić • GOTO 2019 - YouTube.

It may be useful to know more details about your use-case specifically.

So, I want to be able to call this from an Ecto Context. So, I won’t have any long lived process to message. What I want to do is run a function to remove a record from a database if some conditions are met, few minutes after it has been added.

I would probably go with a Task.Supervisor in that case. Consider what your failure states are, though. A custom GenServer might be more useful if you have to retry or re-check a condition at some point. If a task failed for some reason (a cosmic ray hits the DB for a nanosecond), a GenServer could ensure that your removal target doesn’t end up lingering forever. That all depends on your task function, though; maybe it just bulk-deletes anything stale.
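
A minimal sketch of the Task.Supervisor route might look like the following. The module DelayedTasks and the supervisor name MyApp.TaskSupervisor are assumptions made for illustration.

```elixir
defmodule DelayedTasks do
  # Hypothetical helper: starts a supervised, fire-and-forget task that
  # sleeps for `delay_ms` and then calls `fun`.
  def run_later(supervisor, fun, delay_ms) when is_function(fun, 0) do
    Task.Supervisor.start_child(supervisor, fn ->
      Process.sleep(delay_ms)
      fun.()
    end)
  end
end

# In an application, the supervisor would normally be listed as a child,
# e.g. {Task.Supervisor, name: MyApp.TaskSupervisor}, and then used as:
#
#   DelayedTasks.run_later(MyApp.TaskSupervisor, fn -> foo() end, 60_000)
```

Tasks started with start_child are not restarted by default, so a crash in the function is logged and otherwise ignored. Scheduled work does not survive a restart of the BEAM.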

Processes don’t consume CPU and just a tiny amount of memory when not doing anything, so feel free to spawn processes just to wait for something.

There is also :timer.apply_after/4 which does the job.
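
For illustration, a small sketch of how :timer.apply_after/4 could slot into the function from the question. The module name DelayedCall, the placeholder somework/1, and the delay_ms parameter are assumptions for this example.

```elixir
defmodule DelayedCall do
  # Stand-in for the `foo` from the question, taking one argument.
  def foo(x), do: IO.puts("Foobar #{inspect(x)}")

  def fun(x, delay_ms \\ 60_000) do
    result = somework(x)

    # Schedules DelayedCall.foo(result) to run after `delay_ms` milliseconds.
    # The call returns right away with a timer reference.
    {:ok, _timer_ref} = :timer.apply_after(delay_ms, __MODULE__, :foo, [result])

    result
  end

  # Placeholder for the real work.
  defp somework(x), do: x * 2
end
```

The function passed to :timer.apply_after/4 has to be a public function in a module, because it is called by module, function name, and argument list.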

What should happen with records that are left behind in the database - for instance, if the BEAM is restarted after one is inserted but before the deletion?

The answer to that may suggest alternative approaches:

  • if leftover records should be periodically cleaned up, does there need to be a per-record deletion process at all?
  • if leftover records are ignored, do any of them need to be deleted?

Another thing to think about: while @LostKobrakai is 100% correct that sleeping processes are basically free, having an unlimited number of them wake up simultaneously can still create a mess. Here’s a scenario:

  • a fast API client calls the endpoint that inserts records + schedules deletion 1000 times in 5s. Easy to do, and the server has no problem handling the requests
  • X minutes later, 1000 processes all wake up, and all try to run a Repo.delete.
    • some of them will manage to grab a connection and send a query to the DB
    • then the connection pool will run out. Now every connection in the pool is running a Repo.delete statement.
    • everything else starves: the other 900-some waking-up processes, and every Phoenix request handler blocked waiting for a DB connection
    • if the Repo.delete statements take too long (~100ms or so with default settings) then waiting processes will start to crash with messages like: ** (DBConnection.ConnectionError) connection not available and request was dropped from queue after NNNNms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information

The result is that a sufficiently-large-and-fast burst of requests to the API creates a brief whole-server outage a few minutes later.

Solutions like a job queue or a worker pool avoid this “thundering herd” problem by only trying to do a fixed amount of work at once.
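
One way to sketch the “fixed amount of work at once” idea with only the standard library is Task.async_stream/3 and its :max_concurrency option. The module BoundedCleanup and the delete_fun argument are hypothetical; delete_fun stands in for whatever deletes a single record.

```elixir
defmodule BoundedCleanup do
  # Runs `delete_fun` for every id, with at most `max_concurrency`
  # calls in flight at any moment. Results come back in input order.
  def delete_each(ids, delete_fun, max_concurrency \\ 5) do
    ids
    |> Task.async_stream(delete_fun,
      max_concurrency: max_concurrency,
      timeout: 5_000,
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, result} -> result
      # With on_timeout: :kill_task, a slow call shows up as {:exit, :timeout}.
      {:exit, reason} -> {:error, reason}
    end)
  end
end
```

With max_concurrency set below the Repo pool size, a burst of 1000 deletions would occupy only that many connections at a time and leave the rest for request handlers.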

:timer.apply_after/4 still spawns a new process to execute the MFA, though only after the timeout has elapsed.

Think we can safely count this thread in the XY problem category? :smiley:

(We don’t have one but maybe we should?)

Still, to answer the original question, your best bet for a periodic worker would be to just register a worker inside your supervision tree and have it recurse infinitely:

# lib/your_app/periodic_delete_worker.ex
defmodule YourApp.PeriodicDeleteWorker do
  use Task, restart: :permanent

  def start_link(_arg) do
    Task.start_link(__MODULE__, :run, [])
  end

  def run do
    # Do your stuff here

    # Wait for a message for 1 minute.
    # If no message is received, still wake up after the 1 minute has passed
    # and reschedule the same worker.
    receive do
    after
      60_000 -> run()
    end
  end
end

And then:

# lib/your_app.ex
defmodule YourApp do
  use Application

  def start(_type, _args) do
    children =
      [
        YourApp.Repo, # just an example!
        YourApp.PeriodicDeleteWorker # 👈 this is what you want
      ]

    options = [strategy: :one_for_one, name: YourApp.Supervisor]

    Supervisor.start_link(children, options)
  end
end

…that being said, @al2o3cr is much, much more correct here. You’re better off adopting my idiom (or one of a few others) and using a DB query that says “get me records that have to be deleted and are at least 5 minutes old” (if that’s your window for deletion), then deleting the records returned by that query in one go, for example with Repo.delete_all.

Might be 1, might be 1 million records affected, doesn’t matter really. The important part is that you will only use one DB connection from the pool.

Think we can safely count this thread in the XY problem category? :smiley:

I think that you can! :laughing:

Concerning “problem y”, this is just the first solution I thought of, but as @al2o3cr explained, it can starve the connection pool when many deletions fire at once. The best solution seems to be setting up a recurring task to clean up the DB at a fixed interval, similar to what @dimitarvp suggested.

Now for “problem x”, I think I prefer @fuelen’s solution of :timer.apply_after over @dimitarvp’s solution of a periodic worker, but that assumes my job is a function in a module that I can pass to :timer.apply_after.

Are there any benefits to using the former (:timer.apply_after) over the latter (periodic worker)?

No, not really. I would even say the reverse: :timer.apply_after does not spawn a supervised process, if I remember correctly, whereas a dedicated Task as shown in my example above is supervised.

(Hello, necro’ing a thread 2 years later.)

For reference, I finally went with a periodic worker. Turns out, talking to the person reviewing your PR before posting on the internet is a good idea!
