注:本文是阅读 muduo 网络库之后的理解以及自己的代码实现
IO 多路复用是 Reactor 的核心,简单来说,我们将多个文件描述符存放于一个统一的框架之中进行管理,每当某一个文件描述符中发生了某个事件,系统就去执行对应的响应函数,这也被称为回调。如图 1 所示是 Reactor 的流程描述,fd_2变为红色代表着该文件描述符当前存在激活事件,系统将其推入Event Callback Trigger的卡槽中,代表着fd_2的开关开启,系统开始执行对应的回调函数。
实现一个 Reactor 模式需要将其中的几个核心部件进行拆分。
首先,我们知道,对于对象,它们都拥有自己的回调函数,而对象本身,在 Linux 中可以用文件描述符进行表示,这包括但不限于各类文件资源、标准输入输出、SOCKET 通信甚至是定时器。综上,对于这一块,我们使用一个class Channel对一个文件描述符和它对应的回调函数进行封装,这在图 1 中对应了各个橙色的小卡片。
其次,整个系统应该有一个最上层的抽象描述。所谓抽象描述,是我们不去关心具体的实现,而是需要描述出这整个系统的核心特点。在这里,虽然是 IO 多路复用,但更抽象的来说,这个系统其实是管理所有文件描述符的所有事件(包括可读 ,_可写_,等等),并在适当时机执行指定的回调函数。所以,这实际上是一个事件循环,系统无非是对所有的事件进行轮询,探测哪些事件在当前是激活了的。所以,class EventLoop就是我们的抽象表示。class EventLoop的核心函数,即EventLoop::loop(),总结一下就是:轮询所有的事件,得到激活了的事件对应的class Channel,在这些Channel执行各自的回调函数。
最后,便是如何进行轮询。Linux 中的 IO 多路复用无非就是select(),poll(),epoll()三个函数,这里,为了简单而又不至于使系统性能太差,我们使用poll()。直接使用 Linux 底层的系统调用肯定是有悖于 C++的风格的,因此我们使用class Poller对其进行封装。那么,除了poll(),还需要封装哪些东西呢?回忆一下poll(),该函数需要一个struct pollfd组成的数组,每个struct pollfd对应了一个文件描述符,那么在我们的系统中,它也就对应着一个Channel。
是不是有点绕晕了?没关系,这里画一张图来说明整个class EventLoop的执行流程,如图 2 所示:
① class EventLoop开始执行EventLoop::loop(),表示事件循环开始;
② 系统将控制权交给class Poller,该类中封装有真正的 IO 多路复用底层调用;
③ 系统执行poll()函数,在vector<struct pollfd>得到更新,随后再得到对应的Active Channels,两者之间通过文件描述符予以关联,之后再将控制权交给class EventLoop;
④ 所有的Active Channels依次执行各自的事件回调。
最后再总结一下实现过程中一些需要注意的要点:
唯一性:class EventLoop作为上层抽象,在一个 IO 线程中只能有一个实例;
注册:每生成一个class Channel的实例,且需要对其进行监听时,则自动在EventLoop中进行注册;
一对一:class Channel和struct pollfd是一对一的关系,class Poller每执行一次class Poller::poll()之后,只能得到激活的struct pollfd,但却需要返回给class EventLoop激活的class Channel,因此在class Poller中需要一个std::map<int, Channel *>的查找树,int指代的是文件描述符。
前向声明:在编写类的头文件的时候,如果两个类互相需要依赖另一个类,则为了避免依赖限制,可以使用前向声明 ,但这样的话,就需要注意,构造函数和析构函数的定义最好都写在.cc 文件中,因为头文件中的类型尚不完整
具体实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 #ifndef CHANNEL_H #define CHANNEL_H #include <functional> #include <boost/noncopyable.hpp> class EventLoop ;class Channel : boost::noncopyable{ using eventCallback = std ::function<void ()>; private : EventLoop *ownerLoop_; int fd_; int events_; int revents_; int index_; static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; eventCallback errorCallback_; eventCallback readCallback_; eventCallback writeCallback_; public : Channel(EventLoop *loop, int fd); ~Channel(); int fd () const { return fd_; } int events () const { return events_; } int revents () const { return revents_; } int index () const { return index_; } bool isNoneEvent () const { return events_ == kNoneEvent; } void set_index (int idx) { index_ = idx; } void enableRead () { events_ = events_ | kReadEvent; update(); } void enableWrite () { events_ = events_ | kWriteEvent; update(); } void set_revent (int revents) { revents_ = revents; } void setErrorCallback (const eventCallback cb) { errorCallback_ = cb; } void setReadCallback (const eventCallback cb) { readCallback_ = cb; } void setWriteCallback (const eventCallback cb) { writeCallback_ = cb; } void handleEvents () ; void update () ; }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #include "Channel.h" #include "EventLoop.h" #include <poll.h> const int Channel::kNoneEvent = 0 ;const int Channel::kReadEvent = POLLIN | POLLPRI;const int Channel::kWriteEvent = POLLOUT;Channel::Channel(EventLoop *loop, int fd): ownerLoop_(loop), fd_(fd), events_(0 ), revents_(0 ), index_(-1 ) { } Channel::~Channel() { } void Channel::handleEvents () { if (revents_ & (POLLNVAL|POLLERR)){ printf ("error event comes\n" ); if (errorCallback_) errorCallback_(); } if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)){ printf ("read event comes\n" ); if (readCallback_) readCallback_(); } if (revents_ & POLLOUT){ printf ("write event comes\n" ); if (writeCallback_) writeCallback_(); } } void Channel::update () { ownerLoop_->updateChannel(this ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #ifndef POLLER_H #define POLLER_H #include <vector> #include <map> #include <muduo/base/noncopyable.h> class Channel ;class EventLoop ;struct pollfd ;class Poller : muduo::noncopyable{ using ChannelVec = std ::vector <Channel *>; using ChannelMap = std ::map <int , Channel *>; private : std ::vector <struct pollfd> pollfds_; ChannelMap channels_; EventLoop *ownerLoop_; public : Poller(EventLoop *loop); ~Poller(); void updateChannel (Channel *) ; void poll (int maxWaitTimeM, ChannelVec *activeChannels) ; void fillActiveChannels (int activeNum, ChannelVec *activeChannels) ; }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #include <assert.h> #include <poll.h> #include "Poller.h" #include "Channel.h" #include <muduo/base/Logging.h> Poller::Poller(EventLoop *loop): ownerLoop_(loop) { } Poller::~Poller() { } void Poller::updateChannel (Channel *ch) { if (ch->index() < 0 ){ assert(channels_.find(ch->fd()) == channels_.end()); struct pollfd tmp ; tmp.events = ch->events(); tmp.revents = ch->revents(); tmp.fd = ch->fd(); pollfds_.push_back(tmp); channels_[tmp.fd] = ch; ch->set_index(pollfds_.size()-1 ); } else { assert(channels_.find(ch->fd()) != channels_.end()); struct pollfd &tmp = pollfds_[ch->index()]; tmp.events = ch->events(); tmp.revents = ch->revents(); if (ch->isNoneEvent()){ tmp.fd = -1 ; } } } void Poller::poll (int maxWaitTimeM, ChannelVec *activeChannels) { int activeNum = ::poll(pollfds_.data(), pollfds_.size(), maxWaitTimeM); if (activeNum>0 ){ fillActiveChannels(activeNum, activeChannels); } else if (!activeNum){ LOG_INFO << "No active event in " << maxWaitTimeM << " m seconds" ; } else { LOG_FATAL << "ERROR occurs when ::poll()" ; } } void Poller::fillActiveChannels (int activeNum, ChannelVec *activeChannels) { for (const auto &tmp: pollfds_){ if (activeNum<=0 ) break ; if (tmp.revents>0 ){ assert(channels_.find(tmp.fd) != channels_.end()); --activeNum; channels_[tmp.fd]->set_revent(tmp.revents); activeChannels->push_back(channels_[tmp.fd]); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #ifndef EVENTLOOP_H #define EVENTLOOP_H #include <vector> #include <memory> #include <boost/noncopyable.hpp> class Channel ;class Poller ;class EventLoop : boost::noncopyable{ using ChannelVec = std ::vector <Channel *>; private : ChannelVec activeChannels_; bool quit_; bool isLoopping_; const pid_t threadId_; int maxWaitTimeM; std ::unique_ptr <Poller> pollerPtr_; void assertInLoopThread () ; public : EventLoop(); ~EventLoop(); void loop () ; void updateChannel (Channel *ch) ; void quit () { quit_ =true ; } }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #include "EventLoop.h" #include "Channel.h" #include "Poller.h" #include <muduo/base/Thread.h> #include <muduo/base/Logging.h> __thread EventLoop * loopOfCurrentThread_ = 0 ; EventLoop::EventLoop(): quit_(false ), isLoopping_(false ), threadId_(muduo::CurrentThread::tid()), maxWaitTimeM(10000 ), pollerPtr_(new Poller (this )) { if (loopOfCurrentThread_){ LOG_FATAL << "Already has an EventLoop!" ; } else { loopOfCurrentThread_ = this ; } } EventLoop::~EventLoop(){ LOG_INFO << "EventLoop " << this << "deconstructed in thread " << muduo::CurrentThread::tid(); loopOfCurrentThread_ = 0 ; } void EventLoop::loop () { assert(!isLoopping_); assertInLoopThread(); isLoopping_ = true ; quit_ = false ; while (!quit_){ activeChannels_.clear(); pollerPtr_->poll(maxWaitTimeM, &activeChannels_); for (auto &ch: activeChannels_){ ch->handleEvents(); } } isLoopping_ = false ; LOG_INFO << "EventLoop::loop() stopped" ; } void EventLoop::assertInLoopThread () { assert(threadId_ == muduo::CurrentThread::tid()); } void EventLoop::updateChannel (Channel *ch) { pollerPtr_->updateChannel(ch); }