How to queue messages on a sender node?

I have an IoT device that sends sensor data to a server, but it isn’t always connected to the network (and/or the server might be offline). When the server is available, the IoT device is connected to its cluster.

My plan is that the IoT node saves the sensor data into a queue (preferably persisted with mnesia), and - when they are connected - the server receives the messages from this queue. So the server more or less doesn’t have to know about the queue / when the queue is empty it gets notified about the new messages the same way as it gets notified about the first item of a non-empty queue when the nodes get connected.

My question is: does such a queue library exist?

If not: what would be the best way to implement this, and in particular how should messages be “pulled” from the queue? My first idea is a GenServer that receives messages from the sensor and requests from the server. After a request, it dispatches the first item of the queue to the server as soon as it’s available (and probably pops it from the queue once the server acknowledges it).
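
To make the idea concrete, here is a minimal sketch of such a GenServer, kept in memory only and using nothing but the standard library. The module name `PullQueue` and the functions `push/2`, `request/2` and `ack/1` are made up for illustration; they do not come from any existing library. The head of the queue is sent to the requesting process and only dropped after an acknowledgement.

```elixir
defmodule PullQueue do
  @moduledoc "Hypothetical pull-based queue: items wait until a consumer asks for one."
  use GenServer

  # Client API (all names are illustrative assumptions)
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def push(pid, item), do: GenServer.cast(pid, {:push, item})
  def request(pid, consumer), do: GenServer.cast(pid, {:request, consumer})
  def ack(pid), do: GenServer.cast(pid, :ack)

  @impl true
  def init(:ok), do: {:ok, %{items: :queue.new(), waiting: nil, in_flight: false}}

  @impl true
  def handle_cast({:push, item}, state) do
    {:noreply, dispatch(%{state | items: :queue.in(item, state.items)})}
  end

  def handle_cast({:request, consumer}, state) do
    {:noreply, dispatch(%{state | waiting: consumer})}
  end

  # The consumer confirmed receipt, so the head can finally be dropped.
  def handle_cast(:ack, %{in_flight: true} = state) do
    {:noreply, dispatch(%{state | items: :queue.drop(state.items), in_flight: false})}
  end

  def handle_cast(:ack, state), do: {:noreply, state}

  # Send the head only when a consumer is waiting and nothing is unacknowledged.
  defp dispatch(%{waiting: consumer, in_flight: false} = state) when is_pid(consumer) do
    case :queue.peek(state.items) do
      {:value, item} ->
        send(consumer, {:item, item})
        %{state | waiting: nil, in_flight: true}

      :empty ->
        state
    end
  end

  defp dispatch(state), do: state
end
```

The sketch leaves out what happens when an acknowledgement never arrives (for example, re-sending the head after a timeout or on reconnect), and it does not persist anything.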

(I was also considering job processing libraries, but for obvious reasons they all assume that the server is always available, thus the messages queued on the server.)

Thanks for any suggestions

I am not aware of specific libraries that implement the retry logic, but if you have to implement it yourself and you want the queue to persist across restarts, you can check CubDB (disclaimer: I am the author of the library). It should be simpler than Mnesia, especially for embedded development. You can find more here about why (or why not) you would use it instead of other DBs.

I have used it over the past years in IoT and embedded applications, for data logging, storing configuration, and queues too. The ability to support both key/value access and selection of sorted ranges of entries comes in handy when persisting queues.

One way you can implement the queue is by saving a sorted collection, more or less as explained in the How-To here. If you know the ID of the latest message that was successfully synced, you can then select all messages that came after it and sync them, either one by one or in bulk, then delete them from the queue.
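
The selection logic can be sketched without any dependency. The example below uses an in-memory ETS `ordered_set` purely as a stand-in for a persistent sorted store (it is not CubDB's API), and the module and function names are hypothetical. Keys are assumed to be increasing integer IDs.

```elixir
defmodule SortedQueueSketch do
  # In-memory stand-in for a sorted key/value store; keys are increasing integer IDs.
  def new, do: :ets.new(:sketch_queue, [:ordered_set, :public])

  def put(table, id, payload), do: :ets.insert(table, {id, payload})

  # All {id, payload} entries with id greater than last_synced_id, in key order.
  def pending(table, last_synced_id) do
    :ets.select(table, [{{:"$1", :_}, [{:>, :"$1", last_synced_id}], [:"$_"]}])
  end

  # Delete everything up to and including the acknowledged ID; returns the count.
  def delete_through(table, acked_id) do
    :ets.select_delete(table, [{{:"$1", :_}, [{:"=<", :"$1", acked_id}], [true]}])
  end
end

table = SortedQueueSketch.new()
SortedQueueSketch.put(table, 1, "reading a")
SortedQueueSketch.put(table, 2, "reading b")

# Everything after ID 1 still needs to be synced.
IO.inspect(SortedQueueSketch.pending(table, 1))
```

With a persistent store the same two operations (range select after the last synced ID, then range delete) would apply, only against the store's own API.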

As you implement the queuing mechanism, you would still have to decide between “at least once” and “at most once” delivery semantics (when a message fails, the client generally does not know whether the failure occurred before or after the message was received). If messages have a unique ID, you can de-duplicate them server side.
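
As a rough illustration of server-side de-duplication under “at least once” delivery, here is a small sketch. It assumes every message is a map with a unique `:id` key; the module name `DedupReceiver` is hypothetical.

```elixir
defmodule DedupReceiver do
  # State is the set of message IDs that were already processed.
  def new, do: MapSet.new()

  # Returns {:new | :duplicate, updated_seen}.
  def receive_message(seen, %{id: id} = _message) do
    if MapSet.member?(seen, id) do
      {:duplicate, seen}
    else
      {:new, MapSet.put(seen, id)}
    end
  end
end

seen = DedupReceiver.new()
{:new, seen} = DedupReceiver.receive_message(seen, %{id: 1, value: 21.5})
# The same message re-sent after a lost acknowledgement is recognized:
{:duplicate, _seen} = DedupReceiver.receive_message(seen, %{id: 1, value: 21.5})
```

The set grows without bound in this form. If IDs are strictly increasing per device, remembering only the highest ID seen per device would be enough.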

@hangyas I’m not aware of a better solution than mnesia for your case, though lucaong’s DB could be a good fit too. I think you may run into some issues with your approach. I don’t have enough information about your system, but I see a problem here:

So the server more or less doesn’t have to know about the queue / when the queue is empty it gets notified about the new messages the same way as it gets notified about the first item of a non-empty queue when the nodes get connected.

This can be a problem with plain GenServer logic. If the device has been offline for a long time, or there is a lot of data to sync, a simple GenServer solution will become a problem. Consider batch processing, or maybe something like GenStage with back-pressure, although that can create more problems in some specific cases.
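
A very small sketch of the batch idea, using only the standard library: the backlog is sent in chunks, and sending stops at the first failure so the remainder can be retried later. `BatchSync` and the `send_batch` callback are hypothetical names; the callback is assumed to return `:ok` or `{:error, reason}`.

```elixir
defmodule BatchSync do
  # Sends `items` in chunks of `batch_size` via `send_batch`, a caller-supplied
  # function returning :ok or {:error, reason}. Stops at the first failure and
  # returns the items that still need to be sent.
  def run(items, batch_size, send_batch) do
    items
    |> Enum.chunk_every(batch_size)
    |> drain(send_batch)
  end

  defp drain([], _send_batch), do: {:ok, []}

  defp drain([batch | rest] = batches, send_batch) do
    case send_batch.(batch) do
      :ok -> drain(rest, send_batch)
      {:error, reason} -> {:error, reason, Enum.concat(batches)}
    end
  end
end
```

This is not back-pressure in the GenStage sense; it only bounds how much is sent per round trip.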

Small brainstorming:
Have you thought about a BEAM cluster? Maybe it’s the wrong idea, but when you have a cluster, your mnesia tables are synced automatically. You can have a separate table per IoT device, and the server process can pop from the table until it is empty (with back-pressure, batch processing, etc.). The good thing about this solution is that the IoT side only has to connect to the cluster and can otherwise stay dumb. There can be issues with clustering, connectivity, etc., depending on your use case. I also like that you would use just “one” technology.

Or you can use an out-of-the-box solution like https://barrel-db.org/, where the DB takes care of synchronization and all the logic lives on the server. You then only have to make sure your IoT devices’ DBs are connected over the network. It can be a good fit if you have multiple IoT devices and want to build some kind of dynamic network, but it depends on how big your IoT setup is, whether you have multiple devices, etc.

Thanks for the guide, I’ll definitely consider CubDB. Queues should be easy to sync to a server. In my case, however, I’m fine with losing a few items, as long as I have most of the measurements from the offline period.

Yes, I try to solve everything inside the BEAM, mostly to keep deploys simple.

I think my queue should fit into the RAM, but it’s true that I should keep an eye on this, and probably move to some external DB if it becomes a problem. However, I’d still like the nodes to connect for sending control messages from the server to the IoT node for example, so I can’t fully rely on the DB.

Regarding mnesia: I have already experimented with it, and it doesn’t handle network partitions too well (it needs a restart after the nodes disconnect/reconnect). I would still need additional logic to make it work, so it’s probably easier to send the messages myself and not rely on mnesia table sync (https://stackoverflow.com/questions/624570/online-mnesia-recovery-from-network-partition).
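
For reference, sending the messages myself could look roughly like the sketch below: a process on the IoT node buffers readings while the server node is unreachable and flushes them when a `:nodeup` message arrives. `Forwarder`, the `{:reading, value}` message shape and the target name are placeholders I made up; only `:net_kernel.monitor_nodes/1` and `Node.list/0` are real OTP/Elixir calls.

```elixir
defmodule Forwarder do
  use GenServer

  # `target` is {registered_name, node}, e.g. {:collector, :"server@host"} (placeholder names).
  def start_link(target), do: GenServer.start_link(__MODULE__, target)
  def record(pid, reading), do: GenServer.cast(pid, {:record, reading})

  @impl true
  def init({_name, _node} = target) do
    # Ask the runtime to send {:nodeup, node} / {:nodedown, node} to this process.
    :net_kernel.monitor_nodes(true)
    {:ok, %{target: target, buffer: []}}
  end

  @impl true
  def handle_cast({:record, reading}, %{target: {_name, server_node}} = state) do
    if server_node in Node.list() do
      # Server is connected: forward right away.
      send(state.target, {:reading, reading})
      {:noreply, state}
    else
      # Server is unreachable: keep the reading (newest first).
      {:noreply, %{state | buffer: [reading | state.buffer]}}
    end
  end

  @impl true
  def handle_info({:nodeup, up}, %{target: {_name, up}} = state) do
    # Flush oldest first. Fire-and-forget, so readings can be lost if the link drops again.
    state.buffer |> Enum.reverse() |> Enum.each(&send(state.target, {:reading, &1}))
    {:noreply, %{state | buffer: []}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```

There is no acknowledgement here, which matches being fine with losing a few items. The buffer is also only in memory, so it does not survive a restart.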

Inspired by this thread, I created a local and persistent (double-ended) queue and stack abstraction for embedded applications on top of CubDB, basically an open-source version of what I did in similar contexts in the past.

The project is called CubQ, I hope it can be useful, or provide some inspiration.

A quick preview:

```elixir
# Starting
{:ok, db} = CubDB.start_link(data_dir: "my/data/directory")
{:ok, pid} = CubQ.start_link(db: db, queue: :my_queue_id)

# Queue semantics:

CubQ.enqueue(pid, %SomeStruct{name: :one})
#=> :ok

CubQ.enqueue(pid, %SomeStruct{name: :two})
#=> :ok

CubQ.dequeue(pid)
#=> {:ok, %SomeStruct{name: :one}}

CubQ.dequeue(pid)
#=> {:ok, %SomeStruct{name: :two}}

# When there are no more elements in the queue, `dequeue` returns `nil`:

CubQ.dequeue(pid)
#=> nil
```
More documentation is found here.

Wow, cool!

I just started to implement the GenServer mentioned above, using pure CubDB, but I’ll change the backend to CubQ then. It will basically add a request function like:

```elixir
Qub.push(pid, "some item") # might happen later and in another process

Qub.request(pid, self())

receive do
  {:item, item} -> IO.inspect(item) # "some item"
end
```

It’s probably a good idea to do this in a separate layer, right?

Using request, I plan to implement pipe too, which can pipe every item from one queue to another.

Glad you like it :) Make sure you update to the latest version (currently v0.2.0) because v0.1.0 had a nasty subtle bug that made different queues interfere with each other (oops…).

Yes, I would recommend implementing request and other functions in a different layer.

What might be interesting to implement in CubQ would be atomically transferring an element from one queue to another. I will maybe implement this soon: I need to think about a good API, but it sounds useful, for example to non-destructively “claim” a job for a worker process when there is more than one working in parallel (so that if the process dies and re-spawns, the job is still available and can be retried).
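
To illustrate what I mean by “claiming”, here is a purely in-memory sketch of the idea. It is not CubQ's API (that API does not exist yet) and all names are hypothetical. A job moves from the pending queue to a claimed map in a single state transition, and goes back to the front of the queue if the worker dies before completing it. It assumes each worker holds at most one job at a time.

```elixir
defmodule ClaimSketch do
  # State: pending items in FIFO order plus a map of worker -> claimed item.
  def new(items), do: %{pending: :queue.from_list(items), claimed: %{}}

  # Move the head of `pending` into `claimed` in one step.
  def claim(%{pending: pending, claimed: claimed} = state, worker) do
    case :queue.out(pending) do
      {{:value, item}, rest} ->
        {{:ok, item}, %{state | pending: rest, claimed: Map.put(claimed, worker, item)}}

      {:empty, _} ->
        {:empty, state}
    end
  end

  # The worker finished: forget its job.
  def complete(state, worker), do: %{state | claimed: Map.delete(state.claimed, worker)}

  # The worker died: put its job back at the front so it is retried first.
  def release(%{pending: pending, claimed: claimed} = state, worker) do
    case Map.pop(claimed, worker) do
      {nil, _} -> state
      {item, rest} -> %{state | pending: :queue.in_r(item, pending), claimed: rest}
    end
  end
end
```

In a persistent implementation, the removal from one queue and the insertion into the other would have to be written atomically so that a crash in between cannot lose or duplicate the job.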

If anyone is interested, this is how I solved it: https://github.com/hangyas/fungifarm/blob/4bf55b6f5c75f80d9f1a00ab2814af9316223711/apps/fungifarm_shared/lib/pulletmq.ex

I might release this as a separate package later, once I feel the API is good and stable enough. I’ll post it here when I do.

That’s a very interesting project in general :)

I also published version v0.3.0 of CubQ, which introduces “at least once” semantics. Here are some docs about it.