Python之路:(十五)进程、线程和协程

Python线程

python中threading模块用于提供线程相关的操作,线程是应用程序中工作的最小单元,擅长IO操作

import threading
import time

# 创建一个任务
def show(arg):
    time.sleep(1)
    print('thread'+str(arg))

# 循环创建10个线程去并发都去执行这个任务
for i in range(10):
    # 创建线程,target=函数,去执行这个函数 args=参数,给这个函数传的参数
    t = threading.Thread(target=show, args=(i,))
    # 运行线程
    t.start()

print('main thread stop')

上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
更多方法:

start

线程准备就绪,等待CPU调度

setName

为线程设置名称

getName

获取线程名称

setDaemon

设置为后台线程或前台线程(默认)

  • 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
  • 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

join

逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

run

线程被cpu调度后自动执行线程对象的run方法

自己创建一个线程类

# 自己创建一个线程类,继承threading.Thread类, 重写run方法,start()方法调用内部run()方法
class MyThread(threading.Thread):

    # 初始化接受两个参数
    def __init__(self, func, args):
        self.func = func
        self.args = args

        # super主动调用父类__init__()方法
        super(MyThread, self).__init__()

    # 执行start()方法,我继承类Thread类,它会调用自己的run方法,
    # 我重写了run方法,根据类的继承关系,执行方法先找自己,如果自己有则我自己写的run方法
    def run(self):
        self.func(self.args)


def f2(arg):
    print(arg)


for i in range(10):
    obj = MyThread(f2, i)
    obj.start()

从上面的例子可以更好的理解上面的threading.Threadrun()方法

线程锁(Lock、RLock)

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

未加锁

import threading
import time

NUM = 10

def func(arg):
    global NUM
    NUM -= 1
    time.sleep(1)
    print(NUM)

for i in range(10):
    t = threading.Thread(target=func, args=(i, ))
    t.start()

加锁版本(一)

import threading
import time

NUM = 10

def func(lock):
    global NUM
    #上锁
    lock.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    # 开锁
    lock.release()

lock = threading.Lock()  # 创建锁对象(只能锁一次)
# lock = threading.RLock()   # 创建锁对象(能迭代锁多次)

for i in range(10):
    # 把所对象当作参数传递到任务函数里面
    t = threading.Thread(target=func, args=(lock, ))
    t.start()

加锁版本(二)

import threading
import time

NUM = 10

def func(lock):
    global NUM
    #上锁
    lock.acquire()  # 锁1,锁住
    NUM -= 1
    lock.acquire()  # 锁2,锁住
    time.sleep(2)
    lock.release()  # 锁2,解锁
    print(NUM)
    # 开锁
    lock.release()  # 锁1,解锁

# lock = threading.Lock()  # 创建锁对象(只能锁一次)
lock = threading.RLock()   # 创建锁对象(能迭代锁多次)

for i in range(10):
    # 把所对象当作参数传递到任务函数里面
    t = threading.Thread(target=func, args=(lock, ))
    t.start()

信号量(Semaphore)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading
import time

NUM = 10

def func(i,lock):
    global NUM
    # 上锁5
    lock.acquire()
    NUM -= 1
    time.sleep(1)
    print(NUM, i)
    # 开锁5
    lock.release()

lock = threading.BoundedSemaphore(5)   # 一次锁多个,释放多个

for i in range(10):
    t = threading.Thread(target=func, args=(i, lock, ))
    t.start()

事件(event)

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 setwaitclear

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行到 event.wait(检查) 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
import threading


def func(i, e):
    print(i)
    e.wait()   # 检测是什么灯,如果是红灯停;绿灯行
    print(i+100)


event = threading.Event()

for i in range(10):
    t = threading.Thread(target=func, args=(i, event, ))
    t.start()

event.clear()  # 设置成红灯
inp = input(">>>")
if inp == "1":
    event.set()  # 设置成绿灯

条件(Condition)

使得线程等待,只有满足某条件时,才释放n个线程

# 输入多少放出多少
import threading
def func(i, con):
    print(i)
    con.acquire()
    con.wait()    # 对应到notify的输入多少放出多少
    print(i+100)
    con.release()


c = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func, args=(i, c, ))
    t.start()


while True:
    inp = input('>>>')
    if inp == 'q':
        break
    c.acquire()
    c.notify(int(inp))
    c.release()
# 当某个执行结果为真就放出一个
import threading


def condition():
    ret = False
    r = input('>>>')
    if r == '1':
        ret = True
    else:
        ret = False
    return ret


def func(i, con):
    print(i)
    con.acquire()
    con.wait_for(condition)  # 检测condition函数执行结果,True放出一个
    print(i+100)
    con.release()

con = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func, args=(i, con, ))
    t.start()

Timer

# 设置这个线程执行一次要多久
from threading import Timer


def hello():
    print("hello, world")

t = Timer(10, hello)  # hello任务执行设置执行10秒
t.start()  # after 1 seconds, "hello, world" will be printed

线程池

