`
deepfuture
  • 浏览: 4329798 次
  • 性别: Icon_minigender_1
  • 来自: 湛江
博客专栏
073ec2a9-85b7-3ebf-a3bb-c6361e6c6f64
SQLite源码剖析
浏览量:79390
1591c4b8-62f1-3d3e-9551-25c77465da96
WIN32汇编语言学习应用...
浏览量:68284
F5390db6-59dd-338f-ba18-4e93943ff06a
神奇的perl
浏览量:101376
Dac44363-8a80-3836-99aa-f7b7780fa6e2
lucene等搜索引擎解析...
浏览量:280946
Ec49a563-4109-3c69-9c83-8f6d068ba113
深入lucene3.5源码...
浏览量:14583
9b99bfc2-19c2-3346-9100-7f8879c731ce
VB.NET并行与分布式编...
浏览量:65454
B1db2af3-06b3-35bb-ac08-59ff2d1324b4
silverlight 5...
浏览量:31270
4a56b548-ab3d-35af-a984-e0781d142c23
算法下午茶系列
浏览量:45147
社区版块
存档分类
最新评论

WIN网络编程-IOCP服务程序

阅读更多
//////////////////////////////////////////////////
// IOCP.cpp文件

#include "iocp.h"
#pragma comment(lib, "WS2_32.lib")

CIOCPServer::CIOCPServer()
{
// 列表
m_pFreeBufferList = NULL;
m_pFreeContextList =NULL;
m_pPendingAccepts = NULL;
m_pConnectionList = NULL;

m_nFreeBufferCount = 0;
m_nFreeContextCount = 0;
m_nPendingAcceptCount = 0;
m_nCurrentConnection = 0;

::InitializeCriticalSection(&m_FreeBufferListLock);
::InitializeCriticalSection(&m_FreeContextListLock);
::InitializeCriticalSection(&m_PendingAcceptsLock);
::InitializeCriticalSection(&m_ConnectionListLock);

// Accept请求
m_hAcceptEvent = ::CreateEvent(NULL, FALSE,FALSE, NULL);
m_hRepostEvent = ::CreateEvent(NULL, FALSE,FALSE, NULL);
m_nRepostCount = 0;

m_nPort = 4567;

m_nInitialAccepts = 10;
m_nInitialReads = 4;
m_nMaxAccepts = 100;
m_nMaxSends = 20;
m_nMaxFreeBuffers = 200;
m_nMaxFreeContexts = 100;
m_nMaxConnections = 2000;

m_hListenThread = NULL;
m_hCompletion = NULL;
m_sListen = INVALID_SOCKET;
m_lpfnAcceptEx = NULL;
m_lpfnGetAcceptExSockaddrs = NULL;

m_bShutDown = FALSE;
m_bServerStarted = FALSE;

// 初始化WS2_32.dll
WSADATA wsaData;
WORD sockVersion = MAKEWORD(2, 2);
::WSAStartup(sockVersion,&wsaData);
}

CIOCPServer::~CIOCPServer()
{
Shutdown();

if(m_sListen !=INVALID_SOCKET)
::closesocket(m_sListen);
if(m_hListenThread != NULL)
::CloseHandle(m_hListenThread);

::CloseHandle(m_hRepostEvent);
::CloseHandle(m_hAcceptEvent);

::DeleteCriticalSection(&m_FreeBufferListLock);
::DeleteCriticalSection(&m_FreeContextListLock);
::DeleteCriticalSection(&m_PendingAcceptsLock);
::DeleteCriticalSection(&m_ConnectionListLock);

::WSACleanup();
}


///////////////////////////////////
// 自定义帮助函数

CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{

//在内存池中分配申请缓冲区
CIOCPBuffer *pBuffer = NULL;
if(nLen > BUFFER_SIZE)
return NULL;

// 为缓冲区对象申请内存
::EnterCriticalSection(&m_FreeBufferListLock);
if(m_pFreeBufferList == NULL)// 内存池为空,申请新的内存
{
pBuffer = (CIOCPBuffer*)::HeapAlloc(GetProcessHeap(),
HEAP_ZERO_MEMORY,sizeof(CIOCPBuffer) + BUFFER_SIZE);

//buff成员指向的内存直接位于CIOCPBuffer对象之后。
}
else// 从内存池中取一块来使用
{
pBuffer =m_pFreeBufferList;
m_pFreeBufferList =m_pFreeBufferList->pNext;
pBuffer->pNext =NULL;
m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeBufferListLock);

// 初始化新的缓冲区对象
if(pBuffer != NULL)
{
pBuffer->buff =(char*)(pBuffer + 1);

//(char*)(pBuffer + 1)等价于pbuffer +sizeof(CIOCPBuffer ) 的地址

//(char*)pBuffer +1相当于pBuffer的首地址+1


pBuffer->nLen =nLen;
}
return pBuffer;
}

void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{

//在内存池中释放缓冲区
::EnterCriticalSection(&m_FreeBufferListLock);

if(m_nFreeBufferCount <=m_nMaxFreeBuffers)// 将要释放的内存添加到空闲列表中
{
memset(pBuffer, 0,sizeof(CIOCPBuffer) + BUFFER_SIZE);
pBuffer->pNext =m_pFreeBufferList;
m_pFreeBufferList =pBuffer;

m_nFreeBufferCount ++;
}
else//已经达到最大值,真正的释放内存
{
::HeapFree(::GetProcessHeap(),0, pBuffer);
}

::LeaveCriticalSection(&m_FreeBufferListLock);
}


CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{

//为SOCKET申请上下文对象,存储了SOCKET的相关信息
CIOCPContext *pContext;

// 申请一个CIOCPContext对象
::EnterCriticalSection(&m_FreeContextListLock);
if(m_pFreeContextList == NULL)
{
pContext = (CIOCPContext*)
::HeapAlloc(::GetProcessHeap(),HEAP_ZERO_MEMORY, sizeof(CIOCPContext));

::InitializeCriticalSection(&pContext->Lock);
}
else
{
// 在空闲列表中申请
pContext =m_pFreeContextList;
m_pFreeContextList =m_pFreeContextList->pNext;
pContext->pNext= NULL;

m_nFreeBufferCount --;
}
::LeaveCriticalSection(&m_FreeContextListLock);

// 初始化对象成员
if(pContext != NULL)
{
pContext->s =s;
}
return pContext;
}

void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{

//为SOCKET释放上下文对象,其中存储了SOCKET的相关信息
if(pContext->s !=INVALID_SOCKET)
::closesocket(pContext->s);

// 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
CIOCPBuffer *pNext;
while(pContext->pOutOfOrderReads!= NULL)
{
pNext =pContext->pOutOfOrderReads->pNext;
ReleaseBuffer(pContext->pOutOfOrderReads);
pContext->pOutOfOrderReads= pNext;
}

::EnterCriticalSection(&m_FreeContextListLock);

if(m_nFreeContextCount <=m_nMaxFreeContexts) // 添加到空闲列表
{
// 先将关键代码段变量保存到一个临时变量中
CRITICAL_SECTION cstmp =pContext->Lock;
// 将要释放的上下文对象初始化为0
memset(pContext, 0,sizeof(CIOCPContext));

//再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
pContext->Lock =cstmp;
pContext->pNext= m_pFreeContextList;
m_pFreeContextList =pContext;

// 更新计数
m_nFreeContextCount ++;
}
else
{
::DeleteCriticalSection(&pContext->Lock);
::HeapFree(::GetProcessHeap(),0, pContext);
}

::LeaveCriticalSection(&m_FreeContextListLock);
}

void CIOCPServer::FreeBuffers()
{

//释放所有缓冲区池内存
// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeBufferListLock);

CIOCPBuffer *pFreeBuffer =m_pFreeBufferList;
CIOCPBuffer *pNextBuffer;
while(pFreeBuffer != NULL)
{
pNextBuffer =pFreeBuffer->pNext;
if(!::HeapFree(::GetProcessHeap(),0, pFreeBuffer))
{
#ifdef _DEBUG
::OutputDebugString("FreeBuffers释放内存出错!");
#endif // _DEBUG
break;
}
pFreeBuffer =pNextBuffer;
}
m_pFreeBufferList = NULL;
m_nFreeBufferCount = 0;

::LeaveCriticalSection(&m_FreeBufferListLock);
}

void CIOCPServer::FreeContexts()
{

//释放所有Context
// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeContextListLock);

CIOCPContext *pFreeContext =m_pFreeContextList;
CIOCPContext *pNextContext;
while(pFreeContext != NULL)
{
pNextContext =pFreeContext->pNext;

::DeleteCriticalSection(&pFreeContext->Lock);
if(!::HeapFree(::GetProcessHeap(),0, pFreeContext))
{
#ifdef _DEBUG
::OutputDebugString("FreeBuffers释放内存出错!");
#endif // _DEBUG
break;
}
pFreeContext =pNextContext;
}
m_pFreeContextList = NULL;
m_nFreeContextCount = 0;

::LeaveCriticalSection(&m_FreeContextListLock);
}

BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
// 向客户连接列表添加一个CIOCPContext对象

::EnterCriticalSection(&m_ConnectionListLock);
if(m_nCurrentConnection <=m_nMaxConnections)
{
// 添加到表头
pContext->pNext =m_pConnectionList;
m_pConnectionList = pContext;
// 更新计数
m_nCurrentConnection ++;

::LeaveCriticalSection(&m_ConnectionListLock);
return TRUE;
}
::LeaveCriticalSection(&m_ConnectionListLock);

return FALSE;
}

void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
{
// 首先从列表中移除要关闭的连接
::EnterCriticalSection(&m_ConnectionListLock);

CIOCPContext* pTest = m_pConnectionList;
if(pTest == pContext)
{
m_pConnectionList =pContext->pNext;
m_nCurrentConnection --;
}
else
{
while(pTest != NULL&& pTest->pNext!= pContext)
pTest =pTest->pNext;
if(pTest != NULL)
{
pTest->pNext= pContext->pNext;
m_nCurrentConnection --;
}
}

::LeaveCriticalSection(&m_ConnectionListLock);

// 然后关闭客户套节字
::EnterCriticalSection(&pContext->Lock);

if(pContext->s !=INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s =INVALID_SOCKET;
}
pContext->bClosing = TRUE;

::LeaveCriticalSection(&pContext->Lock);
}

void CIOCPServer::CloseAllConnections()
{
// 遍历整个连接列表,关闭所有的客户套节字

::EnterCriticalSection(&m_ConnectionListLock);

CIOCPContext *pContext =m_pConnectionList;
while(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);

if(pContext->s !=INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s =INVALID_SOCKET;
}

pContext->bClosing = TRUE;

::LeaveCriticalSection(&pContext->Lock);

pContext = pContext->pNext;
}

m_pConnectionList = NULL;
m_nCurrentConnection = 0;

::LeaveCriticalSection(&m_ConnectionListLock);
}


BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{

//m_pPendingAccepts表为未决ACCEPT请求,由请求时的CIOCPBuffer缓冲区对象形成
// 将一个I/O缓冲区对象插入到m_pPendingAccepts表中

::EnterCriticalSection(&m_PendingAcceptsLock);

if(m_pPendingAccepts == NULL)
m_pPendingAccepts = pBuffer;
else
{
pBuffer->pNext =m_pPendingAccepts;
m_pPendingAccepts = pBuffer;
}
m_nPendingAcceptCount ++;

::LeaveCriticalSection(&m_PendingAcceptsLock);

return TRUE;
}

BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer*pBuffer)
{
BOOL bResult = FALSE;

//遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
::EnterCriticalSection(&m_PendingAcceptsLock);

CIOCPBuffer *pTest = m_pPendingAccepts;
if(pTest == pBuffer) // 如果是表头元素
{
m_pPendingAccepts =pBuffer->pNext;
bResult = TRUE;
}
else// 不是表头元素的话,就要遍历这个表来查找了
{
while(pTest != NULL&& pTest->pNext !=pBuffer)
pTest =pTest->pNext;
if(pTest != NULL)
{
pTest->pNext =pBuffer->pNext;
bResult =TRUE;
}
}
// 更新计数
if(bResult)
m_nPendingAcceptCount --;

::LeaveCriticalSection(&m_PendingAcceptsLock);

return bResult;
}


CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext,CIOCPBuffer *pBuffer)
{

//因为I/O完成端口可能造成无序的包传送,所以在缓冲区内加入一个序列号
if(pBuffer != NULL)
{
// 如果与要读的下一个序列号相等,则读这块缓冲区
if(pBuffer->nSequenceNumber ==pContext->nCurrentReadSequence)
{
return pBuffer;
}

//如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中

// 列表中的缓冲区是按照其序列号从小到大的顺序排列的

pBuffer->pNext = NULL;

CIOCPBuffer *ptr =pContext->pOutOfOrderReads;
CIOCPBuffer *pPre = NULL;
while(ptr != NULL)
{
if(pBuffer->nSequenceNumber <ptr->nSequenceNumber)
break;

pPre = ptr;
ptr =ptr->pNext;
}

if(pPre == NULL) // 应该插入到表头
{
pBuffer->pNext= pContext->pOutOfOrderReads;
pContext->pOutOfOrderReads = pBuffer;
}
else //应该插入到表的中间
{
pBuffer->pNext= pPre->pNext;
pPre->pNext =pBuffer->pNext;
}
}

// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
CIOCPBuffer *ptr =pContext->pOutOfOrderReads;
if(ptr != NULL &&(ptr->nSequenceNumber ==pContext->nCurrentReadSequence))
{
pContext->pOutOfOrderReads =ptr->pNext;
return ptr;
}
return NULL;
}


BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer) //在监听套节字上投递Accept请求
{
// 设置I/O类型
pBuffer->nOperation =OP_ACCEPT;

// 投递此重叠I/O
DWORD dwBytes;
pBuffer->sClient =::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,WSA_FLAG_OVERLAPPED);
BOOL b = m_lpfnAcceptEx(m_sListen,
pBuffer->sClient,
pBuffer->buff,
pBuffer->nLen -((sizeof(sockaddr_in) + 16) * 2),
sizeof(sockaddr_in) +16,
sizeof(sockaddr_in) +16,
&dwBytes,
&pBuffer->ol);
if(!b &&::WSAGetLastError() != WSA_IO_PENDING)
{
return FALSE;
}
return TRUE;
};

BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer*pBuffer)
{
// 设置I/O类型
pBuffer->nOperation =OP_READ;

::EnterCriticalSection(&pContext->Lock);

// 设置序列号
pBuffer->nSequenceNumber =pContext->nReadSequence;

// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSARecv(pContext->s,&buf, 1, &dwBytes,&dwFlags,&pBuffer->ol, NULL) !=NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{
::LeaveCriticalSection(&pContext->Lock);
return FALSE;
}
}

// 增加套节字上的重叠I/O计数和读序列号计数

pContext->nOutstandingRecv++;
pContext->nReadSequence ++;

::LeaveCriticalSection(&pContext->Lock);

return TRUE;
}

BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer*pBuffer)
{
// 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
if(pContext->nOutstandingSend> m_nMaxSends)
return FALSE;

// 设置I/O类型,增加套节字上的重叠I/O计数
pBuffer->nOperation =OP_WRITE;

// 投递此重叠I/O
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = pBuffer->buff;
buf.len = pBuffer->nLen;
if(::WSASend(pContext->s,
&buf, 1,&dwBytes, dwFlags,&pBuffer->ol, NULL) !=NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
return FALSE;
}

// 增加套节字上的重叠I/O计数
::EnterCriticalSection(&pContext->Lock);
pContext->nOutstandingSend++;
::LeaveCriticalSection(&pContext->Lock);

return TRUE;
}


BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
int nMaxFreeBuffers, intnMaxFreeContexts, int nInitialReads)
{
// 检查服务是否已经启动
if(m_bServerStarted)
return FALSE;

// 保存用户参数
m_nPort = nPort;
m_nMaxConnections = nMaxConnections;
m_nMaxFreeBuffers = nMaxFreeBuffers;
m_nMaxFreeContexts = nMaxFreeContexts;
m_nInitialReads = nInitialReads;

// 初始化状态变量
m_bShutDown = FALSE;
m_bServerStarted = TRUE;


// 创建监听套节字,绑定到本地端口,进入监听模式
m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0,NULL, 0, WSA_FLAG_OVERLAPPED);
SOCKADDR_IN si;
si.sin_family = AF_INET;
si.sin_port = ::ntohs(m_nPort);
si.sin_addr.S_un.S_addr = INADDR_ANY;
if(::bind(m_sListen,(sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
{
m_bServerStarted = FALSE;
return FALSE;
}
::listen(m_sListen, 200);

// 创建完成端口对象
m_hCompletion =::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

// 加载扩展函数AcceptEx
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL);

// 加载扩展函数GetAcceptExSockaddrs
GUID GuidGetAcceptExSockaddrs =WSAID_GETACCEPTEXSOCKADDRS;
::WSAIoctl(m_sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockaddrs,
sizeof(GuidGetAcceptExSockaddrs),
&m_lpfnGetAcceptExSockaddrs,
sizeof(m_lpfnGetAcceptExSockaddrs),
&dwBytes,
NULL,
NULL
);


// 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
::CreateIoCompletionPort((HANDLE)m_sListen,m_hCompletion, (DWORD)0, 0);

// 注册FD_ACCEPT事件。
// 如果投递的AcceptExI/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
WSAEventSelect(m_sListen, m_hAcceptEvent,FD_ACCEPT);

// 创建监听线程
m_hListenThread = ::CreateThread(NULL, 0,_ListenThreadProc, this, 0, NULL);

return TRUE;
}

void CIOCPServer::Shutdown()
{
if(!m_bServerStarted)
return;

// 通知监听线程,马上停止服务
m_bShutDown = TRUE;
::SetEvent(m_hAcceptEvent);
// 等待监听线程退出
::WaitForSingleObject(m_hListenThread,INFINITE);
::CloseHandle(m_hListenThread);
m_hListenThread = NULL;

m_bServerStarted = FALSE;
}

DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOIDlpParam)
{
CIOCPServer *pThis = (CIOCPServer*)lpParam;

// 先在监听套节字上投递几个Accept I/O
CIOCPBuffer *pBuffer;
for(int i=0; im_nInitialAccepts; i++)
{
pBuffer =pThis->AllocateBuffer(BUFFER_SIZE);
if(pBuffer == NULL)
return -1;
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}

//构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
HANDLE hWaitEvents[2 + MAX_THREAD];
int nEventCount = 0;
hWaitEvents[nEventCount ++] =pThis->m_hAcceptEvent;
hWaitEvents[nEventCount ++] =pThis->m_hRepostEvent;

// 创建指定数量的工作线程在完成端口上处理I/O
for(i=0; i
{
hWaitEvents[nEventCount ++] =::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
}

// 下面进入无限循环,处理事件对象数组中的事件
while(TRUE)
{
int nIndex =::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE,60*1000, FALSE);

// 首先检查是否要停止服务
if(pThis->m_bShutDown || nIndex== WSA_WAIT_FAILED)
{
// 关闭所有连接
pThis->CloseAllConnections();
::Sleep(0);// 给I/O工作线程一个执行的机会
// 关闭监听套节字
::closesocket(pThis->m_sListen);
pThis->m_sListen = INVALID_SOCKET;
::Sleep(0);// 给I/O工作线程一个执行的机会

// 通知所有I/O处理线程退出
for(int i=2; i
{
::PostQueuedCompletionStatus(pThis->m_hCompletion,-1, 0, NULL);
}

// 等待I/O处理线程退出
::WaitForMultipleObjects(MAX_THREAD,&hWaitEvents[2], TRUE, 5*1000);

for(i=2; i
{
::CloseHandle(hWaitEvents[i]);
}

::CloseHandle(pThis->m_hCompletion);

pThis->FreeBuffers();
pThis->FreeContexts();
::ExitThread(0);
}

// 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
if(nIndex == WSA_WAIT_TIMEOUT)
{
pBuffer =pThis->m_pPendingAccepts;
while(pBuffer != NULL)
{
intnSeconds;
int nLen =sizeof(nSeconds);
//取得连接建立的时间
::getsockopt(pBuffer->sClient,
SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds,&nLen);
//如果超过2分钟客户还不发送初始数据,就让这个客户go away
if(nSeconds!= -1 && nSeconds >2*60)
{
closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}

pBuffer =pBuffer->pNext;
}
}
else
{
nIndex = nIndex -WAIT_OBJECT_0;
WSANETWORKEVENTS ne;
int nLimit=0;
if(nIndex ==0) //2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
{
::WSAEnumNetworkEvents(pThis->m_sListen,hWaitEvents[nIndex], &ne);
if(ne.lNetworkEvents & FD_ACCEPT)
{
nLimit = 50; // 增加的个数,这里设为50个
}
}
else if(nIndex == 1) //3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
{
nLimit =InterlockedExchange(&pThis->m_nRepostCount,0);
}
else if(nIndex> 1) //I/O服务线程退出,说明有错误发生,关闭服务器
{
pThis->m_bShutDown = TRUE;
continue;
}

// 投递nLimit个AcceptExI/O请求
int i = 0;
while(i++ <nLimit &&pThis->m_nPendingAcceptCount <pThis->m_nMaxAccepts)
{
pBuffer =pThis->AllocateBuffer(BUFFER_SIZE);
if(pBuffer!= NULL)
{
pThis->InsertPendingAccept(pBuffer);
pThis->PostAccept(pBuffer);
}
}
}
}
return 0;
}

DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOIDlpParam)
{
#ifdef _DEBUG
::OutputDebugString("WorkerThread 启动... \n");
#endif // _DEBUG

CIOCPServer *pThis =(CIOCPServer*)lpParam;

CIOCPBuffer *pBuffer;
DWORD dwKey;
DWORD dwTrans;
LPOVERLAPPED lpol;
while(TRUE)
{
// 在关联到此完成端口的所有套节字上等待I/O完成
BOOL bOK =::GetQueuedCompletionStatus(pThis->m_hCompletion,
&dwTrans, (LPDWORD)&dwKey,(LPOVERLAPPED*)&lpol, WSA_INFINITE);

if(dwTrans == -1) // 用户通知退出
{
#ifdef _DEBUG
::OutputDebugString("WorkerThread 退出 \n");
#endif // _DEBUG
::ExitThread(0);
}

pBuffer = CONTAINING_RECORD(lpol,CIOCPBuffer, ol);
int nError = NO_ERROR;
if(!bOK)// 在此套节字上有错误发生
{
SOCKET s;
if(pBuffer->nOperation == OP_ACCEPT)
{
s =pThis->m_sListen;
}
else
{
if(dwKey ==0)
break;
s =((CIOCPContext*)dwKey)->s;
}
DWORD dwFlags = 0;
if(!::WSAGetOverlappedResult(s,&pBuffer->ol,&dwTrans, FALSE, &dwFlags))
{
nError =::WSAGetLastError();
}
}
pThis->HandleIO(dwKey, pBuffer,dwTrans, nError);
}

#ifdef _DEBUG
::OutputDebugString("WorkerThread 退出 \n");
#endif // _DEBUG
return 0;
}

void CIOCPServer::HandleIO(DWORD dwKey,CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
{
CIOCPContext *pContext = (CIOCPContext*)dwKey;

#ifdef _DEBUG
::OutputDebugString("HandleIO... \n");
#endif // _DEBUG

// 1)首先减少套节字上的未决I/O计数
if(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);

if(pBuffer->nOperation ==OP_READ)
pContext->nOutstandingRecv --;
else if(pBuffer->nOperation ==OP_WRITE)
pContext->nOutstandingSend --;

::LeaveCriticalSection(&pContext->Lock);

// 2)检查套节字是否已经被我们关闭
if(pContext->bClosing)
{
#ifdef _DEBUG
::OutputDebugString("检查到套节字已经被我们关闭 \n");
#endif // _DEBUG
if(pContext->nOutstandingRecv == 0&&pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
// 释放已关闭套节字的未决I/O
ReleaseBuffer(pBuffer);
return;
}
}
else
{
RemovePendingAccept(pBuffer);
}

//3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
if(nError != NO_ERROR)
{
if(pBuffer->nOperation !=OP_ACCEPT)
{
OnConnectionError(pContext,pBuffer, nError);
CloseAConnection(pContext);
if(pContext->nOutstandingRecv == 0&&pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
#ifdef _DEBUG
::OutputDebugString("检查到客户套节字上发生错误 \n");
#endif // _DEBUG
}
else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
{
// 客户端出错,释放I/O缓冲区
if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
#ifdef _DEBUG
::OutputDebugString("检查到监听套节字上发生错误 \n");
#endif // _DEBUG
}

ReleaseBuffer(pBuffer);
return;
}


// 开始处理
if(pBuffer->nOperation ==OP_ACCEPT)
{
if(dwTrans == 0)
{
#ifdef _DEBUG
::OutputDebugString("监听套节字上客户端关闭 \n");
#endif // _DEBUG

if(pBuffer->sClient != INVALID_SOCKET)
{
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}
else
{
// 为新接受的连接申请客户上下文对象
CIOCPContext *pClient =AllocateContext(pBuffer->sClient);
if(pClient != NULL)
{
if(AddAConnection(pClient))
{
// 取得客户地址
int nLocalLen, nRmoteLen;
LPSOCKADDR pLocalAddr, pRemoteAddr;
m_lpfnGetAcceptExSockaddrs(
pBuffer->buff,
pBuffer->nLen - ((sizeof(sockaddr_in) + 16) *2),
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(SOCKADDR **)&pLocalAddr,
&nLocalLen,
(SOCKADDR **)&pRemoteAddr,
&nRmoteLen);
memcpy(&pClient->addrLocal,pLocalAddr, nLocalLen);
memcpy(&pClient->addrRemote,pRemoteAddr, nRmoteLen);

// 关联新连接到完成端口对象
::CreateIoCompletionPort((HANDLE)pClient->s,m_hCompletion, (DWORD)pClient, 0);

// 通知用户
pBuffer->nLen = dwTrans;
OnConnectionEstablished(pClient, pBuffer);

// 向新连接投递几个Read请求,这些空间在套节字关闭或出错时释放
for(int i=0; i<5; i++)
{
CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
if(p != NULL)
{
if(!PostRecv(pClient, p))
{
CloseAConnection(pClient);
break;
}
}
}
}
else //连接数量已满,关闭连接
{
CloseAConnection(pClient);
ReleaseContext(pClient);
}
}
else
{
//资源不足,关闭与客户的连接即可
::closesocket(pBuffer->sClient);
pBuffer->sClient = INVALID_SOCKET;
}
}

// Accept请求完成,释放I/O缓冲区
ReleaseBuffer(pBuffer);

// 通知监听线程继续再投递一个Accept请求
::InterlockedIncrement(&m_nRepostCount);
::SetEvent(m_hRepostEvent);
}
else if(pBuffer->nOperation ==OP_READ)
{
if(dwTrans == 0) // 对方关闭套节字
{
// 先通知用户
pBuffer->nLen =0;
OnConnectionClosing(pContext,pBuffer);
// 再关闭连接
CloseAConnection(pContext);
// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0&&pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
ReleaseBuffer(pBuffer);
}
else
{
pBuffer->nLen =dwTrans;
// 按照I/O投递的顺序读取接收到的数据
CIOCPBuffer *p =GetNextReadBuffer(pContext, pBuffer);
while(p != NULL)
{
//通知用户
OnReadCompleted(pContext, p);
//增加要读的序列号的值
::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
//释放这个已完成的I/O
ReleaseBuffer(p);
p =GetNextReadBuffer(pContext, NULL);
}

// 继续投递一个新的接收请求
pBuffer =AllocateBuffer(BUFFER_SIZE);
if(pBuffer == NULL ||!PostRecv(pContext, pBuffer))
{
CloseAConnection(pContext);
}
}
}
else if(pBuffer->nOperation ==OP_WRITE)
{

if(dwTrans == 0) // 对方关闭套节字
{
// 先通知用户
pBuffer->nLen =0;
OnConnectionClosing(pContext,pBuffer);

// 再关闭连接
CloseAConnection(pContext);

// 释放客户上下文和缓冲区对象
if(pContext->nOutstandingRecv == 0&&pContext->nOutstandingSend == 0)
{
ReleaseContext(pContext);
}
ReleaseBuffer(pBuffer);
}
else
{
// 写操作完成,通知用户
pBuffer->nLen =dwTrans;
OnWriteCompleted(pContext,pBuffer);
// 释放SendText函数申请的缓冲区
ReleaseBuffer(pBuffer);
}
}
}


BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText,int nLen)
{
CIOCPBuffer *pBuffer =AllocateBuffer(nLen);
if(pBuffer != NULL)
{
memcpy(pBuffer->buff, pszText,nLen);
return PostSend(pContext, pBuffer);
}
return FALSE;
}


void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext,CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext,CIOCPBuffer *pBuffer)
{
}


void CIOCPServer::OnReadCompleted(CIOCPContext *pContext,CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext,CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnConnectionError(CIOCPContext *pContext,CIOCPBuffer *pBuffer, int nError)
{
}

////////////////////////////////////////////////
// iocpserver.cpp文件


// CIOCPServer类的测试程序

#include "iocp.h"
#include <stdio.h>
#include <windows.h>

class CMyServer : public CIOCPServer
{
public:

void OnConnectionEstablished(CIOCPContext*pContext, CIOCPBuffer *pBuffer)
{
printf(" 接收到一个新的连接(%d): %s\n",
GetCurrentConnection(),::inet_ntoa(pContext->addrRemote.sin_addr));

SendText(pContext,pBuffer->buff, pBuffer->nLen);
}

void OnConnectionClosing(CIOCPContext*pContext, CIOCPBuffer *pBuffer)
{
printf(" 一个连接关闭! \n" );
}

void OnConnectionError(CIOCPContext *pContext,CIOCPBuffer *pBuffer, int nError)
{
printf(" 一个连接发生错误: %d \n ",nError);
}

void OnReadCompleted(CIOCPContext *pContext,CIOCPBuffer *pBuffer)
{
SendText(pContext,pBuffer->buff, pBuffer->nLen);
}

void OnWriteCompleted(CIOCPContext *pContext,CIOCPBuffer *pBuffer)
{
printf(" 数据发送成功!\n ");
}
};

void main()
{
CMyServer *pServer = new CMyServer;

// 开启服务
if(pServer->Start())
{
printf(" 服务器开启成功... \n");
}
else
{
printf(" 服务器开启失败!\n");
return;
}

// 创建事件对象,让ServerShutdown程序能够关闭自己
HANDLE hEvent = ::CreateEvent(NULL, FALSE, FALSE,"ShutdownEvent");
::WaitForSingleObject(hEvent, INFINITE);
::CloseHandle(hEvent);

// 关闭服务
pServer->Shutdown();
delete pServer;

printf(" 服务器关闭 \n ");

}

////////////////////////////////////////
// IOCP.h文件

#ifndef __IOCP_H__
#define __IOCP_H__

#include<winsock2.h>
#include <windows.h>
#include <Mswsock.h>

#define BUFFER_SIZE1024*4// I/O请求的缓冲区大小
#defineMAX_THREAD2//I/O服务线程的数量


// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
struct CIOCPBuffer
{
WSAOVERLAPPED ol;

SOCKETsClient;//AcceptEx接收的客户方套节字

char*buff;//I/O操作使用的缓冲区
intnLen;//buff缓冲区(使用的)大小

ULONG nSequenceNumber;//此I/O的序列号

intnOperation;//操作类型
#define OP_ACCEPT1
#define OP_WRITE2
#define OP_READ3

CIOCPBuffer *pNext;
};

// 这是per-Handle数据。它包含了一个套节字的信息
struct CIOCPContext
{
SOCKETs;//套节字句柄

SOCKADDR_INaddrLocal;//连接的本地地址
SOCKADDR_INaddrRemote;//连接的远程地址

BOOLbClosing;//套节字是否关闭

intnOutstandingRecv;//此套节字上抛出的重叠操作的数量
int nOutstandingSend;


ULONGnReadSequence;//安排给接收的下一个序列号
ULONGnCurrentReadSequence;//当前要读的序列号
CIOCPBuffer*pOutOfOrderReads;// 记录没有按顺序完成的读I/O

CRITICAL_SECTIONLock;//保护这个结构

CIOCPContext *pNext;
};


class CIOCPServer // 处理线程
{
public:
CIOCPServer();
~CIOCPServer();

// 开始服务
BOOL Start(int nPort = 4567, int nMaxConnections= 2000,
intnMaxFreeBuffers = 200, int nMaxFreeContexts = 100, intnInitialReads = 4);
// 停止服务
void Shutdown();

// 关闭一个连接和关闭所有连接
void CloseAConnection(CIOCPContext*pContext);
void CloseAllConnections();

// 取得当前的连接数量
ULONG GetCurrentConnection() { returnm_nCurrentConnection; }

// 向指定客户发送文本
BOOL SendText(CIOCPContext *pContext, char*pszText, int nLen);

protected:

// 申请和释放缓冲区对象
CIOCPBuffer *AllocateBuffer(int nLen);
void ReleaseBuffer(CIOCPBuffer *pBuffer);

// 申请和释放套节字上下文
CIOCPContext *AllocateContext(SOCKET s);
void ReleaseContext(CIOCPContext *pContext);

// 释放空闲缓冲区对象列表和空闲上下文对象列表
void FreeBuffers();
void FreeContexts();

// 向连接列表中添加一个连接
BOOL AddAConnection(CIOCPContext *pContext);

// 插入和移除未决的接受请求
BOOL InsertPendingAccept(CIOCPBuffer*pBuffer);
BOOL RemovePendingAccept(CIOCPBuffer*pBuffer);

// 取得下一个要读取的
CIOCPBuffer *GetNextReadBuffer(CIOCPContext*pContext, CIOCPBuffer *pBuffer);


// 投递接受I/O、发送I/O、接收I/O
BOOL PostAccept(CIOCPBuffer *pBuffer);
BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer*pBuffer);
BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer*pBuffer);

void HandleIO(DWORD dwKey, CIOCPBuffer*pBuffer, DWORD dwTrans, int nError);


// 事件通知函数
// 建立了一个新的连接
virtual void OnConnectionEstablished(CIOCPContext*pContext, CIOCPBuffer *pBuffer);
// 一个连接关闭
virtual void OnConnectionClosing(CIOCPContext*pContext, CIOCPBuffer *pBuffer);
// 在一个连接上发生了错误
virtual void OnConnectionError(CIOCPContext*pContext, CIOCPBuffer *pBuffer, int nError);
// 一个连接上的读操作完成
virtual void OnReadCompleted(CIOCPContext*pContext, CIOCPBuffer *pBuffer);
// 一个连接上的写操作完成
virtual void OnWriteCompleted(CIOCPContext*pContext, CIOCPBuffer *pBuffer);

protected:

// 记录空闲结构信息
CIOCPBuffer *m_pFreeBufferList;
CIOCPContext *m_pFreeContextList;
int m_nFreeBufferCount;
int m_nFreeContextCount;
CRITICAL_SECTION m_FreeBufferListLock;
CRITICAL_SECTION m_FreeContextListLock;

// 记录抛出的Accept请求
CIOCPBuffer*m_pPendingAccepts; //抛出请求列表。
long m_nPendingAcceptCount;
CRITICAL_SECTION m_PendingAcceptsLock;

// 记录连接列表
CIOCPContext *m_pConnectionList;
int m_nCurrentConnection;
CRITICAL_SECTION m_ConnectionListLock;

// 用于投递Accept请求
HANDLE m_hAcceptEvent;
HANDLE m_hRepostEvent;
LONG m_nRepostCount;

intm_nPort;//服务器监听的端口

int m_nInitialAccepts;
int m_nInitialReads;
int m_nMaxAccepts;
int m_nMaxSends;
int m_nMaxFreeBuffers;
int m_nMaxFreeContexts;
int m_nMaxConnections;

HANDLEm_hListenThread;//监听线程
HANDLEm_hCompletion;//完成端口句柄
SOCKETm_sListen;//监听套节字句柄
LPFN_ACCEPTEX m_lpfnAcceptEx;//AcceptEx函数地址
LPFN_GETACCEPTEXSOCKADDRSm_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址

BOOLm_bShutDown;// 用于通知监听线程退出
BOOL m_bServerStarted;//记录服务是否启动


private:// 线程函数
static DWORD WINAPI _ListenThreadProc(LPVOIDlpParam);
static DWORD WINAPI _WorkerThreadProc(LPVOIDlpParam);
};


#endif // __IOCP_H__

分享到:
评论

相关推荐

    iocp udp 分布式队列无同步多核测试

    设置代码中 DEFAULT_RUDP_BUFFER_SIZE = 32k(实际udp包远没有这么大),用vc6及vc8编译,在我的机器:thinkpad x200,(2.56G双核cpu,4G DDR3内存),win7 旗舰版,在资源管理器中网络速度达到:14Gbit(loopback). ...

    Windows网络编程技术

    935.2 支持的协议 935.2.1 支持的Win32 网络协议 935.2.2 Windows CE 网络协议 945.3 Winsock 2 协议信息 945.4 Windows 套接字 975.5 具体平台的问题 995.6 选择适当的协议 1005.7 小结 100第6 章 地址家族和名字...

    血与荣誉网络通信引擎WINBuild0522.rar

    你可以使用这套SDK,开发你想要的任何网络与通信程序或服务,包括不限于:聊天服务,视频会议,语音会议,文件服务,远程监控,日志服务,HTTP服务,代理服务,数据转发服务,消息服务,安全验证,流媒体服务,音视频编解码,P2P等等...

    Winsock 完成端口模型封装的全新类

    如果不了解,推荐几本书:《Inside Windows 2000,《WINDOWS核心编程》,《WIN32多线程程序设计》、《WINDOWS网络编程技术》。在去年,我在C语言下用完成端口模型写了一个WEBSERVER,前些天,我决定用C++重写这个WEB...

    vc++ 应用源码包_3

    IOCP 完成端口编程技术 《远程控制编程技术》源代码 内含(重启、图片操作、ip操作、键盘与鼠标、客户端以及服务端、文件传输等实例源码) 多个VC++加密解密算法库(CRYPT++) 详细讲解了Crypt++的加密解密的使用...

    vc++ 开发实例源码包

    内含各种例子(vc下各种控件的使用方法、标题栏与菜单栏、工具栏与状态栏、图标与光标、程序窗口、程序控制、进程与线程、字符串、文件读写操作、文件与文件夹属性操作、文件与文件夹系统操作、系统控制操作、程序...

    vc++ 应用源码包_1

    内含各种例子(vc下各种控件的使用方法、标题栏与菜单栏、工具栏与状态栏、图标与光标、程序窗口、程序控制、进程与线程、字符串、文件读写操作、文件与文件夹属性操作、文件与文件夹系统操作、系统控制操作、程序...

    御剑高速tcp全端口扫描工具

    IOCP网络编程模型也叫完成端口,完成端口会充分利用Windows内核来进行I/O的调度,理论上和大量的实践中证明是用于WIN中C/S通信模式中性能最好的网络通信模型,没有之一(希望别打脸) 这是一个基于.NET 2.0编写的...

    vc++ 应用源码包_6

    IOCP 完成端口编程 《远程控制编程技术》源代码 内含(重启、图片操作、ip操作、键盘与鼠标、客户端以及服务端、文件传输等实例源码) 多个VC++加密解密算法库(CRYPT++) 详细讲解了Crypt++的加密解密的使用以及...

    vc++ 应用源码包_5

    IOCP 完成端口编程 《远程控制编程技术》源代码 内含(重启、图片操作、ip操作、键盘与鼠标、客户端以及服务端、文件传输等实例源码) 多个VC++加密解密算法库(CRYPT++) 详细讲解了Crypt++的加密解密的使用以及...

    vc++ 应用源码包_2

    内含各种例子(vc下各种控件的使用方法、标题栏与菜单栏、工具栏与状态栏、图标与光标、程序窗口、程序控制、进程与线程、字符串、文件读写操作、文件与文件夹属性操作、文件与文件夹系统操作、系统控制操作、程序...

Global site tag (gtag.js) - Google Analytics