#include "stdafx.h" #include "InputOutputStream.h" #include "Socket.h" #include "Connection.h" #include "ThreadName.h" #include "compression.h" #include "..\Minecraft.Client\PS3\PS3Extras\ShutdownManager.h" // This should always be enabled, except for debugging use #ifndef _DEBUG #define CONNECTION_ENABLE_TIMEOUT_DISCONNECT 1 #endif int Connection::readThreads = 0; int Connection::writeThreads = 0; int Connection::readSizes[256]; int Connection::writeSizes[256]; void Connection::_init() { // printf("Con:0x%x init\n",this); InitializeCriticalSection(&writeLock); InitializeCriticalSection(&threadCounterLock); InitializeCriticalSection(&incoming_cs); running = true; quitting = false; disconnected = false; disconnectReason = DisconnectPacket::eDisconnect_None; noInputTicks = 0; estimatedRemaining = 0; fakeLag = 0; slowWriteDelay = 50; saqThreadID = 0; closeThreadID = 0; tickCount = 0; } // 4J Jev, need to delete the critical section. Connection::~Connection() { // 4J Stu - Just to be sure, make sure the read and write threads terminate themselves before the connection object is destroyed running = false; if( dis ) dis->close(); // The input stream needs closed before the readThread, or the readThread // may get stuck whilst blocking waiting on a read readThread->WaitForCompletion(INFINITE); writeThread->WaitForCompletion(INFINITE); DeleteCriticalSection(&writeLock); DeleteCriticalSection(&threadCounterLock); DeleteCriticalSection(&incoming_cs); delete m_hWakeReadThread; delete m_hWakeWriteThread; // These should all have been destroyed in close() but no harm in checking again delete byteArrayDos; byteArrayDos = NULL; delete baos; baos = NULL; if( bufferedDos ) { bufferedDos->deleteChildStream(); delete bufferedDos; bufferedDos = NULL; } delete dis; dis = NULL; } Connection::Connection(Socket *socket, const wstring& id, PacketListener *packetListener) // throws IOException { _init(); this->socket = socket; address = socket->getRemoteSocketAddress(); this->packetListener = packetListener; //try { socket->setSoTimeout(30000); socket->setTrafficClass(IPTOS_THROUGHPUT | IPTOS_LOWDELAY); /* 4J JEV no catch } catch (SocketException e) { // catching this exception because it (apparently?) causes problems // on OSX Tiger System.err.println(e.getMessage()); }*/ dis = new DataInputStream(socket->getInputStream(packetListener->isServerPacketListener())); sos = socket->getOutputStream(packetListener->isServerPacketListener()); bufferedDos = new DataOutputStream(new BufferedOutputStream(sos, SEND_BUFFER_SIZE)); baos = new ByteArrayOutputStream( SEND_BUFFER_SIZE ); byteArrayDos = new DataOutputStream(baos); m_hWakeReadThread = new C4JThread::Event; m_hWakeWriteThread = new C4JThread::Event; const char *szId = wstringtofilename(id); char readThreadName[256]; char writeThreadName[256]; sprintf(readThreadName,"%s read\n",szId); sprintf(writeThreadName,"%s write\n",szId); readThread = new C4JThread(runRead, (void*)this, readThreadName, READ_STACK_SIZE); writeThread = new C4JThread(runWrite, this, writeThreadName, WRITE_STACK_SIZE); readThread->SetProcessor(CPU_CORE_CONNECTIONS); writeThread->SetProcessor(CPU_CORE_CONNECTIONS ); readThread->Run(); writeThread->Run(); /* 4J JEV, java: new Thread(wstring(id).append(L" read thread")) { }; writeThread = new Thread(id + " write thread") { public void run() { }; readThread->start(); writeThread->start(); */ } void Connection::setListener(PacketListener *packetListener) { this->packetListener = packetListener; } void Connection::send(shared_ptr packet) { if (quitting) return; MemSect(15); // 4J Jev, synchronized (&writeLock) EnterCriticalSection(&writeLock); estimatedRemaining += packet->getEstimatedSize() + 1; if (packet->shouldDelay) { // 4J We have delayed it enough by putting it in the slow queue, so don't delay when we actually send it packet->shouldDelay = false; outgoing_slow.push(packet); } else { outgoing.push(packet); } // 4J Jev, end synchronized. LeaveCriticalSection(&writeLock); MemSect(0); } void Connection::queueSend(shared_ptr packet) { if (quitting) return; estimatedRemaining += packet->getEstimatedSize() + 1; outgoing_slow.push(packet); } bool Connection::writeTick() { bool didSomething = false; // 4J Stu - If the connection is closed and the output stream has been deleted if(bufferedDos==NULL || byteArrayDos==NULL) return didSomething; // try { if (!outgoing.empty() && (fakeLag == 0 || System::currentTimeMillis() - outgoing.front()->createTime >= fakeLag)) { shared_ptr packet; EnterCriticalSection(&writeLock); packet = outgoing.front(); outgoing.pop(); estimatedRemaining -= packet->getEstimatedSize() + 1; LeaveCriticalSection(&writeLock); Packet::writePacket(packet, bufferedDos); #ifndef _CONTENT_PACKAGE // 4J Added for debugging if( !socket->isLocal() ) Packet::recordOutgoingPacket(packet); #endif // 4J Stu - Changed this so that rather than writing to the network stream through a buffered stream we want to: // a) Only push whole "game" packets to QNet, rather than amalgamated chunks of data that may include many packets, and partial packets // b) To be able to change the priority and queue of a packet if required //sos->writeWithFlags( baos->buf, 0, baos->size(), 0 ); //baos->reset(); writeSizes[packet->getId()] += packet->getEstimatedSize() + 1; didSomething = true; } if ((slowWriteDelay-- <= 0) && !outgoing_slow.empty() && (fakeLag == 0 || System::currentTimeMillis() - outgoing_slow.front()->createTime >= fakeLag)) { shared_ptr packet; //synchronized (writeLock) { EnterCriticalSection(&writeLock); packet = outgoing_slow.front(); outgoing_slow.pop(); estimatedRemaining -= packet->getEstimatedSize() + 1; LeaveCriticalSection(&writeLock); // If the shouldDelay flag is still set at this point then we want to write it to QNet as a single packet with priority flags // Otherwise just buffer the packet with other outgoing packets as the java game did if(packet->shouldDelay) { Packet::writePacket(packet, byteArrayDos); // 4J Stu - Changed this so that rather than writing to the network stream through a buffered stream we want to: // a) Only push whole "game" packets to QNet, rather than amalgamated chunks of data that may include many packets, and partial packets // b) To be able to change the priority and queue of a packet if required int flags = QNET_SENDDATA_LOW_PRIORITY | QNET_SENDDATA_SECONDARY; sos->writeWithFlags( baos->buf, 0, baos->size(), flags ); baos->reset(); } else { Packet::writePacket(packet, bufferedDos); } #ifndef _CONTENT_PACKAGE // 4J Added for debugging if( !socket->isLocal() ) Packet::recordOutgoingPacket(packet); #endif writeSizes[packet->getId()] += packet->getEstimatedSize() + 1; slowWriteDelay = 0; didSomething = true; } /* 4J JEV, removed try/catch } catch (Exception e) { if (!disconnected) handleException(e); return false; } */ return didSomething; } void Connection::flush() { // TODO 4J Stu - How to interrupt threads? Or do we need to change the multithreaded functions a bit more //readThread.interrupt(); //writeThread.interrupt(); m_hWakeReadThread->Set(); m_hWakeWriteThread->Set(); } bool Connection::readTick() { bool didSomething = false; // 4J Stu - If the connection has closed and the input stream has been deleted if(dis==NULL) return didSomething; //try { shared_ptr packet = Packet::readPacket(dis, packetListener->isServerPacketListener()); if (packet != NULL) { readSizes[packet->getId()] += packet->getEstimatedSize() + 1; EnterCriticalSection(&incoming_cs); if(!quitting) { incoming.push(packet); } LeaveCriticalSection(&incoming_cs); didSomething = true; } else { // printf("Con:0x%x readTick close EOS\n",this); // 4J Stu - Remove this line // Fix for #10410 - UI: If the player is removed from a splitscreened host�s game, the next game that player joins will produce a message stating that the host has left. //close(DisconnectPacket::eDisconnect_EndOfStream); } /* 4J JEV, removed try/catch } catch (Exception e) { if (!disconnected) handleException(e); return false; } */ return didSomething; } /* 4J JEV, removed try/catch void handleException(Exception e) { e.printStackTrace(); close("disconnect.genericReason", "Internal exception: " + e.toString()); }*/ void Connection::close(DisconnectPacket::eDisconnectReason reason, ...) { // printf("Con:0x%x close\n",this); if (!running) return; // printf("Con:0x%x close doing something\n",this); disconnected = true; va_list input; va_start( input, reason ); disconnectReason = reason;//va_arg( input, const wstring ); vector objs = vector(); void *i = NULL; while (i != NULL) { i = va_arg( input, void* ); objs.push_back(i); } if( objs.size() ) { disconnectReasonObjects = &objs[0]; } else { disconnectReasonObjects = NULL; } // int count = 0, sum = 0, i = first; // va_list marker; // // va_start( marker, first ); // while( i != -1 ) // { // sum += i; // count++; // i = va_arg( marker, int); // } // va_end( marker ); // return( sum ? (sum / count) : 0 ); // CreateThread(NULL, 0, runClose, this, 0, &closeThreadID); running = false; if( dis ) dis->close(); // The input stream needs closed before the readThread, or the readThread // may get stuck whilst blocking waiting on a read // Make sure that the read & write threads are dead before we go and kill the streams that they depend on readThread->WaitForCompletion(INFINITE); writeThread->WaitForCompletion(INFINITE); delete dis; dis = NULL; if( bufferedDos ) { bufferedDos->close(); bufferedDos->deleteChildStream(); delete bufferedDos; bufferedDos = NULL; } if( byteArrayDos ) { byteArrayDos->close(); delete byteArrayDos; byteArrayDos = NULL; } if( socket ) { socket->close(packetListener->isServerPacketListener()); socket = NULL; } } void Connection::tick() { if (estimatedRemaining > 1 * 1024 * 1024) { close(DisconnectPacket::eDisconnect_Overflow); } EnterCriticalSection(&incoming_cs); bool empty = incoming.empty(); LeaveCriticalSection(&incoming_cs); if (empty) { #if CONNECTION_ENABLE_TIMEOUT_DISCONNECT if (noInputTicks++ == MAX_TICKS_WITHOUT_INPUT) { close(DisconnectPacket::eDisconnect_TimeOut); } #endif } // 4J Stu - Moved this a bit later in the function to stop the race condition of Disconnect packets not being processed when local client leaves //else if( socket && socket->isClosing() ) //{ // close(DisconnectPacket::eDisconnect_Closed); //} else { noInputTicks = 0; } // 4J Added - Send a KeepAlivePacket every now and then to ensure that our read and write threads don't timeout tickCount++; if (tickCount % 20 == 0) { send( shared_ptr( new KeepAlivePacket() ) ); } // 4J Stu - 1.8.2 changed from 100 to 1000 int max = 1000; // 4J-PB - NEEDS CHANGED!!! // If we can call connection.close from within a packet->handle, then we can lockup because the loop below has locked incoming_cs, and the connection.close will flag the read and write threads for the connection to close. // they are running on other threads, and will try to lock incoming_cs // We got this with a pre-login packet of a player who wasn't allowed to play due to parental controls, so was kicked out // This has been changed to use a eAppAction_ExitPlayerPreLogin which will run in the main loop, so the connection will not be ticked at that point EnterCriticalSection(&incoming_cs); // 4J Stu - If disconnected, then we shouldn't process incoming packets std::vector< shared_ptr > packetsToHandle; while (!disconnected && !g_NetworkManager.IsLeavingGame() && g_NetworkManager.IsInSession() && !incoming.empty() && max-- >= 0) { shared_ptr packet = incoming.front(); packetsToHandle.push_back(packet); incoming.pop(); } LeaveCriticalSection(&incoming_cs); // MGH - moved the packet handling outside of the incoming_cs block, as it was locking up sometimes when disconnecting for(int i=0; igetId()); packetsToHandle[i]->handle(packetListener); PIXEndNamedEvent(); } flush(); // 4J Stu - Moved this a bit later in the function to stop the race condition of Disconnect packets not being processed when local client leaves if( socket && socket->isClosing() ) { close(DisconnectPacket::eDisconnect_Closed); } // 4J - split the following condition (used to be disconnect && iscoming.empty()) so we can wrap the access in a critical section if (disconnected) { EnterCriticalSection(&incoming_cs); bool empty = incoming.empty(); LeaveCriticalSection(&incoming_cs); if( empty ) { packetListener->onDisconnect(disconnectReason, disconnectReasonObjects); disconnected = false; // 4J added - don't keep sending this every tick } } } SocketAddress *Connection::getRemoteAddress() { return (SocketAddress *) address; } void Connection::sendAndQuit() { if (quitting) { return; } // printf("Con:0x%x send & quit\n",this); flush(); quitting = true; // TODO 4J Stu - How to interrupt threads? Or do we need to change the multithreaded functions a bit more //readThread.interrupt(); #if 1 // 4J - this used to be in a thread but not sure why, and is causing trouble for us if we kill the connection // whilst the thread is still expecting to be able to send a packet a couple of seconds after starting it if (running) { // 4J TODO writeThread.interrupt(); close(DisconnectPacket::eDisconnect_Closed); } #else CreateThread(NULL, 0, runSendAndQuit, this, 0, &saqThreadID); #endif } int Connection::countDelayedPackets() { return (int)outgoing_slow.size(); } int Connection::runRead(void* lpParam) { ShutdownManager::HasStarted(ShutdownManager::eConnectionReadThreads); Connection *con = (Connection *)lpParam; if (con == NULL) { #ifdef __PS3__ ShutdownManager::HasFinished(ShutdownManager::eConnectionReadThreads); #endif return 0; } Compression::UseDefaultThreadStorage(); CRITICAL_SECTION *cs = &con->threadCounterLock; EnterCriticalSection(cs); con->readThreads++; LeaveCriticalSection(cs); //try { MemSect(19); while (con->running && !con->quitting && ShutdownManager::ShouldRun(ShutdownManager::eConnectionReadThreads)) { while (con->readTick()) ; // try { //Sleep(100L); // TODO - 4J Stu - 1.8.2 changes these sleeps to 2L, but not sure whether we should do that as well con->m_hWakeReadThread->WaitForSignal(100L); } MemSect(0); /* 4J JEV, removed try/catch } catch (InterruptedException e) { } } } finally { synchronized (threadCounterLock) { readThreads--; } } */ ShutdownManager::HasFinished(ShutdownManager::eConnectionReadThreads); return 0; } int Connection::runWrite(void* lpParam) { ShutdownManager::HasStarted(ShutdownManager::eConnectionWriteThreads); Connection *con = dynamic_cast((Connection *) lpParam); if (con == NULL) { ShutdownManager::HasFinished(ShutdownManager::eConnectionWriteThreads); return 0; } Compression::UseDefaultThreadStorage(); CRITICAL_SECTION *cs = &con->threadCounterLock; EnterCriticalSection(cs); con->writeThreads++; LeaveCriticalSection(cs); // 4J Stu - Adding this to force us to run through the writeTick at least once after the event is fired // Otherwise there is a race between the calling thread setting the running flag and this loop checking the condition DWORD waitResult = WAIT_TIMEOUT; while ((con->running || waitResult == WAIT_OBJECT_0 ) && ShutdownManager::ShouldRun(ShutdownManager::eConnectionWriteThreads)) { while (con->writeTick()) ; //Sleep(100L); // TODO - 4J Stu - 1.8.2 changes these sleeps to 2L, but not sure whether we should do that as well waitResult = con->m_hWakeWriteThread->WaitForSignal(100L); if (con->bufferedDos != NULL) con->bufferedDos->flush(); //if (con->byteArrayDos != NULL) con->byteArrayDos->flush(); } // 4J was in a finally block. EnterCriticalSection(cs); con->writeThreads--; LeaveCriticalSection(cs); ShutdownManager::HasFinished(ShutdownManager::eConnectionWriteThreads); return 0; } int Connection::runClose(void* lpParam) { Connection *con = dynamic_cast((Connection *) lpParam); if (con == NULL) return 0; //try { Sleep(2000); if (con->running) { // 4J TODO writeThread.interrupt(); con->close(DisconnectPacket::eDisconnect_Closed); } /* 4J Jev, removed try/catch } catch (Exception e) { e.printStackTrace(); } */ return 1; } int Connection::runSendAndQuit(void* lpParam) { Connection *con = dynamic_cast((Connection *) lpParam); // printf("Con:0x%x runSendAndQuit\n",con); if (con == NULL) return 0; //try { Sleep(2000); if (con->running) { // 4J TODO writeThread.interrupt(); con->close(DisconnectPacket::eDisconnect_Closed); // printf("Con:0x%x runSendAndQuit close\n",con); } // printf("Con:0x%x runSendAndQuit end\n",con); /* 4J Jev, removed try/catch } catch (Exception e) { e.printStackTrace(); } */ return 0; }