I have a challenge importing (downloading and saving) paginated data. Currently I'm doing it recursively inside a workflow step, and I think it's crash-safe (e.g. against a remote API error): on the first fetch the code resumes from the last known transaction checkpoint, so after a crash the retried job also picks up from the last saved transaction.
```elixir
def insert(account_id) do
  Workflow.new()
  |> Workflow.put_context(%{account_id: account_id})
  |> Workflow.add_cascade(:trigger_sync, &trigger_sync/1, queue: :transactions_sync)
  |> Workflow.add_cascade(:import, &import_txs/1,
    deps: [:trigger_sync],
    queue: :transactions_sync
  )
  |> Oban.insert_all()
end

def import_txs(%{account_id: account_id}) do
  account = BankAccount.by_id(account_id)
  import_txs(account.finexer_id, account)
end

def import_txs(%{paging: %{next: nil}}, _account) do
  :ok
end

def import_txs(source, account) do
  {:ok, txs_resp} = download_transactions(source, account)
  {_, _} = save_transactions(txs_resp, account)
  import_txs(txs_resp, account)
end

def download_transactions(finexer_id, account) when is_binary(finexer_id) do
  # Resume from the last imported transaction for this account.
  APIClient.account_transactions(
    finexer_id,
    [{:"timestamp.gte", Transaction.account_checkpoint(account)}]
  )
end

def download_transactions(txs_resp, _account) when is_map(txs_resp) do
  APIClient.account_transactions(txs_resp, :next_page)
end

defp save_transactions(txs_resp, account) do
  txs_resp
  |> Map.get(:data)
  |> Transaction.bulk_import(account)
end
```
Out of curiosity, I wonder whether the workflow design could support replacing the recursive download of paginated data with dynamically added jobs. For example, import_txs could import the first page, check whether there is more (txs_resp.paging.next != nil), and then schedule the next job with %{source: txs_resp.paging.next} as an argument.
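A minimal sketch of that page-per-job idea, written as a plain Oban worker rather than a workflow step. `ImportPageWorker` and the `"source"` argument are hypothetical names, and I'm assuming `download_transactions/2` can accept the stored next-page reference as its first argument:

```elixir
defmodule ImportPageWorker do
  use Oban.Worker, queue: :transactions_sync

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"account_id" => account_id} = args}) do
    account = BankAccount.by_id(account_id)

    # First job has no "source": start from the account's checkpoint.
    source = Map.get(args, "source", account.finexer_id)

    {:ok, txs_resp} = download_transactions(source, account)
    {_, _} = save_transactions(txs_resp, account)

    # Instead of recursing, enqueue a follow-up job for the next page.
    case txs_resp.paging.next do
      nil ->
        :ok

      next ->
        %{account_id: account_id, source: next}
        |> ImportPageWorker.new()
        |> Oban.insert()

        :ok
    end
  end
end
```

Each page then gets its own job (and its own retries), and a crash mid-import only re-runs the page that failed rather than restarting the recursion.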