Hi everyone,
I implemented the example from this blog RPC over RabbitMQ (with Elixir) – Andrea Leopardi.
My dev environment:
Elixir 1.14.2, RabbitMQ 3.12.0, Erlang 25.3.2.2, {:amqp, "~> 3.3"}, {:broadway, "~> 1.0"}, {:broadway_rabbitmq, "~> 0.7.0"}
I created: a caller module, a receiver module use Broadway and can start with Supervisor.
The situation is: if I did not start the receiver module, but run the caller program I still get the result as the receiver is started.
I don’t know if whether it is wrong with my dev environment or it is the auto start mechanism of Broadway, RabbitMQ and Supervisor.
If someone face this situation, please help to explain to me.
- caller_rpc_service.exs
defmodule CallerRPCService do
def wait_for_messages(_channel, correlation_id) do
receive do
{:basic_deliver, payload, %{correlation_id: ^correlation_id}} ->
IO.puts("Response Message: #{inspect(payload)}")
end
end
end
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)
{:ok, %{queue: queue_name}} = AMQP.Queue.declare(channel, "", exclusive: true)
AMQP.Basic.consume(channel, queue_name, nil)
{headers, message} = {[{"destination", "my_service"}], "Hello Broadway and RPC."}
correlation_id = :erlang.unique_integer() |> :erlang.integer_to_binary() |> Base.encode64()
AMQP.Exchange.declare(channel, "rpc", :headers,
arguments: [{"destination", :longstr, "my_service"}]
)
AMQP.Basic.publish(channel, "rpc", "", message,
reply_to: queue_name,
correlation_id: correlation_id,
headers: headers
)
CallerRPCService.wait_for_messages(channel, correlation_id)
- receiver_rpc_service.ex
defmodule MyService.RPCConsumer do
use Broadway
@producer BroadwayRabbitMQ.Producer
@producer_config [
queue: "my_service.rpcs",
declare: [durable: true],
bindings: [
{"rpc", arguments: [{"destination", :longstr, "my_service"}]}
],
metadata: [:reply_to, :correlation_id],
on_failure: :reject_and_requeue # mandatory option
]
def start_link(_args) do
options = [
name: RPCConsumerPipeline,
producer: [
module: {@producer, @producer_config}
],
processors: [
default: []
]
]
Broadway.start_link(__MODULE__, options)
end
def handle_message(_, %Broadway.Message{} = message, _context) do
IO.puts("Request message: #{inspect(message)}")
AMQP.Basic.publish(
message.metadata.amqp_channel,
"",
message.metadata.reply_to,
"We have got it. Ok!",
correlation_id: message.metadata.correlation_id
)
message
end
end
- application.ex
defmodule RabbimqTutorials.Application do
@moduledoc false
alias MyService.RPCConsumer
use Application
@impl true
def start(_type, _args) do
children = [
RPCConsumer
]
opts = [strategy: :one_for_one, name: RabbimqTutorials.Supervisor]
Supervisor.start_link(children, opts)
end
end