The issue:
I would like to consume a stream one element at a time, but this does not seem to be possible; see the examples below.
iex(1)> s = 1..1000 |> Stream.map(& "Consumed #{&1}")
#Stream<[enum: 1..1000, funs: [#Function<49.82544474/1 in Stream.map/2>]]>
iex(2)> s |> Enum.take(1)
["Consumed 1"]
iex(3)> s |> Enum.take(1)
["Consumed 1"]
iex(4)> s |> Enum.take(2)
["Consumed 1", "Consumed 2"]
iex(5)> s |> Enum.take(2)
["Consumed 1", "Consumed 2"]
iex(6)> s |> Enum.take(1)
["Consumed 1"]
iex(7)> s |> Enum.take(1)
["Consumed 1"]
iex(8)> s |> Enum.take(1)
["Consumed 1"]
iex(9)> s |> Enum.take(1)
["Consumed 1"]
Expectations: When I call Enum.take(1), I expect it to consume one element, and when I call it again, I expect it to return the next element.
s |> Enum.take(1) =>["Consumed 1"]
s |> Enum.take(1) =>["Consumed 2"]
s |> Enum.take(1) =>["Consumed 3"]
s |> Enum.take(1) =>["Consumed 4"]
s |> Enum.take(1) =>["Consumed 5"]
s |> Enum.take(1) =>["Consumed 6"]
What am I doing wrong?
How can I achieve this behaviour?
I implemented this behaviour using processes, but I am not sure how valid the approach is. See below:
defmodule StatfulStream do
  @moduledoc false

  # Spawns a process that enumerates `stream` one element at a time.
  #
  # For every element the spawned process calls `fx` with it, prints the
  # element with the `:inside` label and then blocks in `wait/0` until it
  # receives a `:process` message. Once the stream is exhausted, the list of
  # all elements is printed with the `:end` label.
  #
  # Returns the pid of the spawned process.
  def start(stream, fx) do
    spawn(fn ->
      collected =
        stream
        |> Stream.map(&step(&1, fx))
        |> Enum.to_list()

      IO.inspect(collected, label: :end)
    end)
  end

  # Blocks the calling process until a `:process` message is in its mailbox,
  # then returns `:ok`. Any other message is left in the mailbox.
  def wait() do
    receive do
      :process -> :ok
    end
  end

  # Builds a stream that emits the integers 1 through 9.
  #
  # The accumulator starts at 1 and is incremented after each emitted value;
  # the stream halts once the accumulator reaches 10. When the enumeration
  # finishes, the final accumulator is printed via `IO.inspect/1`.
  def ge_str() do
    Stream.resource(fn -> 1 end, &next_element/1, &IO.inspect/1)
  end

  # Demo entry point: starts the stepped consumer over `ge_str/0` with a
  # callback that prints each element using the `:fx` label.
  def r() do
    start(ge_str(), &IO.inspect(&1, label: :fx))
  end

  # Sends the `:process` message to `p`, releasing one pending `wait/0`.
  def send(p) do
    Process.send(p, :process, [])
  end

  # Handles a single stream element: run the callback, log the element, wait
  # for the go-ahead message and hand the element back unchanged.
  defp step(item, fx) do
    fx.(item)
    IO.inspect(item, label: :inside)
    wait()
    item
  end

  # Step function for `ge_str/0`: halt at 10, otherwise emit the current value
  # and advance the accumulator by one.
  defp next_element(10), do: {:halt, 10}
  defp next_element(current), do: {[current], current + 1}
end
This is how I ran it:
iex(1)> import StatfulStream
StatfulStream
iex(2)> pid= r()
fx: 1
#PID<0.237.0>
inside: 1
iex(3)> send(pid)
fx: 2
:ok
inside: 2
iex(4)> send(pid)
fx: 3
:ok
inside: 3
iex(5)> send(pid)
fx: 4
:ok
inside: 4
iex(6)> send(pid)
fx: 5
:ok
inside: 5
iex(7)> send(pid)
fx: 6
:ok
inside: 6
iex(8)> send(pid)
fx: 7
:ok
inside: 7
iex(9)> send(pid)
fx: 8
:ok
inside: 8
iex(10)> send(pid)
fx: 9
:ok
inside: 9
iex(11)> send(pid)
10
:ok
end: [1, 2, 3, 4, 5, 6, 7, 8, 9]
iex(12)> send(pid)
:ok
iex(13)>
What do you think? Is this approach valid, and how should it be implemented?