之前,多线程一些基本的东西,包括线程创建,互斥锁,信号量,我们都已经封装,下面来看看消息队列


我们尽量少用系统自带的消息队列(比如Linux的sys/msgqueue),那样移植性不是很强,我们希望的消息队列,在消息打包和提取都是用的标准的C++数据结构,当然,你也可以用链表或者是FIFO,那样得先写个链表或者FIFO出来。


我比较懒,直接用的C++的STL的deque,即双端口队列,这样可靠性有保证,当然,速度可能没有自己写的链表快,但是没关系,使用双端口队列还可以根据你自己的需要将数据插入到队列头或者队列尾,这样在消息有优先级的情况下还是有用的。


消息队列的核心作用其实很简单,一个或多个线程往一个队列后面堆数据,另外的一个线程从队列前面取数据处理,基本操作也只有两个,一个发,一个收,所以,我们定义消息队列基类为:

class CMsgQueue  
{  
    public:  
        CMsgQueue(const char *pName=NULL);  
        ~CMsgQueue();  
        //revice data from message queue  
        virtual bool recvMsg(unsigned int &m_msg_code,void *&p_msg)=0;  
        //send data to message queue  
        virtual bool sendMsg(unsigned int m_msg_code,void *p_msg)=0;  
        const char * getName(void) const {  
                return msg_queue_name;  
            }        
    private:  
        char *msg_queue_name;  
};
登录后复制


然后记得在COperratingSystemFactory里加上创建消息队列的方法:

class COperatingSystemFactory  
{  
    public:  
        static COperatingSystem *newOperatingSystem();  
        static CCountingSem  *newCountingSem(unsigned int init);  
        static CMutex           *newMutex(const char *pName=NULL);  
        static CMsgQueue     *newMsgQueue(const char *pName=NULL);  
  
};
登录后复制


最后,从CMsgQueue继承一个CLinuxMsgQueue,然后把recvMsg和sendMsg实现吧,实现的时候注意一下。


单纯的操作双端口FIFO不行,我们希望是接收消息的时候如果没有消息,线程阻塞在那里等待消息直到有消息到来才接着运行,所以,接收消息的时候我们用了信号量,阻塞在信号量那里,发送消息的时候操作完队列,发送一个信号量出去。


其次,对于队列的操作,我们希望是原子性的,不然一个正在收一个正在发就乱了,所以操作队列的时候我们用互斥锁来锁一下,保证基本的原子性。


对应到具体的程序就是


1.为每个消息队列申请一个锁,一个信号量

CLinuxMsgQueue::CLinuxMsgQueue(const char *pName):  
CMsgQueue(pName)  
{  
    p_mutex=COperatingSystemFactory::newMutex("Msg Mutex");  
    p_sem=COperatingSystemFactory::newCountingSem(0);  
}
登录后复制


接收消息的时候:

bool CLinuxMsgQueue::recvMsg(unsigned int &m_msg_code,void *&p_msg)  
{  
    bool result;  
        Elements queue_element;  
    p_sem->Get();  //通过信号量阻塞在这里,有消息到达了才接着往下走  
    p_mutex->Lock();  //锁定,保证原子性  
        //操作队列  
    if (m_queue.empty()) {  
                p_mutex-> UnLock ();  
            return false;    
    }  
    queue_element = m_queue.front();  
    m_queue.pop_front();  
    m_msg_code = queue_element.msg_code;  
    p_msg = queue_element.p_message;  
        //操作队列结束  
    p_mutex->UnLock(); //解除锁定  
        return true;  
}
登录后复制


发送的时候也是类似的方式进行,这样,一个最简单消息队列就完成了。如果我们要使用消息队列的话,很简单,在main.cpp中

int main()  
{  
        //首先,新建一个消息队列  
        CMsgQueue *q=COperatingSystemFactory::newMsgQueue("B to A message Queue");  
        //新建两个线程,TestThread和TestThreadB都是从CThread继承下来的线程类  
    TestThread *a=new TestThread("A");  
    TestThreadB *b=new TestThreadB("B");  
        //将消息队列放到两个线程实体的局部变量中  
    a->setMsgQueue(q);  
    b->setMsgQueue(q);  
        //启动线程  
    a->run();  
    b->run();  
}
登录后复制


当要在mainloop中发送消息的时候,只需要调用

p_msg_send->sendMsg(code, (void *)p_msg);  
//其中p_msg_send是b线程的局部变量,实际指向的是之前新建的消息队列q
登录后复制

github地址:

https://github.com/wyh267/Cplusplus_Thread_Lib

写在后面的话:

当然,这个代码还非常不完整,整个代码量也没有多少行,在这里,我只是提供一个代码框架的方法,作为一个demo给大家参考,如果真的需要实际使用还有很多很多地方需要修改的,github上我的代码也不能在生产软件中实际使用,在实际的项目中,我也实现了一个没有任何第三方的线程库,比这个复杂多了,还包括事件处理,等待超时,消息广播,消息订阅等模块,而且能运行在linux,ecos等多个平台上,基本做到平台无关了,但由于各种原因我也没办法将代码都公布出来,这里所说的这个框架只是项目中线程库提取出来的非常少的一部分,同样,也只是提供一种编程的设计思想,后面的东西希望大家各自有各自的发掘和完善,也许你看了以后,会提出更加强大和简洁的框架。


另外,github上的代码我会继续完善,将其他模块陆续加上,如果大家感兴趣也可以跟我一起来完善,我尽量不使用之前实现过的线程库的代码,避免不必要的麻烦。

以上就是C++ 多线程框架(3):消息队列的内容,更多相关内容请关注Work网(www.php.cn)!


09-01 01:24