GenStage and twitter stream

Hi guys,

I’m playing a bit with GenStage, after reading the blog post on elixir with a full case description, and watching the youtube Jose’s speech.
I’m playing with it using twitter streams. The producer reads the twitter streams, receiving all the tweets linked to the hashtag #rio2016, and pass them to the consumers.

in the gist there are app, producer, consumer and log output

I’ve started 3 consumers. Once a consumer receives a bunch of tweets from the producer, I put it to sleep for 10s to simulate the processing. The problem is that the other 2 consumers don’t receive anything…

if I change the stream to a local big file, all the 3 consumers receive events…

Does anyone know why ?

Thanks

Alvise

3 Likes

You haven’t overridden the default demand quantities. GenStage operates in batches, and the min / max batch sizes are 500 / 1000 by default, so unless you have more than that it isn’t gonna use more than one worker.

This is a good example of https://github.com/elixir-lang/gen_stage/issues/72

1 Like

Thanks for your answer @benwilson512

Actually in the consumer the init is this

{:consumer, tweets, subscribe_to: [{Twitter.Producer, max_demand: 20}]}

so the max_demand is 20. Also changing it to 1, the result stays the same: only one consumer receives the tweets, goes to sleep for 10s and once awake gets another tweet from the stream…

  [#PID<0.177.0>,
 "RT @review5x1: https://t.co/Fahn0npWhA #SummerSlam #CeremoniaDeClausura #ClosingCeremony #Tokio202 #FearTWD #Nexus6P #Nougat #Uefa2016 #ukz…"]
  [#PID<0.177.0>, "sleeping for 10s"]
    [#PID<0.177.0>, "awake"]
    [#PID<0.177.0>,
     "RT @OlympiaMschaft: Wir machen uns fertig für den Ausstieg. #WirfuerD #rio2016 https://t.co/iOZkuXjdZV"]
    [#PID<0.177.0>, "sleeping for 10s"]
    [#PID<0.177.0>, "awake"]
    [#PID<0.177.0>,
     "RT @BorisJohnson: Congratulations #Rio2016 for a fantastic Olympics &amp; to all @TeamGB for inspiring us again - London’s legacy lives on http…"]
    [#PID<0.177.0>, "sleeping for 10s"]

Can you please include a github repository with the code? I want to clone it and try to reproduce the error but I want to be sure to be using the same dependencies as you.

thanks @josevalim for the help.

Here is the github repo: https://github.com/alvises/elixir-twitter-genstage

I had hardcoded the twitter tokens so I had to invalidate them. You need to put your tokens in config/config.ex to be able to get the tweets from the twitter streams api

It is a bug in ExTwitter. The stream is discarding messages from the process inbox, which is a big no-no. This means that the subscriptions for the second and third stages are never consumed:

ExTwitter should use a reference and make sure it only consumes messages it knows about. This must be reported as a bug.

Ideally ExTwitter would provide a GenStage producer, to avoid hijacking the process inbox, which won’t work with a GenStage since both GenStage and ExTwitter are trying to use the same process inbox. You can check the issues tracker so see if someone already requested this feature.

4 Likes

thank you!

I wrote a Twitter library from scratch (at that time, it was my first project to get startet with Elixir). It uses HTTPoison (wrapper for :hackney).

A few weeks ago, I’ve updated the streaming part of the lib to use GenStage.

You can have a look at the implementation here: https://github.com/almightycouch/twittex/blob/master/lib/twittex/client/stream.ex

Basically, the Twittex.Client.Stream receives incoming HTTP chunks from :hackney applying back-pressure at the TCP level with :hackney.stream_next/1.

The next thing I want to try out is using Flow and Flow.Window to implement features such as peak detection.

7 Likes

Thanks Mario!

Can you please tell me what do you mean with “applying back-pressure” ?

When my GenStage consumer(s) are too slow to handle the amount of incoming data (Tweets) the producer will stop ack the HTTP/TCP connection packages. The Twitter streaming endpoint will stop sending more data through the connection.

This is what whe want and follows the same pattern GenStage and Flow follow to consume data.

Without back-pressure, my producer would have to buffer the incoming Tweets or it’s internal message box would grow indefinitly. In both cases this would result in taking more and more memory, crashing the VM in the long term.

8 Likes

Thanks for the explanation :smiley:

Very well done! Thank you for sharing, I’ll use it as an example of usage of the HTTPoison streaming feature.

Hi mario, is it possible with twittex to stream a user’s timeline? With ExTwitter you can stream a #hashtag, but you cannot stream a timeline.

The 2nd parameter, options, given to stream/2 allow you pass extra parameters to the Twitter API. For example, you can use follow in order to follow tweets from a specifier user:

stream = Twittex.stream! :filter, follow: 4708084272

Note that follow takes a user id and not a user handle.

1 Like

Hi Mario, am I using Twittex right? I’m getting

iex(1)> Twittex.search "#myelixirstatus"
{:error, %HTTPoison.Error{id: nil, reason: "Could not authenticate you."}}

Here’s what I did,
for config/config.exs:

config :twittex,
  consumer_key: "xxxxxxxx", 
  consumer_secret: "xxxxxxxx",
  token: "xxxxxxxx",
  token_secret: "xxxxxxxx"

I’ve confirmed the slugs are valid.
for mix.exs:

  def application do
    [ [applications: [:twittex]],
      extra_applications: [:logger]
    ]
  end

and:

  defp deps do
    [
      {:twittex, "~> 0.3.4"}
    ]
  end

Any ideas?

1 Like

Yes, this is how you should use the library.

I think you should check your consumer keys and tokens. It seems that they are not valid: https://apps.twitter.com

1 Like

Hey Mario, I don’t mean to be a pest but I went to https://apps.twitter.com and generated new keys and tokens(with a different account also). It seems that after updating config.exs, mix clean, mix deps.get and mix compile then opening iex with iex -S mix and running:

iex(1)> Twittex.home_timeline

I’m still getting

{:error, %HTTPoison.Error{id: nil, reason: "Could not authenticate you."}}

Not sure what to do at this point.

1 Like

Hi @maz, no problem at all, you helped me found an annoying bug here :grinning::

@api_key Application.get_env(:twittex, :consumer_key)
@api_secret Application.get_env(:twittex, :consumer_secret)

Basically, @consumer_key and @consumer_secret where defined at compile time in the Twittex.Client.Base module. Updating those configuration keys did not force the module to recompile which I think is why you encountered you authentication problems (the first time you compiled :twittex, those keys where compiled into the application even when you updated your config).

You could try to run mix deps.clean twittex and recompile your application (or run iex -S mix directly), this should fix the problem. Otherwise, I just published version 0.3.5 which fixes the issue and loads :consumer_key and :consumer_secret at runtime.

3 Likes

I’m authenticating now, thanks. Glad I could help squash a bug!

1 Like