• Python之并发编程(线程)


    一、线程理论

    '什么是线程'
    进程:资源单位
    线程:执行单位
    	进程相当于车间(一个个空间),线程相当于车间里面的流水线(真正干活的)
     '''一个进程中至少有一个线程'''
     
     """
     进程仅仅是在内存中开辟一块空间(提供线程工作所需的资源)
     线程真正被CPU执行,线程需要的资源跟所在进程的要
     """
     
    '为什么要有线程'
    开设线程的消耗远远小于进程
      开进程
         1.申请内存空间
         2.拷贝代码
      开线程
       	一个进程内可以开设多个线程 无需申请内存空间、拷贝代码
         一个进程内的多个线程数据是共享的
     
    """
    开发一个文本编辑器
    	获取用户输入并实时展示到屏幕上
    	并实时保存到硬盘中
    多种功能应该开设多线程而不是多进程
    """
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    二、开设线程的两种方式

    	"""进程与线程的代码实操几乎是一样的"""
    '''
    创建线程的两种方式,都需要用到
    threading Thread 模块
    '''
    '第一种方式:函数'
    from threading import Thread
    import time
    def task():
        print('start子线程')
        time.sleep(1)
        print('end子线程')
    # 创建线程无需在__main__下面编写,但是为了统一 还是习惯在子代码中写
    
    if __name__ == '__main__':
        t = Thread(target=task,)
        t.start()  # 创建线程的开销极小,几乎一瞬间就可以创建
        t.join()
        print('执行主线程')
    
    '第二种方式:类'
    from threading import Thread
    import time
    
    class Mythread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name = name
    
        def run(self):
            print(f"{self.name}正在运行")
            time.sleep(1)
            print(f"{self.name}运行结束")
    
    
    obj = Mythread('jack')
    obj.start()
    print('主线程')
    
    '''
    输出的结果为:jack正在运行  主线程  jack运行结束
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    三、线程实现TCP服务端并发

    '比多进程更加简单方便 消耗的资源更少'
    '仔细体会一下线程与进程的本质区别'
    
    '服务端'
    import socket
    from threading import Thread
    
    # 生成一个socket对象 ,设置类型和协议
    server = socket.socket(family = socket.AF_INET,type = socket.SOCK_STREAM)
    
    # 绑定IP地址和端口号
    server.bind(('127.0.0.1',12345))
    
    # 同时监听 半连接池 3个客户端
    server.listen(3)
    
    def talk(sock):  # 设置一个函数来接收数据及发送数据
        while True:
            # 接收客户端的数据
            data = sock.recv(2048)  # 单次接收最大字节数
            print(f"接收到客户端的数据:",data.decode('utf-8'))
    
            # 回应客户端的数据
            sock.send(data.upper())  # 转换大写回复数据
    
    
    while True:
        # 等待客户端的连接
        sock,addr = server.accept()
        # 开设线程去完成数据交互
        t = Thread(target=talk,args=(sock,))
        t.start()
    
    
    '客户端'
    import socket
    # 生成socket对象 设置类型和协议
    client = socket.socket()
    
    # 连接服务端IP地址和端口号
    client.connect(('127.0.0.1',12345))
    
    while True:
        # 发送数据到服务端
        client.send(b'hello world')
    
        # 接收服务端发送的数据
        data = client.recv(2048)  # 接收的最大字节数
        print(f"接收服务端的数据:{data.decode('utf-8')}")
    
    client.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    四、创建进程与线程的速度比较

    等所有的进程或线程结束后再统计总共花的时间

    1.进程

    from multiprocessing import Process
    import time
    
    def task(name):
        print(f'{name} is running')
        time.sleep(0.1)
        print(f'{name} is over')
    if __name__ == '__main__':
        start_time = time.time()
        p_list = []
        for i in range(100):
            p = Process(target=task,args=('用户%s'%i,))
            p.start()
            p_list.append(p)
        for p in p_list:
            p.join()
        print(time.time()-start_time)
        
        代码展示:
            用户0 is running
            用户1 is running
            ....
            用户15 is running
            用户0 is over
            用户1 is over
            用户16 is running
            .....
            用户99 is over
            用户98 is over
            0.8929297924041748
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.线程

    from threading import Thread
    import time
    
    def task(name):
        print(f'{name} is running')
        time.sleep(0.1)
        print(f'{name} is over')
    if __name__ == '__main__':
        start_time = time.time()
        t_list = []
        for i in range(100):
            t = Thread(target=task,args=('用户%s'%i,))
            t.start()
            t_list.append(t)
        for t in t_list:
            t.join()
        print(time.time()-start_time)
        
        代码展示:
            用户0 is running
            用户1 is running
            ....
            用户63 is over
    
            用户0 is over
    
            0.11278319358825684
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    五、线程的相关方法

    1.线程的join方法

    '主线程等到子线程运行结束之后再运行(跟主进程一个道理)'
    from threading import Thread
    import time
    def task():
        print('start子线程')
        time.sleep(1)
        print('end子线程')
    
    
    t = Thread(target=task,)
    t.start()
    t.join()
    
    print('执行主线程')
    
    执行结果:
    >>>:start子线程
    >>>:end子线程
    >>>:执行主线程
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2.同一进程内的多个线程数据共享

    因为是同一个进程,空间的数据是共享的

    from threading import Thread
    
    money = 6666
    def task():
        global money  # 同一个进程下 线程之间数据共享 
        money  = 8888
    
    if __name__ == '__main__':
        t = Thread(target=task,)
        t.start()
        t.join()  # 确保子线程完毕后,再查找money变量,使结果更具说服性
    
        print(money)  # 输出8888
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3.其他方法

    	1.进程号
    		同一个进程下开设的多个线程拥有相同的进程号
    	2.线程号
    		需要使用模块 from threading import Thread,current_thread
    		current_thread().name
    		主:MainThread  子:Thread-N
    	3.进程下的线程数 模块 active_count
    		active_count()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    from threading import Thread,current_thread,active_count
    import os
    
    money = 6666
    def task():
        global money  # 同一个进程下 线程之间数据共享
        money = 8888
        print('子线程号',current_thread())
    
    
    if __name__ == '__main__':
        t = Thread(target=task,)
        t.start()
        t.join()  # 确保子线程完毕后,再查找money变量,使结果更具说服性
    
        print(money)  # 输出8888
        print(current_thread().name)
        print('主线程进程号',os.getpid())
        print('进程下的线程数:',active_count())
    
    '''
    一般存活的线程数都是1,因为for循环结束之后只剩下内存自带的一个主线程,
    就算是空代码也有一个线程,如果想统计所有的线程数的话,就必须添加了time.sleep(3),
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    六、守护线程

    '''
    守护线程伴随着被守护的线程的结束而结束
        进程下所有的守护线程 主线程(主进程)结束 所有线程结束全部直接结束
        进程下所有的非守护线程结束 主线程(主进程)才能真正结束!!!(没有被守护的结束了主线程才结束)
    '''
    
    from threading import Thread
    import time,random
    def task():
        print('执行子线程')
        time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        t = Thread(target=task,)
        t.daemon = True
        t.start()
        t.join()
        print('执行主线程')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    七、线程队列

    为什么线程中还使用队列?

    为什么线程中还有使用队列?
        同一个进程下多个线程数据是共享的
        为什么先同一个进程下还会去使用队列呢
        因为是队列是:管道+'所以用队列还是为了保证数据的安全'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    当信息必须安全的在多线程之间交换时,该模块可以很好的作为中间的桥梁

    该模块主要实现了三种类型的队列
    1.FIFO:先进先出(队列)
    2.LIFO:后进先出(堆栈)
    3.Priority:优先级队列

    1.先进先出

    import queue
    # 队列:先进先出
    q = queue.Queue()  # 括号内加入数字,表示指定可入队数量,不添加默认无上限
    
    # 入队
    q.put(1)
    q.put(2)
    q.put(3)
    
    # 出列
    print(q.get())  # 1
    print(q.get())  # 2
    print(q.get())  # 3
    # print(q.get())  # 当队列为空时,get就会进入阻塞状态。
    
    # 在get属性中,如果队列不添加数字无限大时,不生效
    # print(q.get(block=False))  # blcok=False表示队列满载,则直接报错
    #
    print(q.get(timeout=1))  # timeout=1表示:1s内如果没有get到数据,则报错
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2.后进先出

    import queue
    q = queue.LifoQueue()
    
    q.put(1)
    q.put(2)
    q.put(3)
    
    print(q.get())  # 3
    print(q.get())  # 2
    print(q.get())  # 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.优先级队列

    '优先级队列,入队时需要设置数字代表优先级,数字越低,优先级越高。并以元组形式入队'
    import queue
    q = queue.PriorityQueue()
    
    '第一个参数必须是数字,代表优先级'
    q.put((10,'元素1'))
    q.put((50,'元素2'))
    q.put((-10,'元素3'))
    
    '按照优先级从高到低出队'
    print(q.get())  # (-10, '元素3')
    print(q.get())  # (10, '元素1')
    print(q.get())  # (50, '元素2')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    常用方法(三种类型通用)

    import queue
    q = queue.Queue(3)
    print(q.empty())  # 当队列为空,返回True,不为空则返回False
    
    print(q.full())  # 队列满了,返回True,否则返回False
    
    # q.put() # 入队
    # q.get() # 出队
    
    
    q.task_done()  # 通常在q.get()后使用,它主要向队列发送信号,已取出一个数据
    q.join()  # 阻塞当前线程,如果q.task_done发出信号的次数,与put入队的次数相同,则取消阻塞
    
    print(q.maxsize)  # 可入队个数,如果未指定,则为0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    八、进程池与线程池的使用

    我们在实际应用中是不是可以无限制的开进程和线程?
        肯定是不可也的 会造成内存溢出受限于硬件水平
        我们在开设多进程或者多线程的时候,还需要考虑硬件的承受范围
    
    '''    
    池:
        池子、容器类型、可以盛放多个元素
        降低程序的执行效率,保证计算机硬件的安全
    
    进程池:
        提前创建好固定个数的进程供程序使用,后续不再创建
        
        (提前定义好一个池子,然后,往这个池子里面添加进程,以后,只需要往这个进程
        池里面丢任务就行了,然后,有这个进程池里面的任意一个进程来执行任务)
    
    线程池:
        提前创建好固定个数的线程供程序使用,后续不再创建
        
        (提前定义好一个池子,然后,往这个池子里面添加线程,以后,只需要往这个线程
        池里面丢任务就行了,然后,有这个线程池里面的任意一个线程来执行任务)
    '''
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import os,time
    
    # 产生含有固定数量线程或进程池
    pool = ThreadPoolExecutor(5)  # 固定产生五个线程
    '默认状态线程池默认是CPU个数的五倍'
    '''
    上面的代码执行之后就会立刻创建五个等待工作的线程
    不应该自己主动等待结果,应该让异步提交自动提醒>>:异步回调机制
    '''
    
    def task(n, m):  # 括号内可以
        return n+m
    
    def func():
        return {'username': 'jack', 'password': 123}
    
    def callback(res):
        # print(res)
        print(res.result())  # {'username': 'jack', 'password': 123}
        print(res.result().get('username'))  # jack
    
    if __name__ == '__main__':
        res = pool.submit(task, 1, 2)  # 朝线程池中提交任务(异步)
        print(res.result())  # 同步提交
    
        pool.submit(func).add_done_callback(callback)
        # 'add_done_callback只要任务有结果了,就会自动调用括号内的函数处理'
    
        pool.shutdown()
    
        print('主线程')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    Android -- 每日一问:怎么理解 Activity 的生命周期?
    IDEA -- bug笔记
    从零快速搭建仿抖音短视频APP-后端开发粉丝业务模块(4)
    羽夏看Linux内核——环境搭建
    顶级论文创新点怎么找?中国高校首次获CVPR最佳学生论文奖有感
    Metasploit(msf)利用ms17_010(永恒之蓝)出现Encoding::UndefinedConversionError问题
    【路径最全用法】python代码讲解os.path包的最全用法
    自回归策略是什么
    电子电器架构 —— 车载网关边缘节点路由转发策略
    基于SSM的小区疫情防控管理系统设计与实现
  • 原文地址:https://blog.csdn.net/achen_m/article/details/133914747