并发编程

  • 并发(伪):由于执行速度特别快,人感觉不到
  • 并行(真):创建10个人同时操作

线程

开销非常小
是操作系统可以调度的最小单位(内存共享) 能利用多个CPU 由操作系统控制

  1. 单进程,单线程的应用程序
    • print('666')
  2. 到底什么是线程?什么是进程
    • Python自己没有这玩意,Python中调用的操作系统的线程和进程(伪线程)
  3. 多线程
    • 工作的最小单元
    • 共享进程中所有资源
    • 每个线程可以分担一点任务,最终完成最后的结果

python多线程原理:python的多线程实际是一个假的多线程(多个线程在1核CPU上运行,进行快速的切换导致错觉为同时执行)

代码

import threading
def func(arg):
    print(arg)

th = threading.Thread(target=func)
th.start()
print('end')

一个应用程序:软件

  • 默认一个程序只有一个进程
  • 可以有多个进程(默认只有一个),一个进程可以创建多个线程(默认一个)。

Python多线程情况下:

  • 计算密集型操作:效率低(GIL锁)
  • IO操作:效率高

Python多进程情况下:

  • 计算密集型操作:效率高(浪费空间)
  • IO操作:效率高(浪费资源)

以后写Python时:

  • IO密集型用多线程:文件/输入输出/socket
  • 计算密集型用多进程:

进程

  • 独立开辟内存
  • 进程之间的数据隔离

注意:进程是为了提供环境让线程工作

Python中线程和进程(GIL锁)

GIL锁,全局解释器锁。用于限制一个进程中同一个时刻只有一个线程被CPU调度

扩展:默认GIL锁在执行100个cpu指令后(过期时间)。

线程的使用

多线程基本使用

  • 基础例子
import threading
def func(arg):
    print(arg)

th = threading.Thread(target=func)
th.start()
print('end')
  • 测试例子(主线程默认会先执行自己的代码,然后等子线程执行完毕后,才会结束)
import time
import threading

def func(arg):
    time.sleep(arg)
    print(arg)

t1 = threading.Thread(target=func,args=(3,))
t1.start()

t2 = threading.Thread(target=func,args=(9,))
t2.start()

print('end')
  • jojn方法:
import time
import threading

def func(arg):
    time.sleep(3)
    print(arg)

print('开始执行t1')
t1 = threading.Thread(target=func,args=(3,))
t1.start()
# 无参:让主线程等待,等到子线程t1执行完毕后,才能往下执行
# 有参:让主线程在这里最多等待n秒,无论执行完毕与否,会继续往下走
t1.join(2)

print('开始执行t2')
t2 = threading.Thread(target=func,args=(9,))
t2.start()
t2.join()   # 让主线程等待,等到子线程t1执行完毕后,才能往下执行

print('end')
  • 线程名称获取
import threading

def func(arg):
    # 获取当前执行该函数的线程的对象
    t = threading.current_thread()
    # 根据当前线程对象,获取当前线程名称
    name = t.getName()
    print(name,arg)

t1 = threading.Thread(target=func,args=(3,))
t1.setName('mhy')
t1.start()

t2 = threading.Thread(target=func,args=(9,))
t2.setName('zz')
t2.start()

print('end')
  • 线程本质
# 先打印3还是end?
import threading

def func(arg):
    print(arg)

t1 = threading.Thread(target=func,args=(3,))
# start是开始线程嘛?不是
# start是告诉cpu,我已准备就绪,你可以调度我了。
t1.start()

print('end')
  • 面向对象式子多线程使用
import threading
class MyThread(threading.Thread):
    def run(self):
        print(123,self._args,self._kwargs)


t1 = MyThread(args=(11,))
t1.start()

t2 = MyThread(args=(12,))
t2.start()
  • 计算密集型多线程(作用不大)
import threading
def func(lis,num):
    et = [i+num for i in lis]
    print(et)

t1 = threading.Thread(target=func,args=([11,22,33],1))
t1.start()

t2 = threading.Thread(target=func,args=([44,55,66],100))
t2.start()
  • 实例代码
import threading
import time

#定义类继承Thread线程
class CodingThread(threading.Thread):
    def run(self):
        for x in range(3):
            print('正在写代码%s' % threading.current_thread())
            time.sleep(1)

class DrawingThread(threading.Thread):
    def run(self):
        for x in range(3):
            print('正在画图%s' % threading.current_thread())
            time.sleep(1)

def main():
    t1 = CodingThread()
    t2 = DrawingThread()

    t1.start()
    t2.start()

if __name__ == '__main__':
    main()

线程安全(Lock)

  • 线程安全,多线程操作时,内部会让所有线程排队处理,如:list/dict/Queue
  • 线程不安全 + 人(LOCK) = 排队处理

简介:

