Dataflow techniques in Elixir?

Hi,

I’ve come across an interesting problem. Say I have this:

a = Task.async(fn -> … end)
b = Task.async(fn -> … end)
c = Task.async(fn -> … end)

Now, later in the flow say I need to calculate:

d = a + b/c, but one of these values is needed prior to getting to that line…

I could use Task.await(a), but I can’t call it twice (I guess await can only be called once, and only by the process that owns the task). If I want to use the value, I have to “realize” it and store it, which defeats the purpose of making that work async. This reminds me of “Dataflow”, which is supported in Scala, Groovy and some other languages (originally Oz).

I could use an Agent to keep state and register variables, tracking whether each value has been realized and either awaiting or returning it… However, maybe this can be achieved in a simpler, more idiomatic way?


Why do you think that? Maybe I didn’t get what you really mean here, but doing this:

a = Task.async(fn -> ... end)
b = Task.async(fn -> ... end)
c = Task.async(fn -> ... end)

d = Task.await(a) + Task.await(b) / Task.await(c)

This does not make your code “less parallel” in any way. Say a takes a long time to evaluate, so you block for a while on Task.await(a): b and c keep evaluating in their own processes, and if they have finished by the time you call Task.await/1 on them, those calls return instantly.

The thing is that the realized value of, say, “a” is encapsulated in that line. If I want to use it later, I can’t call Task.await again. That is strange to me, as all the abstractions I’ve seen so far, such as futures in Clojure, allow repeated “dereferencing”. I suppose the issue here is “state”, which is not easily kept in Erlang.


Got it now. According to Task.await/2 documentation:

This function can only be called once for any given task. If you want to be able to check multiple times if a long-running task has finished its computation, use yield/2 instead.

Maybe Task.yield/2 or even Task.yield_many/2 can help you!

Nope :slight_smile: That solves a different problem: once a task’s result has been fetched, any subsequent yield returns nil. I suppose the value would have to be stored somewhere, and there is no object or mutable variable. When I tried to use an Agent I failed, because the process doing the “await” turned out to be different from the one that did the “async”. That seems very limited in terms of concurrency patterns. Akka on Scala takes the completely opposite route: they have every possible pattern. All the mess and complexity too… However, I wish Elixir was a bit more sophisticated in that department :slight_smile:
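To make the yield behaviour concrete, here is a minimal sketch (the value 42 and the timeouts are just illustrative choices). The first Task.yield/2 consumes the reply message, so a second call on the same task has nothing left to receive and gives back nil once its timeout expires:

```elixir
# A task that finishes almost immediately.
task = Task.async(fn -> 42 end)

# The first yield receives the reply message from the task.
first = Task.yield(task, 1_000)

# The reply has already been consumed, so this waits out the
# timeout (100 ms here) and then returns nil.
second = Task.yield(task, 100)

IO.inspect({first, second})
```

So the result has to be bound to a name (or stored in some process) the first time it is received.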

Task does not encapsulate anything. It does one thing: spawn a process to do the needed computation and give you tools to retrieve the result. There’s nothing future-, promise- or monad-like about it that would allow you to retrieve the result multiple times. You’ve guessed correctly that you’d need a GenServer or Agent to get the behaviour you’re asking for.

I’m just not sure what your problem is with keeping the value around instead of calling Task.await() over and over. If you keep the return value in another process so it can be retrieved multiple times, you’re copying the whole data structure with each retrieval. For small result sets that’s not a problem, but for larger ones it might be.

If you’re not sure whether you’re getting a task or a resolved value in the later computation, just use this (it’s easily extracted into a helper function):
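For illustration, here is one way such a GenServer could look. This is only a sketch: the module name Future and its async/1 and get/2 functions are made up for this post, not part of Elixir. It relies on handle_continue (Elixir 1.7+ on OTP 21+), so the computation runs inside the server right after init, and any get call that arrives earlier simply waits in the mailbox until the value is there:

```elixir
defmodule Future do
  # Hypothetical helper, not a standard library module.
  use GenServer

  # Start the computation in its own process and return the pid.
  def async(fun) when is_function(fun, 0) do
    {:ok, pid} = GenServer.start_link(__MODULE__, fun)
    pid
  end

  # Can be called any number of times, from any process.
  def get(pid, timeout \\ 5_000), do: GenServer.call(pid, :get, timeout)

  @impl true
  def init(fun), do: {:ok, nil, {:continue, {:compute, fun}}}

  # Runs before any queued call is handled; the result becomes the state.
  @impl true
  def handle_continue({:compute, fun}, _state), do: {:noreply, fun.()}

  @impl true
  def handle_call(:get, _from, value), do: {:reply, value, value}
end

a = Future.async(fn -> Process.sleep(100); 6 end)
b = Future.async(fn -> 4 end)
c = Future.async(fn -> 2 end)

d = Future.get(a) + Future.get(b) / Future.get(c)
a_again = Future.get(a)
IO.inspect({d, a_again})
```

Each get copies the value out of the server process, and the servers stay alive (linked to the caller) until they are stopped, so this is a trade-off rather than a free lunch.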

value = 
  case value do
    %Task{} = task -> Task.await(task)
    value -> value
  end
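Extracted into a helper, that could look like the sketch below. The module name Resolve and the function resolve/1 are hypothetical names picked for this example:

```elixir
defmodule Resolve do
  # Hypothetical helper: await a task, pass anything else through unchanged.
  def resolve(%Task{} = task), do: Task.await(task)
  def resolve(value), do: value
end

pending = Task.async(fn -> 7 end)

from_task = Resolve.resolve(pending)  # awaits the task
from_value = Resolve.resolve(5)       # already a plain value
IO.inspect({from_task, from_value})
```

Note that resolve/1 still has to be called from the process that started the task, and only once per task; after that you pass the returned value around instead.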

Would this work?

a = Task.async(fn -> ... end)
b = Task.async(fn -> ... end)
c = Task.async(fn -> ... end)

a_value = Task.await(a)
d       = a_value + Task.await(b) / Task.await(c)
# ... use a_value here...

LostKobrakai - “encapsulated” meaning that it is not a value you can use outside of that very expression.

The problem is that it doesn’t let me compose code well. What JEG2 proposes of course doesn’t help, as it is a synchronous wait. If I wanted to use all 3, I’d need 3 awaits, and that would not let me do some other computation while those 3 are running async. Regardless of whether it works or not, it is not elegant. It feels very “Erlangish”. I understand where the limitations come from; I wanted to make sure. I come from JVM land. I believe the intent of “dataflow” is that you can keep “virtual” values, and when you need one it gets dereferenced (waiting if needed) without the need to deal with low-level aspects.

You need to wait for all three values anyway, regardless of whether you wait for them on separate lines, binding each value to a fresh name, or wait for them in a single line.

If a takes 3 seconds, b and c 1 second each, and you wait for them in the order a, b, c, then a will block the process for 3 seconds, while b and c will return instantly, since their values have already been ready for 2 seconds.
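A scaled-down version of that timing is easy to check. In this sketch the sleeps (300 ms and 100 ms instead of 3 s and 1 s) and the return values are arbitrary stand-ins for real work:

```elixir
a = Task.async(fn ->
  Process.sleep(300)
  6
end)

b = Task.async(fn ->
  Process.sleep(100)
  4
end)

c = Task.async(fn ->
  Process.sleep(100)
  2
end)

# :timer.tc/1 runs the function in the calling process and returns
# {elapsed_microseconds, result}.
{elapsed_us, d} =
  :timer.tc(fn -> Task.await(a) + Task.await(b) / Task.await(c) end)

elapsed_ms = div(elapsed_us, 1000)
IO.puts("d = #{d}, waited about #{elapsed_ms} ms")
```

The total wait should be close to the slowest task (about 300 ms), not the sum of all three (500 ms).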


I worry you are over complicating this question by turning it into a solution to all problems. Here’s one possible answer:

{:ok, values} = Agent.start_link(&Map.new/0)
Agent.cast(values, fn map -> Map.put(map, :a, Task.async(&calc_a/0)) end)
Agent.cast(values, fn map -> Map.put(map, :b, Task.async(&calc_b/0)) end)
Agent.cast(values, fn map -> Map.put(map, :c, Task.async(&calc_c/0)) end)

# Later, as needed…
Agent.get_and_update(values, fn map -> 
  value =
    case Map.fetch!(map, :a) do
      %Task{ } = task -> Task.await(task)
      value           -> value
    end
  {value, Map.put(map, :a, value)}
end)

It is not simple, is it? What you proposed (very similar; I also played with an Agent) didn’t work for me, as in my case the process that called Task.async was not the same as the one that called Task.await. I got an error about the owner PID. Maybe that was just my code. If I have time I will try yours. I believe it is as LostKobrakai wrote: Erlang’s design, with processes passing messages, is not really compatible with the concept of shared values. Once a value is delivered from one process to another, it is not meant to be accessed again (as it would have to be transported). Is it needed? Well, surely not. It is the first time I “felt” I needed a “dataflow” abstraction :slight_smile:
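For anyone hitting the same owner error, here is a small sketch that reproduces it (the value 1 is arbitrary). Task.await/2 raises an ArgumentError when it is called from a process other than the one that called Task.async/1, which is why the Agent version above starts and awaits the tasks inside the Agent process itself:

```elixir
# The current process is the owner of this task.
task = Task.async(fn -> 1 end)

# Try to await it from a different process (here: another task).
result =
  Task.async(fn ->
    try do
      Task.await(task)
    rescue
      e in ArgumentError -> {:error, Exception.message(e)}
    end
  end)
  |> Task.await()

# Awaiting from the owner still works.
owner_value = Task.await(task)

IO.inspect(result)
IO.inspect(owner_value)
```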

I would argue that Erlang and Elixir aim more to give us the tools to build the concurrent data flows we need, than to directly provide all possible combinations. Anything I can assemble in under 20 lines of code feels pretty well supported to me. That’s just my opinion though.

I think you may be interested in Flow (https://hexdocs.pm/flow/Flow.html)


This is what I was talking about.

That would be my next suggestion too. :wink:

JEG2 - Agree. Whatever is provided is simple, robust and minimalistic. I do appreciate it.

It’s more about code composition. It is certainly a limitation to me when I have to deliberately decide to wait for a result before I use it. I may want to pass it somewhere else, and I have to realize the value first because it is not actually a value but a “live” ongoing process with restrictions (owner etc.). Of course it is a matter of taste, and if one is not aware of it, one does not miss it :wink: Also, dataflow style has certain properties: http://doc.akka.io/docs/akka/2.3-M1/scala/dataflow.html However, what I meant when writing my post was closer to what I know as e.g. the Clojure/Scala future/promise (@value or value.get).

Certainly I am. I’ve been meaning to read that :slight_smile: Thanks.

It would be interesting to hear (and see) how you use Flow to mimic the Oz Dataflow pattern that you’re used to in the JVM world. I wouldn’t be surprised to see differences between the async patterns in the JVM and Erlang VM worlds. The JVM is nowhere near as lightweight and gobbles memory without looking back. They are simply different worlds.