Can Task.Supervisor.start_child become a memory problem?

Since a GenServer is a single process, I recently removed some GenServer state and moved it to ETS, especially the parts of my project with concurrent reads and writes, such as storing user tokens.

Please see this code; with this function I can delete all of a user's tokens in my ETS table:

  def delete(user_id) do
    Task.Supervisor.start_child(UserToken, fn ->
      UserToken.revaluation_user_token_as_stream(&(UserToken.delete(&1.id)), %{user_id: user_id})
    end)
    ETS.Set.match_delete(table(), {:_, user_id, :_})
  end

On every user request I also call a function inside a Task.Supervisor.start_child task to do something like updating the database or deleting a record in another ETS table, and I do not need to wait for the response.

My question is: “is this going to be a problem for concurrent users, like a GenServer, because I use Task.Supervisor when many users send requests at the same time?” If yes, what do you suggest?


I thought about |> Task.async_stream(action) |> Stream.run, but I am not sure :thinking:


Thank you in advance

I have read the “Scalability and partitioning” section of the Elixir 1.14 announcement (link)

The Task.Supervisor is a single process responsible for starting other processes. In some applications, the Task.Supervisor may become a bottleneck. To address this, you can start multiple instances of the Task.Supervisor and then pick a random instance to start the task on.
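For readers on pre-1.14 Elixir, the quoted advice can be sketched by hand. All module and supervisor names below are made up for illustration; the idea is simply to start several `Task.Supervisor`s and route each call to one of them:

```elixir
# Hypothetical sketch: start N Task.Supervisors in the application's
# supervision tree, one per scheduler.
children =
  for i <- 1..System.schedulers_online() do
    Supervisor.child_spec(
      {Task.Supervisor, name: :"MyApp.TaskSup_#{i}"},
      id: {:task_sup, i}
    )
  end

# ...and pick one per call, e.g. at random, so no single supervisor
# process serializes all start_child requests:
defmodule MyApp.Tasks do
  @partitions System.schedulers_online()

  def start_child(fun) do
    sup = :"MyApp.TaskSup_#{:rand.uniform(@partitions)}"
    Task.Supervisor.start_child(sup, fun)
  end
end
```

Hashing a user id instead of `:rand.uniform/1` would keep one user's work on one supervisor; either way the bottleneck is spread across N processes.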

Hence, before Elixir 1.14 is released, what should we do? Since I do not need the result and I am not inside a GenServer, can I do something like the following, and does it still have the scalability and partitioning problem?

Task.async_stream([1], fn _item -> 
  UserToken.revaluation_user_token_as_stream(&(UserToken.delete(&1.id)), %{user_id: user_id})
end)
|> Stream.run()

But I think if one of the Task.async_stream tasks fails, it terminates all the tasks. In this code block I only use Task.async_stream([1], ..., and I create a new process for each user request. Am I going to run into a problem?
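As a sketch of the difference (the supervisor name and work function below are assumptions): Task.async_stream links each task to the caller, so by default one crash takes the stream and the caller down, while Task.Supervisor.async_stream_nolink only monitors the tasks and surfaces crashes as `{:exit, reason}` results:

```elixir
# Assumption: MyApp.TaskSup is a running Task.Supervisor and
# UserToken.delete_for/1 is a stand-in for the real work.
MyApp.TaskSup
|> Task.Supervisor.async_stream_nolink(
  user_ids,
  fn user_id -> UserToken.delete_for(user_id) end,
  max_concurrency: 10,
  on_timeout: :kill_task
)
|> Stream.run()
```

With `_nolink`, a crashed or timed-out task yields an `{:exit, reason}` element instead of killing the caller, which matches the "I do not need the result" requirement better than plain `Task.async_stream/3`.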

I’d start with figuring out whether this is actually a problem for your use case. While it is indeed a real issue (hence it being addressed), it’s something you have a lot of runway before you hit.


Hi Dear @LostKobrakai, I have started improving the new version of my CMS (mishka-cms). I figured out I had many problems with all the GenServer and Task functions I used in my project, so I started to edit them.


Imagine 20,000 users sending their token removal requests at the same time.

It should be noted that before this improvement, I created a dynamic supervisor for each user, and the GenServer was my user state keeping the user's tokens; each user can have different tokens for different devices.


Now I keep all of a user's tokens in ETS with the read_concurrency: true, write_concurrency: true options.

So we have 20k users sending requests to delete their tokens:

ETS.Set.match_delete(table(), {:_, user_id, %{token: token}})

But for each user request I also need to check or update my Postgres database, or delete a record in another ETS table.

For now, I do it like this (I put this function in the first post):

Task.Supervisor.start_child(UserToken, fn ->
  # Do something ...
end)

It seems 20k requests would hit my function like this:

def delete_token(user_id, token) do
  Task.Supervisor.start_child(UserToken, fn ->
    UserToken.revaluation_user_token_as_stream(&(UserToken.delete(&1.id)), %{token: token})
  end)
  ETS.Set.match_delete(table(), {:_, user_id, %{token: token}})
  get_all(user_id)
end

1. As a first step: because the Task.Supervisor is a single process, will I have problems with 20k concurrent requests?


2. What approach can I use that is not linked to any caller, does not care about the result, and has no problem with concurrent user requests?


  • If I use Task.async without Task.await to do something: if the caller has a problem or raises an error, I think all the tasks I created will fail too. Am I right?
  • If I use Task.Supervisor.start_child, the supervisor is a single process.
  • I do not know whether Task.async_stream with Stream.run() can be useful for concurrent users; the documentation says it depends on the caller. Based on what I understood, I would need to use Task.Supervisor.async_stream_nolink, and again the supervisor is a single process.

I really have no idea what I should do. I have used Task.Supervisor.start_child and other Task.Supervisor functions in the places where my users send requests at the same time.

Thank you, and I am very sorry for my English; I hope I have described my problem clearly enough.

All systems overload. The real question is, what happens when the system overloads?

If UserToken.revaluation_user_token_as_stream does significant computation / IO / etc, your system is likely to run out of those resources before a single Supervisor managing their distribution is a major problem.

If UserToken.revaluation_user_token_as_stream doesn’t do significant computation / IO / etc, it doesn’t belong in a Task at all.

Also, that ETS.Set.match_delete(table(), {:_, user_id, %{token: token}}) call is far more of a concern:

  • the ETS table is also just one process
  • that match_delete is going to need to examine every entry to do its job. Consider a secondary index to avoid this, as in a relational database

I am considering a secondary strategy for when a problem happens and RAM is cleared: a GenServer fetches all refresh tokens in handle_continue.

@impl true
def handle_continue(:sync_with_database, state) do
  UserToken.revaluation_user_token_as_stream(
    &save(%{ ... }, &1.user_id),
    %{expire_time: DateTime.utc_now()}
  )
  {:noreply, state}
end

def save(user_token, user_id) do
  ETS.Set.put!(table(), {String.to_atom(user_token.token_info.token_id), user_id, user_token.token_info})
end

def revaluation_user_token_as_stream(action) do
  from(t in UserToken)
  |> MishkaDatabase.Repo.stream()
  |> run_action_repo_stream(action)
end

defp run_action_repo_stream(stream, action) do
  MishkaDatabase.Repo.transaction(fn ->
    stream
    |> Task.async_stream(action, max_concurrency: 10)
    |> Stream.run()
  end)
end

I created a stream query on the Ecto side, and each Task.async_stream task calls the save function to send refresh tokens to ETS.

But the access tokens are going to be deleted, and that is not important because the user can recreate them with the refresh token.

I only do two jobs. The first is getting data or deleting a record from Postgres with Ecto, which you can see in the revaluation_user_token_as_stream function I put at the top of this comment.

The second is rejecting the tokens that are expired, with this function:

def delete_expire_token() do
  time = DateTime.utc_now() |> DateTime.to_unix()
  pattern = [{{:"$1", :"$2", :"$3"}, [{:<, {:map_get, :access_expires_in, :"$3"}, time}], [true]}]
  ETS.Set.select_delete(table(), pattern)
end
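Match specs like the one above are easy to get subtly wrong; `:ets.test_ms/2` can sanity-check a spec against a sample row before handing it to select_delete. The row below is fabricated for illustration:

```elixir
time = DateTime.utc_now() |> DateTime.to_unix()

# Same shape of spec as in the post: match any row whose third element
# is a map with access_expires_in earlier than now.
pattern = [
  {{:"$1", :"$2", :"$3"},
   [{:<, {:map_get, :access_expires_in, :"$3"}, time}],
   [true]}
]

# A made-up row that expired 10 seconds ago:
row = {:some_token_id, 42, %{access_expires_in: time - 10}}

:ets.test_ms(row, pattern)
# An expired row should evaluate to {:ok, true}; a live one to {:ok, false};
# a malformed spec returns {:error, errors} with diagnostics.
```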

I need users to load and delete records from RAM, not the database, but I want to sync them in the background.

Given my previous explanations in this post, if you were in my place, creating an open-source CMS for the normal scale I have described, what would you do?

We have some limitations. First, we do not want to use external software except Postgres.
Second, we want to handle a large number of users and be ready for distribution in the future.
Third, it makes me a good learner and also good software for people who want to test it; otherwise they can use Joomla or WordPress.


I really want to improve the entire MishkaCms structure over time and provide a good service to users in the future production versions.

It should be noted that I cannot queue responses to users about their tokens; tokens must be accessible all the time and as fast as possible.


Some activities in this system should not keep the user waiting; they should run in the background, their output is not important, but it is very important that they are applied quickly and do not sit in a queue.

Thank you in advance


Moving to ETS based on:

1. https://dockyard.com/blog/2017/05/19/optimizing-elixir-and-phoenix-with-ets
2. Genserver VS ETS - #10 by benwilson512
3. Help to understand PartitionSupervisor in elixir 1.14

General note: calling String.to_atom with a token is almost certainly a bad idea. The atom table isn’t garbage-collected AFAIK
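For illustration, the put from earlier in the thread could keep the UUID as a plain binary key, with the same tuple shape minus the String.to_atom call:

```elixir
# Binaries are garbage-collected normally; atoms created from
# user-supplied data grow the (never-collected) atom table forever.
ETS.Set.put!(table(), {user_token.token_info.token_id, user_id, user_token.token_info})
```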

This is a great example of code that does NOT belong in a Task: it sends a call to table, but table can only handle one at a time so the 9 other streams in async_stream will be waiting in line.

To clarify my comment about a secondary index: the expensive part of match_delete / select_delete is checking all the rows for a matching user_id in the second tuple slot - only lookups on keypos (the first position) are efficient. A “secondary index” in this situation would be an ETS duplicate_bag table with each entry mapping user_id to a token, so the question “what tokens does this user have?” can be answered efficiently.
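A minimal sketch of that duplicate_bag secondary index using raw :ets (the table name and row shape are assumptions):

```elixir
# user_id sits in the key position, so lookups and deletes by user touch
# only that user's rows instead of scanning the whole table.
tid =
  :ets.new(:user_token_index, [
    :duplicate_bag,
    :public,
    read_concurrency: true,
    write_concurrency: true
  ])

:ets.insert(tid, {42, "token-a"})
:ets.insert(tid, {42, "token-b"})

# "What tokens does this user have?" — efficient keypos lookup:
:ets.lookup(tid, 42)
# => [{42, "token-a"}, {42, "token-b"}]

# Drop every token for this user without examining other users' rows:
:ets.delete(tid, 42)
```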

Write the simplest code that could work, and then come back and revisit that code if it becomes a bottleneck. How many tokens could a single user actually have? How often would they be deleting them?


Ohh, that is a good point. I need to change my structure to duplicate_bag; it will be more useful than now because I can search all of a user's tokens based on user_id instead of the second element of a tuple.
I converted to atoms because I am using UUIDs.

Do you suggest I use a string key, which is Ecto.UUID.generate, instead of an atom, which is limited by the VM's atom table?


Each user can:

  1. Have 5 refresh tokens (meaning each user is limited to 5 devices; they should be stored for 13 days)
  2. Have 5 access tokens (stored for 1 hour)
  3. Have 1 Phoenix token for browser login in the Phoenix LiveView part (it is stored in the session, not handled by me)

Rules:

  1. Every page needs an access token, except the refresh-creator page.
  2. After 1 hour, the user should create a new access token and refresh token.

I think changing my structure to duplicate_bag can be an optimization, because I need to check and delete a user's expired tokens on every request, and requests can come from 5 other devices.


The important part of my question: how can I test and put the code under pressure to create a bottleneck?

Let’s improve this part. It uses Task.async_stream, and the system runs it every 24 hours with an Oban cron job:

stream
|> Task.async_stream(&delete(&1.id), max_concurrency: 10)
|> Stream.run()

But consider the part where, on every user request, we send a query with where == user_id or token and check whether there is an expired token for them. This is not a stream query, because at least 12 tokens can exist for each user, but I do not keep the user waiting for this result. Hence, do you suggest a Task here, and which Task function can’t become a bottleneck?

The big change in this part is switching the ETS structure from set to duplicate_bag, am I right?

So I changed my code and moved to duplicate_bag (link of the module).

The only part I did not find a solution for is Task.Supervisor.start_child. It should be noted that I implemented support for Elixir 1.14-dev with PartitionSupervisor.
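On Elixir 1.14-dev, the PartitionSupervisor route for start_child looks roughly like this (the supervisor name MyApp.TaskSupervisors is an assumption):

```elixir
# In the application's supervision tree: one Task.Supervisor per partition,
# managed by a single PartitionSupervisor.
children = [
  {PartitionSupervisor, child_spec: Task.Supervisor, name: MyApp.TaskSupervisors}
]

# Per request, route by user_id: the same user always hits the same
# partition, and different users spread across all of them.
Task.Supervisor.start_child(
  {:via, PartitionSupervisor, {MyApp.TaskSupervisors, user_id}},
  fn ->
    UserToken.revaluation_user_token_as_stream(&UserToken.delete(&1.id), %{user_id: user_id})
  end
)
```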

As I understand it, we should design software to use Task where concurrency does not conflict with race conditions. If I am wrong, please tell me, and if there is a reference it would be very useful.

I have read this post

Update for elixir 1.14

And I have no idea how to monitor all requests, make my software stable at large scale, and prevent data overflowing RAM, because I have never had a high-scale project before.