How to avoid a bottleneck when using GenServers?

The app I’m working on reads data from a Kafka topic, transforms it and writes to a different topic. There is a consumer process per partition of a topic, so there are multiple processes doing the reading.

Every consumer passes data to GenServers, like Validator (validates data, holds schema as its state) or Encryptor (keeps encryption key in its state).

I’m new to Elixir but it seems that if I have multiple consumers that all pass data to one Validator process, it will become a bottleneck. I’m sure this is a common problem so what is a recommended way to solve it?

Should I create a pool of Validators and Encryptors with something like poolboy? Or maybe use a simpler pooling approach like the one that the author of Redix suggests? Or is it better to keep schemas, encryption keys, etc. in global ETS tables and access them from every consumer process?

I’m trying to keep complexity to a minimum, so I would rather avoid dependencies like poolboy.

Thanks!

Maybe you can have a look at GenStage.

From the source repo:

GenStage is a specification for exchanging events between producers and consumers.

Thanks for the suggestion!

The consumer process already handles the back-pressure problem (new messages are read only when previously read messages have been processed), so as far as I can understand GenStage will not help here. It seems to me that consumers will read a bunch of messages, pass them to the Encryptor, for example, and wait for a result. Maybe it doesn’t even make sense to have multiple consumer processes if all the data still goes through one process anyway.

I considered putting things like the validation schema and encryption keys in a consumer’s state while keeping the actual validation and encryption logic in separate modules, but such an architecture feels wrong for a number of reasons.

In a Java Spring project, for example, I would have probably used a bean (singleton) that would keep the logic and the state, and given that consumers only need to read the state without changing it, the concurrency wouldn’t cause any problems and this would be trivial to implement.

Depending on the way your GenServers are designed, you might want to spawn new processes to do the actual processing and only use the GenServers to maintain the state.

The GenStage pattern is more like consumers ask the producers for something… And each consumer can be a producer for other consumers.

In your example, that would be: Encryptor asks Intermediate Consumer -> asks Producer.

I cannot compare it to a Java Spring project, as I have no knowledge of it :slight_smile:

If you use a pool of GenServers, how will you ensure they all share the same state? Because once you figure that out (say, you store the schema in a database), then you can have your consumers do that directly (say, talk directly to the database).

All the GenServers I use are pretty simple. Validator, as an example, takes a schema definition (loaded from a file) as an argument to start_link, processes it and keeps it as its state. The validate function uses that schema to do the actual validating. The same is true for the Encryptor. Basically, the state doesn’t change after init for any of my GenServers. So if I use a pool, all the GenServers will share the same state.

If instead I initialize the consumers with schemas, keys, etc. and pass this data as an argument to the functions holding the logic, that will indeed solve the problem. But it requires the consumers to ‘know’ about things that are not their responsibility, for example, that a schema should be processed after it is loaded. It feels like a problematic architecture. Creating a pool for every little module that has to keep some state seems ridiculous too.

While studying how this is usually done I found a couple of Erlang examples where a global ETS table was used to store the state at init, and a function that needed the state then accessed the table by name. Actually, I used this approach to solve an (anticipated) bottleneck problem when writing a Validator (for lack of a better solution that was simple enough). But then a number of modules with a similar problem emerged, and I wasn’t fond of the idea of using what looks like a global variable solution all over the place :confused: .
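For reference, the named-ETS-table approach described above might look roughly like the following. This is only a sketch: `SchemaStore`, its function names and the shape of the stored schemas are hypothetical, not taken from the thread. Note that `:ets.lookup/2` copies the stored term into the calling process, so a large schema is still copied on every read.

```elixir
defmodule SchemaStore do
  # Hypothetical module, for illustration only.
  @table :schema_store

  # Call once at startup from a long-lived process:
  # the table is destroyed when its owner process exits.
  def init(schemas) when is_map(schemas) do
    :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    :ets.insert(@table, Map.to_list(schemas))
    :ok
  end

  # Runs in the caller's process; no GenServer round trip is involved.
  def fetch(schema_id) do
    case :ets.lookup(@table, schema_id) do
      [{^schema_id, schema}] -> {:ok, schema}
      [] -> :error
    end
  end
end
```

Any consumer can then call `SchemaStore.fetch(:some_id)` concurrently without going through a single process.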

@ryh
maybe then instead of this

  def validate(schema_id, data) do
    GenServer.call(__MODULE__, {:validate, schema_id, data})
  end 

I should do something like this

  def validate(schema_id, data) do
    # Only fetch the schema from the GenServer; validate in the caller process.
    schema = GenServer.call(__MODULE__, {:get_schema, schema_id})
    validate_data(schema, data)
  end

This way the actual processing is happening in the caller process. But as far as I can understand a schema definition would have to be copied from a GenServer process to the caller process, and it is much larger than the actual data being validated. Maybe the same thing happens when using a global ETS table though.

Maybe I’m doing it all wrong. I really hoped there was a widely used solution to this problem :slight_smile:

I was thinking more like

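def handle_call({:validate_schema, data}, {caller, _tag}, %{schema: schema} = state) do
  # Validate in a separate, monitored process so the GenServer stays free.
  # The :DOWN message for this monitor arrives in handle_info/2.
  {_pid, ref} = spawn_monitor(__MODULE__, :validate_schema, [schema, data, caller])
  {:reply, ref, state}
end

def validate_schema(schema, data, caller) do
  ...
  # `caller` is the pid taken from the `from` tuple, so `send` works on it.
  send(caller, {:validation_result, :valid, data})
end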

I like this model because the GenServer just does one thing: handle the state. You spawn processes (and free the GenServer for the next request) for everything else that needs to act on that state and send the results wherever they need to go.
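A common variation on this idea is to return `{:noreply, state}` and let the spawned process answer the original call with `GenServer.reply/2`, so the caller still sees an ordinary synchronous `call`. The sketch below assumes a hypothetical `AsyncValidator` whose "schema" is just a list of required keys; the real validation logic is not shown in the thread.

```elixir
defmodule AsyncValidator do
  use GenServer

  def start_link(schema), do: GenServer.start_link(__MODULE__, schema, name: __MODULE__)

  # Blocks the caller until the spawned task replies (or the call times out).
  def validate(data), do: GenServer.call(__MODULE__, {:validate, data})

  @impl true
  def init(schema), do: {:ok, %{schema: schema}}

  @impl true
  def handle_call({:validate, data}, from, %{schema: schema} = state) do
    # Hand the work to a short-lived process that replies to the caller directly,
    # so the GenServer can pick up the next request immediately.
    Task.start(fn -> GenServer.reply(from, check(schema, data)) end)
    {:noreply, state}
  end

  # Hypothetical check: every required key must be present in the data.
  defp check(required_keys, data) do
    if Enum.all?(required_keys, &Map.has_key?(data, &1)), do: :ok, else: {:error, :invalid}
  end
end
```

One trade-off to keep in mind: if the task crashes before replying, the caller only finds out when its `call` times out. The schema and the data are also copied into each spawned process.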

I must admit that I don’t fully understand the problem as you’ve outlined it. But it sounds like, if you try, you may be able to avoid using GenServers completely here, either by providing the Validator’s settings as application configuration (instead of passing them at runtime via start_link) or by utilizing ETS as stated above.

If you haven’t read it yet, I’d highly recommend reading To spawn, or not to spawn? by Saša Jurić, which touches on these topics and on when to use a GenServer vs. a plain module (typically you should try to opt for modules).

Does this processing have to happen at runtime, or is it fine to do it at compile time? If it’s fine to do it at compile time, you don’t need a GenServer at all. You can read the file at compile time, process it, and “embed” it somehow (depends on your application’s logic) in the validate function so that consumers only have to call validate without talking to external processes.
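A minimal sketch of the compile-time idea, assuming a made-up schema format. `CompiledValidator`, `@raw_schema` and the field types are hypothetical; in a real project the raw definition would typically come from `File.read!/1`, with `@external_resource` pointing at the file so the module is recompiled when the file changes.

```elixir
defmodule CompiledValidator do
  # Hypothetical raw definition, inlined here so the example is self-contained.
  @raw_schema [name: :string, age: :integer]

  # This expression is evaluated once, while the module is being compiled.
  # The resulting map is embedded wherever @schema is used below.
  @schema Map.new(@raw_schema)

  # Runs entirely in the caller's process; no other process is contacted.
  def validate(data) when is_map(data) do
    Enum.all?(@schema, fn {field, type} -> valid?(Map.get(data, field), type) end)
  end

  defp valid?(value, :string), do: is_binary(value)
  defp valid?(value, :integer), do: is_integer(value)
end
```

Consumers would then simply call `CompiledValidator.validate(message)`.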

That sounds like a classic use case for mochiglobal / fastglobal
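On OTP 21.2 and later, the built-in `:persistent_term` module covers a similar use case to those libraries. It is a different mechanism, not their API, so treat the following as a sketch of the same idea; `GlobalSchemas` and its functions are hypothetical names. Reads do not copy the stored term into the caller, while updating or erasing a term is expensive, which fits data that is written once at startup.

```elixir
defmodule GlobalSchemas do
  # Hypothetical wrapper around :persistent_term, for illustration only.
  @key {__MODULE__, :schemas}

  # Intended to be called once at application start.
  def put(schemas) when is_map(schemas), do: :persistent_term.put(@key, schemas)

  # Cheap read from any process; falls back to an empty map if nothing was stored.
  def fetch(schema_id) do
    @key
    |> :persistent_term.get(%{})
    |> Map.fetch(schema_id)
  end
end
```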

Cool, I didn’t know you could do that at compile time! This is an umbrella app and different apps in it use Validator with different schemas, so I can’t hardcode a schema in the Validator module.

But I can hide the logic of processing a Validator schema in a function, something like this

  # Validator module
  def get_schema(schema_path) do
    schema_path
    |> load_schema()
    |> ExJsonSchema.Schema.resolve()
  end

and then use it in a Consumer like this

# Consumer module
require Validator
@schema_path Application.get_env(:consumer, :message_schema_path)
@schema Validator.get_schema(@schema_path)

# ...
:ok == Validator.validate(@schema, message)

@schema will be processed and replaced with the result at compile time, do I get it right? This would be an acceptable compromise.

@ryh
The suggested approach is probably not a very effective solution. First the data to be validated would be copied to the Validator process, then when a new process is spawned both the data and the schema would be copied to the spawned process. Adding more concurrency here would make it harder to debug too, I guess.

@axelson
great article, thanks for pointing it out to me. I do understand that the GenServer count should be kept to a minimum, I just don’t know how to do it here, hence the question :slight_smile:

Here’s an image to clarify the problem

@dom
this would work, it’s better than using ETS indeed, thanks. I had hoped to avoid using global variables though.

Your design feels wrong to me, at least the way it’s described. As a rule of thumb, I try to use 1 process to deal with 1 request, and not to pass a single message through multiple processes. This means you can have a pool of GenServers that each retrieve the next item off the topic, work on it to completion, and toss it onto the next topic. Use ETS to hold a master copy of the schema; I assume it doesn’t change much. There’s nothing wrong with having 1000s of GenServers or tasks doing your bidding. If order is important then I can understand why this approach wouldn’t work.
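One way to picture the "one process works on one message to completion" rule is with `Task.async_stream/3`. This is a sketch under assumptions: the `Pipeline` module and its placeholder `validate/1` and `encrypt/1` functions are hypothetical stand-ins for the Validator and Encryptor logic from the thread, and the Base64 call is not real encryption.

```elixir
defmodule Pipeline do
  # Hypothetical stand-in for the Validator logic.
  def validate(msg) when is_binary(msg), do: {:ok, msg}
  def validate(_msg), do: {:error, :invalid}

  # Placeholder only: Base64 encoding, NOT encryption.
  def encrypt(msg), do: Base.encode64(msg)

  # Every step for a single message runs in the same process.
  def process(msg) do
    with {:ok, valid} <- validate(msg), do: {:ok, encrypt(valid)}
  end

  # One task per message, with bounded concurrency.
  # Results come back in input order, which is the default for async_stream.
  def run(messages) do
    messages
    |> Task.async_stream(&process/1, max_concurrency: System.schedulers_online())
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

Here no message is passed between a Validator process and an Encryptor process; each task calls plain functions.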

I suggest you read http://shop.oreilly.com/product/0636920024149.do; the background is excellent. Also, you may simply want to try a single GenServer first, to get a benchmark of the throughput you can achieve before looking for a more complicated model.

Process.flag(:sensitive, true) may be of interest: call it inside a given process and that process no longer dumps its state. Great for hiding encryption keys.

Your state is really just a cache. I’m not a huge fan of bringing in process semantics and message passing just to cache data. I think this is a use case for ETS (optionally with something like con_cache but probably you don’t need it) or even the process dictionary (you’d have copies for every consumer, but your consumers are persistent and there would be no copying of terms). Process.put(:key, key_value) in each consumer at startup, key = Process.get(:key) in your encryption module and you are done.
