Conduit - A framework for building reliable, event-based systems

https://www.conduitframework.com/

The best overview of how things are tied together is this presentation. Modules and functions are pretty well documented at this point, but higher-level docs are still needed.

I started this project about 2 years ago. At the time, I had just sold the company I work for on the idea that Elixir would be a good choice, and I had already implemented a very simplistic setup that could connect an Elixir application to RabbitMQ. That implementation didn’t quite do what I wanted, so I embarked on creating Conduit.

Conduit is intended to be a framework for building reliable, event-based systems. It does that by allowing you to integrate your application with a message broker like RabbitMQ or SQS, with others planned for the future. One problem I had when initially implementing the RabbitMQ connection was that there were libraries to connect to message brokers, but they didn’t give you a scalable OTP supervisor structure. For that part you were on your own. So, Conduit also attempts to address that, with adapters for the various brokers that have an OTP structure that scales well. Finally, I wanted a way to flexibly build patterns for processing messages that could be reused.

Here are two sample apps that use the two available adapters:

Here’s the source:

The library is at v0.12.5 currently and I expect to release v0.12.6 this weekend.

Version 0.12.6 was just published.

This version changes how Broker.publish/3 works. Instead of Broker.publish(:route, message), it is now Broker.publish(message, :route). This was changed because it makes pipelining much easier.

alias Conduit.Message
%Message{}
|> Message.put_header("foo", 1)
|> Message.put_destination("my.dynamic.queue")
|> Broker.publish(:route)

This release also fixes a regression for dynamic to and from. This was mostly an undocumented feature unless you read the typespecs. However, you can specify a function for to and from in publish and subscribe. This is primarily useful for subscribe as you could already dynamically set the destination for publish. You might want to use this if you have multiple instances of an application that need queues that only they will consume. For example:

subscribe :route, Subscriber, 
  from: fn ->
    :inet.gethostname()
    |> elem(1)
    |> to_string()
    |> String.replace("-", "_") 
  end

There were also a couple of fixes to the generators, which came in as contributions!

@blatyo This sounds very interesting. Could you explain a little bit more about how Conduit provides a “Scalable OTP supervisor structure”? Does it still give you the flexibility to define your own supervisor structure if you want it?

So, the goal of the adapters is to give you the OTP supervisor structure you would build anyways if you were just using SQS or AMQP directly. So, it’s very opinionated about the supervision structure, but does provide settings to tweak some parts of it. The best comparison I can give is to how Ecto’s Repo transparently does things for you like manage a connection pool. There are settings to manage the number of connections in the pool, but no way to say don’t use a pool and open a connection on every SQL request.

I mean a couple of things when I say scalable. One is stable resource usage. So, for any given application, you should generally have near constant memory usage, connections, etc. This helps protect you from resource exhaustion, which could get you into situations where your entire application crashes. This also means that conduit is designed in such a way that your system should never be overwhelmed, and if it is, it’s easy to tweak a few settings so that it’s not. Basically, it ensures there’s a back pressure mechanism. Conduit can’t make guarantees about the code the user writes, but it uses patterns to ensure that reasonable things happen around the user’s code. For example, if your messages are large, the BEAM can put them on the binary heap and they may not be GC’d for a long time. So, conduit does work to ensure that doesn’t happen. Also, your code could allocate a lot of memory, but because that’s run in an isolated process that dies after your code is done running, the BEAM can immediately reclaim that memory.
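
To illustrate the last point, here is a minimal sketch of running a piece of work in a short-lived, monitored process so that its heap is freed as soon as it exits. This is not Conduit's actual code; the module name IsolatedRunSketch and its run function are hypothetical and only show the general pattern.

```elixir
defmodule IsolatedRunSketch do
  # Hypothetical helper, not part of Conduit.
  # Runs `fun` in a short-lived process; that process's heap is
  # reclaimed by the BEAM when the process exits.
  def run(fun, timeout \\ 5_000) when is_function(fun, 0) do
    parent = self()
    ref = make_ref()

    # The worker sends its result back, then exits normally.
    {pid, monitor} = spawn_monitor(fn -> send(parent, {ref, fun.()}) end)

    receive do
      {^ref, result} ->
        # Drop the pending :DOWN message for the finished worker.
        Process.demonitor(monitor, [:flush])
        {:ok, result}

      {:DOWN, ^monitor, :process, ^pid, reason} ->
        # The worker died before it could reply.
        {:error, reason}
    after
      timeout ->
        Process.exit(pid, :kill)
        Process.demonitor(monitor, [:flush])
        {:error, :timeout}
    end
  end
end
```

Calling IsolatedRunSketch.run(fn -> expensive_work() end), where expensive_work is whatever your code does, keeps any garbage that work produces out of the calling process, and a crash in the work surfaces as an error tuple instead of taking the caller down.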

The second thing I mean by scalable, is that it is fast. I only have anecdata for this, but at work we have a couple applications that use conduit and process millions of messages per day and are idle most of the time. This isn’t a guarantee that there will never be queue backups, just that conduit is unlikely to be the reason why you have queue backups.

The third thing I mean by scalable is that it should recover gracefully. The BEAM certainly helps a lot with that. But some things are handled specifically by conduit: fault tolerance when an external message broker becomes unavailable, isolation of user code from other parts of the supervision hierarchy, and tools to deal with failures in user code, like the DeadLetter, Retry, and AckException plugs. By default, you also get at-least-once delivery semantics. So, if something fails while processing a message, you’re guaranteed to get that message again.

These quotes explain what the real goal here is:

Conduit doesn’t have a scalable OTP structure for the sake of it. It’s so the user doesn’t need to spend a bunch of time doing that themselves and can focus on their business logic.

Thanks, that’s really helpful. So I think I could summarize it as conduit leverages OTP semantics to minimize the impacts of faulty user code on the overall system while still maintaining high throughput and low latency.

ConduitAMQP v0.6.2 was just released!

Previously, setup of exchanges/queues/bindings happened at boot. If RabbitMQ was unavailable at that time, the application would crash. This release does setup after boot has happened. In order to do that, it:

  1. Starts connection and channel pools
  2. Starts subscribers in a waiting mode
  3. Starts a setup process
  4. Connections and channels attempt to connect until they are successful
  5. Setup runs to create exchanges/queues/bindings
  6. When setup is done, it sets values in ETS that subscribers are polling for to start
  7. Subscribers start
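
The last few steps above can be sketched roughly as follows. This is a standalone illustration of gating waiting processes on an ETS flag, not the ConduitAMQP implementation; the module SetupGateSketch, the table name, and the polling interval are all assumptions made for the example.

```elixir
defmodule SetupGateSketch do
  # Hypothetical sketch; none of these names come from ConduitAMQP.
  @table :setup_gate_sketch

  # Create a public named table that any process can read.
  def init do
    :ets.new(@table, [:named_table, :public, read_concurrency: true])
    :ok
  end

  # Called by the setup process once exchanges/queues/bindings exist.
  def mark_ready do
    :ets.insert(@table, {:ready, true})
    :ok
  end

  def ready? do
    :ets.lookup(@table, :ready) == [{:ready, true}]
  end

  # A waiting subscriber polls until the flag is set or attempts run out.
  def await_ready(interval_ms \\ 10, attempts \\ 100)
  def await_ready(_interval_ms, 0), do: {:error, :timeout}

  def await_ready(interval_ms, attempts) do
    if ready?() do
      :ok
    else
      Process.sleep(interval_ms)
      await_ready(interval_ms, attempts - 1)
    end
  end
end
```

In this sketch, a subscriber would call SetupGateSketch.await_ready() before it starts consuming, and the setup process would call SetupGateSketch.mark_ready() once the broker topology exists. Polling ETS keeps the subscribers decoupled from the setup process, at the cost of a small startup delay.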

Conduit v0.12.7 was just released!

This release focuses on some improvements necessary for two new adapters being built. So, there’s no need to rush to update.

Conduit v0.12.8 was just released!

This release adds two new plugs to Conduit: Conduit.Plug.Wrap and Conduit.Plug.Unwrap. These were added primarily to support new adapters for brokers that do not support headers natively. They allow you to embed header information into the body of the message and extract it on the receiving side.

Even if you’re using something that does support headers, it still may be useful to embed some of that information into the message as well. At a place I used to work, we defined a meta section in the body that duplicated the correlation_id, user_id, and created_at. This was useful, because sometimes we would copy a message and share it with someone else and getting the body and all the headers was annoying extra work.
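
As a rough illustration of the meta section idea, here is a sketch that works on plain maps. It is not how Conduit.Plug.Wrap or Conduit.Plug.Unwrap are implemented and it does not reproduce their wire format; the module MetaEnvelopeSketch and the "meta" key are assumptions made for the example.

```elixir
defmodule MetaEnvelopeSketch do
  # Hypothetical sketch; not the format used by Conduit.Plug.Wrap.
  # Header keys copied into the body, following the fields named above.
  @meta_keys ["correlation_id", "user_id", "created_at"]

  # Embed the selected headers under a "meta" key next to the body fields.
  def wrap(headers, body) when is_map(headers) and is_map(body) do
    Map.put(body, "meta", Map.take(headers, @meta_keys))
  end

  # Split a wrapped body back into {meta, body_without_meta}.
  def unwrap(wrapped) when is_map(wrapped) do
    Map.pop(wrapped, "meta", %{})
  end
end
```

With this shape, copying the body alone is enough to share a message along with its identifying metadata, since the fields travel inside the payload.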

Anyways, check the docs to see the exact details of how they work:

ConduitMQTT v0.1.0 was just released!

MQTT is one of the dominant protocols used in the IoT space for message queues. This adapter wraps tortoise, an excellent MQTT library in its own right, to allow usage of Conduit goodies.

For anyone who uses MQTT, it would be useful if you could provide feedback. It would be interesting to know how you currently manage connections. This adapter, for example, creates a pool of connections for publishing messages and an individual connection for each subscription.

Finally, this adapter was primarily written by Jeremy Isikoff. So, big thanks to him for his contribution.

Conduit v0.12.9 was just released!

If you’re using Conduit.Plug.DeadLetter, you’ll want to upgrade. The code was using a deprecated version of Broker.publish/2, which would be apparent in your logs for messages that failed to process.

@blatyo, this is very cool. I’ve been building my own framework for handling AMQP messages and it’s nowhere near as complete as Conduit, nor as native elixir “feeling” as conduit.

Question … Is Conduit appropriate for building a solution that works with stateful data? I’m looking to add a BEAM-based application into an existing architecture that’s mostly C# microservices with RabbitMQ as a message bus connecting them. My elixir app would want to process messages, and Conduit certainly makes all the RabbitMQ interactions straightforward. However, if I need a message to access and mutate state, there’s no clear way I see for Subscribers to access that state.

Would I be correct in thinking I would need to do a call out from a Subscriber to a target GenServer, Agent, GenStage, or other process that has the state that the message needs to work with?

Also, I don’t see any RPC-style examples. To perform the equivalent of an RPC, do I just build a response message, probably copying over some necessary headers from the original message, and use Broker to publish replies from inside my Subscriber?

Thanks … @msw10100

Can Conduit handle dynamically-named queues?

We have a deployment pattern that launches multiple processors in parallel. There’s usually a single work queue to which all instances subscribe, so they can load-balance request handling. Each instance also creates and subscribes to a couple of unique “instance” queues of their own, with special subscriptions for processor management layer, and to allow clients to interact with a specific instance as needed. These instance queues are named at create time with a unique name issued by our fabric service manager.

We use topic exchanges with some dynamic bindings on the dynamically-created queues, as well. Can subscribers be dynamically created and destroyed with custom bindings, or is it all declaratively described at compile time in the Broker?

Perhaps instead of defining the Broker in source and starting it as an application child, I could instead define it as a string at application startup, interpolating in all the dynamic info that I’ve received from my microservice manager, and use Code.eval in some fashion to compile and load it into memory and then start it?

Cheers … @msw10100

Yea, Conduit works fine with stateful things. I’ve primarily used it to interact with a database using ecto or in some cases ETS. There’s nothing that would stop you from communicating with other processes though, if that’s where you keep your state.
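
Here is a minimal sketch of that last option, with the state held in an Agent. To keep it self-contained it does not depend on Conduit: InventorySubscriberSketch only mimics the shape of a subscriber's process function, the message is a plain map rather than a Conduit message struct, and all module names are hypothetical.

```elixir
defmodule InventorySketch do
  use Agent

  # State lives in a named Agent: a map of item => count.
  def start_link(initial \\ %{}) do
    Agent.start_link(fn -> initial end, name: __MODULE__)
  end

  # Add qty to the item's count, starting from qty if the item is new.
  def add(item, qty) do
    Agent.update(__MODULE__, &Map.update(&1, item, qty, fn n -> n + qty end))
  end

  def count(item), do: Agent.get(__MODULE__, &Map.get(&1, item, 0))
end

defmodule InventorySubscriberSketch do
  # Stand-in for a subscriber's process/2 (assumed shape, plain map message).
  # It mutates state owned by another process, then returns the message.
  def process(%{body: %{"item" => item, "qty" => qty}} = message, _opts) do
    :ok = InventorySketch.add(item, qty)
    message
  end
end
```

Because each message is handled in its own process, funnelling updates through a single Agent or GenServer also serializes them, which avoids races between concurrent subscribers but can become a bottleneck under heavy load.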

Yep, that’s how you would do it. RabbitMQ has RPC built in; see the Direct Reply-to page in the RabbitMQ docs.

Someone else was asking about RPC recently, and there wasn’t a way to set an option that RabbitMQ’s RPC requires. However, a couple of minutes ago I released a new version of ConduitAMQP (0.6.3) that does support it. If you use it and discover something that could be more ergonomic, let me know.

Yes, you can pass functions for the queue names. It’s not well documented, but there are tests for it. The function doesn’t have to be defined inline, you can use a module function by passing &MyModule.queue_name/0.

That should work fine as long as you have some function that can get that name.
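
For example, a module function that derives a per-instance queue name from the hostname could look like the sketch below. The module QueueNameSketch is hypothetical; it simply moves the hostname logic from the earlier inline example into named functions.

```elixir
defmodule QueueNameSketch do
  # Hypothetical module; any zero-arity function returning a string would do.
  # Builds a per-instance queue name from the local hostname.
  def queue_name do
    {:ok, hostname} = :inet.gethostname()
    sanitize(to_string(hostname))
  end

  # Replace dashes so the name matches the convention from the earlier example.
  def sanitize(hostname), do: String.replace(hostname, "-", "_")
end
```

Assuming the function form of from described above, the broker would then reference it as from: &QueueNameSketch.queue_name/0 in the subscribe call.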

Everything is created at application start, but depending on how you specified your config (inline vs. a function vs. application config), some of it may be set at compile time. You cannot currently create a subscriber or queue whenever you want. That will always happen when the broker starts with whatever you’ve specified in your broker. I don’t understand the situation where you would want to do something like that. It’s possible that could be supported, but it’s quite a bit of work, so I’d want to understand that better. Maybe you could explain it to me here or in a DM?

That might be possible, but I would hope that isn’t necessary.

ConduitAMQP v0.6.3 was just released!

This release adds the ability to set consume options for subscribers consuming from a queue. You can see the options here. This enables a couple of use cases, but the one that prompted it was RabbitMQ RPC.

What’s the concurrency model for Conduit Framework? I see that each subscriber invocation is a separate process for each message, much like Phoenix spawns a process per request. How many subscriber instances run at one time for a given connection/channel/etc…?

For ConduitAMQP:

  • There’s a connection pool, where you configure the size.
  • There’s a channel pool, where you configure the size. These are used for publishing messages. They are created using the connection pool.
  • There’s a channel per consuming queue. They are created using the connection pool.
  • Every received message in the system will have a separate subscriber process. You can use the AMQP settings prefetch_size or prefetch_count to limit the number of messages in flight per queue.

ConduitSQS v0.2.7 was just released!

This release handles an annoying issue with hackney, where it’s leaking a message for the connection being closed. It was happening occasionally and causing some of the processes in ConduitSQS to crash. Note that it didn’t stop messages from getting processed.

Semi-random question: have you considered using gun instead of hackney? I’ve been watching a few tutorials on it lately and I kind of like it but not sure yet if it’s a better interface towards the Erlang HTTP clients.

conduit_sqs wraps ex_aws_sqs, which uses hackney by default. Because you can configure that with ex_aws_sqs, you can technically use anything.

I haven’t really had more than a cursory glance at gun to be honest. Maybe ask in this thread: HTTP client libraries and wrappers

Conduit v0.12.10 was just released!

This fixes a bug in the retry plug when nacking a message. It also fixes a deprecation warning when running on Elixir 1.8.
