/*++

Copyright (c) 1998  Microsoft Corporation

Module Name:

    tpswork.h

Abstract:

    Worker thread classes. Moved out of tpsclass.h

    Contents:
        CIoWorkerThreadInfo
        CIoWorkerRequest
        CThreadPool

Author:

    Richard L Firth (rfirth) 08-Aug-1998

Revision History:

    08-Aug-1998 rfirth
        Created

--*/

//
// manifests
//

// default for CThreadPool::m_creationDelta, assigned in CThreadPool::Init()
// (presumably milliseconds - TODO confirm against the code that reads it)
#define THREAD_CREATION_DAMPING_TIME    5000
// not referenced anywhere in this header
#define NEW_THREAD_THRESHOLD            10
// default for CThreadPool::m_minWorkerThreads, assigned in CThreadPool::Init()
#define MIN_WORKER_THREADS              1
// default for CThreadPool::m_maxWorkerThreads, assigned in CThreadPool::Init()
#define MAX_WORKER_THREADS              128
// not referenced anywhere in this header
#define MAX_IO_WORKER_THREADS           256
// default for CThreadPool::m_maxQueueDepth, assigned in CThreadPool::Init()
#define MAX_QUEUE_DEPTH                 0
// default for CThreadPool::m_workerIdleTimeout, assigned in CThreadPool::Init()
// (presumably milliseconds - TODO confirm against the code that reads it)
#define THREAD_IDLE_TIMEOUT             60000

// flag OR-ed into CWorkItem::m_flags when the caller asked for a work item id;
// CWorkItem::Match() tests it when matching by id rather than by tag
#define TPS_ID                          0x80000000

//
// external data
//

extern DWORD g_dwWorkItemId;

//
// classes
//

//
// CIoWorkerThreadInfo
//

class CIoWorkerThreadInfo : public CDoubleLinkedListEntry {

    //
    // Holds the thread handle associated with one list entry. The entry links
    // itself onto the head of the list handed to the constructor.
    //
    // NOTE(review): the handle starts out as (HANDLE)-1 but the destructor
    // asserts that it is NULL, so the owner is evidently expected to call
    // SetHandle(NULL) before destroying the object - confirm with the callers
    //

private:

    HANDLE m_hThread;

public:

    //
    // marks the handle as "not yet set" ((HANDLE)-1) and inserts this entry at
    // the head of pList
    //

    CIoWorkerThreadInfo(CDoubleLinkedList * pList)
        : m_hThread((HANDLE)-1)
    {
        InsertHead(pList);
    }

    //
    // debug-only check that the handle was reset to NULL before destruction
    //

    ~CIoWorkerThreadInfo() {
        ASSERT(m_hThread == NULL);
    }

    //
    // stores hThread; the previous value is simply overwritten, not closed
    //

    VOID SetHandle(HANDLE hThread) {
        m_hThread = hThread;
    }

    //
    // returns the handle value currently stored
    //

    HANDLE GetHandle(VOID) const {
        return m_hThread;
    }
};

//
// CIoWorkerRequest
//

class CIoWorkerRequest {

    //
    // Immutable pairing of a callback routine with the context pointer that
    // accompanies it. Neither pointer is owned, validated or invoked here.
    //

private:

    LPTHREAD_START_ROUTINE m_pfnCallback;
    LPVOID m_pContext;

public:

    //
    // captures the callback and its context as supplied
    //

    CIoWorkerRequest(LPTHREAD_START_ROUTINE pfnCallback, LPVOID pContext)
        : m_pfnCallback(pfnCallback),
          m_pContext(pContext)
    {
    }

    //
    // returns the callback routine given to the constructor
    //

    LPTHREAD_START_ROUTINE GetCallback(VOID) const {
        return m_pfnCallback;
    }

    //
    // returns the context pointer given to the constructor
    //

    LPVOID GetContext(VOID) const {
        return m_pContext;
    }
};

//
// CThreadPool - maintains lists of work items, non-IO worker threads and
// IO worker threads
//

class CThreadPool {

private:

    //
    // private classes
    //

    //
    // CWorkItem - queued app-supplied functions, ordered by priority
    //

