Python核心:进程&线程

1.进程和线程

  • 进程&线程(概念)

    执行一个python程序,默认会创建一个进程,在一个进程里又会创建一个线程。线程是真正工作的单位,而进程是为线程提供资源的单位。

    进程是CPU内核资源分配的最小单位,线程是CPU内核调度的最小单位。

    一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源

  • 创建方式

    • 创建线程

      • 方式一:
      1
      2
      3
      4
      5
      6
      7
      8
      import threading

      def func(a1,a2,a3):
      pass
      # 创建线程
      t = threading.Thread(target=func,args=(11,22,33))
      # 线程开始工作
      t.start()
- 方式二:自定义线程类

1
2
3
4
5
6
7
8
import threading

class MyThread(threading.Thread):
def run(self):
print('执行此线程', self._args)

t = MyThread(args=(100,))
t.start()
  • 创建进程

    • 方式一
    1
    2
    3
    4
    import multiprocessing
    # Linux系统fork; win:spawn; mac支持fork和spawn(python3.8默认设置spawn)
    t = multiprocessing.Process(target=func,args=(11,22,33))
    t.start()

    如果是spawn模式,进程的创建和执行 都需要放在main函数中。否则会报错,让放在main中。

    mac系统指定fork模式时,写上multiprocessing.set_start_method('fork')

    • 方式二: 自定义进程类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import multiprocessing


    class MyProcess(multiprocessing.Process):
    def run(self):
    print('执行此进程', self._args)


    if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")
    p = MyProcess(args=('xxx',))
    p.start()
    print("继续执行...")

多进程比多线程的开销大,那是不是证明多线程要比多进程好呢?

2.GIL锁

GIL, 全局解释器锁(Global Interpreter Lock),是CPython解释器特有一个玩意,让一个进程中同一个时刻只能有一个线程可以被CPU调用。

GIL只保证同一时刻一个线程在运行,保证不了数据安全。

由于GIL锁的存在,控制一个进程中同一时刻只有一个线程可以被CPU调度。所以

- 计算密集型,适合多进程开发
- IO密集型,适合多线程开发

image-20210218184651385

如果程序想利用 计算机的多核优势,让CPU同时处理一些任务,适合用多进程开发(即使资源开销大)。

image-20210218185849637

如果程序不利用 计算机的多核优势,适合用多线程开发。

image-20210218185953326

常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU的多核优势,所以,就有这一句话:

  • 计算密集型,用多进程,例如:大量的数据计算【累加计算示例】。

  • IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】。

1.进程

1.1 多线程

线程的常见方法:

  • t.start(),当前线程准备就绪(等待CPU调度,具体怎么调度由CPU决定)

  • t.join(),等待当前线程的任务执行完毕后,再往下执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import threading

    loop = 10000000
    number = 0


    def _add(count):
    global number
    for i in range(count):
    number += 1


    def _sub(count):
    global number
    for i in range(count):
    number -= 1

    # 情况一:t1先执行,执行完成之后t2开始执行
    t1 = threading.Thread(target=_add)
    t2 = threading.Thread(target=_sub)
    t1.start()
    t1.join() # t1线程执行完毕,才继续往后走
    t2.start()
    t2.join() # t2线程执行完毕,才继续往后走

    # 情况二:t1、t2交替执行
    #(怎么交替,由CPU调度决定。可以先执行2个t1 再执行3个t2,调度决定,同一时刻只有一个线程在运行。而且可能t1还没执行完,横跳到t2线程)
    t1 = threading.Thread(target=_add, args=(loop,))
    t2 = threading.Thread(target=_sub, args=(loop,))
    t1.start()
    t2.start()
    t1.join() # t1线程执行完毕,才继续往后走
    t2.join() # t2线程执行完毕,才继续往后走
  • t.setDaemon(布尔值),守护线程(必须放在start之前)

    • t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭。
    • t.setDaemon(False),设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。(默认)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import threading
    import time

    def task(arg):
    time.sleep(5)
    print('任务')

    t = threading.Thread(target=task, args=(11,))
    t.setDaemon(True) # True/False
    t.start()

    print('END')
    # True的时候 输出END 之后结束。(不等待t线程,此时 任务 可能输出了,也可能没来得及输出)
    # False的时候 先输出END 再等待5s 最后输出 任务

    后续3.10版本,弃用了该方法

  • t.name()设置线程名称,需要放在start()方法前

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import threading

    def task(arg):
    # 获取当前执行此代码的线程
    name = threading.current_thread().name()
    print(name)

    for i in range(10):
    t = threading.Thread(target=task, args=(11,))
    t.name('日魔-{}'.format(i))
    t.start()

