Postgrex cannot automatically encode data

Hi, I am working on a script to insert huge csv files into MySQL and Postgres database.
I take the help of Ectos Repo.insert_all for this task of bulk inserting data.

While my code runs fine for MySQL database however I run into some issues when working with Postgres.

Since I am inserting csvs I don’t have schemas defined and I just do Repo.insert_all(table_name, data_chunk)

Postgres gives me these errors:

** (DBConnection.EncodeError) Postgrex expected an integer in -2147483648..2147483647, got "9". Please make sure the value you are passing matches the definition in your table or in your query or convert the value accordingly.

Also…

21:46:45.501 [error] GenServer #PID<0.313.0> terminating
** (DBConnection.EncodeError) Postgrex expected %DateTime{} or %NaiveDateTime{}, got "2017-05-02 15:28:20.2000000 -04:00". Please make sure the value you are passing matches the definition in your table or in your query or convert the value accordingly.

I am using these dependencies…

 {:myxql, "~> 0.3.4"},
 {:postgrex, ">= 0.0.0"},
 {:ecto, "~> 3.4"},
 {:ecto_sql, "~> 3.1"},

While MySQL does not have a problem Postgres has some issues with data types. The only idea I can think of is to iterate over the data and convert strings to integers and timestamps to NativeDateTime, something like String.slice(x, 0..18) |> NaiveDateTime.from_iso8601!, but I don’t think this is a good solution since it involves iterating over all rows which might take time in case of huge csvs.

Are there any other better solutions to this? Am I missing something obvious?

Any help or advice will be very helpful.

I am stuck on this for quite sometime now. :pensive:

Well, you iterate over that data at least once anyway, so I wouldn’t be surprised if it wouldn’t be as much of the problem as you think it will be. If anything then I think that loading file will be bigger issue rather than the processing it. Alternatively you can check if the parser you are using do not support parse-time decoding of data.

For CSV parsing I am using nimble_csv which is written by jose valim himself.

My code looks somewhat like…

file
    |> File.stream!()
    |> CSV.parse_stream()
    |> Stream.chunk_every(chunk_size)
....

I will try the iterating over the data and convert the types, and check if it leads to major performance issues.

That is incorrect, since both postgresql and mysql converts supported string representation as needed.

The different behavior comes from driver - in this case, myxql vs postgrex - on how it converts parameter (given by ecto) when building a message.

See the following code

{:ok, pid} = Postgrex.start_link(database: db_name)

Postgrex.query!(pid, "CREATE TABLE posts (ts timestamp without time zone)", [])

Postgrex.query!(pid, "INSERT INTO posts VALUES ('2017-05-02 15:28:20.2000000 -04:00')", [])
# => success

Postgrex.query!(pid, "INSERT INTO posts VALUES ($1);", ["2017-05-02 15:28:20.2000000 -04:00"])
# => error

---

{:ok, pid} = MyXQL.start_link(database: db_name)

MyXQL.query!(pid, "CREATE TABLE posts (ts DATETIME)")

MyXQL.query!(pid, "INSERT INTO posts VALUES (\"2017-05-02 15:28:20\")")
# => success

MyXQL.query!(pid, "INSERT INTO posts VALUES (?)", ["2017-05-02 15:28:20"])
# => success

See the following code to encode a value for postgresql. We may add parsing datetime to postgrex… but I think it’s generally better to transform the data in the right format on your side, not having conversion in driver side.


Please note that you may load CSV directly to mysql/postgresql.

Postgrex supports it:

Postgrex.query!(pid, "COPY posts FROM '#{csv_filename}' CSV", [])

MySQL also has it with LOAD DATA but myxql supports only prepared statements (even unmaed) - so it fails with (MyXQL.Error) (1295) (ER_UNSUPPORTED_PS) This command is not supported in the prepared statement protocol yet.

Edit: please note that COPY of postgresql reads the file on the server side. If you want to load local file, then /copy of psql has the feature…

Edit 2: I figured out how to use LOAD with myxql - LOAD DATA support · Issue #122 · elixir-ecto/myxql · GitHub

