Aggregate that requires complex joins

As a follow-on to this conversation, I have a solution using a calculation, but it introduces an N+1 query.

And now I’m working on a very similar part of the application, on basically the same problem. This time, I want to avoid the N+1 query.

In a conversation forum, there are Conversations, Users, and Messages. In my system, there is a join resource we’ll call ConversationUser, that relates a User with a Conversation. It also tracks the most recent Message the User has seen, stored as last_read_message_id. There is a listConversations AshGraphql query, and it now needs to include a count of the number of unread messages in the result. The path to that count would be conversations.selfConversationUser.unreadMessagesCount.

As before, I successfully implemented this as a calculation. And again, it introduces an N+1 query, because it loads from the database for each Conversation. So I’m going back to my first post, and looking for a solution using an aggregate. I made some headway, but I can’t figure out how to do the complex joining that needs to happen.
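To illustrate the shape of the problem, here is a plain-Elixir sketch over in-memory data (not Ash code). A per-record calculation does one lookup per Conversation, whereas a grouped pass computes every count at once, which is the behavior I am hoping to get from an aggregate. The module `NPlusOneSketch`, its function names, and the map shapes are made up for this sketch, and the lookup counter only stands in for database round trips.

```elixir
defmodule NPlusOneSketch do
  # Hypothetical in-memory model: `messages` is a list of maps with a
  # :conversation_id key. The second element of each return value is the
  # number of simulated lookups ("queries").

  # N+1 shape: one lookup per conversation.
  def per_record(conversation_ids, messages) do
    Enum.map_reduce(conversation_ids, 0, fn id, lookups ->
      count = Enum.count(messages, &(&1.conversation_id == id))
      {{id, count}, lookups + 1}
    end)
  end

  # Batched shape: a single grouped pass serves every conversation.
  def batched(conversation_ids, messages) do
    counts = Enum.frequencies_by(messages, & &1.conversation_id)
    {Enum.map(conversation_ids, &{&1, Map.get(counts, &1, 0)}), 1}
  end
end
```

Both functions return the same counts; only the number of simulated lookups differs.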

This is how far I’ve gone:

defmodule ConversationUser do
  aggregates do
    count :unread_messages_count, [:conversation, :messages] do
      join_filter [:conversation, :messages], expr(id == parent(parent(last_read_message_id)))
    end
  end
end

The above successfully calculates 1 if there is a last_read_message_id, but it needs to do much better: it needs to count the number of messages that were created after that particular message. So it needs another join on messages, where inserted_at is greater than the initial joined message. Is that possible? Or do you have a better approach?

I just tried this:

  aggregates do
    count :unread_messages_count2, [:conversation, :messages] do
      join_filter [:conversation, :messages],
                  expr(
                    id == parent(parent(last_read_message_id)) and
                      inserted_at > parent(parent(last_read_message.inserted_at))
                  )
    end
  end

But it gave this:

** (Ash.Error.Unknown) 
Bread Crumbs:
  > Loading aggregate: :unread_messages_count2 for query: #Ash.Query<resource: GF.Messaging.Participant2, filter: #Ash.Filter<id == 115166>, calculations: %{unread_messages_count: unread_messages_count}>
  

Unknown Error

* Invalid reference last_read_message.inserted_at
    at aggregates

And another variant:

      join_filter [:conversation, :messages],
                  expr(
                    id == parent(parent(last_read_message_id)) and
                      inserted_at > parent(parent(last_read_message).inserted_at)
                  )

The .inserted_at is to the right of the inner parent().

     ** (Ash.Error.Unknown) 
     Bread Crumbs:
       > Loading aggregate: :unread_messages_count2 for query: #Ash.Query<resource: GF.Messaging.Participant2, filter: #Ash.Filter<id == 115171>, calculations: %{unread_messages_count: unread_messages_count}>
       

     Unknown Error

     * Invalid reference parent.inserted_at
         at aggregates

I think this is closer to the desired query:

      join_filter [:conversation, :messages],
                  expr(
                    is_nil(parent(parent(last_read_message_id))) or
                      inserted_at > parent(parent(last_read_message.inserted_at))
                  )

But it still fails with Invalid reference last_read_message.inserted_at. I just need to figure out how to reference inserted_at.
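For reference, the semantics I am after can be written in plain Elixir over in-memory data. This is only a sketch of what the aggregate should return, not Ash code. The module name `UnreadSketch`, the map shapes, and the choice to treat a missing last-read message as "everything is unread" are assumptions for illustration.

```elixir
defmodule UnreadSketch do
  # Plain-Elixir model of the desired count; not Ash code.
  # Assumed (hypothetical) shapes:
  #   message:           %{id: integer, conversation_id: integer, inserted_at: DateTime.t()}
  #   conversation_user: %{conversation_id: integer, last_read_message_id: integer | nil}

  def unread_count(conversation_user, messages) do
    in_conversation =
      Enum.filter(messages, &(&1.conversation_id == conversation_user.conversation_id))

    # nil when last_read_message_id is nil or does not match a message here.
    last_read =
      Enum.find(in_conversation, &(&1.id == conversation_user.last_read_message_id))

    Enum.count(in_conversation, fn message ->
      # Mirrors: is_nil(last_read_message_id) or
      #          inserted_at > last_read_message.inserted_at
      is_nil(last_read) or
        DateTime.compare(message.inserted_at, last_read.inserted_at) == :gt
    end)
  end
end
```

A function like this could also serve as a test oracle when checking whatever the aggregate ends up returning.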

aggregates do
  count :unread_messages_count, [:conversation, :messages] do
    filter expr(inserted_at > parent(last_read_message.inserted_at))
  end
end

I think something like this should work, since the thing being filtered is the destination. We recently (as of the last release) fixed a bug around tracking parent/1 references in complex calculations. I don’t think it comes into play here, but it’s worth updating just in case.

Depending on how well this performs, you may end up wanting a manual joinable relationship called something like unread_messages.

https://hexdocs.pm/ash_postgres/manual-relationships.html

When the join conditions get complex, or you have some way of optimizing the joins, that can help :smiley:

I tried that, and I added the :last_read_message relationship, which I somehow missed before. Here’s the result:

     ** (Ash.Error.Unknown) 
     Bread Crumbs:
       > Exception raised in: GF.Messaging.Participant2.read

     Unknown Error

     * ** (RuntimeError) Error while building reference: last_read_message.inserted_at
       (ash_sql 0.2.55) lib/expr.ex:1908: AshSql.Expr.default_dynamic_expr/6
       (ash_sql 0.2.55) lib/expr.ex:1008: AshSql.Expr.default_dynamic_expr/6
       (ash_sql 0.2.55) lib/filter.ex:37: anonymous fn/2 in AshSql.Filter.add_filter_expression/2
       (elixir 1.17.2) lib/enum.ex:2531: Enum."-reduce/3-lists^foldl/2-0-"/3
       (ash_sql 0.2.55) lib/filter.ex:22: AshSql.Filter.filter/4
       (ash_sql 0.2.55) lib/aggregate.ex:883: anonymous fn/6 in AshSql.Aggregate.maybe_filter_subquery/6
       (elixir 1.17.2) lib/enum.ex:4858: Enumerable.List.reduce/3
       (elixir 1.17.2) lib/enum.ex:2585: Enum.reduce_while/3
       (ash_sql 0.2.55) lib/aggregate.ex:369: anonymous fn/10 in AshSql.Aggregate.add_aggregates/6
       (ash_sql 0.2.55) lib/join.ex:290: AshSql.Join.related_subquery/3
       (ash_sql 0.2.55) lib/aggregate.ex:217: anonymous fn/7 in AshSql.Aggregate.add_aggregates/6
       (elixir 1.17.2) lib/enum.ex:4858: Enumerable.List.reduce/3
       (elixir 1.17.2) lib/enum.ex:2585: Enum.reduce_while/3
       (ash_sql 0.2.55) lib/aggregate.ex:88: AshSql.Aggregate.add_aggregates/6
       (ash 3.4.63) lib/ash/query/query.ex:3034: Ash.Query.data_layer_query/2
       (ash 3.4.63) lib/ash/actions/read/read.ex:581: anonymous fn/8 in Ash.Actions.Read.do_read/5
       (ash 3.4.63) lib/ash/actions/read/read.ex:927: Ash.Actions.Read.maybe_in_transaction/3
       (ash 3.4.63) lib/ash/actions/read/read.ex:308: Ash.Actions.Read.do_run/3
       (ash 3.4.63) lib/ash/actions/read/read.ex:82: anonymous fn/3 in Ash.Actions.Read.run/3
       (ash 3.4.63) lib/ash/actions/read/read.ex:81: Ash.Actions.Read.run/3

(The actual names of my resources are different than what is mentioned in my post.)

I’ll keep playing with that, and I may try a manual relationship.

Interesting :thinking: Oh, I see, is last_read_message a relationship on the conversation resource?

last_read_message is a relationship on the ConversationUser (Participant) resource.

So then I tried parent(parent(...)):

filter expr(inserted_at > parent(parent(last_read_message.inserted_at)))

And got this:

     ** (Ash.Error.Unknown) 
     Bread Crumbs:
       > Exception raised in: GF.Messaging.Participant2.read

     Unknown Error

     * ** (KeyError) key :parent_bindings not found in: %{
       sort: [],
       domain: GF.Domain,
       context: %{
         private: %{
           actor: #GF.Members.Member2<

defmodule ConversationUser do
  aggregates do
    count :unread_messages_count, [:conversation, :messages], filter: filter_unread_messages/2
  end

  defp filter_unread_messages(query, %{last_read_message_id: last_read_message_id}) when not is_nil(last_read_message_id) do
    # Join to the last read message to get its timestamp
    last_read_message = 
      from m in Message,
        where: m.id == ^last_read_message_id,
        select: m.inserted_at

    # Filter for messages in this conversation with timestamps after the last read message
    from m in query,
      join: lrm in subquery(last_read_message), as: :last_read,
      where: m.inserted_at > lrm
  end

  # Handle case where user hasn't read any messages yet
  defp filter_unread_messages(query, _) do
    # Count all messages in the conversation as unread
    query
  end
end

aggregates do
  count :unread_messages_count, [:conversation, :messages] do
    filter expr(inserted_at > parent(last_read_message.inserted_at))
  end
end

relationships do
  has_one :last_read_message, ...
end

If the resources look like this, then I’m pretty sure you’re looking at a bug.

Unless inserted_at is not an attribute on messages.

You can’t drop an Ecto query into an Ash aggregate. Was this response generated w/ ChatGPT :sweat:

Yes, that’s almost the relationship. It’s actually a belongs_to:

defmodule ConversationUser do
  relationships do
    belongs_to :last_read_message, ...
  end

  aggregates do
    count :unread_messages_count, [:conversation, :messages] do
      filter expr(inserted_at > parent(last_read_message.inserted_at))
      # Or:
      filter expr(inserted_at > parent(parent(last_read_message.inserted_at)))
    end
  end
end

Yeah, so parent(parent(...)) shouldn’t come into play here. It should work. If not, try updating ash, ash_postgres, and ash_sql to get the latest fixes, and if that doesn’t help then it’s a bug (a surprising one, since a single parent/1 nesting ought to work).

I updated all the Ash dependencies, and I’m using the double parent filter:

filter expr(inserted_at == parent(parent(last_read_message.inserted_at)))

This time the behavior is different. When loading (Ash.get!(..., load: ...)) that aggregate, it raises this error:

     ** (Ash.Error.Unknown) 
     Bread Crumbs:
       > Exception raised in: UserConversation.read

     Unknown Error

     * ** (MatchError) no match of right hand side value: []
       (ash 3.4.66) lib/ash/filter/filter.ex:1370: Ash.Filter.update_aggregates/5
       (ash 3.4.66) lib/ash/filter/filter.ex:1375: Ash.Filter.update_aggregates/5
       (ash 3.4.66) lib/ash/filter/filter.ex:1399: Ash.Filter.update_aggregates/5
       (ash 3.4.66) lib/ash/filter/filter.ex:1343: Ash.Filter.update_aggregates/5
       (ash 3.4.66) lib/ash/actions/read/read.ex:3319: Ash.Actions.Read.authorize_aggregate/10
       (ash 3.4.66) lib/ash/actions/read/read.ex:3140: anonymous fn/7 in Ash.Actions.Read.authorize_loaded_aggregates/6
       (stdlib 6.0) maps.erl:860: :maps.fold_1/4
       (ash 3.4.66) lib/ash/actions/read/read.ex:529: anonymous fn/8 in Ash.Actions.Read.do_read/5
       (ash 3.4.66) lib/ash/actions/read/read.ex:956: Ash.Actions.Read.maybe_in_transaction/3
       (ash 3.4.66) lib/ash/actions/read/read.ex:315: Ash.Actions.Read.do_run/3
       (ash 3.4.66) lib/ash/actions/read/read.ex:82: anonymous fn/3 in Ash.Actions.Read.run/3
       (ash 3.4.66) lib/ash/actions/read/read.ex:81: Ash.Actions.Read.run/3
       (ash 3.4.66) lib/ash.ex:1578: Ash.do_get/5
       (ash 3.4.66) lib/ash.ex:1552: Ash.get/3
       (ash 3.4.66) lib/ash.ex:1516: Ash.get!/3
       test/gf/messaging/participant2_test.exs:71:

On a hunch from that stack trace, I changed the load to pass authorize?: false, whereas it was passing the actor, and the error went back to:

** (KeyError) key :parent_bindings not found in: %{

I’ll try the manual relationship approach.

The double parent filter would not work, and in this case you should only need parent/1. There is no second parent in this context.

parent in an aggregate is the root resource, not the resource one up the aggregate relationship path. I would expect that to break more gracefully, but it would always break nonetheless.

defmodule ConversationUser do
  relationships do
    belongs_to :last_read_message, ...
  end

  aggregates do
    count :unread_messages_count, [:conversation, :messages] do
      filter expr(inserted_at > parent(last_read_message.inserted_at))
    end
  end
end

So if this is failing on the latest Ash, it’s a bug. If you can create a reproduction, I will fix it.
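To make the scoping rule concrete, here is a plain-Elixir analogy (an in-memory model, not how Ash or AshSql actually builds the query). The aggregate walks the relationship path starting from the root record, and the filter sees each destination record together with that root record, which is the role parent/1 plays. The module `ParentScopeSketch`, the function `count_related/3`, the nested-map layout, and the integer timestamps are all hypothetical.

```elixir
defmodule ParentScopeSketch do
  # In-memory analogy only. `root` plays the ConversationUser, `path` plays
  # [:conversation, :messages], and `filter` receives (destination, root).
  def count_related(root, path, filter) do
    root
    |> follow(path)
    |> Enum.count(fn destination -> filter.(destination, root) end)
  end

  # Walk the path one relationship at a time, flattening to-many hops.
  defp follow(record, path) do
    Enum.reduce(path, [record], fn key, records ->
      Enum.flat_map(records, fn r -> List.wrap(Map.get(r, key)) end)
    end)
  end
end

# Hypothetical data; integers stand in for timestamps.
conversation_user = %{
  last_read_message: %{inserted_at: 200},
  conversation: %{
    messages: [%{inserted_at: 100}, %{inserted_at: 200}, %{inserted_at: 300}]
  }
}

# `parent` is the root record itself, so a single level of "parent" is
# enough to reach last_read_message; the intermediate conversation is
# never a parent in this model.
ParentScopeSketch.count_related(
  conversation_user,
  [:conversation, :messages],
  fn message, parent ->
    message.inserted_at > parent.last_read_message.inserted_at
  end
)
```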