Разберите фрагмент на Python, где используется многопоточность: import threading; data = []; def worker(): for i in range(1000): data.append(i); threads = [threading.Thread(target=worker) for _ in range(4)]; for t in threads: t.start(); for t in threads: t.join(); — обсудите возможные проблемы с согласованностью данных, влияние GIL, способы безопасной синхронизации и альтернативные модели параллелизма для CPU- и I/O- bound задач
Код, который вы привели: import threading data =
def worker: for i in range100010001000: data.appendiii
threads = threading.Thread(target=worker)forinrange(4)threading.Thread(target=worker) for _ in range(4)threading.Thread(target=worker)forinrange(4)
for t in threads: t.start
for t in threads: t.join
— простая многопоточная программа, но при её анализе надо учитывать несколько важных моментов.
1) Согласованность данных и гонки dataracesdata racesdataraces
В CPython отдельная операция list.append фактически выполняется атомарно из‑за GIL GlobalInterpreterLockGlobal Interpreter LockGlobalInterpreterLock. Это значит, что отдельные append'ы не «порубятся» на мелкие шаги и не приведут к повреждению структуры list вбольшинствереализацийappendнеприведётккрахуинтерпретаторав большинстве реализаций append не приведёт к краху интерпретаторавбольшинствереализацийappendнеприведётккрахуинтерпретатора.Тем не менее это не даёт гарантий на более сложные композиции операций. Если вы делаете несколько операций, которые вместе создают инвариант например,проверитьдлинуизатемиспользоватьиндекснапример, проверить длину и затем использовать индекснапример,проверитьдлинуизатемиспользоватьиндекс, то между этими операциями может вмешаться другой поток и нарушить инвариант — поэтому нужен явный механизм синхронизации.В вашем примере в результате вы, скорее всего, получите список длины 4000 4×10004 × 10004×1000 с нефиксированным порядком элементов интерливингинтерливингинтерливинг. Это нормально, но порядок — недетерминирован.Не стоит полагаться на поведение конкретной реализации CPython: другие реализации PyPy,Jython,IronPythonPyPy, Jython, IronPythonPyPy,Jython,IronPython могут иметь другие семантики и/или отсутствие GIL, и в общем случае правильнее защищать совместный доступ явно.
2) Влияние GIL
GIL сериализует выполнение байткода Python: в CPython в любой момент выполняется байткод только одного потока. Для барьера это значит: Потоку, выполняющему CPU‑интенсивный Python‑код, сложно получить линейный выигрыш от нескольких потоков — они будут «переключаться», а не исполняться параллельно на разных ядрах.Для I/O‑операций ввод/выводввод/выводввод/вывод, где операции блочат и часто освобождают GIL системныевызовы,чтение/записьсокетов,файлов,ожиданиесистемные вызовы, чтение/запись сокетов, файлов, ожиданиесистемныевызовы,чтение/записьсокетов,файлов,ожидание, многопоточность в Python даёт ощутимый выигрыш.Некоторые C‑расширения NumPy,библиотечныевызовыNumPy, библиотечные вызовыNumPy,библиотечныевызовы освобождают GIL при тяжёлых вычислениях, так что параллелизм возможен и там.
3) Как безопасно синхронизировать
threading.Lock илиRLockили RLockилиRLock — самый простой способ защищать критические секции: lock = threading.Lock
def worker: for i in range100010001000: with lock: data.appendiii
Это делает append последовательным, но снижает параллелизм и производительность.queue.Queue — потокобезопасная очередь для передачи данных между потоками. Часто предпочтительнее, чем общий список + Lock: import queue q = queue.Queue
def worker: for i in range100010001000: q.putiiiconcurrent.futures.ThreadPoolExecutor — удобный API для работы со пулами потоков и возврата результатов синхронизациювыполняетбиблиотекасинхронизацию выполняет библиотекасинхронизациювыполняетбиблиотека.Более продвинутые примитивы: Condition, Semaphore, Event — для координации потоков, ожидания условий и т. п.Для счётчиков используйте synchronized структуры или lock; в стандартной библиотеке нет «атомного инкремента» для Python‑переменных.Если хотите агрегировать результаты без блокировок — пусть каждый поток пишет в свой локальный буфер списоксписоксписок, а в конце один поток объединит их mergemergemerge. Это часто гораздо быстрее.
4) Проблемы при неправильной синхронизации
Гонки, потеря инвариантов несогласованностьнесогласованностьнесогласованность, непредсказуемый порядок.Возможность дедлока при неправильном использовании нескольких замков.Снижение производительности при чрезмерном использовании глобальных замков контенцияконтенцияконтенция.В CPython возможна ложная уверенность в безопасности из‑за GIL — это опасно при переносимости кода или при использовании C‑расширений.
5) Альтернативные модели параллелизма
Для I/O‑bound задач: Многопоточность threading,ThreadPoolExecutorthreading, ThreadPoolExecutorthreading,ThreadPoolExecutor — хороша и проста.Асинхронность asyncio,trio,curioasyncio, trio, curioasyncio,trio,curio — эффективна по памяти и без GIL‑проблем всегдаодинпоток,номногокорутинвсегда один поток, но много корутинвсегдаодинпоток,номногокорутин; особенно хорошо при больших количеств соединений/событий.Комбинации: несколько потоков каждый запускает цикл asyncio редконужноредко нужноредконужно.Для CPU‑bound задач: multiprocessing / concurrent.futures.ProcessPoolExecutor — каждый процесс имеет свой интерпретатор и память, обходит GIL; межпроцессное взаимодействие дороже сериализация/IPCсериализация/IPCсериализация/IPC, но даёт настоящий параллелизм на ядрах.Параллелизм на уровне C/Си‑расширений: если использовать библиотеки, которые освобождают GIL NumPy,SciPy,Cythonсnogil,чистыеCNumPy, SciPy, Cython с nogil, чистые CNumPy,SciPy,Cythonсnogil,чистыеC, можно получить многопоточную скорость.Векторизация и алгоритмическая оптимизация часто эффективнее «простого» распараллеливания в Python.Для данных с большим объёмом: распределённые системы Dask,Ray,SparkDask, Ray, SparkDask,Ray,Spark — масштабирование за пределы одного хоста.
6) Практические рекомендации применительно к вашему примеру
Если вам важно просто собрать элементы из нескольких рабочих потоков: Лучше использовать queue.Queue или ThreadPoolExecutor с возвращаемыми результатами.Или пусть каждый поток пишет в свой список, затем объедините списки в конце безблокировокбез блокировокбезблокировок.Если задача CPU‑интенсивна внутриworkerмноговычисленийнаPythonвнутри worker много вычислений на PythonвнутриworkerмноговычисленийнаPython, используйте multiprocessing/ProcessPoolExecutor или перенос тяжёлой работы в C‑расширение/NumPy.Избегайте ненужных глобальных замков и старайтесь уменьшать область критической секции.Не полагайтесь на «атомарность append» как на общий способ обеспечения безопасности — это имплементационная деталь CPython.
Короткие примеры
Lock: lock = threading.Lock
def worker: for i in range100010001000: with lock: data.appendiii
Queue: import queue q = queue.Queue
def worker: for i in range100010001000: q.putiii
главный поток: собрать все элементы
while not q.empty: item = q.get
Каждый поток свой буфер + merge: results = []forinrange(4)[] for _ in range(4)[]forinrange(4)
def workeridxidxidx: local = resultsidxidxidx
for i in range100010001000: local.appendiii
потом: data = sumresults,[]results, []results,[] или itertools.chain.from_iterable
CPU: ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor with ProcessPoolExecutor as ex: res = listex.map(heavycputask,inputs)ex.map(heavy_cpu_task, inputs)ex.map(heavycputask,inputs)
Если нужно — могу показать конкретные примеры кода для вашего случая сLock/Queue/multiprocessingс Lock / Queue / multiprocessingсLock/Queue/multiprocessing и сравнить по производительности.
Код, который вы привели:
import threading
data = def worker:
for i in range100010001000:
data.appendiii threads = threading.Thread(target=worker)forinrange(4)threading.Thread(target=worker) for _ in range(4)threading.Thread(target=worker)fori nrange(4) for t in threads: t.start for t in threads: t.join
— простая многопоточная программа, но при её анализе надо учитывать несколько важных моментов.
1) Согласованность данных и гонки dataracesdata racesdataraces
В CPython отдельная операция list.append фактически выполняется атомарно из‑за GIL GlobalInterpreterLockGlobal Interpreter LockGlobalInterpreterLock. Это значит, что отдельные append'ы не «порубятся» на мелкие шаги и не приведут к повреждению структуры list вбольшинствереализацийappendнеприведётккрахуинтерпретаторав большинстве реализаций append не приведёт к краху интерпретаторавбольшинствереализацийappendнеприведётккрахуинтерпретатора.Тем не менее это не даёт гарантий на более сложные композиции операций. Если вы делаете несколько операций, которые вместе создают инвариант например,проверитьдлинуизатемиспользоватьиндекснапример, проверить длину и затем использовать индекснапример,проверитьдлинуизатемиспользоватьиндекс, то между этими операциями может вмешаться другой поток и нарушить инвариант — поэтому нужен явный механизм синхронизации.В вашем примере в результате вы, скорее всего, получите список длины 4000 4×10004 × 10004×1000 с нефиксированным порядком элементов интерливингинтерливингинтерливинг. Это нормально, но порядок — недетерминирован.Не стоит полагаться на поведение конкретной реализации CPython: другие реализации PyPy,Jython,IronPythonPyPy, Jython, IronPythonPyPy,Jython,IronPython могут иметь другие семантики и/или отсутствие GIL, и в общем случае правильнее защищать совместный доступ явно.2) Влияние GIL
GIL сериализует выполнение байткода Python: в CPython в любой момент выполняется байткод только одного потока. Для барьера это значит:Потоку, выполняющему CPU‑интенсивный Python‑код, сложно получить линейный выигрыш от нескольких потоков — они будут «переключаться», а не исполняться параллельно на разных ядрах.Для I/O‑операций ввод/выводввод/выводввод/вывод, где операции блочат и часто освобождают GIL системныевызовы,чтение/записьсокетов,файлов,ожиданиесистемные вызовы, чтение/запись сокетов, файлов, ожиданиесистемныевызовы,чтение/записьсокетов,файлов,ожидание, многопоточность в Python даёт ощутимый выигрыш.Некоторые C‑расширения NumPy,библиотечныевызовыNumPy, библиотечные вызовыNumPy,библиотечныевызовы освобождают GIL при тяжёлых вычислениях, так что параллелизм возможен и там.
3) Как безопасно синхронизировать
threading.Lock илиRLockили RLockилиRLock — самый простой способ защищать критические секции:lock = threading.Lock def worker:
for i in range100010001000:
with lock:
data.appendiii Это делает append последовательным, но снижает параллелизм и производительность.queue.Queue — потокобезопасная очередь для передачи данных между потоками. Часто предпочтительнее, чем общий список + Lock:
import queue
q = queue.Queue def worker:
for i in range100010001000:
q.putiiiconcurrent.futures.ThreadPoolExecutor — удобный API для работы со пулами потоков и возврата результатов синхронизациювыполняетбиблиотекасинхронизацию выполняет библиотекасинхронизациювыполняетбиблиотека.Более продвинутые примитивы: Condition, Semaphore, Event — для координации потоков, ожидания условий и т. п.Для счётчиков используйте synchronized структуры или lock; в стандартной библиотеке нет «атомного инкремента» для Python‑переменных.Если хотите агрегировать результаты без блокировок — пусть каждый поток пишет в свой локальный буфер списоксписоксписок, а в конце один поток объединит их mergemergemerge. Это часто гораздо быстрее.
4) Проблемы при неправильной синхронизации
Гонки, потеря инвариантов несогласованностьнесогласованностьнесогласованность, непредсказуемый порядок.Возможность дедлока при неправильном использовании нескольких замков.Снижение производительности при чрезмерном использовании глобальных замков контенцияконтенцияконтенция.В CPython возможна ложная уверенность в безопасности из‑за GIL — это опасно при переносимости кода или при использовании C‑расширений.5) Альтернативные модели параллелизма
Для I/O‑bound задач:Многопоточность threading,ThreadPoolExecutorthreading, ThreadPoolExecutorthreading,ThreadPoolExecutor — хороша и проста.Асинхронность asyncio,trio,curioasyncio, trio, curioasyncio,trio,curio — эффективна по памяти и без GIL‑проблем всегдаодинпоток,номногокорутинвсегда один поток, но много корутинвсегдаодинпоток,номногокорутин; особенно хорошо при больших количеств соединений/событий.Комбинации: несколько потоков каждый запускает цикл asyncio редконужноредко нужноредконужно.Для CPU‑bound задач:
multiprocessing / concurrent.futures.ProcessPoolExecutor — каждый процесс имеет свой интерпретатор и память, обходит GIL; межпроцессное взаимодействие дороже сериализация/IPCсериализация/IPCсериализация/IPC, но даёт настоящий параллелизм на ядрах.Параллелизм на уровне C/Си‑расширений: если использовать библиотеки, которые освобождают GIL NumPy,SciPy,Cythonсnogil,чистыеCNumPy, SciPy, Cython с nogil, чистые CNumPy,SciPy,Cythonсnogil,чистыеC, можно получить многопоточную скорость.Векторизация и алгоритмическая оптимизация часто эффективнее «простого» распараллеливания в Python.Для данных с большим объёмом: распределённые системы Dask,Ray,SparkDask, Ray, SparkDask,Ray,Spark — масштабирование за пределы одного хоста.
6) Практические рекомендации применительно к вашему примеру
Если вам важно просто собрать элементы из нескольких рабочих потоков:Лучше использовать queue.Queue или ThreadPoolExecutor с возвращаемыми результатами.Или пусть каждый поток пишет в свой список, затем объедините списки в конце безблокировокбез блокировокбезблокировок.Если задача CPU‑интенсивна внутриworkerмноговычисленийнаPythonвнутри worker много вычислений на PythonвнутриworkerмноговычисленийнаPython, используйте multiprocessing/ProcessPoolExecutor или перенос тяжёлой работы в C‑расширение/NumPy.Избегайте ненужных глобальных замков и старайтесь уменьшать область критической секции.Не полагайтесь на «атомарность append» как на общий способ обеспечения безопасности — это имплементационная деталь CPython.
Короткие примеры
Lock:
lock = threading.Lock def worker:
for i in range100010001000:
with lock:
data.appendiii
Queue:
главный поток: собрать все элементыimport queue
q = queue.Queue def worker:
for i in range100010001000:
q.putiii
while not q.empty: item = q.get
Каждый поток свой буфер + merge:
потом: data = sumresults,[]results, []results,[] или itertools.chain.from_iterableresults = []forinrange(4)[] for _ in range(4)[]fori nrange(4) def workeridxidxidx:
local = resultsidxidxidx for i in range100010001000:
local.appendiii
CPU: ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor as ex:
res = listex.map(heavycputask,inputs)ex.map(heavy_cpu_task, inputs)ex.map(heavyc put ask,inputs)
Если нужно — могу показать конкретные примеры кода для вашего случая сLock/Queue/multiprocessingс Lock / Queue / multiprocessingсLock/Queue/multiprocessing и сравнить по производительности.