Need help mapping a Kafka topic to LiveView

I have a problem integrating Phoenix LiveView and Kafka.
I've spent a week of my free time learning Elixir.
Right now I have a normal web app that just displays realtime data from Kafka. The producer is already done on another server, so now in Elixir I'm using Phoenix to serve the web app.
The problem is that I don't actually know how to map the consumer's topic data to the web display.
I'm using the kafka_ex library.
Please give me some advice.
Thanks for reading.

Simply subscribe to a topic from your Phoenix LiveView (in mount).

Read the data from Kafka and broadcast it with Phoenix.

All your subscribers will receive the data in a handle_info/2 callback. In this function you will be able to assign the data to the LiveView socket.

If you need a code sample, ask in a reply :wink:
—

If you want to keep your code decoupled (business side vs web side), you can use Registry.

I wrote a lib that might interest you: https://github.com/wapitea/event_manager. It’s more syntactic sugar than anything else.

It’s new and the documentation may have some errors. You can read it to see how to use Registry, for example, or simply use the lib.
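
For illustration, here is a minimal sketch of the Registry idea (this is not the event_manager API; the registry, topic and message names below are made-up examples). Each LiveView registers itself under a key, and the Kafka-consuming side dispatches to whatever is registered:

defmodule Minigame.KafkaDispatcher do
  # Start a duplicate-keys registry in your supervision tree:
  #   {Registry, keys: :duplicate, name: Minigame.KafkaRegistry}

  # Called from the LiveView's mount/3: register the current
  # LiveView process under the topic it cares about.
  def subscribe(topic) do
    {:ok, _} = Registry.register(Minigame.KafkaRegistry, topic, [])
    :ok
  end

  # Called from the Kafka consumer: send the message to every
  # process registered under that topic.
  def publish(topic, message) do
    Registry.dispatch(Minigame.KafkaRegistry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:kafka_message, message})
    end)
  end
end

The LiveView then handles {:kafka_message, message} in handle_info/2, so the web side never touches Kafka directly.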

FYI, Registry is local, so if you run distributed Erlang you cannot use it when the Kafka consumer process and the LiveView process are not on the same node.

I was talking about code architecture, not multi-node. But good reminder :wink:

But to solve that kind of problem, there is an awesome lib called Horde.

He doesn’t say the consumer and the LiveView are on different nodes. Otherwise he would just need to create a simple API with Phoenix.

Also, I totally forgot, but something really nice is to use {:global, :name}.

More information: The scope of process name?
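
For illustration, a minimal sketch of global registration (the module and names below are made up):

# Register a GenServer under a name visible to the whole cluster
# (MyApp.Worker and :kafka_feed are example names).
GenServer.start_link(MyApp.Worker, [], name: {:global, :kafka_feed})

# Any node in the cluster can then reach it by that name.
GenServer.call({:global, :kafka_feed}, :latest_message)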

Thank you all.
I have been using WebSockets but the performance was bad. That's why I'm here.

  def mount(_params, _session, socket) do
    stream = KafkaEx.stream("foobar", 0)
    stream |> Stream.map(fn(msg) -> IO.puts(msg.value) end)
    socket = assign(socket, query: "", context: %{stream: stream})
    {:ok, socket}
  end

I have this working in the console/terminal, so the main issue is on the Phoenix side.
I am new.
Please explain to me what I can do in handle_info/2.
Thanks

UPDATE!
I have written more code but I think I'm wrong somewhere. The render function cannot listen to the yielded stream. So must I use a channel?
Here is my LiveView file: https://gist.github.com/bboyadao/7abef92408031a1f04a27a085d052a8f

I think you should consider 2 parts in your app.

The first one is the web, where you only deal with web things. You will create your LiveView, handle broadcasted messages…

The second one is your “real backend”, where you put all the business stuff, like consuming Kafka.

Then you will be able to focus on what to achieve step by step, instead of trying to do everything in one shot.

–

Here are your goals (suggestion):

  1. Consume Kafka messages (you can use GenStage here for example, or any supervised process with a loop to consume your Kafka queue).
    This creates the “consume” loop, so every time a new message arrives in Kafka, you consume it asap.

  2. Create your web page to display your data (at this point don’t use real data; create a “fake” representation of what you will have / want to have).

  3. Link both together. Basically, you want your “backend” to push information to your web page. A possible solution is to use the Phoenix broadcast system (Phoenix.Endpoint — Phoenix v1.7.10).

You can also use channels but, IMO, that’s overkill for your needs. It’s really interesting to use anyway, though.

At every step, check that everything works as expected. For the first step, every time you consume a message, try to display it with an IO.inspect, for example.


Here is an example of what you can do

defmodule MinigameWeb.CustomerLive do
  use MinigameWeb, :live_view
  @topic_name "test_topic"

  def mount(_params, _session, socket) do
    # Don't forget to subscribe to the topic ;)
    # (connected?/1 avoids subscribing from the first, static mount.)
    if connected?(socket), do: MinigameWeb.Endpoint.subscribe(@topic_name)

    # Mount is called 2 times (not a bug, LiveView behaviour):
    # the first call renders the page, the second one reaches the real data.
    # So you don't want to consume anything from your Kafka queue here,
    # otherwise some messages would disappear as if there were a "bug".

    # Assign only "default" values here.
    {:ok, assign(socket, messages: [], query: "")}
  end

  # Here we capture all incoming messages from the Phoenix broadcasting
  # system; we don't want too much code, so keep it simple.
  def handle_info(%{event: "new_message", payload: message}, socket) do
    # Do you want to replace or append the result?
    # Take a look at https://hexdocs.pm/phoenix_live_view/dom-patching.html
    {:noreply, assign(socket, messages: [message | socket.assigns.messages])}
  end

  def render(assigns) do
    # render things
  end
end

In your backend, you just have to push the Kafka message with:

MinigameWeb.Endpoint.broadcast("test_topic", "new_message", kafka_message)

doc: Phoenix.Endpoint — Phoenix v1.7.10
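
For example, here is a minimal sketch of what that backend side could look like, assuming a supervised Task that drives the kafka_ex stream (the module, topic and event names are just examples):

defmodule Minigame.KafkaConsumer do
  use Task, restart: :permanent

  @kafka_topic "foobar"
  @pubsub_topic "test_topic"

  def start_link(_arg) do
    Task.start_link(__MODULE__, :run, [])
  end

  def run do
    # KafkaEx.stream/2 is lazy: Enum.each/2 forces it and blocks here,
    # consuming messages as they arrive.
    KafkaEx.stream(@kafka_topic, 0)
    |> Enum.each(fn msg ->
      IO.inspect(msg.value, label: "kafka message")
      MinigameWeb.Endpoint.broadcast(@pubsub_topic, "new_message", msg.value)
    end)
  end
end

Then add Minigame.KafkaConsumer to the children list in your application supervisor, and every consumed message ends up in the LiveView's handle_info/2 above.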

Every time you push a message with broadcast/3, all LiveViews that have subscribed to the specified topic will receive it. It means that if you have 3 tabs open on the same page (so 3 sockets), the 3 sockets will execute handle_info/2 and the 3 tabs will be updated in real time.

Many thanks for your very detailed answer. I'll check it out and respond.

It seems more complex in Elixir than I thought.
I have created a repo here: https://github.com/bboyadao/minigame
So could you give me a hand...
thanks