Right now I am using PubSub to send messages to a dedicated genServer, which will handle them in order. The handling of a message may take some time, and in the mean time a newer, but otherwise same message could come in. My message handing is idempotent so some amount of duplication is ok; However, it is still a waste of computation. Can I peek ahead in my mailbox during handle_info/2
, and cancel all pending messages that match a certain pattern?
You can do selective receive, but that will take longer with growth of the message queue. Instead I would add some idempotency token to the message and use that to reject the messages that were already processed, so instead of rejecting messages that you have already computed during computing them, discard them as soon as these arrive.
I’ve done selective receive with after 0
a couple of times to batch messages. But you can also de-duplicate. e.g.
defmodule Dedup do
def example_process() do
Process.sleep(100)
receive do
message ->
dedup(message)
dosomething(message)
end
example_process()
end
def dedup(message) do
receive do
^message ->
dedup(message)
after
0 -> :ok
end
end
defp dosomething(message) do
IO.puts("Got #{inspect(message)}")
end
end
You probably only need the dedup()
method. The rest is just to run the example. Every number will be printed only once even though there are a total of 100 messages sent.
iex(1)> pid = spawn_link &Dedup.process/0
#PID<0.383.0>
iex(2)> for i <- 1..100, do: send(pid, div(i, 10))
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, ...]
Got 0
Got 1
Got 2
Got 3
Got 4
Got 5
Got 6
Got 7
Got 8
Got 9
Got 10
iex(3)>
Nice. This is exactly what I was looking for. Thanks a lot!
Thanks, I thought about this before, but it is more state to maintain and complexity I want to avoid. I am going to do dedup with selective receive like @dominicletz mentioned