Upholding Slack postMessage limits with Oban

Hey peeps,

I’m working on a daily journal bot for Slack that sends a reminder to all subscribed users to answer two questions at the beginning of their day.

For that, I need to uphold Slack API postMessage limits:

  • 1 direct message per second
  • 600 direct messages per workspace

I’m not really worried about the second limit - that would be an excellent problem to have, but it’s not going to crop up any time soon.

Three other considerations add another dimension of complexity:

  • Messages should go out only during working days
  • People should be able to adjust the time when they want to receive their messages
  • Messages should be delivered at most once per day

Here is how I picture a possible implementation:

  • I’m considering separately storing the preferred message delivery time (or using the default 9 am).
  • Based on those settings I should schedule jobs for sending messages
  • All messages for a specific organisation should go through a designated queue so that I can uphold the 1 DM per second limit.

Those are general thoughts not grounded in what Oban can and can’t do, so I’d love to hear your opinion on how I can use Oban to meet those requirements (I reckon that I may require Oban Pro).

Best,
Yevhenii

Thank you for moving this over from the Oban repo’s issue tracker; the forum is a better fit for advice and discussion.

That’s a good place to use a cron job that generates other jobs. Configure the job to run at midnight UTC, or at intervals throughout the day, and have it insert jobs for preferred message delivery times between then and the next cron interval, e.g. the next 24 hours.
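A minimal sketch of that cron-driven approach, assuming a hypothetical `MyApp.ReminderScheduler` worker registered with the Cron plugin, plus hypothetical `MyApp.Subscriptions.due_before/1` and `MyApp.ReminderWorker` modules:

```elixir
# In config, run the scheduler once per day at midnight UTC:
#
#   config :my_app, Oban,
#     plugins: [{Oban.Plugins.Cron, crontab: [{"0 0 * * *", MyApp.ReminderScheduler}]}]

defmodule MyApp.ReminderScheduler do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(_job) do
    # Look ahead one cron interval (24 hours here).
    window_end = DateTime.add(DateTime.utc_now(), 24 * 60 * 60, :second)

    # due_before/1 is a hypothetical lookup returning each subscription
    # together with its preferred delivery time inside the window.
    for sub <- MyApp.Subscriptions.due_before(window_end) do
      %{slack_profile_id: sub.slack_profile_id, slack_team_id: sub.slack_team_id}
      |> MyApp.ReminderWorker.new(scheduled_at: sub.deliver_at)
      |> Oban.insert()
    end

    :ok
  end
end
```

The key detail is `scheduled_at:` passed as an option to `new/2`, which makes each job sit in the `scheduled` state until its delivery time arrives.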

Every queue adds some communication overhead, not to mention the organizational burden from managing them all. While it is possible to use Pro’s DynamicQueues plugin for this, or rig it with start_queue, I highly recommend using Pro’s partitioned rate limiting instead. That would look something like this:

local_limit: 30, rate_limit: [allowed: 1, period: 1, partition: [fields: [:args], keys: [:org_id]]]

That would allow delivering up to 30 concurrent messages while limiting execution to one job per second per organization id.

Hope that helps!


Thanks for the reply!
After thinking a bit more about that option, I found it to be quite complex in terms of edge cases, so I’m currently exploring another approach: calculating the kickoff time for the initial notification and then rescheduling it in 24 hours.
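That rescheduling idea can be sketched as a worker that re-enqueues itself 24 hours out after delivering. This is only a sketch under my own assumptions: `MyApp.Slack.deliver/1` is a hypothetical function that posts the DM, and the `unique` option is one way to satisfy the at-most-once-per-day requirement:

```elixir
defmodule MyApp.ReminderWorker do
  use Oban.Worker,
    queue: :daily_ritual,
    # Guard against stacking duplicate reminders for the same args
    # while a copy is still waiting to run.
    unique: [period: :infinity, states: [:available, :scheduled]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: args}) do
    # deliver/1 is a hypothetical function that sends the Slack DM.
    :ok = MyApp.Slack.deliver(args)

    # Re-enqueue the same args for tomorrow.
    args
    |> new(schedule_in: 24 * 60 * 60)
    |> Oban.insert()

    :ok
  end
end
```

Skipping non-working days could then be handled either before `deliver/1` or by scheduling further ahead than 24 hours when the next day falls on a weekend.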

I’m currently trying to implement per-workspace rate limiting as you suggested and want to ask if it’s possible to observe the expected behavior in tests.

Problem Summary:

I want to insert two jobs, let Oban do its thing, and then see that only one was executed. However, drain_jobs shows that both jobs were executed, and the test only took 0.1s to run: %{cancelled: 0, completed: 2, discarded: 0, exhausted: 0, scheduled: 0}. This tells me I’m doing something wrong.

Interestingly, I tested the setup in the dev environment, and it seems to do the right thing. It’s just in the test environment where I’m encountering this issue.

Configuration:

My config.exs:

config :sesame, Oban,
  repo: Sesame.Repo,
  plugins: [Oban.Plugins.Pruner],
  engine: Oban.Pro.Engines.Smart,
  queues: [
    default: 10,
    daily_ritual: [
      local_limit: 30,
      rate_limit: [allowed: 1, period: 1, partition: [fields: [:args], keys: [:slack_team_id]]]
    ]
  ]

In tests I additionally set the testing mode to :manual:

config :sesame, Oban, testing: :manual

Test Code:

In my test:

defmodule Sesame.Model.NotificationsSchedulerTest do
  use Sesame.DataCase, async: false
  alias Sesame.Model.DailyRitualSubscription

  alias Sesame.Model.NotificationsScheduler.ObanScheduler, as: NotificationsScheduler

  defmodule NullWorker do
    use Oban.Pro.Worker, queue: :daily_ritual

    @impl Oban.Pro.Worker
    def process(_args) do
      :ok
    end
  end

  describe "schedule" do
    setup do
      start_supervised!({Oban, Application.fetch_env!(:sesame, Oban)})
      prod_scheduler_config = Application.get_env(:sesame, Sesame.Model.NotificationsScheduler.ObanScheduler)
      Application.put_env(:sesame, Sesame.Model.NotificationsScheduler.ObanScheduler, worker: NullWorker)

      on_exit(fn ->
        Application.put_env(:sesame, Sesame.Model.NotificationsScheduler.ObanScheduler, prod_scheduler_config)
      end)

      subscription = %DailyRitualSubscription{
        slack_profile_id: "a slack profile id",
        slack_team_id: "a slack team id",
        morning_ritual_commence_at: DateTime.utc_now()
      }

      %{subscription: subscription}
    end

    test "throttles one worker per second", %{subscription: subscription} do
      NotificationsScheduler.schedule(subscription)
      NotificationsScheduler.schedule(subscription)

      drain_jobs(queue: :all) |> IO.inspect()
    end
  end
end

Sorry for the loaded setup - it’s not really relevant, but I decided to keep it to illustrate how NullWorker is injected.

Scheduler:

The NotificationsScheduler is as simple as:

  defmodule ObanScheduler do
    def schedule(%DailyRitualSubscription{} = subscription) do
      Oban.insert(morning_ritual_worker(subscription))
      :ok
    end

    def morning_ritual_worker(subscription) do
      # scheduled_at must be passed as an option to new/2, not as a job arg,
      # or the job is enqueued for immediate execution.
      worker().new(
        %{
          slack_profile_id: subscription.slack_profile_id,
          slack_team_id: subscription.slack_team_id,
          kind: :morning_ritual
        },
        scheduled_at: subscription.morning_ritual_commence_at
      )
    end

    def worker() do
      Application.fetch_env!(:sesame, __MODULE__) |> Keyword.fetch!(:worker)
    end
  end

Could you please help me understand what I might be doing wrong, or how to achieve the expected rate-limiting behavior in my tests? Thank you!
