How to use GenStage/Flow in this case

I am trying to write a script that performs a few steps.

  1. Get two date inputs.
  2. Format those dates and build pairs of dates, each with a starting date and an ending one.

For convenience, I am including the full script; you can place it in any file and run it if you want to test it.

defmodule Extractions do

  @format ~r[(?<start_hour>\d{2}):(?<start_minute>\d{2})-(?<end_hour>\d{2}):(?<end_minute>\d{2})]

  def start do
    start_date = Calendar.DateTime.from_erl!({{2020, 1, 25},{0, 29, 10}}, "Etc/UTC", {123456, 6}) #|> Calendar.DateTime.shift_zone!("Europe/Dublin")
    end_date = Calendar.DateTime.from_erl!({{2020, 2, 2},{0, 29, 10}}, "Etc/UTC", {123456, 6}) #|> Calendar.DateTime.shift_zone!("Europe/Dublin")

    schedule = %{
      "Friday" => ["08:00-18:00"],
      "Monday" => ["08:00-18:00"],
      "Saturday" => [],
      "Sunday" => [],
      "Thursday" => ["08:00-18:00"],
      "Tuesday" => ["08:00-18:00"],
      "Wednesday" => ["08:00-18:00"]
    }
    |> Enum.filter(fn {_, v} -> length(v) != 0 end)
    |> Enum.into(%{})

    days =
      schedule
      |> Enum.map(fn(sc) ->
        {day, hours} = sc
        if length(hours) != 0, do: day
      end) |> Enum.filter(& !is_nil(&1))

    camera_exid = "waxie-jolxd"

    interval = 1200

    all_days =
      Calendar.Date.days_after_until(start_date, end_date, true)
      |> Enum.filter(fn(day) ->
        Enum.member?(days, day |> Calendar.Strftime.strftime!("%A"))
      end)

    valid_dates =
      all_days
      |> get_date_pairs(camera_exid, schedule)

    {get_expected_count} =
      Enum.reduce(valid_dates, {0}, fn date_pair, {count} ->
        %{starting: starting, ending: ending} = date_pair
        {:ok, after_seconds, 0, :after} = Calendar.DateTime.diff(ending, starting)
        {count + (after_seconds / interval)}
      end)
    valid_dates
  end

  defp get_date_pairs(dates, camera_exid, schedule) do
    dates
    |> Enum.map(fn date ->
      schedule[Calendar.Strftime.strftime!(date, "%A")]
      |> get_head_tail
      |> Enum.map(fn timings -> Regex.named_captures(@format, timings |> List.first) end)
      |> Enum.map(fn schedule_time ->
        Map.merge(
          %{
            "year" => strft_date(date, "%Y"),
            "month" => strft_date(date, "%m"),
            "day" => strft_date(date, "%d")
          },
          schedule_time
        )
      end)
    end)
    |> List.flatten
    |> Enum.map(fn date_tuple ->
      {starting, ending} = parse_schedule_times(date_tuple)
      %{
        starting: Calendar.DateTime.from_erl!(starting, "Etc/UTC", {123456, 6}) |> shift_zone,
        ending: Calendar.DateTime.from_erl!(ending, "Etc/UTC", {123456, 6}) |> shift_zone
      }
    end)
  end

  defp strft_date(date, pattern), do: Calendar.Strftime.strftime!(date, pattern)

  defp shift_zone(date, timezone \\ "Europe/Dublin") do
    date |> Calendar.DateTime.shift_zone!(timezone)
  end

  defp parse_schedule_times(%{"end_hour" => end_hour, "end_minute" => end_minute, "start_hour" => start_hour, "start_minute" => start_minute, "year" => year, "month" => month, "day" => day}) do
    {{{String.to_integer(year), String.to_integer(month), String.to_integer(day)}, {String.to_integer(start_hour), String.to_integer(start_minute), 0}}, {{String.to_integer(year), String.to_integer(month), String.to_integer(day)}, {String.to_integer(end_hour), String.to_integer(end_minute), 0}}}
  end

  def get_head_tail([]), do: []
  def get_head_tail(nil), do: []
  def get_head_tail([head|tail]) do
    [[head]|get_head_tail(tail)]
  end
end

Now, when it starts with the dates above:

    start_date = Calendar.DateTime.from_erl!({{2020, 1, 25},{0, 29, 10}}, "Etc/UTC", {123456, 6})
    end_date = Calendar.DateTime.from_erl!({{2020, 2, 2},{0, 29, 10}}, "Etc/UTC", {123456, 6})

The end result will be:

iex(4)> Extractions.start
[
  %{
    ending: #DateTime<2020-01-27 18:00:00.123456+00:00 GMT Europe/Dublin>,
    starting: #DateTime<2020-01-27 08:00:00.123456+00:00 GMT Europe/Dublin>
  },
  %{
    ending: #DateTime<2020-01-28 18:00:00.123456+00:00 GMT Europe/Dublin>,
    starting: #DateTime<2020-01-28 08:00:00.123456+00:00 GMT Europe/Dublin>
  },
  %{
    ending: #DateTime<2020-01-29 18:00:00.123456+00:00 GMT Europe/Dublin>,
    starting: #DateTime<2020-01-29 08:00:00.123456+00:00 GMT Europe/Dublin>
  },
  %{
    ending: #DateTime<2020-01-30 18:00:00.123456+00:00 GMT Europe/Dublin>,
    starting: #DateTime<2020-01-30 08:00:00.123456+00:00 GMT Europe/Dublin>
  },
  %{
    ending: #DateTime<2020-01-31 18:00:00.123456+00:00 GMT Europe/Dublin>,
    starting: #DateTime<2020-01-31 08:00:00.123456+00:00 GMT Europe/Dublin>
  }
]

Now, what I am trying to do is pass each date pair to a process.

The process will start from the starting date, increase it by the interval (in seconds), and continue up to the ending date.

During this increase…

  1. It will download files from SeaweedFS (a file system with a REST API) for the date.
  2. Save each file and add it to a Dropbox batch.
  3. Push those images to Dropbox in batches of 1000, or fewer if the ending date has already been reached.
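The stepping described above (start at the starting date, add the interval in seconds, stop at the ending date) could be sketched as a lazy stream. This is only a minimal sketch, assuming plain UTC `DateTime` values from the standard library rather than the Calendar library used in the script; the module name `IntervalSteps` is hypothetical, and including the ending moment itself is an assumption.

```elixir
defmodule IntervalSteps do
  # Lazily yields every moment from `starting` up to and including `ending`,
  # stepping by `interval` seconds. Nothing is computed until the stream is consumed.
  def moments(%DateTime{} = starting, %DateTime{} = ending, interval)
      when is_integer(interval) and interval > 0 do
    starting
    |> Stream.iterate(&DateTime.add(&1, interval, :second))
    |> Stream.take_while(&(DateTime.compare(&1, ending) != :gt))
  end
end

{:ok, starting, 0} = DateTime.from_iso8601("2020-01-27T08:00:00Z")
{:ok, ending, 0} = DateTime.from_iso8601("2020-01-27T18:00:00Z")

# 10 hours at 1200-second steps: 30 steps plus the starting moment.
IO.inspect(Enum.count(IntervalSteps.moments(starting, ending, 1200)))
# → 31
```

Because the stream is lazy, a pair covering a full day (or a list of thousands of pairs) does not need all of its moments in memory at once.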

Now my question is: I want to set up this scenario in GenStage or in Flow, so that it works with the above pairs of dates.

How can I use GenStage or Flow to make the process faster?

PS: There can be any date range, let's suppose a year, and there can also be more than 10000 pairs of dates.

Each date pair can cover working hours or a full day.

Any guidance would be perfect. I am learning GenStage, and I am getting nothing from the counter loop examples. Any guideline will do. Thanks.

Can this be helpful in this case?

https://hexdocs.pm/elixir/Task.html#async_stream/3

Is the question wrong, or is it written so unclearly that no one is interested in pointing me in any direction?

I believe GenStage would be overkill since you don’t need to apply backpressure. You are in control of emitting the date pairs.

Using Flow would only make the code more complex for not much benefit.

Task.async_stream seems perfect IMO. Since you won’t be limited by CPU load but by network I/O, you can also increase your max_concurrency so that more work gets done at the same time. For network-bound workloads I usually go for System.schedulers_online() * 5.
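As a rough illustration of that suggestion, here is a minimal sketch of `Task.async_stream/3` with a raised `max_concurrency`. The module name and the `download/1` function are hypothetical stand-ins (the function only sleeps to simulate network latency), and the 30-second timeout is an arbitrary assumption.

```elixir
defmodule ConcurrentDownloads do
  # Hypothetical stand-in for the real HTTP download; it only simulates latency.
  def download(moment) do
    Process.sleep(10)
    {:downloaded, moment}
  end

  def run(moments) do
    moments
    |> Task.async_stream(&download/1,
      # Network-bound work can use more tasks than there are schedulers.
      max_concurrency: System.schedulers_online() * 5,
      timeout: 30_000
    )
    # With the default options each result arrives as {:ok, value}, in input order.
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

IO.inspect(ConcurrentDownloads.run(1..20))
```

If a task exceeds the timeout, the default behaviour is for the calling process to exit, so a real download would probably want an explicit `on_timeout` choice.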

Let me start by trying to make your question more clear by attempting to split it into sub-problems:

Problem 1

  1. A user can supply a pair of datetimes, the ‘start’ and ‘end’ datetime.
  2. We transform this into a sequence of moments between these two, according to some rules.

Problem 2

  1. For each element of the sequence obtained from solving Problem 1, we need to perform some work: downloading a file.
  2. We need to batch these downloaded files together. (What rules should be followed for this batching?)
  3. We then upload all batches.

Problem 3

Instead of submitting only a single date pair, the user can actually submit a whole sequence of them.


I hope I have interpreted your problem correctly. As you can see, I still have a couple of questions. Nevertheless, I think we already have enough info to talk about how you could use GenStage/Flow/Broadway to work with this.

  1. I would solve ‘Problem 1’ using the functions in Elixir’s built-in Stream module because there is no heavyweight work going on and we might delay the evaluation of all intermediate datetimes to whenever they are required, which will save memory.
  2. For ‘Problem 2’ it seems like using Flow is your best bet. GenStage is the more low-level foundation upon which Flow (and Broadway) are built.
    • Use Flow.from_enumerable to split the stream obtained from Problem 1 across processes.
    • Use Flow.map to perform the downloading work for each datetime.
    • Use (probably? depending on your batching logic) Flow.group_by or Flow.reduce (maybe prefixed by a Flow.partition) to combine the results into the batches you want.
    • Finally, you can upload the batches in another Flow.map.
  3. To make all of this work with not a single pair of dates but with a list of them, I would just call the above flow for each element of the list, rather than making the flow more complex. The reason is that you probably want the batches/uploads to be separate for each pair, and that is conceptually simplest to do this way.

Does that help?

EDIT: I think Task.async_stream cannot properly handle the batching you require, which is why I did not suggest it. However, as I am not entirely sure how you want the batching to work exactly, I might be mistaken here.

I am not 100% clear on the requirements myself but IMO prepending Stream.chunk_by before Task.async_stream can take care of most batching needs. Yours might be the more correct comment though.
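A minimal sketch of chunking ahead of `Task.async_stream`, under the assumption that batches are simply fixed-size groups of up to 1000 moments. Note that it swaps in `Stream.chunk_every/2` for fixed-size batches, since `Stream.chunk_by/2` splits wherever a function's result changes. `download/1` and `upload_batch/1` are hypothetical stubs, not real SeaweedFS or Dropbox calls.

```elixir
defmodule BatchedPipeline do
  @batch_size 1000

  # Hypothetical stand-ins for the real download and the Dropbox batch commit.
  def download(moment), do: {:file_for, moment}
  def upload_batch(files), do: {:uploaded, length(files)}

  # One task handles one whole batch: download every file, then upload once.
  def process_batch(moments) do
    moments
    |> Enum.map(&download/1)
    |> upload_batch()
  end

  def run(moments, batch_size \\ @batch_size) do
    moments
    # Full batches are emitted as they fill up; the last one may be smaller.
    |> Stream.chunk_every(batch_size)
    |> Task.async_stream(&process_batch/1,
      max_concurrency: System.schedulers_online() * 5,
      timeout: :infinity
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

IO.inspect(BatchedPipeline.run(1..2500))
```

With 2500 inputs this produces two batches of 1000 and a final batch of 500, which matches the "1000 or fewer at the end" rule from the question.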

Thanks for a detailed reply.

I will describe a few parts again to make them clearer.

Problem 1

  • Yes, and this part has already been done. I can wrap the whole thing up in one method, let's say generate_date_pairs/5.

Problem 2

  • This is how those pairs are going to be used. Suppose I take one from the list above.
  %{
    ending: #DateTime<2020-01-27 18:00:00.123456+00:00 GMT Europe/Dublin>,
    starting: #DateTime<2020-01-27 08:00:00.123456+00:00 GMT Europe/Dublin>
  },

Suppose there is a method such as download_and_create_batch/3.

The start date is #DateTime<2020-01-27 08:00:00.123456+00:00 GMT Europe/Dublin>, and an interval will be passed into the method. The method will start on the very first date and go to a URL such as 17dom-yraim/snapshots/recordings/2020/03/02/00/00_03_000.jpg

  1. Do an HTTP GET request.
  2. Save the content of the file on disk.
  3. There are two ways to upload a file to Dropbox. The first is to send files one by one, which results in timeouts once the upload limit is exceeded over time. The second is to create a batch of uploaded files, such as:
    client = ElixirDropbox.Client.new(System.get_env("DROP_BOX_TOKEN"))
    {:ok, file_size} = get_file_size(image_save_path)
    %{"session_id" => session_id} = ElixirDropbox.Files.UploadSession.start(client, true, image_save_path)

When the batch reaches 1000 files, or the last date has been reached, commit the files to Dropbox all at once:

    entries =
      path
      |> check_1000_chunk()
      |> Enum.map(fn entry ->
        [session_id, offset, upload_image_path] = String.split(entry, " ")
        %{"cursor" => %{"session_id" => session_id, "offset" => String.to_integer(offset)}, "commit" => %{"path" => upload_image_path}}
      end)
    ElixirDropbox.Files.UploadSession.finish_batch(client, entries)

Now the job is done for the above pair.
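The entry-building step above could be isolated as a pure function so it can be checked without touching Dropbox. This is only a sketch: the module name is hypothetical, it assumes each line has the form "session_id offset upload_path" as the `String.split` in the snippet suggests, and `in_batches/2` is a guessed stand-in for whatever `check_1000_chunk` does. Unlike the snippet, it splits into at most three parts so that a path containing spaces stays intact.

```elixir
defmodule BatchEntries do
  # Turns "session_id offset upload_path" lines into the maps passed to
  # finish_batch in the snippet above.
  def from_lines(lines) do
    Enum.map(lines, fn line ->
      # parts: 3 keeps any spaces inside the upload path.
      [session_id, offset, upload_path] = String.split(line, " ", parts: 3)

      %{
        "cursor" => %{"session_id" => session_id, "offset" => String.to_integer(offset)},
        "commit" => %{"path" => upload_path}
      }
    end)
  end

  # Assumption: a batch is simply up to `batch_size` consecutive lines.
  def in_batches(lines, batch_size \\ 1000) do
    lines
    |> Enum.chunk_every(batch_size)
    |> Enum.map(&from_lines/1)
  end
end

IO.inspect(BatchEntries.from_lines(["abc123 2048 /camera/2020/01/27/08_00_00.jpg"]))
```

Each inner list returned by `in_batches/2` would then be handed to one `finish_batch` call.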

Problem 3.

Yes, you are right that a whole list can be submitted. It is not actually a user but the server side, which either deals with each pair individually or deals with all the pairs in the list at once.

I agree, and what you suggested is very helpful. My problem is:

I am fed up with Enum.each. I don’t want to perform this whole task in an each loop. I have seen a few examples in which there are some defined methods that are arranged in a flow to do all the work. For example:

You have a few methods and you know the output of each of them.

  • generate_date_pairs
    • It will take a few params and generate output such as the list in the question above.
  • go_through_a_pair
    • This is programmed to take a single pair, BUT I have seen a few examples about Flow where you just pass your method with & and it deals with each element of the incoming list individually. This method will start from the first date and run until the last, download each file, and add it to the Dropbox batch until the batch reaches 1000 or the date range ends.
  • commit_batch
    • It will just commit the batch to Dropbox. This can be done in the second method as well.

So I hope it's clear now. If you have any other questions, please do ask. I am learning the hard way how to use Flow/GenStage and similar concepts.
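The three methods listed above could be wired together with function captures roughly as follows. This is a sketch under loud assumptions: it uses `Task.async_stream` from the standard library rather than Flow, every function body is a hypothetical stub, and moments are plain integers (seconds) instead of DateTimes to keep the example self-contained.

```elixir
defmodule ExtractionPipeline do
  # Hypothetical stub: the real function would build pairs from the schedule.
  def generate_date_pairs do
    [%{starting: 0, ending: 3600}, %{starting: 7200, ending: 9000}]
  end

  # Walks one pair from start to end, downloading and committing in batches.
  def go_through_a_pair(%{starting: starting, ending: ending}, interval \\ 1200) do
    starting
    |> Stream.iterate(&(&1 + interval))
    |> Stream.take_while(&(&1 <= ending))
    |> Stream.map(&download/1)
    |> Stream.chunk_every(1000)
    |> Enum.map(&commit_batch/1)
  end

  # Hypothetical stubs for the SeaweedFS download and the Dropbox commit.
  def download(second), do: "file_#{second}.jpg"
  def commit_batch(files), do: {:committed, length(files)}

  def run do
    generate_date_pairs()
    # Each pair is handled by its own task, so batches never mix across pairs.
    |> Task.async_stream(&go_through_a_pair/1,
      max_concurrency: System.schedulers_online() * 5,
      timeout: :infinity
    )
    |> Enum.flat_map(fn {:ok, batches} -> batches end)
  end
end

IO.inspect(ExtractionPipeline.run())
```

The first stub pair yields four moments (0, 1200, 2400, 3600) and the second yields two (7200, 8400), so each pair ends with one small batch of its own.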