Hi, alchemists! I started working on this library about a year ago, and it went into production around December 2018. I figured it was time to publish it as a package.
Streams are neat when you are dealing with potentially infinite data sources. For instance, you query a database table and process the returned rows as they arrive. Unfortunately, should a network hiccup happen, the entire stream would fail. Naively retrying the query from the beginning could be expensive, since rows that were already processed would be fetched again.
RecoverableStream moves stream evaluation into a separate process to isolate errors.
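To illustrate the general idea, here is a rough sketch in plain Elixir. It is not the package's actual implementation: `ResumableSketch`, its `run/2` function, and the retry count are hypothetical names made up for this example. A monitored worker process evaluates the stream and sends each element to the consumer; if the worker crashes, a new one is started from the last value the consumer saw. Unlike a real implementation, the sketch has no backpressure and does not drain leftover messages if the consumer stops early.

```elixir
defmodule ResumableSketch do
  # Hypothetical illustration only, not RecoverableStream's code.
  # `gen_fun` receives the last value seen (nil on the first run)
  # and must return a stream that continues after that value.
  def run(gen_fun, retries \\ 3) do
    Stream.resource(
      fn ->
        %{gen_fun: gen_fun, worker: start_worker(gen_fun, nil), last: nil, retries: retries}
      end,
      &next/1,
      fn %{worker: {pid, ref}} ->
        # Clean up the worker if it is still alive.
        Process.demonitor(ref, [:flush])
        Process.exit(pid, :kill)
      end
    )
  end

  # Evaluate the stream in a separate, monitored (not linked) process,
  # so a crash there does not take the consumer down.
  defp start_worker(gen_fun, last) do
    parent = self()

    spawn_monitor(fn ->
      last
      |> gen_fun.()
      |> Enum.each(fn item -> send(parent, {:item, self(), item}) end)
    end)
  end

  defp next(%{worker: {pid, ref}, retries: retries} = state) do
    receive do
      {:item, ^pid, item} ->
        {[item], %{state | last: item}}

      {:DOWN, ^ref, :process, ^pid, :normal} ->
        # The worker finished the stream without errors.
        {:halt, state}

      {:DOWN, ^ref, :process, ^pid, _reason} when retries > 0 ->
        # The worker crashed: restart it from the last value we saw.
        worker = start_worker(state.gen_fun, state.last)
        {[], %{state | worker: worker, retries: retries - 1}}

      {:DOWN, ^ref, :process, ^pid, reason} ->
        raise "stream worker failed: #{inspect(reason)}"
    end
  end
end

# Usage: a generator that fails once, on its first attempt, at element 4.
{:ok, attempts} = Agent.start_link(fn -> 0 end)

gen = fn last ->
  attempt = Agent.get_and_update(attempts, fn n -> {n, n + 1} end)
  from = (last || 0) + 1

  Stream.map(from..6//1, fn i ->
    if attempt == 0 and i == 4, do: raise("simulated network hiccup")
    i
  end)
end

result = gen |> ResumableSketch.run() |> Enum.to_list()
# result == [1, 2, 3, 4, 5, 6]
```

The consumer only sees one uninterrupted stream; the crash shows up in the logs, and the second worker resumes at element 4 instead of starting over.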
A motivating example:
# `db_pid`, the Plug `conn`, and `encode/1` come from the surrounding application code.

# `last_val` is the last row seen so far (nil on the first run),
# so the query resumes after it instead of starting over.
gen_stream = fn last_val, %{conn: conn} ->
  [from, _, _] = last_val || [0, 0, 0]

  Postgrex.stream(conn, "SELECT a, b, c FROM recoverable_stream_test WHERE a > $1 ORDER BY a", [from])
  |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)
end

# `Postgrex.stream/3` returns a stream that must be evaluated within a transaction
wrapper_fun = fn f ->
  Postgrex.transaction(db_pid, fn conn -> f.(%{conn: conn}) end)
end

RecoverableStream.run(gen_stream, wrapper_fun: wrapper_fun)
|> Stream.each(fn data -> Plug.Conn.chunk(conn, encode(data)) end)
|> Stream.run()
What do you think? Did I reinvent the wheel? Is it useful outside of my project?