注:本文为阅读《Linux 多线程服务端编程:使用 muduo C++网络库》的一点笔记

空闲连接指的是一段时间内没有受到任何数据的连接。我们需要做的是每隔一段时间断开这些空闲连接,以免浪费资源。剔除空闲连接这一任务大概有如下两个特点和需求:

  • 无需精准定时:只需要一个大致的间隔,比如说 10s 左右未受到数据,则判定其为空闲连接
  • 应尽量简单:剔除空闲连接应该是一个简洁明了的操作,不应占据太多的计算或空间资源

time wheel 运用了桶排序的思路,在系统中设置 N 个桶,共同组成一个队列。第 i 个桶中存放 i 秒后将要变为空闲连接的连接。这样一来,我们只需要每秒剔除第 0 个桶中的连接即可,无需遍历全部连接,剔除之后,将第 0 个桶移动到尾部。而一个连接如果接收到了新数据,那么该连接就将自己重新放入最后一个桶中。
以循环队列构造的time wheel

接下来,我先用自己的语言描述一下书中作者所使用的数据结构:

  • 桶中存放TcpConnection?: 我们在断掉连接之前还需要判断其是否还处于连接状态,结合 RAII 技术,使用using Entry = std::weak_ptr<TcpConnection>是最有效的。(weak_ptr 是棉线,我们在Entry的析构函数中去判断TcpConnection出否还处于连接状态)
  • 每个Entry需要在有新数据到达时转移在桶中的位置?:其实不必真的去移动桶中的数据,利用using EntryPtr = std::shared_prt<Entry>的引用计数,用 RAII 计数去管理资源。没错,用shared_ptr去管理weak_ptr,虽然看起来怪怪的,但此时一个weak_ptr就是一个Entry,故将其看作为一份资源。如此一来,我们无需再去频繁移动Entry,转而在桶中存放同一个Entry的多个shared_ptr,并在引用计数变为 0 的时候去析构对应的资源。
  • 如何构造 time wheel 中的每个桶?:因为需要频繁的删除和插入,所以可以使用哈希表using Bucket = std::unordered_set<Entry>去构造桶
  • 如何构造 time wheel?:使用using WeakConnectionList = boost::circular_buffer<Bucket>,该数据结构的元素个数在初始化时被固定,此后在每秒中,我们只需要在尾部重新添加一个空桶,那么头部的桶就会被清除

嗯…上面一段听起来不太像人话。接下来是我对上述数据结构的理解和剖析:

  1. 明确 time wheel 服务的目标:
    明确这一点是很重要的,time wheel 的核心仅仅是**”⏲ 计时”**。这意味 time wheel 只是去判断一个 TCP 连接是否已经超时成为空闲连接,而不去判断、也不去影响这个 TCP 连接的其他状态,比如说这个 TCP 连接是否已经断开。那或许有人会这样问了:“当 time wheel 得知某一个 TCP 连接已经超时并变成空闲连接之后,由谁来执行断开操作呢?”答案是交给其他合适的数据结构。明白了这一点,使用using Entry = shd::weak_ptr<TcpConnection>作为 time wheel 和 TcpConnection 之间的桥梁也就是顺其而然的事情了,因为weak_ptr的特点就是,它可以探测到其指向的资源的状态,但本身并不影响其状态。那为什么非得要这样来设计呢?我觉得这是一个明确分工的问题,一个数据结构应该专注于将自己的任务做好,而不应过分插手其他的任务。

  2. 直观的解决方案
    其实明白了第 1 点的内涵之后,再结合 time wheel 的思路,我们就已经可以有一个比较直观的方案了。当服务器发现新的TcpConnection之后,就生成这个TcpConnection对应的Entry,并将其放置到尾部的桶中。而每当任意TcpConnection接收到新的消息之后,就将对应的Entry从原来的桶再移动到尾部的桶中。最后,利用定时器设定一个每秒执行一次的回调函数,该函数将头部的桶中的全部Entry逐个进行delete,而Entry中的析构函数会去断开对应的TcpConnection。上述的思路就是很直观明了的。

  3. 反思:应该充分利用 C++语言特性
    在第 2 点谈到的初步方案中,我们利用Entry的析构函数去断开对应的TcpConnection,这已经体现了 RAII 的思路了,但方案很麻烦的一点在于,我们需要对Entry进行频繁的移动。试想一下,为了移动Entry,我们必须要记录每个Entry位于哪一个桶中,这就又需要一个新的数据结构,变来变去徒增烦恼。还是再一次发挥 RAII 的优势,我们再使用一层using EntryPtr = shared_ptr<Entry>;去管理Entry。具体的思路是这样的,桶中的元素类型从Entry变为EntryPtr,这样一来,每当任意TcpConnection接收到新的消息之后,我们只需要在尾部的桶中新增一个对应EntryEntryPtr。定时器的回调函数还是同样的模式,这样一来,每当一个TcpConnection有新消息时,其对应的Entry的引用计数会增加,而每秒执行一次的定时函数又会减少头部桶中的各个Entry的引用计数。完美,我们不再需要新的数据结构去记录Entry的位置了。下图是一个简单的示意,当前这个Entry在桶中共存放了 4 个EntryPtr,如果后续的 6 秒内该TcpConnection均没有接收到数据,那么EntryPtr的引用计数就会变为 0,那么系统自动执行Entry的析构函数,也就是去断开对应的TcpConnection
    当前Entry有4个EntryPtr

  4. 细节:一个TcpConnection仅对应一个Entry
    这是一个非常隐秘的细节,但也及其重要。试想一下,如果同一个TcpConnection生成了两个Entry,而这两个Entry各自的多个EntryPtr又都放入了桶中,那么毫无疑问,该TcpConnection会被”断开两次”,而放入到此处的语义中,则是TcpConnection会被提前断开连接。下图是一个示例,图中的TcpConnection生成了两个Entry,而Entry2最多在 4 秒以后就会因为引用计数变为 0 而进行析构(假设这个连接之后不会再接收数据),即断开了对应的 TCP 连接,但实际上,桶 6 中也有对应的代理,这意味着理论上这个TcpConnection至少还应该存在 6 秒。这也就是书中两个思考题的答案了,我们需要在TcpConnectioncontext中保存那唯一一个Entry,而这个Entry是在服务器探测到新连接的时候为这个新连接创建出来的。
    错误的示例,单个TcpConnection生成了两个Entry

