网络编程实践-异步通信服务器中踩的一些坑

Socket服务器

因为目前对网络通信知识了解不深,不敢乱讲话,还是多看博客,如协议森林
Scoket服务器通过TCP/IP协议,与客户端连接,进行网络通信。连接方式又存在长连接和短连接两种。短连接即每次客户端请求数据通信完成后断开与服务器的连接,但是TCP/IP建立握手连接很消耗计算机资源;长连接即一旦客户端与服务器建立连接后,通过保活,来建立长期稳定连接。

方案选择

物联网系统,类似摩拜单车的后台服务器系统,我个人猜测:按照摩拜单车的实时性高的特点,应该是采用的长连接,每个地区上万台的子设备与服务器通信,通过心跳和数据帧与服务器保活,定期给服务器发送数据(包括GPS信息)。
针对上万台子设备的连接,对每个连接单独开一个线程进行处理显然不现实,所以该采取异步通信方式,只有子设备有数据活动时才会调用线程来接收。
针对物联网系统,主要有收发、处理和存储三块功能。
收发:与远程客户端进行数据通信。
处理:对数据进行解码、计算等操作。
存储:将处理后的数据存储。
这三块功能可以分别使用三个线程进行单独操作,不会相互堵塞。

异步通信遇到的坑

异步通信参考了MSDN官方示例
这里踩过了一个坑,值得拿出来说说。
关键点在与ReadCallback(IAsyncResult ar)中,当远程客户端主动端开连接时,并不会引发SocketException,而是会不停触发ReadCallback,每次读取到是字节为0,这样就会进入while(true)循环,长期占用CPU资源。如果是双核系统,一旦有两个远程客户端意外端开连接,CPU占用会达到100%。
具体原因在知乎上得到了解答

作者:晨随
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
关于I/O多路复用(又被称为“事件驱动”),首先要理解的是,操作系统为你提供了一个功能,当你的某个socket可读或者可写的时候,它可以给你一个通知。这样当配合非阻塞的socket使用时,只有当系统通知我哪个描述符可读了,我才去执行read操作,可以保证每次read都能读到有效数据而不做纯返回-1和EAGAIN的无用功。写操作类似。操作系统的这个功能通过select/poll/epoll/kqueue之类的系统调用函数来使用,这些函数都可以同时监视多个描述符的读写就绪状况,这样,多个描述符的I/O操作都能在一个线程内并发交替地顺序完成,这就叫I/O多路复用,这里的“复用”指的是复用同一个线程。
以select和tcp socket为例,所谓可读事件,具体的说是指以下事件:
1 socket内核接收缓冲区中的可用字节数大于或等于其低水位SO_RCVLOWAT;
2 socket通信的对方关闭了连接,这个时候在缓冲区里有个文件结束符EOF,此时读操作将返回0;
3 监听socket的backlog队列有已经完成三次握手的连接请求,可以调用accept;
4 socket上有未处理的错误,此时可以用getsockopt来读取和清除该错误。
所谓可写事件,则是指:
1 socket的内核发送缓冲区的可用字节数大于或等于其低水位SO_SNDLOWAIT;
2 socket的写端被关闭,继续写会收到SIGPIPE信号;
3 非阻塞模式下,connect返回之后,发起连接成功或失败;
4 socket上有未处理的错误,此时可以用getsockopt来读取和清除该错误。

因此在客户端主动断开连接时,读缓冲区中有文件结束符EOF(End Of File),触发可读事件,一直返回0。

 private void ReadCallback(IAsyncResult ar)
        {
            SocketObject state = (SocketObject)ar.AsyncState;
            Socket clientSocket = state.workSocket;
            
            try
            {
               

                int bytesRead = clientSocket.EndReceive(ar);
                byte[] buffer = new byte[bytesRead];
                if (bytesRead == 0)
                {
                	 // when a client disconnect the socket initiatively and properly, it will trigger ReadCallback and bytesRead always be 0, it will be while(true) to occupy cpu
                if (clientSocket.Poll(1000, SelectMode.SelectRead)) {
                    throw new SocketException();
                }
                }
                else
                {
                    Array.Copy(state.buffer, 0, buffer, 0, bytesRead); // avoid reference value
                    Tuple<EndPoint, byte[]> rcvBytesTuple = new Tuple<EndPoint, byte[]>(clientSocket.RemoteEndPoint,
                        buffer);

                    RcvBytesQueue.Enqueue(rcvBytesTuple); // ConcurrentQueue<Tuple<EndPoint, byte[]>> RcvBytesQueue

                    if (!Dispatcher.RcvResetEvent.WaitOne(0)) // if WaitOne(0) return false, the ManualResetEvent has not set
                    {
                        Dispatcher.RcvResetEvent.Set(); // when data received, inform other thread to process  
                    }
                }

                // ready for receive next bytes
                clientSocket.BeginReceive(state.buffer, 0, SocketObject.BufferSize, 0, new AsyncCallback(ReadCallback),state);
            }
            catch (Exception ex)
            {
                // do something 
            }
        }

因此,要对这种情况主动抛出异常,通过Socket.Poll来监测连接是否正常,来避免进入死循环无效占用CPU资源。