I just wrote a custom Mix task. It’s a pretty bespoke solution; lots of code specific to our app.
defmodule Mix.Tasks.Shard.Migrate do
use Mix.Task
require Ecto.Query
import Mix.Shard
@shortdoc "Runs the client schemas migrations"
@moduledoc """
Runs the pending migrations one or more client schemas.
## Examples
mix shard.migrate
mix shard.migrate -c 50
mix shard.migrate -n 3
mix shard.migrate --step 3
mix shard.migrate --to 20080906120000
mix shard.migrate -o dev -o alpha
## Command line options
* `--all` - run all pending migrations
* `--step`, `-n` - Run n number of pending migrations
* `--to` - Run all migrations up to and including version
* `--quiet` - Do not log migration commands
* `--log-sql` - Log the raw sql migrations are running
* `--concurrency`, `-c` - Set concurrency. Defaults to 20.
* `--only`, `-o` - Run for specified clients only.
"""
@switches [
all: :boolean,
step: :integer,
to: :integer,
quiet: :boolean,
concurrency: :integer,
log_sql: :boolean,
only: [:string, :keep],
]
@aliases [
n: :step,
c: :concurrency,
o: :only,
]
@impl true
def run(args \\ []) do
Mix.Task.run("app.config")
Logger.configure(level: :info)
{opts, _} = OptionParser.parse! args, strict: @switches, aliases: @aliases
opts = if opts[:to] || opts[:step] || opts[:all],
do: opts,
else: Keyword.put(opts, :all, true)
run(:up, opts)
end
def run(dir, opts) do
concurrency = Keyword.get(opts, :concurrency, 20)
# Each migration needs two connections?
start_repos(pool_size: concurrency * 2)
# Load synchronously so we don't get code compilation issues.
migrations = load_migrations()
clients_query(opts, :only)
|> Datastores.Repo.all()
|> Task.async_stream(&run(&1, migrations, dir, opts), max_concurrency: concurrency)
|> Enum.each(fn {:ok, _} -> nil end)
:ok
end
def run(client, migrations, dir, opts) do
Datastores.set(client.name)
dynamic_repo = String.to_atom(client.csn)
prefix = Datastores.Shard.client_to_schema(client)
opts = Keyword.merge(opts,
dynamic_repo: dynamic_repo,
prefix: prefix,
migration_lock: false # Migration lock doesn't honor prefix... :(
)
Ecto.Migrator.run(Datastores.Shard, migrations, dir, opts)
end
def load_migrations() do
migrations_path = Datastores.Shard.config(:db01)
|> Keyword.fetch!(:priv)
Path.wildcard("#{migrations_path}/**/*.exs")
|> Enum.map(fn path ->
[{module, _code}] = Code.compile_file(path)
[version, _trash] = path |> Path.basename() |> String.split("_", parts: 2)
version = String.to_integer(version)
{version, module}
end)
|> Enum.sort()
end
end
Basically…
- Start up all the dynamic repos.
- Query our controller database (regular Ecto repo) for list of tenants.
- Load all migration scripts from priv.
- Run all of them in Task.async_stream.
Datastores.set
is our version of Tenant.set
and it does the whole put_dynamic_repo thing. Each tenant is a Postgres schema, hence we also need to set prefix
Note the # Migration lock doesn't honor prefix... :(
comment. I think that’s a bug in Ecto and why I say things almost work perfectly. This code is pretty old though, I wonder if it’s been fixed.
Edit: I didn’t realize this was a year old. Discourse’s notifications are confusing to me… 