Looking for a better architecture for my application

Hi everyone,

My question is about OTP design and event-driven architecture.

My application consumes events from Amazon SQS, and there are two types of events. The first are events that are given to a finite state machine built with gen_statem, which just applies some actions to state pulled from the database and saves it back. The second are events related to some functions, like sending email, reading data, etc.

I’m new to EDD and I would love to hear some advice.
Maybe I should use a pub/sub system, or something else?

I highly recommend checking out GenStage. https://hexdocs.pm/gen_stage/GenStage.html

More specifically, built on top of GenStage, check out Broadway https://hexdocs.pm/broadway/Broadway.html

In combination with Broadway, check out https://hexdocs.pm/broadway/amazon-sqs.html.

Out of the box, you can set up an SQS queue as your producer and process the data how you want.
Broadway allows for batching, which may help in your use case. Depending on the type of data, a custom metadata attribute on the SQS message, or some other differentiating factor, you can route each incoming message to a specific batcher.

Hi, thank you for your response, I really appreciate it. I had already considered using Broadway and even implemented a prototype, but it turned out that my initial solution fits better, so I think I will keep it. What I’m confused about is how to handle these events once I get them. GenStage seems interesting, but under the hood it’s a GenServer, which I’m already using to spawn functions and FSMs once I get an event.

But can it scale? Are there alternative solutions?
Is it better to spawn with a link, or just monitor these processes?

Happy to help! I’ve been doing a lot of research into this recently to build an event queue in my Phoenix app w/ SQS. (I am definitely not an expert though).

GenServer can totally scale. With Broadway you get easy configuration out of the box for adding more concurrent consumer processes (check out all the options you can provide in Broadway.start_link: https://hexdocs.pm/broadway/Broadway.html#start_link/2).
From my understanding, the advantage of GenStage over GenServer casting is back-pressure, which keeps your system from being overloaded.
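
To make the "not overloaded" part concrete without pulling in GenStage, here is a minimal standard-library sketch. It is not GenStage's demand protocol, only the same idea of bounding in-flight work: Task.async_stream takes the next event from the source only when one of its workers is free. The module name BoundedWorkers and the default limit of 2 are assumptions made up for this illustration.

```elixir
defmodule BoundedWorkers do
  # Runs `handler` on every event with at most `max` events in flight.
  # `events` can be any enumerable, including a lazy stream, so the
  # source is only consumed as fast as the workers can keep up.
  def run(events, handler, max \\ 2) do
    events
    |> Task.async_stream(handler, max_concurrency: max, timeout: 5_000)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Hypothetical usage: doubles each number, two at a time.
BoundedWorkers.run(1..5, fn n -> n * 2 end)
```

Note that the tasks started by Task.async_stream are linked to the caller, so a crash in the handler takes the caller down as well. A plain GenServer.cast has no such limit: the sender keeps pushing no matter how far behind the receiver is.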

As far as handling the events once you get them (below is a very basic implementation of a Broadway SQS pipeline with batch processing):

defmodule MyPipeline do
  use Broadway

  # handle_message/3 below refers to Broadway.Message as Message
  alias Broadway.Message

  @queue_name "your_sqs_queue_name"
    
  def start_link(_opts) do
    # start_link opts https://hexdocs.pm/broadway/Broadway.html#start_link/2
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
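      # opts https://hexdocs.pm/broadway/Broadway.html#start_link/2-producers-options
      # note: this is the older `producers: [default: ...]` form; newer Broadway
      # releases take a single `producer:` key instead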
      producers: [
        default: [
          module: {
            BroadwaySQS.Producer,
            queue_name: @queue_name
          }
        ]
      ],
      # opts https://hexdocs.pm/broadway/Broadway.html#start_link/2-processors-options
      processors: [
        default: []
      ],
      # opts https://hexdocs.pm/broadway/Broadway.html#start_link/2-batchers-options
      batchers: [
        batch_a: [
          batch_size: 10,
          batch_timeout: 10_000
        ],
        batch_b: [
          batch_size: 10,
          batch_timeout: 10_000
        ]
      ]
    )
  end

  # message is a %Broadway.Message{} struct
  def handle_message(_processor, message, _context) do
    # Look at the message here and decide which batch it should be in.
    # The "type_a" prefix check is only a placeholder for your own rule.
    if String.starts_with?(message.data, "type_a") do
      # case 1
      message
      |> Message.update_data(fn data ->
        # process the data and return what you want stored as data in Message
        data
      end)
      |> Message.put_batcher(:batch_a)
    else
      # case 2
      message
      |> Message.update_data(fn data ->
        # process the data and return what you want stored as data in Message
        data
      end)
      |> Message.put_batcher(:batch_b)
    end

    # The handle_message callback needs to return the message.
    # If the message is bad you can fail it here with Broadway.Message.failed/2.
  end

  def handle_batch(:batch_a, messages, _batch_info, _context) do
    messages
    |> Enum.map(fn message -> message.data end)
    |> IO.inspect(label: "batch_a")

    # take action with the processed data from handle_message

    # Return the messages and they will be marked as complete on SQS.
    # If an error is raised in this callback, it will be logged and all
    # messages in the batch will be marked as failed.
    # You may want to set up a dead letter queue on SQS to handle failed messages.
    messages
  end

  def handle_batch(:batch_b, messages, _batch_info, _context) do
    messages
    |> Enum.map(fn message -> message.data end)
    |> IO.inspect(label: "batch_b")

    # take action with the processed data from handle_message

    # Return the messages and they will be marked as complete on SQS.
    # If an error is raised in this callback, it will be logged and all
    # messages in the batch will be marked as failed.
    # You may want to set up a dead letter queue on SQS to handle failed messages.
    messages
  end
end

If you want to use the state machine abstraction with gen_statem, I’d say think of each received event as its own state machine, and stop the state machine when you finish processing it. That’s better than having a centralized state machine, which can easily become a bottleneck. Another solution is to move the state machine logic into the queuing system: each step in the state machine would be a different queue. Each queue can have its own Broadway pipeline, and you can tune them more precisely according to each step.
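
A minimal sketch of the "one state machine per event" idea, using only :gen_statem from OTP. The module name EventFSM, the states :loaded and :saving, and the :id and :reply_to keys are all made up for this illustration; the save step is a stub that just messages the caller so the result is observable.

```elixir
defmodule EventFSM do
  @behaviour :gen_statem

  # Starts one short-lived process for a single (already decoded) event.
  def start(event), do: :gen_statem.start(__MODULE__, event, [])

  @impl true
  def callback_mode, do: :state_functions

  @impl true
  def init(event) do
    # Begin in :loaded and trigger processing right away with an internal event.
    {:ok, :loaded, event, [{:next_event, :internal, :process}]}
  end

  # State :loaded: apply the action to the data, then move on to :saving.
  def loaded(:internal, :process, event) do
    data = Map.put(event, :processed, true)
    {:next_state, :saving, data, [{:next_event, :internal, :save}]}
  end

  # State :saving: persist the result (stubbed as a message to the caller),
  # then stop, so nothing outlives the event it was started for.
  def saving(:internal, :save, data) do
    send(data.reply_to, {:done, data.id, data.processed})
    {:stop, :normal}
  end
end

# Hypothetical usage from the process that consumes the queue:
{:ok, _pid} = EventFSM.start(%{id: 1, reply_to: self()})
```

In a real system you would probably start these under a DynamicSupervisor rather than with a bare start/1, so that crashes are reported and the number of children can be capped.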

Alternatively, you can just use Broadway directly, classify the events by type, and have a different processor and a different batcher for each type.
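
The classification step can be a small pure function that is easy to test on its own. This is only a sketch: the module name EventRouter, the "type" key, and the event type names are assumptions, and :batch_a / :batch_b are borrowed from the pipeline example earlier in the thread.

```elixir
defmodule EventRouter do
  # Event types that should drive the state machine (hypothetical names).
  @fsm_types ["order_created", "order_paid"]

  # Returns the batcher for an already-decoded event map.
  # State machine events go to :batch_a, everything else to :batch_b.
  def batcher_for(%{"type" => type}) when type in @fsm_types, do: :batch_a
  def batcher_for(%{"type" => _other}), do: :batch_b
end

EventRouter.batcher_for(%{"type" => "send_email"})
```

Inside handle_message you would then pass the result to Message.put_batcher/2. An event without a "type" key raises a FunctionClauseError here, which Broadway would treat as a failed message.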

GenStage and Broadway (the tools already mentioned here) shouldn’t dictate how you organize your code; they’re a way to be more organized and explicit about how your code is going to execute. That said, they will most likely overlap with some of the specialized behaviours you have, no matter which school of “design principles” you choose to follow. The same can be true of other OTP components, which is why the Erlang docs have their own design principles. You will probably have to choose between having an event-driven architecture that uses OTP in some places to leverage its strengths, or having an OTP system that uses some event-driven ideas.

Maybe this can help?
