DayDreamer

DayDreamer

Broadway's ProducerConsumer stage unable to hook up with Broadway's Producer.

Hello Everyone,

I am new in elixir and Broadway and want to setup a data processing pipeline with RabbitMQ Consumer (Filter stage) --> Format stage (Broadway ProducerConsumer) --> Tag stage(Broadway Consumer).
I got the Filter stage correct however, for the Format stage Filter stage should be the producer however, this change does not work.

Filter stage RabbitMQ consumer:

defmodule MyApp.Filter do
  use Broadway

  def start_link(_) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwayRabbitMQ.Producer,
          queue: "ingress",
          declare: [durable: true],
          bindings: [{"events", []}],
          connection: [host: "localhost"]
        }
      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  def handle_message(_, message, _) do
    IO.puts("Received message: #{message.data}")
    message
  end
end

Format stage:

defmodule MyApp.Formatter do
  use Broadway

  alias Broadway.Message

  def start_link(_) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {MyApp.Filter}  # this requires "args" however, [] or {} or nil does not work 
# main question how to make this ProducerConsumer work with BroadwayProducer(Filter).

      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  def handle_message(_, %Message{data: data} = message, _) do
    # Example processing
    transformed_data = String.upcase(data)
    IO.puts("Processing message: #{transformed_data}")
    %Message{message | data: transformed_data}
  end
end

I am not sure what args should look like so that this stage will work

application.ex

defmodule MyApp.Application do
  use Application

  impl true
  def start(_type, _args) do
    children = [
      {MyApp.Filter, []},
      {MyApp.Formatter, []}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts) end
end

mix.exs deps

{:broadway, "~> 1.0"},
{:broadway_rabbitmq, "~> 0.7"}

Could someone point out what am I missing

Cheers,

Where Next?

Popular in Questions Top

shahryarjb
Hello, I get Persian date from my client and convert it to normal calendar like this: def jalali_string_to_miladi_english_number(persi...
New
Patoshizzle
After calling mix ecto.create I get this error: 17:00:32.162 [error] GenServer #PID<0.412.0> terminating ** (Postgrex.Error) FATAL...
New
jaysoifer
Is there a way to rollback a specific migration and only that one (“skipping” all the other ones)? Would mix ecto.rollback -v 200809061...
New
jononomo
I am trying to figure out how Mix knows whether the environment is test, dev, or prod – where is this set? Thanks.
New
nobody
How to bind a phoenix app to a specific ip address? could not find anything about that, nowhere, unfortunately, but for me this is quite...
New
itssasanka
Hi all, Trying to get some more clarity over utc_datetime and naive_datetime for Ecto: The documentation above suggests that while ...
New
freewebwithme
Using vs code and installed ElixirLS: support and debugger. And I got an error popped up on start up says Failed to run ‘elixir’ comma...
New
shijith.k
I am trying to start a new phoenix project with elixir 1.9, but mix phx.new does not work. It says that ** (Mix) The task "phx.new" could...
New
dotdotdotPaul
Okay, I’m having a heck of a time trying to figure out how to best handle the validation of belongs_to associations in Ecto. I’m sure I’...
New
marick
I had some trouble figuring out how to make many-to-many associations work. Once I got it working, I wrote a blog post. Because I’m a nov...
New

Other popular topics Top

vertexbuffer
Hello, can anybody help here..? I have a list of players and I what to delete an element, but every for loop the list is reverting to ori...
New
AstonJ
Posting this to see if we can make things easier for people to get into Neovim. If you use Neovim and have a favourite distro please let ...
New
Patoshizzle
After calling mix ecto.create I get this error: 17:00:32.162 [error] GenServer #PID<0.412.0> terminating ** (Postgrex.Error) FATAL...
New
dokuzbir
I want to highlight html closing tags when i click a html tag. That works in .html files but doesnt work for html.eex templates. How can...
New
RisingFromAshes
I’ve read in another post that it may be possible with a router helper - but I couldn’t find an appropriate one, and tbh, I’m still just ...
New
jay1
Why is it that the mnesia database isn’t the most preferred database for use in Elixir/Phoenix?
New
AstonJ
We’ve put together this wiki for Phoenix LiveView - please feel free to add any info you feel is worth including. What is Phoenix LiveV...
New
marick
I had some trouble figuring out how to make many-to-many associations work. Once I got it working, I wrote a blog post. Because I’m a nov...
New
openscript
Hello! Sorry for this astonishing simple question, but I’m really stuck. I try to set up the intellij-elixir plugin, but I don’t know ho...
New
PeterCarter
There are pre-rolled solutions for other frameworks that do work. However, Phoenix does not seem to have these. Have people had good expe...
New

We're in Beta

About us Mission Statement