How to take the Erlang/Elixir way of doing things to Python?

I have a new role in a domain that is completely dominated by Python. This is in the scientific R&D domain, where Python is used for everything from distributed control systems to data analysis pipelines. Of course, it’s the latter use that makes sense, not the former. However, that is the reality of the situation, at least in the short term.

I have never built big systems in Python before, so I am feeling a little bit like a fish out of water, especially since most of my systems have been built using Elixir, F#, and LabVIEW, all using actor/process-oriented architectures.

What I am looking for is advice from people who have built large systems in both Python and Elixir or Erlang, and how they went about doing Erlang/Elixir-y types of things in Python without straying too far from so-called Pythonic code. Obviously, a distributed control system calls for exactly the things provided by the BEAM, Erlang/Elixir, and OTP, but in the early days here, I must build in Python. So what I need are BEAM- and OTP-ish types of things in Python without being too exotic. I.e., I need this to be as robust as possible in Python while having a reasonable architecture for handling asynchronous and distributed things.

What architectures did you use? What libraries? Etc. Think of large amounts of hardware being controlled, with large amounts of telemetry coming in from distributed network devices, all collected into a single control computer that will also handle user interfaces. I know of Pykka, and I’ve done some experiments with it, and it looks reasonable. But I want advice on robust frameworks, and I don’t know how robust and well-used Pykka is. There is also Ray. The user interface(s) will be using PyQt6 or PySide6, although there will also likely be some remote viewing (over the browser) for telemetry only.


Hi,

I have both Python and Elixir experience. Other languages have greatly influenced me with their ideas. The BEAM VM and OTP inspire my architecture decisions, and their documentation helps me see problems from another perspective.

I have never worked with Pykka directly, only while debugging code in a Python music player. That experience is not enough to go on, so I will not speculate about it.

As I understand it, you have a server that collects telemetry data. This server also needs to display an interface for the user. It would be helpful if you could give more context about the task. For example:

  • Do you need to aggregate data before showing it? You could discard the raw data after aggregation.
  • Do you need the raw data? You could run other aggregations on it later.
  • How fast does the data come in?
  • How do you plan to store this data?
  • What happens if the server crashes? Is it OK if it stops responding?
  • Do you really have to implement both a desktop and a browser UI?
  • Which web framework do you plan to use?
  • Does the server fetch the data from these devices, or do the devices send telemetry to the server?
  • How much history do you plan to store? Do you have enough disk space for it (if everything will be on a single server)?
  • Do you need separate permissions for people? I mean, do you need to hide some data from specific users?
  • Do you need to show the interface remotely, i.e., will it be deployed to a public server? If yes, it is better to protect it.
  • How do you plan to scale the application if you have 2 servers instead of one?

I would split the task into 3 main parts: receiving the data, processing it, and presenting it.

Receive (devices send data to a server)

If I expect large amounts of data, then I possibly need to store it raw before processing. There are several ways to do that. For example, the data can be saved directly to a database (many options here), added to a queue (RabbitMQ, Redis, Kafka, etc.), or handled with some other solution.

It is better to make these endpoints as fast as possible, so they can handle a lot of data. I would also load-test different solutions here. Web servers with async features will possibly work fine.

We can compare this to a process mailbox in Elixir. A process receives many messages, but handles them as soon as it gets some processor time. So, we try to emulate a mailbox here.
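
For illustration, here is a rough sketch of this mailbox idea with plain asyncio (the port and the message handling are made up): the connection handler only enqueues raw data, and a separate consumer processes it at its own pace.

import asyncio

async def consume(mailbox):
    while True:
        raw = await mailbox.get()  # like `receive` in Elixir
        print("processing", raw)

async def main():
    mailbox = asyncio.Queue()

    async def handle_device(reader, writer):
        # Accept data as fast as possible; processing happens elsewhere.
        while True:
            line = await reader.readline()
            if not line:  # device disconnected
                break
            mailbox.put_nowait(line)

    server = await asyncio.start_server(handle_device, "0.0.0.0", 9000)
    async with server:
        await asyncio.gather(server.serve_forever(), consume(mailbox))

asyncio.run(main())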

Receive (server polls devices for data)

If the server fetches data from many devices over the network, then this part should be as concurrent as it can be. Running multiple Python processes will introduce overhead here, so you will possibly want to go with threads, or even asyncio-related libraries. The idea is the same - you need to fetch as much data as possible with as little overhead as possible.

You can compare this to launching multiple processes in Elixir to get the data; we try to emulate the same behaviour with the features Python gives us.
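
For example, a minimal polling sketch (the device addresses and request format here are invented) could spawn one lightweight asyncio task per device, instead of one OS process per device:

import asyncio

async def poll_device(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b"GET_TELEMETRY\n")  # made-up request format
    await writer.drain()
    data = await reader.readline()
    writer.close()
    return data

async def poll_all(devices):
    # One asyncio task per device, similar to one Elixir process per device.
    return await asyncio.gather(*(poll_device(h, p) for h, p in devices))

results = asyncio.run(poll_all([("10.0.0.1", 5000), ("10.0.0.2", 5000)]))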

Process the data

If you only need to show raw data, then you can omit this step entirely.

If you need to process it somehow, then you can look at something like Celery. I have also worked with Apache Airflow, but it may be overkill here. The basic idea is to get a chunk (batch) to process and store the results somewhere. The details depend greatly on the use case.

Present data

It depends on the type of data, its volume, etc. Possibly you can just generate pages with charts via Flask or Django templates. Besides, you can just refresh a page every N seconds instead of using websockets. It can be refreshing the whole page, only a part of it, or fetching data from an API. Example with Stimulus, example with unpoly.js, example with HTMX. I would prefer to have only one UI. If you really do need a desktop application - possibly there is a way to show the same UI in a web view.
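
As a tiny illustration of the refresh idea (a sketch, not production code): a Flask page can reload itself every N seconds with a meta tag, no websockets required. The value shown here is a stand-in for a database read.

from flask import Flask, render_template_string

app = Flask(__name__)

PAGE = """
<meta http-equiv="refresh" content="5">
<h1>Telemetry</h1>
<p>Latest value: {{ value }}</p>
"""

@app.route("/")
def dashboard():
    return render_template_string(PAGE, value=42)  # stand-in for a DB read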

There is also Plotly Dash - I have used it to render dashboards. It also has an open-source version.

General advice

I would recommend the book Designing Data-Intensive Applications. It gives a nice overview of tradeoffs, architecture details, and approaches to working with data. It does not dive too deep into details, but it gave me many ideas. I enjoyed reading it (and looking at the nice illustrations).

Another book with useful tips for overloaded systems is Erlang in Anger. It describes some typical issues with overloaded systems on the BEAM and how to fight them, but you can apply its ideas in other languages. You can actually use the actor model to design the system, but it will consist of operating system processes instead of BEAM VM ones.

I would also recommend not making the system too complicated - you will be grateful later. Possibly the simplest solution will work fine for you. The more moving parts you have, the more of them you need to support.

I hope this helps you, or at least gives you some ideas.


Hi Roman,

I apologize for my delayed response. Thank you very much for such a detailed response and the helpful pointers! I read it when you replied and kept doing some research. To provide a little more detail, here is the kind of architecture I’ve landed on:

However, even that is a little bit of an old design, because as of yesterday we no longer have a need to separate the GUI over a network, so the WebSocket server is likely no longer needed and can become some sort of internal worker process.

If the server fetches data from many devices over the network, then this part should be as concurrent as it can be. Running multiple Python processes will introduce overhead here, so you will possibly want to go with threads, or even asyncio-related libraries.

There are indeed a lot of external devices, primarily TCP/IP servers. So I have been landing on the idea of using asyncio for all (or at least most) of the TCP/IP requests, with asyncio.Queues for message passing, treating the various asyncio streams clients as little workers. I think you are right that creating many Python processes introduces a lot of overhead (because of the additional network calls between the processes) and orchestration (because of having to rely on the OS to manage them). It also requires serialization/deserialization between the processes, instead of using native Python datatypes, including custom ones built with dataclasses and typing.NamedTuple.
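
Roughly, the shape I have in mind is something like this (hosts, ports, and the message format are placeholders), where each client is a small worker with an asyncio.Queue inbox:

import asyncio

async def device_worker(host, port, inbox: asyncio.Queue, outbox: asyncio.Queue):
    reader, writer = await asyncio.open_connection(host, port)
    while True:
        command = await inbox.get()  # message passing instead of shared state
        writer.write(f"{command}\n".encode())
        await writer.drain()
        reply = await reader.readline()
        # Hand off native Python data; no serialization needed.
        await outbox.put((host, port, reply.decode()))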

Between threading, asyncio, multiprocessing, and separate Python processes, I mainly landed on asyncio because of its very nice APIs (such as asyncio streams). For a threading solution, I would probably lean on Pykka, since it is the actor model on top of Python’s threading, but I worry about the context-switching performance hit. For multiprocessing, I considered a queue-based solution, but I am unfamiliar with multiprocessing for IO-bound applications, and asyncio seemed to be the recommendation there. And as mentioned, multiple separate Python processes feel unwieldy and slower, and they require serialization/deserialization of datatypes.

Does that sound accurate to you? Or a valid approach and decision criteria?

If you only need to show raw data, then you can omit this step entirely.

There is some processing, but it is mainly parsing the text-based responses into datatypes and then passing them along to a timeseries database and the GUI. There may be one or two procedures that need heavier calculations, but even those shouldn’t be too much to handle. If they are, then perhaps multiprocessing is the way to offload the calculation. The aggregation is primarily just collating the data and then displaying and saving it. There will be some watchers that look for values out of bounds and then raise alerts. But such calculations are like “if outside of a given numeric range, then alert”.

I think in general, I have ruled out the need for and complexity of things like RabbitMQ, Celery, etc.

There is also Plotly Dash - I have used it to render dashboards. It also has an open-source version.

Thank you for mentioning that. I am aware of Plotly, having used it in F#, but I haven’t ever used Dash or looked into it much. Right now, the plan is to write data to a timeseries database (such as InfluxDB, QuestDB, or TimescaleDB) and then view that data with Grafana. How does Dash compare to that approach?

Thank you again, Roman! I greatly appreciate the time you put into your original post.


Hi, thanks for the reply! I am glad that it helped a little bit :slight_smile:

So, as I understand it, you have something like this:

  • Multiple remote servers. They connect via TCP/IP to a central one.
  • The central server gathers the information, converts it to the required format, and stores it in the database.
  • You need to send alerts if the data is out of bounds.
  • You need to display data.

According to your description, this is not an ordinary HTTP server. I can provide only general advice in this case. Possibly it will give you some insights. You know the requirements better and will be able to decide how to structure the system.

Regarding multiprocessing.

Here is a nice article about different ways to speed up Python code.

Types of tasks:

  • CPU-bound tasks - working with numbers, compressing images, transcoding video, etc. In simple words: the faster the PC, the faster you get results. You can speed up processing by running multiple parallel tasks.
  • IO-bound tasks - writing/reading files, network requests, etc. You need to wait for the response. A faster PC does not mean you will get results faster. You can speed this up with multiple concurrent tasks.

Difference between processes and threads in Python:

  • Process - an OS process. Has one thread by default (you can launch more later). Each Python process has a single GIL (at least for now). The GIL allows Python code to execute in only one thread at a time.
  • Thread - part of a process. All threads share the same GIL. While one thread does CPU work, the other threads wait. If one thread does too much CPU-bound work, it blocks the other threads in the process, and the process “freezes”.
  • Asyncio loop - runs inside a single thread. “Event-based”. If you run too much CPU-bound work in a coroutine, it may block the whole loop (and thread).

Types of IO:

  • blocking - You opened a TCP socket. When you read from the socket, the thread waits until you receive a message.
  • non-blocking - You opened a TCP socket. You can periodically poll the socket to see if it has some data. Or you can set a timeout on a read/write operation; the code will raise an error if no data is received before the timeout.
  • async - You opened a TCP socket. You want to read some data, so you “subscribe” (or “register a callback”) to a “data available” event. The event loop does other jobs. When it sees a “data available” event from your socket, it runs your callback.

Most Python code and libraries use blocking IO (at least for now).

So, here is the difference between multithreading, multiple processes, and asyncio:

  • Multiple processes - Each is a separate OS process with a single thread (you can launch more threads later). Each loads a full Python interpreter (takes more RAM). Sharing memory is limited. Best for running multiple parallel CPU-bound tasks (the Python GIL will not interfere here).
  • Multiple threads - They share the same process. Sharing memory is easier, but may lead to race conditions (use with caution). You can use them to run multiple concurrent IO-bound tasks (many Python libraries support only this kind of operation).
  • Asyncio - coroutines share the same process. Has the same memory and GIL issues as multiple threads. Best for async concurrent IO-bound tasks. Not all Python libraries support async features, so use them with caution - they may block the whole event loop (see the sketch after this list).
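
As a small illustration of that last point (a sketch; the URL is just an example), a blocking library call can be pushed onto a worker thread with asyncio.to_thread so it does not freeze the event loop:

import asyncio
import urllib.request

def blocking_fetch(url):
    with urllib.request.urlopen(url) as response:  # blocking IO
        return response.read()

async def main():
    # Awaiting blocking_fetch directly would block the whole loop;
    # asyncio.to_thread keeps the loop free while a thread waits.
    body = await asyncio.to_thread(blocking_fetch, "http://example.com")
    print(len(body))

asyncio.run(main())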

That was a very high-level overview of how a Python system works. If you have only one process, and some part takes too much CPU time, the whole system may freeze and stop responding. I would not recommend running too many background tasks. Also, each process has a single GC - it can pause the whole system if too much memory is consumed. Sometimes Python leaks memory, so GC pauses are possible in active long-running processes.

Regarding Celery

Celery runs background tasks. It starts a couple of “worker processes”. Each worker process subscribes to a “broker” - a message queue (example brokers are Redis and RabbitMQ). A worker reads a task from the queue and processes it. You can put a task on the queue from any part of your application (even from another task). “Celery beat” can run scheduled tasks (like a cronjob).
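
For a feel of the API, here is a minimal sketch (assuming a local Redis broker; the task itself is made up):

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task
def aggregate_batch(record_ids):
    # Fetch the records by ID, aggregate them, store the results somewhere.
    print(f"processing {len(record_ids)} records")

# From anywhere in the application (even from another task):
# aggregate_batch.delay([1, 2, 3])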

Regarding message passing between processes

I am not sure I understand the reason for passing datatypes between processes. Possibly there is a simpler approach.

Regarding Plotly Dash

I think it will be better to use Grafana here, if it fits your needs. Plotly Dash lets you write dashboards in Python code. I used it for small-to-medium datasets, so I have no experience with whether it scales well and fits your needs. I suggested it as an option (not as a strong recommendation), so you can try it.

My thoughts about possible architecture

Warning - this is not a call to action, but only a point of view.

As I understand, the task is to:

  • connect to the remote servers
  • periodically get some data from them
  • process the data and put it into the database
  • periodically check for outliers and send alerts

I think the system may consist of these parts:

  • “Connectors” - Each connector connects to a remote server. It gets the data, processes it, and puts it into the database.
  • Periodic or background tasks - If you don’t need fast alerts, then you can possibly query the database periodically for outliers in the most recent data. This may be a once-a-minute periodic job. If you do some heavy processing (it seems not, in your case), then you may offload it to background jobs. A general recommendation about background jobs: it is better to put the data into the database (possibly into another table) and launch the specific job with the record ID, instead of serialising the whole structure.
  • Alert system - Optional, if you need to report alerts as soon as possible.
  • GUI - You know the requirements better and can pick the one that fits your needs best. You can get the data directly from the DB, instead of serialising objects between processes.

If you need realtime updates, then you may create a message queue (possibly consider an external one). If a connector sees an outlier, it publishes the id of the outlier to the “outliers” topic. The alert system reads this topic and takes the relevant actions.

So the algorithm looks like this (a minimal sketch follows the list):

  • The connector fetches the data.
  • The connector processes the data and stores it in the database.
  • If the data is wrong, the connector raises an alert (possibly it can just put the data into another database table).
  • The GUI takes the data from the database and displays it.
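
A minimal sketch of one such connector coroutine (Sample, store, and raise_alert are hypothetical stand-ins for the real parsing, database, and alerting code):

import asyncio
from dataclasses import dataclass

@dataclass
class Sample:
    value: float

    def out_of_bounds(self, low=0.0, high=100.0):
        return not (low <= self.value <= high)

async def store(sample):
    print("stored", sample)  # stand-in for a timeseries DB write

async def raise_alert(sample):
    print("ALERT", sample)  # or insert into a separate alerts table

async def connector(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    try:
        while True:
            line = await reader.readline()
            if not line:  # device closed the connection
                break
            sample = Sample(float(line.decode()))  # parse the text response
            await store(sample)
            if sample.out_of_bounds():
                await raise_alert(sample)
    finally:
        writer.close()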

As I understand, you have a very similar solution:

  • You have asyncio connectors that connect to the remote servers, fetch the data, and put it into the database.
  • You can send commands to these connectors via asyncio.Queue.
  • Your future GUI shows the data from the database.

Again, you know your system, requirements, and constraints better than me, so please do not take any action unless it makes sense to you. My message is just an opinion, not an order or a call to action. I cannot read minds or debug systems/architecture without seeing the code.

I wish you success with your system. Hopefully this message was helpful to you.


I thought a little about an actor system. It is very difficult to create a generic one like in Erlang, but you can use the basic actor primitives.

I tried to implement a simple actor system on top of asyncio. Possibly it will help you with some ideas. I do not urge you to integrate it into the codebase :slight_smile:

import asyncio
import random
import uuid


PORT = 8000
REMOTE_SERVERS = [
    ("127.0.0.1", PORT),
    ("localhost", PORT)
]
        

async def server_logic(reader, writer):
    # Echo-style handler standing in for a remote telemetry device.
    while True:
        data = await reader.readline()
        if not data:  # client closed the connection
            break
        message = data.decode()
        addr = writer.get_extra_info('peername')
        await asyncio.sleep(random.random())  # simulate a slow device
        response = f"Message: {message.strip()} from {addr}\n"
        writer.write(response.encode())
        await writer.drain()
    writer.close()


async def start_example_remote_server():
    server = await asyncio.start_server(server_logic, "127.0.0.1", PORT)
    async with server:
         await server.serve_forever()


class Actor:
    # Class-level registries: one mailbox (queue), one link set, and one
    # task per actor, keyed by pid.
    _queues = {}
    _links = {}
    _tasks = {}

    @classmethod
    def send(cls, pid, message):
        # Fire-and-forget delivery, like send/2 in Erlang/Elixir.
        queue = cls._queues.get(pid)
        if queue:
            queue.put_nowait(message)

    @classmethod
    async def start(cls, implementation, args):
        pid = uuid.uuid4()
        cls._links[pid] = set()
        cls._queues[pid] = asyncio.Queue()

        # Keep a reference to the task so it is not garbage-collected.
        cls._tasks[pid] = asyncio.create_task(
            cls._main_loop(pid, implementation, args)
        )
        return pid

    @classmethod
    async def _main_loop(cls, pid, implementation, args):
        # The actor's "process": run init, then receive messages forever.
        actor = implementation(pid)
        try:
            await actor.init(*args)

            while True:
                match await cls._queues[pid].get():
                    case ("linked_process_died", linked_pid) as message:
                        # Like BEAM links: if exits are not trapped, a
                        # linked death takes this actor down as well.
                        if not actor.exit_trapped():
                            raise RuntimeError(
                                f"Linked actor with id {linked_pid} died. "
                                "Terminating"
                            )

                        await actor.handle_info(message)

                    case _ as message:
                        await actor.handle_info(message)

                cls._queues[pid].task_done()

        except Exception as err:
            # Propagate the failure to every linked actor.
            for link in cls._links[pid]:
                cls.send(
                    link,
                    ("linked_process_died", pid)
                )

            await actor.terminate(err)
            raise
        finally:
            cls._unregister(pid)
        
    @classmethod
    def link(cls, pid1, pid2):
        cls._links[pid1].add(pid2)
        cls._links[pid2].add(pid1)

    @classmethod
    def _unregister(cls, pid):
        # Remove the dead actor from all registries so nothing leaks.
        for link in cls._links.pop(pid, set()):
            cls._links.get(link, set()).discard(pid)
        del cls._queues[pid]
        cls._tasks.pop(pid, None)


class AbstractActor():
    def __init__(self, pid):
        self._pid = pid
        self._trap_exit = False

    async def init(self, *args):
        raise NotImplementedError()

    async def handle_info(self, message):
        raise NotImplementedError()

    async def terminate(self, error):
        pass
    
    async def start_link(self, implementation, args):
        child_pid = await Actor.start(implementation, args)
        Actor.link(self.get_pid(), child_pid)
        return child_pid
    
    def get_pid(self):
        return self._pid

    def trap_exit(self, value):
        self._trap_exit = value

    def exit_trapped(self):
        return self._trap_exit


class ClientReader(AbstractActor):
    # Dedicated reader: blocks on the socket inside init and forwards
    # every line to its parent, so the parent's mailbox stays responsive.
    async def init(self, reader, parent_pid):
        self.reader = reader

        while True:
            data = await reader.readline()
            if not data:  # remote side closed the connection
                raise RuntimeError("connection closed")
            Actor.send(
                parent_pid, ("received", data.decode())
            )
            if random.random() < 0.05:
                raise RuntimeError("some error")  # simulate random crashes
            

class ClientConnector(AbstractActor):
    async def init(self, server, parent_pid):
        self.parent_pid = parent_pid
        self.server = server
        self.reader = None
        self.writer = None
        host, port = server
        reader, writer = await asyncio.open_connection(host, port)
        self.reader = reader
        self.writer = writer

        await self.start_link(ClientReader, [reader, self.get_pid()])

    async def handle_info(self, message):
        match message:
            case ("received", data):
                print(data)
                Actor.send(
                    self.parent_pid, ("processed_item", self.get_pid())
                )

            case ("send", data):
                # Send the payload itself to the remote server.
                self.writer.write(f"{data}\n".encode())
                await self.writer.drain()

    async def terminate(self, error):
        print(f"Terminating connection to {self.server}, because of {error}")
        if self.writer:
            self.writer.close()


class LocalServer(AbstractActor):
    # Supervisor-like actor: traps exits and restarts dead connectors.
    async def init(self, remote_servers):
        self.trap_exit(True)
        self.remote_servers = remote_servers
        self.connectors = {}
        
        for server in remote_servers:
            await self.start_connector(server)
    
    async def handle_info(self, message):
        match message:
            case ("processed_item", connector_pid):
                await asyncio.sleep(random.random())
                self.send(connector_pid, f"message {random.random()}")

            case ("linked_process_died", connector_pid):
                server = self.connectors[connector_pid]
                print(f"connector to {server} died, restarting")
                del self.connectors[connector_pid]
                await self.start_connector(server)

    async def terminate(self, error):
        print(f"LocalServer stopped with error {error}")
        
    async def start_connector(self, server):
        connector_pid = await self.start_link(
            ClientConnector, [server, self.get_pid()]
        )
            
        self.connectors[connector_pid] = server
        
        self.send(connector_pid, f"Hello to {server}")

    def send(self, connector_pid, message):
        Actor.send(
            connector_pid, ("send", message)
        )

async def start_local_server():
    await Actor.start(LocalServer, [REMOTE_SERVERS])


async def main():
    await asyncio.gather(
        start_example_remote_server(),
        start_local_server()
    )

asyncio.run(main())

By the way, if you need to split the load between multiple processes, you can do it with something similar to the approach mix test uses to partition tests. Basically, you can split the list of remote servers according to their index. This may help to utilize the available CPU cores. I would recommend having only a single process locally - it will make issues easier to debug.
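
A hypothetical sketch of that partitioning (the function name is mine):

def partition(servers, num_partitions, partition_index):
    # Worker k out of N handles every N-th server, like mix test --partitions.
    return [s for i, s in enumerate(servers) if i % num_partitions == partition_index]

# Worker 0 of 4 would then only connect to:
# partition(REMOTE_SERVERS, 4, 0)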

Hi Roman. Thanks again for your messages and advice. I’m just now getting back to your latest message, and it’s very interesting, because it looks like we independently came up with some very similar things, which I suppose is not a surprise given the familiarity with messaging systems that Elixir/Erlang brings. Here’s a prototype I recently built that creates the concept of a worker (I avoided the term actor for some non-technical reasons, although I like the term actor), which contains inboxes that wrap asyncio.Queues for messaging. Here’s the Python discussion forum post where I describe the prototype in more detail: Request for review of PySide6 (Qt for Python) and asyncio task prototype - Async-SIG - Discussions on Python.org


Hi,
The Elixir/Erlang systems are very nice - you can apply their ideas in other languages too.
I recently realised that the author of the Python gunicorn web server is closely connected to the Erlang ecosystem and has popular libraries there too. Gunicorn uses an analog of a supervisor to increase the number of workers and process more requests. The supervisor watches the workers and restarts them once they crash, or after a certain number of requests (the --max-requests setting). This helps reduce the impact of memory leaks in long-running Python processes.

By the way, you can consider using asyncio.PriorityQueue instead of asyncio.Queue. This can help you add “signals” for your workers. A signal is a more urgent message. If you don’t have priorities, an urgent message has to wait until all previous messages are processed. With priorities, it only has to wait for the current message to finish processing.
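
A minimal sketch of that idea (the priorities and messages are invented): with an asyncio.PriorityQueue, a lower number wins, so a “stop” signal jumps ahead of queued telemetry.

import asyncio

SIGNAL, NORMAL = 0, 1  # lower number = higher priority

async def worker(inbox):
    while True:
        priority, message = await inbox.get()
        if message == "stop":  # framework-level signal
            break
        print("handling", message)  # ordinary application message

async def main():
    inbox = asyncio.PriorityQueue()
    task = asyncio.create_task(worker(inbox))
    inbox.put_nowait((NORMAL, "telemetry sample 1"))
    inbox.put_nowait((NORMAL, "telemetry sample 2"))
    inbox.put_nowait((SIGNAL, "stop"))  # handled before both samples
    await task

asyncio.run(main())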

I have looked into your example - it is pretty nice!
Regarding the scaling:

  • Side note. I think you will need to adjust the Linux file limits (ulimit -n) if you want to open thousands of connections from a single process. Otherwise you will get something like a Too many open files error.
  • It is better to create a benchmark and test for yourself whether it scales well. You can create a simple socket server in Elixir. It can use Mix.install (examples) together with ThousandIsland. You can deploy this server to another PC and open as many connections as you wish (or as your system allows). This will be the best proof of whether your approach scales well, and it will let you find issues with the design.
  • You may want to split workers into multiple processes (for example, according to the number of CPU cores). You can have a main “supervisor” that launches workers and distributes clients between them. If some worker gets stuck, it will affect only that worker. The “supervisor” may exchange heartbeat messages with workers and restart/notify/send an event to Sentry about unresponsive workers. Possibly each worker will also have separate ulimit -n limits (but it is better to verify this during a test). Each worker will possibly need to speak only to the “supervisor” and not to other workers. You can partition data between workers as mix test does (the sole link in my previous message).
  • Theoretically this approach should scale well, unless you do too many synchronous computations in a coroutine. It is better to simulate it with real-world-like data. It is better to run the simulation for some time (possibly for an hour) and measure the metrics of the system. You will be able to see CPU utilisation in real time with htop or a similar application. You can periodically measure the amount of RAM (as well as other useful metrics) and print it to stdout in a CSV format (just separate the values with commas or other delimiters). You can store the results in a file with simulation.py > output.csv or simulation.py | tee output.csv (if you want to see the logs in the shell too). You can then load the results into Excel and create charts from them. By the way, you can print different data to stderr and stdout. For example, you can use stderr for debugging information and stdout for metrics, or vice versa. A small sketch follows this list.
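
Here is a small sketch of that metrics idea (the choice of fields is arbitrary), using only the standard library to produce CSV on stdout and debug logs on stderr:

import resource
import sys
import time

def report_metrics(messages_handled):
    # ru_maxrss is in kilobytes on Linux (bytes on macOS).
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(f"{time.time():.0f},{rss},{messages_handled}")  # CSV row to stdout
    print("debug: metrics sample emitted", file=sys.stderr)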

To potentially transition this away from Python and into more Elixir-land (I think Roman and I have come up with a pretty good “core Python” way of doing actor/process stuff with Python’s asyncio, so that topic has run its course), I do have a question at the end regarding Thousand Island vs Ranch for building TCP/IP servers.


Yeah, I considered that, but I generally like to avoid priority queues. However, the one use case I can see is using higher-priority messages for core/framework messages like “stop”, while normal messages from other workers stay at normal priority and cannot set a priority themselves.

And thank you for taking a look at the example. It will keep getting refined, but I am actually liking it. It feels quite process/actor-like, even though everything is in a single thread, which must be remembered. Because, like you said, a long-running synchronous task will halt the entire thing.

Python’s threading, asyncio, and multiprocessing modules, plus the litany of third-party libraries, are the perfect example of just how complex Python is. In Elixir/Erlang, all of these things are just processes, and there’s no need to worry about threads, schedulers, etc.

I’m still trying to avoid multiple processes, although asyncio does have the ability to offload work via loop.run_in_executor with a ProcessPoolExecutor. The reason is mainly to avoid the weird multiprocessing queues and the pickling (serialization/deserialization) behavior that can occur with more custom data types. This matters in particular because I adopt a rather thorough type-driven or domain-driven programming style.
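
If one of those heavier calculations does show up, my understanding is it could look something like this (a sketch; heavy_calculation is a placeholder, and its arguments and result still get pickled across the process boundary):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_calculation(samples):
    return sum(x * x for x in samples) / len(samples)

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Runs in another process, so the event loop stays responsive.
        result = await loop.run_in_executor(pool, heavy_calculation, [1.0, 2.0, 3.0])
        print(result)

if __name__ == "__main__":  # required for ProcessPoolExecutor on spawn platforms
    asyncio.run(main())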


For an Elixir-specific question: do you or others have some thoughts on why one should use Thousand Island over Ranch? I have indeed spun up little test servers in Elixir for testing clients in Python, but I was using Ranch. It took a bit of effort to convert the user guide examples in Ranch into something I understood in Elixir. Is Thousand Island as heavily used as Ranch? Is it supposed to be easier? The Ranch documentation is a bit terse.


Regarding multiprocessing.

Elixir/Erlang can utilize all the CPU cores and resources of a server. Standard Python is effectively single-threaded (because of the GIL). This means that even if you have a beefy server with lots of CPU cores, it will use only one of them by default. I am not trying to push you toward multiprocessing or other things, just looking at it from a resource-utilization perspective. It is fine if your server runs other applications besides the Python server - they will use some CPU.

I remember load-testing a poorly performing Node.js application. We bought a larger server for it, but that did not fix the issue. It turned out that the application used only a single core and was limited by RAM. We tweaked it, and the application started to use more resources.

My goal is not to force you to use multiprocessing, but to point out possible limitations. I think there are ways to use multiple processes (to use more resources and scale better) without using queues from multiprocessing at all. I would think about whether it is possible to partition all clients between multiple OS processes (not Python multiprocessing ones) without sharing any memory between them. I like thinking about the possible ways to scale an application and creating the architecture with this in mind (even if I never use this option in the future).

It is up to you whether to do something about it. I don’t plan to keep digging into this topic.

Regarding Thousand Island vs Ranch

Both libraries heavily use the standard :gen_tcp and :ssl modules from the BEAM. Thousand Island is heavily inspired by Ranch and uses similar ideas. It is written in Elixir and has less code (easier to understand).

I believe you can do the benchmark using either of these libraries, and it should work fine. But it may be slightly easier to configure and reason about the code with Thousand Island. The library is newer, so I believe it is not as heavily used as Ranch right now. More recent Phoenix versions have migrated to it by default.

I had to tweak :gen_smtp once, and it relies on Ranch. It was quite difficult to dig into the Ranch codebase and understand what was going on.

If you already have a Ranch-backed test service, then it is better to continue using it. If you want to create a small throwaway script to load-test your implementation - why not try Thousand Island?

Personally, I don’t have a strong preference for one library over the other. I believe both have edge cases.
