Processing incoming messages while saving state

Hi everyone,

I’m trying to build a pipeline that consumes messages from RabbitMQ and passes the results to the next stage, which also works with queues. However, I’m stuck on a problem: one of those stages needs to store state after processing a message.

In my case, I expect the microservice to work according to this scheme:

  1. The GenServer instance consumes a message from a certain RabbitMQ queue and spawns a process that will handle the request.

  2. After that, the processing worker performs the following substeps:

    1. Publish the message to another queue, which another microservice listens to, and wait for a response.
    2. If the response contains “Ok”, the message must be saved in the GenStage (or worker?) instance; otherwise, discard the message and do nothing.
  3. When the state contains N saved responses, publish them as a message to the next stage in the pipeline. After publishing, clear the message storage.
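The flow above could be sketched roughly as follows. This is only a minimal sketch under assumptions: `BatchCollector`, `add/2`, and the `:publish` option are hypothetical names, and the `publish` callback stands in for whatever RabbitMQ client call would do the real publishing.

```elixir
defmodule BatchCollector do
  @moduledoc """
  Hypothetical sketch: keeps "ok" responses in GenServer state and
  hands them to a publish callback once N of them have accumulated.
  """
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Called by a worker once it has a response from the other microservice.
  def add(pid, response), do: GenServer.cast(pid, {:add, response})

  @impl true
  def init(opts) do
    state = %{
      batch_size: Keyword.fetch!(opts, :batch_size),
      # Assumption: a 1-arity function replacing the real queue publish.
      publish: Keyword.fetch!(opts, :publish),
      responses: []
    }

    {:ok, state}
  end

  @impl true
  def handle_cast({:add, {:ok, payload}}, state) do
    responses = [payload | state.responses]

    if length(responses) >= state.batch_size do
      # Publish in arrival order, then clear the storage.
      state.publish.(Enum.reverse(responses))
      {:noreply, %{state | responses: []}}
    else
      {:noreply, %{state | responses: responses}}
    end
  end

  # Anything that is not {:ok, payload} is discarded.
  def handle_cast({:add, _other}, state), do: {:noreply, state}
end
```

Workers would call `BatchCollector.add(pid, {:ok, payload})` or pass any other term to have the message dropped. Nothing here survives a restart of the process.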

Has anyone encountered this or a similar problem? How can it be solved gracefully in Elixir? Is it a good idea to use the poolboy package here and define a message store per worker that fills up until it reaches some limit? Or should I go another way?

I would not store state in your workers; just let them exit after they are done. Poolboy is for managing access to a finite resource, like database connections. You can just spawn/kill workers as needed unless you need back-pressure, and even back-pressure can be managed in the GenServer just by keeping a count of how many workers are active.
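As a rough illustration of counting active workers, here is a minimal sketch. `Dispatcher`, `dispatch/2`, `active/1`, and `:max_workers` are hypothetical names; it assumes workers are started with `Task.Supervisor.async_nolink/2`, whose results arrive in the GenServer as ordinary messages, so the GenServer does not have to block while a worker runs.

```elixir
defmodule Dispatcher do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Returns :ok if a worker was started, {:error, :busy} if the limit is reached.
  def dispatch(pid, fun), do: GenServer.call(pid, {:dispatch, fun})

  # Number of workers currently running.
  def active(pid), do: GenServer.call(pid, :active)

  @impl true
  def init(opts) do
    {:ok, sup} = Task.Supervisor.start_link()
    {:ok, %{sup: sup, max: Keyword.fetch!(opts, :max_workers), refs: MapSet.new()}}
  end

  @impl true
  def handle_call({:dispatch, fun}, _from, state) do
    if MapSet.size(state.refs) >= state.max do
      {:reply, {:error, :busy}, state}
    else
      # The task is monitored but not linked, so a crash does not take us down.
      task = Task.Supervisor.async_nolink(state.sup, fun)
      {:reply, :ok, %{state | refs: MapSet.put(state.refs, task.ref)}}
    end
  end

  def handle_call(:active, _from, state) do
    {:reply, MapSet.size(state.refs), state}
  end

  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    # Worker finished normally: drop the monitor and free the slot.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | refs: MapSet.delete(state.refs, ref)}}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # Worker crashed before replying: free the slot as well.
    {:noreply, %{state | refs: MapSet.delete(state.refs, ref)}}
  end
end
```

When `dispatch/2` returns `{:error, :busy}`, the consumer could hold off acknowledging or requeue the message; that part depends on the RabbitMQ client and is left out here.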

You can easily store the message state in the GenServer, but what happens if your app crashes or your server is restarted?

I thought about this potential issue. One possible solution is to use external storage and persist the current processing result after each update. It could be ETS / Mnesia or maybe even Cassandra, for example. But I haven’t decided yet which one to use.

After spending some time thinking, I came up with two possible solutions:

  1. Create one list (or a couple?) of responses in the GenServer worker during initialization. Then, for each consumed message, delegate the processing to a separate process/async task (or a poolboy worker) that does all the required work mentioned before. The responses from each worker are then received and applied as changes to the list of responses (if necessary). However, this solution probably isn’t good enough, because we must block the GenServer and wait for the result from each worker before going further.
  2. Define an Agent per GenServer worker and store the Agent’s pid in the GenServer worker state. The agent process is used for storing/getting a list (or lists) of responses. This list is shared by the spawned workers to which consumed messages are delegated for processing. After making changes, each worker pushes them to the agent, and the agent then dumps them to external storage (ETS, Mnesia, Cassandra or anything else) for durability. This solution sounds better, but it’s harder to implement because it’s necessary to guarantee that any change to the list is atomic and unique.
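On the atomicity concern in the second option: an Agent handles one request at a time, so a function passed to `Agent.get_and_update/2` runs without interleaving with other updates. A minimal sketch, where `ResponseStore`, `push/3`, and the `limit` argument are hypothetical names and the dump to external storage is left out:

```elixir
defmodule ResponseStore do
  use Agent

  # Starts the agent with an empty list of responses.
  def start_link(_opts \\ []), do: Agent.start_link(fn -> [] end)

  # Appends a response. When the list reaches `limit`, the full batch is
  # returned and the list is reset in the same step, so exactly one caller
  # receives each batch.
  def push(agent, response, limit) do
    Agent.get_and_update(agent, fn responses ->
      responses = [response | responses]

      if length(responses) >= limit do
        {{:flush, Enum.reverse(responses)}, []}
      else
        {:pending, responses}
      end
    end)
  end
end
```

A worker that gets `{:flush, batch}` back would be the one to publish the batch to the next stage; workers that get `:pending` simply exit.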

Does it make sense to go this way? Any thoughts about the two approaches? What is the best solution here? Or is it better to do something else?