d语言中的iocp,走过路过千万不要错过,另外诅咒那些看帖不回帖的人!
ww21xx
2012-04-18
标题有点恶搞 好好看完后,回不回帖也无所谓啦
以下是以ideage大师的代码稍微改写了下,核心部分都在这里了,但是有两个大问题,一直也没找出原因,各位高手有时间就看下吧! 1、当我连接上1000个连接后,然后再全部断开,这个时候GetQueuedCompletionStatus 会返回错误 64, 单一断开 没有这个提示。 2、在关闭队列中,我新开一个线程专门来关闭连接,但当我断开第一个连接的时候线程停止了。。。,找不出原因。。。 class Iocp { private: ushort m_listenPort = 9001; // 服务器端口 ushort m_listenBacklog = 200; // 同时监听数 系统本身默认就是200 ushort m_bufferSize = 4096; // 缓冲大小 uint m_threadCount = 6; // 最大线程数 IService m_service = null; // 监听服务 SYSTEM_INFO m_systemInfo; // 系统信息 SOCKET m_listenSocket = INVALID_SOCKET;; // 服务器监听Socket HANDLE m_iocpHandle = INVALID_HANDLE_VALUE; // 完成端口句柄 //POVERLAPPED[] m_sendQueue; // 发送缓冲区 //BOOL m_bSending; // 当前有数据正在发送中! //BOOL m_bCloseConnection; // 是否关闭连接 //DWORD m_dwCurBufferSize; // 缓冲区大小 Thread m_acceptThread = null; // 应答线程 bool m_acceptThreadQuit = false; // 应答线程是否退出 Thread m_workerThread = null; // 处理线程 bool m_workerThreadQuit = false; // 处理线程是否退出 Thread m_sendThread = null; // 发送线程 bool m_sendThreadQuit = false; // 发送线程是否退出 Thread m_closeThread = null; // 关闭线程 bool m_closeThreadQuit = false; // 关闭线程是否退出 sockaddr_in m_serverAddr; // 服务器地址 sockaddr_in m_clientAddr; // 客户端地址 MemoryStream m_Stream = null; // 内存流 ubyte[] m_recvBuffer; ubyte[] m_sendBuffer; //LpNetState m_recvOperation; // 接收重叠 //LpNetState m_sendOperation; // 发送重叠 public PNetState[ulong] m_clientList; // 客户端连接列表 private PNetState[ulong] m_shutdownList;// 要关闭的链接列表 // 监听完成端口 public this( ushort listenPort, ushort listenBacklog, ushort bufferSize, IService iService ) { // 首先获取系统信息 GetSystemInfo(&m_systemInfo); m_listenPort = listenPort; m_listenBacklog = listenBacklog; m_bufferSize = bufferSize; m_service = iService; serverListen(); //应答线程 m_acceptThread = new Thread( &acceptThread, 3 ); // 工作线程 m_workerThread = new Thread( &workerThread, 3 ); // 发送线程 m_sendThread = new Thread( &sendThread, 3 ); // 关闭线程 m_closeThread = new Thread( &closeThread ); //流数据处理 m_Stream = new MemoryStream(); m_recvBuffer = [0]; m_sendBuffer = [0]; } /** * 开始 * */ public void start() { if( m_acceptThread !is null ) { m_acceptThread.start(); m_acceptThreadQuit = false; } if( m_workerThread !is null ) { m_workerThread.start(); m_workerThreadQuit = false; } if( m_sendThread !is null ) { m_sendThread.start(); m_sendThreadQuit = false; } if( m_closeThread !is null ) { m_closeThread.start(); m_closeThreadQuit = false; } } /** * 停止 * */ public void stop() { if( m_acceptThread !is null ) m_acceptThreadQuit = true; if( m_workerThread !is null) m_workerThreadQuit = true; if( m_sendThread !is null ) m_sendThreadQuit = true; if( m_closeThread !is null ) m_closeThreadQuit = true; } /** * 释放iocp * */ public void dispose() { stop(); if( m_acceptThread !is null ) m_acceptThread = null; if( m_workerThread !is null) m_workerThread = null; if( m_sendThread !is null) m_sendThread = null; if( m_closeThread !is null) m_closeThread = null; } // 监听 void serverListen() { // Init winsock2.2 WSADATA wsaData; if( WSAStartup(0x2020, &wsaData) != 0 ) { m_service.OnException( "WSAStartup Failed, Using:" ~ to!(string)(wsaData.szDescription) ); return; } // Create socket if( (m_listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET ) { m_service.OnException( "Server Socket Creation Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); return; } // bind m_serverAddr.sin_family = AF_INET; m_serverAddr.sin_addr.s_addr = htonl( INADDR_ANY ); m_serverAddr.sin_port = htons( m_listenPort ); if( bind( m_listenSocket, cast(sockaddr*)&m_serverAddr, m_serverAddr.sizeof ) == SOCKET_ERROR ) { m_service.OnException( "Server Soket Bind Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); return; } // listen if( listen( m_listenSocket, m_listenBacklog ) == SOCKET_ERROR ) { m_service.OnException( "Server Socket Listen Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); return; } // create iocp & binding serverSocket to iocp m_threadCount = m_systemInfo.dwNumberOfProcessors * 2 + 2; if( (m_iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, m_threadCount) ) == null) { m_service.OnException( "CreateIoCompletionPort() Failed, Ret:" ~ to!(string)(GetLastError()) ); return; } } // 应答 void acceptThread() { SOCKET acceptSocket; // acceptSocket uint m_clientAddrLen = 0; // 客户端地址长度 while(true) { m_clientAddrLen = m_clientAddr.sizeof; if( (acceptSocket = accept( m_listenSocket, cast(sockaddr*)&m_clientAddr, &m_clientAddrLen )) == SOCKET_ERROR ) { m_service.OnException( "client accept error=>" ~ to!(string)(SOCKET_ERROR) ); return; } { //diable buffer to improve performance int nZero = 0; setsockopt(acceptSocket, SOL_SOCKET, SO_SNDBUF, cast(char *)&nZero, nZero.sizeof); } // 创建一个套接字信息结构体去联系起来socket /*uint GPTR = 0x0040; if((lpHandleData = cast(LpHandleData)GlobalAlloc(GPTR, LpHandleData.sizeof)) == null) { m_service.OnException( "HandleData GlobalAlloc() error=>" ~ to!(string)(GetLastError()) ); closesocket( acceptSocket ); GC.free(lpHandleData); return; }*/ //lpHandleData.Socket = acceptSocket; //if(CreateIoCompletionPort(cast(HANDLE)acceptSocket, m_iocpHandle, cast(uint)lpHandleData, 0) == null) if(CreateIoCompletionPort(cast(HANDLE)acceptSocket, m_iocpHandle, 0, 0) == null) { m_service.OnException( "CreateIoCompletionPort 发生了如下错误:" ~ to!(string)(GetLastError()) ); closesocket( acceptSocket ); //GC.free(lpHandleData); return; } // 创建新的NetState RecvNetState data = new NetState; data.Socket = acceptSocket; data.SocketId = generateDnid(); data.SocketIdToString = to!string(data.SocketId); data.localAddresIp = to!string(inet_ntoa(m_serverAddr.sin_addr)); data.localAddresPort = m_serverAddr.sin_port; data.localAddresToString = data.localAddresIp ~ ":" ~ to!string(ntohs(data.localAddresPort)); data.remoteAddresIp = to!string(inet_ntoa(m_clientAddr.sin_addr)); data.remoteAddresPort = m_clientAddr.sin_port; data.remoteAddresToString = data.remoteAddresIp ~ ":" ~ to!string(ntohs(data.remoteAddresPort)); data.OpCode = IO_OPERATION.RECV; data.TotalBytes = 0; data.RecvBytes = 0; data.SentBytes = 0; data.Wsabuf.buf = data.buffer.ptr; data.Wsabuf.len = data.buffer.sizeof; data.iocp = this; uint recvNumBytes = 0, flags = 0; // 连接成功后 立即投递 接收 意思说马上监听接收 if( WSARecv( acceptSocket, &data.Wsabuf, 1, &recvNumBytes, &flags, &data.Overlapped, null ) == SOCKET_ERROR ) { if(WSAGetLastError() != ERROR_IO_PENDING) { m_service.OnException( "Accept WASRecv Failed, Return:" ~ to!(string)(WSAGetLastError()) ); closesocket( acceptSocket ); GC.free(data); return; } } // 添加正确的应答客户端 m_clientList[data.SocketId] = data; m_service.OnConnected(data); } //close & Cleanup closesocket(m_listenSocket); WSACleanup(); } //工作线程 void workerThread() { WSABUF buffSend; uint recvNumBytes = 0; // 接受到的字节 uint sendNumBytes = 0; // 发送出的字节 uint flags = 0; uint bytesTransferred = 0; // 当前数 PNetState lpIoData; // 单IO操作数据 uint completionKey = 0; // 检测 while( true ) { if( m_workerThreadQuit ) // 工作线程退出 { writefln( "退出工作线程" ); break; } // 1、判断完成端口状态 if(GetQueuedCompletionStatus(m_iocpHandle, &bytesTransferred, &completionKey, cast(LPWSAOVERLAPPED*)&lpIoData, INFINITE) == 0) { m_service.OnException( "获取查询完成状态失败(GetQueuedCompletionStatus):" ~ to!(string)(GetLastError()) ); //return; //break; } //首先检查一下去套接字看是否在上发生了错误并且如果发生了错误就关闭套接 //字并且清除与套接字连接的 SOCKET_INFORMATION结构信息体 if(bytesTransferred == 0) //socket closed? { // 关闭连接 closeConnection( lpIoData.SocketId ); continue; } //检查如果 BytesRECV字段等于0,这就意味着一个 WSARecv调用刚刚完成了所以从完成的WSARecv()调用中 //用BytesTransferred值更新 BytesRECV字段 //lpIoData.TransferredBytes = bytesTransferred; /*if(lpIoData.RecvBytes <= 0) { lpIoData.RecvBytes = lpIoData.TransferredBytes; lpIoData.SentBytes = 0; lpIoData.TotalBytes = 0; } else { lpIoData.RecvBytes += bytesTransferred; lpIoData.SentBytes += bytesTransferred; lpIoData.TotalBytes = lpIoData.RecvBytes + lpIoData.SentBytes; }*/ /*if( lpIoData.RecvBytes == 0 ) { lpIoData.RecvBytes = bytesTransferred; lpIoData.SentBytes = 0; } else { lpIoData.SentBytes += bytesTransferred; } * */ /* switch(lpIoData.OpCode) { case IO_OPERATION.ACCEPT: writefln( "clientId=>%s 操作类型=>应答", lpIoData.SocketId ); break; case IO_OPERATION.RECV: writefln( "clientId=>%s 操作类型=>接收", lpIoData.SocketId ); break; case IO_OPERATION.SEND: writefln( "clientId=>%s 操作类型=>发送", lpIoData.SocketId ); break; default: writefln( "clientId=>%s 未知操作=>未知", lpIoData.SocketId ); break; }*/ writefln( "clientCount=>%d ", m_clientList.length ); // 2、判断IO操作 if( lpIoData.OpCode == IO_OPERATION.RECV ) // 判断IO操作是否是读取类型 { // 接收数据 m_Stream.len = 0; m_Stream.seek(0, SeekPos.Set); m_Stream.writeBlock(lpIoData.Wsabuf.buf, bytesTransferred); m_Stream.flush(); // 执行接收回调 lpIoData.data = m_Stream.data.dup; m_service.OnReceived(lpIoData); } else if( lpIoData.OpCode == IO_OPERATION.SEND ) { lpIoData.data = m_Stream.data.dup; m_service.OnSent(lpIoData); lpIoData.SentBytes += bytesTransferred; flags = 0; // 这里的意思 是 如果发送的字节大小 <= 缓冲区大小 就发送数据 否则不发送 // 这里还不好改 还不知道意思 /*if( lpIoData.TransferredBytes <= 4096 ) { writefln( "TransferredBytes < BufferSize------------------------" ); lpIoData.OpCode = IO_OPERATION.SEND; buffSend.buf = lpIoData.buffer.ptr + lpIoData.TransferredBytes; //offset. buffSend.len = lpIoData.TotalBytes - lpIoData.SentBytes; nRet = WSASend (lpIoData.Socket, &buffSend, 1, &dwSendNumBytes, dwFlags, &(lpIoData.Overlapped), null); if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { m_service.OnException( "WASSend Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); closesocket( lpIoData.Socket ); continue; } }*/ if( lpIoData.SentBytes < lpIoData.TotalBytes ) { writefln( "SentBytes < TotalBytes----投递 发送 ip:%s", lpIoData.remoteAddresToString ); lpIoData.OpCode = IO_OPERATION.SEND; buffSend.buf = lpIoData.buffer.ptr + lpIoData.SentBytes; //offset. buffSend.len = lpIoData.TotalBytes - lpIoData.SentBytes; //lpIoData.Wsabuf.buf += bytesTransferred; //offset. //lpIoData.Wsabuf.len -= bytesTransferred; if( WSASend (lpIoData.Socket, &buffSend, 1, &sendNumBytes, flags, &(lpIoData.Overlapped), null) == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { m_service.OnException( "WASSend Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); //closesocket( lpIoData.Socket ); closeConnection( lpIoData.SocketId ); continue; } } else { writefln( "SentBytes >= TotalBytes----投递 接收 ip:%s", lpIoData.remoteAddresToString ); lpIoData.OpCode = IO_OPERATION.RECV; recvNumBytes = 0; flags = 0; lpIoData.Wsabuf.buf = lpIoData.buffer.ptr, lpIoData.Wsabuf.len = m_bufferSize; lpIoData.Overlapped.Internal = 0; lpIoData.Overlapped.InternalHigh = 0; lpIoData.Overlapped.Offset = 0; lpIoData.Overlapped.OffsetHigh = 0; lpIoData.Overlapped.hEvent = null; if( WSARecv(lpIoData.Socket, &lpIoData.Wsabuf, 1, &recvNumBytes, &flags, &lpIoData.Overlapped, null) == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { m_service.OnException( "WASRecv Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); //closesocket( lpIoData.Socket ); closeConnection( lpIoData.SocketId ); continue; } } } } } // 发送部分====================== void sendThread() { while (!m_sendThreadQuit) { //if (m_bUseAutoPackage) //{ // 这个地方存在一个非常大的隐患,很可能造成非分页内存的大量分配!!! // 因为这个调用会直接触发Send(),而Send会直接调用WSASend发送缓存的数据! // 只要客户端不接收,WSASend发送的数据就会全部累积到非分页内存中! // 所以,我们需要修改Send()函数 //m_LinkManager.TraversalObjects(0, 0); //} // 毫秒 Thread.sleep( dur!("msecs")( 50 ) ); // sleep for 50 milliseconds // 秒 //Thread.sleep( dur!("seconds")( 5 ) ); // sleep for 5 seconds uint sendLen = m_sendBuffer.length; if( sendLen > 0 ) { //writefln("" } } } /*public void send( ulong socketId, ubyte[] data ) { send( socketId, data ); } public void send( ulong socketId, void[] data ) { send( socketId, data.ptr , data.length); } public void send( ulong socketId, void* data, uint count ) { send( m_clients[socketId], data, count ); }*/ private uint dwFlags = 0; private uint dwSendNumBytes = 0; public void send( NetState* ns, void* data, uint count ) { //void[] buffer = data[0..count]; //Message.WriteLine("结构 长度=>" ~ to!(string)(structPtr.sizeof) ~ " 数据 长度=>"~ to!(string)(buffer.length) ~" 内容=>" ~ to!(string)(count)); //client.send(buffer); m_Stream.len = 0; m_Stream.seek(0, SeekPos.Set); m_Stream.writeBlock(data, count);; m_Stream.flush(); WSABUF buffer; buffer.buf = cast(char*)m_Stream.buf; buffer.len = cast(uint)m_Stream.len; ns.SentBytes = 0; ns.OpCode = IO_OPERATION.SEND; dwFlags = 0; if( WSASend( ns.Socket, &buffer, 1, &dwSendNumBytes, dwFlags, &(ns.Overlapped), null ) == SOCKET_ERROR ) { if(WSAGetLastError() != ERROR_IO_PENDING) { m_service.OnException( "WASSend Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); closesocket( ns.Socket ); return; } } } public void sendAll(ulong socketId, void* data, uint count, bool self = false ) { foreach( PNetState item; m_clientList ) { if( socketId <= 0 ) // 全部广播,包括自己 send(item, data, count); else // 不给自己广播 { if(item.SocketId != socketId ) send(item, data, count); } } } // 关闭部分====================== //0 SD_RECEIVE 关闭连接接收方 //1 SD_SEND 关闭连接的发送方 //2 SD_BOTH 关闭双方 // 关闭连接 void closeConnection(ulong socketId) { PNetState netState = m_clientList[socketId]; // 删除连接 m_clientList.remove(socketId); if( m_service is null ) m_service.OnException( "客户端连接已断开!" ); else m_service.OnDisconnected( netState ); // 停止该链接 发送 与 接收 操作 shutdown(netState.Socket, SD_BOTH); // 放入 连接关闭队列 m_shutdownList[netState.SocketId] = netState; writefln("socket %d 已被丢进关闭队列中 关闭队列个数%d\n", netState.Socket, m_shutdownList.length); } void closeThread() { while( true ) { Thread.sleep( 2000 ); // 等待5秒 //Thread.sleep( dur!("seconds")( 2 ) ); // 等待50毫秒 //Thread.sleep( dur!("msecs")( 50 ) ); uint socketLen = m_shutdownList.length; writefln("执行关闭线程 %d\n", socketLen); //auto lists = m_shutdownList; foreach( PNetState item; m_shutdownList ) { m_shutdownList.remove(item.SocketId); if( closesocket(item.Socket) == SOCKET_ERROR ) { writefln("closesocket() 发生了如下错误: %d\n", WSAGetLastError()); return; } else { writefln("关闭socket %d 成功!\n", item.Socket); } // 释放对象 GC.free(item); writefln("关闭后线程个数 %d\n", m_shutdownList.length); } /*PNetState netState; for(int i=0; i<socketLen; i++) { netState = m_shutdownList[i]; if( closesocket(netState.Socket) == SOCKET_ERROR ) { writefln("closesocket() 发生了如下错误: %d\n", WSAGetLastError()); return; } else writefln("关闭socket %d 成功!\n", netState.Socket); m_shutdownList.remove(netState.SocketId); // 释放对象 GC.free(netState); }*/ /* // 不知道该加在什么地方 //立即关闭(避免出现TIME_WAIT状态) LINGER linger = {1,0}; setsockopt(socket, SOL_SOCKET, SO_LINGER, (char *)&linger, sizeof(linger)); */ } } } |
|
hqs7636
2012-04-19
java 代码吧,看着都头晕
|
|
betty_betty2008
2012-04-19
ww21xx 写道 标题有点恶搞 好好看完后,回不回帖也无所谓啦
以下是以ideage大师的代码稍微改写了下,核心部分都在这里了,但是有两个大问题,一直也没找出原因,各位高手有时间就看下吧! 1、当我连接上1000个连接后,然后再全部断开,这个时候GetQueuedCompletionStatus 会返回错误 64, 单一断开 没有这个提示。 2、在关闭队列中,我新开一个线程专门来关闭连接,但当我断开第一个连接的时候线程停止了。。。,找不出原因。。。 class Iocp { private: ushort m_listenPort = 9001; // 服务器端口 ushort m_listenBacklog = 200; // 同时监听数 系统本身默认就是200 ushort m_bufferSize = 4096; // 缓冲大小 uint m_threadCount = 6; // 最大线程数 IService m_service = null; // 监听服务 SYSTEM_INFO m_systemInfo; // 系统信息 SOCKET m_listenSocket = INVALID_SOCKET;; // 服务器监听Socket HANDLE m_iocpHandle = INVALID_HANDLE_VALUE; // 完成端口句柄 //POVERLAPPED[] m_sendQueue; // 发送缓冲区 //BOOL m_bSending; // 当前有数据正在发送中! //BOOL m_bCloseConnection; // 是否关闭连接 //DWORD m_dwCurBufferSize; // 缓冲区大小 Thread m_acceptThread = null; // 应答线程 bool m_acceptThreadQuit = false; // 应答线程是否退出 Thread m_workerThread = null; // 处理线程 bool m_workerThreadQuit = false; // 处理线程是否退出 Thread m_sendThread = null; // 发送线程 bool m_sendThreadQuit = false; // 发送线程是否退出 Thread m_closeThread = null; // 关闭线程 bool m_closeThreadQuit = false; // 关闭线程是否退出 sockaddr_in m_serverAddr; // 服务器地址 sockaddr_in m_clientAddr; // 客户端地址 MemoryStream m_Stream = null; // 内存流 ubyte[] m_recvBuffer; ubyte[] m_sendBuffer; //LpNetState m_recvOperation; // 接收重叠 //LpNetState m_sendOperation; // 发送重叠 public PNetState[ulong] m_clientList; // 客户端连接列表 private PNetState[ulong] m_shutdownList;// 要关闭的链接列表 // 监听完成端口 public this( ushort listenPort, ushort listenBacklog, ushort bufferSize, IService iService ) { // 首先获取系统信息 GetSystemInfo(&m_systemInfo); m_listenPort = listenPort; m_listenBacklog = listenBacklog; m_bufferSize = bufferSize; m_service = iService; serverListen(); //应答线程 m_acceptThread = new Thread( &acceptThread, 3 ); // 工作线程 m_workerThread = new Thread( &workerThread, 3 ); // 发送线程 m_sendThread = new Thread( &sendThread, 3 ); // 关闭线程 m_closeThread = new Thread( &closeThread ); //流数据处理 m_Stream = new MemoryStream(); m_recvBuffer = [0]; m_sendBuffer = [0]; } /** * 开始 * */ public void start() { if( m_acceptThread !is null ) { m_acceptThread.start(); m_acceptThreadQuit = false; } if( m_workerThread !is null ) { m_workerThread.start(); m_workerThreadQuit = false; } if( m_sendThread !is null ) { m_sendThread.start(); m_sendThreadQuit = false; } if( m_closeThread !is null ) { m_closeThread.start(); m_closeThreadQuit = false; } } /** * 停止 * */ public void stop() { if( m_acceptThread !is null ) m_acceptThreadQuit = true; if( m_workerThread !is null) m_workerThreadQuit = true; if( m_sendThread !is null ) m_sendThreadQuit = true; if( m_closeThread !is null ) m_closeThreadQuit = true; } /** * 释放iocp * */ public void dispose() { stop(); if( m_acceptThread !is null ) m_acceptThread = null; if( m_workerThread !is null) m_workerThread = null; if( m_sendThread !is null) m_sendThread = null; if( m_closeThread !is null) m_closeThread = null; } // 监听 void serverListen() { // Init winsock2.2 WSADATA wsaData; if( WSAStartup(0x2020, &wsaData) != 0 ) { m_service.OnException( "WSAStartup Failed, Using:" ~ to!(string)(wsaData.szDescription) ); return; } // Create socket if( (m_listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, null, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET ) { m_service.OnException( "Server Socket Creation Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); return; } // bind m_serverAddr.sin_family = AF_INET; m_serverAddr.sin_addr.s_addr = htonl( INADDR_ANY ); m_serverAddr.sin_port = htons( m_listenPort ); if( bind( m_listenSocket, cast(sockaddr*)&m_serverAddr, m_serverAddr.sizeof ) == SOCKET_ERROR ) { m_service.OnException( "Server Soket Bind Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); return; } // listen if( listen( m_listenSocket, m_listenBacklog ) == SOCKET_ERROR ) { m_service.OnException( "Server Socket Listen Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); return; } // create iocp & binding serverSocket to iocp m_threadCount = m_systemInfo.dwNumberOfProcessors * 2 + 2; if( (m_iocpHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, null, 0, m_threadCount) ) == null) { m_service.OnException( "CreateIoCompletionPort() Failed, Ret:" ~ to!(string)(GetLastError()) ); return; } } // 应答 void acceptThread() { SOCKET acceptSocket; // acceptSocket uint m_clientAddrLen = 0; // 客户端地址长度 while(true) { m_clientAddrLen = m_clientAddr.sizeof; if( (acceptSocket = accept( m_listenSocket, cast(sockaddr*)&m_clientAddr, &m_clientAddrLen )) == SOCKET_ERROR ) { m_service.OnException( "client accept error=>" ~ to!(string)(SOCKET_ERROR) ); return; } { //diable buffer to improve performance int nZero = 0; setsockopt(acceptSocket, SOL_SOCKET, SO_SNDBUF, cast(char *)&nZero, nZero.sizeof); } // 创建一个套接字信息结构体去联系起来socket /*uint GPTR = 0x0040; if((lpHandleData = cast(LpHandleData)GlobalAlloc(GPTR, LpHandleData.sizeof)) == null) { m_service.OnException( "HandleData GlobalAlloc() error=>" ~ to!(string)(GetLastError()) ); closesocket( acceptSocket ); GC.free(lpHandleData); return; }*/ //lpHandleData.Socket = acceptSocket; //if(CreateIoCompletionPort(cast(HANDLE)acceptSocket, m_iocpHandle, cast(uint)lpHandleData, 0) == null) if(CreateIoCompletionPort(cast(HANDLE)acceptSocket, m_iocpHandle, 0, 0) == null) { m_service.OnException( "CreateIoCompletionPort 发生了如下错误:" ~ to!(string)(GetLastError()) ); closesocket( acceptSocket ); //GC.free(lpHandleData); return; } // 创建新的NetState RecvNetState data = new NetState; data.Socket = acceptSocket; data.SocketId = generateDnid(); data.SocketIdToString = to!string(data.SocketId); data.localAddresIp = to!string(inet_ntoa(m_serverAddr.sin_addr)); data.localAddresPort = m_serverAddr.sin_port; data.localAddresToString = data.localAddresIp ~ ":" ~ to!string(ntohs(data.localAddresPort)); data.remoteAddresIp = to!string(inet_ntoa(m_clientAddr.sin_addr)); data.remoteAddresPort = m_clientAddr.sin_port; data.remoteAddresToString = data.remoteAddresIp ~ ":" ~ to!string(ntohs(data.remoteAddresPort)); data.OpCode = IO_OPERATION.RECV; data.TotalBytes = 0; data.RecvBytes = 0; data.SentBytes = 0; data.Wsabuf.buf = data.buffer.ptr; data.Wsabuf.len = data.buffer.sizeof; data.iocp = this; uint recvNumBytes = 0, flags = 0; // 连接成功后 立即投递 接收 意思说马上监听接收 if( WSARecv( acceptSocket, &data.Wsabuf, 1, &recvNumBytes, &flags, &data.Overlapped, null ) == SOCKET_ERROR ) { if(WSAGetLastError() != ERROR_IO_PENDING) { m_service.OnException( "Accept WASRecv Failed, Return:" ~ to!(string)(WSAGetLastError()) ); closesocket( acceptSocket ); GC.free(data); return; } } // 添加正确的应答客户端 m_clientList[data.SocketId] = data; m_service.OnConnected(data); } //close & Cleanup closesocket(m_listenSocket); WSACleanup(); } //工作线程 void workerThread() { WSABUF buffSend; uint recvNumBytes = 0; // 接受到的字节 uint sendNumBytes = 0; // 发送出的字节 uint flags = 0; uint bytesTransferred = 0; // 当前数 PNetState lpIoData; // 单IO操作数据 uint completionKey = 0; // 检测 while( true ) { if( m_workerThreadQuit ) // 工作线程退出 { writefln( "退出工作线程" ); break; } // 1、判断完成端口状态 if(GetQueuedCompletionStatus(m_iocpHandle, &bytesTransferred, &completionKey, cast(LPWSAOVERLAPPED*)&lpIoData, INFINITE) == 0) { m_service.OnException( "获取查询完成状态失败(GetQueuedCompletionStatus):" ~ to!(string)(GetLastError()) ); //return; //break; } //首先检查一下去套接字看是否在上发生了错误并且如果发生了错误就关闭套接 //字并且清除与套接字连接的 SOCKET_INFORMATION结构信息体 if(bytesTransferred == 0) //socket closed? { // 关闭连接 closeConnection( lpIoData.SocketId ); continue; } //检查如果 BytesRECV字段等于0,这就意味着一个 WSARecv调用刚刚完成了所以从完成的WSARecv()调用中 //用BytesTransferred值更新 BytesRECV字段 //lpIoData.TransferredBytes = bytesTransferred; /*if(lpIoData.RecvBytes <= 0) { lpIoData.RecvBytes = lpIoData.TransferredBytes; lpIoData.SentBytes = 0; lpIoData.TotalBytes = 0; } else { lpIoData.RecvBytes += bytesTransferred; lpIoData.SentBytes += bytesTransferred; lpIoData.TotalBytes = lpIoData.RecvBytes + lpIoData.SentBytes; }*/ /*if( lpIoData.RecvBytes == 0 ) { lpIoData.RecvBytes = bytesTransferred; lpIoData.SentBytes = 0; } else { lpIoData.SentBytes += bytesTransferred; } * */ /* switch(lpIoData.OpCode) { case IO_OPERATION.ACCEPT: writefln( "clientId=>%s 操作类型=>应答", lpIoData.SocketId ); break; case IO_OPERATION.RECV: writefln( "clientId=>%s 操作类型=>接收", lpIoData.SocketId ); break; case IO_OPERATION.SEND: writefln( "clientId=>%s 操作类型=>发送", lpIoData.SocketId ); break; default: writefln( "clientId=>%s 未知操作=>未知", lpIoData.SocketId ); break; }*/ writefln( "clientCount=>%d ", m_clientList.length ); // 2、判断IO操作 if( lpIoData.OpCode == IO_OPERATION.RECV ) // 判断IO操作是否是读取类型 { // 接收数据 m_Stream.len = 0; m_Stream.seek(0, SeekPos.Set); m_Stream.writeBlock(lpIoData.Wsabuf.buf, bytesTransferred); m_Stream.flush(); // 执行接收回调 lpIoData.data = m_Stream.data.dup; m_service.OnReceived(lpIoData); } else if( lpIoData.OpCode == IO_OPERATION.SEND ) { lpIoData.data = m_Stream.data.dup; m_service.OnSent(lpIoData); lpIoData.SentBytes += bytesTransferred; flags = 0; // 这里的意思 是 如果发送的字节大小 <= 缓冲区大小 就发送数据 否则不发送 // 这里还不好改 还不知道意思 /*if( lpIoData.TransferredBytes <= 4096 ) { writefln( "TransferredBytes < BufferSize------------------------" ); lpIoData.OpCode = IO_OPERATION.SEND; buffSend.buf = lpIoData.buffer.ptr + lpIoData.TransferredBytes; //offset. buffSend.len = lpIoData.TotalBytes - lpIoData.SentBytes; nRet = WSASend (lpIoData.Socket, &buffSend, 1, &dwSendNumBytes, dwFlags, &(lpIoData.Overlapped), null); if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { m_service.OnException( "WASSend Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); closesocket( lpIoData.Socket ); continue; } }*/ if( lpIoData.SentBytes < lpIoData.TotalBytes ) { writefln( "SentBytes < TotalBytes----投递 发送 ip:%s", lpIoData.remoteAddresToString ); lpIoData.OpCode = IO_OPERATION.SEND; buffSend.buf = lpIoData.buffer.ptr + lpIoData.SentBytes; //offset. buffSend.len = lpIoData.TotalBytes - lpIoData.SentBytes; //lpIoData.Wsabuf.buf += bytesTransferred; //offset. //lpIoData.Wsabuf.len -= bytesTransferred; if( WSASend (lpIoData.Socket, &buffSend, 1, &sendNumBytes, flags, &(lpIoData.Overlapped), null) == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { m_service.OnException( "WASSend Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); //closesocket( lpIoData.Socket ); closeConnection( lpIoData.SocketId ); continue; } } else { writefln( "SentBytes >= TotalBytes----投递 接收 ip:%s", lpIoData.remoteAddresToString ); lpIoData.OpCode = IO_OPERATION.RECV; recvNumBytes = 0; flags = 0; lpIoData.Wsabuf.buf = lpIoData.buffer.ptr, lpIoData.Wsabuf.len = m_bufferSize; lpIoData.Overlapped.Internal = 0; lpIoData.Overlapped.InternalHigh = 0; lpIoData.Overlapped.Offset = 0; lpIoData.Overlapped.OffsetHigh = 0; lpIoData.Overlapped.hEvent = null; if( WSARecv(lpIoData.Socket, &lpIoData.Wsabuf, 1, &recvNumBytes, &flags, &lpIoData.Overlapped, null) == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { m_service.OnException( "WASRecv Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); //closesocket( lpIoData.Socket ); closeConnection( lpIoData.SocketId ); continue; } } } } } // 发送部分====================== void sendThread() { while (!m_sendThreadQuit) { //if (m_bUseAutoPackage) //{ // 这个地方存在一个非常大的隐患,很可能造成非分页内存的大量分配!!! // 因为这个调用会直接触发Send(),而Send会直接调用WSASend发送缓存的数据! // 只要客户端不接收,WSASend发送的数据就会全部累积到非分页内存中! // 所以,我们需要修改Send()函数 //m_LinkManager.TraversalObjects(0, 0); //} // 毫秒 Thread.sleep( dur!("msecs")( 50 ) ); // sleep for 50 milliseconds // 秒 //Thread.sleep( dur!("seconds")( 5 ) ); // sleep for 5 seconds uint sendLen = m_sendBuffer.length; if( sendLen > 0 ) { //writefln("" } } } /*public void send( ulong socketId, ubyte[] data ) { send( socketId, data ); } public void send( ulong socketId, void[] data ) { send( socketId, data.ptr , data.length); } public void send( ulong socketId, void* data, uint count ) { send( m_clients[socketId], data, count ); }*/ private uint dwFlags = 0; private uint dwSendNumBytes = 0; public void send( NetState* ns, void* data, uint count ) { //void[] buffer = data[0..count]; //Message.WriteLine("结构 长度=>" ~ to!(string)(structPtr.sizeof) ~ " 数据 长度=>"~ to!(string)(buffer.length) ~" 内容=>" ~ to!(string)(count)); //client.send(buffer); m_Stream.len = 0; m_Stream.seek(0, SeekPos.Set); m_Stream.writeBlock(data, count);; m_Stream.flush(); WSABUF buffer; buffer.buf = cast(char*)m_Stream.buf; buffer.len = cast(uint)m_Stream.len; ns.SentBytes = 0; ns.OpCode = IO_OPERATION.SEND; dwFlags = 0; if( WSASend( ns.Socket, &buffer, 1, &dwSendNumBytes, dwFlags, &(ns.Overlapped), null ) == SOCKET_ERROR ) { if(WSAGetLastError() != ERROR_IO_PENDING) { m_service.OnException( "WASSend Failed, Ret:" ~ to!(string)(WSAGetLastError()) ); closesocket( ns.Socket ); return; } } } public void sendAll(ulong socketId, void* data, uint count, bool self = false ) { foreach( PNetState item; m_clientList ) { if( socketId <= 0 ) // 全部广播,包括自己 send(item, data, count); else // 不给自己广播 { if(item.SocketId != socketId ) send(item, data, count); } } } // 关闭部分====================== //0 SD_RECEIVE 关闭连接接收方 //1 SD_SEND 关闭连接的发送方 //2 SD_BOTH 关闭双方 // 关闭连接 void closeConnection(ulong socketId) { PNetState netState = m_clientList[socketId]; // 删除连接 m_clientList.remove(socketId); if( m_service is null ) m_service.OnException( "客户端连接已断开!" ); else m_service.OnDisconnected( netState ); // 停止该链接 发送 与 接收 操作 shutdown(netState.Socket, SD_BOTH); // 放入 连接关闭队列 m_shutdownList[netState.SocketId] = netState; writefln("socket %d 已被丢进关闭队列中 关闭队列个数%d\n", netState.Socket, m_shutdownList.length); } void closeThread() { while( true ) { Thread.sleep( 2000 ); // 等待5秒 //Thread.sleep( dur!("seconds")( 2 ) ); // 等待50毫秒 //Thread.sleep( dur!("msecs")( 50 ) ); uint socketLen = m_shutdownList.length; writefln("执行关闭线程 %d\n", socketLen); //auto lists = m_shutdownList; foreach( PNetState item; m_shutdownList ) { m_shutdownList.remove(item.SocketId); if( closesocket(item.Socket) == SOCKET_ERROR ) { writefln("closesocket() 发生了如下错误: %d\n", WSAGetLastError()); return; } else { writefln("关闭socket %d 成功!\n", item.Socket); } // 释放对象 GC.free(item); writefln("关闭后线程个数 %d\n", m_shutdownList.length); } /*PNetState netState; for(int i=0; i<socketLen; i++) { netState = m_shutdownList[i]; if( closesocket(netState.Socket) == SOCKET_ERROR ) { writefln("closesocket() 发生了如下错误: %d\n", WSAGetLastError()); return; } else writefln("关闭socket %d 成功!\n", netState.Socket); m_shutdownList.remove(netState.SocketId); // 释放对象 GC.free(netState); }*/ /* // 不知道该加在什么地方 //立即关闭(避免出现TIME_WAIT状态) LINGER linger = {1,0}; setsockopt(socket, SOL_SOCKET, SO_LINGER, (char *)&linger, sizeof(linger)); */ } } } 不敢不回贴, 虽然看不懂。。。 |
|
ww21xx
2012-04-19
其实 很好看,就 监听、应答、处理、发送(没用)、关闭这几个线程,逻辑很好区分开的!
|
|
flyingtimeice
2012-04-19
不错的iocp教程哈
|
|
shirne
2012-05-14
帖子太长了,我还是回帖吧...
|
|
softec
2012-05-15
请诅咒我这辈子被钱多不知怎么花愁死了
|
|
kysnail
2012-05-15
有意思,抽时间看一下。
|