    class CWorkItem : public CPrioritizedListEntry {

    public:

        FARPROC m_function;         // function to run for this item
        ULONG_PTR m_context;        // caller-supplied value handed back with m_function
        DWORD_PTR m_tag;            // caller-supplied tag; see Match()
        DWORD_PTR m_id;             // id taken from g_dwWorkItemId, or 0 if none was requested
        DWORD m_flags;              // caller's flags, plus TPS_ID when an id was assigned
        HINSTANCE m_hInstModule;    // module loaded for pszModule, or NULL

        //
        // CWorkItem - records the request and, if a module name was supplied,
        // takes a reference on that module with LoadLibraryA().
        //
        //  lpfn        function to run
        //  context     value passed back alongside lpfn
        //  priority    handed to CPrioritizedListEntry
        //  tag         caller's tag, compared by Match() when matching by tag
        //  pid         if non-NULL, receives a newly assigned work item id
        //  pszModule   optional ANSI module name; NULL or "" means no module
        //  flags       TPS_* flags
        //
        // A failed load is only traced: the item is still created, with
        // m_hInstModule == NULL.
        //

        CWorkItem(FARPROC lpfn,
                  ULONG_PTR context,
                  LONG priority,
                  DWORD_PTR tag,
                  DWORD_PTR * pid,
                  LPCSTR pszModule,
                  DWORD flags
                  ) : CPrioritizedListEntry(priority)
        {
            m_function = lpfn;
            m_context = context;
            m_tag = tag;
            m_id = (DWORD_PTR)0;
            m_flags = flags;

            if (pszModule && *pszModule)
            {
                //
                // pszModule is an LPCSTR, so call the ANSI entry point
                // explicitly: the generic LoadLibrary macro maps to
                // LoadLibraryW in UNICODE builds, which takes a wide string
                //

                m_hInstModule = LoadLibraryA(pszModule);

                if (!m_hInstModule)
                {
                    TraceMsg(TF_WARNING, TEXT("CWorkItem::CWorkItem  - faild to load %hs (error = %d), worker thread could be abanonded!!"), pszModule, GetLastError());
                }
            }
            else
            {
                m_hInstModule = NULL;
            }

            if (pid) {

                //
                // NOTE(review): QueueWorkItem() constructs the item before it
                // acquires m_qlock, so this increment of the global counter is
                // not serialized by that lock - confirm whether concurrent
                // callers can observe duplicate ids
                //

                m_id = (DWORD_PTR)++g_dwWorkItemId;
                *pid = m_id;
                m_flags |= TPS_ID;
            }
        }

        ~CWorkItem()
        {
            // we used to call FreeLibrary(m_hInstModule) here but we delete the workitem
            // when we grab it off of the queue (in RemoveWorkItem). so we have to wait until
            // we are actually done running the task before we call FreeLibrary()
        }

        //
        // Match - when IsTag is TRUE, matches a tagged item (TPS_TAGGEDITEM
        // set) whose tag equals Tag; otherwise matches an item that was
        // assigned an id (TPS_ID set) equal to Tag
        //

        BOOL Match(DWORD_PTR Tag, BOOL IsTag) const {
            return IsTag
                ? ((m_flags & TPS_TAGGEDITEM) && (m_tag == Tag))
                : ((m_flags & TPS_ID) && (m_id == Tag));
        }

        //
        // IsLongExec - TRUE if the item was queued with TPS_LONGEXECTIME
        //

        BOOL IsLongExec(VOID) const {
            return (m_flags & TPS_LONGEXECTIME) ? TRUE : FALSE;
        }
    };

    //
    // work item queue variables
    //

