Importing paginated data

I have the challenge of importing (downloading and saving) paginated data.

Currently I’m doing it recursively in a single workflow step, and I think that’s safe even if the step crashes (e.g. due to a remote API error).

I designed this code to resume from the last known transaction on the first fetch, so it should also resume from the last saved transaction in case of a crash:

   def insert(account_id) do
    # Builds an Oban workflow whose context holds the account id, then inserts it.
    # :trigger_sync is added first; :import (&import_txs/1) declares :trigger_sync
    # in its deps. Both cascade steps are placed on the :transactions_sync queue.
    shared_opts = [queue: :transactions_sync]
    import_opts = [deps: [:trigger_sync]] ++ shared_opts

    Workflow.new()
    |> Workflow.put_context(%{account_id: account_id})
    |> Workflow.add_cascade(:trigger_sync, &trigger_sync/1, shared_opts)
    |> Workflow.add_cascade(:import, &import_txs/1, import_opts)
    |> Oban.insert_all()
  end

  @doc """
  Workflow cascade entry point.

  Loads the bank account named by `:account_id` in the given map and imports
  its transactions, starting from the account's `finexer_id`.
  """
  def import_txs(%{account_id: id}) do
    account = BankAccount.by_id(id)
    first_source = account.finexer_id
    import_txs(first_source, account)
  end

  @doc """
  Downloads and saves one page of transactions for `account`, then recurses on
  the downloaded response. It stops after saving a response whose `paging.next`
  is `nil`.

  `source` is either the account's `finexer_id` (first page) or the previous
  response map (subsequent pages). It crashes with a `MatchError` in two cases:
  a download does not return `{:ok, _}`, or a save does not return a 2-tuple.
  """
  def import_txs(%{paging: %{next: nil}}, _account), do: :ok

  def import_txs(source, account) do
    {:ok, page} = download_transactions(source, account)
    {_, _} = save_transactions(page, account)
    import_txs(page, account)
  end

  @doc """
  Fetches one page of transactions through `APIClient.account_transactions/2`.

    * When `source` is the account's `finexer_id` (a binary), it requests the first
      page, passing the account's saved checkpoint as the `timestamp.gte` filter.
      A restarted import can then resume from the last saved transaction.
    * When `source` is a previous response map, it requests that response's next
      page.

  Returns whatever `APIClient.account_transactions/2` returns. Callers
  expect `{:ok, response}`.
  """
  def download_transactions(finexer_id, account) when is_binary(finexer_id) do
    # NOTE(review): assumes Transaction.account_checkpoint/1 takes the account id
    # (as the argument name suggests) and that the account exposes `:id`. Confirm;
    # pass `account` itself if the helper expects the struct.
    checkpoint = Transaction.account_checkpoint(account.id)
    APIClient.account_transactions(finexer_id, [{:"timestamp.gte", checkpoint}])
  end

  def download_transactions(txs_resp, _account) when is_map(txs_resp) do
    APIClient.account_transactions(txs_resp, :next_page)
  end

  # Persists the `:data` entries of a downloaded page for `account` via
  # Transaction.bulk_import/2 and returns its result. Callers match that result
  # against a 2-tuple. A response without a `:data` key passes `nil` through.
  defp save_transactions(txs_resp, account) do
    page_data = Map.get(txs_resp, :data)
    Transaction.bulk_import(page_data, account)
  end

Out of curiosity, I wonder whether the workflow design could support replacing the recursive download of paginated data with dynamically added jobs.

For example, import_txs could import the first page, check whether there are more pages (txs_resp.paging.next != nil), and then schedule the next job with %{source: txs_resp.paging.next} as an argument.

1 Like

That design should work. There’s no “finished” state for a workflow, so you can always append another job. Did you give that a try?

1 Like

I haven’t yet. I just don’t have enough understanding of Oban to put together a working solution.
I guess the shortest path to that would be to call add_cascade instead of import_txs inside import_txs/2.



def insert(account_id) do
  # Builds the sync workflow with a fixed id so that later steps can look it up
  # and append further import steps to it.
  # NOTE(review): @sync_workflow is a module-level constant, so every account's
  # sync would share one workflow id. Confirm that is intended. Also confirm in
  # the Oban Pro docs which option Workflow.new/1 expects for an explicit id.
  Workflow.new(id: @sync_workflow)
  |> Workflow.put_context(%{account_id: account_id})
  |> Workflow.add_cascade(:trigger_sync, &trigger_sync/1, queue: :transactions_sync)
  |> Workflow.add_cascade(:import, &import_txs/1,
    deps: [:trigger_sync],
    queue: :transactions_sync
  )
  |> Oban.insert_all()
end

  @doc """
  Workflow cascade entry point.

  Looks up the account named by `:account_id` and imports transactions,
  starting from the account's `finexer_id`.
  """
  def import_txs(%{account_id: account_id}) do
    # Load the account only to read its Finexer id. import_txs/2 takes the
    # account id and loads the account again itself.
    account = BankAccount.by_id(account_id)
    import_txs(account.finexer_id, account_id)
  end

  @doc """
  Imports one page of transactions for the account identified by `account_id`.

  If the API reports a further page, it appends another `:import` cascade to the
  sync workflow instead of recursing. Returns `:ok` when there is nothing left
  to import.
  """
  def import_txs(%{paging: %{next: nil}}, _account_id) do
    :ok
  end

  def import_txs(source, account_id) do
    account = BankAccount.by_id(account_id)
    {:ok, txs_resp} = download_transactions(source, account)
    {_, _} = save_transactions(txs_resp, account)

    # When reached via import_txs/1, `source` is a finexer_id (a binary), so the
    # clause above never matches. Check the fresh response explicitly. Without
    # this check, every run appends another step and the workflow never ends.
    if match?(%{paging: %{next: nil}}, txs_resp) do
      :ok
    else
      # NOTE(review): confirm against the Oban Pro docs that Workflow.status/1
      # yields something add_cascade/4 can extend. An append-specific API may be
      # needed to add steps to a running workflow.
      # NOTE(review): the step name :import is reused for every page. Confirm that
      # step names need not be unique, or derive a per-page name.
      # NOTE(review): txs_resp is not forwarded. Assuming the cascade receives the
      # workflow context, the new step re-enters via import_txs/1 and fetches from
      # the account's saved checkpoint. That works only if the checkpoint advances
      # as pages are saved; otherwise carry the page cursor in the context.
      @sync_workflow
      |> Workflow.status()
      |> Workflow.add_cascade(:import, &import_txs/1,
        deps: [:trigger_sync],
        queue: :transactions_sync
      )
      |> Oban.insert_all()
    end
  end

So the last function would import the current page and schedule the next job. But how do I pass txs_resp to the next job?