Consumer design question

Hello all, I have a question about writing a consumer with GenStage.
I’d like this consumer to receive at some point a module name that should be used to perform some work on the data received.
My gut feeling would be to have something in the init function, but I’m quite sure I’m thinking at objects right now, where I “instance” the consumer with a module name that will become an attribute, while the docs call the second argument “state”.
How to design such feature?
This is what I’d like to do for clarity:

GenStage.start_link(MyConsumer, worker: MyWorker)

Suggestions?

ngw

1 Like

Alright, just wanted to give the idea without forcing an API… My question is more about how to do that right :slight_smile:

ngw

1 Like

Will MyConsumer implement any generic behaviour on top of GenStage, that you plan to reuse in all workers? Because otherwise, each MyWorker module should just implement the GenStage behaviour as a consumer, with the specific parts done in the handle_events/3 callback.

But to answer the initial question:

The second argument of GenStage.start_link/2 will be passed to the init/1 function once the process is started.

So assuming the way you’re calling start_link, you can define init as:

def init(args) do
  worker_mod = Keyword.fetch!(args, :worker)
  {:consumer, %{worker_mod: worker_mod}}
end

Then, in handle_events/3, just fetch the worker module from the state:

def handle_events(events, _from, %{worker_mod: worker_mod} = state) do
  # assumes MyWorker implements handle/1 that takes a list of events
  worker_mod.handle(events)
  {:noreply, [], state}
end
1 Like

That’s the case, it will implement a generic behaviour on top of GenStage.
Your answer clarified all my doubts, thanks a lot.

ngw

1 Like