I’m finding catching channel unsubscribe events is error prone

I need to perform some logic when client unsubscribes from absinthe channel or if socket is severed (like browser windows closed). In the example below, the app allows multiple users to work on an object graph, and each node being edited is locked. When user switches to another UI or closes browser window the lock needs to be released.

So far I’ve been handling this by providing a custom channel and socket decorators, and pattern-matching handle_in() callbacks.

In the absinthe schema I, of course, have my subscriptions defined, along with channel names that I expect the user to subscribe to. To handle unsubscribes, I have custom pattern matching for each type of subscription with handle_in. But the arguments that come in make it incredibly difficult to figure out what channel I’m unsubscribing from because all I have is a subscriptionId and a query, so I have to pattern-match on substring to know which subscription it is.

This is extremely ugly and error-prone! Why couldn’t Absinthe just provide a configurable callback for channel unsubscribes that I could plug in the configuration somewhere? Like on_unsubscribe:

  object :lock_subscriptions do
    field :lock_changed, non_null(:lock_result) do
      arg(:lock_input, :lock_input)

      config(fn
        %{lock_input: %{id: target_id, target_type: target_type}}, _ ->
          {:ok, topic: "lock-changed:#{target_type}:#{target_id}", on_unsubscribe: &my_callback/3}

        _, _ ->
          {:ok, topic: "lock-changed:*"}
      end)

      trigger([:acquire_lock, :release_lock],
        topic: fn
          %{success: true, id: target_id, target_type: target_type} ->
            [
              "lock-changed:#{target_type}:#{target_id}",
              "lock-changed:*"
            ]
          _ ->
            "lock-changed-do-not-publish"
        end
      )
    end

    field :lock_freed, non_null(:lock_result) do
      arg(:lock_input, non_null(:lock_input))

      config(fn
        %{lock_input: %{id: target_id, target_type: target_type}}, _ ->
          {:ok, topic: "lock-freed:#{target_type}:#{target_id}", on_unsubscribe: &my_callback/3}
      end)

      trigger(:release_lock,
        topic: fn
          %{success: true, id: target_id, target_type: target_type} ->
            "lock-freed:#{target_type}:#{target_id}"

          _ ->
            "not-freed_do-not-publish"
        end
      )
    end
  end

If all I have is subscriptionId which looks like subscriptionId: "__absinthe__:doc:-576460752303423225:09741AC4B0D1FF92645F8C67BA51FB3656E35965E66A2A42FCEA62D4B9BB6754" then I have to build a registry that associates initial arguments to subscriptionId so I can retrieve them on unsubscribe and perform handle_unsub.

I have a suspicion that absinthe disregards the channel name I’m providing in the schema in config and instead uses this long subscriptionId as the topic name. Life would be easier if it was passing the channel name that I could parse for arguments, "#{@lock_changed_prefix}:#{target_type}:#{target_id}". But it doesn’t.

So either I’m not configuring the subscription properly or nobody is using absinthe unsubscribe events, because otherwise they’d be pulling their hair out too and we’d have a lot of bald people.

Appreciate some help. How have you done it before, when you needed to, say, release resources on unsubscription, based on the arguments provided during subscribe?

If you need to handle the client going away entirely (connection severed, tab closed), you need to have another process monitor you then react when you receive DOWN in that process. Relying on the client is not going to work to free locks like this.

1 Like

When the client goes away the socket is severed which means ALL of client’s subscriptions are unsubscribed. I am already catching it quite happily via AbsintheChannelDecorator’s terminate(reason, socket) callback. Again, it lacks useful information in arguments, but one can assume that all subscriptions are cancelled when browser window is closed.

What’s really a major pain point is that absinthe/phoenix gives you a single endpoint to handle all subscribe and unsubscribe actions AND the arguments are not very informative:

def handle_in(event = “unsubscribe”, payload = %{“subscriptionId” => subscription_id}, socket)

