一、WSAEventSelect模型介绍

WSAEventSelect模型和WSAAsyncSelect模型类似,它也允许应用程序在一个或多个套接字上面,接收以事件为基础的网络事件通知。该模型和WSAAsyncSelect模型的最主要的区别在于,网络事件是由事件对象句柄完成的,而不是通过窗口消息完成的。

该模型要求应用程序针对打算使用的每一个套接字都创建一个事件对象。创建方法是就是调用WSACreateEvent函数。

WSAEVENT WSACreateEvent(void);

WSACreateEvent函数返回一个人工控制(manual)、无信号的事件对象句柄。得到事件句柄之后,可以通过调用WSAEventSelect函数,将它和套接字相关联,同时注册感兴趣的网络事件。

int WSAEventSelect(
  _In_ SOCKET   s,
  _In_ WSAEVENT hEventObject,
  _In_ long     lNetworkEvents
);

具体的参数解释可以参考MSDN:https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx

CreateEvent创建的事件类型,windows提供了类似的API来重置事件信号、关闭事件等,如WSAResetEvent, WSACloseEvent等。
同样提供给了和WaitForMultipleObjects类似的函数WSAWaitForMultipleEvents来等待多个事件信号,参数也和WaitForMultipleObjects类似。

需要注意的是,WSAWaitForMultipleEvents最多支持同时等待64个事件,如果需要等待超过64个事件,需要使用多个线程调用多次WSAWaitForMultipleEvents函数来实现,这也是WSAEventSelect模型的一大弊端。

int WSAEnumNetworkEvents(
  _In_  SOCKET             s,
  _In_  WSAEVENT           hEventObject,
  _Out_ LPWSANETWORKEVENTS lpNetworkEvents
);

WSAEnumNetworkEvents函数用于获取指定SOCKET上发送了哪些事件(是FD_READ,还是FD_WRITE,或者说还是FD_ACCEPT等),hEventObject参数传入事件句柄,函数会自动将事件置为无信号状态,不需要再次调用WSAResetEvent来重置信号。

具体的用法可以参考下面“示例程序”。

二、示例

服务端和客户端着重在列出WSAEventSelect模型的基本用法,并未实现复杂的逻辑。
服务端在有新的客户端连接上时,给客户端发送“hello, I’m server.”消息,并且在服务端退出时会关闭所有客户端连接。
客户端连接上服务端之后,不间断接收服务端的消息。

示例程序只使用了一个线程(主线程)来等待最多64个事件(也就是最多只支持64-1=63个客户端连接),如果需要支持更多的客户端连接,可以开启多个线程来等待。

2.1 服务端

#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

const u_short kPort = 10001;
const std::string kHelloServer = "hello, I'm server.";

HANDLE g_event_array[WSA_MAXIMUM_WAIT_EVENTS];
SOCKET g_socket_array[WSA_MAXIMUM_WAIT_EVENTS];
int g_index = 0;

template<typename T>
void RemoveArrayIndex(T arr[], int total, int index) {
    if (index > total)
        return;

    for (int i = index; i < (total - 1); i++) {
        arr[i] = arr[i + 1];
    }
}