接下来就是实际操作了,明白了原理之后,就简单很多了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*  省略各个头文件 */
/* ... */
class EchoServer
{
using WeakTcpConnectionPtr = std::weak_ptr<TcpConnection>;
struct Entry {
std::weak_ptr<TcpConnection> ptr_;
Entry(const WeakTcpConnectionPtr &ptr): ptr_(ptr) {} // weak_ptr是可以通过shared_ptr进行构造的
~Entry(){
const TcpConnectionPtr conn = ptr_.lock(); // return the corresponding shared_ptr
if(conn){ // must check the activity before shutdown action
conn->shutdown();
}
}
};

using EntryPtr = std::shared_ptr<Entry>;
using Bucket = std::unordered_set<EntryPtr>;
using BucketList = boost::circular_buffer<Bucket>; // our finale data structure

using WeakEntryPtr = std::weak_ptr<Entry>;

public:
EchoServer(EventLoop *loop, const InetAddress &addr, const int &num_buckets):
loop_(loop), server_(loop, addr, "ECHO SERVER"), buckets_(num_buckets)
{
server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1)); // must have the &
server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, _1, _2, _3));
loop_->runEvery(1.0, std::bind(&EchoServer::onTime, this));
}
// ~EchoServer();

void start_(){
server_.start();
}

private:
EventLoop *loop_;
TcpServer server_;
BucketList buckets_;

void onTime(){
buckets_.push_back(Bucket()); // the head bucket is popped automatically
}

void onConnection(const TcpConnectionPtr &conn){
LOG_INFO << "ECHO_SERVER: " << conn->peerAddress().toIpPort()
<< "-> " << conn->localAddress().toIpPort() << " is "
<< ( conn->connected() ? " ON " : " OFF " );
if(conn->connected()){
EntryPtr newPtr(new Entry(conn)); // shared_ptr can be transformed to weak_ptr
buckets_.back().insert(newPtr);
conn->setContext(WeakEntryPtr(newPtr)); // store weak_ptr of Entry
}
}
void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time_){
muduo::string msg(buf->retrieveAllAsString());
LOG_INFO << "ECHO_SERVER: " << conn->peerAddress().toIpPort()
<< " has message coming, size is: " << msg.size();
WeakEntryPtr tmp = boost::any_cast<WeakEntryPtr>(conn->getContext());
buckets_.back().insert(tmp.lock());// add the count
conn->send(msg);
}



};

#endif /* ECHO_SERVER_H */

int main(int argc, char *argv[])
{
EventLoop loop;
InetAddress addr(2333);
EchoServer server(&loop, addr, 10); // 设置超时时间为10s
server.start_();
loop.loop();
return 0;
}

全部代码详见wcsjdzz/duduo