Single-threaded vs Multi-threading vs Multi-processing in Python

December 15, 2020

Single-threaded vs Multi-threading vs Multi-processing in Python

We will try to run a few simulated processes to understand the performance difference between Single-threaded, Multi-threading and Multi-processing in Python.

We will learn about the GIL and alternative Python interpreters by counting to 255 million and downloading a few webpages.

import concurrent.futures
import requests
import threading
import time
import math
import random
from multiprocessing import Pool

# Reference and Credits
# https://realpython.com/python-concurrency

thread_local = threading.local()

def get_session():
    """Return a requests.Session owned by the calling thread.

    Each worker thread lazily creates its own Session and reuses it on
    subsequent calls via thread-local storage, so sessions are never
    shared across threads.
    """
    session = getattr(thread_local, "session", None)
    if session is None:
        session = requests.Session()
        thread_local.session = session
    return session


def download_site(url):
    """Fetch *url* once and report what happened (simulated IO-bound work).

    Returns a dict with the start timestamp, elapsed seconds, a short
    summary string of the download, and the work type tag 'IO'.
    """
    started = time.time()
    with get_session().get(url) as response:
        summary = f"Read {len(response.content)} from {url}"
    elapsed = time.time() - started
    return {
        'work_start_time': started,
        'work_duration': elapsed,
        'work_output': summary,
        'work_type': 'IO',
    }

def countdown(n):
    """Busy-loop decrementing *n* to zero (simulated CPU-bound work).

    Returns a dict with the start timestamp, elapsed seconds, the final
    counter value, and the work type tag 'CPU'.
    """
    started = time.time()
    remaining = n
    while remaining > 0:
        remaining -= 1
    elapsed = time.time() - started
    return {
        'work_start_time': started,
        'work_duration': elapsed,
        'work_output': remaining,
        'work_type': 'CPU',
    }


def download_all_sites_threaded(sites):
    """Download every URL in *sites* concurrently on a 5-thread pool.

    Returns a list of per-site result dicts from download_site.

    The map() result is materialized inside the ``with`` block on purpose:
    ``executor.map`` returns a lazy iterator, and handing that iterator out
    after the pool has shut down defers the actual consumption (and any
    raised exceptions) to whenever the caller happens to iterate it. A list
    is also consistent with the other helpers in this file, which all
    return fully-realized result lists.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(download_site, sites))

def process_work(args):
    """Execute a single work item.

    *args* is a (callable, argument) pair; the callable is invoked with
    the argument and its result returned. Packing both into one tuple
    lets pool ``map`` calls dispatch heterogeneous work.
    """
    work_func, work_item = args
    return work_func(work_item)

def process_with_thread_pool_executor(work_load, max_workers=5):
    """Run every (callable, argument) pair in *work_load* on a thread pool.

    Results are returned as a list in the original submission order.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_work, item) for item in work_load]
        return [future.result() for future in futures]


def process_with_multiprocessing_pool(work_load, max_workers=5):
    """Run every (callable, argument) pair in *work_load* across processes.

    Results are returned as a list in the original submission order.
    Workers are torn down in ``finally`` — the same cleanup the Pool
    context manager performs on exit.
    """
    pool = Pool(max_workers)
    try:
        return list(pool.map(process_work, work_load))
    finally:
        pool.terminate()
        pool.join()

def get_runtime(work_load, multithreading=False, multiprocessing=False, max_workers=1):
    """Execute *work_load* and measure its wall-clock duration.

    work_load      -- list of (callable, argument) pairs.
    multithreading -- run the load on a ThreadPoolExecutor with *max_workers* threads.
    multiprocessing -- run the load on a multiprocessing Pool with *max_workers* processes.
    max_workers    -- pool size for whichever backend is selected.

    When neither flag is set the work runs sequentially in the calling
    thread (the original raised NameError on `results` in that case).

    Returns a dict with the load size, start timestamp, elapsed seconds,
    and the list of per-item result dicts.
    """
    start_time = time.time()
    if multithreading:
        results = process_with_thread_pool_executor(work_load, max_workers=max_workers)
    if multiprocessing:
        results = process_with_multiprocessing_pool(work_load, max_workers=max_workers)
    if not (multithreading or multiprocessing):
        # Sequential fallback: run each (callable, argument) pair inline.
        results = [work_func(work_item) for work_func, work_item in work_load]
    duration = time.time() - start_time
    return {
        'work_load_size': len(work_load),
        'process_start_time': start_time,
        'process_duration': duration,
        'results': results
    }

def get_cpu_work_load(load_size=1):
    """Return *load_size* identical CPU-bound (function, arg) work items.

    Each item counts down from 850000; tuples are immutable, so sharing
    one pair across the list is safe.
    """
    return [(countdown, 850000)] * load_size

def get_io_work_load(load_size=1):
    """
    A work item is a (function, arg) tuple.
    Return a work_load of *load_size* IO-bound items, cycling through the
    seed URLs in order until the requested size is reached.
    """
    seed_sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ]

    work_load = []
    for index in range(load_size):
        url = seed_sites[index % len(seed_sites)]
        work_load.append((download_site, url))
    return work_load

def run_simulated_work_load():
    """Run the full benchmark matrix and return the list of runtime dicts.

    Three work loads (IO-bound, CPU-bound, and a shuffled mix) are each
    run three ways: single thread, 5 threads, and 5 processes. The nine
    copy-pasted stanzas of the original are collapsed into one
    data-driven loop; labels, print order, and the returned structure
    are unchanged.
    """
    load_size = 300
    print('Load Size:{0}'.format(load_size))
    io_work_load = get_io_work_load(load_size=load_size)
    cpu_work_load = get_cpu_work_load(load_size=load_size)
    io_and_cpu_work_load = io_work_load + cpu_work_load
    random.shuffle(io_and_cpu_work_load)

    # (label, work_load, multithreading, multiprocessing, max_workers)
    scenarios = [
        ('io_work_load single thread', io_work_load, True, False, 1),
        ('io_work_load 5 threads', io_work_load, True, False, 5),
        ('io_work_load 5 process', io_work_load, False, True, 5),
        ('cpu_work_load single thread', cpu_work_load, True, False, 1),
        ('cpu_work_load 5 threads', cpu_work_load, True, False, 5),
        ('cpu_work_load 5 process', cpu_work_load, False, True, 5),
        ('io_and_cpu_work_load single thread', io_and_cpu_work_load, True, False, 1),
        ('io_and_cpu_work_load 5 threads', io_and_cpu_work_load, True, False, 5),
        ('io_and_cpu_work_load 5 process', io_and_cpu_work_load, False, True, 5),
    ]

    runtimes = []
    for label, work_load, use_threads, use_processes, workers in scenarios:
        runtime = get_runtime(
            work_load,
            multithreading=use_threads,
            multiprocessing=use_processes,
            max_workers=workers,
        )
        runtime['work_load_label'] = label
        # The original printed the label only after the run completed.
        print(label)
        runtimes.append(runtime)

    return runtimes

Build with Hugo and other OSS