Absinthe middleware for subscribers runs in the process of the event generator

When data is published (e.g. some event happened on the server and clients need to be notified), I want to make some authorization checks before the data is sent to the frontend.

Where can I make these checks?

I first thought of Absinthe middleware, but I found out that the process running the middleware is the process that generated the event, not the process of each individual socket connection. I need to be in the process of each socket because I store the authenticated user in the process, and I want to authorize the data sent to each user (i.e. each socket).

If you store the current user in the Absinthe context, that will simply be available in the middleware like normal without using any process dictionary stuff.
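
As a rough illustration of that suggestion, here is a minimal sketch of a middleware that reads the user from the context. The module name `MyAppWeb.Schema.Middleware.RequireUser` and the `:current_user` key are assumptions made for the example, and it presumes the `:absinthe` dependency is installed. With absinthe_phoenix, the context is typically set when the socket connects, via `Absinthe.Phoenix.Socket.put_options/2`.

```elixir
defmodule MyAppWeb.Schema.Middleware.RequireUser do
  # Hypothetical middleware name; assumes the :absinthe dependency is available.
  @behaviour Absinthe.Middleware

  # A user is present in the context of the document being resolved: continue.
  def call(%{context: %{current_user: %{id: _}}} = resolution, _opts), do: resolution

  # No user in the context: resolve the field with an error instead.
  def call(resolution, _opts) do
    Absinthe.Resolution.put_result(resolution, {:error, "unauthorized"})
  end
end
```

Nothing here touches the process dictionary; the user comes only from the context attached to the document being resolved.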

The current user in the middleware is the user that created the event, not the users that will receive the event. What I’m trying to do is to authorize the data that will be sent to each user (socket), so I will not have the right user in the middleware.

That seems wrong. Can you provide an example that behaves that way? Absinthe stores the context for each subscriber, and the document should be resolved with that context. It does happen in the PID of the event generator, but it will still use the subscriber context.

Thank you Ben, indeed the current user is the subscriber. I got confused because, for some reason, the middleware is called twice per subscriber for one event. I may open an issue on GitHub about that, unless there is a reason for it.

The fact that middleware is run by the PID of the event generator is a problem for me though, because I use the process to store some information about the user in order to apply rules when running queries. See the Ecto documentation, which uses the process dictionary to store the user’s tenant ID:
https://hexdocs.pm/ecto/multi-tenancy-with-foreign-keys.html#setting-org_id-by-default

I have a similar system as the one described above, where repo calls will use some user data stored in the process (e.g. a tenant ID) to manipulate or check the query.
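
To make the mismatch concrete, here is a minimal sketch in plain Elixir, loosely modelled on the pattern in the Ecto guide. `TenantStore`, `list_posts/1`, and the sample data are hypothetical stand-ins (there is no real repo involved); the point is only that anything reading the tenant from the process sees the publisher’s value, whatever the subscriber’s context says.

```elixir
defmodule TenantStore do
  # Hypothetical stand-in for the repo helpers described in the Ecto guide.
  @tenant_key {__MODULE__, :org_id}

  def put_org_id(org_id), do: Process.put(@tenant_key, org_id)
  def get_org_id, do: Process.get(@tenant_key)

  # Stand-in for a Repo call that scopes its query by the process-local tenant.
  def list_posts(posts), do: Enum.filter(posts, &(&1.org_id == get_org_id()))
end

posts = [%{id: 1, org_id: 1}, %{id: 2, org_id: 2}]

# The process that publishes the event belongs to tenant 1...
TenantStore.put_org_id(1)

# ...but the subscription document is being resolved for a subscriber in tenant 2.
subscriber_context = %{current_user: %{id: 42, org_id: 2}}

# The "query" is scoped by the process dictionary, not by the context.
visible = TenantStore.list_posts(posts)

IO.inspect({subscriber_context.current_user.org_id, Enum.map(visible, & &1.org_id)})
# prints {2, [1]}
```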

I was thinking that I could add a callback on the subscriber socket: a callback that is called before data is sent out, in which case I’d be in the process of the subscriber. But I don’t see any such callback.
Even then, the fact that Repo calls rely on the process prevents me from making any Repo calls in middleware, because the user in the context doesn’t always match the user data in the process (and this is prone to future bugs).

What do you think I should do about this mess? I’d be sad to abandon my existing system that enforces tenant ID checks, as it was a lot of work (it is a little more sophisticated than the example in the Ecto docs) and it assured me there would never be a data leak.

This simply isn’t how it works: the event is not sent to every subscriber pid, so no such callback could exist without re-architecting how messages are pushed.

This is a good question. Unfortunately, it is indeed quite a mess. Absinthe’s internals are built around this approach because it ensures that, by default, there is back pressure on event publishers, and it allows optimizations like batch processing of subscriptions.

Frankly, I wish the Ecto docs did not promote the Process dictionary based approach. There are all kinds of scenarios where it is potentially problematic, and it creates real limits about how you can architect your processes. That doesn’t really help you now though. Let me think about this and see if any ideas come to mind.

As far as I understand, processes in Erlang/Elixir are lightweight and nearly free, so I have to ask: could Absinthe not run the code for each subscriber in a new process spawned from the process of the event generator?
I don’t see any other solution than that.

It could. A few things though:

  1. Spawning a new process wouldn’t solve your problem really, because the new process won’t have the process dictionary values that you need anyway.
  2. Spawning a new process per subscription document would eliminate a class of optimizations called “batching”. With batching, Absinthe can execute several documents as part of one batch as long as the context matches, and this improves performance significantly by decreasing the number of DB calls that need to be made.
  3. Spawning a new process would complicate back pressure. Back pressure is a good default, because without it it’s easy for whatever process is publishing events to overload the system without realizing it.

It would solve it, because I would have a middleware that sets the current user in the new process (and all middleware calls for one particular user/subscriber would run within that same process).

We can just create a new process/task and await until it’s done before continuing, so in terms of throughput you get the same result.
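
Both points can be seen in a small plain-Elixir sketch. The `:current_user_id` key is a made-up name for the example. A task does not inherit the caller’s process dictionary entries, but a value set inside the task is visible to everything that runs in that task, and `Task.await/1` blocks the caller until the task finishes.

```elixir
# The "publisher" process stores its own user.
Process.put(:current_user_id, 1)

# A task is a separate process; the caller's entry for this key is not copied.
inherited = Task.async(fn -> Process.get(:current_user_id) end) |> Task.await()

# Setting the value inside the task affects only the task's own process.
inside =
  Task.async(fn ->
    Process.put(:current_user_id, 2)
    Process.get(:current_user_id)
  end)
  |> Task.await()

# The caller's value is untouched after the task has finished.
after_task = Process.get(:current_user_id)

IO.inspect({inherited, inside, after_task})
# prints {nil, 2, 1}
```

Because the caller waits on each task, the publisher still processes one unit of work at a time; what is lost is any state built up inside the task once it exits.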

I had to find a solution in any case to move on. For every middleware that runs I create a new process if the user in the context is different than the user in the process (which happens when dealing with subscribers). So for one subscriber I’ll create multiple new processes, one process for one middleware call.

schema.ex

def middleware(middleware, _field, _object) do
  middleware = [Middleware.Authenticate | [Middleware.Authorize | middleware]]
  ensure_new_process(middleware)
end

defp ensure_new_process(middleware) do
  Enum.map(middleware, &{Middleware.EnsureNewProcess, &1})
end

ensure_new_process.ex

defmodule MyAppWeb.Schema.Middleware.EnsureNewProcess do
  @behaviour Absinthe.Middleware

  alias MyApp.Authorizer

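  # Wraps another middleware and runs it in a process whose stored user
  # matches the user in the Absinthe context.
  def call(resolution, middleware) do
    case resolution.context[:current_user] do
      nil ->
        # No user in the context: run the wrapped middleware in place.
        call_middleware(middleware, resolution)

      current_user ->
        user_id = current_user.id

        # match?/2 also handles a process with no stored user (assuming
        # Authorizer.get_current_user/0 returns nil in that case).
        if match?(%{id: ^user_id}, Authorizer.get_current_user()) do
          call_middleware(middleware, resolution)
        else
          # Different user: run in a fresh process and block until it is done.
          # Note that Task.await/2 defaults to a 5 second timeout.
          Task.await(Task.async(fn ->
            Authorizer.put_current_user(current_user)
            call_middleware(middleware, resolution)
          end))
        end
    end
  end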

  defp call_middleware({{mod, fun}, opts}, res) do
    apply(mod, fun, [res, opts])
  end

  defp call_middleware({mod, opts}, res) do
    apply(mod, :call, [res, opts])
  end

  defp call_middleware(mod, res) when is_atom(mod) do
    apply(mod, :call, [res, []])
  end

  defp call_middleware(fun, res) when is_function(fun, 2) do
    fun.(res, [])
  end
end

I simplified the code above because it gets uglier in reality: I use the context as a cache for one value that the authorizer module computes (with DB calls) and would otherwise need to recompute every time, since I keep creating new processes and so keep losing the state.