Python--进程基础

创建进程

os.fork()

该方法只能在linux和mac os中使用,因为其主要基于系统的fork来实现。window中没有这个方法。

通过os.fork()方法会创建一个子进程,子进程的程序集为该语句下方的所有语句。

 import os
 ​
 ​
 print("主进程的PID为:" , os.getpid())
 w = 1
 pid = os.fork() # 创建子进程
 print('fork方法的返回值为: ', pid)
 if pid == 0:
     print(f'子进程PID: {os.getpid()}, 主进程PID: {os.getppid()}, 子进程中w: {w}')
 else:
     print(f'主进程PID: {os.getpid()}, 主进程PID: {os.getppid()}, 子进程中w: {w}')

multiprocessing(target为函数)

通过multiprocessing模块中的Process类创建一个进程实例对象,并通过其start方法启动该进程。

进程中的程序集为Process类的target参数,可以是一个函数也可以是一个方法。

需要注意的是windows系统中创建进程的过程需要放在if __name__== "__main__"代码块中,因为其实现数据集的复制时,是通过import语句实现。

而在Linux和MacOS系统下则不需要,因为他们原生支持fork方法。

 import multiprocessing
 import os
 import time
 ​
 def task():
     for i in range(3):
         print("wating... 当前pid为 : ", os.getpid(), "父进程为: ", os.getppid())
         time.sleep(1)
 ​
 ​
 ​
 if __name__ == "__main__":
     print('主进程pid:', os.getpid())
 ​
     process = multiprocessing.Process(target=task)
     process.start()

multiprocessing(taget为方法)

创建Process实例对象的target参数,不仅可以是函数名,还可以是类的方法名。

  • 如果需要往target中传入参数,可以通过args和kwargs两个参数进行相应的传参
 import multiprocessing
 import time
 ​
 class Tasks():
     def task1(self):
         time.sleep(1)
         print('task1')
 ​
     def task2(self):
         time.sleep(1)
         print('task2')
 ​
 if __name__ == "__main__":
     t = Tasks()
     process1 = multiprocessing.Process(target=t.task1)
     process1.start()
 ​
     process2 = multiprocessing.Process(target=t.task2)
     process2.start()
     

继承Process类

通过继承multiprocessing.Process类,并重写其中的run方法。

  • 必须要重写run方法,Process子类的实例对象的run方法,就是进程执行的程序
 import time
 from multiprocessing import Process
 ​
 ​
 class MyProcess(Process):
     def __init__(self, i):
         super().__init__()
         self.name = str(i)
 ​
     def run(self):
         time.sleep(2)
         print(f'子进程-{self.name}')
 ​
 ​
 if __name__ == '__main__':
     for i in range(5):
         p = MyProcess(i)
         p.start()
 ​
     print('主进程')

进程阻塞

目前系统一般都是多核的,当处理多任务时,一般都以并发或并行的方式处理多任务。所以系统一般以异步的方式处理多进程。

Process的实例方法中,通过join方法表示进程阻塞时,主将处于等待状态,并不会处理其他进程。

单进程阻塞

针对每个进程开启后立马启用join方法,这种方法效率低下。使得系统的处理方式编程同步阻塞,使得主进程依次处理子进程。

 import time
 from multiprocessing import Process
 ​
 def eat():
     time.sleep(2)
     print('eat')
 ​
 def drink():
     time.sleep(2)
     print('drink')
 ​
 if __name__ == '__main__':
     process1 = Process(target=eat)
     process1.start()
     process1.join()
 ​
     process2 = Process(target=drink)
     process2.start()
     process2.join()
 ​
 ​
     print('主进程')

多进程阻塞

先利用start方法将多个进程同时创建并启动,然后在创建完成后统一阻塞进程。

  • 统一创建进程,并让其统一运行
  • 统一等待进程结束,避免每个进程都等一段时间
 import time
 from multiprocessing import Process
 ​
 def eat():
     time.sleep(2)
     print('eat')
 ​
 def drink():
     time.sleep(2)
     print('drink')
 ​
 ​
 ​
 if __name__ == '__main__':
     process1 = Process(target=eat)
     process1.start()
 ​
     process2 = Process(target=drink)
     process2.start()
 ​
     for p in [process1, process2]:
         p.join()
 ​
     print('主进程')

进程锁

当多进程编辑同一文件或数据时,往往会导致数据不一致问题,针对这种情况,需要在进程中对处理文件或数据的代码前后进行加锁和解锁操作。