The payload doesn’t tell you which channel was unsubscribed from (even though you had provided the channel names in the config in your absinthe schema, and all you have is a subscriptionId. That means that you have to create your own registry of subscriptionIds to map them to that information when you subscribe to be able to run some subscription-specific logic like lock release in my example on unsub. Why??? Absinthe is already doing that in its own handle_in() implementation.

I wish there was a callback function I could specify in the channel config in the absinthe schema, or at least absinthe would pass channel name back to me in handle_in args - then I could use naming to extract the arguments, e.g. “lock-changed:<object_type>:”.

So, unless I’m misusing Absinthe, my solution so far is to stash channel name into redis cache keyed by subscriptionId. But then you have to start thinking about expiring those entries eventually and renewing the expiration timestamp when the key is in use… This is way too complicated when all the information is already provided to the framework.

Intercepting subscribes is also not pretty. Again, channel names aren’t passed in, and figuring out what the subscription is to can’t even be pattern-matched easily:

# we have "doc" to indicate it's a `subscribe`, then we can try to pattern-match on subscription arguments in the payload. But what if you have multiple subscription with like-names arguments?
def handle_in(event = "doc", payload = %{"variables" => %{"deploymentId" => dep_id}}, socket) do
    # a subscription call will include query such as this:
    # "query" => "subscription OnDeploymentStatusChanged($deploymentId: ID!) {\n  deploymentStatusChanged(deploymentId: $deploymentId) ...
    # For everything else just pass through to default Phoenix handler.
    # So now we need to know ahead of time what the signature is going to look like and match on substring??? SMH
    if String.contains?(Map.get(payload, "query", ""), "deploymentStatusChanged(deploymentId: $deploymentId)") do
      ret = Absinthe.Phoenix.Channel.handle_in(event, payload, socket)

      case ret do
        {:reply, {:ok, %{subscriptionId: sub_id}}, socket} ->
            case user_id = get_user_id_from_socket(socket) do
              nil ->
                Logger.error("Trying to subscribe without current user in the context!")
              _ ->
                DeploymentStatusWorker.subscribe(%{user_id: user_id, deployment_id: dep_id, subscription_id: sub_id})
            end
          _ ->
            :ignore
      end

      ret
    else
      Absinthe.Phoenix.Channel.handle_in(event, payload, socket)
    end
  end

So I think this is a potential major source of errors. Change signature of subscription in the schema and your code is broken now.

I don’t have any idea what absinthe does on top of phoenix channels, but when dealing with locks you usually want to do that in a separate process. That process can start monitoring the channel process when it receives the lock and can release it if the channel goes away. No need for any callbacks to catch the channel closing, no need for any channel naming schemas.

Hmm yeah I see that each channel gets its own process, indeed. Still need to catch subscribe events in handle_in and map channel PID to the argument set to use them when PID dies.

Absinthe multiplexes the individual subscriptions through a single PID, so this unfortunately will not work.

It isn’t disregarding it, it’s that the topic for an individual client’s subscription is distinct from the topic that triggers document execution. Two clients that each submit:

# client 1
subscription { lock_changed(id: 1) { foo } }

# client 2
subscription { lock_changed(id: 1) { bar } }

Each of these clients has submitted a different document that should get different results, even though the topic returned by lock_changed is the same.

The reason that Absinthe doesn’t provide an on_unsubscribe callback is that it is very difficult for this to fire reliably. Client crashes, node crashes, channel process crashes, any of these things will prevent it from firing reliably.

All in all I think a more reliable locking mechanism would work as follows:

A client can request a lock on a resource that only lasts for a certain amount of time, maybe say 30 seconds. The lock has to be pinged every 30 seconds or it expires, ensuring that a resource is never stuck in a locked state even if the client goes down completely. On the client side, you could also explicitly terminate a lock early when the page goes away. All that a subscription would do is provide information about lock status, it wouldn’t also perform a side effect because that’s not what subscriptions are supposed to do.

Yes, I’m already expiring locks in 2 minutes unless the client makes renewLock() call periodically to extend the lifespan of the lock. I’m also relatively easily catching socket death with terminate callback to end all subscriptions for the client (I know who that was from the user stored in socket’s context). For disposing of components on the UI the client just calls unsubscribe on the object it gets when subscribing. That’s something Phoenix provides to the client. That gets translated into handle_in callback on the ChannelDecorator. It’s just hard to work with.
I suppose I can always ask the client to assume the lock will get released when the component is disposed of – and release the lock explicitly.
And we can always fall back on lock expiration, which the client subscribes to separately. It’s just not immediate, so not the best user experience.