1.2 线程安全

  • 方式一:手动加锁、释放锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import threading
    # 定义锁对象。(线程中的锁 必须是同一把锁)
    lock_object = threading.RLock()

    loop = 10000000
    number = 0

    def _add(count):
    lock_object.acquire() # 加锁
    global number
    for i in range(count):
    number += 1
    lock_object.release() # 释放锁

    def _sub(count):
    lock_object.acquire() # 申请锁(等待)
    global number
    for i in range(count):
    number -= 1
    lock_object.release() # 释放锁

    t1 = threading.Thread(target=_add, args=(loop,))
    t2 = threading.Thread(target=_sub, args=(loop,))
    t1.start()
    t2.start()

    t1.join() # t1线程执行完毕,才继续往后走
    t2.join() # t2线程执行完毕,才继续往后走

    加锁之后,一个t1线程在执行中 一个t2线程必须等待。不会出现还没执行完 就反复横跳 保证了数据安全。

  • 方式二:基于上下文,自动加锁和释放锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import threading
    num = 0
    lock_object = threading.RLock()

    def task():
    print('开始')
    with lock_object: # 基于上下文 内部自动执行acquire和release(类似读取文件 自动打开关闭)
    global num
    for i in range(1000000):
    num += 1
    print(num)

    for i in range(2):
    t = threading.Thread(target=task)
    t.start()

1.3 线程锁

手动加锁,一般有两种:Lock和RLock

  • Lock,同步锁lock_object = threading.Lock()
  • RLock,递归锁lock_object = threading.RLock()

RLock支持多次申请锁和多次释放;Lock不支持。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading
import time

lock_object = threading.RLock()

def task():
print("开始")
lock_object.acquire()
lock_object.acquire()
print(123)
lock_object.release()
lock_object.release()

for i in range(3):
t = threading.Thread(target=task)
t.start()
  • 死锁

    死锁,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象

1.4 线程池

官方在python3中才正式提供线程池

线程并不是开的越多越好,可能会导致系统的性能更低

原因:线程越多,切换线程涉及到的上下文切换就会越多,反而耗时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from concurrent.futures import ThreadPoolExecutor

# pool = ThreadPoolExecutor(100)
# pool.submit(函数名,参数1,参数2,参数...)


def task(video_url,num):
print("开始执行任务", video_url)
time.sleep(5)

# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)

url_list = ["www.xxxx-{}.com".format(i) for i in range(300)]

for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
pool.submit(task, url,2)

print("END")

输出10个 ”开始执行任务“ 以及 END ;5S后 输出 开始执行任务的10-19 ;5s后 输出 开始执行任务的20-29 .。。。。。直到,开始执行任务的299

通过这样操作,节省上下文调度时间

1.5 单例模式(拓展内容)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Singleton:
instance = None

def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
# 返回空对象
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls) # 创建空对象
return cls.instance

# 单例模式:obj1和obj2的内存地址一样
obj1 = Singleton('su')
print(obj1)

obj2 = Singleton('lemon')
print(obj2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 但是,如果多线程的情况,地址可能不一样。为了使得多线程也一样,加上锁
import threading

class UpSingleton:
instance = None
lock = threading.RLock()
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
# 这两行为了优化提升效率,因为申请锁释放锁也需要时间
if cls.instance:
return cls.instance
with cls.lock:
# 返回空对象
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls) # 创建空对象
return cls.instance

def task():
obj = UpSingleton('su')
print(obj)

for i in range(10):
t = threading.Thread(target=task)
t.start()

1.6 常见问题总结

  1. 简述进程和线程的区别以及应用场景

    答:①进程是CPU内核资源分配最小单位,线程是CPU内核调度最小单位。②一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。③由于GIL锁的存在,控制一个进程中同一时刻只有一个线程可以被CPU调度。所以 计算密集型(多进程开发),IO密集型(多线程开发)

  2. 什么是GIL锁

    答:GIL是Cpython解释器特有的一个全局解释器锁,控制一个进程中同一时刻只有一个线程可以被CPU调度。同时像列表、字典等常见对象的线程数据安全,也得益于GIL

2.进程

2.1 进程三大模式

关于在Python中基于multiprocessiong模块操作的进程:

Depending on the platform, multiprocessing supports three ways to start a process. These start methods are

  • fork,【“拷贝”几乎所有资源】【支持文件对象/线程锁等传参】【unix】【任意位置开始】【快】

    The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.Available on Unix only. The default on Unix.

  • spawn,【run参数传必备资源】【不支持文件对象/线程锁等传参】【unix、win】【main代码块开始】【慢】

    The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process object’s run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.Available on Unix and Windows. The default on Windows and macOS.

  • forkserver,【run参数传必备资源】【不支持文件对象/线程锁等传参】【部分unix】【main代码块开始】

    When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.Available on Unix platforms which support passing file descriptors over Unix pipes.

2.2 多进程

