What Elixir related stuff are you doing?

I’ll explain the general processing workflow I have in place. I apologize if this is too long/verbose. My goal with this project wasn’t so much to cut costs or increase speed as to get predictable performance and stability, so keep that in mind while reading. Maybe it will help someone think through their problem in a different way, but I’m not suggesting my approach is perfect at all.

TLDR - try to break your work down into processes and leverage what the BEAM excels at.

Since this was my second big project, I’d say I still struggled with using processes effectively early on and fully embracing the benefits of “let it crash”.

The source of my processing jobs is a Redis list with hundreds of thousands of ‘jobs’ to pop off throughout the day. For this particular case, that Redis queue runs about 150k in a 24 hour period. So, just think of everything I mention below as a job being generated from the LPOP against that list in Redis. (The Redis portion has been in place for years - it’s not something introduced with the Elixir effort.)

I started out pulling jobs into the system and generating structs/maps that I would pass around through various steps or stages (crawling, parsing, validation, post-processing, etc). My struct had a state/status attribute that was an atom like :crawling or :parsing, which I matched on, and I would change that value as the job progressed through the system. I used a lot of GenStage for portions of this. For other things, where I had to restrict concurrency, I would use Conqueuer - which is just a worker abstraction on top of Poolboy. So, let’s say I want to have 50 things in the pipeline: I may not want all 50 to be doing some post-processing step that involves uploading data to S3 or something. I’d tighten that step up with a limited number of workers and let it queue right there.
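As a rough illustration of that struct-with-a-status approach, here is a minimal sketch. The module names, fields, and the exact list of statuses are made up for the example and are not taken from the real project.

```elixir
defmodule PipelineSketch.Job do
  # Hypothetical job struct; the fields are illustrative only.
  defstruct [:id, :url, :body, status: :crawling]
end

defmodule PipelineSketch do
  alias PipelineSketch.Job

  # Each clause matches on the current status atom and returns the job
  # moved to its next status. Real steps would do the actual work here.
  def advance(%Job{status: :crawling} = job), do: %{job | body: "<html></html>", status: :parsing}
  def advance(%Job{status: :parsing} = job), do: %{job | status: :validating}
  def advance(%Job{status: :validating} = job), do: %{job | status: :done}
  def advance(%Job{status: :done} = job), do: job
end
```

Every stage receives the struct, does its work, and hands back an updated copy, which is exactly the data that then has to be queued and protected between processes.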

The issue came down to how I was passing the job data around in the form of a map/struct - my code got really rough around how to handle scenarios where things did not go as planned. I had to be real careful to not bring down certain internal processes that would then dump job data in progress. I had supervision trees in place and so-on, but I found myself managing internal queues all over the place for this struct data. Things just became overly complex after the project grew.

So about a month ago I stepped back and realized that creating a GenServer process for each job (under supervision) would make things much simpler. No more passing data structures around to different processes and all of that.

I now use the DynamicSupervisor in GenStage for handling the supervision of these processes and also to limit the number of job processes running concurrently. I feed that DynamicSupervisor with a GenStage producer stage that pulls data from Redis as needed, using the demand settings to get the back-pressure GenStage provides. GenStage keeps me from pulling more data from Redis than I can handle. Rather than passing around a struct for the job data attributes, that data is now just the internal process state. I just handle updates and progression through standard GenServer calls.

Also note that I spawn the child job processes in the DynamicSupervisor using a restart strategy of :transient. This will restart a job process automatically if it exits/crashes with something other than a normal exit. By default the supervisor will just restart the job with the data it got originally from Redis, so any steps completed before the crash are not persisted. I could have persisted them with ETS, but decided that I’d rather start the job over than risk restoring a bad state from ETS after a crash.

I do have a simple global Agent process that keeps track of process restart counts, though. If something crashes more than three times, I log it and throw it away. I do not want an inherently bad job eating up a spot in my DynamicSupervisor if it will fail repeatedly.
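To make the process-per-job idea concrete, here is a minimal sketch under some loud assumptions: it uses the standard-library DynamicSupervisor (with `max_children`) as a stand-in for the GenStage one, it leaves out the Redis producer entirely, and the module names, the `:status` values, and the way the restart counter is checked in `init/1` are all hypothetical.

```elixir
defmodule JobSketch.RestartCounter do
  # Global Agent holding a map of job id => number of times the job was started.
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Increments the start count for a job id and returns the new count.
  def bump(job_id) do
    Agent.get_and_update(__MODULE__, fn counts ->
      n = Map.get(counts, job_id, 0) + 1
      {n, Map.put(counts, job_id, n)}
    end)
  end
end

defmodule JobSketch.Worker do
  # restart: :transient means the supervisor restarts this process only
  # when it exits abnormally; a :normal exit is left alone.
  use GenServer, restart: :transient

  # Assumption: "more than three crashes" means discard on the fifth start.
  @max_crashes 3

  def start_link(%{id: _} = job), do: GenServer.start_link(__MODULE__, job)

  def status(pid), do: GenServer.call(pid, :status)
  def advance(pid, next), do: GenServer.call(pid, {:advance, next})

  @impl true
  def init(job) do
    if JobSketch.RestartCounter.bump(job.id) - 1 > @max_crashes do
      # A real system would log the job here before discarding it.
      :ignore
    else
      # The job data is simply the process state; a restart begins again here.
      {:ok, Map.put(job, :status, :crawling)}
    end
  end

  @impl true
  def handle_call(:status, _from, job), do: {:reply, job.status, job}

  def handle_call({:advance, :done}, _from, job) do
    # Normal exit, so the :transient strategy will not restart the job.
    {:stop, :normal, :ok, %{job | status: :done}}
  end

  def handle_call({:advance, next}, _from, job), do: {:reply, :ok, %{job | status: next}}
end
```

With the counter started, jobs can be launched with something like `DynamicSupervisor.start_child(sup, {JobSketch.Worker, %{id: "job-1"}})` on a supervisor started with `strategy: :one_for_one, max_children: 50`. One thing to keep in mind with this sketch is that the supervisor's own `max_restarts` and `max_seconds` limits still apply, so a burst of crashing jobs can take the supervisor down unless those are raised.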

To keep hundreds of these independent processes progressing smoothly, and to give me a way to globally control how fast I want states to move forward, I implemented a really basic ‘clock’ process, similar to how some video games and simulation systems are designed. When each job process is spawned it subscribes to the clock messages (which are sent as a tuple with the current timestamp/epoch, like {:tick, 1480100323}) using a simple pubsub mechanism. This allows everything in my job process to pattern match on a combination of the :tick and the current state it is in. I also did this because certain steps in my processing require the job to wait a few seconds, so the tick lets me have a process chill on certain stages for like 5 seconds by scheduling it to wait until a certain timestamp threshold is met. It’s also kinda a handy approach because I can adjust the ‘clock rate’ at runtime and slow things down or even pause it to make debugging a live issue simpler. The thing to note on this clock idea: be sure your job processes can handle the tick messages at least as fast as you are sending them, so you do not fill up message queues. I’m pushing my ticks out at about 1.5 second intervals right now, for example.
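A clock along these lines could be sketched as follows. This is an assumption-heavy illustration rather than the real implementation: the pubsub here is just a set of subscriber pids kept in the clock's state, and the module names, the `:crawled`/`:waiting`/`:parsing` statuses, and the `resume_at` field are invented for the example.

```elixir
defmodule ClockSketch.Clock do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Job processes call this once after they are spawned.
  def subscribe(pid \\ self()), do: GenServer.call(__MODULE__, {:subscribe, pid})

  # Changes the clock rate at runtime; takes effect after the next tick.
  def set_interval(ms), do: GenServer.call(__MODULE__, {:set_interval, ms})

  @impl true
  def init(opts) do
    interval = Keyword.get(opts, :interval, 1_500)
    Process.send_after(self(), :emit, interval)
    {:ok, %{interval: interval, subscribers: MapSet.new()}}
  end

  @impl true
  def handle_call({:subscribe, pid}, _from, state) do
    # Monitor subscribers so finished jobs are dropped from the set.
    Process.monitor(pid)
    {:reply, :ok, update_in(state.subscribers, &MapSet.put(&1, pid))}
  end

  def handle_call({:set_interval, ms}, _from, state), do: {:reply, :ok, %{state | interval: ms}}

  @impl true
  def handle_info(:emit, state) do
    now = System.system_time(:second)
    Enum.each(state.subscribers, &send(&1, {:tick, now}))
    Process.send_after(self(), :emit, state.interval)
    {:noreply, state}
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    {:noreply, update_in(state.subscribers, &MapSet.delete(&1, pid))}
  end
end

defmodule ClockSketch.Job do
  # Pure transition function: matches on the job's status plus the tick.
  # Still waiting: the timestamp threshold has not been reached yet.
  def on_tick(%{status: :waiting, resume_at: at} = job, {:tick, now}) when now < at, do: job
  # Threshold reached: move on to the next step.
  def on_tick(%{status: :waiting} = job, {:tick, _now}), do: %{job | status: :parsing}
  # Just crawled: schedule a 5 second pause before parsing.
  def on_tick(%{status: :crawled} = job, {:tick, now}) do
    Map.merge(job, %{status: :waiting, resume_at: now + 5})
  end
  # Any other status ignores the tick.
  def on_tick(job, {:tick, _now}), do: job
end
```

Inside a job GenServer this would be wired up with a clause like `def handle_info({:tick, _} = tick, job), do: {:noreply, ClockSketch.Job.on_tick(job, tick)}`. Keeping the transition as a pure function makes each tick cheap to handle, which helps with the message-queue concern mentioned above.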

Again, I’m not suggesting anyone get onboard with this clock idea - but it works for me.

As far as parsing goes, I’m using Floki like you are.
