Persistent Producer Event-buffer?

We’ve got an Elixir application that produces very large XML files containing many tens of thousands of records. It produces several thousand of these batches over the course of a few days. Our main bottleneck is the upstream system providing the data. The application lives in a supervision tree under which exist several subsystems responsible for various domains.

At some point during this process, a GenStage :producer generates events containing the data necessary to process a batch. These events are consumed by a ConsumerSupervisor. We limit the number of concurrent workers that ConsumerSupervisor dispatches in order to not overload downstream systems, which is part of why this process takes days.

Without going into too much detail, the challenge we’re facing is that as our system scales, this process takes more and more time. The more time it takes, the more fragile it feels. Because the state of this system essentially lives inside the GenStage :producer’s event buffer, any interruption to the system means that state is lost. This is bad for a number of reasons.

We’d like to understand if there are strategies available to persist this event buffer so that, if we need to, say, deploy new application code, we can just start where we left off. We’ve looked into offloading the job that this producer has to some messaging application, but we seem to lose the very tidy Producer → ConsumerSupervisor relationship that works so nicely.

Any suggestions? We consider this a re-build, not a refactor, so we’re open to suggestions large and small.

The question is: how do you generate and persist those tasks/events that you make batches from? Are there any limitations to dividing them into smaller pieces?

The other question would be if you can deal with at-least-once processing (as in processing the same event multiple times) as you seem to want to get away from at-most-once processing.

Each event here is produced by a GenStage :producer. A ConsumerSupervisor’s worker completes a batch, then using data from that batch, calls on the :producer to add another event for the ConsumerSupervisor to consume. In this way we “paginate” through the entire dataset. Nothing prevents us from breaking this data into smaller pieces, but it may already be as concise as it can get. The events are not currently persisted; they only live in this “buffer”, which as I understand it, is just the internal state of the GenStage producer.

I worked on a similar system and what we did was:

  1. store the data received from external provider(s) into our own database
  2. have a producer produce the data from the database as events
  3. have a producer_consumer format the data
  4. have consumers handle the data

Because we had the data in our own database we could “stop” the server and continue later (deploy, etc)

At-least-once processing is not ideal for us here, since duplication in this part of the system potentially costs :money_with_wings: for us. Come to think of it, if at-least-once were suitable, we wouldn’t be very worried about shut-down-restart scenarios.

This is one of the patterns we have in mind, but where we get caught up is the connection between the database and the producer. Can you elaborate on how the producer is notified of new data?

We’ve tried out a PoC using RabbitMQ pub/sub where the flow is:

  1. ConsumerSupervisor worker finishes a batch
  2. Worker publishes a new “batch event” (based on data from the previous batch) to a message queue
  3. Producer, subscribed to that message queue, receives a notification and produces a new event
  4. Start over :point_up: :recycle:

This sounds nice, but it feels like we’re side-stepping a lot of the benefits of using Elixir for this sort of problem. The bottleneck becomes the messaging queue.

You need a persistent system in the mix, be it Postgres, Redis, or any other you are familiar with. While RabbitMQ is good for passing events around, there needs to be a persistent system that can save the steps of your processing and restart it should anything go down.

Another approach would be to have a backup node (or nodes) that is always synchronized with the state and can restore it on server restart, but this is a more involved process and I don’t think you require this kind of availability.

There are a couple of ways to do it. If you are using PostgreSQL, you can create and listen to triggers. The simplest way is to use Process.send_after/4 in a custom producer that checks its own queue and queries the database when the queue is (nearly) empty.
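To make the polling idea concrete, here is a minimal sketch using only the standard library. It is a plain GenServer rather than a real GenStage producer; in a GenStage producer the same timer would live in `handle_info/2`, which would return the fetched events. The module name, the thresholds, and `fetch_fun` (a function that returns up to `n` pending events from the database) are all assumptions for illustration.

```elixir
defmodule PollingBuffer do
  use GenServer

  # Illustrative values, not recommendations.
  @poll_interval 1_000
  @low_water_mark 5
  @refill_size 20

  # `fetch_fun` is a hypothetical 1-arity function: given `n`, it returns
  # a list of up to `n` pending events (for example, from a database query).
  def start_link(fetch_fun), do: GenServer.start_link(__MODULE__, fetch_fun)

  # Pops up to `demand` events from the in-memory queue, oldest first.
  def take(pid, demand), do: GenServer.call(pid, {:take, demand})

  @impl true
  def init(fetch_fun) do
    # Trigger the first poll immediately after startup.
    send(self(), :poll)
    {:ok, %{queue: :queue.new(), fetch_fun: fetch_fun}}
  end

  @impl true
  def handle_call({:take, demand}, _from, state) do
    n = min(demand, :queue.len(state.queue))
    {taken, rest} = :queue.split(n, state.queue)
    {:reply, :queue.to_list(taken), %{state | queue: rest}}
  end

  @impl true
  def handle_info(:poll, state) do
    # Only hit the database when the local queue is (nearly) empty.
    queue =
      if :queue.len(state.queue) <= @low_water_mark do
        state.fetch_fun.(@refill_size)
        |> Enum.reduce(state.queue, &:queue.in/2)
      else
        state.queue
      end

    # Schedule the next check.
    Process.send_after(self(), :poll, @poll_interval)
    {:noreply, %{state | queue: queue}}
  end
end
```

Because the in-memory queue is only a cache of what is in the database, losing it on restart costs nothing beyond a re-fetch, provided the fetch itself marks or claims the rows it returns.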

See this for Postgres triggers: Listen to Database Changes with Postgres Triggers and Elixir

Ok, this is good to hear. So we’re narrowing in on a secondary persistence layer, which obviously makes sense here. What I’m hearing is that there is not a special Elixir/Erlang mechanism that I don’t know about that solves this problem for me. I do like the idea of using a backup node to store the state-- I like that we’d be keeping it in the family with that one. But I also agree that that solution is likely to be more complicated than we need here.

So some sort of DB, or persistent-caching mechanism, that notifies the producer when new data is added. This makes sense. But there’s a wrinkle-- because what we really need is “at most once” delivery of events, we need a way to remove or mark-used each of these events in the database so that during replay we don’t re-run a successful batch. This is why I like something like RabbitMQ, except that “time-travel” with RabbitMQ is burdensome. Redis is great here, but for one issue-- and now I’m asking the community for advice on something very-much non-Elixir-- we want Redis pub/sub, but we also want Redis as a message queue. These two patterns don’t seem to play nice.

Before I call this solved and stop asking Elixir forum for Redis advice :sweat_smile: , does anybody have any thoughts on a solution for a “pub/sub FIFO message queue”? My understanding is that Redis pub/sub writes published messages directly to subscriber sockets with no persistence in-between.

It all comes down to what your source of truth is; in this case it should be the persistent layer. At each restart of the application you just have to ensure that all RabbitMQ events are removed from the queue and that you re-queue based on the database state (you can do that easily when the application supervisor is starting its processes, as the init/1 callback is always synchronous). To avoid performance issues, just ensure that, let’s say, no more than 20 messages are active on a RabbitMQ queue at a time.

I don’t see why there would need to be one, you are operating with data, moreover if you are lazy like me you could even encode data structures to binary with term_to_binary/1.
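For reference, a minimal sketch of the term_to_binary/1 round trip. The `EventCodec` module and the shape of the event map are made up for illustration; the binary can be written to any store that accepts binary values.

```elixir
defmodule EventCodec do
  # Serialise any Elixir term (map, tuple, list, ...) to a binary for storage.
  def encode(event), do: :erlang.term_to_binary(event)

  # The :safe option makes decoding fail on binaries that would create
  # new atoms, which matters if the stored data could be tampered with.
  def decode(binary) when is_binary(binary) do
    :erlang.binary_to_term(binary, [:safe])
  end
end

# Hypothetical event shape: a batch id plus a pagination cursor.
event = %{batch_id: 42, cursor: "page-7"}
stored = EventCodec.encode(event)
^event = EventCodec.decode(stored)
```

The trade-off is that the stored value is opaque to anything that is not Erlang/Elixir, so it suits an internal buffer better than a table other systems need to query.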

This sounds like you want exactly-once, not at-most-once. At-most-once would be not persisting events, or deleting them before you process them. At-most-once means no retries and accepting potential loss of messages. At-least-once is retrying at the expense of potentially processing things multiple times. Exactly-once delivery is a really hard problem with no one-size-fits-all solution.
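The difference comes down to whether the event is removed before or after the work is done. A small sketch, where an Agent holding a list stands in for the persistent store and `process_fun` is whatever handles a batch (both are assumptions for illustration):

```elixir
defmodule Delivery do
  # `store` is an Agent holding a list of pending events; it stands in
  # for a database table or queue.

  # At-most-once: remove the event first, then process it. If
  # `process_fun` crashes, the event is lost, but it can never run twice.
  def at_most_once(store, process_fun) do
    case pop(store) do
      nil -> :empty
      event -> process_fun.(event)
    end
  end

  # At-least-once: process first, remove only after success. If
  # `process_fun` crashes, the event stays in the store and is retried,
  # so it may run more than once.
  def at_least_once(store, process_fun) do
    case peek(store) do
      nil ->
        :empty

      event ->
        result = process_fun.(event)
        pop(store)
        result
    end
  end

  defp pop(store) do
    Agent.get_and_update(store, fn
      [] -> {nil, []}
      [head | tail] -> {head, tail}
    end)
  end

  defp peek(store), do: Agent.get(store, &List.first/1)
end
```

Exactly-once would need the removal and the side effect of processing to commit atomically, which is why it usually ends up as at-least-once delivery plus idempotent processing.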

You could use an update statement, marking the records as “processed”, to get the records your producer needs. (update with subquery / cte for limiting)
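Something along these lines, as a sketch. The table `batch_events` and its columns (`id`, `status`, `payload`) are hypothetical, and the query assumes PostgreSQL 9.5 or later for `SKIP LOCKED`. The module only holds the SQL text; it would be run with whatever client is already in use (for example Postgrex or Ecto, if either is part of the project), passing the batch size as `$1`.

```elixir
defmodule ClaimQuery do
  # Returns the SQL that claims up to $1 pending rows in one statement.
  # The CTE picks the oldest pending rows and locks them; SKIP LOCKED lets
  # concurrent claimers pass over rows another transaction has already
  # locked, so two producers never receive the same row.
  def sql do
    """
    WITH next AS (
      SELECT id
      FROM batch_events
      WHERE status = 'pending'
      ORDER BY id
      LIMIT $1
      FOR UPDATE SKIP LOCKED
    )
    UPDATE batch_events AS e
    SET status = 'processed'
    FROM next
    WHERE e.id = next.id
    RETURNING e.id, e.payload
    """
  end
end
```

Marking rows as processed at claim time gives at-most-once behaviour: a crash after the claim loses those events rather than replaying them. Note that `RETURNING` does not guarantee row order, so sort the result by `id` if order matters.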

@mashton, here’s an alternative implementation of a persistent queue backed by a memory-mapped file. I use it whenever a simple solution is needed that requires a FIFO queue and persistence:

Thanks everybody, this is all very helpful.

Agreed. This is the thread I’m likely to start pulling next, thank you.

I agree that what we’d like is exactly-once, but as you say, that’s a very tricky proposition. We’ll settle for at-most once. With the volumes we’re dealing with, some loss is acceptable (as long as it’s observable)

The benefit of this approach is that it uses familiar tools like Postgres, with simple writes and updates. I think we’d like to avoid the overhead of a novel SQL DB from an infrastructure perspective, but

Have a good look at Oban (oban | Hex). It uses Postgres to persist… by all means you will have a lot of DB traffic/overhead, but you should also be able to prune the DB tables as things go through…

This is a classic article that might be relevant – it has helped me reason through how to design a system that withstand heavy load: Queues Don't Fix Overload

From the details of the approach you mentioned here, did you consider “pull” semantics instead of “push”? Instead of the ConsumerSupervisor notifying (aka pushing to) the producer about the new batch, you can have the producer pull a batch from a queue to process it. If this fits your use case then you don’t really need pub/sub; a FIFO queue should do it. Basically you are just replacing the process buffer with secondary storage.

  • ConsumerSupervisor just appends message/messages to a queue
  • producer pulls message/messages from the queue

The two do not have to know about each other.
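A sketch of the shape of that interface. Here an Agent wrapping Erlang's `:queue` stands in for the persistent queue (so this version is still in-memory and is only meant to show the decoupling); the module and function names are made up.

```elixir
defmodule FifoQueue do
  use Agent

  # In-memory stand-in for a persistent FIFO queue.
  def start_link(_opts \\ []), do: Agent.start_link(fn -> :queue.new() end)

  # Called by a worker once it has derived the next batch event.
  # The worker does not need to know who will consume it.
  def append(queue, event), do: Agent.update(queue, &:queue.in(event, &1))

  # Called by the producer when it has demand. Returns up to `count`
  # events, oldest first, and removes them from the queue.
  def pull(queue, count) do
    Agent.get_and_update(queue, fn q ->
      {taken, rest} = :queue.split(min(count, :queue.len(q)), q)
      {:queue.to_list(taken), rest}
    end)
  end
end
```

Swapping the Agent for Postgres, Redis, or a file-backed queue keeps the same two calls, which is what lets the state survive a deploy.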

thoughts on a solution for a “pub/sub FIFO message queue”? My understanding is that Redis pub/sub writes published messages directly to subscriber sockets with no persistence in-between.

For pub/sub you need an active listener (push semantics). Did you check Redis Streams? You can also achieve at-most-once with Redis Streams by using NOACK when calling XREADGROUP. And you can get notified when a message is available by using a blocking read (the BLOCK option).
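Roughly, the read looks like this. The sketch only builds the command as a list of strings, which is the form Redis clients such as Redix take; the stream, group, and consumer names are placeholders, and the consumer group is assumed to already exist (created with XGROUP CREATE).

```elixir
defmodule StreamCommands do
  # Builds an XREADGROUP command that:
  #   - reads up to `count` entries never delivered to this group (">"),
  #   - blocks for up to `block_ms` milliseconds if none are available,
  #   - uses NOACK so entries are treated as acknowledged on delivery
  #     (no pending list, no redelivery: at-most-once).
  def read_no_ack(stream, group, consumer, count, block_ms) do
    [
      "XREADGROUP", "GROUP", group, consumer,
      "COUNT", Integer.to_string(count),
      "BLOCK", Integer.to_string(block_ms),
      "NOACK",
      "STREAMS", stream, ">"
    ]
  end
end

# Placeholder names; with a client connection this list would be sent as-is.
command = StreamCommands.read_no_ack("batch_events", "workers", "worker-1", 10, 5_000)
```

Dropping NOACK and calling XACK after processing turns the same read into at-least-once, since unacknowledged entries stay in the group's pending list.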

Another great article by Fred on a related topic: Handling Overload


I created a Broadway producer, off_broadway_redis_stream, for a similar use case; you can check it out if interested. It might need a few changes for at-most-once.

We’ve now begun iteration on an RFC for our solution to this problem :tada:

With a few small shifts in our mental models, and by shifting our thinking on which problems should be solved by tools and which should be solved by application code, we settled on a proposal that I’m very happy with.

For those curious, we found that Kafka + Broadway was an excellent match for our needs here. It should be said that one of the things I didn’t mention in this thread was that our system of services already includes a production Kafka cluster, which is in many ways under-utilized (how often is a Kafka cluster fully utilized?).

Through the use of message ack and offset commit, we’re able to accomplish something very close to concurrent exactly-once delivery. Broadway’s built-in support for batching messages is a match made in heaven here.

Thanks everybody for helping me get us there. I’m very pleased with the look of the proof-of-concept. Of course, maybe I’ll come back here with my horror stories once we’ve released it :grimacing:
