Problem with window function

I’m having trouble getting a query that uses window functions to work in Ecto.

I have a BlogPost model that has a user_id column and a published_at column.
I need a query that returns the newest published blog posts, but only the single newest post for each user, so that one user cannot publish a bunch of posts and have them all shown in the list of recent blog posts.

If I were to build the query in raw SQL, I would do something like this:

SELECT * FROM (
  SELECT
    row_number() OVER (PARTITION BY blog_posts.user_id ORDER BY blog_posts.published_at DESC) AS row_num,
    blog_posts.*
  FROM blog_posts
  WHERE blog_posts.published = TRUE
) AS sq
WHERE row_num = 1
ORDER BY published_at DESC
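
To make the intended semantics concrete, here is a rough in-memory model of that SQL in plain Elixir (no Ecto or database involved). The module and function names are hypothetical, posts are plain maps, and published_at is simplified to an integer; it only illustrates what keeping row_number() = 1 per user_id and then applying the outer ORDER BY is supposed to return.

```elixir
defmodule WindowSketch do
  # Hypothetical in-memory model of the window-function query above.
  # Posts are plain maps; published_at is simplified to an integer (assumption).
  # Enum.sort_by/3 with :desc needs Elixir 1.10+.
  def newest_per_user(posts, limit) do
    posts
    # WHERE published = TRUE
    |> Enum.filter(& &1.published)
    # PARTITION BY user_id
    |> Enum.group_by(& &1.user_id)
    |> Enum.flat_map(fn {_user_id, user_posts} ->
      user_posts
      # ORDER BY published_at DESC inside each partition
      |> Enum.sort_by(& &1.published_at, :desc)
      # row_number() starts counting at 1
      |> Enum.with_index(1)
      # WHERE row_num = 1
      |> Enum.filter(fn {_post, row_num} -> row_num == 1 end)
      |> Enum.map(fn {post, _row_num} -> post end)
    end)
    # outer ORDER BY published_at DESC, plus the LIMIT used in the Ecto version
    |> Enum.sort_by(& &1.published_at, :desc)
    |> Enum.take(limit)
  end
end

posts = [
  %{id: 1, user_id: 1, published: true, published_at: 10},
  %{id: 2, user_id: 1, published: true, published_at: 50},
  %{id: 3, user_id: 2, published: true, published_at: 40},
  %{id: 4, user_id: 2, published: false, published_at: 60},
  %{id: 5, user_id: 3, published: true, published_at: 20}
]

WindowSketch.newest_per_user(posts, 2) |> Enum.map(& &1.id)
# => [2, 3]
```

Post 4 is dropped because it is unpublished, and post 1 loses to post 2 inside user 1’s partition.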

However, I don’t want to use a raw query, because I need to combine this query with preload and possibly compose it with other queries.

The closest I’ve gotten is this:

from bp in (Helheim.BlogPost |> published),
join: rn in fragment("SELECT id, row_number() OVER (PARTITION BY blog_posts.user_id ORDER BY blog_posts.published_at DESC) as row_number FROM blog_posts WHERE blog_posts.published = TRUE"),
where: rn.id == bp.id and rn.row_number == 1,
order_by: [desc: [bp.published_at, bp.inserted_at]],
limit: ^limit

This generates a query like this:

SELECT b0."id", b0."title", b0."body", b0."visitor_count", b0."comment_count", b0."published", b0."published_at", b0."inserted_at", b0."updated_at", b0."user_id" FROM "blog_posts" AS b0 INNER JOIN (SELECT id, row_number() OVER (PARTITION BY blog_posts.user_id ORDER BY blog_posts.published_at DESC) as row_number FROM blog_posts WHERE blog_posts.published = TRUE) AS f1 ON TRUE WHERE (b0."published" = TRUE) AND ((f1."id" = b0."id") AND (f1."row_number" = 1)) ORDER BY ARRAY[b0."published_at",b0."inserted_at"] DESC LIMIT 8

This query runs fine when I execute it directly in the database, but when I run it through Phoenix and Ecto, I get this error:

ERROR 42601 (syntax_error): syntax error at or near "."

So, is this even the right way to build a query like this in Ecto? And if it is, what am I doing wrong that makes the query fail?

:ecto, "2.1.6"
:phoenix, "1.3.0"

You cannot wrap a whole query in a fragment, only simple expressions. Instead of a fragment there, use a subquery, and in that subquery’s select use a fragment for the "row_number() OVER (PARTITION BY ? ORDER BY ? DESC)" part and so forth.

Your query should work. Have you tried executing it like this?
Helheim.Repo.all(from bp in …)

If it doesn’t work, you can try OvermindDL1’s suggestion, use a raw query, or use distinct:
query = from bp in Helheim.BlogPost,
  select: bp,
  distinct: bp.user_id,
  order_by: [desc: [bp.published_at, bp.inserted_at]],
  where: bp.published == true,
  limit: ^limit

I tried changing the query to:

def newest_for_frontpage(limit) do
  sq = from(
    bp in (Helheim.BlogPost |> published),
    select: fragment("blog_posts.id, row_number() OVER (PARTITION BY blog_posts.user_id ORDER BY blog_posts.published_at DESC) as row_number")
  )

  from(
    bp in (Helheim.BlogPost |> published),
    join: rn in subquery(sq),
    where: rn.id == bp.id and rn.row_number == 1,
    order_by: [desc: [bp.published_at, bp.inserted_at]],
    limit: ^limit
  )
end

I’m calling the query like this:

def front_page(conn, _params) do
  newest_blog_posts = BlogPost.newest_for_frontpage(8)
                      |> preload(:user)
                      |> Helheim.Repo.all
  render conn, "front_page.html", newest_blog_posts: newest_blog_posts
end

I’m now getting this error:

Ecto.SubQueryError at GET /front_page
the following exception happened when compiling a subquery.
    ** (Ecto.QueryError) subquery must select a source (t), a field (t.field) or a map, got: `fragment({:raw, "blog_posts.id, row_number() OVER (PARTITION BY blog_posts.user_id ORDER BY blog_posts.published_at DESC) as row_number"})` in query:
    
    from b in Helheim.BlogPost,
      where: b.published == true,
      select: fragment("blog_posts.id, row_number() OVER (PARTITION BY blog_posts.user_id ORDER BY blog_posts.published_at DESC) as row_number")
    
The subquery originated from the following query:
from b0 in Helheim.BlogPost,
  join: b1 in subquery(from b in Helheim.BlogPost,
  where: b.published == true,
  select: fragment("blog_posts.id, row_number() OVER (PARTITION BY blog_posts.user_id ORDER BY blog_posts.published_at DESC) as row_number")),
  on: true,
  where: b0.published == true,
  where: b1.id == b0.id and b1.row_number == 1,
  order_by: [desc: [b0.published_at, b0.inserted_at]],
  limit: ^8,
  select: b0,
  preload: [:user]

What if you use the ? placeholder form in fragments?

I’m not sure I know what you mean.
I tried changing the fragment to:

fragment("?, row_number() OVER (PARTITION BY ? ORDER BY ? DESC) as row_number", bp.id, bp.user_id, bp.published_at)

But the error remains the same.

I then tried changing the query to:

def newest_for_frontpage(limit) do
  sq = from(
    bp in (Helheim.BlogPost |> published),
    select: %{id: bp.id, row_number: fragment("row_number() OVER (PARTITION BY ? ORDER BY ? DESC)", bp.user_id, bp.published_at)}
  )

  from(
    bp in (Helheim.BlogPost |> published),
    join: rn in subquery(sq),
    where: rn.id == bp.id and rn.row_number == 1,
    order_by: [desc: [bp.published_at, bp.inserted_at]],
    limit: ^limit
  )
end

According to the log output, this generates the following query:

[debug] QUERY ERROR source="blog_posts" db=0.3ms
SELECT b0."id", b0."title", b0."body", b0."visitor_count", b0."comment_count", b0."published", b0."published_at", b0."inserted_at", b0."updated_at", b0."user_id" FROM "blog_posts" AS b0 INNER JOIN (SELECT b0."id" AS "id", row_number() OVER (PARTITION BY b0."user_id" ORDER BY b0."published_at" DESC) AS "row_number" FROM "blog_posts" AS b0 WHERE (b0."published" = TRUE)) AS s1 ON TRUE WHERE (b0."published" = TRUE) AND ((s1."id" = b0."id") AND (s1."row_number" = 1)) ORDER BY ARRAY[b0."published_at",b0."inserted_at"] DESC LIMIT $1 [8]
[info] Sent 500 in 16ms

Again, running that SQL query directly in the database returns the expected records, but Ecto throws the original error again:

Postgrex.Error at GET /front_page
ERROR 42601 (syntax_error): syntax error at or near "."

Here is the full stack trace in case it makes a difference:

[error] #PID<0.18267.0> running Helheim.Endpoint terminated
Server: localhost:4000 (http)
Request: GET /front_page
** (exit) an exception was raised:
    ** (Postgrex.Error) ERROR 42601 (syntax_error): syntax error at or near "."
        (ecto) lib/ecto/adapters/sql.ex:436: Ecto.Adapters.SQL.execute_and_cache/7
        (ecto) lib/ecto/repo/queryable.ex:130: Ecto.Repo.Queryable.execute/5
        (ecto) lib/ecto/repo/queryable.ex:35: Ecto.Repo.Queryable.all/4
        (helheim) web/controllers/page_controller.ex:32: Helheim.PageController.front_page/2
        (helheim) web/controllers/page_controller.ex:1: Helheim.PageController.action/2
        (helheim) web/controllers/page_controller.ex:1: Helheim.PageController.phoenix_controller_pipeline/2
        (helheim) lib/helheim/endpoint.ex:1: Helheim.Endpoint.instrument/4
        (phoenix) lib/phoenix/router.ex:278: Phoenix.Router.__call__/1
        (helheim) lib/plug/error_handler.ex:64: Helheim.Router.call/2
        (helheim) lib/helheim/endpoint.ex:1: Helheim.Endpoint.plug_builder_call/2
        (helheim) lib/plug/debugger.ex:99: Helheim.Endpoint."call (overridable 3)"/2
        (helheim) lib/helheim/endpoint.ex:1: Helheim.Endpoint.call/2
        (plug) lib/plug/adapters/cowboy/handler.ex:15: Plug.Adapters.Cowboy.Handler.upgrade/4
        (cowboy) /Users/dipth/code/visioneers/helheim/deps/cowboy/src/cowboy_protocol.erl:442: :cowboy_protocol.execute/4

I have created a minimal repo that demonstrates the problem:

We do something similar:

@partition_query """
  SELECT webhooks.id, row_number() OVER (
    PARTITION BY prism_id
    ORDER BY version DESC
  ) FROM webhooks
"""

def each_latest_version(query) do
  from w in query,
    join: partition in fragment(@partition_query),
    where: partition.row_number <= ^1 and partition.id == w.id
end

Honestly, for most of what I see here I’d just do a simple join, with no subquery or anything. I’d only use a subquery if I wanted, say, the first of each rank() or something similar, and even then I’d keep refinement tests like partition.id == w.id inside the subquery to minimize the linear search space (unless you can turn the subquery into a materialized view; in that case, keep them outside).

Incidentally, this query would be better written using DISTINCT ON, which Ecto also supports.


@gleb I tried experimenting with DISTINCT, but I can’t find a way to make it behave the same way, i.e. keep the latest row (by published_at) for each user_id and filter out the rest. Any hints?

Hi @dipth, have you tried the query from my response above?

from bp in Helheim.BlogPost, 
select: bp, 
distinct: bp.user_id, 
order_by: [desc: [bp.published_at, bp.inserted_at]], 
where: bp.published == true, 
limit: ^limit

The order_by clause determines which blog post is selected for each unique user_id.
Or is there an error that I don’t see?
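
The per-user choice described above can be illustrated in plain Elixir (hypothetical module name, no Ecto involved, integer timestamps as a simplification). DISTINCT ON keeps the first row it meets for each user_id, so the sort direction on published_at decides which post survives for each user. This sketch models only that per-user choice, not the final ordering of the result.

```elixir
defmodule DistinctOnPick do
  # Hypothetical sketch: like DISTINCT ON (user_id), keep the first row seen for
  # each user_id, so the ORDER BY direction decides which post survives.
  # Enum.sort_by/3 with :asc/:desc needs Elixir 1.10+.
  def pick(posts, direction) when direction in [:asc, :desc] do
    posts
    |> Enum.sort_by(& &1.published_at, direction)
    # Enum.uniq_by/2 keeps the first occurrence of each key
    |> Enum.uniq_by(& &1.user_id)
  end
end

posts = [
  %{id: 1, user_id: 1, published_at: 10},
  %{id: 2, user_id: 1, published_at: 50},
  %{id: 3, user_id: 2, published_at: 40}
]

DistinctOnPick.pick(posts, :desc) |> Enum.map(& &1.id)
# => [2, 3]
DistinctOnPick.pick(posts, :asc) |> Enum.map(& &1.id)
# => [1, 3]
```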

That doesn’t quite work, because distinct implicitly adds user_id as the first sort column:

from bp in (Helheim.BlogPost |> published),
distinct: bp.user_id,
order_by: [desc: bp.published_at, desc: bp.inserted_at],
limit: ^limit

[debug] QUERY OK source="blog_posts" db=2.3ms
SELECT DISTINCT ON (b0."user_id") b0."id", b0."title", b0."body", b0."visitor_count", b0."comment_count", b0."published", b0."published_at", b0."inserted_at", b0."updated_at", b0."user_id" FROM "blog_posts" AS b0 WHERE (b0."published" = TRUE) ORDER BY b0."user_id", b0."published_at" DESC, b0."inserted_at" DESC, b0."published_at" DESC, b0."inserted_at" DESC LIMIT $1 [8]

This means we wouldn’t get the latest blog posts, just one post from each of the users with the lowest ids.
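
The problem can be reproduced with a rough in-memory model of the generated SQL (plain Elixir, hypothetical module name, integer timestamps as a simplification). Because the rows are still sorted by user_id when LIMIT applies, the newest post overall can be cut off.

```elixir
defmodule DistinctOnSketch do
  # Hypothetical in-memory model of:
  #   SELECT DISTINCT ON (user_id) ... WHERE published
  #   ORDER BY user_id, published_at DESC LIMIT n
  # published_at is simplified to an integer timestamp (assumption).
  def distinct_on_user(posts, limit) do
    posts
    |> Enum.filter(& &1.published)
    # ORDER BY user_id ASC, published_at DESC (negating an integer flips its order)
    |> Enum.sort_by(fn post -> {post.user_id, -post.published_at} end)
    # DISTINCT ON keeps the first row of each user_id group in that order
    |> Enum.uniq_by(& &1.user_id)
    # LIMIT is applied while rows are still sorted by user_id
    |> Enum.take(limit)
  end
end

posts = [
  %{id: 1, user_id: 1, published: true, published_at: 10},
  %{id: 2, user_id: 2, published: true, published_at: 20},
  %{id: 3, user_id: 3, published: true, published_at: 90},
  %{id: 4, user_id: 3, published: true, published_at: 30}
]

DistinctOnSketch.distinct_on_user(posts, 2) |> Enum.map(& &1.id)
# => [1, 2], even though post 3 is the newest post overall
```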

This actually seemed to work:

@newest_for_frontpage_partition_query """
  SELECT blog_posts.id, row_number() OVER (
    PARTITION BY blog_posts.user_id
    ORDER BY blog_posts.published_at DESC
  ) FROM blog_posts WHERE blog_posts.published = TRUE
"""
def newest_for_frontpage(limit) do
  from bp in (Helheim.BlogPost |> published |> newest),
  join: partition in fragment(@newest_for_frontpage_partition_query),
  where: partition.row_number <= ^1 and partition.id == bp.id,
  limit: ^limit
end

To solve the problem from above:

query1 = from bp in Helheim.BlogPost, 
select: bp, 
distinct: bp.user_id, 
order_by: [desc: [bp.published_at, bp.inserted_at]], 
where: bp.published == true
query2 = from bp in subquery(query1), 
order_by: [desc: bp.published_at],
limit: ^limit

select distinct on (user_id) *
from blog_posts
where published = true
order by user_id, published_at desc

will do what you want. Look at the DISTINCT ON docs.
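
The two-stage idea behind query1 and query2 can be modeled the same way (plain Elixir, hypothetical names, integer timestamps; the inserted_at tie-breaker is left out). The inner step mirrors DISTINCT ON (user_id) and the outer step re-sorts by published_at before LIMIT, which is what lets the newest post survive.

```elixir
defmodule TwoStageSketch do
  # Hypothetical in-memory model of query1 (subquery) wrapped by query2.
  # published_at is simplified to an integer timestamp (assumption).
  # Enum.sort_by/3 with :desc needs Elixir 1.10+.
  def newest_per_user(posts, limit) do
    posts
    # query1: WHERE published, ORDER BY user_id, published_at DESC
    |> Enum.filter(& &1.published)
    |> Enum.sort_by(fn post -> {post.user_id, -post.published_at} end)
    # query1: DISTINCT ON (user_id) keeps each user's newest post
    |> Enum.uniq_by(& &1.user_id)
    # query2: re-sort the subquery rows by published_at DESC, then LIMIT
    |> Enum.sort_by(& &1.published_at, :desc)
    |> Enum.take(limit)
  end
end

posts = [
  %{id: 1, user_id: 1, published: true, published_at: 10},
  %{id: 2, user_id: 2, published: true, published_at: 20},
  %{id: 3, user_id: 3, published: true, published_at: 90},
  %{id: 4, user_id: 3, published: true, published_at: 30}
]

TwoStageSketch.newest_per_user(posts, 2) |> Enum.map(& &1.id)
# => [3, 2]
```

On the same data, applying the limit right after the DISTINCT ON step, without the outer re-sort, would return posts 1 and 2 instead.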
