Ingesting a stream of MQTT events with Flow. Is it a good idea?

Hey folks,

I’m trying to figure out how to ingest a stream of messages from an MQTT broker in an efficient way. The problem involves many different sensors, each publishing a measurement once every minute. A server (a single node for now) subscribes to these topics and picks the messages up from the broker. The stream should be partitioned by sensor (so one stream of messages un-bundles into many streams, one per sensor) so they can be processed separately. Then all messages should be accumulated over ten-minute periods before being handed over to the business logic, which is encapsulated in an OTP process (a GenServer per sensor). The accumulation is required to calculate an average of the ten measurements that arrive in each window.

As I’m reading Concurrent Data Processing in Elixir, I’m trying to wrap my head around this problem and see how my envisioned system maps onto the concepts of the Flow library. I’m excluding Broadway here, because I can’t find a mature library that acts as a Broadway producer for an MQTT broker.

Am I on the right track with this? Is Flow a good fit for this use case? Or is this better solved in the Functional Core (I’m borrowing terms from the Designing Elixir Systems with OTP book)? Will Flow allow me to partition(?) the data per sensor, and create windows (or maybe I need triggers here) for each ten-minute interval, after which I can emit the received messages once the period has passed?
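
To make that concrete, this is the rough shape I have in mind so far (just a sketch, untested; `MqttProducer` is a hypothetical GenStage producer fed by the MQTT client, and the event shape is something I made up):

```elixir
# Rough sketch, not tested. MqttProducer is a hypothetical GenStage producer
# registered under its module name; events are assumed to look like
# %{sensor_id: "abc", value: 1.5, timestamp: unix_millis}.
window = Flow.Window.fixed(10, :minute, fn event -> event.timestamp end)

Flow.from_stages([MqttProducer])
# Route all events of one sensor to the same partition.
|> Flow.partition(window: window, key: fn event -> event.sensor_id end)
|> Flow.group_by(fn event -> event.sensor_id end)
# Fires when a ten-minute window closes; emits one average per sensor.
|> Flow.on_trigger(fn grouped ->
  averages =
    for {sensor_id, events} <- grouped do
      values = Enum.map(events, & &1.value)
      %{sensor_id: sensor_id, average: Enum.sum(values) / length(values)}
    end

  # This is where the averages would be handed to the per-sensor GenServers.
  {averages, %{}}
end)
|> Flow.run()
```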

I’m having a hard time understanding the Flow concepts, but I’m getting there (I have read the hexdocs five times by now).

Is plugging in a time series database an option?

How many sensors are we talking about? Emitting once every minute is slow enough that you should only worry about performance and partitioning if there are enough sensors.

Also, what should happen if something fails? Is it ok if you drop messages? What should happen if, after 6 minutes, your accumulation logic crashes? Is it ok to just restart from the current message, or do you need to restart from the first message?

Do you need to store all incoming data for later usage, or can you just drop it? What about the accumulation: do you need to store that, or just emit it somewhere?

These would be the kind of questions I’d ask myself before making a decision on anything. Because they will affect how you want to design your solution.

Depending on some of the answers to the questions above, I’d probably start with an MQTT client and handle the logic myself. I don’t know Flow, so I can’t comment on whether it would be a good choice for you.
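
Roughly along these lines, perhaps (just a sketch with made-up names; it assumes a Registry named SensorRegistry in your supervision tree, and whatever MQTT client you pick would call `SensorWorker.record/2` from its message callback):

```elixir
defmodule SensorWorker do
  use GenServer

  # Assumes `{Registry, keys: :unique, name: SensorRegistry}` is running.
  def start_link(sensor_id) do
    GenServer.start_link(__MODULE__, sensor_id, name: via(sensor_id))
  end

  # Called from the MQTT client's message callback.
  def record(sensor_id, value) do
    GenServer.cast(via(sensor_id), {:record, value})
  end

  defp via(sensor_id), do: {:via, Registry, {SensorRegistry, sensor_id}}

  @impl true
  def init(sensor_id) do
    # Close the accumulation window every ten minutes.
    :timer.send_interval(:timer.minutes(10), :flush)
    {:ok, %{sensor_id: sensor_id, buffer: []}}
  end

  @impl true
  def handle_cast({:record, value}, state) do
    {:noreply, %{state | buffer: [value | state.buffer]}}
  end

  @impl true
  def handle_info(:flush, %{buffer: []} = state), do: {:noreply, state}

  def handle_info(:flush, state) do
    average = Enum.sum(state.buffer) / length(state.buffer)
    # Hand the average to the business logic / time series DB here.
    IO.inspect({state.sensor_id, average}, label: "10-minute average")
    {:noreply, %{state | buffer: []}}
  end
end
```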


Yes, the averaged measurements would have to end up in a time series database. I was thinking about inserting them in the database as part of the Functional Core.

Those are all good questions. I can answer some:

How many sensors are we talking about? Emitting once every minute is slow enough that you should only worry about performance and partitioning if there are enough sensors.

A few hundred at first, but should be able to scale (I know, the most generic answer ever :slight_smile:)

Also, what should happen if something fails? Is it ok if you drop messages? What should happen if, after 6 minutes, your accumulation logic crashes? Is it ok to just restart from the current message, or do you need to restart from the first message?

I was hoping the broker would be able to retain all unacknowledged messages, so they can be re-delivered when something fails. I’ve worked with queues and topics before, but I haven’t delved deep into the possibilities of MQTT brokers yet.

Do you need to store all incoming data for later usage, or can you just drop it? What about the accumulation: do you need to store that, or just emit it somewhere?

Only the averages would need to be retained somewhere, in a time series database.

I don’t know Flow, so I can’t comment on whether it would be a good choice for you.

I’m trying to get a feel for how Flow could work here, but I appreciate your feedback.

If you do not need the actual values, the time series database can do the aggregation. You can then pull from the DB, which will simplify the architecture.
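
For example, assuming TimescaleDB (table, column, and repo names below are made up), the ten-minute average becomes a single query:

```elixir
defmodule MyApp.Averages do
  import Ecto.Query

  # Hypothetical schemaless query using TimescaleDB's time_bucket();
  # the "measurements" table and its columns are assumptions.
  def ten_minute_averages(sensor_id) do
    from(m in "measurements",
      where: m.sensor_id == type(^sensor_id, :string),
      group_by: fragment("time_bucket('10 minutes', ?)", m.inserted_at),
      select: %{
        bucket: fragment("time_bucket('10 minutes', ?)", m.inserted_at),
        average: avg(m.value)
      }
    )
    |> MyApp.Repo.all()
  end
end
```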


You suggest streaming the events directly to the database? Didn’t think of that yet :thinking:

I still need to trigger some business logic when events flow in though, so I don’t know how that would play out in this direct-to-time-series-DB setup…

Can you save your accumulation after every message, or only after 10 messages? That will make a big difference in whether you can let the broker handle unacknowledged messages or not. Basically, if you save the accumulation after every message, then you can acknowledge right after saving. In the other case you’d need to wait 10 minutes, and that’s something the message broker probably doesn’t support…

A time series database might definitely be worth looking at, but for me it would depend on whether the added complexity of an extra database (although there are Postgres extensions as well) is worth it.

I’ve built something similar in the past, with RabbitMQ and Postgres. I could easily handle thousands of msgs/sec on a fairly regular server. I don’t remember the exact details, but nothing very beefy was needed.


Yes, but I can’t say if that’s possible in your case.
But you definitely should use one if the main (resource-critical) task is aggregating.

Can you save your accumulation after every message, or only after 10 messages? That will make a big difference in whether you can let the broker handle unacknowledged messages or not. Basically, if you save the accumulation after every message, then you can acknowledge right after saving. In the other case you’d need to wait 10 minutes, and that’s something the message broker probably doesn’t support…

I want to checkpoint after every message (not only after aggregating over ten minutes), so there’s no need for long-running transactions or anything like that before acknowledging (does it even work that way with MQTT brokers?).

A time series database might definitely be worth looking at, but for me it would depend on whether the added complexity of an extra database (although there are Postgres extensions as well) is worth it.

TimescaleDB is on my radar :satellite:

I’ve built something similar in the past, with RabbitMQ and Postgres. I could easily handle thousands of msgs/sec on a fairly regular server. I don’t remember the exact details, but nothing very beefy was needed.

That would be a nice end-game: being able to process a constant stream of events at this rate.

I’m not familiar enough with them to answer for sure, but I think the protocol supports acknowledgement by the client. Of course, the broker needs to support it as well, and it needs to be activated (because there is overhead involved, of course).

No, ACKs are handled by the client. You can only set a QoS level; see What is MQTT Quality of Service (QoS) 0,1, & 2? – MQTT Essentials: Part 6

There are some more features in MQTT 5, but I don’t know about those.

QoS supports that, no?
QoS level 1, at least once, waits for the client to acknowledge the message.
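
With the Tortoise client, for example, the QoS level goes into each subscription tuple (host, topic, and client_id below are placeholders):

```elixir
# The `1` in the subscription tuple is the QoS level (at least once).
# Host, port, topic, and client_id are placeholders.
Tortoise.Supervisor.start_child(
  client_id: "ingestor",
  handler: {Tortoise.Handler.Logger, []},
  server: {Tortoise.Transport.Tcp, host: ~c"localhost", port: 1883},
  subscriptions: [{"sensors/+/measurements", 1}]
)
```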

Yes, you can make sure that the MQTT client has received the message at least once, but as I understand it, OP wants to manually ACK once a message is processed.

Use a Shared Subscription

Shared subscriptions are an MQTT v5 feature that allows MQTT clients to share the same subscription on the broker. In standard MQTT subscriptions, each subscribing client receives a copy of each message that is sent to that topic. In a shared subscription, all clients that share the same subscription in the same subscription group receive messages in an alternating fashion. This mechanism is sometimes called client load balancing, since the message load of a single topic is distributed across all subscribers.
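
Concretely, the shared subscription is expressed in the topic filter itself (group and topic below are made up):

```elixir
# MQTT 5 shared-subscription filter: "$share/<group>/<topic filter>".
# Every server node subscribes with the same string, and the broker
# load-balances messages across the group.
group = "ingesters"
topic_filter = "sensors/+/measurements"
"$share/#{group}/#{topic_filter}"
#=> "$share/ingesters/sensors/+/measurements"
```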


Along with HiveMQ

Reliable Data Movement for Connected Devices

HiveMQ’s MQTT broker makes it easy to move data to and from connected devices in an efficient, fast and reliable manner.


HiveMQ is architected for scale and reliability. MQTT broker scales up to 10 million connected devices and uses industry standards to ensure data is not lost.

Followed by a GenStage-based Flow

To intercept and perform business logic before storage.

Use ClickHouse for time series storage

It’s an OLAP + time series DB, faster than TimescaleDB & InfluxDB.

See: Benchmark


This is just plain marketing. This is not a USP of HiveMQ; all serious brokers do that (see EMQX, Verne).
And if you have any special needs for auth, I’d look closely at how HiveMQ handles that.

I’m a noob. :sweat_smile:

I just pointed out what I found.

I have no affiliation with any of the tech I put in my reply.

What I meant was:

Lots of IoT → Message queue to take the brunt of the load → GenStage for backpressure while ingesting data → Fast storage that can take writes quickly.

Else

IoT → Some Broadway-compatible message queue → App → Fast storage

---

P.S. I have no experience with IoT, backends, or complex system design.

---

P.P.S. Check out how Plausible Analytics ingests data into ClickHouse.

Definitely use Broadway

They solved the problems that arose after people started building complex GenStage applications.

  1. Rate Limiting
  2. Batching
  3. Metrics
  4. Graceful shutdown
  5. Automatic acknowledgement at the end of the pipeline
  6. Fault tolerance with minimal data loss
  7. Backpressure
  8. Concurrency

Instead of reinventing the wheel, write an MQTT connector for Broadway if existing ones fall short.
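
A Broadway producer is just a GenStage producer that emits Broadway.Message structs, so a connector skeleton could look like this (a sketch: the wiring that makes the MQTT client send `{:mqtt, topic, payload}` to this process is left out, and a real connector would implement the Broadway.Acknowledger behaviour to ack back to the broker instead of the no-op used here):

```elixir
defmodule MqttProducer do
  use GenStage

  @behaviour Broadway.Producer

  alias Broadway.Message

  @impl true
  def init(opts) do
    # Start/link the MQTT client here and have it forward publishes
    # to this process as {:mqtt, topic, payload} messages.
    {:producer, %{demand: 0, queue: :queue.new(), opts: opts}}
  end

  @impl true
  def handle_demand(incoming, state) do
    dispatch(%{state | demand: state.demand + incoming})
  end

  @impl true
  def handle_info({:mqtt, topic, payload}, state) do
    message = %Message{
      data: {topic, payload},
      # A real connector would implement Broadway.Acknowledger and
      # ack to the broker; this no-op simply discards the ack.
      acknowledger: Broadway.NoopAcknowledger.init()
    }

    dispatch(%{state | queue: :queue.in(message, state.queue)})
  end

  # Emit as many queued messages as there is outstanding demand.
  defp dispatch(%{demand: demand, queue: queue} = state) do
    {events, queue} = take(queue, demand, [])
    {:noreply, events, %{state | demand: demand - length(events), queue: queue}}
  end

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp take(queue, n, acc) do
    case :queue.out(queue) do
      {{:value, message}, rest} -> take(rest, n - 1, [message | acc])
      {:empty, queue} -> {Enum.reverse(acc), queue}
    end
  end
end
```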


See why Broadway is needed:


Learning material:


Just a quick plug for VictoriaMetrics as a time series DB. I’m evaluating it for my needs right now. It’s designed to be a “better” backend for Grafana metrics. One of its features that I needed was ARM 32-bit support.
