Task async stream function name arguments

I’m using task.async_stream for one of the expensive functions I have but the problem I’m facing is passing the function name. The function I’m passing has 3 arguments and when I use this its takes by default 1 argument and it’s not able to find the same function.

So what can I do here?

Task.async_stream(__MODULE__, :function_name, [], max_concurrency: @max_concurrency)

Task.async_stream(collection, __MODULE__, :function_name, [arg1, arg3, arg3,], max_concurrency: @max_concurrency)

Hey. But These arguments will be not available when I use them there.

 def read_csv_file() do
    File.stream!("annual.csv")
    |> NimbleCSV.parse_stream(skip_header: false)
    |> Stream.map(fn org -> data_org(org, user, id) end)
    |> Task.async_stream(__MODULE__, :data_org, [], max_concurrency: @max_concurrency)
  end

And? Can’t you pass them to the streaming function like the following?

def data_org(org, user, id) do
  # process your data here
end

def read_csv_file(user, id) do
    File.stream!("annual.csv")
    |> NimbleCSV.parse_stream(skip_header: false)
    |> Task.async_stream(
      fn record ->
        data_org(record, user, id)
      end,
      max_concurrency: @max_concurrency
    )
end

I am assuming that org is the current record from the stream of records that you want to work on from the CSV file.

Without more details it’s not clear what your real issue is. If you want to use Task.async_stream you have 2 options:

Task.async_stream(enumerable, fun, options \\ []) or

Task.async_stream(enumerable, module, function_name, args, options \\ [])

So you either need to pass the args to your read_csv function or pass a closure containing the actual function you want to call and use that in Task.async_stream/3. i.e.

my_task_fn = fn x -> my_fun_i_really_want_to_call(x, arg1, arg2, arg3) end

read_csv(my_task_fn)

By the way, you might want to think about taking more time explaining your problem in the initial post. Imagine you are using your spare time to help someone and each time you reply they say “that won’t work because <insert relevant detail not included in previous post>”. It’s not a nice feeling and might dissuade people from replying to you.

1 Like