注:本文是阅读 muduo 网络库之后的理解以及自己的代码实现

IO 多路复用是 Reactor 的核心,简单来说,我们将多个文件描述符存放于一个统一的框架之中进行管理,每当某一个文件描述符中发生了某个事件,系统就去执行对应的响应函数,这也被称为回调。如图 1 所示是 Reactor 的流程描述,fd_2变为红色代表着该文件描述符当前存在激活事件,系统将其推入Event Callback Trigger的卡槽中,代表着fd_2的开关开启,系统开始执行对应的回调函数。
图1. Reactor的多路复用流程示意.png

实现一个 Reactor 模式需要将其中的几个核心部件进行拆分。

首先,我们知道,对于对象,它们都拥有自己的回调函数,而对象本身,在 Linux 中可以用文件描述符进行表示,这包括但不限于各类文件资源、标准输入输出、SOCKET 通信甚至是定时器。综上,对于这一块,我们使用一个class Channel对一个文件描述符和它对应的回调函数进行封装,这在图 1 中对应了各个橙色的小卡片。

其次,整个系统应该有一个最上层的抽象描述。所谓抽象描述,是我们不去关心具体的实现,而是需要描述出这整个系统的核心特点。在这里,虽然是 IO 多路复用,但更抽象的来说,这个系统其实是管理所有文件描述符的所有事件(包括可读,_可写_,等等),并在适当时机执行指定的回调函数。所以,这实际上是一个事件循环,系统无非是对所有的事件进行轮询,探测哪些事件在当前是激活了的。所以,class EventLoop就是我们的抽象表示。class EventLoop的核心函数,即EventLoop::loop(),总结一下就是:轮询所有的事件,得到激活了的事件对应的class Channel,在这些Channel执行各自的回调函数。

最后,便是如何进行轮询。Linux 中的 IO 多路复用无非就是select()poll()epoll()三个函数,这里,为了简单而又不至于使系统性能太差,我们使用poll()。直接使用 Linux 底层的系统调用肯定是有悖于 C++的风格的,因此我们使用class Poller对其进行封装。那么,除了poll(),还需要封装哪些东西呢?回忆一下poll(),该函数需要一个struct pollfd组成的数组,每个struct pollfd对应了一个文件描述符,那么在我们的系统中,它也就对应着一个Channel

是不是有点绕晕了?没关系,这里画一张图来说明整个class EventLoop的执行流程,如图 2 所示:

  • class EventLoop开始执行EventLoop::loop(),表示事件循环开始;
  • ② 系统将控制权交给class Poller,该类中封装有真正的 IO 多路复用底层调用;
  • ③ 系统执行poll()函数,在vector<struct pollfd>得到更新,随后再得到对应的Active Channels,两者之间通过文件描述符予以关联,之后再将控制权交给class EventLoop
  • ④ 所有的Active Channels依次执行各自的事件回调。

图2. Reactor系统流程图

最后再总结一下实现过程中一些需要注意的要点:

  • 唯一性:class EventLoop作为上层抽象,在一个 IO 线程中只能有一个实例;
  • 注册:每生成一个class Channel的实例,且需要对其进行监听时,则自动在EventLoop中进行注册;
  • 一对一:class Channelstruct pollfd是一对一的关系,class Poller每执行一次class Poller::poll()之后,只能得到激活的struct pollfd,但却需要返回给class EventLoop激活的class Channel,因此在class Poller中需要一个std::map<int, Channel *>的查找树,int指代的是文件描述符。
  • 前向声明:在编写类的头文件的时候,如果两个类互相需要依赖另一个类,则为了避免依赖限制,可以使用前向声明,但这样的话,就需要注意,构造函数和析构函数的定义最好都写在.cc 文件中,因为头文件中的类型尚不完整

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*  Channel.h */
#ifndef CHANNEL_H
#define CHANNEL_H

#include <functional>
#include <boost/noncopyable.hpp>

// forward declaration, avoid dependency
class EventLoop;

// Dispatch of event
class Channel : boost::noncopyable
{
using eventCallback = std::function<void()>;
private:
EventLoop *ownerLoop_; // ownerLoop_ owns this Channel
int fd_; // just as fd in struct pollfd
int events_;
int revents_;

int index_; // necessary in Poller, -1 as inactivated

static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;

eventCallback errorCallback_;
eventCallback readCallback_;
eventCallback writeCallback_;

public:
Channel(EventLoop *loop, int fd);
~Channel();

int fd() const {
return fd_;
}
int events() const {
return events_;
}
int revents() const {
return revents_;
}
int index() const {
return index_;
}

bool isNoneEvent() const {
return events_ == kNoneEvent;
}

void set_index(int idx){
index_ = idx;
}
void enableRead(){
events_ = events_ | kReadEvent;
update();
}
void enableWrite(){
events_ = events_ | kWriteEvent;
update();
}
void set_revent(int revents){
revents_ = revents;
}

void setErrorCallback(const eventCallback cb){
errorCallback_ = cb;
}
void setReadCallback(const eventCallback cb){
readCallback_ = cb;
}
void setWriteCallback(const eventCallback cb){
writeCallback_ = cb;
}

void handleEvents();
void update(); // Channel::update() ==> EventLoop::updateChannel()
// ==> Poller::updateChannel()
};

#endif /* CHANNEL_H */

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/* Channel.cc */
#include "Channel.h"
#include "EventLoop.h"
#include <poll.h>

// static variables should have a initialization
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop *loop, int fd):
ownerLoop_(loop),
fd_(fd),
events_(0),
revents_(0),
index_(-1)
{

}

