Is Stateful Stream possible?

The issue:
I would like to consume a stream one element at a time, but that doesn't seem possible. Look at the examples below.

iex(1)> s = 1..1000 |> Stream.map(& "Consumed #{&1}")
#Stream<[enum: 1..1000, funs: [#Function<49.82544474/1 in Stream.map/2>]]>
iex(2)> s |> Enum.take(1)
["Consumed 1"]
iex(3)> s |> Enum.take(1)
["Consumed 1"]
iex(4)> s |> Enum.take(2)
["Consumed 1", "Consumed 2"]
iex(5)> s |> Enum.take(2)
["Consumed 1", "Consumed 2"]
iex(6)> s |> Enum.take(1)
["Consumed 1"]
iex(7)> s |> Enum.take(1)
["Consumed 1"]
iex(8)> s |> Enum.take(1)
["Consumed 1"]
iex(9)> s |> Enum.take(1)
["Consumed 1"]

Expectations: when I call Enum.take(1), I expect it to consume one element, and when I call it again, I expect it to return the next element:

s |> Enum.take(1) =>["Consumed 1"]
s |> Enum.take(1) =>["Consumed 2"]
s |> Enum.take(1) =>["Consumed 3"]
s |> Enum.take(1) =>["Consumed 4"]
s |> Enum.take(1) =>["Consumed 5"]
s |> Enum.take(1) =>["Consumed 6"]

What am I doing wrong?
How can I achieve this behaviour?
I implemented this behaviour using processes, but I am not sure how valid it is. See below:

defmodule StatfulStream do
  @moduledoc false

  def start(stream, fx) do
    spawn(fn ->
      stream
      |> Stream.map(fn chunk ->
        fx.(chunk)
        IO.inspect(chunk, label: :inside)
        # Block until the controlling process sends :process.
        wait()
        chunk
      end)
      |> Enum.to_list()
      |> IO.inspect(label: :end)
    end)
  end

  def wait() do
    receive do
      :process ->
        :ok
    end
  end

  def ge_str() do
    Stream.resource(
      fn -> 1 end,
      fn
        10 ->
          {:halt, 10}

        1 ->
          {[1], 2}

        v ->
          {[v], v + 1}
      end,
      fn v -> IO.inspect(v) end
    )
  end

  def r() do
    str = ge_str()

    fx = fn r ->
      IO.inspect(r, label: :fx)
    end

    start(str, fx)
  end

  def send(p) do
    Process.send(p, :process, [])
  end
end

This is how I ran it:

iex(1)> import StatfulStream
StatfulStream
iex(2)> pid= r()
fx: 1
#PID<0.237.0>
inside: 1
iex(3)> send(pid)
fx: 2
:ok
inside: 2
iex(4)> send(pid)
fx: 3
:ok
inside: 3
iex(5)> send(pid)
fx: 4
:ok
inside: 4
iex(6)> send(pid)
fx: 5
:ok
inside: 5
iex(7)> send(pid)
fx: 6
:ok
inside: 6
iex(8)> send(pid)
fx: 7
:ok
inside: 7
iex(9)> send(pid)
fx: 8
:ok
inside: 8
iex(10)> send(pid)
fx: 9
:ok
inside: 9
iex(11)> send(pid)
10
:ok
end: [1, 2, 3, 4, 5, 6, 7, 8, 9]
iex(12)> send(pid)
:ok
iex(13)> 

What do you think? Is this valid, and how would you implement it?

That’s not what should happen. Elixir doesn’t differentiate “streams” – lazily producing enumerables – from other enumerables like lists or maps – all of them are Enumerables. Hence Enum works exactly the same whichever you provide – each individual call of Enum.take(enumerable, 1) gives you a list with the first item of the enumerable, which means the same result, given the enumerable stayed the same. If there are side effects, they’re hidden behind the enumerable interface. Iterating over an enumerable doesn’t change the input enumerable.

You could probably build an API on top of Enumerable.reduce for stateful iteration of an enumerable, but the intermediate states wouldn’t necessarily be enumerables themselves. So an enumerable could only be the input to such a new API.
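For instance, here is a minimal sketch of such an API (the `StepIter` module and its function names are made up), built on the suspension mechanism that `Enumerable.reduce/3` already supports – suspending after every element and handing the continuation back to the caller:

```elixir
defmodule StepIter do
  @moduledoc """
  Sketch of a step-by-step iterator over any enumerable.
  The "iterator" is a suspended Enumerable.reduce/3 continuation.
  """

  # Wrap an enumerable; the reducer suspends after every element,
  # so each resume yields exactly one item.
  def new(enumerable) do
    &Enumerable.reduce(enumerable, &1, fn x, _acc -> {:suspend, x} end)
  end

  # Resume the reduction for one step. Returns the element plus the
  # next iterator, or :done when the enumerable is exhausted.
  def next(cont) do
    case cont.({:cont, nil}) do
      {:suspended, elem, next_cont} -> {:ok, elem, next_cont}
      {:done, _acc} -> :done
      {:halted, _acc} -> :done
    end
  end
end
```

Note that each `next/1` call returns a *new* iterator value that you must thread along yourself – the old one should not be reused – which is exactly why the intermediate states are no longer plain enumerables.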


But the issue is that I am passing the ref back and forth between functions (e.g. in a GenServer handle_info) and want to resume it where I left off.

That’s not how the Enumerable abstraction works, though. I don’t know what you’re streaming there, but I’d consider using other APIs instead of an enumerable.

I am streaming an S3 object or a file here, and sending it to clamd on port 3310 for scanning.
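For that use case, a sketch of the clamd side could look like the following, using clamd's INSTREAM command over :gen_tcp (per the clamd protocol: `zINSTREAM\0`, then chunks prefixed with a 4-byte big-endian length, terminated by a zero-length chunk). The host, port, and error handling here are assumptions, not a production implementation:

```elixir
defmodule ClamdScan do
  @moduledoc """
  Sketch: stream an enumerable of binary chunks to clamd's INSTREAM
  command. Assumes clamd is listening on localhost:3310.
  """

  # INSTREAM framing: each chunk is prefixed with its byte size as a
  # 4-byte big-endian integer; a zero-length chunk ends the stream.
  def frame(chunk) when is_binary(chunk) do
    <<byte_size(chunk)::32-big, chunk::binary>>
  end

  def scan(chunks, host \\ ~c"localhost", port \\ 3310) do
    {:ok, socket} = :gen_tcp.connect(host, port, [:binary, active: false])
    :ok = :gen_tcp.send(socket, "zINSTREAM\0")

    # The enumerable is consumed lazily, one chunk at a time – no need
    # to pause/resume it by hand; the socket write drives consumption.
    Enum.each(chunks, fn chunk ->
      :ok = :gen_tcp.send(socket, frame(chunk))
    end)

    # Zero-length chunk terminates the stream; clamd replies with a verdict.
    :ok = :gen_tcp.send(socket, <<0::32>>)
    {:ok, reply} = :gen_tcp.recv(socket, 0)
    :gen_tcp.close(socket)
    reply
  end
end
```

The point being: if the only consumer is the clamd socket, a single `Enum.each/2` over the stream already gives you one-chunk-at-a-time processing, without any external pause/resume machinery.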

Start a GenServer to handle the streaming logic.
Initialize the GenServer state with an offset (e.g., 0) in init/1.
Store the stream object in an ETS table.

defmodule StreamServer do
  use GenServer

  @ets_table :stream_storage

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, 0, name: __MODULE__)
  end

  def next(n \\ 1) do
    GenServer.call(__MODULE__, {:next, n})
  end

  def init(offset) do
    :ets.new(@ets_table, [:named_table, :public, read_concurrency: true])
    stream = 1..1000 |> Stream.map(&"Consumed #{&1}")
    :ets.insert(@ets_table, {:stream, stream})
    {:ok, offset}
  end

  def handle_call({:next, n}, _from, offset) do
    case :ets.lookup(@ets_table, :stream) do
      [{:stream, stream}] ->
        # Note: Stream.drop/2 is lazy, so every call re-evaluates the
        # stream from the beginning up to `offset` before taking `n`.
        result = stream |> Stream.drop(offset) |> Enum.take(n)
        {:reply, result, offset + n}

      _ ->
        {:reply, {:error, :stream_not_found}, offset}
    end
  end
end

Or, use the process state as storage.
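As a variant of "use the process state as storage", here is a sketch (the `NextServer` name is hypothetical) that keeps a suspended `Enumerable.reduce/3` continuation in the GenServer state, so each call truly resumes where the previous one stopped instead of re-walking the stream from the start on every request:

```elixir
defmodule NextServer do
  use GenServer

  # Holds a suspended Enumerable.reduce/3 continuation as state;
  # each :next call resumes it for exactly one element.

  def start_link(enumerable) do
    GenServer.start_link(__MODULE__, enumerable)
  end

  def next(pid), do: GenServer.call(pid, :next)

  @impl true
  def init(enumerable) do
    # The reducer suspends after every element it sees.
    cont = &Enumerable.reduce(enumerable, &1, fn x, _acc -> {:suspend, x} end)
    {:ok, cont}
  end

  @impl true
  def handle_call(:next, _from, :done), do: {:reply, :done, :done}

  def handle_call(:next, _from, cont) do
    case cont.({:cont, nil}) do
      {:suspended, elem, next_cont} -> {:reply, {:ok, elem}, next_cont}
      {_done_or_halted, _acc} -> {:reply, :done, :done}
    end
  end
end
```

Compared with the ETS/offset approach, this consumes each element exactly once – including any side effects hidden in the stream – at the cost of keeping the continuation alive inside one process.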
