Top.Mail.Ru
Введение в Threading (python) с примерами


Apr 7, 2024


Введение в Threading (python) с примерами

*Поток (Thread) - это наименьшая единица исполнения в рамках процесса операционной системы. Поток выполняет инструкции программы, представляет собой последовательность команд, которые процессор может выполнить параллельно с другими потоками в рамках одного процесса.

Важные характеристики потоков:

  1. Принадлежат процессу: Потоки существуют в рамках процесса и разделяют ресурсы этого процесса, такие как память, файловые дескрипторы и другие системные ресурсы.
  2. Параллельное выполнение: Потоки в одном процессе могут быть выполняемыми параллельно или конкурентно на многоядерных или многопроцессорных системах.
  3. Легковесны: Потоки обладают легковесной структурой по сравнению с процессами. Создание, уничтожение и переключение потоков занимает меньше системных ресурсов, чем для процессов.
  4. Синхронизация: Потоки могут взаимодействовать друг с другом и разделять данные. Однако, для обеспечения синхронизации доступа к общим ресурсам требуется правильная синхронизация для избежания проблем с состоянием гонки (race conditions) и другими проблемами параллельного выполнения.

В многозадачных операционных системах каждый процесс имеет по крайней мере один поток - основной поток (main thread), который может создавать дополнительные потоки для выполнения различных задач. Потоки позволяют улучшить отзывчивость программ, распределить задачи и ресурсы для эффективного использования мощности вычислительной системы.

Многопоточность - это способность программы выполнять несколько потоков одновременно, что позволяет увеличить эффективность и производительность программы. Основные цели использования многопоточности включают:

  1. Увеличение производительности: Многопоточность позволяет распределять различные части задачи между несколькими потоками для параллельного выполнения, что может значительно повысить производительность программы.

  2. Повышение отзывчивости: Использование потоков может позволить программе реагировать на внешние события или взаимодействовать с пользователями без блокировки основного потока выполнения.

  3. Улучшение реализации многозадачности: Многопоточность позволяет легче реализовывать разделение задач и управление ими в масштабах времени.

Принцип работы многопоточности: Многопоточность использует концепцию потоков (threads), что позволяет программе исполнять операции параллельно. Каждый поток представляет независимую последовательность выполнения команд, включая свой стек вызовов и регистры процессора. Потоки могут работать одновременно в рамках одного процесса, разделяя ресурсы и память.

Проблемы при работе с многопоточностью: Несмотря на преимущества многопоточности, использование потоков также может встречаться с рядом проблем:

  1. Гонки данных (Race conditions): Когда несколько потоков пытаются одновременно получить доступ и изменить один и тот же ресурс, могут возникать проблемы с согласованностью данных.
  2. Блокировки и дедлоки: Неправильное использование блокировок для синхронизации потоков может привести к дедлокам, когда потоки блокируют друг друга и не могут продолжить выполнение.
  3. Отладка и планирование: Сложности в отладке программ с многопоточностью из-за неопределенного поведения и сложности воспроизведения проблем при исполнении.

Разница между многопоточностью и многопроцессорностью:

Каждый из этих подходов имеет свои особенности и применим в зависимости от конкретной задачи и требований программы. Примеры использования многопоточности на языке программирования Python с помощью модуля threading: 1: Создание и запуск нескольких потоков:

import threading
# Функция, которую будет выполнять каждый поток
def print_numbers():
	for i in range(1, 6):
		print(f"Thread {threading.current_thread().name}: {i}")
# Создание потоков
thread1 = threading.Thread(target=print_numbers, name="Thread 1")
thread2 = threading.Thread(target=print_numbers, name="Thread 2")
# Запуск потоков
thread1.start()
thread2.start()
# Ожидание завершения потоков
thread1.join()
thread2.join()
print("Главный поток завершен.")

2: Использование блокировки для синхронизации доступа к ресурсу:

import threading
shared_resource = 0
lock = threading.Lock()

def update_resource():
	global shared_resource
	for _ in range(100000):
	lock.acquire()
	shared_resource += 1
	lock.release()
# Создание потоков
thread1 = threading.Thread(target=update_resource)
thread2 = threading.Thread(target=update_resource)
# Запуск потоков
thread1.start()
thread2.start()
# Ожидание завершения потоков
thread1.join()
thread2.join()
print("Значение ресурса после работы потоков:", shared_resource)

3: Использование условий (conditions) для организации коммуникации между потоками:

import threading
total = 0
condition = threading.Condition()
def produce():
	global total
	for _ in range(10): # Цикл для производства данных
		with condition: # Входим в критическую секцию с помощью условия
			total += 1 # Увеличиваем общий счетчик
			condition.notify() # Оповещаем другие потоки

def consume():
	global total
	with condition: # Вход в критическую секцию с помощью условия
		condition.wait(timeout=5)# Проверяем условие, ожидаем уведомления или таймаут
		print(f"Total is: {total}")

# Создание и запуск потоков
producer_thread = threading.Thread(target=produce)
consumer_thread = threading.Thread(target=consume)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