如果没有锁,会导致数据的不一致

 import time, json
 from multiprocessing import Process
 ​
 ​
 def read_ticket(user):
     with open('ticket.txt') as f:
         num = json.load(f)['ticket']
         time.sleep(1)
         print(f'User {user}: 当前剩余{num}张票')
         return num
 ​
 ​
 def order_ticket(user, num):
     time.sleep(1)
     num -= 1
     with open('ticket.txt', 'w') as f:
         json.dump({'ticket': num}, f)
     print(f'User {user}: 购票成功')
 ​
 ​
 def ticket(user):
     num = read_ticket(user)
     if num > 0:
         order_ticket(user, num)
     else:
         print(f'User {user}: 购票失败')
 ​
 ​
 if __name__ == '__main__':
     queue = []
     for i in range(5):
         p = Process(target=ticket, args=(i,))
         p.start()
         queue.append(p)
     for q in queue:
         q.join()
 ​
     print('运行结束')

加锁/解锁

在编辑数据的之前通过acquire方法加锁,当数据编辑完成后,通过release方法解锁。

  • 在主进程中创建一个锁对象
  • 然后在每个修改共同数据的进程中传入已经创建的锁对象
  • 在修改数据的代码前后分别加锁和解锁
 """
 @Time: 2024/6/28 20:18
 @Author: 'Ethan'
 @Email: ethanzhou4406@outlook.com
 @File: 1. 同步阻塞.py
 @Project: python
 @Feature:
 """
 import time, json
 from multiprocessing import Process, Lock
 ​
 ​
 def read_ticket(user):
     with open('ticket.txt') as f:
         num = json.load(f)['ticket']
         time.sleep(0.1)
         print(f'User {user}: 当前剩余{num}张票')
 ​
 ​
 def order_ticket(user):
     time.sleep(0.1)
     with open('ticket.txt') as f:
         num = json.load(f)['ticket']
     if num > 0:
         with open('ticket.txt', 'w') as f:
             num -= 1
             json.dump({'ticket': num}, f)
         print(f'User {user}: 购票成功')
     else:
         print(f'User {user}: 购票失败')
 ​
 ​
 def ticket(user,lock):
     read_ticket(user)
     lock.acquire()
     order_ticket(user)
     lock.release()
 ​
 ​
 if __name__ == '__main__':
     lock = Lock()
     queue = []
     for i in range(5):
         p = Process(target=ticket, args=(i, lock))
         p.start()
         queue.append(p)
     for q in queue:
         q.join()
 ​
     print('运行结束')
 ​

锁的上下文管理器

如果在代码加锁后,解锁前,代码出现了异常就会导致进程没有来得及解锁,而导致死锁现象。通过锁的上下文管理器语法,可以有效避免这种情况的发生。

 import time, json
 from multiprocessing import Process, Lock
 ​
 ​
 def read_ticket(user):
     with open('ticket.txt') as f:
         num = json.load(f)['ticket']
         time.sleep(0.1)
         print(f'User {user}: 当前剩余{num}张票')
 ​
 ​
 def order_ticket(user):
     time.sleep(0.1)
     with open('ticket.txt') as f:
         num = json.load(f)['ticket']
     if num > 0:
         with open('ticket.txt', 'w') as f:
             num -= 1
             json.dump({'ticket': num}, f)
         print(f'User {user}: 购票成功')
     else:
         print(f'User {user}: 购票失败')
 ​
 ​
 def ticket(user,lock):
     read_ticket(user)
     with lock:
         order_ticket(user)
 ​
 ​
 if __name__ == '__main__':
     lock = Lock()
     queue = []
     for i in range(5):
         p = Process(target=ticket, args=(i, lock))
         p.start()
         queue.append(p)
     for q in queue:
         q.join()
 ​
     print('运行结束')
 ​

进程间通信

进程之间可以进行通信,主要是通过各个进程之中传入一个公共的沟通工具,所有的进程都通过这个工具进行沟通。multiprocessing中提供了两种进程间沟通的工具QueuePipe

Queue方式

Queue是基于文件传输的socket通信方式,并且它是带锁机制的。它的数据主要的特点是先进先出,后进后出。

当一个对象被放入一个队列中时,这个对象首先会被一个后台线程用pickle序列化,并将序列化后的数据通过一个底层管道的管道传递给队列中。

