How to get Ecto stream respond in ExUnit

Hello, I have a function that uses Repo.stream and Task.async_stream, but in ExUnit every time I have a different result and I can not wait for this function output

def delete_plugins(event) do
    stream = MishkaDatabase.Repo.stream(from(plg in PluginSchema))
    MishkaDatabase.Repo.transaction(fn() ->
      stream
      |> Stream.filter(&(event in &1.depends))
      |> Enum.to_list()
    end)
    |> case do
      {:ok, list} ->
        list
        |> Task.async_stream(&MishkaInstaller.Hook.unregister(module: &1.name), max_concurrency: 20)
        |> Stream.run
      error ->
        IO.inspect(error)
    end
  end

for example:

test "test 1" do
  Plugin.delete_plugins("test"))
  assert_receive :ok, 30_000
end

when I run top test block code, I receive this error

Assertion failed, no matching message after 30000ms
     The process mailbox is empty.

When I print Plugin.delete_plugins("test")) it shows me :ok:, but assert_receive` cannot get it.

Thank you

Looks like you are returning a value, NOT sending a message back:

assert Plugin.delete_plugins("test")) == :ok

That should work.

3 Likes

Yes, thank you, but this function is connected to my Genserver, I need to count my db after terminate my states

see this:

..13:56:34.405 [warning] unnested_plugin_three from nested_event_one event of Plugins manager was Terminated,
      Reason of Terminate :normal
[
  %MishkaInstaller.PluginState{
    depend_type: :soft,
    depends: [],
    event: "nested_event_one",
    extra: [],
    name: "unnested_plugin_five",
    priority: 1,
    status: :started
  },
  %MishkaInstaller.PluginState{
    depend_type: :soft,
    depends: [],
    event: "nested_event_one",
    extra: [],
    name: "nested_plugin_one",
    priority: 1,
    status: :started
  },
  %MishkaInstaller.PluginState{
    depend_type: :soft,
    depends: [],
    event: "nested_event_one",
    extra: [],
    name: "unnested_plugin_four",
    priority: 1,
    status: :started
  },
  %MishkaInstaller.PluginState{
    depend_type: :soft,
    depends: [],
    event: "nested_event_one",
    extra: [],
    name: "nested_plugin_two",
    priority: 1,
    status: :started
  }
]
13:56:34.762 [warning] wordpress_login_plugin from event_one event of Plugins manager was Terminated,
      Reason of Terminate :normal
13:56:34.763 [warning] magento_login_plugin from event_one event of Plugins manager was Terminated,
      Reason of Terminate :normal
13:56:34.767 [warning] joomla_login_plugin from event_one event of Plugins manager was Terminated,
      Reason of Terminate :normal

the first warning came immediately, but the others came after my test done, I put sleep time, but it didn’t work for me

Please see this image, when I run it in my iex I have no error and always my results are same, but in test module it cannot return after all state are terminated

I do not know how can I test it

Sorry, I am not sure I follow.

You are saying you need to stop a bunch of GenServers before the test finishes? To wait for them to finish?

I think I have a logical problem

This below function check a record on db and state, after finding it the record concerned is deleted.

  def unregister(module: module_name) do
    with {:ok, :delete, _msg} <- delete(module: module_name),
         {:ok, :get_record_by_field, :plugin, record_info} <- Plugin.show_by_name(module_name),
         {:ok, :delete, :plugin, _} <- Plugin.delete(record_info.id) do

          Plugin.delete_plugins(module_name)
         {:ok, :unregister, "The module concerned (#{module_name}) and its dependencies were unregister"}
    else
      {:error, :delete, msg} -> {:error, :unregister, msg}
      {:error, :get_record_by_field, :plugin} -> {:error, :unregister, "The #{module_name} module doesn't exist in the database."}
      {:error, :delete, status, _error_tag} when status in [:uuid, :get_record_by_id, :forced_to_delete] ->
        {:error, :unregister, "There is a problem to find or delete the record in the database #{status}, module: #{module_name}"}
      {:error, :delete, :plugin, repo_error} -> {:error, :unregister, repo_error}
    end
  end

If you see I call the function named Plugin.delete_plugins(module_name), this function is:

  def delete_plugins(event) do
    stream = MishkaDatabase.Repo.stream(from(plg in PluginSchema))
    MishkaDatabase.Repo.transaction(fn() ->
      stream
      |> Stream.filter(&(event in &1.depends))
      |> Enum.to_list()
    end)
    |> case do
      {:ok, []} -> []
      {:ok, list} ->
        list
        |> Task.async_stream(&MishkaInstaller.Hook.unregister(module: &1.name), max_concurrency: 20)
        |> Stream.run
      error ->
        IO.inspect(error)
    end
  end

This is like a reverse function, it calls unregister function until, this returns [] or an error

but in testing sometimes it passes me the Preferred output, but the other time it returns a wrong value

I couldn’t still find the problem, I am really in a trap :joy: :facepunch:

Readers of this post don’t have the source of the tests and can’t run the code themselves. What “wrong value” is being returned?

2 Likes

Ohh, I am very sorry, I get involved in Covid-19 and I forgot to update this post :face_with_head_bandage:

Step one

this problem was because I was using handle_cast for pushing data to my state, hence when I send many data in a loop I thought some session was not closed and the program waited for it. After that, I changed it with handle_call and it was fixed.

Step two

I add these line top of my test code

  setup_all _tags do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(MishkaDatabase.Repo)
    Ecto.Adapters.SQL.Sandbox.mode(MishkaDatabase.Repo, :auto)
    on_exit fn ->
      :ok = Ecto.Adapters.SQL.Sandbox.checkout(MishkaDatabase.Repo)
      Ecto.Adapters.SQL.Sandbox.mode(MishkaDatabase.Repo, :auto)
      clean_db()
      :ok
    end
    [this_is: "is"]
  end

Step three

I changed ExUnit.Case, async to false

At the end

These are from my fork project

Test file:

My state manager

And my hook function

Sorry again and thank you. For now, I have no issue, but if I can find anything I will update this post again.