print("Главный поток завершен.")

4: Класс Barrier из модуля threading в Python предоставляет синхронизацию между потоками путем блокировки их до момента, когда все потоки достигнут определенного порога. Вот пример использования Barrier:

import threading
def worker(barrier):
	print(f'Thread {threading.current_thread().name} is waiting at the barrier...')
	barrier.wait()
	print(f'Thread {threading.current_thread().name} passed the barrier')

# Создание объекта Barrier для четырех потоков
barrier = threading.Barrier(4) 

# Создание и запуск потоков
thread1 = threading.Thread(target=worker, args=(barrier,), name='Thread 1')
thread2 = threading.Thread(target=worker, args=(barrier,), name='Thread 2')
thread3 = threading.Thread(target=worker, args=(barrier,), name='Thread 3')
thread4 = threading.Thread(target=worker, args=(barrier,), name='Thread 4')
thread1.start()
thread2.start()
thread3.start()
thread4.start()

# Ожидание завершения потоков
thread1.join()
thread2.join()
thread3.join()
thread4.join()
print('All threads have passed the barrier')

В этом примере четыре потока создаются для работы с барьером. Каждый из них ожидает, пока не все четыре потока не достигнут барьера. Когда все четыре потока достигнут этой точки, они будут продолжать свое выполнение. При запуске этого кода вы увидите, что каждый поток будет ожидать других до тех пор, пока все они не будут готовы, после чего выведется сообщение о прохождении барьера всеми потоками.

5: В примере ниже будем использовать Semaphore, чтобы ограничить доступ к ограниченному количеству элементов в списке, который будет использоваться как общий ресурс для потоков:

import threading
import time

# Список как общий ресурс
shared_resource = []
max_items = 5
# Инициализация семафора
semaphore = threading.Semaphore(max_items)

# Поток, который будет добавлять элементы в общий ресурс
def producer():
for i in range(10):
	with semaphore:
		shared_resource.append(i)
		print(f'Producer added {i} to the shared resource')
		time.sleep(1) # Подождать некоторое время

# Поток, который будет удалять элементы из общего ресурса
def consumer():
	for i in range(5):
		with semaphore:
			item = shared_resource.pop(0) if shared_resource else None
			if item is not None:
				print(f'Consumer removed {item} from the shared resource')
			else:
				print('Consumer found the shared resource empty')
			time.sleep(1) # Подождать некоторое время

# Создание потоков
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# Запуск потоков
producer_thread.start()
consumer_thread.start()

# Ожидание завершения работы потоков
producer_thread.join()
consumer_thread.join()
print('All threads have completed their tasks')

Объяснение:

  1. Semaphore используется для ограничения доступа к общему ресурсу shared_resource, который в данном случае представляет собой список.
  2. Поток producer добавляет элементы в список, ограниченный до max_items элементов.
  3. Поток consumer удаляет элементы из списка. Он также работает в пределах семафора, позволяя работать только одному потоку одновременно с общим ресурсом.
  4. Semaphore гарантирует, что одновременно не более max_items элементов могут быть добавлены или удалены из shared_resource.
  5. После завершения работы потоков выводится сообщение о завершении всех задач. Теперь пример более ясно демонстрирует работу Semaphore с общим ресурсом (в данном случае - списком) для контроля доступа потоков к нему.

6: пример использования события (Event) для синхронизации производителей и потребителей :

import threading
import time

shared_resource = [] # Общий ресурс (очередь)
event = threading.Event() # Создание объекта Event для синхронизации

# Функция производителя
def producer():
	for i in range(10):
		shared_resource.append(i)
		print(f"Producer added {i} to the shared resource")
		time.sleep(1)
		event.set() # Устанавливаем событие после добавления элемента

# Функция потребителя
def consumer():
	while True:
		event_is_set = event.wait(timeout=1) # Ожидание события с таймаутом 1 секунда
		if event_is_set and shared_resource:
			consumed_item = shared_resource.pop(0)
			print(f"Consumer removed {consumed_item} from the shared resource") 
			time.sleep(1)
		elif not shared_resource and event_is_set:
			print("Consumer found the shared resource empty")
			break # Выход из цикла, если общий ресурс пуст и событие установлено 

# Создание и запуск потоков
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()
# Ожидание завершения работы производителя и потребителя
producer_thread.join()
event.set() # Установка события для предотвращения блокировки потребителя 
consumer_thread.join()
print("All threads have completed their tasks")

Объяснение:

  1. producer() - функция производителя, добавляющая элементы в общий ресурс.
  2. consumer() - функция потребителя, мониторящая общий ресурс и удаляющая элементы. Переходит к блоку завершения, если обнаруживает пустоту общего ресурса и установленное событие.
  3. В основном потоке создаются и запускаются потоки производителя и потребителя.
  4. После завершения работы производителя устанавливается событие, чтобы избежать блокировки потребителя.
  5. Потребитель завершает работу, когда общий ресурс пуст и событие установлено.
  6. Выводится сообщение о завершении всех задач после завершения работы потоков.