    // pending CWorkItem objects; QueueWorkItem() puts demand-thread items at
    // the head and hands all others to m_queue.insert()
    CPrioritizedList m_queue;
    // acquired around queue insertion, removal and scanning in this class
    CCriticalSection_NoCtor m_qlock;
    // auto-reset event created by Init(); set by QueueWorkItem() and Signal(),
    // waited on by RemoveWorkItem()
    HANDLE m_event;
    // ERROR_SUCCESS, or the GetLastError() value if Init() could not create m_event
    DWORD m_error;
    // number of items currently linked on m_queue
    DWORD m_queueSize;
    // multiplier used by QueueWorkItem() when deciding whether to start a new
    // thread; doubled by MakeAvailable(), halved by MakeUnavailable()
    DWORD m_qFactor;
    // limits assigned by Init() and SetLimits(); not read by any code in this header
    DWORD m_minWorkerThreads;
    DWORD m_maxWorkerThreads;
    DWORD m_maxQueueDepth;
    DWORD m_workerIdleTimeout;
    DWORD m_creationDelta;
    // worker thread count, maintained by AddWorker()/RemoveWorker()
    DWORD m_totalWorkerThreads;
    // count maintained by MakeAvailable()/MakeUnavailable()
    DWORD m_availableWorkerThreads;

#if DBG
    // debug-build high-water marks for m_queueSize, m_qFactor and
    // m_totalWorkerThreads respectively
    DWORD m_queueSizeMax;
    DWORD m_qFactorMax;
    DWORD m_maxWorkerThreadsCreated;
#endif

    //
    // private member functions
    //

    //
    // DequeueWorkItem - unlinks and returns the item at the head of m_queue,
    // or returns NULL if the queue is empty. Does no locking of its own: both
    // callers in this class (RemoveWorkItem, PurgeWorkItems) hold m_qlock
    //

    CWorkItem * DequeueWorkItem(VOID) {
        if (m_queue.IsEmpty()) {
            return NULL;
        }

        CWorkItem * pHead = (CWorkItem *)m_queue.RemoveHead();

        --m_queueSize;
        return pHead;
    }

    //
    // Worker - declared here, defined elsewhere; nothing in this header calls
    // it. NOTE(review): presumably the per-thread work loop entered from
    // WorkerThread() - confirm against the implementation file
    //

    VOID
    Worker(
        VOID
        );

public:

    //
    // WorkerThread - static routine, defined elsewhere. CreateWorkerThread()
    // casts it to LPTHREAD_START_ROUTINE and passes it to StartThread() as the
    // start routine for each new worker thread
    //

    static
    VOID
    WorkerThread(
        VOID
        );

    //
    // Init - initializes the queue and its lock, creates the queue event and
    // resets every limit and counter to its compiled-in default.
    //
    // Returns TRUE if the event was created. On failure m_error holds the
    // GetLastError() value, which GetError() reports.
    //

    BOOL Init(VOID) {
        m_queue.Init();
        m_qlock.Init();

        //
        // auto-reset event, initially unsignalled, unnamed
        //

        m_event = CreateEvent(NULL, FALSE, FALSE, NULL);
        if (m_event == NULL) {
            m_error = GetLastError();
        } else {
            m_error = ERROR_SUCCESS;
        }

        //
        // queue state
        //

        m_queueSize = 0;
        m_qFactor = 0;

        //
        // default limits
        //

        m_minWorkerThreads = MIN_WORKER_THREADS;
        m_maxWorkerThreads = MAX_WORKER_THREADS;
        m_maxQueueDepth = MAX_QUEUE_DEPTH;
        m_workerIdleTimeout = THREAD_IDLE_TIMEOUT;
        m_creationDelta = THREAD_CREATION_DAMPING_TIME;

        //
        // thread counts
        //

        m_totalWorkerThreads = 0;
        m_availableWorkerThreads = 0;

#if DBG
        m_queueSizeMax = 0;
        m_qFactorMax = 0;
        m_maxWorkerThreadsCreated = 0;
#endif

        return (m_error == ERROR_SUCCESS);
    }

    //
    // Terminate - tears the pool down: deletes every queued item, waits until
    // no more than Limit worker threads remain, closes the queue event and
    // terminates the queue lock
    //

