一、说明

        关于并行、并发等有不同的概念,本篇讲述基本的并行程序,即多线程在python的实现,线程主要的好处在于,在其它进程处于等待状态时,运行中进程能取得更多的CPU资源。

二、线程的基本例子

        线程是存在于进程内并共享相同内存空间的轻量级执行单元。通过使用多个线程,您可以同时执行多个任务,这可以提高某些情况下的性能和响应能力。Python 提供了一个内置threading模块。

        线程由全局解释器锁 (GIL) 管理,即使在多核系统上,一次也只允许一个线程执行 Python 字节码。这意味着线程可以在多个 CPU 核心上并发运行,但不能真正并行。然而,线程对于涉及等待外部资源(例如网络请求)的 I/O 密集型任务仍然有益,因为它允许其他线程在等待期间继续执行。

Python 中的线程-LMLPHP

来源

        它们在进程内共享相同的内存空间,这使它们能够轻松共享数据。但是,如果管理不当,共享数据可能会导致同步问题和竞争条件。

        作为示例,让我们从公共 API 中提取数据。JSONPlaceholder免费提供假 REST API。get_data函数采用索引号并使用包从 API 检索数据requests

import requests
import random

def get_data(idx: int) -> str:
    url = f"https://jsonplaceholder.typicode.com/todos/{idx}"
    response = requests.get(url)
    return response.json()["title"]
def main():
    idx = random.randint(1, 100)
    text = get_data(idx)
    print(text)
    return text

if __name__ == "__main__":
    main()

"""
cum debitis quis accusamus doloremque ipsa natus sapiente omnis
"""

现在,让我们稍微修改一下并运行它 50 次并测量我们所花费的时间。

import threading
import requests
import time
import random

results = []

def get_data(idx: int) -> str:
    url = f"https://jsonplaceholder.typicode.com/todos/{idx}"
    response = requests.get(url)
    results.append(response)
    return

def main() -> None:
    idx = random.randint(1, 100)
    start = time.time()
    for _ in range(50):
        get_data(idx)
    end = time.time()
    print(f"Time: {end-start}")
    return

if __name__ == "__main__":
    main()

"""
Time: 6.154262065887451
"""

        我们有一个results收集每个响应数据的全局列表。get_data我们在 for 循环中调用该函数 50 次。连续拉取数据 50 次大约需要 6 秒。

        这次,我将使用线程。

def main():
    idx = random.randint(1, 100)
    start = time.time()
    threads = []

    for _ in range(50):
        thread = threading.Thread(target=get_data, args=(idx,))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

    end = time.time()
    print(f"Time: {end-start}")
    return

"""
Time: 0.5561661720275879
"""

正如您所看到的,我们现在在近半秒内完成了相同的工作。这要归功于线程。现在让我们更详细地了解线程。

三、线程对象

        对象的实例Thread代表可以与其他线程同时运行的单独的控制流。

        建议在调用此构造函数时始终使用关键字参数。所需的参数是:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  group保留供将来使用,目前尚未实施。建议保留为None. 线程组是一种将多个线程逻辑分组在一起的方法。然而,在Pythonthreading模块的当前实现中,线程组没有任何重要的功能或影响。它们可能会在该语言的未来版本或基于threading.

  target指定要由线程的方法调用的可调用对象run()。它可以是一个函数或方法。上面,我们传递了这个get_data函数。

  name是一个可选字符串,可用于指定线程的名称。

  args并在线程启动时kwargs传递给可调用对象。target

  daemon是一个布尔标志,指示线程是否是守护线程。如果设置为True,则线程将在主程序完成时退出,无论其工作是否完成。

        该Thread对象提供了多种方法:

  • start通过内部调用该方法来启动线程的执行run()。它创建一个新的操作系统级线程并开​​始执行该线程的代码。
  • run包含将在线程中执行的代码。您可以在 的子类中重写此方法Thread来定义线程的特定行为。
  • join阻塞调用线程,直到调用它的线程完成执行。默认情况下,join()无限期地阻塞,直到连接的线程完成。但是,您可以指定超时值作为join()方法的参数来限制等待时间。该join()方法通常用于确保主程序等待所有线程完成,然后再根据连接线程的结果执行进一步的操作或计算。
import threading
import time

def foo():
    print("Thread started")
    time.sleep(2)
    print("Thread finished")

# create a Thread object
t = threading.Thread(target=foo)

# start the thread
t.start()

# wait for the thread to finish
t.join()

print("Main program continues")

"""
Thread started
Thread finished
Main program continues
"""
  • is_alive返回一个布尔值,指示线程当前是否正在执行 ( True) 还是已完成执行 ( False)。

四、锁定对象

        锁对象用于在多个线程之间提供同步,一次只允许一个线程执行代码的关键部分。

        锁对象充当原始同步机制,通常称为互斥体(互斥的缩写)。它有助于防止竞争条件并确保在任何给定时间只有一个线程可以访问共享资源或代码段。

