Genstage querying database


I want to write a genstage producer that would query the database that returns a count every minute and have a consumer save it to a file.

I was thinking of keeping a state that would store the times (example: oct 2, 2017 1:00pm) and would just use the state to run the query with and update the state + 1minute on the next demand.

Is this the best solution? My problem with this is that I might need to set a cutoff period since I probably dont want to send over events for oct 2, 2017 past now() time since there wont be any data for that and I will just be making a huge file.

Another solution was to use rate limiter and just have 1 demand every minute. Not sure if this is the best use of it though.

Do you guys have any suggestions?

Thanks in advance! =)

You can have your producer send a message to itself to check the database after it’s finished checking a delay. Something like this:

def init(initial_demand) do
  {:producer, initial_demand}

def handle_events(demand, state) do
  send(__MODULE__, :query_db)
  {:noreply, [], demand + state}

def handle_info(:query_db, demand) do
  events = Repo.all(...)
  Process.send_after(__MODULE__, :query_db, 60_000)
  {:noreply, events, demand - Enum.count(events)

Thanks! Will try it out today =)

Hmm… I was thinking about this and was wondering how will it work with the {:noreply, [], demand + state} in the handle_demand function. If its scheduled every minute on the handle_info, will it ever go back to the handle_demand of the producer?

Thanks again! still trying to wrap my mind around genstage. =(


In this case, handle_demand/3 is just collecting demand in state and passing the response off to handle_info/3. handle_info/3 returns the events and updates state, allowing the consumer to submit more demand.

In this case, handle_demand/3 is just receiving demand from the consumer. This is a design pattern suggested by Jose in this talk:

1 Like

thanks! =) Sorry I just got confused with the Process.send_after part. I’ve watched that youtube video so many times. hehe =p

thanks! i finally was able to get that code. It holds off more demand from consumers till it able to return the demand of the current one.

hmm… still thinking about my current problem. Trying to see if I can extend the delayed task example.

  1. add a creation date to the tasks table
id| state   | creation date 
1 | waiting | 10-01-2017 00:00
2 | waiting | 10-01-2017 00:00
3 | waiting | 10-01-2017 01:00
  1. If I want to use the minute to query for the records. Is the best way just to put it along with the demand on the state?
def init(initial_demand) do 
    {:producer, %{last_pull:, demand: initial_demand}}

def handle_demand(demand, state) do 
   minute_interval = state[last_pull] |> Timex.shift(minutes: 1)
   Repo.all(.... where: creation_date >= state[last_pull], where: creation_date < minute_interval)
   new_state = %{state | last_pull: , demand: demand + state}
   {:noreply, events, new_state}

not sure if I use queues for this? just want to pull data from external source.