Postgrex error (citext extension) from Broadway Kafka consumer

Hi there :wave:

I have a Broadway Kafka Consumer, which consumes some data from Kafka:

defmodule KafkaConsumer do
  use Broadway
  alias Broadway.{Message, BatchInfo}
  alias Company

  def start_link(opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwayKafka.Producer,
          [
            group_id: group_id,
            hosts: hosts,
            topics: ...
          ]
        },
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 3
        ]
      ],
      batchers: [
        default: [],
        insert_all: [
          batch_size: 10,
          batch_timeout: 1_000
        ]
      ]
    )
  end

  # handle_message/3 and handle_batch/4 omitted
end

There is a table in my PostgreSQL database with a unique name column of type citext.
My app consumes names from Kafka and inserts them into the database with an INSERT ... ON CONFLICT query based on the unique name constraint. That is what the insert_all batcher is for.
Sometimes I receive names like these:

names = ["İsbir Yatak", "Isbir Yatak"]

The only difference between the two is the accent on the first letter, but it turns out that citext treats these two names as the same value. To avoid the following exception:

  ** (Postgrex.Error) ERROR 21000 (cardinality_violation) ON CONFLICT DO UPDATE command cannot affect row a second time

I’ve created the following function:

  @spec name_to_citext_name([String.t()]) :: %{String.t() => String.t()}
  def name_to_citext_name(names) do
    # Maps every incoming name to the representative that citext considers equal to it.
    sql = """
    SELECT name, data.distinct_name
    FROM UNNEST($1::citext[]) name
    JOIN (
      SELECT DISTINCT distinct_name
      FROM UNNEST($1::citext[]) distinct_name
    ) data ON name = data.distinct_name;
    """

    # Repo stands for the application's Ecto repo
    Repo
    |> Ecto.Adapters.SQL.query!(sql, [names])
    |> Map.get(:rows)
    |> Map.new(fn [name, citext_name] -> {name, citext_name} end)
  end

The name_to_citext_name function prepares unique names, based on the citext type, which I then pass to the INSERT ... ON CONFLICT query. It works as expected, but in very rare cases I receive the following error:

  ** (Postgrex.Error) ERROR 42704 (undefined_object) type "citext[]" does not exist
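
For context, here is a simplified sketch of how a map like the one returned by name_to_citext_name can be used to collapse a batch before the insert. The `BatchDedup` module and `unique_names/2` are illustrative names, not my actual code, and the mapping is hard-coded as a stand-in for the database result, assuming (as described above) that citext maps both names to the same value.

```elixir
defmodule BatchDedup do
  @moduledoc "Illustrative only: collapses names that citext considers equal."

  # `mapping` has the shape returned by name_to_citext_name/1:
  # every incoming name points at the representative chosen by the database.
  @spec unique_names([String.t()], %{String.t() => String.t()}) :: [String.t()]
  def unique_names(names, mapping) do
    names
    # Fall back to the name itself if it is missing from the mapping.
    |> Enum.map(&Map.get(mapping, &1, &1))
    # Keep the first occurrence of each representative, preserving order.
    |> Enum.uniq()
  end
end

# Hard-coded stand-in for the database result (an assumption for this sketch).
mapping = %{"İsbir Yatak" => "Isbir Yatak", "Isbir Yatak" => "Isbir Yatak"}

BatchDedup.unique_names(["İsbir Yatak", "Isbir Yatak"], mapping)
# returns ["Isbir Yatak"]
```

The rare error above comes from the query that builds this mapping (the `$1::citext[]` cast), not from the deduplication step itself.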

I tried to intercept the exception and log the results of the following queries:

SELECT * FROM pg_extension;
SELECT oid FROM pg_database WHERE datname = (SELECT current_database() AS name);
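
A minimal sketch of how this kind of interception could look. The `Diagnostics` module is a hypothetical name, and `query_fun` stands in for a function that runs SQL through the repo; it is injected so that the sketch runs without a database. For brevity it rescues any exception, whereas in the app it would make sense to narrow this to Postgrex.Error.

```elixir
defmodule Diagnostics do
  @moduledoc "Illustrative only: logs diagnostic query results when a call raises."
  require Logger

  # The two diagnostic queries from this post.
  @queries [
    "SELECT * FROM pg_extension;",
    "SELECT oid FROM pg_database WHERE datname = (SELECT current_database() AS name);"
  ]

  # Runs `fun`. If it raises, runs each diagnostic query through `query_fun`,
  # logs the result, and re-raises the original exception unchanged.
  def with_diagnostics(fun, query_fun) do
    fun.()
  rescue
    error ->
      Enum.each(@queries, fn sql ->
        Logger.error("#{sql} => #{inspect(query_fun.(sql))}")
      end)

      reraise error, __STACKTRACE__
  end
end
```

In the app, `query_fun` could be something like `fn sql -> Ecto.Adapters.SQL.query!(Repo, sql, []) end`, where `Repo` is a placeholder for the application's repo. Note that with a connection pool the diagnostic queries are not guaranteed to run on the same connection that raised.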

The results of these queries are the same as when I connect to PostgreSQL directly and run them manually.
I've also noticed that I sometimes get the following error:

  ** (Postgrex.Error) ERROR 42P01 (undefined_table) relation "..." does not exist

It comes from another consumer. I suspect the underlying issue is the same.

I would be very grateful if someone could help in any way or tell me where to dig further.