I think Scala testing libraries have an operator for tests that checks whether some condition eventually evaluates to true (with an implicit timeout). Is there an equivalent function in a common Elixir testing utility library?
There’s nothing in core ExUnit. There are a few third-party libraries, or you can do:
# try 10 times every 100ms
assert Stream.interval(100)
|> Stream.take(10)
|> Enum.any?(fn _ -> …test end)
I think one reason this is less common in Elixir is that many of the things you’d wait for are better handled by waiting to receive a message, one that either is the check itself or indicates that you can now check, instead of polling. In other words, if you start a background task, monitoring the background process and waiting for its exit message is probably better than polling to see whether the task is complete.
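As a rough sketch of the monitoring idea above: the `BackgroundWork` module below is a hypothetical stand-in for whatever starts the asynchronous work, and the 500 ms timeout is an arbitrary choice. Only `spawn_monitor/1` and `assert_receive` are standard; everything else is assumed for illustration.

```elixir
ExUnit.start()

defmodule BackgroundWork do
  # Hypothetical stand-in for code that starts asynchronous work.
  # spawn_monitor/1 returns the pid together with a monitor reference.
  def start do
    spawn_monitor(fn -> Process.sleep(50) end)
  end
end

defmodule BackgroundWorkTest do
  use ExUnit.Case, async: true

  test "receives an exit message when the work finishes" do
    {pid, ref} = BackgroundWork.start()

    # No polling: the monitor delivers a :DOWN message once the process exits.
    # The 500 ms timeout is an arbitrary value for this sketch.
    assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 500
  end
end
```

The test blocks only until the message arrives, so it is usually faster than a fixed polling interval and fails after the timeout if the process never exits.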
I’ve used assert_receive/3 from ExUnit.Assertions (ExUnit v1.12.3) for times when I’ve needed to do this.
I don’t follow this completely. It may or may not be applicable to my situation. I’m trying to test code that launches asynchronous work, which writes to a log. I eventually want to see the log message in the log file.
I know this post is old, but it came up when I searched for this exact problem, so I’m posting my answer anyway.
I want to test a similar thing, and here is what I eventually came up with.
- You extend the API of your log writer process so that any process can subscribe to be notified when a log entry has been written. That way your test can do an assert_receive on a log being written.
- You can use something similar to assert_eventually to retry an assertion a few times. This seems to be the easiest approach, but it might also point to a design issue in your code.
In my particular case, I had a work queue that divided tasks via a work-stealing-like system. A queue keeps a list of tasks, and workers can request work from the queue. When work is available, it’s sent to the workers. The API looks as follows:
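For the second option, a minimal sketch of a retry helper could look like the following. The `Eventually` module and its `eventually/3` function are hypothetical names, not part of ExUnit, and the defaults of 10 attempts and 100 ms are assumptions chosen to mirror the polling snippet earlier in the thread.

```elixir
defmodule Eventually do
  # Hypothetical helper, not part of ExUnit.
  # Calls `fun` up to `attempts` times, sleeping `interval_ms` after each
  # failed try, and returns true as soon as `fun` returns a truthy value.
  def eventually(fun, attempts \\ 10, interval_ms \\ 100)

  # No attempts left: give up.
  def eventually(_fun, 0, _interval_ms), do: false

  def eventually(fun, attempts, interval_ms) when attempts > 0 do
    if fun.() do
      true
    else
      Process.sleep(interval_ms)
      eventually(fun, attempts - 1, interval_ms)
    end
  end
end
```

In a test this might be used as `assert Eventually.eventually(fn -> File.read!(log_path) =~ "expected message" end)`, where `log_path` and the message text are placeholders for your own log file and content.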
@doc """
A process signals the queue that it is ready to ingest a message.
"""
@spec request_work :: :ok
def request_work do
  GenServer.cast(__MODULE__, {:request_work, self()})
end

@doc """
Insert a work item in the queue.
"""
@spec insert_work(message()) :: :ok
def insert_work(message) do
  GenServer.call(__MODULE__, {:insert_work, message})
end
I extended the API of my WorkQueue to allow a process to subscribe to events. A notification is sent when a work item is sent to a worker, and when a worker reports that it needs work.
@doc """
Send a message to the process when a worker asks for work, or when a task is
sent to a worker.
"""
@spec notify(pid()) :: :ok
def notify(pid) do
  GenServer.call(__MODULE__, {:notify, pid})
end
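The server side of this API is not shown in the post. As a sketch only, one way such a queue could track subscribers and emit events is below. The module name `WorkQueueSketch`, the state layout, the `{:work, task}` message sent to workers, and the private helpers are all assumptions made for illustration; only the three public functions and the `:sent_work` / `:request_work` event names come from the post.

```elixir
defmodule WorkQueueSketch do
  use GenServer

  # Public API, mirroring the functions from the post.
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  def request_work, do: GenServer.cast(__MODULE__, {:request_work, self()})
  def insert_work(message), do: GenServer.call(__MODULE__, {:insert_work, message})
  def notify(pid), do: GenServer.call(__MODULE__, {:notify, pid})

  @impl true
  def init(:ok) do
    # Assumed state: pending tasks, idle workers, and subscribed pids.
    {:ok, %{tasks: :queue.new(), idle: :queue.new(), subscribers: []}}
  end

  @impl true
  def handle_call({:notify, pid}, _from, state) do
    {:reply, :ok, %{state | subscribers: [pid | state.subscribers]}}
  end

  def handle_call({:insert_work, message}, _from, state) do
    state = dispatch(%{state | tasks: :queue.in(message, state.tasks)})
    {:reply, :ok, state}
  end

  @impl true
  def handle_cast({:request_work, worker}, state) do
    broadcast(state, :request_work)
    {:noreply, dispatch(%{state | idle: :queue.in(worker, state.idle)})}
  end

  # Hand out tasks for as long as there is both a task and an idle worker.
  defp dispatch(state) do
    with {{:value, task}, tasks} <- :queue.out(state.tasks),
         {{:value, worker}, idle} <- :queue.out(state.idle) do
      # Assumed message shape for delivering work to a worker.
      send(worker, {:work, task})
      broadcast(state, :sent_work)
      dispatch(%{state | tasks: tasks, idle: idle})
    else
      _ -> state
    end
  end

  # Forward an event to every subscribed process.
  defp broadcast(state, event), do: Enum.each(state.subscribers, &send(&1, event))
end
```

Because `notify/1` is a synchronous call, the subscriber is registered before any later request is handled, so a test that subscribes first should not miss events.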
When the application starts, the WorkQueue has n workers waiting for work.
So, executing two work items should notify the test of two things:
- work_requested when the worker finishes the first task and is ready for the next one.
- work_added when the second work item is put in the queue.
My test looks as follows.
# let the queue tell us when a job is done, or when a worker asks for work
IngestQueue.notify(self())
Handler.handle_message([device.device_id, "data"], @telegram) |> tap(&IO.inspect(&1, label: ""))
# before the test all workers are waiting for work, so the first event is work being sent
# when the worker is done, they will ask for new work
# when the second task is done, that worker will ask for work again
assert_receive :sent_work
assert_receive :request_work
# I am now certain the queue executed at least two tasks.
Obviously this could be done a bit better, but with minimal effort I can now make assertions about the state of an asynchronous system.