    VOID Terminate(DWORD Limit) {

        //
        // discard pending work first, then let the threads drain away
        //

        PurgeWorkItems();
        TerminateThreads(Limit);

        if (m_event != NULL) {

            BOOL fClosed = CloseHandle(m_event);

            ASSERT(fClosed);

            m_event = NULL;
        }

        m_qlock.Terminate();

        ASSERT(m_queue.IsEmpty());

//#if DBG
//char buf[256];
//wsprintf(buf,
//         "CThreadPool::Terminate(): m_queueSizeMax = %d, m_maxWorkerThreadsCreated = %d, m_qFactorMax = %d\n",
//         m_queueSizeMax,
//         m_maxWorkerThreadsCreated,
//         m_qFactorMax
//         );
//OutputDebugString(buf);
//#endif
    }

    //
    // GetError - returns the status recorded by Init(): ERROR_SUCCESS if the
    // queue event was created, otherwise the GetLastError() value captured
    // after CreateEvent() failed
    //

    DWORD GetError() const {
        return m_error;
    }

    //
    // SetLimits - replaces the five limits that Init() set to their defaults.
    // The values are stored exactly as given: no validation or clamping is
    // done and no lock is taken
    //
    //  dwMinimumThreads        -> m_minWorkerThreads
    //  dwMaximumThreads        -> m_maxWorkerThreads
    //  dwMaximumQueueDepth     -> m_maxQueueDepth
    //  dwThreadIdleTimeout     -> m_workerIdleTimeout
    //  dwThreadCreationDelta   -> m_creationDelta
    //

    VOID
    SetLimits(
        IN DWORD dwMinimumThreads,
        IN DWORD dwMaximumThreads,
        IN DWORD dwMaximumQueueDepth,
        IN DWORD dwThreadIdleTimeout,
        IN DWORD dwThreadCreationDelta
        )
    {
        //
        // thread count bounds
        //

        m_minWorkerThreads = dwMinimumThreads;
        m_maxWorkerThreads = dwMaximumThreads;

        //
        // queue depth bound
        //

        m_maxQueueDepth = dwMaximumQueueDepth;

        //
        // timing parameters
        //

        m_workerIdleTimeout = dwThreadIdleTimeout;
        m_creationDelta = dwThreadCreationDelta;
    }

    //
    // MakeAvailable - counts one more available worker thread and grows the
    // queue factor: 0 becomes 1, any other value is doubled. Only the counter
    // is updated with an interlocked operation; m_qFactor is not
    //

    VOID MakeAvailable(VOID) {
        InterlockedIncrement((LPLONG)&m_availableWorkerThreads);

        m_qFactor = (m_qFactor != 0) ? (m_qFactor << 1) : 1;

#if DBG
        //
        // track the largest factor seen
        //

        if (m_qFactorMax < m_qFactor) {
            m_qFactorMax = m_qFactor;
        }
#endif
    }

    //
    // MakeUnavailable - counts one fewer available worker thread and halves
    // the queue factor. The factor is kept at 1 or more while any thread is
    // still available; it is allowed to reach 0 only when none are
    //

    VOID MakeUnavailable(VOID) {
        InterlockedDecrement((LPLONG)&m_availableWorkerThreads);

        m_qFactor >>= 1;
        if (m_qFactor != 0) {
            return;
        }

        //
        // factor fell to zero: restore the floor if threads remain available
        //

        if (m_availableWorkerThreads != 0) {
            m_qFactor = 1;
        }
    }

    //
    // QueueWorkItem - allocates a work item, links it into the queue, starts
    // another worker thread if the queue state calls for one, and signals the
    // queue event.
    //
    //  pfnFunction     function to run
    //  pContext        value passed back alongside pfnFunction
    //  lPriority       queue priority (ignored for position when
    //                  TPS_DEMANDTHREAD is set: such items go to the head)
    //  dwTag           caller's tag
    //  pdwId           if non-NULL, receives the id assigned to the item
    //  pszModule       optional module name, loaded by the CWorkItem constructor
    //  dwFlags         TPS_* flags
    //
    // Returns ERROR_NOT_ENOUGH_MEMORY if the item could not be allocated,
    // ERROR_SUCCESS otherwise - including when thread creation fails, because
    // by then the item is already queued.
    //

