Broadway / Flow / GenStage

First of all, I realize that both Flow and Broadway are built on top of GenStage.

I find Flow a bit confusing compared to how Broadway is presented, probably because Flow is trying to be applicable to many more uses than just data pipelines.

My Use Case:
I’m fetching time-series sensor data from an API and need to run many requests in parallel, as fast as possible. First I fetch a list of sensors (~1,000 of them). Then, for each sensor, I fetch the time-series data in 1-day chunks between now and some selected date in the past. Finally, I pipe all the chunks into my Postgres + TimescaleDB database (calling Repo.insert_all/3 per batch).

Based on what I’ve read and watched, it sounds like Broadway would be really ideal, but it seems as if writing my own producer/producer-consumer/consumer is or was not the original intent of the authors. I’m saying this just based on how the README comes across to me. Please correct me if I’m wrong.

Flow doesn’t seem as nicely tailored to processing data like I’m doing, and also doesn’t seem as approachable.

Could somebody please advise me as to which of these I should make my focus?

Also, does it make sense to do Postgres bulk inserts from parallel consumers? My guess is that it does not. The folks at TimescaleDB recommend a RAID 0 array of a small number of drives, and putting the WAL on a separate disk from the data to get better bulk insert performance, but I assume parallel writes are still not supported. Could somebody tell me how this works with Elixir? Does Elixir run a single process for all DB queries, regardless of how many processes I have sending DB write requests?

My take is that there are some common producers provided, but the rest is up to us. Nothing wrong with building your own producers/consumers. Last time I read the docs, the takeaway was to limit the number of producer-consumers, like in GenStage.

(I’m assuming that you’re using Ecto.) No, Ecto keeps a pool of connections, so you can query the DB in parallel. You still need to check how much load the DB is able to sustain.
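
For reference, the pool is sized in the repo configuration. This is only a config fragment: `:my_app` and `MyApp.Repo` are placeholder names, and the value 10 is an arbitrary example, not a recommendation.

```elixir
import Config

# :my_app and MyApp.Repo are placeholder names for illustration.
# pool_size is the number of DB connections the repo keeps open;
# at most that many queries can be running at the same time.
config :my_app, MyApp.Repo,
  pool_size: 10
```

Processes that ask for a connection while all of them are busy wait in a queue, so adding more consumers than connections will not add more parallel writes.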

It is definitely not true that writing your own stages goes against the authors’ intent. The Elixir community in general strives to include only minimal batteries and is more about giving you high-quality LEGO blocks with which you build your own thing. As @stefanchrobot said, don’t be shy about building your own custom parts when the need calls for it. :)

If you would like to get good advice you’ll need to share some more details, for example:

How often does this happen? Say, once an hour? Once every minute? Also, it’s not clear whether you need to hit the external API with as many requests as possible, or whether you first call that API and then do many parallel requests to… where? Another API?

We lack enough details to give you a recommendation.

We don’t know anything about the business you operate in, how loaded your system is, or whether you actually need the scaling advice those folks gave you. From what I’ve seen in my freelancing career, most people tend to overestimate their hosting needs.

Truth is, my i3 CPU Linux laptop can easily handle tens of thousands of writes per second when connected via an Ethernet cable on the same local network as my much beefier workstation.

So don’t rush into adding huge complexity to your setup, unless you already need to scale your DB servers and just haven’t told us.


All in all, we need more details. I have personally never used Flow. Every time I felt I could, I got away with a few supervised processes that periodically run Task.async_stream-based work, and never had a problem.
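
To make the Task.async_stream idea concrete, here is a minimal sketch using only the standard library. `SeedSketch`, `fetch_chunk/1` and the `{sensor_id, day}` job shape are made up for illustration, and the fetch is a stub that does no HTTP at all; the concurrency limit and timeout are arbitrary values you would tune.

```elixir
defmodule SeedSketch do
  # Hypothetical stand-in for the real HTTP call; it only returns a tagged tuple.
  def fetch_chunk({sensor_id, day}) do
    {sensor_id, day, :fake_payload}
  end

  # Runs fetch_chunk/1 over all jobs with at most `max_concurrency` in flight.
  # Jobs that time out are killed and dropped from the result list.
  def fetch_all(jobs, max_concurrency \\ 20) do
    jobs
    |> Task.async_stream(&fetch_chunk/1,
      max_concurrency: max_concurrency,
      timeout: 30_000,
      on_timeout: :kill_task
    )
    |> Enum.flat_map(fn
      {:ok, result} -> [result]
      {:exit, _reason} -> []
    end)
  end
end

jobs = for sensor_id <- 1..3, day <- [~D[2024-01-01], ~D[2024-01-02]], do: {sensor_id, day}
results = SeedSketch.fetch_all(jobs)
IO.inspect(length(results))
```

The `max_concurrency` option is the only knob here, which is why this is often enough for a one-off seeding job. What it does not give you is back-pressure between fetching and inserting, which is where GenStage or Broadway start to pay off.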

I’m not building a web server, it’s a tool for personal use. What I’m trying to accomplish is to seed my database from an external API so that I can do statistics on the data, and I want the seeding process to complete in a reasonable amount of time.

You still need to tell us, for example, whether that external API will ban you if you do 50+ requests at the same time.

No, it has not banned me. I’ve had many successful tests so far, even though my pipeline is not fully working at this point.

How do I figure out how many concurrent writes my postgres can handle? Yes, I’m using Ecto.

This StackOverflow thread could help.

This article is not bad as well.

You have two distinct actions, one is to schedule, the other is to fetch…

Scheduling is a separate concern, but you could have a GenServer (maybe one per sensor) that triggers an event periodically and puts a job into a GenStage pipeline.

Then the pipeline will do its work, even under load. You achieve concurrency by specifying the number of producer-consumer stages.

Beware of going too fast and flooding your database. It happened to me when crawling a website and saving to the DB: I was doing insert_all with potentially too much data, and Postgrex did not like it that much.
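
One common way an oversized insert_all fails, which may or may not be what happened in my case, is PostgreSQL's limit of 65,535 bind parameters per statement. A rough sketch of splitting rows so that rows × columns stays under that limit, assuming every row is a map with the same keys (`InsertChunks` and the `"readings"` table name are made up):

```elixir
defmodule InsertChunks do
  # PostgreSQL's wire protocol caps a single statement at 65_535 bind parameters.
  @max_params 65_535

  # Splits rows (a list of maps that all have the same keys) into chunks
  # small enough that rows * columns stays within the parameter limit.
  def chunk_rows([]), do: []

  def chunk_rows([first | _] = rows) do
    columns = max(map_size(first), 1)
    Enum.chunk_every(rows, max(div(@max_params, columns), 1))
  end
end

rows = for i <- 1..50_000, do: %{sensor_id: 1, ts: i, value: i * 1.0}
chunks = InsertChunks.chunk_rows(rows)
IO.inspect(Enum.map(chunks, &length/1))

# In a real project each chunk would then go to something like
# Repo.insert_all("readings", chunk), where "readings" is a made-up table name.
```

In practice you would probably pick a chunk size well below the hard limit and measure what your database handles comfortably.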

GenStage helped me to have a better control of concurrency.

Flow would be nice if you had aggregation per sensor, but it seems you don’t, as you just insert this into the DB.

Create a buffer process. Generate a job for each sensor/day combination you need and queue those jobs up in your buffer. Create a Broadway producer for your buffer, and in the Broadway pipeline pull the data, batch it, and insert. Scale up producer-consumers as needed.
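
The job-generation part could look roughly like this. It is only a sketch of the data side: `JobSketch`, the `{sensor_id, day}` tuple shape and the sample dates are assumptions, and the Erlang `:queue` here stands in for whatever state the buffer process would actually hold. The Broadway producer itself is not shown.

```elixir
defmodule JobSketch do
  # Builds one {sensor_id, day} job per sensor per day from `from` through `to`
  # (both dates inclusive).
  def build_jobs(sensor_ids, from, to) do
    for sensor_id <- sensor_ids, day <- Date.range(from, to), do: {sensor_id, day}
  end
end

jobs = JobSketch.build_jobs([:s1, :s2], ~D[2024-01-01], ~D[2024-01-03])

# A FIFO queue like this is what the buffer process could keep as its state,
# handing out jobs whenever the producer receives demand.
queue = :queue.from_list(jobs)
{{:value, first_job}, rest} = :queue.out(queue)

IO.inspect(first_job)
IO.inspect(:queue.len(rest))
```

With ~1,000 sensors and a few years of history this is on the order of a million small tuples, which fits in memory comfortably, so there is no need to generate the jobs lazily.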

Yes, that is the path to follow…

It is expected for you to write your own GenStage producers and plug them into Broadway. There is even a guide that covers this: Custom Producers — Broadway v1.0.7

However, the producer_consumers and consumers are Broadway’s responsibility; you can’t plug in your own.

Thanks José. I’m confused by this last statement, though. Wouldn’t the consumer in my case be the layer that performs the bulk insert with Ecto? That would be a consumer I would have to write.

My steps are:
(1) query → (2) for each, fetch (API get request) many times → (3) bulk insert

So I see this as producer → producer_consumer → consumer. Is this correct?
