Code review for an Ecto GenStage Producer

Hi there,

I just wrote a GenStage Producer that I use with Flow in order to stream entries from an Ecto query.

I’m learning GenStage and would like some feedback on my code, especially the send_after, which I’d like to avoid. When I remove it, I get "no process: the process is not alive or there's no process currently associated with the given name" errors.

I haven’t looked at the code in depth, but from handle_demand you can return {:stop, reason, new_state} (https://hexdocs.pm/gen_stage/GenStage.html#c:handle_demand/2). Are you saying that this is what was giving you the error when you tried it?

No, I can’t stop the GenServer this way, because I first need to fulfill the demand and emit the last events fetched from the DB before I can stop the server.

It doesn’t look like I can emit events and stop the server in a single reply.

There’s some potentially related info here: https://github.com/elixir-lang/gen_stage/issues/85

I guess that GenStage.Streamer uses a similar approach, just with GenStage.async_info(self(), :stop) instead of send_after…
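
For reference, here is a minimal sketch of that pattern: emit the last events, then queue a :stop message behind them. It is not the code from the original post. FiniteProducer and its list-based state are hypothetical stand-ins for the Ecto-backed producer, and it assumes gen_stage is already in your deps.

```elixir
defmodule FiniteProducer do
  # Hypothetical producer backed by a plain list instead of an Ecto query.
  # Assumes the :gen_stage dependency is available.
  use GenStage

  def start_link(items), do: GenStage.start_link(__MODULE__, items)

  @impl true
  def init(items), do: {:producer, items}

  @impl true
  def handle_demand(demand, items) when demand > 0 do
    {events, rest} = Enum.split(items, demand)

    # Nothing left to produce: ask GenStage to deliver :stop to this
    # process. The events returned below are still emitted first.
    if rest == [], do: GenStage.async_info(self(), :stop)

    {:noreply, events, rest}
  end

  @impl true
  def handle_info(:stop, state) do
    # The events have been handed to GenStage, so stop normally.
    {:stop, :normal, state}
  end
end
```

This keeps the emit and the stop in two separate callbacks, since a single handle_demand reply cannot do both.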

Good idea! Unfortunately, under the hood GenStage.async_info just sends a message (via GenServer.cast), which gives me the same error (unless I wait 5 or 10 ms first).

By the way, the error only happens when the producer has zero or very few events to produce. No error for larger streams … :thinking:

Hmm, really? A cast (as opposed to a call) shouldn’t care at all about whether the process exists or not… :thinking:

But the cast itself isn’t the issue: it works and the server terminates properly. The problem is that some consumer then tries to send messages to the producer and stumbles on the error.
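
To illustrate the cast/call difference with plain GenServer functions (no GenStage involved), here is a small sketch that can be pasted into iex. The variable names are made up for the example. The "no process" text in the error above is how Elixir formats a :noproc exit reason.

```elixir
# Start a throwaway process and wait until it has exited.
pid = spawn(fn -> :ok end)
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# cast is fire-and-forget: it returns :ok even though the target is dead.
cast_result = GenServer.cast(pid, :stop)

# call waits for a reply, so with a dead target it exits instead.
# We catch the exit and keep only the reason (:noproc here).
call_result =
  try do
    GenServer.call(pid, :ping)
  catch
    :exit, {reason, _details} -> reason
  end
```

So the producer's own cast to itself is safe, while anything that does a call against the already-stopped producer would hit the :noproc exit.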

It looks like this issue, which has already been fixed by @josevalim: https://github.com/plataformatec/flow/issues/76#issuecomment-546948561

Ahh… Awesome! :blush:

Hmmm … fixed but doesn’t work for me :grimacing: