#include "stdafx.h" #include "InputOutputStream.h" #include "SocketAddress.h" #include "Socket.h" #include "ThreadName.h" #include "..\Minecraft.Client\ServerConnection.h" #include #include "..\Minecraft.Client\PS3\PS3Extras\ShutdownManager.h" // This current socket implementation is for the creation of a single local link. 2 sockets can be created, one for either end of this local // link, the end (0 or 1) is passed as a parameter to the ctor. CRITICAL_SECTION Socket::s_hostQueueLock[2]; std::queue Socket::s_hostQueue[2]; Socket::SocketOutputStreamLocal *Socket::s_hostOutStream[2]; Socket::SocketInputStreamLocal *Socket::s_hostInStream[2]; ServerConnection *Socket::s_serverConnection = NULL; void Socket::Initialise(ServerConnection *serverConnection) { s_serverConnection = serverConnection; // Only initialise everything else once - just setting up static data, one time xrnm things, thread for ticking sockets static bool init = false; if( init ) { for( int i = 0; i < 2; i++ ) { if(TryEnterCriticalSection(&s_hostQueueLock[i])) { // Clear the queue std::queue empty; std::swap( s_hostQueue[i], empty ); LeaveCriticalSection(&s_hostQueueLock[i]); } s_hostOutStream[i]->m_streamOpen = true; s_hostInStream[i]->m_streamOpen = true; } return; } init = true; for( int i = 0; i < 2; i++ ) { InitializeCriticalSection(&Socket::s_hostQueueLock[i]); s_hostOutStream[i] = new SocketOutputStreamLocal(i); s_hostInStream[i] = new SocketInputStreamLocal(i); } } Socket::Socket(bool response) { m_hostServerConnection = true; m_hostLocal = true; if( response ) { m_end = SOCKET_SERVER_END; } else { m_end = SOCKET_CLIENT_END; Socket *socket = new Socket(1); s_serverConnection->NewIncomingSocket(socket); } for( int i = 0; i < 2; i++ ) { m_endClosed[i] = false; } m_socketClosedEvent = NULL; createdOk = true; networkPlayerSmallId = g_NetworkManager.GetHostPlayer()->GetSmallId(); } Socket::Socket(INetworkPlayer *player, bool response /* = false*/, bool hostLocal /*= false*/) { m_hostServerConnection = false; m_hostLocal = hostLocal; for( int i = 0; i < 2; i++ ) { InitializeCriticalSection(&m_queueLockNetwork[i]); m_inputStream[i] = NULL; m_outputStream[i] = NULL; m_endClosed[i] = false; } if(!response || hostLocal) { m_inputStream[0] = new SocketInputStreamNetwork(this,0); m_outputStream[0] = new SocketOutputStreamNetwork(this,0); m_end = SOCKET_CLIENT_END; } if(response || hostLocal) { m_inputStream[1] = new SocketInputStreamNetwork(this,1); m_outputStream[1] = new SocketOutputStreamNetwork(this,1); m_end = SOCKET_SERVER_END; } m_socketClosedEvent = new C4JThread::Event; //printf("New socket made %s\n", player->GetGamertag() ); networkPlayerSmallId = player->GetSmallId(); createdOk = true; } SocketAddress *Socket::getRemoteSocketAddress() { return NULL; } INetworkPlayer *Socket::getPlayer() { return g_NetworkManager.GetPlayerBySmallId(networkPlayerSmallId); } void Socket::setPlayer(INetworkPlayer *player) { if(player!=NULL) { networkPlayerSmallId = player->GetSmallId(); } else { networkPlayerSmallId = 0; } } void Socket::pushDataToQueue(const BYTE * pbData, DWORD dwDataSize, bool fromHost /*= true*/) { int queueIdx = SOCKET_CLIENT_END; if(!fromHost) queueIdx = SOCKET_SERVER_END; if( queueIdx != m_end && !m_hostLocal ) { app.DebugPrintf("SOCKET: Error pushing data to queue. End is %d but queue idx id %d\n", m_end, queueIdx); return; } EnterCriticalSection(&m_queueLockNetwork[queueIdx]); for( unsigned int i = 0; i < dwDataSize; i++ ) { m_queueNetwork[queueIdx].push(*pbData++); } LeaveCriticalSection(&m_queueLockNetwork[queueIdx]); } void Socket::addIncomingSocket(Socket *socket) { if( s_serverConnection != NULL ) { s_serverConnection->NewIncomingSocket(socket); } } InputStream *Socket::getInputStream(bool isServerConnection) { if( !m_hostServerConnection ) { if( m_hostLocal ) { if( isServerConnection ) { return m_inputStream[SOCKET_SERVER_END]; } else { return m_inputStream[SOCKET_CLIENT_END]; } } else { return m_inputStream[m_end]; } } else { return s_hostInStream[m_end]; } } void Socket::setSoTimeout(int a ) { } void Socket::setTrafficClass( int a ) { } Socket::SocketOutputStream *Socket::getOutputStream(bool isServerConnection) { if( !m_hostServerConnection ) { if( m_hostLocal ) { if( isServerConnection ) { return m_outputStream[SOCKET_SERVER_END]; } else { return m_outputStream[SOCKET_CLIENT_END]; } } else { return m_outputStream[m_end]; } } else { return s_hostOutStream[ 1 - m_end ]; } } bool Socket::close(bool isServerConnection) { bool allClosed = false; if( m_hostLocal ) { if( isServerConnection ) { m_endClosed[SOCKET_SERVER_END] = true; if(m_endClosed[SOCKET_CLIENT_END]) { allClosed = true; } } else { m_endClosed[SOCKET_CLIENT_END] = true; if(m_endClosed[SOCKET_SERVER_END]) { allClosed = true; } } } else { allClosed = true; m_endClosed[m_end] = true; } if( allClosed && m_socketClosedEvent != NULL ) { m_socketClosedEvent->Set(); } if(allClosed) createdOk = false; return allClosed; } /////////////////////////////////// Socket for input, on local connection //////////////////// Socket::SocketInputStreamLocal::SocketInputStreamLocal(int queueIdx) { m_streamOpen = true; m_queueIdx = queueIdx; } // Try and get an input byte, blocking until one is available int Socket::SocketInputStreamLocal::read() { while(m_streamOpen && ShutdownManager::ShouldRun(ShutdownManager::eConnectionReadThreads)) { if(TryEnterCriticalSection(&s_hostQueueLock[m_queueIdx])) { if( s_hostQueue[m_queueIdx].size() ) { byte retval = s_hostQueue[m_queueIdx].front(); s_hostQueue[m_queueIdx].pop(); LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); return retval; } LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); } Sleep(1); } return -1; } // Try and get an input array of bytes, blocking until enough bytes are available int Socket::SocketInputStreamLocal::read(byteArray b) { return read(b, 0, b.length); } // Try and get an input range of bytes, blocking until enough bytes are available int Socket::SocketInputStreamLocal::read(byteArray b, unsigned int offset, unsigned int length) { while(m_streamOpen) { if(TryEnterCriticalSection(&s_hostQueueLock[m_queueIdx])) { if( s_hostQueue[m_queueIdx].size() >= length ) { for( unsigned int i = 0; i < length; i++ ) { b[i+offset] = s_hostQueue[m_queueIdx].front(); s_hostQueue[m_queueIdx].pop(); } LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); return length; } LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); } Sleep(1); } return -1; } void Socket::SocketInputStreamLocal::close() { m_streamOpen = false; EnterCriticalSection(&s_hostQueueLock[m_queueIdx]); s_hostQueue[m_queueIdx].empty(); LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); } /////////////////////////////////// Socket for output, on local connection //////////////////// Socket::SocketOutputStreamLocal::SocketOutputStreamLocal(int queueIdx) { m_streamOpen = true; m_queueIdx = queueIdx; } void Socket::SocketOutputStreamLocal::write(unsigned int b) { if( m_streamOpen != true ) { return; } EnterCriticalSection(&s_hostQueueLock[m_queueIdx]); s_hostQueue[m_queueIdx].push((byte)b); LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); } void Socket::SocketOutputStreamLocal::write(byteArray b) { write(b, 0, b.length); } void Socket::SocketOutputStreamLocal::write(byteArray b, unsigned int offset, unsigned int length) { if( m_streamOpen != true ) { return; } MemSect(12); EnterCriticalSection(&s_hostQueueLock[m_queueIdx]); for( unsigned int i = 0; i < length; i++ ) { s_hostQueue[m_queueIdx].push(b[offset+i]); } LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); MemSect(0); } void Socket::SocketOutputStreamLocal::close() { m_streamOpen = false; EnterCriticalSection(&s_hostQueueLock[m_queueIdx]); s_hostQueue[m_queueIdx].empty(); LeaveCriticalSection(&s_hostQueueLock[m_queueIdx]); } /////////////////////////////////// Socket for input, on network connection //////////////////// Socket::SocketInputStreamNetwork::SocketInputStreamNetwork(Socket *socket, int queueIdx) { m_streamOpen = true; m_queueIdx = queueIdx; m_socket = socket; } // Try and get an input byte, blocking until one is available int Socket::SocketInputStreamNetwork::read() { while(m_streamOpen && ShutdownManager::ShouldRun(ShutdownManager::eConnectionReadThreads)) { if(TryEnterCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx])) { if( m_socket->m_queueNetwork[m_queueIdx].size() ) { byte retval = m_socket->m_queueNetwork[m_queueIdx].front(); m_socket->m_queueNetwork[m_queueIdx].pop(); LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]); return retval; } LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]); } Sleep(1); } return -1; } // Try and get an input array of bytes, blocking until enough bytes are available int Socket::SocketInputStreamNetwork::read(byteArray b) { return read(b, 0, b.length); } // Try and get an input range of bytes, blocking until enough bytes are available int Socket::SocketInputStreamNetwork::read(byteArray b, unsigned int offset, unsigned int length) { while(m_streamOpen) { if(TryEnterCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx])) { if( m_socket->m_queueNetwork[m_queueIdx].size() >= length ) { for( unsigned int i = 0; i < length; i++ ) { b[i+offset] = m_socket->m_queueNetwork[m_queueIdx].front(); m_socket->m_queueNetwork[m_queueIdx].pop(); } LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]); return length; } LeaveCriticalSection(&m_socket->m_queueLockNetwork[m_queueIdx]); } Sleep(1); } return -1; } void Socket::SocketInputStreamNetwork::close() { m_streamOpen = false; } /////////////////////////////////// Socket for output, on network connection //////////////////// Socket::SocketOutputStreamNetwork::SocketOutputStreamNetwork(Socket *socket, int queueIdx) { m_queueIdx = queueIdx; m_socket = socket; m_streamOpen = true; } void Socket::SocketOutputStreamNetwork::write(unsigned int b) { if( m_streamOpen != true ) return; byteArray barray; byte bb; bb = (byte)b; barray.data = &bb; barray.length = 1; write(barray, 0, 1); } void Socket::SocketOutputStreamNetwork::write(byteArray b) { write(b, 0, b.length); } void Socket::SocketOutputStreamNetwork::write(byteArray b, unsigned int offset, unsigned int length) { writeWithFlags(b, offset, length, 0); } void Socket::SocketOutputStreamNetwork::writeWithFlags(byteArray b, unsigned int offset, unsigned int length, int flags) { if( m_streamOpen != true ) return; if( length == 0 ) return; // If this is a local connection, don't bother going through QNet as it just delivers it straight anyway if( m_socket->m_hostLocal ) { // We want to write to the queue for the other end of this socket stream int queueIdx = m_queueIdx; if(queueIdx == SOCKET_CLIENT_END) queueIdx = SOCKET_SERVER_END; else queueIdx = SOCKET_CLIENT_END; EnterCriticalSection(&m_socket->m_queueLockNetwork[queueIdx]); for( unsigned int i = 0; i < length; i++ ) { m_socket->m_queueNetwork[queueIdx].push(b[offset+i]); } LeaveCriticalSection(&m_socket->m_queueLockNetwork[queueIdx]); } else { XRNM_SEND_BUFFER buffer; buffer.pbyData = &b[offset]; buffer.dwDataSize = length; INetworkPlayer *hostPlayer = g_NetworkManager.GetHostPlayer(); if(hostPlayer == NULL) { app.DebugPrintf("Trying to write to network, but the hostPlayer is NULL\n"); return; } INetworkPlayer *socketPlayer = m_socket->getPlayer(); if(socketPlayer == NULL) { app.DebugPrintf("Trying to write to network, but the socketPlayer is NULL\n"); return; } if( m_queueIdx == SOCKET_SERVER_END ) { //printf( "Sent %u bytes of data from \"%ls\" to \"%ls\"\n", //buffer.dwDataSize, //hostPlayer->GetGamertag(), //m_socket->networkPlayer->GetGamertag()); hostPlayer->SendData(socketPlayer, buffer.pbyData, buffer.dwDataSize, QNET_SENDDATA_RELIABLE | QNET_SENDDATA_SEQUENTIAL | flags); // DWORD queueSize = hostPlayer->GetSendQueueSize( NULL, QNET_GETSENDQUEUESIZE_BYTES ); // if( queueSize > 24000 ) // { // //printf("Queue size is: %d, forcing doWork()\n",queueSize); // g_NetworkManager.DoWork(); // } } else { //printf( "Sent %u bytes of data from \"%ls\" to \"%ls\"\n", //buffer.dwDataSize, //m_socket->networkPlayer->GetGamertag(), //hostPlayer->GetGamertag()); socketPlayer->SendData(hostPlayer, buffer.pbyData, buffer.dwDataSize, QNET_SENDDATA_RELIABLE | QNET_SENDDATA_SEQUENTIAL | flags); } } } void Socket::SocketOutputStreamNetwork::close() { m_streamOpen = false; }