什么是线程池?
诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务
构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务。
但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。
所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,
“池”的概念使得人们可以定制一定量的资源,然后对这些资源进行复用,而不是频繁的创建和销毁。

线程池的注意事项:
虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。

  1. 线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理的话,线程数目与CPU 数量相适合即可。
    如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。

  2. 并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。

  3. 线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。

线程池需求

  1. 能够调整线程池大小
    比如:
    任务数是3,进程池20 ,那么咱们只需要开启3个线程就行了。
    任务数是500,进程池是20,那么咱们只开20个线程就可以了。
  1. 实现线程池正在运行,有一个查看的功能,查看一下现在线程里面活跃的线程是多少等待的是多少?
    线程总共是多少,等待中多少,正在运行中多少
    作用:
    方便查看当前线程池状态
    能获取到这个之后就可以当线程一直处于空闲状态
    查看状态用:上下文管理来做,非常nice的一点

  2. 关闭线程

自己实现线程池

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num):
        # 创建任务队列
        self.q = queue.Queue()
        # 获取最大数设置
        self.max_num = max_num

        self.terminal = False
        # 创建任务线程的list
        self.generate_list = []
        # 创建存放空闲进程的list
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        # 空闲的线程等于0,并且已经生成的线程小于线程最大设置就生成一个线程
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            # 创建线程
            self.generate_thread()
        # 获取的任务
        w = (func, args, callback,)
        # 放进队列里面
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        # 创建一个线程去队列中取一个任务执行
        t = threading.Thread(target=self.call)
        # 运行线程
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        # 创建线程
        current_thread = threading.currentThread
        # 将线程加到任务线程列表中
        self.generate_list.append(current_thread)
        # 从队列中获取任务
        event = self.q.get()
        while event != StopEvent:
            # 得到任务元祖(函数,参数,函数名)
            func, arguments, callback = event
            try:
                # 执行函数得到返回值结果
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None
            # 如果有第二个函数
            if callback is not None:
                try:
                    # 执行第二个函数
                    callback(success, result)
                except Exception as e:
                    pass
            # 线程执行完任务放到调用任务状态转换得方法,将执行完的线程放到空闲的list中
            with self.worker_state(self.free_list, current_thread):
                # 判断是否终止
                if self.terminal:
                    # 终止循环
                    event = StopEvent
                else:
                    # 有则继续取任务,执行任务
                    event = self.q.get()
        else:
            # 线程执行完从已经创建得线程列表中删除
            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()


    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

Python进程

# 创建进程
from multiprocessing import Process


def foo(i):
    print('say hi',i)

for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。并且python不能再Windows下创建进程!

  • 并且在使用多进程的时候,最好是创建多少个进程?:和CPU核数相等

进程数据共享

进程各自持有一份数据,默认无法共享数据,就得有个特殊的数据结构,这个数据结构就可以理解为他有穿墙的功能
如果你能穿墙的话两边就都可以使用了
使用了3种方法

# 进程间默认无法数据共享
from multiprocessing import Process

li = []


def foo(i):
    li.append(i)
    print('say hi', li)

for i in range(10):
    p = Process(target=foo, args=(i,))
    p.start()

# 进程中数据不共享,这里主进程的li列表中还是空
print('ending', li)

Array(数组)

from multiprocessing import Process, Array
#通过特殊的数据结构:数组(Array)
temp = Array('i', [11, 22, 33, 44])   # 这里的i是C语言中的数据结构,通过他来定义你要共享的内容的类型!

# 创建一个只包含数字类型的数组(python中叫列表)
# 并且数组是不可变的,在C,或其他语言中,数组是不可变的,之后再python中数组(列表)是可以变得
# 当然其他语言中也提供可变的数组
# 在C语言中数组和字符串是一样的,如果定义一个列表,如果可以增加,那么我需要在你内存地址后面再开辟一块空间,那我给你预留多少呢?
#在python中的list可能用链表来做的,我记录了你前面和后面是谁。   列表不是连续的,数组是连续的

def Foo(i):
    temp[i] = 100+i
    for item in temp:
        # 打印子进程中得value
        print(i, '----->', item)

for i in range(2):
    p = Process(target=Foo, args=(i,))
    p.start()

Array类型定义对应表

'c': ctypes.c_char,  'u': ctypes.c_wchar,
'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int,   'I': ctypes.c_uint,
'l': ctypes.c_long,  'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

manage.dict()共享数据

from multiprocessing import Process, Manager   # 这个特殊的数据类型Manager

manage = Manager()
dic = manage.dict()  # 这里调用的时候,使用字典,这个字典和咱们python使用方法是一样的!


def Foo(i):
    dic[i] = 100+i
    print(dic.values())

for i in range(10):
    p = Process(target=Foo, args=(i,))
    p.start()
    p.join()

Queue队列

from multiprocessing import Process, Queue


def f(i,q):
    print(i, q.get())

if __name__ == '__main__':
    q = Queue()
    q.put("h1")
    q.put("h2")
    q.put("h3")

    for i in range(10):
        p = Process(target=f, args=(i, q,))
        p.start()

进程锁

当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。

