【Linux】进程通信实战 —— 进程池项目

送给大家一句话:

没有一颗星,会因为追求梦想而受伤,当你真心渴望某样东西时,整个宇宙都会来帮忙。 – 保罗・戈埃罗 《牧羊少年奇幻之旅》

🏕️🏕️🏕️🏕️🏕️🏕️

🗻🗻🗻🗻🗻🗻


进程通信实战 —— 进程池项目

  • 1 ♻️知识回顾
  • 2 ♻️项目介绍
  • 3 ♻️项目实现
    • 3.1 ✨创建信道和子进程
    • 3.2 ✨建立任务
    • 3.3 ✨控制子进程
    • 3.4 ✨回收信道和子进程
    • 4 ♻️总结
    • Thanks♪(・ω・)ノ谢谢阅读!!!
    • 下一篇文章见!!!

      1 ♻️知识回顾

      在之前的讲解中,我们深入探讨了以下几个方面:

      1. 父子进程的创建与管理:我们详细讲解了父子进程是如何建立的,以及子进程如何继承父进程的代码和数据。子进程通常用于完成特定的任务。
      2. 文件操作:我们学习了如何使用 read 和 write 操作文件,并了解了文件描述符(fd)的概念,从而能够在文件中进行信息的读取和写入。
      3. 进程间通信:我们介绍了匿名管道,这是一种父子进程间进行通信的方式。通过共享资源,父子进程可以实现数据的传递和同步。

      在接下来的内容中,让我们把所学知识来进行运用,我们将探讨进程池的概念和实现细节。

      2 ♻️项目介绍

      进程池是一种用于管理和复用进程的技术,它可以有效地管理系统资源并提高程序的性能和效率。通过维护一组预先创建的进程与管道,进程池可以避免频繁地创建和销毁进程,从而减少了系统开销和资源浪费。

      主要使用的是池化技术的思想:

      池化技术是一种广泛应用于系统开发中的优化策略,旨在通过复用资源来提高性能和效率。池化技术的核心思想是预先分配一组资源,并在需要时进行复用,而不是每次都重新创建和销毁资源。

      池化技术(Pooling)涉及创建和管理一组预先分配的资源,这些资源可以是进程、线程、数据库连接或对象实例。在池化系统中,当请求到达时,它会从池中获取一个空闲资源,使用完毕后将其归还池中。这种方法避免了频繁的创建和销毁操作,从而显著减少了系统开销。

      进程池就是通过预先创建若干个进程与管道,在需要进行任务时,选择一个进程,通过管道发送信息,让其完成工作。

      进程池在实际项目中有广泛的应用,尤其是在处理大量并发任务时,例如:网络服务器中的请求处理、数据处理以及计算密集型任务。通过合理配置进程池的大小和参数,可以有效控制系统负载,提高整体响应速度。

      3 ♻️项目实现

      3.1 ✨创建信道和子进程

      首先我们需要建立一个信道类,来储存管道及其对应的子进程信息。

      //信道类
      class Channel
      {public:
          Channel(pid_t id , int wfd , std::string name)
              :_id(id) , _wfd(wfd) , _name(name)    
          { }
          ~Channel()
          { }
          void Close()
          { close(_wfd);
          }
          //关闭管道时需要等待对应子进程结束
          void WaitSub()
          { pid_t rid = waitpid(_id, nullptr, 0);
              if (rid > 0)
              { std::cout << "wait " << rid << " success" << std::endl;
              }
          }
          pid_t GetId(){ return _id;}
          int GetWfd(){ return _wfd;}
          std::string GetName(){return _name ;}
      private:
          pid_t _id ;//对应 子进程 id
          int _wfd  ;//写入端
          std::string _name ; //管道名称
      };
      

      然后我们就建立若干个信道与子进程,创建子进程与信道的时候,把信息插入到信道容器中,完成储存。子进程需要阻塞在读取文件,等待父进程写入信息:

      void CreateChannel(int num , std::vector* channel)
      { //初始化任务
          InitTask();
          for(int i = 0 ; i < num ; i++)
          { //创建管道
              int pipefd[2] = {0};
              int n = pipe(pipefd);
              if(n != 0)
              { std::cout << "create pipe failed!" << std::endl;
              }
              //创建子进程
              pid_t id = fork();
              if(id == 0)
              { //子进程 --- 只读不写
                  close(pipefd[1]);
                  work(pipefd[0]);
                  close(pipefd[0]);
                  exit(0);
              }
              //父进程
              close(pipefd[0]);
              std::string name = "Channel - " + std::to_string(i);
              //储存信道信息
              channel->push_back( Channel(id , pipefd[1] , name) );
          }
      }
      

      这里提一下传参的规范:

      • const &:表示输出型参数,即该参数是输入型,不会被修改。常用于传递不需要修改的对象或数据。
      • &:表示输入输出型参数,即该参数既是输入参数,又是输出参数,函数可能修改其内容。
      • * :表示输出型参数,通常用于传递指针,函数通过指针参数返回结果给调用者。

        进行一下测试,看看是否可以这正常建立信道与子进程;

        int main(int argc , char* argv[])
        { //1. 通过main函数的参数 int argc char* argv[] (./ProcessPool 5) 
            //判断要创建多少个进程
            if(argc != 2)
            { std::cout << "请输入需要创建的信道数量 :" << std::endl;
            }
            std::vector channel;
            int num = std::stoi(argv[1]);
            //2. 创建信道和子进程
            CreateChannel(num , &channel);
            //测试:
            for(auto t :  channel)
            { std::cout<< "==============="< 

        完美,可以正常创建!!!

        3.2 ✨建立任务

        完成了信道与子进程的创建,接下来我们就来设置一些任务。我们在.hpp文件里直接把声明定义写在一起,确保代码的模块化和可维护性。

        void Print()
        { std::cout << "this is Print()"<< std::endl;
        }
        void Fflush()
        { std::cout << "this is Fflush()"<< std::endl;
        }
        void Scanf()
        { std::cout << "this is Scanf()"<< std::endl;
        }
        

        然后通过函数指针数来储存这些函数,因为子进程会继承父进程的数据,这样通过一个数字下标即可确定调用的函数。只需要传入 4 个字节的int类型,最大程度的减少了通信的成本!!!

        #pragma once
        #include #include #include #include #include #define TaskNum 3
        //这个文件里是任务函数
        typedef void(*task_t)();
        task_t tasks[TaskNum];
        //...
        //...三个函数
        //...
        void InitTask()
        { srand(time(nullptr) ^ getpid() ^ 17777);
            tasks[0] = Print;
            tasks[1] = Fflush;
            tasks[2] = Scanf;
        }
        //执行任务!!!
        void ExecuteTask(int num)
        { if(num < 0 || num > 2) return;
            tasks[num]();
        }
        //随机挑选一个任务
        int SelectTask()
        { return rand() % TaskNum;
        }
        

        3.3 ✨控制子进程

        首先通过 SelectTask() 选择一个任务,然后选择一个信道和子进程。需要注意的是,这里要依次调用每一组子进程,采用轮询(Round-Robin)方案,以尽可能实现负载均衡。 然后发送任务(向信道写入4字节的数组下标)

        int SelectChannel(int n)
        {//静态变量做到轮询方案
            static int next = 0;
            int channel = next;
            next++;
            next %= n;
            return channel;
        }
        void SendTaskCommond(Channel& channel , int TaskCommand  )
        {//写入对应信息
            write(channel.GetWfd() , &TaskCommand , sizeof(TaskCommand));
        }
        void CtrlProcessOnce(std::vector& channel)
        { //选择一个任务
            int TaskCommand = SelectTask();
            //选择一个进程与信道
            int ChannelNum = SelectChannel(channel.size());
            //发送信号
            //测试
            std::cout << "taskcommand: " << TaskCommand << " channel: " << channel[ChannelNum].GetName() << " sub process: " << channel[ChannelNum].GetId() << std::endl; 
            SendTaskCommond(channel[ChannelNum] ,TaskCommand);
            
        }
        

        我们写入之后,子进程就可以读取任务并执行,注意子进程读取只读4个字节!!!如果读取的个数不正确,那么就出现了错误,需要报错!!!

        //子进程运行函数
        void work(int rfd)
        { while(true)
            { int Commond = 0;
                //等待相应
                int n = read(rfd , &Commond , sizeof(Commond));
                if(n == sizeof(int))
                { std::cout << "pid is : " << getpid() << " handler task" << std::endl;
                    //执行命令
                    std::cout << "commond :" << Commond << std::endl;
                    ExecuteTask(Commond);
                }
                //写端关闭
                else if(n == 0)
                { std::cout << "sub Process:" << getpid() << std::endl;
                    break;
                }
            }
        }
        //...
         	//创建子进程
                pid_t id = fork();
                if(id == 0)
                { //子进程 --- 只读不写
                    close(pipefd[1]);
                    work(pipefd[0]);
                    close(pipefd[0]);
                    exit(0);
                }
        //...
        

        进行一下测试:

        成功执行任务!!!

        3.4 ✨回收信道和子进程

        由于子进程会继承父进程的数据,所以一个信道实际上会有多个写端,每个管道在创建子进程时会被拷贝,会多一份指向,如果直接遍历一遍关闭管道,等待子进程:

         for(auto t : channel)
            { t.Close();
                t.WaitSub();
            }
        

        存在BUG !!! 看图:

        这样在如果按照正常的遍历关闭管道的写端描述符,就只是关闭了父进程的,子进程中还要很多指向管道的写端描述符!!!这样管道不会被关闭,子进程什么也读取不到就会一直阻塞在读取上!就会导致父进程等待不到子进程退出!!!

        于是就有了我们上面的改进:

        void CleanUpChannel(std::vector& channel)
        { for(auto t : channel)
            { t.Close();
            }
            for(auto t : channel)
            { t.WaitSub();
            }
        }
        

        按照上面图中的规律:

        • 最先创建的管道具有多个写端指向!
        • 而最后创建的管道只有一个写端,关闭之后,子进程会读取到文件末尾,该子进程就会被释放,其指向其他管道的写端也就对应关闭了!!!这样就避免了阻塞!

          然后最后同一起等待子进程的退出即可!!!

          当然我们可以进行改进,让我们可以支持这样简单的操作!

           for(auto t : channel)
              { t.Close();
                  t.WaitSub();
              }
          

          解决办法就是:在创建新的管道时,关闭子进程中指向其余管道的写端描述符!这样每个子进程就只有指向自己写端的文件描述符了!!!

          //...
          //创建子进程
           	pid_t id = fork();
              if(id == 0)
              { //注意要改进bug
                  //如果管道数组不为空,此时需要关闭其他管道(防止多重指向)
                  if(!channel->empty())
                  { for(auto& c : *channel) c.Close();
                  }
                  //子进程 --- 只读不写
                 	close(pipefd[1]);
               	work(pipefd[0]);
           		close(pipefd[0]);
          		exit(0);
              }
           //...
          

          这样就保证了每个子进程只有指向自己管道的文件描述符了!

          4 ♻️总结

          这样,我们的进程池项目就完成了。不过,实际上我们还可以进一步优化,比如优化 work 函数,将其设置为回调函数,以实现完全解耦。

          尽管如此,目前的实现已经能够满足我们的项目需求。一个面向过程的进程池项目就此完成!!!

          Thanks♪(・ω・)ノ谢谢阅读!!!

          下一篇文章见!!!