Channel::~Channel() {

}

void Channel::handleEvents() {
if(revents_ & (POLLNVAL|POLLERR)){
printf("error event comes\n");
if(errorCallback_) errorCallback_();
}
if(revents_ & (POLLIN | POLLPRI | POLLRDHUP)){
printf("read event comes\n");
if(readCallback_) readCallback_();
}
if(revents_ & POLLOUT){
printf("write event comes\n");
if(writeCallback_) writeCallback_();
}
}

void Channel::update(){
ownerLoop_->updateChannel(this);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* Poller.h */
#ifndef POLLER_H
#define POLLER_H

#include <vector>
#include <map>
#include <muduo/base/noncopyable.h>

class Channel;
class EventLoop;
struct pollfd;

class Poller : muduo::noncopyable
{
using ChannelVec = std::vector<Channel *>;
using ChannelMap = std::map<int, Channel *>; // fd => Channel *
private:
std::vector<struct pollfd> pollfds_;
ChannelMap channels_;
EventLoop *ownerLoop_;

public:
Poller(EventLoop *loop);
~Poller();

void updateChannel(Channel *);
void poll(int maxWaitTimeM, ChannelVec *activeChannels);
void fillActiveChannels(int activeNum, ChannelVec *activeChannels);
};

#endif /* POLLER_H */

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/* Poller.cc */
#include <assert.h>
#include <poll.h>
#include "Poller.h"
#include "Channel.h"
#include <muduo/base/Logging.h>

Poller::Poller(EventLoop *loop): ownerLoop_(loop) {

}

Poller::~Poller() {

}

void Poller::updateChannel(Channel *ch){
if(ch->index() < 0){ // this channel is a new one
assert(channels_.find(ch->fd()) == channels_.end());
struct pollfd tmp;
tmp.events = ch->events();
tmp.revents = ch->revents();
tmp.fd = ch->fd();
pollfds_.push_back(tmp);
channels_[tmp.fd] = ch;
ch->set_index(pollfds_.size()-1);
} else { // it alreay stores in pollfds_, we just change the value
assert(channels_.find(ch->fd()) != channels_.end());
struct pollfd &tmp = pollfds_[ch->index()];
tmp.events = ch->events();
tmp.revents = ch->revents();
if(ch->isNoneEvent()){
tmp.fd = -1; // no event under watched, so set -1
}
}
}

void Poller::poll(int maxWaitTimeM, ChannelVec *activeChannels){
int activeNum = ::poll(pollfds_.data(), pollfds_.size(), maxWaitTimeM);
if(activeNum>0){
fillActiveChannels(activeNum, activeChannels);
} else if(!activeNum){
LOG_INFO << "No active event in " << maxWaitTimeM << " m seconds";
} else {
LOG_FATAL << "ERROR occurs when ::poll()";
}
}

void Poller::fillActiveChannels(int activeNum, ChannelVec *activeChannels){
for(const auto &tmp: pollfds_){
if(activeNum<=0) break;
if(tmp.revents>0){
assert(channels_.find(tmp.fd) != channels_.end());
--activeNum;
channels_[tmp.fd]->set_revent(tmp.revents); // revent of channel should be updated
activeChannels->push_back(channels_[tmp.fd]);
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/* EventLoop.h */
#ifndef EVENTLOOP_H
#define EVENTLOOP_H

#include <vector>
#include <memory>
#include <boost/noncopyable.hpp>

class Channel;
class Poller;

// core of Reactor
class EventLoop : boost::noncopyable
{
using ChannelVec = std::vector<Channel *>;
private:
ChannelVec activeChannels_;
bool quit_; // should be atomatic
bool isLoopping_;
const pid_t threadId_; // notes of IO thread
int maxWaitTimeM;
std::unique_ptr<Poller> pollerPtr_;

void assertInLoopThread();

public:
EventLoop();
~EventLoop();

// precondition:
// 1. should be IO thread
// 2. cannot call loop() repeately
void loop();

void updateChannel(Channel *ch);
void quit() {
quit_ =true;
}
};

#endif /* EVENTLOOP_H */

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/* EventLoop.cc */
#include "EventLoop.h"
#include "Channel.h"
#include "Poller.h"
#include <muduo/base/Thread.h>
#include <muduo/base/Logging.h>

__thread EventLoop * loopOfCurrentThread_ = 0;

EventLoop::EventLoop():
quit_(false),
isLoopping_(false),
threadId_(muduo::CurrentThread::tid()),
maxWaitTimeM(10000),
pollerPtr_(new Poller (this))
{
if(loopOfCurrentThread_){
LOG_FATAL << "Already has an EventLoop!";
} else {
loopOfCurrentThread_ = this;
}
}

EventLoop::~EventLoop(){
LOG_INFO << "EventLoop " << this << "deconstructed in thread "
<< muduo::CurrentThread::tid();
loopOfCurrentThread_ = 0;
}

void EventLoop::loop(){
assert(!isLoopping_);
assertInLoopThread();
isLoopping_ = true;
quit_ = false;
while(!quit_){
activeChannels_.clear();
pollerPtr_->poll(maxWaitTimeM, &activeChannels_);
for(auto &ch: activeChannels_){
ch->handleEvents();
}
}
isLoopping_ = false;
LOG_INFO << "EventLoop::loop() stopped";
}

void EventLoop::assertInLoopThread() {
assert(threadId_ == muduo::CurrentThread::tid());
}

void EventLoop::updateChannel(Channel *ch){
pollerPtr_->updateChannel(ch);
}