1 Like

Hi, thank you for your reply.

Yes, I agree that the problem is on the driver side since postgresql does not automatically parse DateTime or integer from the string.

Now, from what I can understand there are two ways I can solve this first by iterating over the data to transform(convert the strings to Integer or NaiveDateTime) it into types compatible with the Postgresql adapter or second approach could be to build an extension to handle the encoding/decoding as stated here.

For the second approach, I need some help, since I am not very comfortable with macros in elixir, from what I can understand from here, we have to add the Postgrex.Extension behaviour and define an encode and decode functions, I cannot understand however how to implemented the encode or decode functions, can someone help me on this?

Which approach would be better, isn’t there already an extension to do this DateTime or Timestamp encoding for Postgres?

Let me clear followings

  • postgrex not postgresql is the component does not automatically parsing only when using parameterized SQL.
  • postgrex does support datetime on timestamp column type

There are several options

  • Parse before passing to Ecto changeset
  • Create a custom ecto type: pass string, and parse it to DateTime or NaiveDateTime
  • Create a custom postgrex type

I don’t know postgresql protocol and how postgrex supports custom types… but that may not give performance benefits (e.g. you may need to parse the binary anyway!)

I prefer just fixing the data - you get the data from csv, and it’s your responsibility to clean things up before passing it to something else :slight_smile:


Also note that 2017-05-02 15:28:20.2000000 -04:00 is not the iso8601. 2017-05-02 15:28:20.2000000-04:00 is… you may manipulate the string (removing the space before the offet) or use timex’s flexible parsing.

1 Like

Thanks for the explanation.

The csvs come from some external source, so I can’t clean them, since the data might have many problems parsing date or timestamp could be complicated, I think I will just import the date/timestamp as varchar for now.

Btw, I was trying out something like this for now, don’t know if this is good…

val =
          val
          |> String.trim()
          |> Integer.parse()
          |> case do
            {int_val, ""} ->
              if int_val > -2_147_483_648 && int_val < 2_147_483_647, do: int_val, else: val

            _ ->
              cond do
                String.length(val) == 19 &&
                    Regex.match?(~r/\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d/, val) ->
                  val |> String.slice(0..18) |> NaiveDateTime.from_iso8601!()

                String.length(val) == 10 && Regex.match?(~r/\d\d\d\d-\d\d-\d\d/, val) ->
                  val |> String.slice(0..9) |> Date.from_iso8601!()

                val == "" ->
                  nil

                true ->
                  val
              end

Also, I am not using changeset its a list of maps that I directly pass to insert_all, the data looks something like this:

[
  [
    {"Assistant's Phone", nil},
    {"Business Phone", "+1234567890"},
    {"Fax", nil},
    {"Mobile Phone", nil}, 
    {"Other Phone", nil},
    {"Primary Phone", nil},
    {"external_ref", "3be16c8a-0fe0-36b4-5bf0-d8d670cb4d5a"}
  ],
  [
    {"Assistant's Phone", nil},
    {"Business Phone", "+1234567890"},
    {"Fax", nil},
    {"Mobile Phone", nil},
    {"Other Phone", nil},
    {"Primary Phone", nil},
    {"external_ref", "ca0e8811-4503-7089-cdfe-ed894f026c2b"}
  ],
]

If at least the structure is fixed (column) - then it would better to apply specific function instead of trying multiple functions with cond. See the example from nimble_csv

"path/to/file"
|> File.stream!
|> MyParser.parse_stream
|> Stream.map(fn [name, age] ->
  %{name: :binary.copy(name), age: String.to_integer(age)}
end)
|> Stream.run

Also please note that Stream may not be performant - in this case, you may just use Enum - depending on the size. You can search more discussion in this forum about why Stream is slow…

For batching insert - you can use Enum.chunk_every/2 or Stream.chunk_every/2

Also… for parsing - you may use binary pattern matching - I guess this is faster than regex… not tested though.

<<date::binary-size(27), " ", offset::binary>> = "2017-05-02 15:28:20.2000000 -04:00"
DateTime.from_iso8601!(date <> offset)
1 Like