I’m playing a bit with GenStage, after reading the blog post on elixir with a full case description, and watching the youtube Jose’s speech.
I’m playing with it using twitter streams. The producer reads the twitter streams, receiving all the tweets linked to the hashtag #rio2016, and pass them to the consumers.
in the gist there are app, producer, consumer and log output
I’ve started 3 consumers. Once a consumer receives a bunch of tweets from the producer, I put it to sleep for 10s to simulate the processing. The problem is that the other 2 consumers don’t receive anything…
if I change the stream to a local big file, all the 3 consumers receive events…
You haven’t overridden the default demand quantities. GenStage operates in batches, and the min / max batch sizes are 500 / 1000 by default, so unless you have more than that it isn’t gonna use more than one worker.
so the max_demand is 20. Also changing it to 1, the result stays the same: only one consumer receives the tweets, goes to sleep for 10s and once awake gets another tweet from the stream…
[#PID<0.177.0>,
"RT @review5x1: https://t.co/Fahn0npWhA #SummerSlam #CeremoniaDeClausura #ClosingCeremony #Tokio202 #FearTWD #Nexus6P #Nougat #Uefa2016 #ukz…"]
[#PID<0.177.0>, "sleeping for 10s"]
[#PID<0.177.0>, "awake"]
[#PID<0.177.0>,
"RT @OlympiaMschaft: Wir machen uns fertig für den Ausstieg. #WirfuerD #rio2016 https://t.co/iOZkuXjdZV"]
[#PID<0.177.0>, "sleeping for 10s"]
[#PID<0.177.0>, "awake"]
[#PID<0.177.0>,
"RT @BorisJohnson: Congratulations #Rio2016 for a fantastic Olympics & to all @TeamGB for inspiring us again - London’s legacy lives on http…"]
[#PID<0.177.0>, "sleeping for 10s"]
Can you please include a github repository with the code? I want to clone it and try to reproduce the error but I want to be sure to be using the same dependencies as you.
I had hardcoded the twitter tokens so I had to invalidate them. You need to put your tokens in config/config.ex to be able to get the tweets from the twitter streams api
It is a bug in ExTwitter. The stream is discarding messages from the process inbox, which is a big no-no. This means that the subscriptions for the second and third stages are never consumed:
ExTwitter should use a reference and make sure it only consumes messages it knows about. This must be reported as a bug.
Ideally ExTwitter would provide a GenStage producer, to avoid hijacking the process inbox, which won’t work with a GenStage since both GenStage and ExTwitter are trying to use the same process inbox. You can check the issues tracker so see if someone already requested this feature.
When my GenStage consumer(s) are too slow to handle the amount of incoming data (Tweets) the producer will stop ack the HTTP/TCP connection packages. The Twitter streaming endpoint will stop sending more data through the connection.
This is what whe want and follows the same pattern GenStage and Flow follow to consume data.
Without back-pressure, my producer would have to buffer the incoming Tweets or it’s internal message box would grow indefinitly. In both cases this would result in taking more and more memory, crashing the VM in the long term.
The 2nd parameter, options, given to stream/2 allow you pass extra parameters to the Twitter API. For example, you can use follow in order to follow tweets from a specifier user:
Hey Mario, I don’t mean to be a pest but I went to https://apps.twitter.com and generated new keys and tokens(with a different account also). It seems that after updating config.exs, mix clean, mix deps.get and mix compile then opening iex with iex -S mix and running:
iex(1)> Twittex.home_timeline
I’m still getting
{:error, %HTTPoison.Error{id: nil, reason: "Could not authenticate you."}}
Basically, @consumer_key and @consumer_secret where defined at compile time in the Twittex.Client.Base module. Updating those configuration keys did not force the module to recompile which I think is why you encountered you authentication problems (the first time you compiled :twittex, those keys where compiled into the application even when you updated your config).
You could try to run mix deps.clean twittex and recompile your application (or run iex -S mix directly), this should fix the problem. Otherwise, I just published version 0.3.5 which fixes the issue and loads :consumer_key and :consumer_secret at runtime.