Python自带的解释器是CPython。CPython解释器的多线程实际上是一个假的多线程(在多核CPU中,只能利用一核,不能利用多核)。同一时刻只有一个线程在执行,为了保证同一时刻只有一个线程在执行,在CPython解释器中有一个东西叫做GIL(Global Intepreter Lock),叫做全局解释器锁。这个解释器锁是有必要的。因为CPython解释器的内存管理不是线程安全的。当然除了CPython解释器,还有其他的解释器,有些解释器是没有GIL锁的,见下面:

  1. Jython:用Java实现的Python解释器。不存在GIL锁。
  2. IronPython:用.net实现的Python解释器。不存在GIL锁。
  3. PyPy:用Python实现的Python解释器。存在GIL锁。

虽然是一个假的多线程。但是在处理一些IO操作(比如文件读写和网络请求)还是可以在很大程度上提高效率的。在IO操作上建议使用多线程提高效率。在一些CPU计算操作上不建议使用多线程,而建议使用多进程。

不安全例子

import time
import threading

lit = []
def func(arg):
    lit.append(arg)
    time.sleep(0.04)
    m = lit[-1]
    print(arg,m)  # m和arg应该是一个值

for num in range(10):
    t1 = threading.Thread(target=func,args=(num,))
    t1.start()

锁机制原理

多线程同时执行,会导致数据同时执行,获取数据不符合结果。

线程先获得执行权时,将会将执行过程锁起来,其他线程不能使用,必须要等该线程执行完,其他线程才可获取执行权,执行线程,将解决数据不统一的方法

Lock 锁(一次放行一个)

import time
import threading

lit = []
lock = threading.Lock()  # 创建一个锁

def func(arg):
    lock.acquire() # 将该行代码以下代码锁起来,直到遇到release释放
    lit.append(arg)
    time.sleep(0.04)
    m = lit[-1]
    print(arg,m)
    lock.release()  # 释放锁

for num in range(10):
    t1 = threading.Thread(target=func,args=(num,))
    t1.start()

RLock 递归锁(一次放行多个),

因为lock锁如果有多层锁机制,会造成死锁现象,所以有了Rlock(递归锁)

代码如下:

import time
import threading

lit = []
lock = threading.RLock()  # 创建一个递归锁

def func(arg):
    # Lock只能解开一个锁,会造成死锁线程
    # RLock可以解开两个锁,解决了下面问题
    lock.acquire() # 将该行代码以下代码锁起来,直到遇到release释放
    lock.acquire()
    lit.append(arg)
    time.sleep(0.04)
    m = lit[-1]
    print(arg,m)
    lock.release()  # 释放锁
    lock.release()  # 释放锁

for num in range(10):
    t1 = threading.Thread(target=func,args=(num,))
    t1.start()

BoundedSemaphore(一次放N个)信号量

import time
import threading

# 创建一个锁,这个可以支持你同时进行几次锁,默认为1次
lock = threading.BoundedSemaphore(3)

def func(arg):
    lock.acquire()
    print(arg)
    time.sleep(1)
    lock.release()

for num in range(20):
    t = threading.Thread(target=func,args=(num,))
    t.start()

Condition(1次放x个数)动态输入

Lock版本的生产者与消费者模式可以正常的运行,但是存在一个不足,在消费者中,总是通过While True死循环并且上锁的方式去判断钱够不够,上锁是一个很耗费CPU资源的行为,因此这种方式不是最好的,还有一种更好的方式便是使用Threading.condition来实现,threading.condition可以在没有数据的时候处于堵塞等待状态,一旦有合适的数据了,还可以使用notify相关的函数来通知其他处于等待状态的线程。这样可以不用做一些无用的上锁和解锁的操作。可以提高程序的性能。首先对threading.Condition相关的函数做个介绍,threading.Condition类似threading.Lock,可以在修改全局数据的时候进行上锁,也可以在修改完毕后进行解锁。以下将一些常用的函数做个简单的介绍:

  1. acquire:上锁
  2. release:解锁
  3. wait:将当前线程处于等待状态,并且会释放锁。可以被其他线程使用notify和notify_all函数唤醒。被唤醒后继续等待上锁,上锁后执行下面的代码。
  4. notify:通知某个等待的线程,默认是第1个等待的线程。
  5. notify_all:通知正在等待的线程。notify和notify_all不会释放锁。并且需要在release之前调用。
# 方法一
import time
import threading

# 创建一个锁,这个可以支持你同时进行几次锁,默认为1次
lock = threading.Condition()

def func(arg):
    print('线程进来了')
    lock.acquire()
    lock.wait()

    print(arg)
    time.sleep(1)
    lock.release()

for num in range(3):
    t = threading.Thread(target=func,args=(num,))
    t.start()

while True:
    num = int(input('>>>:'))
    lock.acquire()
    lock.notify(num)
    lock.release()

# 方法二
import time
import threading

# 创建一个锁,这个可以支持你同时进行几次锁,默认为1次
lock = threading.Condition()

def xxx():
    print('来执行函数了')
    input('>>>:')
    return True

def func(arg):
    print('线程进来了')
    lock.wait_for(xxx)
    print(arg)
    time.sleep(1)

