Pythonで効率的にタスクを実行するためには、並列処理やマルチスレッドが不可欠です。本記事では、基本的な概念から実践的なテクニックまでを網羅しています。
マルチスレッドの作成
Pythonではthreading
モジュールを使ってスレッドを作成できます。
import threading
def print_numbers():
for i in range(5):
print(f"スレッドから: {i}")
thread = threading.Thread(target=print_numbers)
thread.start()
thread.join()
マルチプロセスの実行
multiprocessing
モジュールを使用すると、マルチプロセスを簡単に実装できます。
from multiprocessing import Process
def print_numbers():
for i in range(5):
print(f"プロセスから: {i}")
process = Process(target=print_numbers)
process.start()
process.join()
スレッドプールの使用
多くのタスクを効率的に処理するには、concurrent.futures
モジュールが便利です。
from concurrent.futures import ThreadPoolExecutor
def square(n):
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(square, range(10))
print(list(results))
プロセスプールの使用
複数のプロセスを並列に管理するには、ProcessPoolExecutor
を利用します。
from concurrent.futures import ProcessPoolExecutor
def cube(n):
return n ** 3
with ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(cube, range(10))
print(list(results))
非同期処理(asyncio)
非同期タスクを簡単に実行するには、asyncio
を使います。
import asyncio
async def say_hello():
await asyncio.sleep(1)
print("こんにちは、非同期処理!")
async def main():
await asyncio.gather(say_hello(), say_hello())
asyncio.run(main())
タスクの並列実行
asyncio
のgather
を使って、複数の非同期タスクを同時に実行します。
async def task(name, delay):
await asyncio.sleep(delay)
print(f"タスク{name}完了!")
async def main():
await asyncio.gather(task("A", 1), task("B", 2))
asyncio.run(main())
queueモジュールでのスレッド間通信
スレッド間でデータをやり取りするには、queue.Queue
を利用します。
from queue import Queue
from threading import Thread
def producer(queue):
for i in range(5):
queue.put(i)
print(f"プロデューサーが {i} をキューに追加")
def consumer(queue):
while not queue.empty():
item = queue.get()
print(f"コンシューマーが {item} を処理")
q = Queue()
producer_thread = Thread(target=producer, args=(q,))
consumer_thread = Thread(target=consumer, args=(q,))
producer_thread.start()
producer_thread.join()
consumer_thread.start()
consumer_thread.join()
グローバル変数のロック(threading.Lock)
スレッドの競合を防ぐには、threading.Lock
を使用します。
import threading
lock = threading.Lock()
counter = 0
def increment():
global counter
with lock:
counter += 1
threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"カウンタの値: {counter}")
ジョブスケジューリング
ジョブを指定時間に実行するには、schedule
やapscheduler
を使います。
使用例1: scheduleモジュール
import schedule
import time
def my_task():
print("ジョブを実行中!")
schedule.every(1).minutes.do(my_task)
while True:
schedule.run_pending()
time.sleep(1)
使用例2: apschedulerモジュール
from apscheduler.schedulers.blocking import BlockingScheduler
def my_task():
print("ジョブを実行中!")
scheduler = BlockingScheduler()
scheduler.add_job(my_task, 'interval', seconds=30)
scheduler.start()
並列処理のデバッグ
デバッグ時には、スレッドやプロセスの動作を追跡することが重要です。
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")
def debug_task():
logging.debug("デバッグ中のタスク実行")
thread = threading.Thread(target=debug_task)
thread.start()
thread.join()
まとめ
Pythonでの並列処理とマルチスレッドは、高速で効率的なプログラムの実現に役立ちます。本記事では、基本から応用までを網羅しました。これを活用して、実務でのスキルをさらに向上させましょう。
コメント