    DWORD
    QueueWorkItem(
        FARPROC pfnFunction,
        ULONG_PTR pContext,
        LONG lPriority,
        DWORD_PTR dwTag,
        DWORD_PTR * pdwId,
        LPCSTR pszModule,
        DWORD dwFlags
        )
    {
        CWorkItem * pNewItem = new CWorkItem(pfnFunction,
                                             pContext,
                                             lPriority,
                                             dwTag,
                                             pdwId,
                                             pszModule,
                                             dwFlags
                                             );

        if (pNewItem == NULL) {
            return ERROR_NOT_ENOUGH_MEMORY;
        }

        m_qlock.Acquire();

        //
        // demand-thread items jump the queue; everything else is placed
        // according to its priority
        //

        if (dwFlags & TPS_DEMANDTHREAD) {
            pNewItem->InsertHead(&m_queue);
        } else {
            m_queue.insert(pNewItem);
        }
        ++m_queueSize;
#if DBG
        if (m_queueSizeMax < m_queueSize) {
            m_queueSizeMax = m_queueSize;
        }
#endif

        //
        // a new thread is wanted when either
        //
        //  * the queue has outgrown (available threads * queue factor), which
        //    is always the case when no thread is available, or
        //  * every available thread is about to be claimed by a long-exec
        //    item at the front of the queue
        //

        BOOL bNeedThread = (m_queueSize > (m_availableWorkerThreads * m_qFactor));

        if (!bNeedThread) {

            DWORD nScanned = 0;
            DWORD nLongExec = 0;

            //
            // examine at most one queued item per available thread
            //

            for (CWorkItem * pScan = (CWorkItem *)m_queue.Next();
                 (pScan != m_queue.Head()) && (nScanned < m_availableWorkerThreads);
                 pScan = (CWorkItem *)pScan->Next()) {

                if (pScan->IsLongExec()) {
                    ++nLongExec;
                }
                ++nScanned;
            }
            bNeedThread = (nLongExec == m_availableWorkerThreads);
        }

        m_qlock.Release();

        if (bNeedThread) {
            // if the CreateWorkerThread fails, do NOT pass back an error code to the caller
            // since we've already added the workitem to the queue.  An error code will
            // likely result in the caller freeing the data for the work item. (saml 081799)
            CreateWorkerThread();
        }

        SetEvent(m_event);
        return ERROR_SUCCESS;
    }

    //
    // RemoveWorkItem - takes the item at the head of the queue, waiting up to
    // dwTimeout milliseconds for one to arrive. Sent messages and key input
    // that arrive on the calling thread during the wait are dispatched.
    //
    //  ppfnFunction    receives the item's function
    //  pContext        receives the item's context value
    //  hModuleToFree   receives the item's module handle (may be NULL); the
    //                  item itself is deleted here without freeing it
    //  pdwFlags        receives the item's flags with TPS_RESERVED_FLAGS removed
    //  dwTimeout       milliseconds to wait; 0 polls, INFINITE waits forever
    //
    // Returns:
    //  ERROR_SUCCESS   an item was removed and the four outputs are valid
    //  WAIT_TIMEOUT    no item arrived in time, the wait failed, or
    //                  g_bTpsTerminating was set; the outputs are not written
    //  WAIT_ABANDONED  WM_QUIT was retrieved while pumping messages; the
    //                  outputs are not written
    //

