# Show CPU information (Linux):
lscpu | head -10
import psutil
# Get the number of physical cores
physical_cores = psutil.cpu_count(logical=False)
print(f"Number of physical cores: {physical_cores}")
# Get the number of logical processors (hardware threads)
logical_processors = psutil.cpu_count(logical=True)
print(f"Number of logical processors (hardware threads): {logical_processors}")
import multiprocessing
import time
import math
def perform_heavy_calculation(n):
"""Perform a computationally intensive calculation."""
print(f"Processing {n} in process {multiprocessing.current_process().name}")
# A simple but computationally intensive task
result = 0
for i in range(10000000): # 10 million iterations
result += n * (math.sin(i * 0.00001) * math.cos(i * 0.00002))
return f"Input: {n}, Result: {result:.6f}"
def process_without_multiprocessing(numbers):
"""Process a list of numbers sequentially."""
start_time = time.time()
results = []
for number in numbers:
results.append(perform_heavy_calculation(number))
end_time = time.time()
return results, end_time - start_time
def process_with_multiprocessing(numbers, num_processes=None):
"""Process a list of numbers with multiprocessing."""
if num_processes is None:
        num_processes = multiprocessing.cpu_count() # Use all logical processors (hardware threads)
start_time = time.time()
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(perform_heavy_calculation, numbers)
end_time = time.time()
return results, end_time - start_time
if __name__ == "__main__":
# Just a list of numbers for our tasks
test_numbers = list(range(1, 9))
print(f"Running tests on {len(test_numbers)} tasks...")
print("CPU Count:", multiprocessing.cpu_count())
print("\nRunning without multiprocessing...")
single_results, single_time = process_without_multiprocessing(test_numbers)
print("\nRunning with multiprocessing...")
multi_results, multi_time = process_with_multiprocessing(test_numbers)
# Compare the results
print("\nResults:")
print(f"Without multiprocessing: {single_time:.2f} seconds")
print(f"With multiprocessing: {multi_time:.2f} seconds")
print(f"Speedup: {single_time / multi_time:.2f}x")
# Print first result from each method to verify they're the same
print("\nSample results (first result):")
print(f"Sequential: {single_results[0]}")
print(f"Multiprocessing: {multi_results[0]}")
pip install requests
import threading
import os
import time
import requests
def download_image(url, index, folder):
"""Download an image and save it to the specified folder."""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}
response = requests.get(url, headers=headers, stream=True)
if response.status_code != 200:
print(f"Failed to download {url} - HTTP {response.status_code}")
return
file_path = os.path.join(folder, f"image_{index}.jpg")
# Write image to file in chunks
with open(file_path, "wb") as file:
for chunk in response.iter_content(1024):
file.write(chunk)
print(f"Downloaded: {file_path} ({file_size} bytes)")
def download_without_threading(urls, folder):
"""Sequential download without threading."""
start_time = time.time()
for i, url in enumerate(urls):
download_image(url, i, folder)
return time.time() - start_time
def download_with_threading(urls, folder):
"""Download using threading."""
start_time = time.time()
threads = []
for i, url in enumerate(urls):
thread = threading.Thread(target=download_image, args=(url, i, folder))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
return time.time() - start_time
if __name__ == "__main__":
# List of image URLs to download
urls = [
"https://unsplash.com/photos/CTflmHHVrBM/download?force=true",
"https://unsplash.com/photos/pWV8HjvHzk8/download?force=true",
"https://unsplash.com/photos/1jn_3WBp60I/download?force=true",
"https://unsplash.com/photos/8E5HawfqCMM/download?force=true",
"https://unsplash.com/photos/yTOkMc2q01o/download?force=true",
]
# Directories to save images
sequential_path = os.path.join(os.getcwd(), "downloaded_images_sequential")
threading_path = os.path.join(os.getcwd(), "downloaded_images_threading")
os.makedirs(sequential_path, exist_ok=True)
os.makedirs(threading_path, exist_ok=True)
print(f"Running tests on {len(urls)} images...")
print("\nRunning without threading...")
single_time = download_without_threading(urls, sequential_path)
print("\nRunning with threading...")
multi_time = download_with_threading(urls, threading_path)
# Compare the results
print("\nResults:")
print(f"Without threading: {single_time:.2f} seconds")
print(f"With threading: {multi_time:.2f} seconds")
print(f"Speedup: {single_time / multi_time:.2f}x")
# view all processes:
top
# view all threads per process:
top -H -p <pid>
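From inside Python, the live threads of the current process can be listed as well; a minimal sketch:

import threading

# Enumerate all threads that are currently alive in this process
for t in threading.enumerate():
    print(t.name, "(daemon)" if t.daemon else "")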
A thread is created with the Thread class constructor, started with the start() method, and waited for with the join() method. join() blocks the calling thread until the thread whose join() method is called is terminated.
tr_obj = threading.Thread(target=None, name=None, args=(), kwargs={}, daemon=None)
target - the function to be run in the thread
name - the thread name. By default, a unique name is constructed of the form "Thread-N", where N is a small decimal number
args - the argument tuple for the target invocation
kwargs - a dictionary of keyword arguments for the target invocation
daemon - if not None, a daemonic thread will be created
import threading
import time
def worker(x):
    tid = threading.current_thread().name
print(f"Work started in thread {tid}")
time.sleep(5)
print(f"Work ended in thread {tid}")
# create the thread:
tr = threading.Thread(target=worker, args=(42,))
# start the thread:
tr.start()
# wait until thread terminates:
tr.join()
print("Worker did its job!")
import threading
import time
def worker(x):
    tid = threading.current_thread().name
# do some hard and time consuming work:
time.sleep(2)
print(f"Worker {tid} is working with {x}")
#################################################
# Sequential Processing:
#################################################
t = time.time()
worker(42)
worker(84)
print("Sequential Processing took:",time.time() - t,"\n")
#################################################
# Multithreaded Processing:
#################################################
tmulti = time.time()
tr1 = threading.Thread(target=worker, args=(42,))
tr2 = threading.Thread(target=worker, args=(84,))
tr1.start(); tr2.start()
tr1.join(); tr2.join()
print("Multithreaded Processing took:",time.time() - tmulti)
You can enjoy the speedup of multithreading in Python only when the threaded workers are not CPU-intensive: the GIL lets just one thread execute Python bytecode at a time, so threads help with I/O-bound work but not with computation.
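For contrast, a minimal sketch with a CPU-bound worker, where threading typically brings no speedup for exactly that reason:

import threading
import time

def cpu_worker(n):
    # Pure Python arithmetic holds the GIL for the entire loop
    total = 0
    for i in range(n):
        total += i * i

t = time.time()
cpu_worker(10_000_000)
cpu_worker(10_000_000)
print("Sequential took:", time.time() - t)

t = time.time()
tr1 = threading.Thread(target=cpu_worker, args=(10_000_000,))
tr2 = threading.Thread(target=cpu_worker, args=(10_000_000,))
tr1.start(); tr2.start()
tr1.join(); tr2.join()
print("Threaded took:", time.time() - t)  # usually about the same, or slower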
A process is created with the Process class constructor, started with the start() method, and waited for with the join() method. join() blocks the calling process until the process whose join() method is called is terminated.
pr_obj = multiprocessing.Process(target=None, name=None, args=(), kwargs={}, daemon=None)
Remember to guard the entry point with if __name__ == '__main__': when using processes! With start methods that spawn a fresh interpreter (the default on Windows and macOS), the module is re-imported in every child, and the guard prevents the children from spawning processes of their own.
import multiprocessing as mp
import time
def worker(x):
    pid = mp.current_process().name
print("x = {} in {}".format(x, pid))
time.sleep(2)
if __name__ == '__main__':
# create the process
p = mp.Process(target=worker, args=(42,))
# start the process:
p.start()
# wait until process completes:
p.join()
print("Worker did its job as separate Process!")
import multiprocessing as mp
import time
def worker(r, output=None):
    pid = mp.current_process().name
    # do some hard and time consuming work:
    res = 0
    for i in r:
        res += i
    if output is not None:
        # child process: send the partial sum back through the queue
        output.put(res)
    else:
        # sequential call in the main process: accumulate into the global
        global result
        result += res
    print("Worker {} is working with {}".format(pid, r))
if __name__ == '__main__':
#################################################
# Sequential Processing:
#################################################
t = time.time()
result = 0
worker(range(50_000_000))
worker(range(50_000_000,100_000_000))
print("Sequential Processing result: ", result)
print("Sequential Processing took:",time.time() - t,"\n")
#################################################
    # Multiprocess Processing:
#################################################
t = time.time()
# Define an output queue
    output = mp.Queue()
    p1 = mp.Process(target=worker, args=(range(50_000_000), output))
    p2 = mp.Process(target=worker, args=(range(50_000_000, 100_000_000), output))
    p1.start(); p2.start()
    # collect both partial sums before joining, then combine them
    multi_result = output.get() + output.get()
    p1.join(); p2.join()
    print("Multiprocess Processing result: ", multi_result)
print("Multiprocess Processing took:",time.time() - t,"\n")
# Worker MainProcess is working with range(0, 50000000)
# Worker MainProcess is working with range(50000000, 100000000)
# Sequential Processing result: 4999999950000000
# Sequential Processing took: 7.217836141586304
# Worker Process-2 is working with range(50000000, 100000000)
# Worker Process-1 is working with range(0, 50000000)
# Multiprocess Processing result: 4999999950000000
# Multiprocess Processing took: 4.363953113555908
The multiprocessing module provides the Value, Array and Queue classes for sharing data between processes.
import multiprocessing
# Function to be executed by each process
def increment_shared_counter(counter, lock):
for _ in range(100000):
with lock:
counter.value += 1
def main():
# Create a shared memory variable (integer) to act as a counter
counter = multiprocessing.Value("i", 0)
# Create a Lock to synchronize access to the shared counter
lock = multiprocessing.Lock()
# Create two processes, each incrementing the shared counter
process1 = multiprocessing.Process(
target=increment_shared_counter, args=(counter, lock)
)
process2 = multiprocessing.Process(
target=increment_shared_counter, args=(counter, lock)
)
# Start both processes
process1.start()
process2.start()
# Wait for both processes to finish
process1.join()
process2.join()
# Print the final value of the shared counter
print("Final counter value:", counter.value)
if __name__ == "__main__":
main()
# Final counter value: 200000
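Array works like Value but holds a fixed-length sequence of a single C type; a minimal sketch (the typecode "i" means C int):

import multiprocessing

def square_slot(arr, index):
    # Array carries an internal lock by default; use it for safe updates
    with arr.get_lock():
        arr[index] = arr[index] ** 2

if __name__ == "__main__":
    arr = multiprocessing.Array("i", [1, 2, 3, 4])
    processes = [
        multiprocessing.Process(target=square_slot, args=(arr, i))
        for i in range(len(arr))
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print("Squared values:", list(arr))  # Squared values: [1, 4, 9, 16]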
The map method of the Pool class is one of the most commonly used features. It distributes an iterable of data to the worker processes, applies a specified function to each item in parallel, and returns the results as a list in the same order as the input iterable.
from multiprocessing import Pool
import time
def worker(n):
return n ** 1000
if __name__ == '__main__':
# Define the range of numbers
numbers_range = range(100000)
# Multiprocessing Pool
start_time = time.time()
with Pool(5) as p:
pool_result = p.map(worker, numbers_range)
multiprocessing_execution_time = time.time() - start_time
print("Multiprocessing Pool took:", multiprocessing_execution_time, "seconds")
# Serial processing
start_time = time.time()
serial_result = [worker(n) for n in numbers_range]
serial_execution_time = time.time() - start_time
print("Serial processing took:", serial_execution_time, "seconds")
# Multiprocessing Pool took: 3.358834981918335 seconds
# Serial processing took: 5.694896459579468 seconds
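For worker functions that take several arguments, the related starmap method unpacks each tuple from the iterable; a minimal sketch:

from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

if __name__ == '__main__':
    with Pool(4) as p:
        # Each tuple is unpacked into power(base, exponent)
        print(p.starmap(power, [(2, 3), (3, 2), (10, 4)]))
    # [8, 9, 10000]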