python进程间通信——命名管道(Named Pipe、FIFO)

文章目录

  • Python中的命名管道:深入理解进程间通信
    • 1. 命名管道简介
    • 2. 创建和删除命名管道
    • 3. 写入命名管道
    • 4. 读取命名管道
    • 5. 示例:进程间通信
      • write_to_pipe.py
      • read_from_pipe.py
      • 测试运行
      • 6. 注意事项和限制
        • 命名管道的半双工机制
        • 命名管道读写任意一方未打开,另一方默认阻塞(可以尝试使用非阻塞方式打开`os.O_NONBLOCK`)
        • 命名管道能被读写多方同时打开,但数据只能从某个写方到某个读方,不会产生数据复制现象(这属于非常规操作,不要这样使用)
        • 命名管道权限问题
        • 7. 命名管道非阻塞打开示例
          • 说明
          • 情况1:读取进程以阻塞方式打开命名管道,写入进程以非阻塞方式打开命名管道
            • write_to_pipe_nonblock.py
            • read_from_pipe_block.py
            • 测试运行
            • 额外功能:申请缓存帧
            • 情况2:读取进程以非阻塞方式打开命名管道,写入进程以阻塞方式打开命名管道
              • write_to_pipe_block.py
              • read_from_pipe_nonblock.py
              • 测试运行
              • 注意事项
                • 读进程报`BlockingIOError`错误
                • 结论

                  Python中的命名管道:深入理解进程间通信

                  命名管道(Named Pipe),也被称为FIFO,是一种在UNIX、Linux和类Unix系统中用于实现进程间通信(IPC)的机制。在Python中,我们可以使用os模块来创建和操作命名管道。

                  1. 命名管道简介

                  命名管道与普通管道类似,都是基于字节流进行通信的,但不同的是命名管道有路径名与之关联,并且其生命周期超过了引发创建它的进程。这使得不相关的进程可以通过命名管道进行通信。

                  2. 创建和删除命名管道

                  在Python中,我们可以使用os.mkfifo()函数创建一个命名管道。该函数接受两个参数,第一个参数是要创建的命名管道的路径,第二个参数是管道的权限设置,它是可选的,默认值为0o666。

                  import os
                  pipe_name = "my_pipe"
                  # 创建命名管道
                  os.mkfifo(pipe_name)
                  

                  当你不再需要这个命名管道时,可以使用os.unlink()或os.remove()函数删除它。

                  # 删除命名管道
                  os.unlink(pipe_name)
                  

                  3. 写入命名管道

                  要向命名管道写入数据,我们首先需要使用os.open()函数以写入模式打开它。然后,可以使用os.write()函数写入数据。

                  pipeout = os.open(pipe_name, os.O_WRONLY)
                  message = "Hello, World!"
                  # 写入数据
                  os.write(pipeout, message.encode())
                  # 关闭管道
                  os.close(pipeout)
                  

                  注意,我们需要将要写入的字符串编码为字节对象,因为os.write()函数期望其参数是字节对象。

                  4. 读取命名管道

                  读取命名管道的数据与写入类似。我们首先使用os.open()函数以读取模式打开管道,然后使用os.read()函数读取数据。

                  pipein = os.open(pipe_name, os.O_RDONLY)
                  # 读取数据
                  data = os.read(pipein, 100)  # 读取前100个字节
                  # 解码数据并打印
                  print(data.decode())
                  # 关闭管道
                  os.close(pipein)
                  

                  这里,我们使用了os.read()函数的两个参数版本,其中第二个参数指定要读取的最大字节数。如果不提供这个参数,os.read()函数将读取所有可用的数据。

                  5. 示例:进程间通信

                  现在让我们来看一个使用命名管道进行进程间通信的示例。假设我们有两个Python脚本,一个脚本负责向管道写入数据,另一个脚本负责从管道读取数据。

                  write_to_pipe.py

                  import os
                  pipe_name = "my_pipe"
                  # 创建命名管道
                  if not os.path.exists(pipe_name):
                      os.mkfifo(pipe_name)
                  pipeout = os.open(pipe_name, os.O_WRONLY)
                  message = "Hello, World!"
                  # 写入数据
                  os.write(pipeout, message.encode())
                  # 关闭管道
                  os.close(pipeout)
                  

                  read_from_pipe.py

                  import os
                  pipe_name = "my_pipe"
                  if os.path.exists(pipe_name):
                      pipein = os.open(pipe_name, os.O_RDONLY)
                      # 读取数据
                      data = os.read(pipein, 100)  # 读取前100个字节
                      # 解码数据并打印
                      print(data.decode())
                      # 关闭管道
                      os.close(pipein)
                  

                  测试运行

                  在这个示例中,write_to_pipe.py脚本向命名管道写入一条消息,然后read_from_pipe.py脚本从同一个命名管道读取并打印这条消息。

                  无论先运行发送方,还是先运行接收方,另一方都会阻塞(等待):

                  (先运行发送方)

                  (先运行接收方)

                  当双方都开启,接收方才会收到消息,发送方也才会结束:

                  6. 注意事项和限制

                  虽然命名管道是一种非常有用的IPC机制,但在使用它时还需要注意一些事项。

                  命名管道的半双工机制

                  首先,命名管道是半双工的,这意味着数据只能(同一时间)在一个方向上流动。如果你需要实现全双工通信(即两个进程可以互相发送和接收数据),那么你需要创建两个命名管道。

                  注意:只用一个命名管道,两个进程交替进行读/写是可行的,但这需要非常谨慎的同步管理来确保正确的操作顺序和数据完整性,通常不建议这么使用。

                  命名管道读写任意一方未打开,另一方默认阻塞(可以尝试使用非阻塞方式打开os.O_NONBLOCK)

                  其次,当你尝试读取或写入一个没有打开的管道时,你的进程将被阻塞,直到管道的另一端被打开。为了避免这种情况,你可以在调用os.open()函数时使用os.O_NONBLOCK标志。

                  命名管道能被读写多方同时打开,但数据只能从某个写方到某个读方,不会产生数据复制现象(这属于非常规操作,不要这样使用)

                  命名管道权限问题

                  最后,当你使用命名管道时,需要考虑到权限和安全问题。任何有访问权限的进程都可以读写命名管道。因此,如果你的程序处理敏感信息,那么你应该小心地设置管道的权限,以防止未授权的访问。

                  7. 命名管道非阻塞打开示例

                  说明

                  对于大多数应用来说,通常只有一个进程(读或写)会以非阻塞方式打开命名管道,而另一个进程则以正常(阻塞)方式打开。这样可以确保当一方尝试读取或写入数据时,如果数据不可用或管道已满,那么该进程就会被阻塞,直到条件满足为止。

                  然而,在某些情况下,可能需要两个进程都以非阻塞方式打开命名管道。比如在某些实时系统或高性能计算中,进程不能接受任何形式的阻塞,即使是等待IPC操作也不行。在这种情况下,进程会以非阻塞方式打开命名管道,并使用轮询、事件通知或其他机制来检查是否可以进行读/写操作。

                  此外,还有一些特殊的情况,比如网络编程或者并发编程中,程序可能需要同时处理多个I/O操作(包括文件操作、网络操作和IPC操作等),并且不能因为任何一个操作的阻塞而停止处理其他操作。在这种情况下,程序可能会选择以非阻塞方式打开所有的I/O资源,包括命名管道,并使用select、poll、epoll等机制来高效地管理这些I/O操作。

                  但是,总体来说,除非有特殊的需求,否则通常不会让两个进程都以非阻塞方式打开命名管道。

                  情况1:读取进程以阻塞方式打开命名管道,写入进程以非阻塞方式打开命名管道

                  write_to_pipe_nonblock.py
                  import os
                  import time
                  pipe_name = "my_pipe"
                  # 创建命名管道
                  if not os.path.exists(pipe_name):
                      os.mkfifo(pipe_name)
                  pipeout = os.open(pipe_name, os.O_WRONLY | os.O_NONBLOCK)
                  message = "Hello, World!"
                  try:
                      # 写入数据
                      os.write(pipeout, message.encode())
                  except BlockingIOError:
                      print("Pipe is full. Waiting...")
                  # 关闭管道
                  os.close(pipeout)
                  
                  read_from_pipe_block.py
                  import os
                  pipe_name = "my_pipe"
                  if os.path.exists(pipe_name):
                      pipein = os.open(pipe_name, os.O_RDONLY)  # 阻塞模式
                      # 读取数据
                      data = os.read(pipein, 100)  # 读取前100个字节
                      # 解码数据并打印
                      print(data.decode())
                      # 关闭管道
                      os.close(pipein)
                  
                  测试运行

                  write_to_pipe_nonblock.py脚本以非阻塞方式打开命名管道,并尝试写入一条消息。如果此时管道没有进程在读,os.open()函数将引发OSError;如果管道已满,os.write()函数将引发BlockingIOError异常。所以非阻塞写要求读进程先打开。

                  read_from_pipe_block.py脚本以阻塞方式打开同一个命名管道,并从中读取将要写入管道的消息。如果管道为空,那么os.read()函数将阻塞,直到有数据可读为止。

                  额外功能:申请缓存帧

                  客户端向缓存帧编码服务申请缓存帧的时候,可以先客户端先开启阻塞读,然后用http接口告知缓存帧编码服务自身已开启读监听,同时告知其监听的命名管道名和申请帧所属摄像头,然后缓存帧编码服务器取对应摄像头最新帧,用非阻塞写方式写到对应命名管道中,完成缓存帧交付。

                  情况2:读取进程以非阻塞方式打开命名管道,写入进程以阻塞方式打开命名管道

                  write_to_pipe_block.py
                  import os
                  import time
                  pipe_name = "my_pipe"
                  # 创建命名管道
                  if not os.path.exists(pipe_name):
                      os.mkfifo(pipe_name)
                  pipeout =  os.open(pipe_name, os.O_WRONLY)
                  message = "Hello, World!"
                  try:
                      # 写入数据
                      os.write(pipeout, message.encode())
                  except BlockingIOError:
                      print("Pipe is full. Waiting...")
                  # time.sleep(5)  # 确保读取进程有足够的时间来读取数据
                  # 关闭管道
                  os.close(pipeout)
                  
                  read_from_pipe_nonblock.py
                  import os
                  pipe_name = "my_pipe"
                  if os.path.exists(pipe_name):
                      pipein = os.open(pipe_name, os.O_RDONLY | os.O_NONBLOCK)  # 阻塞模式
                      # 读取数据
                      #data = os.read(pipein, 100)  # 读取前100个字节
                  	while True:
                  	    try:
                  	        data = os.read(pipein, 100)
                  	        break
                  	    except BlockingIOError:
                  	        continue
                      # 解码数据并打印
                      print(data.decode())
                      # 关闭管道
                      os.close(pipein)
                  
                  测试运行

                  当没有阻塞写时,非阻塞读不会报错,会读到空内容:

                  当有阻塞写时,非阻塞读能正常读到内容:

                  注意事项
                  读进程报BlockingIOError错误
                  Traceback (most recent call last):
                    File "read_from_pipe_nonblock.py", line 10, in  data = os.read(pipein, 100)  # 读取前100个字节
                  BlockingIOError: [Errno 11] Resource temporarily unavailable
                  

                  当你的读进程运行os.read()方法时,由于你使用了O_NONBLOCK标志打开了管道,如果此时管道中没有任何可供读取的数据,那么程序会立即返回一个BlockingIOError。

                  要解决这个问题,你可以尝试以下两种方法:

                  1. 让你的读进程在尝试读取数据之前先等待一段时间,以确保写进程有足够的时间把数据写入管道。你可以使用time.sleep()函数来实现这个等待操作。

                    if os.path.exists(pipe_name):
                        pipein = os.open(pipe_name, os.O_RDONLY | os.O_NONBLOCK)  # 阻塞模式
                        time.sleep(1)  # 等待1秒
                        # 然后进行读取操作...
                    
                  2. 或者你可以捕获BlockingIOError异常,并在发生这种异常时再次尝试读取数据,直到成功为止。

                    while True:
                        try:
                            data = os.read(pipein, 100)
                            break
                        except BlockingIOError:
                            continue
                    # 然后进行解码和打印操作...
                    

                  结论

                  命名管道是UNIX和Linux系统中一种非常强大的IPC机制,它使得不相关的进程能够方便地进行通信。Python通过os模块提供了对命名管道的支持,使得我们可以很容易地在Python程序中使用这种机制。然而,尽管命名管道很有用,但在使用它时还需要注意其半双工性质、可能出现的阻塞情况,以及权限和安全问题。