Nathan Grigg

Basic unobtrusive multithreading in Python

I have a Python script that downloads OFX files from each of my banks and credit cards. For a long time, I have been intending to make the HTTP requests multithreaded, since it is terribly inefficient to wait for one response to arrive before sending the next request.

Here is the single-threaded code block I was working with.

def ReadOfx(accounts):
    downloaded = []
    for account in accounts:
        try:
            account.AddOfx(read_ofx.Download(account))
        except urllib.error.HTTPError as err:
            print("Unable to download {}: {}".format(account, err))
        else:
            downloaded.append(account)

    return downloaded

Using the Python 2.7 standard library, I would probably use either the threading module or multiprocessing.pool.ThreadPool. In both cases, you can call a function in a separate thread but you cannot access the return value. In my code, I would need to alter Download to take a second parameter and store the output there. If the second parameter is shared across multiple threads, I have to worry about thread safety. Doable, but ugly.
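To see why that is ugly, here is a rough sketch of the threading-module approach. The `download` function is a hypothetical stand-in for `read_ofx.Download`, and the shared `results` dict and lock are exactly the bookkeeping I would rather avoid:

```python
import threading

def download(account):
    # Hypothetical stand-in for read_ofx.Download.
    return "ofx data for {}".format(account)

def read_all(accounts):
    results = {}             # shared across threads
    lock = threading.Lock()  # guards the shared dict

    def worker(account):
        data = download(account)
        with lock:           # the thread-safety worry, made explicit
            results[account] = data

    threads = [threading.Thread(target=worker, args=(a,)) for a in accounts]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Each worker writes its result into a structure shared by all threads, so the callee (or a wrapper around it) has to know about the output container and the lock.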

In Python 3.2 and higher, the concurrent.futures module makes this much easier. (It has also been backported to Python 2.) Each time you submit a function to be run on a separate thread, you get back a Future object. When you ask the Future for its result, the main thread blocks until that thread is complete. But the main benefit is that I don’t have to make any changes to Download.

# Among other imports, we have `from concurrent import futures`.
def ReadOfx(accounts):
    with futures.ThreadPoolExecutor(max_workers=10) as ex:
        ofx_futures = [(account, ex.submit(read_ofx.Download, account))
                       for account in accounts]
        print("Started {} downloads".format(len(ofx_futures)))

    downloaded = []
    for account, future in ofx_futures:
        try:
            account.AddOfx(future.result())
        except urllib.error.HTTPError as err:
            print("Unable to download {}: {}".format(account, err))
        else:
            downloaded.append(account)

    return downloaded

In a typical run, my 6 accounts take 3, 4, 5, 6, 8, and 10 seconds to download. Using a single thread, the total is more than 30 seconds. Using multiple threads, I only have to wait about 10 seconds, the time it takes the slowest response to arrive.
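That timing behavior is easy to verify with a toy version of the download, using short sleeps in place of real HTTP requests (the function name and delays here are made up for the demonstration):

```python
import time
from concurrent import futures

def fake_download(seconds):
    # Stand-in for a network request that takes `seconds` to respond.
    time.sleep(seconds)
    return seconds

delays = [0.2, 0.3, 0.5]
start = time.time()
with futures.ThreadPoolExecutor(max_workers=10) as ex:
    fs = [ex.submit(fake_download, d) for d in delays]
results = [f.result() for f in fs]
elapsed = time.time() - start
# Wall-clock time is roughly the slowest request, not the sum of all of them.
```

Because the sleeps overlap, `elapsed` comes out near 0.5 seconds rather than the 1.0-second total.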