Multi-stage pipelines with Broadway

I want to build a system that supports user-provided multi-step workflows. Each step may involve interacting with a third-party service.
Broadway looks like a perfect fit for the producer side, but I haven’t seen any complex examples that include multi-step flows. Most examples do a simple local transformation and that’s it.
My current thought is to start a GenServer or GenStage for every new message, which will handle the execution of the flow.

Can you please provide a concrete example of a workflow you’re trying to achieve?

When you say “user provided,” do you mean the steps are defined by the user, similar to automation tools like Zapier/Make? Or are the steps predefined, but each step requires some input from a user?

Yes, exactly something like Zapier.
More concrete example:
Subscribe to a Google Pub/Sub subscription, dispatch an HTTP request to a service with the message as the payload, and branch the execution based on the response. If true, format the response and send it as a Slack message. Otherwise, drop the message or do something else.
I hope that makes sense. Let me know if you need more details.

This can all be done pretty easily in one step. Instead of a simple transformation, you could have one step that sends the HTTP request and acts on the response. I’m not sure there is a need to split it up.
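
As a rough sketch of that single step in plain Elixir: the module name `SingleStep`, the injected `http_post` and `send_slack` functions, and the `%{"match" => ...}` response shape are all made up for illustration. Inside Broadway, logic like this would sit in the `handle_message/3` callback.

```elixir
defmodule SingleStep do
  # Hypothetical single-step workflow: call a service, then branch on the reply.
  # `http_post` and `send_slack` are injected functions (assumptions), so the
  # sketch runs without any real HTTP or Slack client.
  def run(payload, http_post, send_slack) do
    case http_post.(payload) do
      {:ok, %{"match" => true} = response} ->
        response |> format() |> send_slack.()

      {:ok, _response} ->
        # The "else" branch: drop the message.
        :dropped

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp format(response), do: "Matched: #{inspect(response)}"
end

# Usage with stubbed services:
http_post = fn _payload -> {:ok, %{"match" => true, "id" => 1}} end
send_slack = fn text -> {:sent, text} end
result = SingleStep.run("hello", http_post, send_slack)
```

Passing the service calls in as functions keeps the branching logic testable without touching the network.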

This is another option I’m considering. My two concerns:

  • Long-running tasks may exceed the message queue’s acknowledgment deadline and trigger a retry. This is why I think it would be better to ack the message immediately and handle it myself. I don’t know if that’s possible without separating.
  • I want to manage the execution state in the database: how long each step took, and how long the overall pipeline took. I also want to reflect everything in real time using LiveView. This is why event sourcing feels like the more intuitive approach.
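
To illustrate the second concern, here is a minimal sketch of timing each step and collecting one event per step. The `StepRecorder` module and the event shape are hypothetical; in a real system each event would presumably be written to the database and broadcast to LiveView rather than kept in a list.

```elixir
defmodule StepRecorder do
  # Runs each {name, fun} step in order, threading the result through, and
  # collects one event per step with its duration in microseconds.
  def run(steps, input) do
    {output, events} =
      Enum.reduce(steps, {input, []}, fn {name, fun}, {acc, events} ->
        {micros, result} = :timer.tc(fn -> fun.(acc) end)
        {result, [%{step: name, duration_us: micros} | events]}
      end)

    # Events were prepended, so restore execution order.
    {output, Enum.reverse(events)}
  end
end

steps = [
  {:upcase, &String.upcase/1},
  {:exclaim, fn text -> text <> "!" end}
]

{output, events} = StepRecorder.run(steps, "hello")
```

The overall pipeline duration can be derived by summing `duration_us` across the events.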

Sorry for not providing this context earlier. :sweat_smile:

Hello :wave:

Even if it looks beautiful, using processes for handling business logic is not a very good pattern.

You can try to go with something like this

# Dispatch on the request type.
def handle_request(request) do
  case request do
    {:type1, _some} -> do_something(request)
    {:type2, _some} -> do_anotherthing(request)
  end
end

defp do_something(request) do
  # code
  send_event_to_db()
end

and then spawn a process for each request

The GenStage library is designed to provide back-pressure for your system. With the previous example, if you are spawning too many processes and your system can’t handle it, you can go with a producer and ConsumerSupervisor (gen_stage v1.1.2) architecture, which relieves the pressure simply by limiting the number of processes it spawns.
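
ConsumerSupervisor needs the gen_stage dependency, so as a dependency-free illustration of the underlying idea (capping how many processes run at once), here is a sketch using `Task.async_stream/3` from the standard library. This is a swapped-in technique, not ConsumerSupervisor itself, and it has no demand-driven back-pressure toward a producer. The `handle_request` function is a stand-in.

```elixir
# Stand-in for the real per-request work (an assumption for illustration).
handle_request = fn request ->
  Process.sleep(10)
  {:handled, request}
end

# At most 5 requests are processed concurrently; the rest wait their turn.
results =
  1..20
  |> Task.async_stream(handle_request, max_concurrency: 5, timeout: 5_000)
  |> Enum.map(fn {:ok, result} -> result end)
```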

Would you say that the execution of a workflow is business logic? To be clear, I mean the orchestration of a single workflow. It involves intermediate state management as well.

I would say that if you have in mind a workflow like this:

receive the request
save state to db
interact with a service
save state to db
interact with another service
save to db
...

you can just use a single process to do everything.

Using GenStage with multiple layers passing state between them would only complicate things.

And to define it, you can use a GenServer that spawns more GenServers under a DynamicSupervisor.
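
A minimal sketch of that shape, assuming one short-lived process per workflow: `WorkflowRunner`, the list-of-functions step format, and the `{:workflow_done, pid, result}` message are made up for illustration, and the database writes are only marked with a comment.

```elixir
defmodule WorkflowRunner do
  # :temporary so a finished (or crashed) workflow is not restarted.
  use GenServer, restart: :temporary

  # One process runs one workflow from start to finish.
  def start_link({steps, input, reply_to}) do
    GenServer.start_link(__MODULE__, {steps, input, reply_to})
  end

  @impl true
  def init(args), do: {:ok, args, {:continue, :run}}

  @impl true
  def handle_continue(:run, {steps, input, reply_to}) do
    result =
      Enum.reduce(steps, input, fn step, acc ->
        # A real implementation would save state to the DB after each step.
        step.(acc)
      end)

    send(reply_to, {:workflow_done, self(), result})
    {:stop, :normal, result}
  end
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

steps = [&String.trim/1, &String.upcase/1]
{:ok, pid} = DynamicSupervisor.start_child(sup, {WorkflowRunner, {steps, "  hello ", self()}})

result =
  receive do
    {:workflow_done, ^pid, value} -> value
  after
    1_000 -> :timeout
  end
```

Doing the work in `handle_continue/2` lets `start_child/2` return right away instead of blocking until the workflow finishes.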

Ah ok I understand what you mean now :). The team I work on has done something similar to this. You can use Broadway to listen to the PubSub events and save the message details in a “jobs” database table and then separately create a GenStage pipeline to poll the table for new jobs and perform work and record all the details of it.

One GenStage to pull all the jobs, right?

One GenStage pipeline where the producers poll the table and then pass the information to the consumers, which do the processing. You can then configure the concurrency of the producers and consumers to however much you need.

You might also be able to use a job processing library like Oban. I’ve never used it myself, but I believe the general idea is that it stores jobs in a Postgres table and schedules them, runs them, and updates their status.

Thanks!