主要使用如下方法:

  • qsize(): 返回队列的大致的长度。返回的值由于多线程或多进程的上下文而变得不可靠
  • empty(): 队列为空返回True,否则返回False。返回的值由于多线程或多进程的上下文而变得不可靠
  • full(): 队列满了返回True,否则返回False。返回的值由于多线程或多进程的上下文而变得不可靠
  • put(obj[, block[, timeout]]): 将obj放入队列。
    • 如果block为True(默认值)而且timeout是None(默认值),将会阻塞当前进程,直到有空的缓冲槽。
    • 如果timeout是正数,将会阻塞了最多timeout秒之后还是没有可用的缓冲槽时抛出queue.Full异常
    • 反之block为False,仅当有可用缓冲槽时才放入对象,否则抛出queue.Full异常(这种情况下timeout参数会被忽略)

  • get([block[, timeout]]): 从队列中取出并返回对象。如果可选参数block是True而且timeout是None,将会阻塞当前进程,直到队列中出现可用对象。如果timeout是正数,将会阻塞了最多timeout秒之后还是没有可用的对象时抛出queue.Empty异常。
    • 反之,block是False时,仅当有可用对象能够取出时返回,否则抛出queue.Empty异常(这种情况下timeout参数会被忽略)

 import time, json
 from multiprocessing import Process, Queue
 ​
 ​
 def task(i, queue: Queue):
     time.sleep(1)
     queue.put(i)
     print(f'task {i}, 入列')
 ​
 ​
 if __name__ == '__main__':
     queue = Queue()
     process_queue = []
     for i in range(5):
         p = Process(target=task, args=(i, queue))
         p.start()
         process_queue.append(p)
     for p in process_queue:
         p.join()
 ​
     for i in range(5):
         print(f'主进程中消费队列内容{queue.get()}')
 ​
     print('运行结束')
 ​

Pipe方式

Pipe方式是进程之间通信的另一种方式和Queue不同之处在于,它不带锁,且信息顺序无法得到保障。

主要的使用方法:

  • send(obj): 将一个对象发送到链接的另一端,可以用recv()读取,发送的对象必须是可序列化的,多大的对象(接近32MiB)可能引发ValueError异常
  • recv(): 返回一个由另一端使用send()发送的对象,该方法会一直阻塞直到接收到对象。如果对端关闭了链接或者没有东西可接收,将抛出EOFError异常
 import time
 from multiprocessing import Process, Pipe
 from multiprocessing.connection import Connection
 ​
 ​
 def task(pipe:Connection):
     print('子进程往管道里加了内容')
     time.sleep(1)
     pipe.send("子进程往管道中加了点东西")
 ​
 ​
 if __name__ == '__main__':
     pipe1, pipe2 = Pipe()
     p = Process(target=task, args=(pipe1,))
     p.start()
     p.join()
     print('主进程中获取管道内的内容为:', pipe2.recv())
     print('运行结束')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/764731.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何利用GPT-4o生成有趣的梗图

文章目录 如何利用GPT-4o生成有趣的梗图一、引言二、使用GPT-4o生成梗图1. 提供主题2. 调用工具3. 获取图片实际案例输入输出 三、更多功能1. 创意和灵感2. 梗图知识 四、总结 如何利用GPT-4o生成有趣的梗图 梗图,作为互联网文化的一部分,已经成为了我们…

【轻量化】YOLOv8 更换骨干网络之 MobileNetv4 | 《号称最强轻量化网络》

论文地址:https://arxiv.org/pdf/2404.10518 代码地址:https://github.com/tensorflow/models/blob/master/official/vision/modeling/backbones/mobilenet.py 文章速览 文章摘要 MobileNetV4引入了一个名为Universal Inverted Bottleneck (UIB) 的新搜索模块,这个模块融合…

MCU 是什么?一文了解MCU 产业

MCU(Microcontroller Unit),中文名为“微控制器单元”、“单片微型计算机”。MCU 将中央处理器(CPU)、内存(RAM)、输入 / 输出界面(I/O)等等一大堆东西,全部整…

kafka的架构

一、架构图 Broker:一台 kafka 服务器就是一个 broker。一个kakfa集群由多个 broker 组成。一个 broker 可以容纳多个 topic。 Producer:消息生产者,就是向 kafka broker 发消息的客户端 Consumer:消息消费者,向 kafk…

Charles抓包工具踩坑记录

请添加图片描述 Charles抓包工具 证书问题 输入网址:chls.pro/ssl 第一个下载证书网址,会出现一直加载不出来,无法下载证书的情况 解决:选择下面save Charles Root。。。 2 证书在mac中禁止修改问题 解决也很简单,按照…

基于ESP32 IDF的WebServer实现以及OTA固件升级实现记录(三)

经过前面两篇的前序铺垫,对webserver以及restful api架构有了大体了解后本篇描述下最终的ota实现的代码以及调试中遇到的诡异bug。 eps32的实际ota实现过程其实esp32官方都已经基本实现好了,我们要做到无非就是把要升级的固件搬运到对应ota flash分区里面…

自定义动态数据源+事务控制