    DWORD
    RemoveWorkItem(
        FARPROC * ppfnFunction,
        ULONG_PTR * pContext,
        HMODULE* hModuleToFree,
        DWORD * pdwFlags,
        DWORD dwTimeout
        )
    {
        BOOL bFirstTime = TRUE;
        DWORD dwWaitTime = dwTimeout;

        //
        // tick count taken just before the first wait. This must live outside
        // the loop: it is written on the first pass only and read on every
        // later pass
        //

        DWORD dwStartTime = 0;

        while (TRUE) {

            //
            // first test the FIFO state without waiting for the event. The
            // unlocked IsEmpty() is only a hint; DequeueWorkItem() re-checks
            // under the lock
            //

            if (!m_queue.IsEmpty())
            {
                m_qlock.Acquire();

                CWorkItem * pItem = DequeueWorkItem();

                if (pItem != NULL)
                {
                    //
                    // a long-exec item takes this thread out of the available
                    // set while it runs
                    //

                    if (pItem->m_flags & TPS_LONGEXECTIME)
                    {
                        MakeUnavailable();
                    }

                    m_qlock.Release();
                    *ppfnFunction = pItem->m_function;
                    *pContext = pItem->m_context;
                    *pdwFlags = pItem->m_flags & ~TPS_RESERVED_FLAGS;
                    *hModuleToFree = pItem->m_hInstModule;
                    delete pItem;

                    return ERROR_SUCCESS;
                }
                m_qlock.Release();
            }

            if ((dwTimeout != INFINITE) && bFirstTime) {
                dwStartTime = GetTickCount();
            }

            //
            // if dwTimeout is 0 (poll) and we've already waited unsuccessfully
            // then we're done: we timed out
            //

            if ((dwTimeout == 0) && !bFirstTime) {
                break;
            }

            //
            // wait for the queue event or for input on this thread. Note that
            // MsgWaitForMultipleObjects() is NOT an alertable wait, so
            // WAIT_IO_COMPLETION is not expected from it; the check below is
            // kept for when the Ex variant can be used.
            //
            // FEATURE - we want MsgWaitForMultipleObjectsEx() here, but Win95
            //          doesn't support it
            //

            DWORD status = MsgWaitForMultipleObjects(1,
                                                     &m_event,
                                                     FALSE,
                                                     dwWaitTime,
                                                     //QS_ALLINPUT
                                                     QS_SENDMESSAGE | QS_KEY
                                                     );

            //
            // quit now if thread pool is terminating
            //

            if (g_bTpsTerminating) {
                break;
            }
            bFirstTime = FALSE;

            if (status == WAIT_OBJECT_0 + 1) {

                //
                // input arrived: drain the message queue. WM_QUIT ends the
                // wait immediately
                //

                MSG msg;

                while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)) {
                    if (msg.message == WM_QUIT) {
                        return WAIT_ABANDONED;
                    }
                    DispatchMessage(&msg);
                }
            } else if ((status != WAIT_OBJECT_0) && (status != WAIT_IO_COMPLETION)) {

                //
                // WAIT_TIMEOUT (or WAIT_ABANDONED (?))
                //

                break;
            }

            //
            // we were woken by the event, I/O completion or a message. If
            // we're not waiting forever, charge the time already spent against
            // the caller's timeout so that repeated wakeups - message traffic
            // in particular - cannot stretch the total wait beyond it.
            // Unsigned subtraction keeps this correct across tick wraparound
            //

