注:本文为阅读了 muduo 网络库源码及作者著作之后对于网络库的复现和笔记
功能 我们定义一个class Acceptor,其功能是:让服务器在指定的端口处进行监听,如果在端口监听到连接,则执行由class Acceptor的类用户注册的回调函数。
底层 API 首先梳理一下与 Acceptor 相关的底层 API 调用。
int socket(int domain, int type, int protocol) 用于创建本地 socket fd,domain指示网络的通信所在域,通常选择AF_INET即可,代表 IPV4;type指示 socket fd 类型,对于 TCP 协议,因为是流式协议,加上我们的网络库的非阻塞特性,而通常还需指定 CLOSE ON EXECVE,所以通常需要输入SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC;protocol指示具体协议,这里我们选用IPPROTO_TCP
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrLen) 用于将socket()生成的 socket fd 与服务器的监听地址(IP 加端口)进行绑定。
int listen(sockfd, int backlog) 对生成的套接字进行实际的监听,backlog指定能够同时容纳的最多监听个数。
int accept(int sockfd, struct sockaddr *peerAddr, socklen_t addrLen ) 用于监听到了“来电”之后,接受对应的 TCP 连接,返回对端(客户端)的 socket fd,并将对端的地址填入到peerAddr中。
模块拆分 之前的 Reactor 模式是一个经典的 IO 多路复用模式,我们已经用一个class EventLoop抽象出了整个多路复用的网络模型,接下来就是将这个模型用起来,去构建实际的 socket 网络程序了。
socket 网络编程设计到了繁多的底层 API 接口,在现代 C++特性之中,自然也要对其进行合理的封装,才能发挥出语言的最大优势。首先绘制一下服务端的对于 TCP 连接的接收过程,如图 1 所示。
对于底层 API 而言,”socket() 到 bind() “是一条龙式的操作,因此可以定义一个class Socket来封装这个过程。服务器根据这一套流程建立本地的 socket。随后当开始决定监听之后,则开始进行”listen()”,当探测到连接请求的时候, 开始” accept()”,得到对端的 socket 以及网络地址,然后就可以调用用户注册的回调函数了。需要注意的是图 1 中的handleRead()子框,这个子框才是 Acceptor 中的 Channel 的 readable 回调。原因其实很简单,因为对于服务器而言,当探测到有客户端发起连接请求之后,服务器的 callback 应该是先建立连接,再执行用户回调,那么“建立连接+执行用户回调”的整个过程才是“客户端发起连接请求”这个 readable 事件的回调。
代码实战 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #ifndef SOCKET_H #define SOCKET_H #include "muduo/base/noncopyable.h" #include <muduo/net/InetAddress.h> #include <boost/noncopyable.hpp> class Socket : boost::noncopyable{ private : const int sockfd_; public : int fd () const { return sockfd_; } void bindAddress (const muduo::net::InetAddress& localaddr) ; void listen () ; int accept (muduo::net::InetAddress* peeraddr) ; void setReuseAddr (bool on) ; void setReusePort (bool on) ; explicit Socket (const int &fd) ; ~Socket(); }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #include "Socket.h" #include "muduo/base/Logging.h" #include "muduo/net/InetAddress.h" #include "SockOptions.h" #include <netinet/in.h> #include <netinet/tcp.h> #include <stdio.h> // snprintf Socket::Socket(const int &fd): sockfd_(fd) { } Socket::~Socket(){ sockoptions::close(sockfd_); } void Socket::bindAddress (const muduo::net::InetAddress &localaddr) { sockoptions::bindOrDie(sockfd_, localaddr.getSockAddr()); } void Socket::listen () { sockoptions::listenOrDie(sockfd_); } int Socket::accept (muduo::net::InetAddress *peeraddr) { sockaddr_in6 tmpAddr; bzero(&tmpAddr, sizeof tmpAddr); int connfd = sockoptions::accept(sockfd_, &tmpAddr); peeraddr->setSockAddrInet6(tmpAddr); return connfd; } void Socket::setReuseAddr (bool on) { int reused = on; int len = static_cast <socklen_t >(sizeof reused); int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &reused, len); if (ret < 0 ){ LOG_ERROR << "setReuseAddr falied" ; } } void Socket::setReusePort (bool on) { int reused = on; int len = static_cast <socklen_t >(sizeof reused); int ret = ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &reused, len); if (ret < 0 ){ LOG_ERROR << "setReuseAddr falied" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #ifndef ACCEPTOR_H #define ACCEPTOR_H #include <functional> #include <muduo/net/InetAddress.h> #include <memory> #include <boost/noncopyable.hpp> #include "../Reactor/Channel.h" #include "Socket.h" class EventLoop ;class Acceptor : boost::noncopyable{ using ConnCallback = std ::function<void (int , const muduo::net::InetAddress &)>; private : EventLoop *loop_; ConnCallback cb_; std ::unique_ptr <Socket> socket_; Channel socketChannel_; bool listening_; public : Acceptor(EventLoop *loop, const muduo::net::InetAddress & localAddr); ~Acceptor(); bool isListening () const { return listening_; } void listen () ; void handleRead () ; void setNewConnectionCallback (const ConnCallback &func) ; }; #endif
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 #include <muduo/base/Logging.h> #include "../Reactor/EventLoop.h" #include "../Reactor/Channel.h" #include "Acceptor.h" #include "SockOptions.h" #include "Socket.h" Acceptor::Acceptor(EventLoop *loop, const muduo::net::InetAddress &localAddr) :loop_(loop), socket_(new Socket (sockoptions::createNonblockingOrDie(AF_INET))), socketChannel_(loop, socket_->fd()), listening_(false ) { socket_->setReuseAddr(true ); socket_->bindAddress(localAddr); socketChannel_.setReadCallback(std ::bind(&Acceptor::handleRead, this )); } Acceptor::~Acceptor() { } void Acceptor::listen () { loop_->assertInLoopThread(); listening_ = true ; socket_->listen(); socketChannel_.enableRead(); } void Acceptor::handleRead () { muduo::net::InetAddress addr; int connfd = socket_->accept(&addr); if (connfd < 0 ){ LOG_FATAL << "Acceptor - socket accept failed" ; return ; } if (cb_){ cb_(connfd, addr); } else { LOG_ERROR << "Acceptor - NewConnectionCallback unset" ; } } void Acceptor::setNewConnectionCallback (const ConnCallback &func) { cb_ = func; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #include <arpa/inet.h> namespace sockoptions{ int createNonblockingOrDie (sa_family_t family) ;void bindOrDie (int sockfd, const struct sockaddr* addr) ;void listenOrDie (int sockfd) ;void close (int sockfd) ;int accept (int sockfd, struct sockaddr_in6* addr) ;const struct sockaddr* sockaddr_cast (const struct sockaddr_in* addr) ;const struct sockaddr* sockaddr_cast (const struct sockaddr_in6* addr) ;struct sockaddr* sockaddr_cast (struct sockaddr_in6* addr) ;}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 #include "./SockOptions.h" #include "muduo/base/Logging.h" #include "muduo/base/Types.h" #include "muduo/net/Endian.h" #include <errno.h> #include <fcntl.h> #include <stdio.h> // snprintf #include <sys/socket.h> #include <sys/uio.h> // readv #include <unistd.h> int sockoptions::createNonblockingOrDie (sa_family_t family) { int sockfd = ::socket(family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP); if (sockfd < 0 ){ LOG_FATAL << "Create socket failed!" ; } return sockfd; } void sockoptions::bindOrDie (int sockfd, const struct sockaddr *addr) { socklen_t len = static_cast <socklen_t >(sizeof (*addr)); int ret = ::bind(sockfd, addr, len); if (ret < 0 ){ LOG_FATAL << "Bind address failed!" ; } } void sockoptions::listenOrDie (int sockfd) { int ret = ::listen(sockfd, SOMAXCONN); if (ret < 0 ){ LOG_FATAL << "Listen socket failed!" ; } } void sockoptions::close (int sockfd) { int ret = ::close(sockfd); if (ret < 0 ){ LOG_FATAL << "Close sockfd falied!" ; } } int sockoptions::accept (int sockfd, struct sockaddr_in6 *addr) { struct sockaddr *sa = sockaddr_cast(addr); socklen_t len = static_cast <socklen_t >(sizeof (*addr)); int connfd = ::accept(sockfd, sa, &len); if (connfd < 0 ){ LOG_FATAL << "Accept socket failed!" ; } return connfd; } const struct sockaddr * sockoptions::sockaddr_cast (const struct sockaddr_in *addr) { return static_cast <const struct sockaddr *>(muduo::implicit_cast<const void *>(addr)); } const struct sockaddr * sockoptions::sockaddr_cast (const struct sockaddr_in6 *addr) { return static_cast <const struct sockaddr *>(muduo::implicit_cast<const void *>(addr)); } struct sockaddr * sockoptions::sockaddr_cast (struct sockaddr_in6 *addr) { return static_cast <struct sockaddr *>(muduo::implicit_cast<void *>(addr)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include "../Reactor/EventLoop.h" #include "Acceptor.h" #include <muduo/net/InetAddress.h> void callbackFunc (int connfd, const muduo::net::InetAddress &addr) { printf ("A new connection comming from %s\n" , addr.toIpPort().c_str()); char msg[] = "Hello, I can hear you calling me\n" ; ::write(connfd, msg, sizeof msg); } int main (int argc, char *argv[]) { EventLoop loop; muduo::net::InetAddress localAddr (2333 ) ; muduo::net::InetAddress localAddr2 (3332 ) ; Acceptor acceptor (&loop, localAddr) ; Acceptor acceptor2 (&loop, localAddr2) ; acceptor.setNewConnectionCallback(&callbackFunc); acceptor2.setNewConnectionCallback(&callbackFunc); acceptor.listen(); acceptor2.listen(); loop.loop(); return 0 ; }
运行结果
实现过程中的一些知识点总结
sockaddr 和 sockaddr_in 两者之间是相互补充的关系。大多数诸如::bind()的底层 socket API 使用struct sockaddr *作为入参类型,但sockaddr的小缺陷是,它将 IP 端口和 IP 地址混在了一个变量中,故赋值时不太方便。sockaddr_in弥补了这一问题,它将端口和地址进行了分离,同时为了适配sockaddr,又填充了一些不使用的变量,使得两种结构体的内存分布完全一致,因此两种类型的指针可以相互转换。总结一下:sockaddr_in简化了变量的赋值,sockaddr用于函数的传参。
implicit_cast 该类型转换应该是为了更安全的进行精确类型转换,当使用implicit_cast的时候,编译器会去检查该转换是否安全。(不过该 cast 暂时还没有纳入标准库)
SOMAXCONN 在调用::listen()的时候需要指定最多可以支持多少连接请求,为此系统定义了一个专门的宏SOMAXCONN用来表示系统所支持的最多请求个数。
setsockopt() 网络通信会在不同的层级或者协议之中拥有不同的设置选项,setsockopt()的功能就是将设置这些选项都抽象到了顶层的 socket 这一层级。比如说要设置 socket 底层 API 层级的某些选项,抑或是设置 TCP 协议中的某些选项,都通过该函数来执行。
FD_CLOEXEC 默认情况下,使用fork() + execve()开启新的子进程时,父进程打开的文件描述符fd不会被关闭。为了让用户能够自己控制是否在execve()后关闭fd,系统设定了一个全局的变量,变量中的各个标志位代表不同的fd,如果设定了FD_CLOEXEC,则会在启动子进程时关闭对应的fd。相应的SOCK_CLOEXEC和TFD_CLOEXEC可以理解是对FD_CLOEXEC的继承,它们分别为timer fd和socket fd服务
TcpServer TcpServer 是 Acceptor 的直接使用者,其实只需要将 TcpServer 的 Callback 注册到 Acceptor 里面即可