Streams in Tests: strange fails

I am building a Phoenix app with session-based authentication. Sessions are stored in the database (for reporting/visibility/persistence) AND in ETS via Cachex. I’m working through the code that needs to run on startup that queries the database for all active sessions and loads that data into cache. I thought this would be a good opportunity to explore using streams (and tasks + async).

I came up with the following code that works in the iel shell:

import Ecto.Query, warn: false
alias Auth.Schemas.Session
alias Auth.Contexts.SessionContext
alias Auth.Repo

now = Timex.now()
query =
from s in Session,
     where: s.expires_at > ^now,
     select: s
stream = Repo.stream(query)

Repo.transaction(fn() ->
  Enum.to_list(stream)
  |> Enum.reduce(fn s, _acc ->
              SessionContext.cache_session(s)
              IO.puts("Session caching #{s.id}")
              end)
end)

That works! And I can query Cachex on the various session IDs and it returns what I would expect, i.e. SessionContext.get_from_cache("some-session-id-here") returns an :ok with my session data.

However… when I try to test that in my ExUnit tests, I get a failure. I suspect it has something to do with threads and processes and however those get spawned when running mix test, but honestly, that’s only a guess. I’ve noticed that sometimes when a function writes data to both PostGres AND to Cachex, sometimes the test can only see one of them (?!?).

Can someone shed light on why this doesn’t work in the test environment? Why does SessionContext.get_from_cache("some-session-id-here") return an :error with no session data found?
Do I have to update my mix.exs or something to make ExUnit be able to “see” what’s going on in the cache?
How can I incorporate an asynchronous task to make sure that this completes the process as quickly as possible?

Thanks for any insights!

I ask for your patience as I’m going to mention something that may be tangential to your problem, but struck me when looking at your code:

where: s.expires_at > ^now

Can you compare dates using the > operator in this way? It seems that you should be using DateTime.compare (or something similar)

AFAIK ecto does allow this, as it operates on the DB rather than on datetimes.

Yes, that comparison works. If you would care to provide an alternate example, that would be great!

It sounds like you may be experiencing unexpected isolation from Ecto.Adapters.SQL.Sandbox - it’s used by default in the generated DataCase module. Each of your tests runs in a transaction which is rolled back at the end of the test, and unless you take specific steps to share the underlying database connection different processes won’t see each others updates.

Docs: https://hexdocs.pm/ecto_sql/Ecto.Adapters.SQL.Sandbox.html

IIRC the sharing situation is better on Elixir 1.8+ due to some plumbing changes ($callers etc).

I figured out a solution that satisfies my tests:

    Repo.transaction(
      fn () ->
        Enum.map(stream, &cache_session(&1))
      end
    )
1 Like