Request for GitHub PR review of GenServer workflow implementation and automated tests

Hello Elixir community! I’m building a workflow built with some GenServers and I would like to ask a GitHub PR review!

Here’s a brief description of my project:

The PESCARTE Project has as its main goal the creation of a regional social network integrated by artisanal fishermen and their families, seeking, through educational processes, to promote, strengthen and improve their community organization and professional qualification, as well as their involvement in the participative construction and implementation of work and income generation projects.
Through the PESCARTE Project, the fishing communities that live in the municipalities of Arraial do Cabo, Cabo Frio, Macaé, Quissamã, Campos dos Goytacazes, São João da Barra, and São Francisco de Itabapoana are mobilized, encouraged, and oriented to participate in different actions and/or activities of an educational nature. These actions and/or activities have the following objectives: to improve the professional performance of these communities, either by increasing their productivity or by being able to better organize themselves and carry out solidary economic activities.
The intention is to reinforce the productive identities of these fishing communities, in order to favor the mitigation of the negative impacts that affect them and that result from the activities carried out, in that region, by the oil and natural gas exploration and production industry.

More context about this feature I’m building:

We need to build a price quote API for fish prices variations, that are updated daily in the Pesagro site (https://www.pesagro.rj.gov.br/). They publish reports that are files in PDF with price quotes for agricultural items, fish included.
So the first step wast to “scrape” all these reports, that was made in this PR: Cria app CotacoesETL e buscador de novas cotações na Pesagro by zoedsoupe · Pull Request #113 · peapescarte/pescarte-plataforma · GitHub. The second step is to convert all these PDFs into TXT to easy parsing of information and the last step is the information ingestion.

Also, I would like to ask some advices on how I could test the flow that this PR implements!

This is the PR description, translated:

Description

This PR implements the second part of the script for importing fish quotes from the Pesagro website. In this part of the flow, a new worker has been implemented, to search the quote table for quotes (links) that have not yet been downloaded.

After downloading each file, a check must be made, because some Pesagro links are a set of PDFs in a zip file, which must be extracted.

With all the PDFs extracted, we must then upload each of the extracted PDFs into the Zamzar API, converting them to TXT. We cannot exceed the rate limit of their API (5 requests per second).

Once we have the converted file, we need to download it so that the last worker can be started for parsing the data from each fish.

The original script can be found in the cotacoes-api repository: https://github.com/peapescarte/cotacao-api/blob/feat-etl-module/etl/crawler.py

Points for Attention

  • The conversion worker should follow the following flow:
    1. Fetch quotes from the database that have not yet been downloaded.
    2. Download each quotation, at the Pesagro site
    3. If a quotation is a zip file, extract all the PDFs contained in the file
    4. Upload each PDF to the Zamzar API, for conversion into TXT of each one, respecting their rate limit (maximum 5 requests per second)
    5. Download the converted file from Zamzar, if it is already ready, or schedule a new query in their API

Do you have new settings?

  • Internal settings for the correct use of the lib mox.
  • Environment variable FETCH_PESAGRO_COTACOTES, a boolean to control if workers should be started with the application or not

Do you have migrations?

N/A

This is the PR link: Cria worker para converter arquivos PDF para TXT by zoedsoupe · Pull Request #119 · peapescarte/pescarte-plataforma · GitHub

2 Likes

Gotta be honest with you, I would have given you 20-30 minutes of my time to review but I don’t speak Spanish and couldn’t read the description so I just bailed.

If you translate that to English you will IMO have a few takers.

Oh, isn’t spanish! The PR description was written in Portuguese. And I already translated to English on the question above: Request for GitHub PR review of GenServer workflow implementation and automated tests

See? :003: Exactly what I am talking about.

Ahh, now I understand why did you put that text there! Thanks.

One thing that jumps out right away is the repeated use of :timer.apply_after to do some kind of time-delayed sequencing of operations; that seems difficult to test and debug.

Calling GenServer.cast inside of handle_cast (via functions like trigger_zip_extraction) also seems strange, since it looks like a function call but doesn’t really do anything until the next receive loop.

Using Task.async inside of GenServer callbacks is not recommended

Consider moving the Process.sleep inside the Task.async here so the tasks are napping instead of the main GenServer process.

Not directly related to the changes in that PR, but having a function Zamzar.Job.changeset/1 that doesn’t return an Ecto.Changeset is going to throw off readers who expect that function to work “like it usually does”.

The handling for download_txt is entirely independent of the GenServer’s state and doesn’t interact with the other handlers - that’s usually a signal that it shouldn’t be part of the same process.

How is trigger_cotacoes_convertion going to be invoked? What (if any) guards are there against invoking it again while the first conversion is processing, which would overwrite the GenServer’s state?

BUT

Bigger than all of those is a design concern: there’s a lot of complexity in CotacaoConverter around dealing with a list of files when the steps involved are all per-file. Consider splitting the behavior differently to focus on a single file at a time:

cotacoes_to_upload = CotacaoHandler.find_cotacoes_not_downloaded()

# NOTE: this is needed to make async_stream not terminate us on timeouts
#       In your real application you may want to use a Task.Supervisor
#       with async_stream_nolink instead.
Process.flag(:trap_exit, true)

cotacoes_to_upload
|> Task.async_stream(&fetch_cotacao/1, timeout: 30_000, on_timeout: :kill_task)
|> Stream.flat_map(fn
  {:ok, files} when is_list(files) -> files
  {:ok, file} -> [file]
  {:error, reason} -> []
end)
|> # ...do more stuff with each file path...

# helper for downloading and unpacking a single file
def fetch_cotacao(cotacao) do
  file_path = pesagro_handler().download_boletim_from_pesagro!(@storage_path, cotacao)

  if String.ends_with?(file_path, ".zip") do
    pesagro_handler().extract_boletins_zip!(file_path, @storage_path)
  else
    file_path
  end
end

One side-effect of this arrangement is that the “do more stuff” part of the stream can start running while some of the fetch_cotacao tasks are still doing their thing.


The other half of CotacaoConverter’s work is around uploading the PDF and then polling for results. This could be extracted to a separate “gateway” process that does the following:

  • accepts filenames to upload, puts them on a queue
  • when there is available capacity (under the rate-limit), upload a file and track the job
  • periodically poll with the “in-flight” job IDs to see if the job is done
  • if so, send a message to the process that queued the filename (or download the file directly)

The “send a message” part is because I’m not sure what else the gateway might need to do (send a PubSub message? Update a record in the DB?) to notify the rest of the system that the file is ready.

5 Likes