SQL Fragment used for conflict strategy causing error(Ecto 3.10.3, Posgresql 15.4)

I’m getting an query error when attempting to apply a conflict strategy here at line 38: progress_track.ex · GitHub

The idea is that if the incoming last_track_timestamp is lower than the one in the db, then keep the db version, otherwise, use last_track_timestamp.

The error is: ** (Postgrex.Error) ERROR 42601 (syntax_error) syntax error at or near "last_track_timestamp"

but it’s not clear to me what is going on. I’m cribbing this code from here: https://github.com/emerleite/video_watch_progress/blob/master/lib/video_watch_progress/progress_track.ex#L38

(he is using a different IF() syntax than I am. I changed it because the postgres docs seem to show different syntax than the one he is using, found here). Despite that, I’m still getting an error and I’m not sure why. Any ideas? A further explanation can be found in this youtube video(with timestamp).)

The full error:

[debug] QUERY ERROR db=0.0ms queue=12.2ms idle=1608.5ms
INSERT INTO "progress_track" AS p0 ("user_id","media_item_id","seconds_watched","fully_watched","last_track_timestamp","id","inserted_at","updated_at") VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT ("seconds_watched","last_track_timestamp") DO UPDATE SET "seconds_watched" = IF last_track_timestamp < VALUES(last_track_timestamp) THEN VALUES(seconds_watched); ELSE seconds_watched END IF;, "last_track_timestamp" = IF last_track_timestamp < VALUES(last_track_timestamp) THEN VALUES(last_track_timestamp); ELSE last_track_timestamp END IF;, "updated_at" = $9 ["1ee573ed-280b-649a-b128-754189a2b76a", "1ee573ef-fbe0-69bc-bae9-5143c3d27d39", 0, false, 1695164017112, "1ee573f5-e464-61f2-aa2a-3ab68add3f36", ~N[2023-09-19 22:53:37], ~N[2023-09-19 22:53:37], ~N[2023-09-19 22:53:37]]
↳ MyApp.Multimedia.ProgressStore.save/1, at: lib/my_app/multimedia/progress_store.ex:20
[error] GenServer #PID<0.2181.0> terminating
** (Postgrex.Error) ERROR 42601 (syntax_error) syntax error at or near "last_track_timestamp"

    query: INSERT INTO "progress_track" AS p0 ("user_id","media_item_id","seconds_watched","fully_watched","last_track_timestamp","id","inserted_at","updated_at") VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT ("seconds_watched","last_track_timestamp") DO UPDATE SET "seconds_watched" = IF last_track_timestamp < VALUES(last_track_timestamp) THEN VALUES(seconds_watched); ELSE seconds_watched END IF;, "last_track_timestamp" = IF last_track_timestamp < VALUES(last_track_timestamp) THEN VALUES(last_track_timestamp); ELSE last_track_timestamp END IF;, "updated_at" = $9

called from:

  def save(progress_data) do
    ProgressTrack.changeset(%ProgressTrack{}, progress_data)
    |> dbg()
    |> Repo.insert(on_conflict: ProgressTrack.insert_conflict_strategy(progress_data), conflict_target: [:seconds_watched, :last_track_timestamp])
    |> process_result
  end

This isn’t going to work. The IF you are using is from PL/pgSQL which is the most common PostgreSQL procedural language used for creating stored procedures and functions. PL/pgSQL doesn’t work as part of the standard query language which is what you need.

I haven’t really looked closely at your query or what the best way to do what you’re doing is. But if you really need something looking like a conditional in the query, look at CASE in the PostgreSQL docs… PostgreSQL: Documentation: 16: 9.18. Conditional Expressions. These are the SQL query conditional expressions you’re looking for.

That’s using MariaDB, where the syntax to put in upsert clauses is different - VALUES() is from there, as is IF()

The Postgres equivalents would be the EXCLUDED table and CASE statement, though GREATEST will save some typing. Something like this:

      update: [
        set: [
          seconds_watched:
            fragment(
              "CASE WHEN last_track_timestamp < EXCLUDED.last_track_timestamp THEN  EXCLUDED.seconds_watched ELSE seconds_watched END"
            ),
          last_track_timestamp:
            fragment(
              "GREATEST(last_track_timestamp, EXCLUDED.last_track_timestamp)"
            ),
          updated_at: ^ecto_time()
        ]
      ]
    )

BEWARE: I have not run this code and I’m not certain it will work. In particular, I’m unsure if the bare references to last_track_timestamp and seconds_watched will still count as unambiguous (since EXCLUDED has the same columns). You may need to do some fragment juggling to use ? for those.

1 Like

Thanks for the insight. So I think I’m close here: revised code

But I’m getting a new error ** (Postgrex.Error) ERROR 42P10 (invalid_column_reference) there is no unique or exclusion constraint matching the ON CONFLICT specification:

Is the issue using EXCLUDED whilst neither last_track_timestamp nor seconds_watched are unique constraints?

progress_track #=> %{
  "last_track_timestamp" => 1695171783604,
  "media_item_id" => "1ee573ef-fbe0-69bc-bae9-5143c3d27d39",
  "seconds_watched" => 0,
  "user_id" => "1ee573ed-280b-649a-b128-754189a2b76a"
}

