I am trying to do some work concurrently. The current code speaks for itself:
@sources
|> Flow.from_enumerable(max_demand: 50, min_demand: 20)
|> Flow.map(fn {mod, fun, args} = mfa ->
  Logger.debug(fn -> ~s(Running #{inspect(mfa)}) end)
  apply(mod, fun, args)
end)
|> Stream.run()
Basically, I am trying to run a map operation in parallel.
Some questions:
- Does this map @sources in parallel using 50 workers?
- Is this the right way to use Flow? I read the docs, and they mention partitioning to achieve better performance, but I don’t quite get whether it is useful in my case.
- Is there any other mature library for this kind of task? I am already using GenStage, so don’t suggest it. I think it’s overkill to start GenStage for some temporary, periodic work (am I wrong?).
I don’t know how complex your mapping function is, but to me Flow and even GenStage seem like overkill for the simplified scenario you presented. GenStage in particular is really only necessary when you want back-pressure (i.e. consumers being able to tell producers that they are sending them work too fast).
Have you tried Task.async_stream and controlling its maximum concurrency?
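For what it’s worth, a minimal sketch of that approach (the sample MFA tuples and the concurrency value here are placeholders, not your actual @sources):

```elixir
# Hypothetical list of {module, function, args} tuples, standing in for @sources.
sources = [
  {Enum, :sum, [[1, 2, 3]]},
  {String, :upcase, ["hello"]}
]

results =
  sources
  |> Task.async_stream(
    fn {mod, fun, args} -> apply(mod, fun, args) end,
    # tune this for your IO-bound workload; the default is the scheduler count
    max_concurrency: 20,
    timeout: 30_000
  )
  |> Enum.map(fn {:ok, result} -> result end)

IO.inspect(results)  # [6, "HELLO"]
```

Task.async_stream yields `{:ok, result}` tuples (or `{:exit, reason}` on failure), and keeps results in input order by default.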
Nothing too complex, some ordinary HTML fetch-and-parse work.
How come I have never seen Task.async_stream? Seems like exactly what I needed.
Anyway, what I did with Flow at first was wrong; here is a version that works in parallel:
@sources
|> Flow.from_enumerable()
|> Flow.partition(stages: 20)
|> Flow.map(fn {mod, fun, args} = mfa ->
  Logger.debug(fn -> ~s(Running #{inspect(mfa)}) end)
  apply(mod, fun, args)
end)
|> Stream.run()
I know 20 stages is a bit high, but my work is mostly IO-bound.
Just for the sake of it, this is the same thing using Task:
MySupervisor
|> Task.Supervisor.async_stream_nolink(@sources, fn {mod, fun, args} = mfa ->
  Logger.debug(fn -> ~s(Running #{inspect(mfa)}) end)
  apply(mod, fun, args)
end, max_concurrency: 20, timeout: 120_000, ordered: false, on_timeout: :kill_task)
|> Stream.run()
I used the supervised, non-linking version because tasks can fail sometimes.
But I think the Flow version is more beautiful, and since I’m already using GenStage elsewhere (processing an infinite stream of data), pulling in Flow doesn’t hurt that much.
One thing about Task.Supervisor.async_stream_nolink bothers me: why do its arguments start with the supervisor name and not the enumerable? It seems unnatural.
Oh well, it can happen. The important thing is that you know about it now!
If that is indeed the case, then you are better off using Task.async_stream and manually tuning its concurrency parameter to see how much I/O plus CPU load your work can tolerate. IMO if you’re mainly I/O-bound, then the default behaviour of Task.async_stream will leave your CPU under-utilised, so definitely use a higher concurrency!
Recently I was experimenting with importing CSV data into a DB from a public dataset. I actually had to increase my DB pool from 10 to 20 (my CPU’s virtual core count is exactly 20), and instead of putting records into the DB one by one, I chunked them and inserted each chunk as a batch (I chose 500 at a time). This was multiplexed over all CPU cores, and I was able to achieve about 90% load on every core.
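A rough sketch of that chunked-insert pattern; here `insert_batch` is a stand-in for whatever bulk insert your DB layer provides (e.g. an Ecto `Repo.insert_all/2` call), and the row count, chunk size, and concurrency are made-up values:

```elixir
# Pretend each batch is inserted and report how many rows it contained.
insert_batch = fn chunk -> length(chunk) end

inserted =
  1..2_000                          # stand-in for parsed CSV rows
  |> Stream.chunk_every(500)        # 500 records per batch
  |> Task.async_stream(insert_batch, max_concurrency: 20)
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)

IO.inspect(inserted)  # 2000
```

The point is that the chunking keeps each DB round-trip cheap per record, while `max_concurrency` keeps all cores (and pool connections) busy.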
In short, Elixir gives you everything you might need for your work but you have to make sure you properly load your machine.
Because the module is Task.Supervisor. The convention in Elixir is that when you call a module function, the first parameter is whatever data structure or entity the module is named after (of course Elixir is not perfect about this). This was a major bone of contention with Erlang, which, quite frankly, is all over the place; there are several Erlang functions whose argument order I am constantly looking up in the docs while shaking my head.
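To illustrate the convention with a couple of standard-library examples: the "subject" of the module comes first, which is also what makes the pipe operator compose so uniformly.

```elixir
# String functions take the string first, Map functions take the map first,
# so pipelines read naturally:
words =
  "hello world"
  |> String.upcase()
  |> String.split(" ")

IO.inspect(words)  # ["HELLO", "WORLD"]

keys =
  %{a: 1}
  |> Map.put(:b, 2)
  |> Map.keys()

IO.inspect(keys)  # [:a, :b]

# Task.Supervisor follows the same rule: the supervisor is the module's
# "subject", so it goes first, even though piping an enumerable feels
# more natural in that particular call.
```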