Is it possible to start a Broadway pipeline with a DynamicSupervisor? Well, it is possible start at least one but when I try to start multiple only the first one starts.
I tried overriding the default child_spec
with
%{
id: "#{__MODULE__}:#{unique_id}",
start: {__MODULE__, :start_link, [destination: destination]},
shutdown: :infinity
}
but then none of the pipelines start and I am getting zero errors.
I overrode the default child_spec
because this is the implementation:
default = %{
id: unquote(module),
start: {__MODULE__, :start_link, [arg]},
shutdown: :infinity
}
I have also defined a process_name
:
@impl Broadway
def process_name({:via, module, {registry, name}}, base_name) do
{:via, module, {registry, "#{base_name}:#{name}"}}
end
This is how I am starting the pipeline:
DynamicSupervisor.start_child(
ER.DynamicSupervisor,
{ER.Destinations.Pipeline.S3, [destination: destination]}
)
def start_link(opts) do
broadway_config = get_broadway_config(opts[:destination])
context = %{
destination: broadway_config.destination
}
result =
Broadway.start_link(__MODULE__,
name: broadway_config.name,
producer: [
module:
{OffBroadwayEcto.Producer,
client: {ER.Destinations.Pipeline.Client, [destination: broadway_config.destination]}}
],
context: context,
processors: [
default: [concurrency: broadway_config.processor_concurrency]
],
batchers: [
s3: [
concurrency: broadway_config.batcher_concurrency,
batch_size: broadway_config.batch_size,
batch_timeout: broadway_config.batch_timeout
]
]
)
Anyone have any ideas what I could be doing it wrong? Or if it is possible? Thanks