[debug] QUERY ERROR db=4.0ms queue=0.6ms idle=926.7ms
INSERT INTO "progress_track" AS p0 ("media_item_id","seconds_watched","fully_watched","last_track_timestamp","user_id","id","inserted_at","updated_at") VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT ("seconds_watched","last_track_timestamp") DO UPDATE SET "seconds_watched" = CASE WHEN p0."last_track_timestamp" < EXCLUDED.last_track_timestamp THEN  EXCLUDED.seconds_watched ELSE p0."seconds_watched" END, "last_track_timestamp" = GREATEST(p0."last_track_timestamp", EXCLUDED.last_track_timestamp), "updated_at" = $9 ["1ee573ef-fbe0-69bc-bae9-5143c3d27d39", 0, false, 1695171783604, "1ee573ed-280b-649a-b128-754189a2b76a", "1ee57517-3784-600e-9f2c-8f57da0461ea", ~N[2023-09-20 01:03:03], ~N[2023-09-20 01:03:03], ~N[2023-09-20 01:03:03]]
↳ MyApp.Multimedia.ProgressStore.save/1, at: lib/my_app/multimedia/progress_store.ex:20
[error] GenServer #PID<0.1104.0> terminating
** (Postgrex.Error) ERROR 42P10 (invalid_column_reference) there is no unique or exclusion constraint matching the ON CONFLICT specification

Yes. Postgres needs your conflict target to be a unique index or unique constraint (which makes a unique index).

1 Like

So this strategy won’t work with postgresql? I have to account for the possibility of a newer timestamp being clobbered by an older one because I plan to use a Genstage queue which might deliver tracking events out of order. Is there any way to do this with postgresql without a costly Repo.get() query before I insert? Maybe if I just write the fragment without using EXCLUDED?

So this is what I’m trying now:

"CASE WHEN last_track_timestamp < v.last_track_timestamp THEN v.seconds_watched ELSE seconds_watched END"
and
"CASE WHEN last_track_timestamp < v.last_track_timestamp THEN v.last_track_timestamp ELSE last_track_timestamp"

But postgresql goes all weird on me saying syntax error at or near "," but I’m sure there is no misplaced , anywhere … bizarre.

INSERT INTO "progress_track" AS p0 ("media_item_id","seconds_watched","fully_watched","last_track_timestamp","user_id","id","inserted_at","updated_at") VALUES ($1,$2,$3,$4,$5,$6,$7,$8) ON CONFLICT ("seconds_watched","last_track_timestamp") DO UPDATE SET "seconds_watched" = CASE WHEN last_track_timestamp < v.last_track_timestamp THEN v.seconds_watched ELSE seconds_watched END, "last_track_timestamp" = CASE WHEN last_track_timestamp < v.last_track_timestamp THEN v.last_track_timestamp ELSE last_track_timestamp, "updated_at" = $9 ["1ee573ef-fbe0-69bc-bae9-5143c3d27d39", 0, false, 1695177517810, "1ee573ed-280b-649a-b128-754189a2b76a", "1ee575ec-d517-6c82-a828-40d758d4a1a0", ~N[2023-09-20 02:38:37], ~N[2023-09-20 02:38:37], ~N[2023-09-20 02:38:37]]
↳ MyApp.Multimedia.ProgressStore.save/1, at: lib/my_app/multimedia/progress_store.ex:20
[error] GenServer #PID<0.1285.0> terminating
** (Postgrex.Error) ERROR 42601 (syntax_error) syntax error at or near ","

You’re missing the END at the end of the clause. The first character after where the END should be is a ,.

1 Like

The columns passed to conflict_target should be the ones that have a unique index, not the ones you want to update.

In the case of your original example, that would be :user_id and :video_id

1 Like

Is it possible to run an sql fragment before every upsert? I plan to use a Genstage queue which might deliver tracking events out of order, so I need the newest timestamp to always “win”.

Whoops, thanks.

I got it to work. I don’t need to have a conflict with the unique keys media_item_id or user_id either. It will always upsert:

  def insert_conflict_strategy(%{"fully_watched" => fully_watched}) do
    dbg(fully_watched)

    from(t in MyApp.Multimedia.ProgressTrack,
      update: [
        set: [
          updated_at: ^ecto_time(),
          fully_watched: ^fully_watched
        ]
      ]
    )
  end

  def insert_conflict_strategy(progress_track) do
    from(t in MyApp.Multimedia.ProgressTrack,
      update: [
        set: [
          seconds_watched:
            fragment(
              "CASE WHEN ? < excluded.last_track_timestamp THEN excluded.seconds_watched ELSE ? END",
              ^progress_track["last_track_timestamp"],
              ^progress_track["seconds_watched"]
            ),
          last_track_timestamp:
            fragment(
              "CASE WHEN ? < excluded.last_track_timestamp THEN excluded.last_track_timestamp ELSE ? END",
              ^progress_track["last_track_timestamp"],
              ^progress_track["last_track_timestamp"]
            ),
          updated_at: ^ecto_time()
        ]
      ]
    )
  end

caller:

    ProgressTrack.changeset(%ProgressTrack{}, progress_data)
    |> Repo.insert(
      on_conflict: ProgressTrack.insert_conflict_strategy(progress_data),
      conflict_target: [:media_item_id, :user_id]
    )
1 Like