I tried putting together a custom Broadway producer that operates on a simple stream. I used a file for testing, but in theory this should work on other streams too. I discovered the stream_split
package, which made this possible.
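For anyone who hasn’t seen it: StreamSplit.take_and_drop/2 pulls n elements off the front of a stream and hands back the untouched remainder… which is exactly the shape handle_demand needs. A quick iex illustration:
iex> {head, tail} = StreamSplit.take_and_drop(Stream.cycle([1, 2, 3]), 4)
iex> head
[1, 2, 3, 1]
iex> Enum.take(tail, 2)
[2, 3]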
Here’s the code for my custom (file) stream producer:
defmodule StreamProducer do
  # See https://hexdocs.pm/broadway/custom-producers.html#example
  use GenStage

  alias Broadway.Message

  # Broadway will not call the child_spec/1 or start_link/1 function of the producer.
  # That's because Broadway wraps the producer to augment it with extra features.
  def start_link(filepath) do
    GenStage.start_link(__MODULE__, filepath)
  end

  # When Broadway starts, the GenStage.init/1 callback will be invoked with the given opts.
  def init(filepath) do
    {:producer, File.stream!(filepath)}
  end

  # Take `demand` lines off the front of the stream and keep the rest as state.
  def handle_demand(demand, stream) when demand > 0 do
    {head, tail} = StreamSplit.take_and_drop(stream, demand)
    {:noreply, head, tail}
  end

  # Not part of the GenStage behaviour, but Broadway requires that we translate
  # the GenStage events into Broadway messages.
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, successful, failed) do
    IO.puts("ACKING successful: #{length(successful)} failed: #{length(failed)}")
    # Write ack code here
  end
end
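As a quick sanity check, you can also run the producer on its own (outside Broadway) with GenStage.stream/1 before wiring it up… the path here is just a placeholder:
iex> {:ok, producer} = StreamProducer.start_link("/path/to/some/file.txt")
iex> [producer] |> GenStage.stream() |> Enum.take(3)
# => the first three lines of the file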
Then I set up my Broadway implementation:
defmodule MyApp do
  use Broadway

  alias Broadway.Message

  def start_link(file) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {StreamProducer, file},
        transformer: {StreamProducer, :transform, []}
      ],
      processors: [
        default: [concurrency: 100]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 100, batch_timeout: 2000]
      ]
    )
  end

  # Do the work
  @impl true
  def handle_message(_, %Message{data: data} = message, _) do
    IO.inspect(data, label: "HANDLING MESSAGE")
    # Simulate load
    Process.sleep(1000)
    message
  end

  @impl true
  def handle_batch(_, messages, _, _) do
    IO.puts("HANDLING BATCH OF >>>> #{length(messages)} <<<<")
    messages
  end
end
And I ran it doing something like:
iex> MyApp.start_link("/path/to/huge/file.txt")
The whole thing worked exactly as advertised… concurrency ran right up to the configured limits of my system, and my huge file was processed.
Does anyone know if this will work to process an Ecto stream? (Ecto.Repo — Ecto v3.9.4)
I’m gonna try that… although the bit about it only working within a transaction may be a deal breaker… I’m not sure if it would work with multiple transactions… you’d have to open a new transaction inside handle_demand.
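If I do try it, something like this is what I have in mind… a rough, untested sketch, where EctoStreamProducer, MyApp.Repo, and the offset-based paging are all my own placeholders. Since Repo.stream/2 only works inside a transaction, the idea is to open a short transaction per demand cycle instead of holding one long-lived stream:
defmodule EctoStreamProducer do
  use GenStage

  import Ecto.Query, only: [offset: 2, limit: 2]

  def start_link(query), do: GenStage.start_link(__MODULE__, query)

  # State is the query plus how many rows we've emitted so far.
  # Assumes the query has a stable order_by, or paging will be unreliable.
  def init(query), do: {:producer, {query, 0}}

  def handle_demand(demand, {query, offset}) when demand > 0 do
    # Repo.stream/2 must run inside a transaction, so open a fresh,
    # short-lived one for each demand cycle and materialize only the
    # rows we need.
    {:ok, rows} =
      MyApp.Repo.transaction(fn ->
        query
        |> offset(^offset)
        |> limit(^demand)
        |> MyApp.Repo.stream()
        |> Enum.to_list()
      end)

    {:noreply, rows, {query, offset + length(rows)}}
  end
end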