Merge pull request #505 from terrelln/pthread_cond_t

pthread_cond_t
dev
Yann Collet 2017-01-01 15:53:51 +01:00 committed by GitHub
commit e4f70cdbbc
7 changed files with 59 additions and 32 deletions

View File

@ -12,7 +12,7 @@ matrix:
os: linux os: linux
sudo: false sudo: false
- env: Ubu=12.04cont Cmd="make zlibwrapper && make clean && make -C tests test-pool && make -C tests test-symbols && make clean && make -C tests test-zstd-nolegacy && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest pzstd tests check && make -C contrib/pzstd clean" - env: Ubu=12.04cont Cmd="make zlibwrapper && make clean && make -C tests test-symbols && make clean && make -C tests test-zstd-nolegacy && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest pzstd tests check && make -C contrib/pzstd clean"
os: linux os: linux
sudo: false sudo: false
language: cpp language: cpp

View File

@ -51,21 +51,21 @@ static void* POOL_thread(void* opaque) {
if (!ctx) { return NULL; } if (!ctx) { return NULL; }
for (;;) { for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */ /* Lock the mutex and wait for a non-empty queue or until shutdown */
if (pthread_mutex_lock(&ctx->queueMutex)) { return NULL; } pthread_mutex_lock(&ctx->queueMutex);
while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
if (pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex)) { return NULL; } pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
} }
/* empty => shutting down: so stop */ /* empty => shutting down: so stop */
if (ctx->queueHead == ctx->queueTail) { if (ctx->queueHead == ctx->queueTail) {
if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } pthread_mutex_unlock(&ctx->queueMutex);
return opaque; return opaque;
} }
/* Pop a job off the queue */ /* Pop a job off the queue */
{ POOL_job const job = ctx->queue[ctx->queueHead]; { POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
/* Unlock the mutex, signal a pusher, and run the job */ /* Unlock the mutex, signal a pusher, and run the job */
if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } pthread_mutex_unlock(&ctx->queueMutex);
if (pthread_cond_signal(&ctx->queuePushCond)) { return NULL; } pthread_cond_signal(&ctx->queuePushCond);
job.function(job.opaque); job.function(job.opaque);
} }
} }
@ -73,7 +73,6 @@ static void* POOL_thread(void* opaque) {
} }
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
int err = 0;
POOL_ctx *ctx; POOL_ctx *ctx;
/* Check the parameters */ /* Check the parameters */
if (!numThreads || !queueSize) { return NULL; } if (!numThreads || !queueSize) { return NULL; }
@ -88,15 +87,15 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
ctx->queueHead = 0; ctx->queueHead = 0;
ctx->queueTail = 0; ctx->queueTail = 0;
err |= pthread_mutex_init(&ctx->queueMutex, NULL); pthread_mutex_init(&ctx->queueMutex, NULL);
err |= pthread_cond_init(&ctx->queuePushCond, NULL); pthread_cond_init(&ctx->queuePushCond, NULL);
err |= pthread_cond_init(&ctx->queuePopCond, NULL); pthread_cond_init(&ctx->queuePopCond, NULL);
ctx->shutdown = 0; ctx->shutdown = 0;
/* Allocate space for the thread handles */ /* Allocate space for the thread handles */
ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
ctx->numThreads = 0; ctx->numThreads = 0;
/* Check for errors */ /* Check for errors */
if (!ctx->threads || !ctx->queue || err) { POOL_free(ctx); return NULL; } if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
/* Initialize the threads */ /* Initialize the threads */
{ size_t i; { size_t i;
for (i = 0; i < numThreads; ++i) { for (i = 0; i < numThreads; ++i) {

View File

@ -39,6 +39,7 @@ typedef void (*POOL_add_function)(void *, POOL_function, void *);
/*! POOL_add() : /*! POOL_add() :
Add the job `function(opaque)` to the thread pool. Add the job `function(opaque)` to the thread pool.
Possibly blocks until there is room in the queue. Possibly blocks until there is room in the queue.
Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
*/ */
void POOL_add(void *ctx, POOL_function function, void *opaque); void POOL_add(void *ctx, POOL_function function, void *opaque);

View File

@ -15,7 +15,7 @@
* This file will hold wrapper for systems, which do not support Pthreads * This file will hold wrapper for systems, which do not support Pthreads
*/ */
#ifdef _WIN32 #if defined(ZSTD_PTHREAD) && defined(_WIN32)
/** /**
* Windows minimalist Pthread Wrapper, based on : * Windows minimalist Pthread Wrapper, based on :

View File

@ -24,24 +24,42 @@ extern "C" {
* Windows minimalist Pthread Wrapper, based on : * Windows minimalist Pthread Wrapper, based on :
* http://www.cse.wustl.edu/~schmidt/win32-cv-1.html * http://www.cse.wustl.edu/~schmidt/win32-cv-1.html
*/ */
#ifdef WINVER
# undef WINVER
#endif
#define WINVER 0x0600
#ifdef _WIN32_WINNT
# undef _WIN32_WINNT
#endif
#define _WIN32_WINNT 0x0600
#ifndef WIN32_LEAN_AND_MEAN #ifndef WIN32_LEAN_AND_MEAN
# define WIN32_LEAN_AND_MEAN # define WIN32_LEAN_AND_MEAN
#endif #endif
#include <windows.h> #include <windows.h>
/* mutex */ /* mutex */
#define pthread_mutex_t CRITICAL_SECTION #define pthread_mutex_t CRITICAL_SECTION
#define pthread_mutex_init(a,b) InitializeCriticalSection((a)) #define pthread_mutex_init(a,b) InitializeCriticalSection((a))
#define pthread_mutex_destroy(a) DeleteCriticalSection((a)) #define pthread_mutex_destroy(a) DeleteCriticalSection((a))
#define pthread_mutex_lock EnterCriticalSection #define pthread_mutex_lock(a) EnterCriticalSection((a))
#define pthread_mutex_unlock LeaveCriticalSection #define pthread_mutex_unlock(a) LeaveCriticalSection((a))
/* condition variable */
#define pthread_cond_t CONDITION_VARIABLE
#define pthread_cond_init(a, b) InitializeConditionVariable((a))
#define pthread_cond_destroy(a) /* No delete */
#define pthread_cond_wait(a, b) SleepConditionVariableCS((a), (b), INFINITE)
#define pthread_cond_signal(a) WakeConditionVariable((a))
#define pthread_cond_broadcast(a) WakeAllConditionVariable((a))
/* pthread_create() and pthread_join() */ /* pthread_create() and pthread_join() */
typedef struct { typedef struct {
HANDLE handle; HANDLE handle;
void* (*start_routine)(void*); void* (*start_routine)(void*);
void*varg; void* arg;
} pthread_t; } pthread_t;
int pthread_create(pthread_t* thread, const void* unused, int pthread_create(pthread_t* thread, const void* unused,
@ -68,6 +86,13 @@ typedef int pthread_mutex_t;
#define pthread_mutex_lock(a) #define pthread_mutex_lock(a)
#define pthread_mutex_unlock(a) #define pthread_mutex_unlock(a)
typedef int pthread_cond_t;
#define pthread_cond_init(a,b)
#define pthread_cond_destroy(a)
#define pthread_cond_wait(a,b)
#define pthread_cond_signal(a)
#define pthread_cond_broadcast(a)
/* do not use pthread_t */ /* do not use pthread_t */
#endif /* ZSTD_PTHREAD */ #endif /* ZSTD_PTHREAD */

View File

@ -48,8 +48,10 @@ ZDICT_FILES := $(ZSTDDIR)/dictBuilder/*.c
# Define *.exe as extension for Windows systems # Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS))) ifneq (,$(filter Windows%,$(OS)))
EXT =.exe EXT =.exe
PTHREAD = -DZSTD_PTHREAD
else else
EXT = EXT =
PTHREAD = -pthread -DZSTD_PTHREAD
endif endif
VOID = /dev/null VOID = /dev/null
@ -158,8 +160,8 @@ else
$(CC) $(FLAGS) $^ -o $@$(EXT) -Wl,-rpath=$(ZSTDDIR) $(ZSTDDIR)/libzstd.so $(CC) $(FLAGS) $^ -o $@$(EXT) -Wl,-rpath=$(ZSTDDIR) $(ZSTDDIR)/libzstd.so
endif endif
pool : pool.c $(ZSTDDIR)/common/pool.c pool : pool.c $(ZSTDDIR)/common/pool.c $(ZSTDDIR)/common/threading.c
$(CC) $(FLAGS) -pthread -DZSTD_PTHREAD $^ -o $@$(EXT) $(CC) $(FLAGS) $(PTHREAD) $^ -o $@$(EXT)
namespaceTest: namespaceTest:
if $(CC) namespaceTest.c ../lib/common/xxhash.c -o $@ ; then echo compilation should fail; exit 1 ; fi if $(CC) namespaceTest.c ../lib/common/xxhash.c -o $@ ; then echo compilation should fail; exit 1 ; fi
@ -225,7 +227,7 @@ zstd-playTests: datagen
file $(ZSTD) file $(ZSTD)
ZSTD="$(QEMU_SYS) $(ZSTD)" ./playTests.sh $(ZSTDRTTEST) ZSTD="$(QEMU_SYS) $(ZSTD)" ./playTests.sh $(ZSTDRTTEST)
test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries test-pool
test32: test-zstd32 test-fullbench32 test-fuzzer32 test-zstream32 test32: test-zstd32 test-fullbench32 test-fuzzer32 test-zstream32

View File

@ -1,5 +1,5 @@
#include "pool.h" #include "pool.h"
#include <pthread.h> #include "threading.h"
#include <stddef.h> #include <stddef.h>
#include <stdio.h> #include <stdio.h>
@ -14,7 +14,7 @@
struct data { struct data {
pthread_mutex_t mutex; pthread_mutex_t mutex;
unsigned data[1024]; unsigned data[16];
size_t i; size_t i;
}; };
@ -26,45 +26,45 @@ void fn(void *opaque) {
pthread_mutex_unlock(&data->mutex); pthread_mutex_unlock(&data->mutex);
} }
int testOrder(size_t numThreads, size_t queueLog) { int testOrder(size_t numThreads, size_t queueSize) {
struct data data; struct data data;
POOL_ctx *ctx = POOL_create(numThreads, queueLog); POOL_ctx *ctx = POOL_create(numThreads, queueSize);
ASSERT_TRUE(ctx); ASSERT_TRUE(ctx);
data.i = 0; data.i = 0;
ASSERT_FALSE(pthread_mutex_init(&data.mutex, NULL)); pthread_mutex_init(&data.mutex, NULL);
{ {
size_t i; size_t i;
for (i = 0; i < 1024; ++i) { for (i = 0; i < 16; ++i) {
POOL_add(ctx, &fn, &data); POOL_add(ctx, &fn, &data);
} }
} }
POOL_free(ctx); POOL_free(ctx);
ASSERT_EQ(1024, data.i); ASSERT_EQ(16, data.i);
{ {
size_t i; size_t i;
for (i = 0; i < data.i; ++i) { for (i = 0; i < data.i; ++i) {
ASSERT_EQ(i, data.data[i]); ASSERT_EQ(i, data.data[i]);
} }
} }
ASSERT_FALSE(pthread_mutex_destroy(&data.mutex)); pthread_mutex_destroy(&data.mutex);
return 0; return 0;
} }
int main(int argc, const char **argv) { int main(int argc, const char **argv) {
size_t numThreads; size_t numThreads;
for (numThreads = 1; numThreads <= 8; ++numThreads) { for (numThreads = 1; numThreads <= 4; ++numThreads) {
size_t queueLog; size_t queueSize;
for (queueLog = 1; queueLog <= 8; ++queueLog) { for (queueSize = 1; queueSize <= 2; ++queueSize) {
if (testOrder(numThreads, queueLog)) { if (testOrder(numThreads, queueSize)) {
printf("FAIL: testOrder\n"); printf("FAIL: testOrder\n");
return 1; return 1;
} }
} }
} }
printf("PASS: testOrder\n"); printf("PASS: testOrder\n");
(POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n")
: printf("PASS: testInvalid\n");
(void)argc; (void)argc;
(void)argv; (void)argv;
return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1
: printf("PASS: testInvalid\n"), 0;
return 0; return 0;
} }