进程的常见方法:

  • p.start(),当前进程准备就绪,等待被CPU调度(工作单元其实是进程中的线程)。

  • p.join(),等待当前进程的任务执行完毕后再向下继续执行。

  • p.daemon = 布尔值,守护进程(必须放在start之前)

    • p.daemon =True,设置为守护进程,主进程执行完毕后,子进程也自动关闭。
    • p.daemon =False,设置为非守护进程,主进程等待子进程,子进程执行完毕后,主进程才结束。
  • p.name设置进程名称,multiprocessing.current_process().name获取进程名称

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    import os
    import time
    import threading
    import multiprocessing

    def func():
    time.sleep(3)
    def task(arg):
    for i in range(10):
    t = threading.Thread(target=func)
    t.start()
    print(os.getpid(), os.getppid())
    print("线程个数", len(threading.enumerate()))
    time.sleep(2)
    print("当前进程的名称:", multiprocessing.current_process().name)

    if __name__ == '__main__':
    print(os.getpid())
    multiprocessing.set_start_method("spawn")
    p = multiprocessing.Process(target=task, args=('xxx',))
    p.name = "哈哈哈哈"
    p.start()

    print("继续执行...")
  • 获取CPU个数

    1
    2
    import multiprocessing
    multiprocessing.cpu_count()

2.3 进程间数据共享

2.4 进程锁

2.5 进程池

https://gitee.com/wupeiqi/python_course/blob/master/day23%20%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%EF%BC%88%E4%B8%8B%EF%BC%89/%E7%AC%94%E8%AE%B0/day23.md#3-%E8%BF%9B%E7%A8%8B%E9%94%81

3.协程

计算机中提供了:线程、进程 用于实现并发编程(真实存在)。

协程(Coroutine),是程序员通过代码搞出来的一个东西(非真实存在)。

1
2
协程也可以被称为微线程,是一种用户态内的上下文切换技术。
简而言之,其实就是通过一个线程实现代码块相互切换执行(来回跳着执行)。

看起来像前端中的异步

在Python中有多种方式可以实现协程,例如:

  • greenlet

    1
    pip install greenlet
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    from greenlet import greenlet

    def func1():
    print(1) # 第1步:输出 1
    gr2.switch() # 第3步:切换到 func2 函数
    print(2) # 第6步:输出 2
    gr2.switch() # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行

    def func2():
    print(3) # 第4步:输出 3
    gr1.switch() # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
    print(4) # 第8步:输出 4

    gr1 = greenlet(func1)
    gr2 = greenlet(func2)

    gr1.switch() # 第1步:去执行 func1 函数
  • yield

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def func1():
    yield 1
    yield from func2()
    yield 2

    def func2():
    yield 3
    yield 4

    f1 = func1()
    for item in f1:
    print(item)

虽然上述两种都实现了协程,但这种编写代码的方式没啥意义。

这种来回切换执行,可能反倒让程序的执行速度更慢了(相比较于串行)。

协程如何才能更有意义呢?

不要让用户手动去切换,而是遇到IO操作时能自动切换。

Python在3.4之后推出了asyncio模块 + Python3.5推出async、async语法 ,内部基于协程并且遇到IO请求自动化切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def func1():
print(1)
await asyncio.sleep(2)
print(2)

async def func2():
print(3)
await asyncio.sleep(2)
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
"""
需要先安装:pip3 install aiohttp
"""

import aiohttp
import asyncio

async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)

async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())

通过上述内容发现,在处理IO请求时,协程通过一个线程就可以实现并发的操作。

协程、线程、进程的区别?

1
2
3
4
5
6
7
8
9
10
11
线程,是计算机中可以被cpu调度的最小单元。
进程,是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。

由于CPython中GIL的存在:
- 线程,适用于IO密集型操作。
- 进程,适用于计算密集型操作。

协程,协程也可以被称为微线程,是一种用户态内的上下文切换技术,在开发中结合遇到IO自动切换,就可以通过一个线程实现并发操作。

所以,在处理IO操作时,协程比线程更加节省开销(协程的开发难度大一些)。

协程也不是万能的,比如很多个数据同时来回,这时协程处理就会很慢。因为他是一个线程基于IO自动切换。通常协程用来进行数据的下载 下载完成后的数据扔到数据库或者其他其他地方 再有线程进行数据的处理。

现在很多Python中的框架都在支持协程,比如:FastAPI、Tornado、Sanic、Django 3、aiohttp等,企业开发使用的也越来越多(目前不是特别多)。

关于协程,等学习爬虫相关知识之后,再来学习和补充效果更佳。有兴趣想要研究的同学可以参考文章和专题视频:

  • 文章

    1
    2
    https://pythonav.com/wiki/detail/6/91/
    https://zhuanlan.zhihu.com/p/137057192
  • 视频

    1
    https://www.bilibili.com/video/BV1NA411g7yf