RecoverableStreamEx library for transparent error recovery in streams

Hi, alchemists! I started working on this library about a year ago, and it went into production around December 2018. I figured it was time to publish it as a package.

Streams are neat when you are dealing with potentially infinite data sources. For instance, you query a database table and process returned rows as they arrive. Unfortunately, should a network hiccup happen, the entire stream would fail. Naively retrying the query could be expensive.
RecoverableStream moves stream evaluation into a separate process to isolate errors.
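
To make the idea concrete, here is a minimal sketch of evaluating a stream in a separate, monitored process and restarting it from the last delivered value after a crash. It is not RecoverableStream's actual implementation: the module `IsolatedStreamSketch`, its `run/2` function, and the message shapes are made up for illustration, and the stepped range in the usage part assumes Elixir 1.12 or later.

```elixir
defmodule IsolatedStreamSketch do
  # Hypothetical sketch, not the library's code. Evaluates
  # `gen_stream.(last_val)` in a monitored worker process and relays its
  # items to the caller. If the worker crashes, a new worker is started
  # from the last value the caller received.
  def run(gen_stream, retries \\ 3) do
    Stream.resource(
      fn -> {start_worker(gen_stream, nil), nil, retries} end,
      fn {{pid, ref}, last_val, retries_left} = state ->
        receive do
          {:item, ^pid, item} ->
            # Acknowledge so the worker produces the next item.
            send(pid, :ack)
            {[item], {{pid, ref}, item, retries_left}}

          {:DOWN, ^ref, :process, ^pid, :normal} ->
            # The worker finished its stream: end of data.
            {:halt, state}

          {:DOWN, ^ref, :process, ^pid, _reason} when retries_left > 0 ->
            # The worker crashed: resume after the last delivered value.
            {[], {start_worker(gen_stream, last_val), last_val, retries_left - 1}}

          {:DOWN, ^ref, :process, ^pid, reason} ->
            # Out of retries: propagate the failure to the consumer.
            exit(reason)
        end
      end,
      fn {{pid, ref}, _last_val, _retries_left} ->
        Process.demonitor(ref, [:flush])
        Process.exit(pid, :kill)
      end
    )
  end

  defp start_worker(gen_stream, last_val) do
    parent = self()

    spawn_monitor(fn ->
      last_val
      |> gen_stream.()
      |> Enum.each(fn item ->
        send(parent, {:item, self(), item})

        receive do
          :ack -> :ok
        end
      end)
    end)
  end
end

# Usage: a source that fails once at 3 to simulate a network hiccup.
{:ok, flaky} = Agent.start_link(fn -> true end)

gen = fn last_val ->
  from = last_val || 0

  Stream.map((from + 1)..5//1, fn n ->
    if n == 3 and Agent.get_and_update(flaky, fn fail? -> {fail?, false} end) do
      raise "hiccup"
    end

    n
  end)
end

result = gen |> IsolatedStreamSketch.run() |> Enum.to_list()
```

The consumer only sees an ordinary stream; the crash and restart happen behind `Stream.resource/3`. The crash itself is still logged by the runtime.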

A motivational example:

gen_stream = fn last_val, %{conn: conn} ->
  # `last_val` is nil on the first run; otherwise resume after its first column
  [from, _, _] = last_val || [0, 0, 0]

  conn
  |> Postgrex.stream("SELECT a, b, c FROM recoverable_stream_test WHERE a > $1 ORDER BY a", [from])
  |> Stream.flat_map(fn %Postgrex.Result{rows: rows} -> rows end)
end

# `Postgrex.stream/3` returns a stream that must be evaluated within a transaction
wrapper_fun = fn f ->
  Postgrex.transaction(db_pid, fn conn -> f.(%{conn: conn}) end)
end

# Here `conn` is the Plug connection of the enclosing request handler,
# and `encode/1` is assumed to be an application-specific encoder
RecoverableStream.run(gen_stream, wrapper_fun: wrapper_fun)
|> Stream.each(fn data -> Plug.Conn.chunk(conn, encode(data)) end)
|> Stream.run()

What do you think? Did I reinvent the wheel? Is it useful outside of my project?

Hex package
GitHub


"Naively retrying the query could be expensive"

You mean retrying the whole db stream from the beginning?
I believe for that you can just store the last processed ID (for example, in an Agent) and continue from there on retry.
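
A minimal sketch of that manual approach, for comparison. Everything here is hypothetical: the `ResumeSketch` module, the `rows_after/2` data source that fails once at id 6, and the retry count are made up for illustration, and the stepped range assumes Elixir 1.12 or later.

```elixir
defmodule ResumeSketch do
  # Hypothetical data source: ids from `from + 1` to 10. It raises the first
  # time it reaches id 6 to simulate a network hiccup.
  def rows_after(from, flaky) do
    Stream.map((from + 1)..10//1, fn id ->
      if id == 6 and Agent.get_and_update(flaky, fn fail? -> {fail?, false} end) do
        raise "connection lost"
      end

      id
    end)
  end

  # Processes every row and remembers the last processed id in the
  # `last_id` Agent. On a RuntimeError it restarts the query after that id,
  # up to `retries` times.
  def process_all(last_id, flaky, handler, retries \\ 3) do
    last_id
    |> Agent.get(& &1)
    |> rows_after(flaky)
    |> Enum.each(fn id ->
      handler.(id)
      Agent.update(last_id, fn _ -> id end)
    end)
  rescue
    error in RuntimeError ->
      if retries > 0 do
        process_all(last_id, flaky, handler, retries - 1)
      else
        reraise error, __STACKTRACE__
      end
  end
end

{:ok, last_id} = Agent.start_link(fn -> 0 end)
{:ok, flaky} = Agent.start_link(fn -> true end)
{:ok, seen} = Agent.start_link(fn -> [] end)

ResumeSketch.process_all(last_id, flaky, fn id -> Agent.update(seen, &[id | &1]) end)
processed = seen |> Agent.get(& &1) |> Enum.reverse()
```

Note that in this version the consumer (`process_all/4`) has to know about the retry logic and the last processed id.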

I have dealt with infinite streams with high error rates in production before. Could you elaborate on the benefits?
How does custom error handling work? For particular errors I may not want to retry.


Yes, exactly. It’s an attempt to build an abstraction that does it more or less transparently from an API consumer standpoint.

RecoverableStream.run allows you to set up a stream and its error recovery logic in one place. To an API consumer, it behaves like a perfectly ordinary stream. Imagine you have code like the following:

data = DataSource.data_stream(param1, param2)
consume_stream(conn, data)

consume_stream doesn’t need to know anything about error recovery in DataSource.data_stream. It could send data over HTTP, stream it to a file, incrementally compute something…

I could try to explain the benefits in comparison with another approach if you could give me an example.

An excellent question! Presently, you could achieve it by catching the errors you don’t want to recover from in wrapper_fun. I should at the very least document it, or maybe come up with a better implementation.
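
A rough sketch of what such a wrapper_fun could look like. The `FatalError` exception and the `run_query` name are hypothetical, and the database transaction from the motivational example is left out so the snippet runs on its own. How the stream behaves once the worker returns normally (ending quietly or surfacing the error) is an assumption that would need to be checked against the library.

```elixir
# Hypothetical exception marking errors that should not trigger a retry.
defmodule FatalError do
  defexception message: "do not retry"
end

# `run_query` stands in for the function that gets passed to wrapper_fun;
# the empty map replaces the %{conn: conn} context of the original example.
wrapper_fun = fn run_query ->
  try do
    run_query.(%{})
  rescue
    error in FatalError ->
      # Caught here, so the worker process does not crash on this error.
      {:error, error}
  end
end

# A FatalError is converted into a return value.
{:error, fatal} = wrapper_fun.(fn _ctx -> raise FatalError end)

# Any other error still propagates out of wrapper_fun, so it would crash
# the worker and leave the decision to the recovery logic.
propagated? =
  try do
    wrapper_fun.(fn _ctx -> raise "network hiccup" end)
    false
  rescue
    RuntimeError -> true
  end
```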
