Experimenting with ETS has forced me to look at the Erlang docs and stare at the Stream module more than I have previously. I am able to traverse an ETS table using Stream.resource/3, something like this:
    tid = :my_ets_table

    Stream.resource(
      # The accumulator is [] before the first key, then the last key visited.
      fn -> [] end,
      fn
        [] ->
          case :ets.first(tid) do
            :"$end_of_table" -> {:halt, []}
            first_key -> {:ets.lookup(tid, first_key), first_key}
          end

        acc ->
          case :ets.next(tid, acc) do
            :"$end_of_table" -> {:halt, acc}
            next_key -> {:ets.lookup(tid, next_key), next_key}
          end
      end,
      fn _acc -> :ok end
    )
And because the above returns a stream, I can pipe it to other Stream functions, e.g. Stream.filter/2:
    # ... cont'd from above
    |> Stream.filter(&maybe_remove/1)
    |> Stream.run()
But this doesn’t do what I want: the resulting stream has the proper items filtered out, but of course the ETS table itself remains unchanged.
If I instead use Stream.each/2 and attempt to delete keys, e.g.
    # cont'd from orig
    |> Stream.each(fn {k, v} ->
      case check_something({k, v}) do
        false -> :ets.delete(tid, k)
        _ -> nil
      end
    end)
    |> Stream.run()
then there are errors, because deleting the current key pulls it out from under the following call to :ets.next/2:
    ** (ArgumentError) errors were found at the given arguments:

      * 2nd argument: not a key that exists in the table

        (stdlib 3.16.1) :ets.next(:my_ets_table, :b)
Things work fine if I do this operation using regular Enum (i.e. not via a stream), but of course, this could be problematic/slow for large tables.
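For comparison, here is a minimal sketch of what the eager version might look like. The post does not show the Enum-based code, so this assumes the table is first copied out with :ets.tab2list/1; the table name, the sample data and the check_something stand-in are all hypothetical.

```elixir
# Hypothetical table and data, for illustration only.
tid = :ets.new(:my_ets_table_eager_sketch, [:set, :public])
:ets.insert(tid, [{:a, 1}, {:b, 2}, {:c, 3}, {:d, 4}])

# Hypothetical stand-in for check_something/1: keep entries with even values.
check_something = fn {_k, v} -> rem(v, 2) == 0 end

# :ets.tab2list/1 copies every object out of the table up front, so the
# deletes below act on the table while we iterate over an ordinary list.
tid
|> :ets.tab2list()
|> Enum.each(fn {k, v} ->
  case check_something.({k, v}) do
    false -> :ets.delete(tid, k)
    _ -> nil
  end
end)
```

This avoids the error because no :ets.next/2 call happens after a delete, but the whole table is held in memory as a list, which is the cost mentioned above for large tables.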
One possibility here would be to use the stream to accumulate only the keys of the items that have to be deleted, then delay the actual deletion until after the stream has been run. (But this is maybe a wee bit tricky because Stream has no reduce function.)
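The deferred-deletion idea could be sketched roughly as follows. Enum functions accept any enumerable, including a stream, so Enum.map/2 (or Enum.reduce/3) can consume the stream and collect just the keys. The table name, sample data and check_something stand-in are hypothetical, and the traversal is a condensed variant of the one in the post, not the exact original.

```elixir
# Hypothetical table and data, for illustration only.
tid = :ets.new(:my_ets_table_deferred_sketch, [:set, :public])
:ets.insert(tid, [{:a, 1}, {:b, 2}, {:c, 3}, {:d, 4}])

# Hypothetical stand-in for check_something/1: keep entries with even values.
check_something = fn {_k, v} -> rem(v, 2) == 0 end

# Condensed variant of the traversal: the accumulator is the next key to visit.
table_stream =
  Stream.resource(
    fn -> :ets.first(tid) end,
    fn
      :"$end_of_table" -> {:halt, :done}
      key -> {:ets.lookup(tid, key), :ets.next(tid, key)}
    end,
    fn _acc -> :ok end
  )

# Pass 1 (read only): collect the keys of the entries that fail the check.
keys_to_delete =
  table_stream
  |> Stream.reject(check_something)
  |> Enum.map(fn {k, _v} -> k end)

# Pass 2: the traversal has finished, so nothing is iterating while we delete.
Enum.each(keys_to_delete, &:ets.delete(tid, &1))
```

Only the keys to be deleted are held in memory, not the full table contents, so this should scale better than copying the whole table when few entries need removing.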
Another possibility might be to write the stream results into a new table, then replace the original (e.g. by deleting the original and renaming the temporary one).
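A rough sketch of the copy-and-rename idea, assuming a named table (as far as I know, :ets.rename/2 only has an effect on named tables). The table names, sample data and check_something stand-in are hypothetical, and the traversal is again a condensed variant of the one in the post.

```elixir
# Hypothetical table names and data, for illustration only.
name = :my_ets_table_swap_sketch
tmp = :my_ets_table_swap_sketch_tmp

:ets.new(name, [:set, :public, :named_table])
:ets.insert(name, [{:a, 1}, {:b, 2}, {:c, 3}, {:d, 4}])

# Hypothetical stand-in for check_something/1: keep entries with even values.
check_something = fn {_k, v} -> rem(v, 2) == 0 end

# Condensed variant of the traversal: the accumulator is the next key to visit.
table_stream =
  Stream.resource(
    fn -> :ets.first(name) end,
    fn
      :"$end_of_table" -> {:halt, :done}
      key -> {:ets.lookup(name, key), :ets.next(name, key)}
    end,
    fn _acc -> :ok end
  )

# Copy only the entries that pass the check into a temporary table.
:ets.new(tmp, [:set, :public, :named_table])

table_stream
|> Stream.filter(check_something)
|> Enum.each(&:ets.insert(tmp, &1))

# Swap: drop the original, then give the temporary table the original name.
:ets.delete(name)
:ets.rename(tmp, name)
```

Two caveats with this sketch: between :ets.delete/1 and :ets.rename/2 there is a short window in which the name refers to no table, so concurrent readers would raise; and the new table is owned by whichever process created it, which may differ from the original owner.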
Am I missing something obvious? Any words of wisdom are welcomed!