1:首先yml配置两个数据库的链接 spring:application:name: xxxxmain:banner-mode: OFFdatasource: # 默认数据源 datamarkdruid: # 关闭数据库的 web 访问stat-view-servlet:enabled: falseweb-stat-filter:enabled: falsefilt…

巴比达内网穿透:深度剖析其在解决远程连接挑战中的技术优势

在信息技术日新月异的今天,远程协作与管理的需求日益增长,但内网环境的隔离性一直是横亘在高效远程操作面前的一道坎。本文将深入探讨一款专为打破此壁垒而生的工具——巴比达内网穿透,如何以其技术创新和高效性能,成为解决远程连…

electron-builder 打包过慢解决

报错内容如下 > 6-241.0.0 build > electron-builder • electron-builder version24.13.3 os10.0.22631 • loaded configuration filepackage.json ("build" field) • writing effective config filedist\builder-effective-config.yaml • pack…

【Linux详解】进程地址空间

目录 研究背景 验证地址空间 实验一:父子进程变量地址一致性 实验二:变量值修改后父子进程的差异 分析与结论 实验三:进程地址空间验证 理解进程地址空间 区域与页表 写时拷贝机制 进程地址空间的意义 文章手稿: xmind…

7月信用卡新规下:信用卡欠的钱不用还了?

说到信用卡,现在基本上人手一张,大家都有使用过。但你知道吗,使用信用卡不是这么简单容易的事,比如会对你的贷款有影响,透支不还逾期对生活的影响,信用卡新规对持卡人和银行那边的影响。 一、只要不逾期&am…

hamibot 学习

1.参考文档: https://blog.csdn.net/zxl0428/article/details/1285318731.参考官网 快速入手步骤:注册,安装客户端,添加设备,开发脚本,运行脚本 https://hamibot.com/guide1.安装客户端 2.添加设备 …

高通骁龙(Qualcomm Snapdragon)CDSP HVX HTP 芯片简介与开发入门

1. Hexagon DSP/HVX/HTP 硬件演进 说到高通骁龙芯片大家应该不会陌生,其作为最为广泛的移动处理器之一,几乎每一个品牌的智能手机都会使用高通骁龙的处理器。 高通提供了一系列骁龙芯片解决方案。根据性能强弱分为了5个产品系列:从最高端的…

【neo4j图数据库】入门实践篇

探索数据之间的奥秘:Neo4j图数据库引领新纪元 在数字化浪潮汹涌的今天,数据已成为企业最宝贵的资产之一。然而,随着数据量的爆炸性增长和数据关系的日益复杂,传统的关系型数据库在处理诸如社交网络、推荐系统、生物信息学等高度互…

OPCUA相关概念和KepServer OPCUA连接PLC

文章背景 项目中需要使用OPC UA 来读取PLC的点位。本文简单介绍了OPC UA和使用KepServer软件连接PLC并读点。OPC相关概念 OPC之前,软件开发需要写大量驱动程序去连接设备,设备上的一个硬件改变,应用程序都有可能需要重写,不同设备…

水经微图Web版1.9.0发布

水经微图(简称“微图”)新版已上线,在该版本中主要新增了对WGS84图源加载、火星坐标图源加载和大字体图源加载功能,以及多面要素的加载功能。 现在,为你分享一下本轮迭代的主要新增功能,以及部分功能的效果…

STL空间配置器

空间配置器(allocator)(重点) 背景需求:在底层默默的实现空间的分配 问题:空间的申请与对象的创建两者分开,因为不断创建的时候可能会频繁的申请空间扩容。 类似操作:reserve函数…

自动扫描范围在减少剂量多相CT肝脏成像中的应用:基于CNN和高斯模型| 文献速递-深度学习自动化疾病检查

Title 题目 Automatic scan range for dose-reduced multiphase CT imaging of theliver utilizing CNNs and Gaussian models 自动扫描范围在减少剂量多相CT肝脏成像中的应用:基于CNN和高斯模型 01 文献速递介绍 肝癌是全球癌症死亡的第四大原因,每…

告别推广迷茫,Xinstall渠道包助您精准统计应用商店数据!

在App推广的浩瀚征途中,每一位广告主和开发者都面临着同样的挑战:如何在众多应用商店中脱颖而出,实现高效推广与精准获客?今天,就让我们一同探索Xinstall应用商店渠道包的独特魅力,看看它是如何成为解决这一…

【产品经理】聊聊PLG策略

PLG 是一种以用户增长为导向的策略,如何理解这种策略?适合采用PLG模式的SaaS又有哪些? 一、企业软件采购方式的变迁 从用户的角度:企业软件采购从CIO主导,逐渐演化为经理或者员工可做出决策。 从供应商的角度&#x…