# 进程锁
from multiprocessing import Process, Array, RLock


def Foo(lock, temp, i):
    """
    将第0个数加100
    """
    lock.acquire()
    temp[0] += 100
    for item in temp:
        print(i, '----->', item)
    lock.release()

lock = RLock()
temp = Array('i', [11, 22, 33, 44])

for i in range(10):
    p = Process(target=Foo,args=(lock, temp, i,))
    p.start()

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
from multiprocessing import Process, Pool
import time


def Foo(i):
    time.sleep(2)
    return i+100


def Bar(arg):
    print(arg)

pool = Pool(5)  # 创建一个进程池
# print(pool.apply(Foo,(1,)))  #去进程池里去申请一个进程去执行Foo方法
# print(pool.apply_async(func=Foo, args=(1,)).get())  # 同上一个意思

for i in range(10):
    # print(pool.apply(Foo, (i,)))
    pool.apply_async(func=Foo, args=(i,),callback=Bar)

print('end')
pool.close()
# pool.terminate()  # 立即结束
pool.join()  # 主进程等待进程池中子进程执行完毕后再关闭,如果注释,那么程序直接关闭。

'''
pool.apply 主动的去执行
pool.apply_async(func=Foo, args=(i,),callback=Bar) 相当于异步,当申请一个线程之后,执行FOO方法就不管了,执行完之后就再执行callback ,当你执行完之后,再执行一个方法告诉我执行完了
callback 有个函数,这个函数就是操作的Foo函数的返回值!
'''

Python协程

首先要明确,线程和进程都是系统帮咱们开辟的,不管是thread还是process他内部都是调用的系统的API
而对于协程来说它和系统毫无关系!
他就和程序员有关系,对于线程和进程来说,调度是由CPU来决定调度的!
对于协程来说,程序员就是上帝,你想让谁执行到哪里他就执行到哪里

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

适用场景:其实在其他语言中,协程的意义其实不大,多线程即可已解决I/O的问题,但是在python因为他有GIL(Global Interpreter Lock 全局解释器锁 )在同一时间只有一个线程在工作,所以:如果一个线程里面I/O操作特别多,协程就比较适用

greenlet

from greenlet import greenlet


def test1():
    print(12)      # (1)
    gr2.switch()   # 遇到这个跳到下一个
    print(34)      # (3)
    gr2.switch()


def test2():
    print(56)      # (2)
    gr1.switch()
    print(78)      # (4)

# 创建两个协程,协程运行遇到IO操作需要等待得时候执行另一个协程
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()  # 执行第一个协程

'''
比I/O操作,如果10个I/O,我程序从上往下执行,如果同时发出去了10个I/O操作,那么返回的结果如果同时回来了2个
,是不是就节省了很多时间?

如果一个线程里面I/O操作特别多,使用协程是不是就非常适用了!

如果一个线程访问URL通过协程来做,协程告诉它你去请求吧,然后继续执行,但是如果不用协程就得等待第一个请求完毕之后返回之后才
继续下一个请求。

协程:把一个线程分成了多个协程操作,每个协程做操作
多线程:是把每一个操作,分为多个线程做操作,但是python中,在同一时刻只能有一个线程操作,并且有上下文切换。但是如果上下文切换非常频繁的话
是非常耗时的,但对于协程切换就非常轻便了~
'''

协程就是对线程的分片,上面的例子需要手动操作可能用处不是很大,助于了解原理,看下面的例子:

gevent

上面的greenlet是需要认为的制定调度顺序的,所以又出了一个gevent他是对greenlet功能进行封装,遇到I/O自动切换

from gevent import monkey; monkey.patch_all()
import gevent
import requests


def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))


gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),  # 这里的f是调用的任务,第二个是给这个任务传的参数
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://www.github.com/'),
])

'''
当遇到I/O操作的时候就会调用协程操作,然后继续往下走,然后这个协程就卡在这里等待数据的返回
'''
文章目录
  1. 1. Python线程
    1. 1.1. start
    2. 1.2. setName
    3. 1.3. getName
    4. 1.4. setDaemon
    5. 1.5. join
    6. 1.6. run
      1. 1.6.1. 自己创建一个线程类
    7. 1.7. 线程锁(Lock、RLock)
      1. 1.7.1. 未加锁
      2. 1.7.2. 加锁版本(一)
      3. 1.7.3. 加锁版本(二)
    8. 1.8. 信号量(Semaphore)
    9. 1.9. 事件(event)
    10. 1.10. 条件(Condition)
    11. 1.11. Timer
    12. 1.12. 线程池
      1. 1.12.1. 线程池需求
      2. 1.12.2. 自己实现线程池
  2. 2. Python进程
    1. 2.1. 进程数据共享
    2. 2.2. Array(数组)
      1. 2.2.1. Array类型定义对应表
    3. 2.3. manage.dict()共享数据
    4. 2.4. Queue队列
    5. 2.5. 进程锁
    6. 2.6. 进程池
  3. 3. Python协程
    1. 3.1. greenlet
    2. 3.2. gevent
|