Intersected streams

Hello,

I’m trying to implement a backtesting engine as a GenServer that publishes data from a stream interleaved with other data, but I’m afraid that my approach won’t work.

OK, from the beginning: we have a GenServer that reads data (trade events) from the database (loads of records) as a stream.

The task of this GenServer is to publish all of those events to PubSub one by one, every X ms. That alone wouldn’t be so bad, as I could sleep and keep publishing inside a map, or do something similar.

The problem appears when we add another requirement on top of that one. From time to time the GenServer will receive a cast containing an order. Now, every X ms, the server needs to decide whether it should pick up the next item from the stream and publish it, or publish this order instead (there can be many orders, so they would be held on the side in a list inside the state).

My problem is that there’s no way to iterate over the stream in this fashion:
1 make a query to get a stream
2 assign it to the state
3 call Process.send_after on myself
4 compare the list of orders with the stream and publish one item from one of them
5 go to point 3
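Step 4 above can be sketched as a pure function. This is only a rough illustration and not code from the repository: the module name `NextItemSketch` and the `:time` key are hypothetical, and it assumes that both the pending orders and the already loaded events are plain lists ordered by time.

```elixir
defmodule NextItemSketch do
  # Hypothetical helper, not part of the original project.
  # Assumes orders and events are maps with an integer :time key
  # and that both lists are already sorted by that key.
  # Returns {item_or_nil, remaining_orders, remaining_events}.

  # Nothing left to publish.
  def pick([], []), do: {nil, [], []}

  # Only orders are left.
  def pick([order | rest], []), do: {{:order, order}, rest, []}

  # Only events are left.
  def pick([], [event | rest]), do: {{:event, event}, [], rest}

  # Both are available: publish whichever is older, orders first on a tie.
  def pick([order | orders_rest] = orders, [event | events_rest] = events) do
    if order.time <= event.time do
      {{:order, order}, orders_rest, events}
    else
      {{:event, event}, orders, events_rest}
    end
  end
end
```

Inside handle_info the server would call something like this once per tick, publish the returned item, and put the two remaining lists back into its state.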

The problem is that by the time I try to take anything from the stream stored in the state, it’s already too late and I get this error:

[error] GenServer :"Elixir.Hefty.Streaming.Backtester.SimpleStreamer-ADABTC" terminating
** (RuntimeError) cannot reduce stream outside of transaction

If anybody wants to play with it in order to help, I pushed it here:
https://github.com/Cinderella-Man/toretto/tree/naive-trader-experimental

To reproduce the issue, first follow the setup instructions in the README (sorry, I know) and then call, for example:

Hefty.Streaming.Backtester.SimpleStreamer.start_link("ADABTC", "2019-05-30", "2019-05-30")

A little bit more info: the following works as expected, but it uses Stream.cycle as the “data source”.

defmodule Streamintersector do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    GenServer.cast(__MODULE__, :init_stream)
    {:ok, %{}}
  end

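  def handle_cast(:init_stream, _state) do
    # Schedule the first tick and keep the lazy stream in the state.
    Process.send_after(self(), :next, 100)

    {:noreply, %{:stream => Stream.cycle([1, 2, 3])}}
  end

  def handle_info(:next, state) do
    # Print the current head of the stream.
    IO.inspect(Enum.take(state.stream, 1))

    # Schedule the next tick. Note that every tick wraps the stream in one
    # more Stream.drop/2, so taking the head gets slower over time.
    Process.send_after(self(), :next, 500)
    {:noreply, %{:stream => Stream.drop(state.stream, 1)}}
  end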
end

How can I make it work in a similar way with Ecto.Repo.stream()? As far as I understand, a transaction is required around the call that consumes the stream?
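One possible direction, sketched under assumptions: the error message indicates the stream can only be reduced inside a transaction, so instead of keeping the lazy stream in the state, the server could keep a small buffer and fetch the next batch of rows whenever the buffer runs dry. This swaps streaming for batched (keyset-style) fetching. The module `BatchedStreamerSketch` and its `fetch_fun` argument are hypothetical; in a real setup `fetch_fun` might wrap an Ecto query ordered by id with a `where` on the last seen id and a `limit`, and the `send/2` to a subscriber stands in for the PubSub broadcast.

```elixir
defmodule BatchedStreamerSketch do
  use GenServer

  # Deliberately tiny so the refill path is easy to exercise.
  @batch_size 2

  # fetch_fun is a hypothetical stand-in for a paginated database query:
  # it receives the last seen id and a limit, and returns the next rows
  # (maps with an :id key) in ascending id order.
  def start_link(fetch_fun, subscriber, interval_ms) do
    GenServer.start_link(__MODULE__, {fetch_fun, subscriber, interval_ms})
  end

  @impl true
  def init({fetch_fun, subscriber, interval_ms}) do
    state = %{
      fetch: fetch_fun,
      subscriber: subscriber,
      interval: interval_ms,
      buffer: [],
      last_id: 0
    }

    Process.send_after(self(), :next, interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:next, state) do
    case refill(state) do
      # No rows left: tell the subscriber and stop ticking.
      %{buffer: []} = state ->
        send(state.subscriber, :done)
        {:noreply, state}

      # Publish the head of the buffer and schedule the next tick.
      %{buffer: [row | rest]} = state ->
        send(state.subscriber, {:event, row})
        Process.send_after(self(), :next, state.interval)
        {:noreply, %{state | buffer: rest, last_id: row.id}}
    end
  end

  # Fetch the next batch only when the buffer is empty.
  defp refill(%{buffer: []} = state) do
    %{state | buffer: state.fetch.(state.last_id, @batch_size)}
  end

  defp refill(state), do: state
end
```

Each fetch is a short, self-contained query, so nothing has to stay open between ticks, and the orders received via cast can sit in the same state and be compared with the head of the buffer on every tick.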

You could have 2 GenServers push to a GenServer publisher
