Greetings,
I’m having a hell of a time trying to build a really simple script that collects chat messages and saves them to the database… Unfortunately, I’m still too new to the language and things haven’t “clicked” yet. To learn about GenStage and Broadway, I recently bought Concurrent Data Processing in Elixir: Fast, Resilient Applications with OTP, GenStage, Flow, and Broadway by Svilen Gospodinov. While I learned quite a bit about those technologies, the book’s examples don’t match my specific case, and I can’t find any other examples to study. At the end of the book, the author mentions that you can set up a persistent queue, but he doesn’t provide an example.
What I need is something that collects individual chat messages, which arrive as pre-formatted maps (%{}), and adds them to a queue or a big list. Example:
```elixir
%{userid: 2, message: "hello test test test", timestamp: …}
%{userid: 15, message: "how are you", timestamp: …}
%{userid: 2, message: "foo bar", timestamp: …}
…
```
If I get 1000 messages a minute, I would like something scalable that takes batches of 10 messages every 10 seconds and saves them to a MySQL database using “insert_all”.
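A minimal sketch of this requirement using only OTP (no RabbitMQ, GenStage, or Broadway), assuming an in-memory buffer is acceptable. `ChatBatcher`, `push/2` and the `:insert_fun`, `:batch_size` and `:interval` options are hypothetical names; the injected `insert_fun` stands in for the real insert_all call and is expected to return `:ok` on success. Every `:interval` ms the process takes up to `:batch_size` of the oldest messages, and if the insert returns anything else or raises, it puts that batch back at the front of the queue so it is retried on the next tick.

```elixir
defmodule ChatBatcher do
  # Hypothetical module: keeps chat-message maps in an in-memory FIFO (:queue)
  # and every :interval ms hands up to :batch_size of them to :insert_fun.
  # In a real app insert_fun would wrap Repo.insert_all/3; it is injected here
  # so the sketch runs without a database.
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  # Asynchronous enqueue, so the caller (e.g. a channel) never waits on the DB.
  def push(server, message), do: GenServer.cast(server, {:push, message})

  @impl true
  def init(opts) do
    state = %{
      queue: :queue.new(),
      insert_fun: Keyword.fetch!(opts, :insert_fun),
      batch_size: Keyword.get(opts, :batch_size, 10),
      interval: Keyword.get(opts, :interval, 10_000)
    }

    schedule_flush(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_cast({:push, message}, state) do
    {:noreply, %{state | queue: :queue.in(message, state.queue)}}
  end

  @impl true
  def handle_info(:flush, state) do
    # :queue.split/2 requires count <= queue length, hence the min/2.
    count = min(state.batch_size, :queue.len(state.queue))
    {batch_q, rest} = :queue.split(count, state.queue)

    queue =
      if count == 0 do
        rest
      else
        case safe_insert(state.insert_fun, :queue.to_list(batch_q)) do
          :ok -> rest
          # Any other result is a failure: put the batch back at the front so
          # nothing is lost and order is kept; it is retried on the next tick.
          _error -> :queue.join(batch_q, rest)
        end
      end

    schedule_flush(state.interval)
    {:noreply, %{state | queue: queue}}
  end

  # Turn an exception (e.g. a DB connection error) into an error value instead
  # of crashing the process and losing the buffered messages.
  defp safe_insert(fun, batch) do
    fun.(batch)
  rescue
    exception -> {:error, exception}
  end

  defp schedule_flush(interval), do: Process.send_after(self(), :flush, interval)
end
```

It could be added to the application's supervision tree as `{ChatBatcher, insert_fun: fn rows -> Exchat.Repo.insert_all("messages", rows); :ok end}` (the `"messages"` table name is an assumption), with the channel calling `ChatBatcher.push(ChatBatcher, message)`. Two caveats: the buffer lives in process memory, so queued messages are lost if the node restarts (surviving that is what a persistent broker such as RabbitMQ adds), and at 1000 messages a minute, 10 messages every 10 seconds drains only 60 a minute, so the batch size or interval has to change for the queue not to grow without bound.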
If the database is down or there is a connection error, I would like the script to keep retrying the insert without losing any messages, for hours if needed, since our database has been known to stay down that long in rare cases.
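One common way to keep retrying for hours without hammering a database that is down is capped exponential backoff. The sketch below is a hypothetical helper (`RetryBackoff.delay/1`, with an assumed 1 s base and 60 s cap): after each consecutive failure, a buffering process would schedule its next attempt with `Process.send_after(self(), :flush, RetryBackoff.delay(failures))` and reset `failures` to 0 after a successful insert.

```elixir
defmodule RetryBackoff do
  # Hypothetical helper: delay (ms) before the next insert attempt after
  # `failures` consecutive failures. Doubles from 1 s and is capped at 60 s,
  # so during a multi-hour outage the database is still retried once a minute.
  @base_ms 1_000
  @max_ms 60_000
  # Capping the exponent keeps the arithmetic small during very long outages.
  @max_exponent 10

  def delay(failures) when is_integer(failures) and failures >= 0 do
    exponent = min(failures, @max_exponent)
    min(@base_ms * Integer.pow(2, exponent), @max_ms)
  end
end
```

With these numbers the delays grow 1 s, 2 s, 4 s and so on up to 60 s, after which the database is retried once a minute for as long as the outage lasts; the messages simply stay queued in the meantime.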
I don’t see why I need RabbitMQ. I would like to keep these maps (messages) in a big, simple list without setting up a whole RabbitMQ server. Right now I’m just trying to get something really simple working; maybe once I’m more knowledgeable, I’ll look into RabbitMQ again. As the book described, I set up a custom transformer in the code below so that I wouldn’t have to use RabbitMQ.
Here is what I have so far:
RoomChannel.ex: The function that sends the chat message map to the queue:
```elixir
defmodule ExchatWeb.RoomChannel do
  use Phoenix.Channel
  alias ExchatWeb.Presence
  import Ecto.Query
  alias Exchat.Repo

  def handle_in("new_msg", %{"body" => body}, socket) do
    if (socket.assigns.is_anon == 0) do
      IO.inspect("user is logged in - send message")

      # Sends the chat message, formatted for a database insert, to the persistence pipeline.
      :ok =
        ExchatWeb.TestProducer.save_message([
          %{
            message: body,
            userid: String.to_integer(socket.assigns.user_id),
            roomid: socket.topic,
            timestamp: DateTime.truncate(DateTime.utc_now(), :second)
          }
        ])

      # Broadcasts the chat message to the chat app
      broadcast!(socket, "new_msg", %{body: body})
      {:noreply, socket}
    else
      IO.inspect("user is anonymous - do not send message")
      {:noreply, socket}
    end
  end
end
```
SaveTest.ex: Broadway Batcher:
```elixir
defmodule ExchatWeb.TestSave do
  use Broadway
  require Logger

  def start_link(_args) do
    options = [
      name: ExchatWeb.TestSave,
      producer: [
        module: {ExchatWeb.TestProducer, []},
        transformer: {ExchatWeb.TestSave, :transform, []}
      ],
      processors: [
        default: [max_demand: 1, concurrency: 1]
      ],
      batchers: [
        default: [batch_size: 1, concurrency: 1, batch_timeout: 10_000]
      ]
    ]

    Broadway.start_link(__MODULE__, options)
  end

  def transform(event, _options) do
    %Broadway.Message{
      data: event,
      acknowledger: {ExchatWeb.TestSave, :pages, []}
    }
  end

  def ack(:pages, _successful, _failed) do
    :ok
  end

  def handle_message(_processor, message, _context) do
    if ExchatWeb.TestProducer.online?(message.data) do
      IO.inspect("handle message from Broadway")
      IO.inspect(message.data)
      Broadway.Message.put_batch_key(message, :default)
    else
      IO.inspect("handle message - failed")
      Broadway.Message.failed(message, "offline")
    end
  end

  def handle_batch(_batcher, [message], _batch_info, _context) do
    IO.inspect(message.data)
    IO.inspect("handle batch")
    IO.inspect([message])
    [message]
  end
end
```
TestProducer.ex: The producer used in the SaveTest.ex broadway batcher:
```elixir
defmodule ExchatWeb.TestProducer do
  use GenStage
  require Logger

  def init(initial_state) do
    Logger.info("TestProducer init")
    {:producer, initial_state}
  end

  def handle_demand(demand, state) do
    Logger.info("TestProducer received demand for #{demand} pages")
    events = []
    {:noreply, events, state}
  end

  def save_message(pages) do
    ExchatWeb.TestSave
    |> Broadway.producer_names()
    |> List.first()
    |> GenStage.cast({:pages, pages})
  end

  def handle_cast({:pages, pages}, state) do
    {:noreply, pages, state}
  end

  def online?(_url) do
    # Pretend we are checking if the
    # service is online or not.
    # Select result randomly - (result will always be true for testing purposes).
    Enum.random([true, true, true])
  end
end
```
What I have here is Frankenstein code: examples from the book stitched together, including parts I won’t actually use. Right now I’m trying to inspect and understand the flow of the messages, observe the queue, and so on. I haven’t gotten to the actual insert_all database call or the “retry” logic for database downtime yet. For now, I’m just trying to figure out how to get the producer to receive the individual chat messages (the %{...} maps) from the chat room and put them in a big list or queue, and then have the Broadway batcher pull small batches of those messages out of that queue.
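A producer that holds messages in an explicit queue usually keeps the queue and the unmet demand in its state and emits only as many events as consumers have asked for. As far as I know, GenStage already keeps events a producer emits without pending demand in an internal buffer (the producer's `:buffer_size` option, 10_000 events by default, dropping the excess), so the original `handle_cast/2` does not lose messages by itself; an explicit queue mainly makes the backlog visible and under your control. A sketch of the callbacks, assuming the `{:pages, pages}` cast from the code above; `BufferedProducer` is a hypothetical name, and `use GenStage` is left out only so the logic runs without the gen_stage dependency.

```elixir
defmodule BufferedProducer do
  # Sketch of the callbacks of a buffering GenStage producer. In the real
  # module you would add `use GenStage`; it is omitted here so the sketch runs
  # without the dependency. State is {queue, pending_demand}.

  def init(_args), do: {:producer, {:queue.new(), 0}}

  # New chat messages: append them to the queue, then emit only what is demanded.
  def handle_cast({:pages, pages}, {queue, pending}) do
    queue = Enum.reduce(pages, queue, &:queue.in/2)
    dispatch(queue, pending)
  end

  # Consumers (Broadway processors) ask for more: add to the outstanding demand.
  def handle_demand(incoming, {queue, pending}) do
    dispatch(queue, pending + incoming)
  end

  # Pop up to `demand` events; remember any demand that cannot be met yet.
  defp dispatch(queue, demand, acc \\ [])

  defp dispatch(queue, 0, acc), do: {:noreply, Enum.reverse(acc), {queue, 0}}

  defp dispatch(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> dispatch(rest, demand - 1, [event | acc])
      {:empty, rest} -> {:noreply, Enum.reverse(acc), {rest, demand}}
    end
  end
end
```

This controls how many messages leave the producer at a time, but it does not by itself make Broadway wait: the processors ask for events right away, so grouping and delaying are the batcher's job.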
The first problem is that as soon as the producer receives a message, it shows up in the Broadway pipeline instantly: it doesn’t wait 10 seconds, and the messages don’t arrive in multi-message batches. It’s as if there’s no queue at all.
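The immediate delivery likely comes from the batcher settings rather than the producer: with `batch_size: 1`, every message completes a batch on its own, so it is handed to `handle_batch/4` at once. Broadway's `batch_timeout` is not a fixed schedule either; it only bounds how long a partial batch may wait before it is flushed. A sketch of the parts that would change, assuming batches of up to 10 messages are wanted. `BatchSketch` is a hypothetical module name standing in for `ExchatWeb.TestSave`; its `handle_batch/4` binds the whole list, because the original `[message]` pattern only matches one-message batches.

```elixir
defmodule BatchSketch do
  # Hypothetical: the options TestSave.start_link/1 would pass to
  # Broadway.start_link/2. Only the batcher settings differ from the original.
  def pipeline_options do
    [
      name: ExchatWeb.TestSave,
      producer: [
        module: {ExchatWeb.TestProducer, []},
        transformer: {ExchatWeb.TestSave, :transform, []}
      ],
      processors: [default: [max_demand: 1, concurrency: 1]],
      batchers: [
        # A batch is emitted when 10 messages are waiting, or after 10 s with
        # whatever has arrived so far.
        default: [batch_size: 10, batch_timeout: 10_000, concurrency: 1]
      ]
    ]
  end

  # Accept the whole list (up to batch_size messages), not a one-element list,
  # and return it, as Broadway expects from handle_batch/4.
  def handle_batch(_batcher, messages, _batch_info, _context) do
    rows = Enum.map(messages, & &1.data)
    # `rows` is what would be passed to insert_all (not done in this sketch).
    IO.inspect(length(rows), label: "rows in batch")
    messages
  end
end
```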
It seems to me that the solution and the overall script should be ridiculously simple, right? I’d greatly appreciate it if someone could provide a fully working example of this; hopefully then something will “click” in my head.