for num in range(3):
    t = threading.Thread(target=func,args=(num,))
    t.start()

event(事件)1次放所有

import threading

# 创建一个锁,这个可以支持你同时进行几次锁,默认为1次
lock = threading.Event()

def func(arg):
    print('线程进来了')
    lock.wait()  # 变红灯
    print(arg)

for num in range(10):
    t = threading.Thread(target=func,args=(num,))
    t.start()

input('>>>')
lock.set()   # 变绿灯
input('>>>')

# 重新回归 红灯状态
lock.clear()
for num in range(10):
    t = threading.Thread(target=func,args=(num,))
    t.start()

input('>>>')
lock.set()   # 变绿灯
input('>>>')

线程总结

线程安全:列表和字典就是线程安全;

为什么要加锁?

  • 非线程安全
  • 控制一段代码的时候,每次只能最多同时执行几个

threading.local

为每一个线程创建一个字典键值对,为数据进行隔离

示例代码:

import time
import threading

pond = threading.local()

def func(arg):
    # 内部会为当前线程创建一个空间用于存储,phone = 自己的值 ,将数据隔离开
    pond.phone = arg
    time.sleep(2)
    # 取当前线程自己空间取值
    print(pond.phone,arg)

for num in range(10):
    t = threading.Thread(target=func,args=(num,))
    t.start()

线程池

使用concurrent来创建线程池,使用线程池可以有效的解决线程无止境的问题,用户每一次一个请求都会创建一个线程,这样线程一堆积也会造成程序缓慢,所以,线程池就可以帮我们解决这个问题,线程池可以设定,最多同时执行n个线程,由自己设定。

示例代码:

import time
from concurrent.futures import ThreadPoolExecutor

# 创建一个线程池,(最多同时执行5个线程)
pool = ThreadPoolExecutor(5)

def func(arg1,arg2):
    time.sleep(1)
    print(arg1,arg2)

for num in range(5):
    # 去线程池申请一个线程,让线程执行func函数
    pool.submit(func,num,8)

Queue线程安全队列:

在线程中,访问一些全局变量,加锁是一个经常的过程。如果你是想把一些数据存储到某个队列中,那么Python内置了一个线程安全的模块叫做queue模块。Python中的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。相关的函数如下:

  1. 初始化Queue(maxsize):创建一个先进先出的队列。
  2. qsize():返回队列的大小。
  3. empty():判断队列是否为空。
  4. full():判断队列是否满了。
  5. get():从队列中取最后一个数据。
  6. put():将一个数据放到队列中。

示例代码

 #encoding:utf-8

from queue import Queue
import threading
import time

# q = Queue(4)  # 添加4个队列
# q.put(10)   #第1个队列插入一条数据
# q.put(4)    #第2个队列插入一条数据

# for x in range(4):
#     q.put(x)
#
# for x in range(4):
#     print(q.get())
# print(q.empty())
# print(q.full())
# print(q.qsize())

def set_value(q):
    index = 0
    while True:
        q.put(index)
        index +=1
        time.sleep(2)
def get_value(q):
    while True:
        print(q.get())

def main():
    q = Queue(4)
    t1 = threading.Thread(target=set_value,args=[q])
    t2 = threading.Thread(target=get_value,args=[q])

    t1.start()
    t2.start()

if __name__ == '__main__':
    main()

生产者消费者模型

模型三部件

  • 生产者
    • 队列:先进先出
    • 栈:后进先出
  • 消费者
  • 队列

生产者和消费者模型解决了 不用一直等待的问题

使用Queue创建队列

示例代码:

import time
import threading
from queue import Queue

q = Queue()

def producer(id):
    '''生产者'''
    while True:
        time.sleep(2)
        q.put('包子')
        print('厨师 %s 生产了一个包子'%id)

def consumer(id):
    '''消费者'''
    while True:
        time.sleep(1)
        v1 = q.get()
        print('顾客 %s 吃了一个包子'%id)

for produce in range(1,4):
    t1 = threading.Thread(target=producer,args=(produce,))
    t1.start()

for consu in range(1,3):
    t2 = threading.Thread(target=consumer,args=(consu,))
    t2.start()

总结:

  1. 操作系统帮助开发者操作硬件

  2. 程序员写好代码在操作系统上运行(依赖解释器)

  3. 任务特别多。

    以前一个一个执行,(串行),现在可以使用多线程

为什么创建线程?

  • 由于线程是cpu工作的最小单元,创建线程可以利用多核优势实现并行操作(java,C#)

为啥创建进程?

  • 进程和进程之间做数据隔离(java/C)

Python

  • Python中存在一个GIL锁。
    • 造成:多线程无法利用多核优势
    • 解决:开多进程处理(浪费资源)
    • 总结:
      • IO密集型:多线程
      • 计算密集型:多进程
  • 线程的创建
    • Thread
    • 面向对象继承(Threading.Thread)
  • 其他
    • jojn
    • setDeanon
    • setName
    • threading.current_thread()
    • 获得
    • 释放
12-24 00:16