Using asyncio to download hackernews (at 1300 requests/sec)!

tl;dr: a super-fast and easy HN client, based on Python 3.6+

As this is my first blog post, I’ll focus on downloading a set of Hackernews posts and user data to use as the data set for the machine learning exercises we’ll look at in future posts. 

We’ll start by exploring the different options for creating a local copy of HN’s data (or of any other site), and will continue to improve and optimize our solution.
All of the benchmarking was done on my laptop:
a MacBook Pro with 16 GB RAM, an i7 CPU, and a 100 Mb/sec download speed.

Setup: 

# You need Python 3.6+
# Create and activate a Python virtualenv
python3 -m venv ./venv
source ./venv/bin/activate
# Jupyter is a great way of working with Python from the shell
pip install jupyter
pip install haxor
pip install aiohttp

 

Official API

We’ll start by looking at the official Hacker News API.
While it appears quite neat and simple (and supports Firebase out of the box),
there is no official, publicly available Python SDK.
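
For reference, here’s a minimal sketch (my own, using nothing but the standard library) of hitting those REST endpoints directly; the item and maxitem URLs are the same ones we’ll use throughout this post:

import json
import urllib.request

ITEM_URL = "https://hacker-news.firebaseio.com/v0/item/{}.json"
MAX_ITEM_URL = "https://hacker-news.firebaseio.com/v0/maxitem.json"

def get_json(url):
    # A single blocking HTTP GET, parsed as JSON
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read().decode())

max_item = get_json(MAX_ITEM_URL)                  # id of the newest item
latest_item = get_json(ITEM_URL.format(max_item))  # the item itself
print(max_item, latest_item)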

Semi-official (Most popular)

After a quick Google search, I learned about haxor, which appeared to be a simple and effective tool.
After playing with it a bit I wrote the following script:
from hackernews import HackerNews

hn = HackerNews()

def get_last_n_stories(n=10):
    # Fetch the highest item id, then download the last n items one by one
    max_item = hn.get_max_item()
    return [hn.get_item(max_item - i) for i in range(n)]

last_n_stories = get_last_n_stories()

Using the time command to measure the performance:

time python downloadHackerNews/sync_hn_python_download.py
python downloadHackerNews/sync_hn_python_download.py  0.32s user 0.04s system 3% cpu 10.630 total

As hn.get_max_item() was around 13,000,000 when I checked, it would have taken me (13,000,000/10) * 10.6 = 13,780,000 seconds (or roughly 3,800 hours) to download all the posts (not including user data).
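
A quick back-of-the-envelope check of that estimate (assuming the ~10.6 seconds per 10 items measured above holds for the whole range):

max_item = 13_000_000        # approximate max item id at the time
seconds_per_10_items = 10.6  # measured with `time` above

total_seconds = (max_item / 10) * seconds_per_10_items
print(total_seconds)          # ~13,780,000 seconds
print(total_seconds / 3600)   # ~3,828 hours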

Using asyncio – first try (Naive)

At this stage, I decided to try using Python 3’s asyncio and aiohttp in lieu of attempting to improve the haxor code (using requests + sessions + threads).
I based my initial work on this great post: Making 1 million requests with python-aiohttp.
I won’t delve into how asyncio works here, so I definitely recommend that post as a starting point.

import asyncio

from aiohttp import ClientSession

GET_POST_URL = "https://hacker-news.firebaseio.com/v0/item/{}.json"
GET_MAX_ITEM_URL = 'https://hacker-news.firebaseio.com/v0/maxitem.json'


# A simple async fetch function
async def fetch(url, session):
    # (a)waiting for the server response to come back (during that time the event loop is free)
    async with session.get(url) as response:
        # reading the response and parsing it is also an async operation
        return await response.json()


# The main download function; takes the number of posts to download as n
async def run(n):
    # fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with ClientSession() as session:
        async with session.get(GET_MAX_ITEM_URL) as resp:
            max_item = await resp.json()
        # init the futures, each future is a url request
        tasks = [asyncio.ensure_future(fetch(GET_POST_URL.format(max_item - i)
                                             , session)) for i in range(n)]
        # wait for all responses to come back
        return await asyncio.gather(*tasks)


def get_last_n_stories(n=10):
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run(n))
    return loop.run_until_complete(future)


responses = get_last_n_stories()

Sample response:

{'by': 'adrienne',
 'id': 13430261,
 'parent': 13425436,
 'text': 'Except she has <i>very clearly stated her preference</i> in this regard, and that is trivial to discover. Why are you people so afraid to Google things?',
 'time': 1484773488,
 'type': 'comment'}

And the performance:

python downloadHackerNews/async_hn_python_download.py  0.20s user 0.02s system 12% cpu 1.718 total

Right off the bat we got a significant speed boost (10.6 seconds down to 1.7 seconds for the same 10 items). But can we improve it further?
The answer is: sure!

asyncio – second try (Consumer/Producer)

But first, let’s try to understand the limitations of the naive solution.

  1. The number of open coroutines scales linearly with the number of requests (if we are fetching 100,000 posts we’ll generate 100,000 futures) – not scalable.
  2. The number of open TCP connections is 20 by default – too low (we’ll raise it with aiohttp’s TCPConnector, as shown in the sketch right after this list).
  3. When we reach high rates (1,000 reqs/sec) we sometimes fail to process the response due to server slowdowns – we need a retry mechanism.
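
For the connection limit specifically, here is a minimal sketch of raising it with aiohttp’s TCPConnector (the value 1000 is just an illustrative choice; the full code below picks it based on the number of posts):

import asyncio

import aiohttp

URL_MAX_ITEM = 'https://hacker-news.firebaseio.com/v0/maxitem.json'

async def main():
    # Raise the number of allowed concurrent TCP connections
    connector = aiohttp.TCPConnector(limit=1000)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get(URL_MAX_ITEM) as resp:
            print(await resp.json())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())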

In order to solve all of these problems we’ll use a different approach: producer -> queue -> consumers.

The design is simple: we have n consumers that read from the queue, and one producer that fills it. This way we have only n open coroutines, no matter how many requests we make.

We will also use a dead-letter queue (DLQ), which will store failed requests so they can be retried.

Let’s start with the consumer code:

async def consumer(main_queue, dlq, session, responses):
    while True:
        try:
            # Fetch the next url from the queue, blocking until the queue has an item
            url = await main_queue.get()
            # Try to get a response from the server
            async with session.get(url, timeout=10) as response:
                # Check that we got a valid response
                response.raise_for_status()
                # Append it to the responses list
                responses.append(await response.json())
                # Tell the queue we finished processing the message
                main_queue.task_done()
        # In case of a timeout in our get request / a problem with the response code
        except (HttpProcessingError, asyncio.TimeoutError) as e:
            logger.debug("Problem with %s, Moving to DLQ" % url)
            # Put the url in the dlq, so another consumer will handle it
            await dlq.put(url)
            # Lower the pace
            await asyncio.sleep(5)
            # Tell the queue we finished processing the message
            main_queue.task_done()

The consumer reads a URL from the main queue -> performs an HTTP request -> appends the response to the responses list.
In case a consumer encounters a problem, it writes the URL to the DLQ, sleeps, and reads the next URL from the queue.

async def produce(queue, itr_items, base_url):
    for item in itr_items:
        # If the queue is full (reached maxsize), this line will block
        # until a consumer finishes processing a url
        await queue.put(base_url.format(item))

The producer goes over the iterable and puts its items into the queue. If the queue is full, it blocks until a consumer reads messages from the queue.

async def download_last_n_posts(session, n, consumer_num):
    # We init the main_queue with a fixed size, and the dlq with unlimited size
    main_queue, dlq, responses = asyncio.Queue(maxsize=2000), asyncio.Queue(), []
    # We init the consumers; as the queues are empty at first,
    # they will be blocked on the main_queue.get()
    consumers = [asyncio.ensure_future(
        consumer(main_queue, dlq, session, responses))
                 for _ in range(consumer_num)]
    # Init the dlq consumers, same as the base consumers,
    # only their main_queue is the dlq
    dlq_consumers = [asyncio.ensure_future(
        consumer(main_queue=dlq, dlq=dlq, session=session, responses=responses))
                     for _ in range(10)]
    # Get the max item id from the API
    async with session.get(URL_MAX_ITEM) as resp:
        max_item = await resp.json()
    # The producer returns only after all urls have been put into the queue
    await produce(queue=main_queue,
                  itr_items=range(max_item, max_item - n, -1),
                  base_url=URL_GET_POST)
    # Wait for all items inside the main_queue to get task_done
    await main_queue.join()
    # Wait for all items inside the dlq to get task_done
    await dlq.join()
    # Cancel all consumer coroutines
    for consumer_future in consumers + dlq_consumers:
        consumer_future.cancel()
    return responses

 

  • First we initialize the queues, the main-queue consumers, and the DLQ consumers
  • Then we fetch the id of the last item from HN
  • Next we run the producer – it finishes only after all the URLs have been inserted into the queue
  • We wait for the task_done call on every URL in the main queue (this means we made at least one attempt per URL)
  • We wait for the task_done call on every URL that entered the DLQ (a message can be processed more than once)
  • Finally we cancel all the consumers (no need for them once we already have all the posts)

 Full code:

import asyncio
import json
import logging

import aiohttp
from aiohttp import HttpProcessingError, ClientSession

# setting up logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
logger.addHandler(console)
MAX_CONNECTION = 1000
URL_GET_POST = "https://hacker-news.firebaseio.com/v0/item/{}.json"
URL_MAX_ITEM = 'https://hacker-news.firebaseio.com/v0/maxitem.json'


# asyncio http consumer
async def consumer(main_queue, dlq, session, responses):
    while True:
        try:
            # Fetch the next url from the queue, blocking until the queue has an item
            url = await main_queue.get()
            # Try to get a response from the server
            async with session.get(url, timeout=10) as response:
                # Check that we got a valid response
                response.raise_for_status()
                # Append it to the responses list
                responses.append(await response.json())
                # Tell the queue we finished processing the message
                main_queue.task_done()
        # In case of a timeout in our get request / a problem with the response code
        except (HttpProcessingError, asyncio.TimeoutError) as e:
            logger.debug("Problem with %s, Moving to DLQ. main_queue: (%s), dlq: (%s)" %
                         (url, str(main_queue.qsize()), str(dlq.qsize())))
            # Put the url in the dlq, so another consumer will handle it
            await dlq.put(url)
            # Lower the pace
            await asyncio.sleep(5)
            # Tell the queue we finished processing the message
            main_queue.task_done()


async def produce(queue, itr_items, base_url):
    for item in itr_items:
        # If the queue is full (reached maxsize), this line will block
        # until a consumer finishes processing a url
        await queue.put(base_url.format(item))


async def download_last_n_posts(session, n, consumer_num):
    # We init the main_queue with a fixed size, and the dlq with unlimited size
    main_queue, dlq, responses = asyncio.Queue(maxsize=2000), asyncio.Queue(), []
    # We init the consumers; as the queues are empty at first,
    # they will be blocked on the main_queue.get()
    consumers = [asyncio.ensure_future(
        consumer(main_queue, dlq, session, responses))
                 for _ in range(consumer_num)]
    # Init the dlq consumers, same as the base consumers,
    # only their main_queue is the dlq
    dlq_consumers = [asyncio.ensure_future(
        consumer(main_queue=dlq, dlq=dlq, session=session, responses=responses))
                     for _ in range(10)]
    # Get the max item id from the API
    async with session.get(URL_MAX_ITEM) as resp:
        max_item = await resp.json()
    # The producer returns only after all urls have been put into the queue
    await produce(queue=main_queue,
                  itr_items=range(max_item, max_item - n, -1),
                  base_url=URL_GET_POST)
    # Wait for all items inside the main_queue to get task_done
    await main_queue.join()
    # Wait for all items inside the dlq to get task_done
    await dlq.join()
    # Cancel all consumer coroutines
    for consumer_future in consumers + dlq_consumers:
        consumer_future.cancel()
    return responses


async def run(loop, n):
    conn_num = min(MAX_CONNECTION, n)
    # We raise the connection limit to get better performance
    async with ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=conn_num)) as session:
        posts = await download_last_n_posts(session, n, conn_num)
        with open("hn_posts.json", "w") as f:
            json.dump(posts, f)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop, n=1000000))

 

Using this code we can reach 1,000-2,000 requests per second (single thread); downloading 1,000,000 items took a total run time of 13 minutes, 11 seconds! Not bad for a single-threaded client :)

python downloadHackerNews/download_hn_for_blog.py  624.53s user 23.78s system 81% cpu 13:11.37 total
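
Once the run finishes, the items are sitting in hn_posts.json, ready to serve as the data set for the ML exercises mentioned at the start. A minimal sketch of loading them and splitting stories from comments (the field names follow the sample response above; I assume ids that didn’t resolve come back as null, so we filter those out):

import json

# Load the items we just downloaded
with open("hn_posts.json") as f:
    items = json.load(f)

# Drop null entries (ids that didn't resolve to an item)
items = [item for item in items if item]

stories = [item for item in items if item.get("type") == "story"]
comments = [item for item in items if item.get("type") == "comment"]
print(len(items), len(stories), len(comments))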

Summary

I pushed the full wrapper for the Hackernews API we saw here to GitHub: asyncio-hn.
Feel free to ask any questions (and of course, any improvements will be most appreciated).
And lastly – stay tuned for combining (basic) sanic with this hackernews wrapper for a neat Python 3 combination :)


Comments

  1. Ocl@y

    Please take a look at uvloop – 2-3x faster than asyncio, if you want speed.

    1. itiel

      Hi,
      I tested uvloop before writing this post and must say it didn’t seem to make much of a difference.
      I added the following lines to the example:

      
      import uvloop
      asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
      

      And for a total of 100K requests the diff was 2 seconds:

      
      python download_hn_for_blog.py  61.79s user 2.71s system 91% cpu 1:10.60 total
      python download_hn_for_blog_uvloop.py  56.23s user 1.53s system 84% cpu 1:08.12 total
      
