Ecto upsert/insert_all - only update on_conflict if an item datetime is newer than DB record datetime

{:ok, timestamp} = Ecto.Type.cast(:utc_datetime_usec, DateTime.utc_now())

items = [
  %{
    alt_uniq_id: 7,
    name: "whatever",
    publish_date: ~U[2022-10-14 17:41:55.717204Z],
    inserted_at: timestamp,
    updated_at: timestamp
  },
  ....
]

Repo.insert_all(
  MySchema,
  items,
  on_conflict: {:replace_all_except, [:alt_uniq_id, :inserted_at]},
  conflict_target: [:alt_uniq_id],
  set: [updated_at: timestamp]
)

Let's say I have a list of maps parsed from a call to a third-party API. I want to insert anything new that doesn't exist yet, based on alt_uniq_id; that part works fine. For rows that conflict on alt_uniq_id, I want to update them ONLY if the publish_date of the database record is older than the publish_date in the map for that item. Every publish_date is different and comes from the third-party source, so I can’t rely on the inserted/updated timestamps for the upsert. Is it even possible to do this?

IMO there are two parts to this:

  • does Postgres support doing that operation with ON CONFLICT?
  • does Ecto support creating that SQL?

The PG docs suggest this might be possible with a WHERE in the ON CONFLICT clause, something like:

...
ON CONFLICT (alt_uniq_id) DO UPDATE
SET (... etc ...)
WHERE excluded.publish_date >= my_schemas.publish_date

Here excluded is PG’s name for the row that was proposed for insertion.

I believe this might be possible by passing an Ecto.Query to on_conflict, but I’m not sure what the note in the documentation, “If the struct cannot be found, Ecto.StaleEntryError will be raised.”, implies for the case where excluded.publish_date < my_schemas.publish_date.
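To make the Ecto.Query idea concrete, here is a minimal sketch of what such an on_conflict query might look like. It assumes Postgres, the column names from the question (name, publish_date, updated_at), and a strict "newer than" comparison; the module and function names (ConditionalUpsert, newer_only, upsert) are made up for illustration. The fragment strings are raw SQL, so the column names in them are not checked by Ecto.

```elixir
defmodule ConditionalUpsert do
  import Ecto.Query

  # Builds the query to pass as :on_conflict. `queryable` has to be the same
  # schema (or source) that is handed to insert_all.
  def newer_only(queryable) do
    from(m in queryable,
      # m is the row already in the table; EXCLUDED is the row proposed for insertion.
      where: fragment("EXCLUDED.publish_date > ?", m.publish_date),
      update: [
        set: [
          # Assumed column list: extend it with whatever else should be replaced.
          name: fragment("EXCLUDED.name"),
          publish_date: fragment("EXCLUDED.publish_date"),
          updated_at: fragment("EXCLUDED.updated_at")
        ]
      ]
    )
  end

  # Runs the upsert; conflicting rows whose stored publish_date is not older
  # are left untouched.
  def upsert(repo, schema, items) do
    repo.insert_all(schema, items,
      on_conflict: newer_only(schema),
      conflict_target: [:alt_uniq_id]
    )
  end
end
```

With the names from the question this would be called as ConditionalUpsert.upsert(Repo, MySchema, items). Because the query spells out each column, inserted_at and alt_uniq_id stay as they are simply by not being listed in set.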

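You can attach a predicate to the conflict_target in Ecto with the :unsafe_fragment tuple. Note that this predicate selects which (partial) unique index is used to detect the conflict; it does not decide whether the conflicting row gets updated. We use it like so: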

Repo.insert_all(Schema, attrs,
  on_conflict: {:replace_all_except, [:id, :inserted_at]},
  # The fragment is emitted verbatim after ON CONFLICT, so it needs its own parentheses.
  conflict_target: {:unsafe_fragment, "(our_field) WHERE other_field IS NULL"}
)

That said, we use a postgres trigger that we wrote in a migration to avoid the scenario in the original post.

Yours might look something like:

# some_migration_file.exs
execute """
CREATE OR REPLACE FUNCTION no_old_updates() RETURNS TRIGGER LANGUAGE plpgsql AS
$$
BEGIN
  -- OLD is the row currently stored, NEW is the row the UPDATE would write.
  IF OLD.publish_date > NEW.publish_date THEN
    RAISE 'Error: Attempting to replace newer data with older data' USING ERRCODE = 'integrity_constraint_violation';
  END IF;
  -- A BEFORE trigger has to return the row to write.
  RETURN NEW;
END;
$$;
"""

execute """
CREATE TRIGGER no_old_updates BEFORE UPDATE ON myschema.mytable FOR EACH ROW
EXECUTE FUNCTION no_old_updates();
"""

Postgres will silently skip the update for any conflicting row that doesn't match the WHERE, and insert_all will return its usual {count, returning} tuple. The stale error only applies to operations that use changesets: there the adapter sees that no rows were inserted/updated and returns {:error, :stale}, and Ecto then raises Ecto.StaleEntryError.
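Since nothing is raised, the only signal that rows were skipped is the count in the return value. A small sketch of how that could be checked, assuming Postgres (which reports only rows actually inserted or updated) and that every item maps to at most one row; UpsertReport and summarize are hypothetical names, not part of Ecto:

```elixir
defmodule UpsertReport do
  # Takes the {count, returning} tuple from Repo.insert_all and the list of
  # items that was passed in, and works out how many rows were skipped.
  def summarize({affected, _returning}, items) when is_list(items) do
    attempted = length(items)
    %{attempted: attempted, affected: affected, skipped: attempted - affected}
  end
end
```

For example, UpsertReport.summarize(Repo.insert_all(MySchema, items, opts), items), where opts stands for whatever on_conflict options are in use, would show a non-zero skipped value whenever some incoming items were not newer than the stored rows.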