Python 中的线程-LMLPHP

来源

        该类Lock提供了两个主要方法来控制对临界区的访问:

        该acquire()方法用于获取锁。默认情况下,它是阻塞调用,这意味着如果锁已被另一个线程持有,则调用线程将等待,直到它可以获得锁。该blocking参数是可选的,如果设置为False,该acquire()方法将立即返回,无论是否获取锁。

        该release()方法用于释放锁,允许其他线程获取它。应始终在块中调用它,finally以确保释放锁,即使在临界区内发生异常也是如此。

   shared在下面的示例中,三个线程在临界区中递增共享变量 ( ),并由锁对象 ( lock) 保护。该threading.get_ident()函数用于检索每个线程的 ID。

import threading
import time

lock = threading.Lock()

# Shared resource
shared = 0


def foo():
    global shared

    thread_id = threading.get_ident()
    # Acquire the lock
    print(f"Thread {thread_id} is trying to acquire the lock")
    lock.acquire()
    print(f"Thread {thread_id} has acquired the lock")

    try:
        # Critical section
        print(f"Thread {thread_id} is inside the critical section")
        for _ in range(5):
            shared += 1
            time.sleep(0.1)
    finally:
        # Release the lock
        lock.release()
        print(f"Thread {thread_id} has released the lock")


# Create multiple threads that increment the shared variable
threads = []
for _ in range(3):
    thread = threading.Thread(target=foo)
    threads.append(thread)
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()

# Print the final value of the shared variable
print("Final value of the shared variable:", shared)

"""
Thread 12993007616 is trying to acquire the lock
Thread 12993007616 has acquired the lock
Thread 12993007616 is inside the critical section
Thread 13009797120 is trying to acquire the lock
Thread 13026586624 is trying to acquire the lock
Thread 12993007616 has released the lock
Thread 13009797120 has acquired the lock
Thread 13009797120 is inside the critical section
Thread 13009797120 has released the lock
Thread 13026586624 has acquired the lock
Thread 13026586624 is inside the critical section
Thread 13026586624 has released the lock
Final value of the shared variable: 15
"""

        当线程尝试使用 获取锁时lock.acquire(),它将打印“ Thread is attempts to acquire the lock ”。如果锁可用,线程将成功获取锁并打印“ Thread has acquire the lock ”。否则,线程将被阻塞并等待,直到锁变得可用。

        一旦线程获得锁,它就会进入临界区并将共享变量递增五次,每次递增之间有一个小的延迟,以模拟正在完成的某些工作。当一个线程位于临界区内时,其他尝试获取锁的线程将被阻塞。

        完成临界区后,线程使用释放锁lock.release()并打印“ Thread hasreleasethelock ”。释放锁允许其他等待获取锁的线程继续进行。

        最后,主程序等待所有线程使用完毕thread.join()并打印共享变量的最终值。

        Python 中的线程还有一个RLock(可重入锁)对象,它是允许重入行为的锁对象的变体。可重入锁是一种同步原语,同一线程可以多次获取而不会导致死锁。

        可以将RLock房间的钥匙想象成拥有多份副本,而普通锁只提供一把钥匙。使用普通锁时,一次只有一个人可以持有钥匙并进入房间,其他人必须等待钥匙被释放。相比之下,anRLock允许多个人拥有自己的密钥副本。每个人都可以使用自己的钥匙进入房间,甚至可以在不阻塞自己的情况下重新进入房间,尽管其他人仍然需要等待钥匙可用。这种可重入行为RLock允许线程多次获取锁,当线程需要重复访问同一临界区而不导致死锁或阻塞时,它非常有用。

五、条件对象

        条件对象是一个同步原语,允许线程相互协调和通信。它为线程提供了一种等待满足特定条件的方法,并允许它们在满足该条件时通知其他线程。

import threading

# Shared resource
shared_variable = 0

# Create a lock and condition object
lock = threading.Lock()
condition = threading.Condition(lock)

def consumer():
    global shared_variable

    with condition:
        # Wait until the shared variable is positive
        while shared_variable <= 0:
            thread_id = threading.get_ident()
            print(f"Consumer ({thread_id}) is waiting...")
            condition.wait()

        # Consume the shared variable
        thread_id = threading.get_ident()
        print(f"Consumer ({thread_id}) consumed: {shared_variable}")
        shared_variable = 0

def producer():
    global shared_variable

    with condition:
        # Produce a positive value for the shared variable
        shared_variable = 42
        thread_id = threading.get_ident()
        print(f"Producer ({thread_id}) produced: {shared_variable}")

        # Notify the consumer that the condition is satisfied
        condition.notify()

