ocaml/otherlibs/win32unix/winworker.c

321 lines
8.9 KiB
C

/***********************************************************************/
/* */
/* OCaml */
/* */
/* Contributed by Sylvain Le Gall for Lexifi */
/* */
/* Copyright 2008 Institut National de Recherche en Informatique et */
/* en Automatique. All rights reserved. This file is distributed */
/* under the terms of the GNU Library General Public License, with */
/* the special exception on linking described in file ../../LICENSE. */
/* */
/***********************************************************************/
#include <mlvalues.h>
#include <alloc.h>
#include <memory.h>
#include <signals.h>
#include "winworker.h"
#include "winlist.h"
#include "windbug.h"
typedef enum {
WORKER_CMD_NONE = 0,
WORKER_CMD_EXEC,
WORKER_CMD_STOP
} WORKERCMD;
struct _WORKER {
LIST lst; /* This structure is used as a list. */
HANDLE hJobStarted; /* Event representing that the function has begun.*/
HANDLE hJobStop; /* Event that can be used to notify the function
that it should stop processing. */
HANDLE hJobDone; /* Event representing that the function has
finished. */
void *lpJobUserData; /* User data for the job. */
WORKERFUNC hJobFunc; /* Function to be called during APC */
HANDLE hWorkerReady; /* Worker is ready. */
HANDLE hCommandReady; /* Worker should execute command. */
WORKERCMD ECommand; /* Command to execute */
HANDLE hThread; /* Thread handle of the worker. */
};
#define THREAD_WORKERS_MAX 16
#define THREAD_WORKERS_MEM 4000
LPWORKER lpWorkers = NULL;
DWORD nWorkersCurrent = 0;
DWORD nWorkersMax = 0;
HANDLE hWorkersMutex = INVALID_HANDLE_VALUE;
DWORD WINAPI worker_wait (LPVOID _data)
{
BOOL bExit;
LPWORKER lpWorker;
lpWorker = (LPWORKER )_data;
bExit = FALSE;
DEBUG_PRINT("Worker %x starting", lpWorker);
while (
!bExit
&& SignalObjectAndWait(
lpWorker->hWorkerReady,
lpWorker->hCommandReady,
INFINITE,
TRUE) == WAIT_OBJECT_0)
{
DEBUG_PRINT("Worker %x running", lpWorker);
switch (lpWorker->ECommand)
{
case WORKER_CMD_NONE:
break;
case WORKER_CMD_EXEC:
if (lpWorker->hJobFunc != NULL)
{
SetEvent(lpWorker->hJobStarted);
lpWorker->hJobFunc(lpWorker->hJobStop, lpWorker->lpJobUserData);
SetEvent(lpWorker->hJobDone);
};
break;
case WORKER_CMD_STOP:
bExit = TRUE;
break;
}
};
DEBUG_PRINT("Worker %x exiting", lpWorker);
return 0;
}
LPWORKER worker_new (void)
{
LPWORKER lpWorker = NULL;
lpWorker = (LPWORKER)caml_stat_alloc(sizeof(WORKER));
list_init((LPLIST)lpWorker);
lpWorker->hJobStarted = CreateEvent(NULL, TRUE, FALSE, NULL);
lpWorker->hJobStop = CreateEvent(NULL, TRUE, FALSE, NULL);
lpWorker->hJobDone = CreateEvent(NULL, TRUE, FALSE, NULL);
lpWorker->lpJobUserData = NULL;
lpWorker->hWorkerReady = CreateEvent(NULL, FALSE, FALSE, NULL);
lpWorker->hCommandReady = CreateEvent(NULL, FALSE, FALSE, NULL);
lpWorker->ECommand = WORKER_CMD_NONE;
lpWorker->hThread = CreateThread(
NULL,
THREAD_WORKERS_MEM,
worker_wait,
(LPVOID)lpWorker,
0,
NULL);
return lpWorker;
};
void worker_free (LPWORKER lpWorker)
{
/* Wait for termination of the worker */
DEBUG_PRINT("Shutting down worker %x", lpWorker);
WaitForSingleObject(lpWorker->hWorkerReady, INFINITE);
lpWorker->ECommand = WORKER_CMD_STOP;
SetEvent(lpWorker->hCommandReady);
WaitForSingleObject(lpWorker->hThread, INFINITE);
/* Free resources */
DEBUG_PRINT("Freeing resources of worker %x", lpWorker);
if (lpWorker->hThread != INVALID_HANDLE_VALUE)
{
CloseHandle(lpWorker->hThread);
lpWorker->hThread = INVALID_HANDLE_VALUE;
}
if (lpWorker->hJobStarted != INVALID_HANDLE_VALUE)
{
CloseHandle(lpWorker->hJobStarted);
lpWorker->hJobStarted = INVALID_HANDLE_VALUE;
}
if (lpWorker->hJobStop != INVALID_HANDLE_VALUE)
{
CloseHandle(lpWorker->hJobStop);
lpWorker->hJobStop = INVALID_HANDLE_VALUE;
}
if (lpWorker->hJobDone != INVALID_HANDLE_VALUE)
{
CloseHandle(lpWorker->hJobDone);
lpWorker->hJobDone = INVALID_HANDLE_VALUE;
}
lpWorker->lpJobUserData = NULL;
lpWorker->hJobFunc = NULL;
if (lpWorker->hWorkerReady != INVALID_HANDLE_VALUE)
{
CloseHandle(lpWorker->hWorkerReady);
lpWorker->hWorkerReady = INVALID_HANDLE_VALUE;
}
if (lpWorker->hCommandReady != INVALID_HANDLE_VALUE)
{
CloseHandle(lpWorker->hCommandReady);
lpWorker->hCommandReady = INVALID_HANDLE_VALUE;
}
caml_stat_free(lpWorker);
};
LPWORKER worker_pop (void)
{
LPWORKER lpWorkerFree = NULL;
WaitForSingleObject(hWorkersMutex, INFINITE);
/* Get the first worker of the list */
if (lpWorkers != NULL)
{
lpWorkerFree = lpWorkers;
lpWorkers = LIST_NEXT(LPWORKER, lpWorkers);
}
nWorkersCurrent++;
nWorkersMax = (nWorkersCurrent > nWorkersMax ? nWorkersCurrent : nWorkersMax);
DEBUG_PRINT("Workers running current/runnning max/waiting: %d/%d/%d",
nWorkersCurrent,
nWorkersMax,
list_length((LPLIST)lpWorkers));
ReleaseMutex(hWorkersMutex);
if (lpWorkerFree == NULL)
{
/* We cannot find a free worker, create one. */
lpWorkerFree = worker_new();
}
/* Ensure that we don't get dangling pointer to old data. */
list_init((LPLIST)lpWorkerFree);
lpWorkerFree->lpJobUserData = NULL;
/* Reset events */
ResetEvent(lpWorkerFree->hJobStarted);
ResetEvent(lpWorkerFree->hJobStop);
ResetEvent(lpWorkerFree->hJobDone);
return lpWorkerFree;
}
void worker_push(LPWORKER lpWorker)
{
BOOL bFreeWorker;
bFreeWorker = TRUE;
WaitForSingleObject(hWorkersMutex, INFINITE);
DEBUG_PRINT("Testing if we are under the maximum number of running workers");
if (list_length((LPLIST)lpWorkers) < THREAD_WORKERS_MAX)
{
DEBUG_PRINT("Saving this worker for future use");
DEBUG_PRINT("Next: %x", ((LPLIST)lpWorker)->lpNext);
lpWorkers = (LPWORKER)list_concat((LPLIST)lpWorker, (LPLIST)lpWorkers);
bFreeWorker = FALSE;
};
nWorkersCurrent--;
DEBUG_PRINT("Workers running current/runnning max/waiting: %d/%d/%d",
nWorkersCurrent,
nWorkersMax,
list_length((LPLIST)lpWorkers));
ReleaseMutex(hWorkersMutex);
if (bFreeWorker)
{
DEBUG_PRINT("Freeing worker %x", lpWorker);
worker_free(lpWorker);
}
}
void worker_init (void)
{
int i = 0;
/* Init a shared variable. The only way to ensure that no other
worker will be at the same point is to use a critical section.
*/
DEBUG_PRINT("Allocating mutex for workers");
if (hWorkersMutex == INVALID_HANDLE_VALUE)
{
hWorkersMutex = CreateMutex(NULL, FALSE, NULL);
}
}
void worker_cleanup(void)
{
LPWORKER lpWorker = NULL;
/* WARNING: we can have a race condition here, if while this code
is executed another worker is waiting to access hWorkersMutex,
he will never be able to get it...
*/
if (hWorkersMutex != INVALID_HANDLE_VALUE)
{
WaitForSingleObject(hWorkersMutex, INFINITE);
DEBUG_PRINT("Freeing global resource of workers");
/* Empty the queue of worker worker */
while (lpWorkers != NULL)
{
ReleaseMutex(hWorkersMutex);
lpWorker = worker_pop();
DEBUG_PRINT("Freeing worker %x", lpWorker);
WaitForSingleObject(hWorkersMutex, INFINITE);
worker_free(lpWorker);
};
ReleaseMutex(hWorkersMutex);
/* Destroy associated mutex */
CloseHandle(hWorkersMutex);
hWorkersMutex = INVALID_HANDLE_VALUE;
};
}
LPWORKER worker_job_submit (WORKERFUNC f, void *user_data)
{
LPWORKER lpWorker = worker_pop();
DEBUG_PRINT("Waiting for worker to be ready");
enter_blocking_section();
WaitForSingleObject(lpWorker->hWorkerReady, INFINITE);
ResetEvent(lpWorker->hWorkerReady);
leave_blocking_section();
DEBUG_PRINT("Worker is ready");
lpWorker->hJobFunc = f;
lpWorker->lpJobUserData = user_data;
lpWorker->ECommand = WORKER_CMD_EXEC;
DEBUG_PRINT("Call worker (func: %x, worker: %x)", f, lpWorker);
SetEvent(lpWorker->hCommandReady);
return (LPWORKER)lpWorker;
}
HANDLE worker_job_event_done (LPWORKER lpWorker)
{
return lpWorker->hJobDone;
}
void worker_job_stop (LPWORKER lpWorker)
{
DEBUG_PRINT("Sending stop signal to worker %x", lpWorker);
SetEvent(lpWorker->hJobStop);
DEBUG_PRINT("Signal sent to worker %x", lpWorker);
}
void worker_job_finish (LPWORKER lpWorker)
{
DEBUG_PRINT("Finishing call of worker %x", lpWorker);
enter_blocking_section();
WaitForSingleObject(lpWorker->hJobDone, INFINITE);
leave_blocking_section();
worker_push(lpWorker);
}