В проекте многопоточного приложения выявляется гонка данных при доступе к общему ресурсу: представьте пример кода на Python с использованием threading, объясните проблему и предложите корректные паттерны синхронизации и альтернативы (multiprocessing, async)
Пример (проблема — гонка при инкременте общего счётчика): Несинхронизированный вариант — ошибка видна при запуске двух потоков, каждый делает много инкрементов: ```python # race_example.py import threading counter = 0 def worker(n): global counter for _ in range(n): # read-modify-write не атомарно counter += 1 n = 100000 t1 = threading.Thread(target=worker, args=(n,)) t2 = threading.Thread(target=worker, args=(n,)) t1.start(); t2.start() t1.join(); t2.join() print("counter =", counter) # ожидаем 2*n, часто меньше ``` Ожидаемое значение: 2×n2 \times n2×n (например, 200000200000200000 при n=100000n=100000n=100000), но из‑за гонки данных результат будет меньше. Почему происходит гонка: - Операция `counter += 1` представляет собой последовательность действий (загрузка значения, +1, запись) и не атомарна. - Между этими шагами планировщик может переключить исполнение на другой поток — оба потока читают одно и то же значение и перезаписывают его, теряя инкремент. - Наличие GIL в CPython не гарантирует атомарность таких составных операций. Корректные паттерны синхронизации (threading) 1) Lock (базовый вариант — взаимное исключение) ```python import threading counter = 0 lock = threading.Lock() def worker(n): global counter for _ in range(n): with lock: counter += 1 ``` 2) RLock — если нужна рекурсивная блокировка (один поток может захватывать несколько раз). 3) Condition — когда потокам нужно ждать/сигналить о состоянии: ```python cond = threading.Condition() # cond.acquire(), cond.wait(), cond.notify() ``` 4) Semaphore — ограничение одновременно работающих ресурсов (например пул соединений). 5) Queue (producer-consumer) — thread-safe, хорошая альтернатива для передачи задач/данных между потоками: ```python from queue import Queue q = Queue() # q.put(), q.get() — безопасно между потоками ``` Альтернативы 1) multiprocessing — для CPU-bound задач. Каждый процесс имеет отдельную память; состояние можно передавать через очереди или специальные объекты: ```python from multiprocessing import Process, Value def worker(cnt, n): for _ in range(n): with cnt.get_lock(): cnt.value += 1 if __name__ == "__main__": cnt = Value('i', 0) # shared integer с примитивной блокировкой n = 100000 p1 = Process(target=worker, args=(cnt, n)) p2 = Process(target=worker, args=(cnt, n)) p1.start(); p2.start() p1.join(); p2.join() print("counter =", cnt.value) ``` - Выгодно для параллельной загрузки CPU. - Стоимость создания процессов и копирования памяти выше, чем у потоков. 2) asyncio — для I/O-bound задач с кооперативной многозадачностью. Потоков нет, но общий ресурс всё ещё может требовать синхронизации через `asyncio.Lock` или передачу сообщений через `asyncio.Queue`: ```python import asyncio counter = 0 lock = asyncio.Lock() async def worker(n): global counter for _ in range(n): async with lock: counter += 1 ``` - Простой и эффективный для большого количества I/O‑операций; не подходит для CPU-bound без offloading в пулы/процессы. Рекомендации при выборе - Для CPU-bound — multiprocessing (процессы). - Для I/O-bound — threading или asyncio (если можно переписать в async/await). - По возможности избегайте общего изменяемого состояния: используйте очереди/сообщения, иммутабельные структуры или атомарные/специфичные структуры данных (например, collections.deque с атомарными операциями в некоторых сценариях). - Всегда защищайте операции read-modify-write примитивами синхронизации (Lock, RLock, Condition, Semaphore, или эквивалент в multiprocessing/asyncio).
Несинхронизированный вариант — ошибка видна при запуске двух потоков, каждый делает много инкрементов:
```python
# race_example.py
import threading
counter = 0
def worker(n):
global counter
for _ in range(n):
# read-modify-write не атомарно
counter += 1
n = 100000
t1 = threading.Thread(target=worker, args=(n,))
t2 = threading.Thread(target=worker, args=(n,))
t1.start(); t2.start()
t1.join(); t2.join()
print("counter =", counter) # ожидаем 2*n, часто меньше
```
Ожидаемое значение: 2×n2 \times n2×n (например, 200000200000200000 при n=100000n=100000n=100000), но из‑за гонки данных результат будет меньше.
Почему происходит гонка:
- Операция `counter += 1` представляет собой последовательность действий (загрузка значения, +1, запись) и не атомарна.
- Между этими шагами планировщик может переключить исполнение на другой поток — оба потока читают одно и то же значение и перезаписывают его, теряя инкремент.
- Наличие GIL в CPython не гарантирует атомарность таких составных операций.
Корректные паттерны синхронизации (threading)
1) Lock (базовый вариант — взаимное исключение)
```python
import threading
counter = 0
lock = threading.Lock()
def worker(n):
global counter
for _ in range(n):
with lock:
counter += 1
```
2) RLock — если нужна рекурсивная блокировка (один поток может захватывать несколько раз).
3) Condition — когда потокам нужно ждать/сигналить о состоянии:
```python
cond = threading.Condition()
# cond.acquire(), cond.wait(), cond.notify()
```
4) Semaphore — ограничение одновременно работающих ресурсов (например пул соединений).
5) Queue (producer-consumer) — thread-safe, хорошая альтернатива для передачи задач/данных между потоками:
```python
from queue import Queue
q = Queue()
# q.put(), q.get() — безопасно между потоками
```
Альтернативы
1) multiprocessing — для CPU-bound задач. Каждый процесс имеет отдельную память; состояние можно передавать через очереди или специальные объекты:
```python
from multiprocessing import Process, Value
def worker(cnt, n):
for _ in range(n):
with cnt.get_lock():
cnt.value += 1
if __name__ == "__main__":
cnt = Value('i', 0) # shared integer с примитивной блокировкой
n = 100000
p1 = Process(target=worker, args=(cnt, n))
p2 = Process(target=worker, args=(cnt, n))
p1.start(); p2.start()
p1.join(); p2.join()
print("counter =", cnt.value)
```
- Выгодно для параллельной загрузки CPU.
- Стоимость создания процессов и копирования памяти выше, чем у потоков.
2) asyncio — для I/O-bound задач с кооперативной многозадачностью. Потоков нет, но общий ресурс всё ещё может требовать синхронизации через `asyncio.Lock` или передачу сообщений через `asyncio.Queue`:
```python
import asyncio
counter = 0
lock = asyncio.Lock()
async def worker(n):
global counter
for _ in range(n):
async with lock:
counter += 1
```
- Простой и эффективный для большого количества I/O‑операций; не подходит для CPU-bound без offloading в пулы/процессы.
Рекомендации при выборе
- Для CPU-bound — multiprocessing (процессы).
- Для I/O-bound — threading или asyncio (если можно переписать в async/await).
- По возможности избегайте общего изменяемого состояния: используйте очереди/сообщения, иммутабельные структуры или атомарные/специфичные структуры данных (например, collections.deque с атомарными операциями в некоторых сценариях).
- Всегда защищайте операции read-modify-write примитивами синхронизации (Lock, RLock, Condition, Semaphore, или эквивалент в multiprocessing/asyncio).