# Create consumer and producer threads
consumer_thread = threading.Thread(target=consumer)
producer_thread = threading.Thread(target=producer)

# Start the threads
consumer_thread.start()
producer_thread.start()

# Wait for the threads to finish
consumer_thread.join()
producer_thread.join()

"""
Consumer (13067112448) is waiting...
Producer (13083901952) produced: 42
Consumer (13067112448) consumed: 42
"""

        消费者线程通过使用while循环和wait()方法等待共享变量为正。在等待期间,它释放锁,允许其他线程获取它。一旦条件满足,它就会重新获取锁并继续。生产者线程更新共享变量并通过方法通知消费者notify()

六、信号量对象

        信号量对象是一种同步原语,它控制对共享资源的访问或限制可以同时访问特定代码部分的线程数量。它维护一个内部计数器,用于确定可用资源或许可的数量。

import threading
import time
import random
# Create a semaphore with an initial value of 2
semaphore = threading.Semaphore(2)


def worker():
    semaphore.acquire()
    thread_id = threading.get_ident()
    print(f"Thread {thread_id} acquired the semaphore.")
    time.sleep(random.randint(1, 3))
    # Perform some task here
    print(f"Thread {thread_id} releasing the semaphore.")
    semaphore.release()


# Create worker threads
threads = []
for _ in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

# Wait for the threads to finish
for t in threads:
    t.join()

"""
Thread 13018976256 acquired the semaphore.
Thread 13035765760 acquired the semaphore.
Thread 13018976256 releasing the semaphore.
Thread 13035765760 releasing the semaphore.
Thread 13052555264 acquired the semaphore.
Thread 13069344768 acquired the semaphore.
Thread 13052555264 releasing the semaphore.
Thread 13086134272 acquired the semaphore.
Thread 13069344768 releasing the semaphore.
Thread 13086134272 releasing the semaphore.
"""     

信号量的初始值为2,因此,前两个工作线程可以立即获取信号量,而其余线程必须等待,直到获得可用的许可。一旦工作线程完成其任务,它就会释放许可证,允许另一个等待线程获取它。

七、事件对象

        事件对象是一种同步原语,允许线程通过发出事件发生信号来相互通信。事件有两种状态:“设置”和“取消设置”。当事件被设置时,它表明特定的条件或事件已经发生。线程可以等待事件设置,然后继续执行其任务。

import threading
import time

# Create an event
event = threading.Event()


def worker():
    thread_id = threading.get_ident()
    print(f"Worker {thread_id} is waiting for the event.")
    event.wait()
    print(f"Worker {thread_id} has been notified of the event.")


# Create worker threads
threads = []
for _ in range(3):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

# Sleep for some time
time.sleep(2)

# Set the event
event.set()
print("Event has been set.")

# Wait for the threads to finish
for t in threads:
    t.join()

"""
Worker 12987469824 is waiting for the event.
Worker 13004259328 is waiting for the event.
Worker 13021048832 is waiting for the event.
Event has been set.
Worker 13004259328 has been notified of the event.
Worker 12987469824 has been notified of the event.
Worker 13021048832 has been notified of the event.
"""

八、定时器对象

        计时器对象是Thread表示计时器的类的子类,该计时器可以在指定的延迟后调度并执行函数。

import threading

def print_message(message):
    print("Timer message:", message)

# Create a timer that prints a message after 5 seconds
timer = threading.Timer(5, print_message, args=("Hello, world!",))

# Start the timer
timer.start()

# Wait for the timer to finish
timer.join()

"""
Timer message: Hello, world!"""

九、障碍物(Berrier)

        屏障对象是一种同步原语,它允许指定数量的线程在代码中的指定点相互等待,然后才能继续执行。它通过确保多个线程一起到达某个点来帮助协调多个线程的执行,从而使它们能够同步其操作。

import threading


def worker():
    print("Worker thread is waiting at the barrier.")
    barrier.wait()
    print("Worker thread has passed the barrier and can proceed.")


# Create a barrier for 3 threads
barrier = threading.Barrier(3)

# Create worker threads
threads = []
for _ in range(3):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

# Wait for the threads to finish
for t in threads:
    t.join()

"""
Worker thread is waiting at the barrier.
Worker thread is waiting at the barrier.
Worker thread is waiting at the barrier.
Worker thread has passed the barrier and can proceed.
Worker thread has passed the barrier and can proceed.
Worker thread has passed the barrier and can proceed.
"""

        Python 中的线程为并发编程提供了强大的机制,允许多个线程在单个进程中并发执行。它在各种场景中提供了多种好处和用例。线程在处理 I/O 密集型任务(例如网络请求或文件操作)时特别有用,因为它允许程序在等待 I/O 操作完成时有效利用空闲 CPU 时间。

参考资料:

Threading in Python. Essential Parts of Threading in Python | by Okan Yenigün | Dev Genius

10-26 10:10