DayDreamer
Broadway's ProducerConsumer stage unable to hook up with Broadway's Producer.
Hello Everyone,
I am new in elixir and Broadway and want to setup a data processing pipeline with RabbitMQ Consumer (Filter stage) --> Format stage (Broadway ProducerConsumer) --> Tag stage(Broadway Consumer).
I got the Filter stage correct however, for the Format stage Filter stage should be the producer however, this change does not work.
Filter stage RabbitMQ consumer:
defmodule MyApp.Filter do
use Broadway
def start_link(_) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {
BroadwayRabbitMQ.Producer,
queue: "ingress",
declare: [durable: true],
bindings: [{"events", []}],
connection: [host: "localhost"]
}
],
processors: [
default: [concurrency: 1]
]
)
end
def handle_message(_, message, _) do
IO.puts("Received message: #{message.data}")
message
end
end
Format stage:
defmodule MyApp.Formatter do
use Broadway
alias Broadway.Message
def start_link(_) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {MyApp.Filter} # this requires "args" however, [] or {} or nil does not work
# main question how to make this ProducerConsumer work with BroadwayProducer(Filter).
],
processors: [
default: [concurrency: 1]
]
)
end
def handle_message(_, %Message{data: data} = message, _) do
# Example processing
transformed_data = String.upcase(data)
IO.puts("Processing message: #{transformed_data}")
%Message{message | data: transformed_data}
end
end
I am not sure what args should look like so that this stage will work
application.ex
defmodule MyApp.Application do
use Application
impl true
def start(_type, _args) do
children = [
{MyApp.Filter, []},
{MyApp.Formatter, []}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts) end
end
mix.exs deps
{:broadway, "~> 1.0"},
{:broadway_rabbitmq, "~> 0.7"}
Could someone point out what am I missing
Cheers,
Popular in Questions
Hello, I get Persian date from my client and convert it to normal calendar like this:
def jalali_string_to_miladi_english_number(persi...
New
After calling mix ecto.create I get this error:
17:00:32.162 [error] GenServer #PID<0.412.0> terminating
** (Postgrex.Error) FATAL...
New
Is there a way to rollback a specific migration and only that one (“skipping” all the other ones)?
Would
mix ecto.rollback -v 200809061...
New
I am trying to figure out how Mix knows whether the environment is test, dev, or prod – where is this set?
Thanks.
New
How to bind a phoenix app to a specific ip address?
could not find anything about that, nowhere, unfortunately, but for me this is quite...
New
Hi all,
Trying to get some more clarity over utc_datetime and naive_datetime for Ecto:
The documentation above suggests that while ...
New
Using vs code and installed ElixirLS: support and debugger.
And I got an error popped up on start up says
Failed to run ‘elixir’ comma...
New
I am trying to start a new phoenix project with elixir 1.9, but mix phx.new does not work. It says that ** (Mix) The task "phx.new" could...
New
Okay, I’m having a heck of a time trying to figure out how to best handle the validation of belongs_to associations in Ecto. I’m sure I’...
New
I had some trouble figuring out how to make many-to-many associations work. Once I got it working, I wrote a blog post. Because I’m a nov...
New
Other popular topics
Hello, can anybody help here..? I have a list of players and I what to delete an element, but every for loop the list is reverting to ori...
New
Posting this to see if we can make things easier for people to get into Neovim. If you use Neovim and have a favourite distro please let ...
New
After calling mix ecto.create I get this error:
17:00:32.162 [error] GenServer #PID<0.412.0> terminating
** (Postgrex.Error) FATAL...
New
I want to highlight html closing tags when i click a html tag. That works in .html files but doesnt work for html.eex templates. How can...
New
I’ve read in another post that it may be possible with a router helper - but I couldn’t find an appropriate one, and tbh, I’m still just ...
New
Why is it that the mnesia database isn’t the most preferred database for use in Elixir/Phoenix?
New
We’ve put together this wiki for Phoenix LiveView - please feel free to add any info you feel is worth including.
What is Phoenix LiveV...
New
I had some trouble figuring out how to make many-to-many associations work. Once I got it working, I wrote a blog post. Because I’m a nov...
New
Hello!
Sorry for this astonishing simple question, but I’m really stuck. I try to set up the intellij-elixir plugin, but I don’t know ho...
New
There are pre-rolled solutions for other frameworks that do work. However, Phoenix does not seem to have these. Have people had good expe...
New