            if (dwTimeout != INFINITE) {

                DWORD dwElapsedTime = GetTickCount() - dwStartTime;

                if (dwElapsedTime > dwTimeout) {

                    //
                    // waited longer than requested. Don't wait again if
                    // we find there's nothing in the FIFO
                    //

                    dwWaitTime = 0;
                } else {

                    //
                    // amount of time to wait next iteration is amount of
                    // time until expiration of originally specified period
                    //

                    dwWaitTime = dwTimeout - dwElapsedTime;
                }
            }
        }
        return WAIT_TIMEOUT;
    }

    //
    // RemoveTagged - unlinks and deletes queued items selected by
    // CWorkItem::Match(Tag, IsTag), under the queue lock.
    //
    //  IsTag TRUE      removes every item that matches the tag
    //  IsTag FALSE     Tag is an item id; stops after the first match
    //
    // Returns the number of items removed.
    //
    // NOTE(review): the removed items are deleted without running, and
    // neither this routine nor ~CWorkItem releases m_hInstModule, so the
    // module reference taken when the item was queued looks like it is never
    // dropped on this path - confirm
    //

    DWORD RemoveTagged(DWORD_PTR Tag, BOOL IsTag) {

        DWORD nRemoved = 0;

        m_qlock.Acquire();

        //
        // pKeep trails the scan: it is the last entry known to remain on the
        // list, so the next candidate is always re-read from it after a
        // removal
        //

        CPrioritizedListEntry * pScan = (CPrioritizedListEntry *)m_queue.Next();
        CPrioritizedListEntry * pKeep = (CPrioritizedListEntry *)m_queue.Head();

        for (; pScan != m_queue.Head(); pScan = (CPrioritizedListEntry *)pKeep->Next()) {

            CWorkItem * pItem = (CWorkItem *)pScan;

            if (!pItem->Match(Tag, IsTag)) {
                pKeep = pScan;
                continue;
            }

            pItem->Remove();
            --m_queueSize;
            delete pItem;
            ++nRemoved;

            //
            // matching by id: stop at the first hit
            //

            if (!IsTag) {
                break;
            }
        }

        m_qlock.Release();
        return nRemoved;
    }

    //
    // GetQueueSize - returns the current count of queued items. The value is
    // read without taking m_qlock
    //

    DWORD GetQueueSize(VOID) const {
        return m_queueSize;
    }

    //
    // PurgeWorkItems - empties the queue under the queue lock, deleting each
    // item without running it.
    //
    // NOTE(review): ~CWorkItem does not release m_hInstModule and nothing
    // here does either, so module references held by purged items look like
    // they are never dropped - confirm
    //

    VOID PurgeWorkItems(VOID) {
        m_qlock.Acquire();

        for (CWorkItem * pDoomed = DequeueWorkItem();
             pDoomed != NULL;
             pDoomed = DequeueWorkItem()) {

            delete pDoomed;
        }

        m_qlock.Release();
    }

    //
    // Signal - sets the queue event, provided it exists. Does nothing when
    // the event handle is NULL
    //

    VOID Signal(VOID) {
        if (m_event == NULL) {
            return;
        }
        SetEvent(m_event);
    }

    //
    // CreateWorkerThread - starts one more worker thread running
    // WorkerThread() and, on success, counts it through AddWorker().
    //
    // Returns ERROR_SUCCESS, or the error returned by StartThread(). On
    // failure none of the thread counts are changed.
    //

    DWORD CreateWorkerThread(VOID) {

        HANDLE hThread;
        DWORD dwStatus = StartThread((LPTHREAD_START_ROUTINE)WorkerThread,
                                     &hThread,
                                     FALSE
                                     );

        if (dwStatus != ERROR_SUCCESS) {
            return dwStatus;
        }

        AddWorker();
#if DBG
        if (m_maxWorkerThreadsCreated < m_totalWorkerThreads) {
            m_maxWorkerThreadsCreated = m_totalWorkerThreads;
        }
//char buf[256];
//wsprintf(buf, ">>>> started worker thread. Total = %d/%d. Avail = %d. Factor = %d/%d\n",
//         m_totalWorkerThreads,
//         m_maxWorkerThreadsCreated,
//         m_availableWorkerThreads,
//         m_qFactor,
//         m_qFactorMax
//         );
//OutputDebugString(buf);
#endif

        //
        // the pool never uses the thread handle, so close it straight away
        //

        CloseHandle(hThread);
        return ERROR_SUCCESS;
    }

    //
    // TerminateThreads - loops until the total worker thread count is no
    // greater than Limit. Each pass sets the queue event and then gives up
    // the rest of the time slice with a zero-length, non-alertable sleep.
    // There is no timeout: the loop ends only when the count comes down
    //

    VOID TerminateThreads(DWORD Limit) {
        for (;;) {
            if (m_totalWorkerThreads <= Limit) {
                break;
            }
            Signal();
            SleepEx(0, FALSE);
        }
    }

    //
    // AddWorker - counts a newly started worker thread: bumps the total with
    // an interlocked increment, then marks the thread available through
    // MakeAvailable(). Called by CreateWorkerThread() after a successful start
    //

    VOID AddWorker(VOID) {
        InterlockedIncrement((LPLONG)&m_totalWorkerThreads);
        MakeAvailable();
    }

    //
    // RemoveWorker - reverse of AddWorker(): marks the thread unavailable
    // through MakeUnavailable(), then drops the total with an interlocked
    // decrement. TerminateThreads() polls that total. Not called from any
    // code in this header
    //

    VOID RemoveWorker(VOID) {
        MakeUnavailable();
        InterlockedDecrement((LPLONG)&m_totalWorkerThreads);
    }
};