int main()
{
    WSADATA wsaData;
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSAStartup(wVersionRequested, &wsaData);

    do
    {
        // (1)
        g_socket_array[g_index] = ::socket(AF_INET, SOCK_STREAM, 0);
        if (g_socket_array[0] == INVALID_SOCKET) {
            std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }

        // (2)
        struct sockaddr_in addr = { 0 };
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(kPort);
        if (bind(g_socket_array[g_index], reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
            std::cout << "bind failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }

        // (3)
        g_event_array[g_index] = WSACreateEvent();

        WSAEventSelect(g_socket_array[g_index], g_event_array[g_index], FD_ACCEPT | FD_READ | FD_WRITE | FD_CLOSE);

        // (4)
        if (listen(g_socket_array[g_index], 5) == SOCKET_ERROR) {
            std::cout << "listen failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }
        std::cout << "listen on port: " << kPort << std::endl;

        // (5)
        // 该循环可以放到子线程中
        while (true) {
            DWORD ret = WSAWaitForMultipleEvents(g_index + 1, g_event_array, FALSE, WSA_INFINITE, FALSE);
            if(ret == WSA_WAIT_FAILED || ret == WSA_WAIT_TIMEOUT)
                continue;

            // 忽略上面WSAWaitForMultipleEvents返回值所表示的index。遍历每个socket,查看是否有信号。
            for (int i = 0; i <= g_index; i++) {
                ret = WSAWaitForMultipleEvents(1, &g_event_array[i], FALSE, 0, FALSE);
                if (ret == WSA_WAIT_FAILED || ret == WSA_WAIT_TIMEOUT) {
                    continue;
                }

                WSANETWORKEVENTS network_events;
                // WSAEnumNetworkEvent函数会将事件重置为无信号状态
                WSAEnumNetworkEvents(g_socket_array[i], g_event_array[i], &network_events);

                if (network_events.lNetworkEvents & FD_ACCEPT) {
                    if (network_events.iErrorCode[FD_ACCEPT_BIT] != 0) {
                        std::cout << "FD_ACCEPT failed with code: " << network_events.iErrorCode[FD_ACCEPT_BIT] << "\n";
                        break;
                    }

                    SOCKET s = accept(g_socket_array[i], NULL, NULL);
                    if (s == SOCKET_ERROR) {
                        std::cout << "accept failed, GLE: " << WSAGetLastError() << std::endl;
                        break;
                    }

                    if (g_index >= WSA_MAXIMUM_WAIT_EVENTS - 1) {
                        std::cout << "too many connection\n";
                        closesocket(s);
                        break;
                    }

                    g_index++;

                    g_event_array[g_index] = WSACreateEvent();
                    g_socket_array[g_index] = s;

                    std::cout << "new connection\n";

                    int err = send(s, (const char*)kHelloServer.c_str(), kHelloServer.length(), 0);
                    if (err == SOCKET_ERROR) {
                        std::cout << "send failed, GLE: " << WSAGetLastError() << std::endl;
                        break;
                    }
                }

                if (network_events.lNetworkEvents & FD_READ) {
                    if (network_events.iErrorCode[FD_READ_BIT] != 0) {
                        std::cout << "FD_READ failed with code: " << network_events.iErrorCode[FD_READ_BIT] << "\n";
                        break;
                    }

                    char buf[100] = { 0 };
                    int err = recv(g_socket_array[i], buf, 100, 0);
                    if (err > 0) {
                        std::cout << "recv: " << buf << std::endl;
                    }
                    else if (err == 0) {
                        std::cout << "connection closed." << std::endl;

                        closesocket(g_socket_array[i]);
                        WSACloseEvent(g_event_array[i]);

                        RemoveArrayIndex(g_socket_array, g_index, i);
                        RemoveArrayIndex(g_event_array, g_index, i);

                        g_index--;
                    }
                    else {
                        std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;

                        closesocket(g_socket_array[i]);
                        WSACloseEvent(g_event_array[i]);

                        RemoveArrayIndex(g_socket_array, g_index, i);
                        RemoveArrayIndex(g_event_array, g_index, i);

                        g_index--;
                    }
                }

                if (network_events.lNetworkEvents & FD_WRITE) {
                    if (network_events.iErrorCode[FD_WRITE_BIT] != 0) {
                        std::cout << "FD_WRITE failed with code: " << network_events.iErrorCode[FD_WRITE_BIT] << "\n";
                        break;
                    }


                }

                if (network_events.lNetworkEvents & FD_CLOSE) {
                    if (network_events.iErrorCode[FD_CLOSE_BIT] != 0) {
                        std::cout << "FD_CLOSE failed with code: " << network_events.iErrorCode[FD_CLOSE_BIT] << "\n";
                        break;
                    }

                    closesocket(g_socket_array[i]);
                    WSACloseEvent(g_event_array[i]);

                    RemoveArrayIndex(g_socket_array, g_index, i);
                    RemoveArrayIndex(g_event_array, g_index, i);

                    g_index--;
                }
            }
        } // while
    } while (false);

    // (6)
    for (int i = 0; i <= g_index; i++) {
        closesocket(g_socket_array[i]);
        WSACloseEvent(g_event_array[i]);
    }

    WSACleanup();
    return 0;
}

2.2 客户端

#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

const std::string kIP = "127.0.0.1";
const u_short kPort = 10001;

HANDLE g_event;
SOCKET g_socket;


int main()
{
    WSADATA wsaData;
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSAStartup(wVersionRequested, &wsaData);

    do
    {
        // (1)
        g_socket = ::socket(AF_INET, SOCK_STREAM, 0);
        if (g_socket == INVALID_SOCKET) {
            std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }

        // (2)
        g_event = WSACreateEvent();

        WSAEventSelect(g_socket, g_event, FD_CONNECT | FD_READ | FD_WRITE | FD_CLOSE);

        // (3)
        struct sockaddr_in addr = { 0 };
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = inet_addr(kIP.c_str());
        addr.sin_port = htons(kPort);
        if (connect(g_socket, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
            int gle_err = WSAGetLastError();
            if (gle_err != WSAEWOULDBLOCK) {
                std::cout << "connect failed, GLE: " << WSAGetLastError() << "\n";
                break;
            }
        }

        // (4)
        while (true) {
            DWORD ret = WSAWaitForMultipleEvents(1, &g_event, FALSE, WSA_INFINITE, FALSE);
            if (ret == WSA_WAIT_FAILED || ret == WSA_WAIT_TIMEOUT) {
                continue;
            }

            WSANETWORKEVENTS network_events;
            // WSAEnumNetworkEvent函数会将事件重置为无信号状态
            WSAEnumNetworkEvents(g_socket, g_event, &network_events);

            if (network_events.lNetworkEvents & FD_CONNECT) {
                if (network_events.iErrorCode[FD_CONNECT_BIT] != 0) {
                    std::cout << "FD_CONNECT failed with code: " << network_events.iErrorCode[FD_CONNECT_BIT] << "\n";
                    break;
                }

                std::cout << "connect to server\n";
            }

            if (network_events.lNetworkEvents & FD_READ) {
                if (network_events.iErrorCode[FD_READ_BIT] != 0) {
                    std::cout << "FD_READ failed with code: " << network_events.iErrorCode[FD_READ_BIT] << "\n";
                    break;
                }

                char buf[100] = { 0 };
                int err = recv(g_socket, buf, 100, 0);
                if (err > 0) {
                    std::cout << "recv: " << buf << std::endl;
                }
                else if (err == 0) {
                    std::cout << "connection closed." << std::endl;

                    closesocket(g_socket);
                    WSACloseEvent(g_event);
                }
                else {
                    std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;

                    closesocket(g_socket);
                    WSACloseEvent(g_event);
                }
            }

            if (network_events.lNetworkEvents & FD_WRITE) {
                if (network_events.iErrorCode[FD_WRITE_BIT] != 0) {
                    std::cout << "FD_WRITE failed with code: " << network_events.iErrorCode[FD_WRITE_BIT] << "\n";
                    break;
                }
            }

            if (network_events.lNetworkEvents & FD_CLOSE) {
                if (network_events.iErrorCode[FD_CLOSE_BIT] != 0) {
                    std::cout << "FD_CLOSE failed with code: " << network_events.iErrorCode[FD_CLOSE_BIT] << "\n";
                    break;
                }

                closesocket(g_socket);
                WSACloseEvent(g_event);
            }
        } // while
    } while (false);

    // (5)
    closesocket(g_socket);
    WSACloseEvent(g_event);

    WSACleanup();
    return 0;
}
Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