Changed : input divided into roughly equal parts.
Debug : can measure time waiting for mutexes to unlock.
This commit is contained in:
parent
6c0ed9483a
commit
e70912c72b
@ -5,12 +5,41 @@
|
|||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
# include <stdio.h>
|
# include <stdio.h>
|
||||||
static unsigned g_debugLevel = 4;
|
# include <unistd.h>
|
||||||
|
# include <sys/times.h>
|
||||||
|
static unsigned g_debugLevel = 2;
|
||||||
# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
|
# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
|
||||||
|
|
||||||
|
static unsigned long long GetCurrentClockTimeMicroseconds()
|
||||||
|
{
|
||||||
|
static clock_t _ticksPerSecond = 0;
|
||||||
|
if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);
|
||||||
|
|
||||||
|
struct tms junk; clock_t newTicks = (clock_t) times(&junk);
|
||||||
|
return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
|
||||||
|
}
|
||||||
|
|
||||||
|
#define MUTEX_WAIT_TIME_DLEVEL 5
|
||||||
|
#define PTHREAD_MUTEX_LOCK(mutex) \
|
||||||
|
if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
|
||||||
|
unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \
|
||||||
|
pthread_mutex_lock(mutex); \
|
||||||
|
unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \
|
||||||
|
unsigned long long elapsedTime = (afterTime-beforeTime); \
|
||||||
|
if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \
|
||||||
|
DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread %li took %llu microseconds to acquire mutex %s \n", \
|
||||||
|
(long int) pthread_self(), elapsedTime, #mutex); \
|
||||||
|
} \
|
||||||
|
} else pthread_mutex_lock(mutex);
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
|
||||||
# define DEBUGLOG(l, ...) /* disabled */
|
# define DEBUGLOG(l, ...) /* disabled */
|
||||||
|
# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#define ZSTDMT_NBTHREADS_MAX 128
|
#define ZSTDMT_NBTHREADS_MAX 128
|
||||||
#define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
|
#define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
|
||||||
|
|
||||||
@ -38,8 +67,9 @@ static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t d
|
|||||||
dbm.out.pos = 0;
|
dbm.out.pos = 0;
|
||||||
dbm.frameIDToWrite = 0;
|
dbm.frameIDToWrite = 0;
|
||||||
pthread_mutex_init(&dbm.frameTable_mutex, NULL);
|
pthread_mutex_init(&dbm.frameTable_mutex, NULL);
|
||||||
pthread_mutex_init(&dbm.allFramesWritten_mutex, NULL);
|
pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex;
|
||||||
pthread_mutex_lock(&dbm.allFramesWritten_mutex); /* maybe could be merged into init ? */
|
pthread_mutex_init(allFramesWritten_mutex, NULL);
|
||||||
|
PTHREAD_MUTEX_LOCK(allFramesWritten_mutex); /* maybe could be merged into init ? */
|
||||||
dbm.nbStackedFrames = 0;
|
dbm.nbStackedFrames = 0;
|
||||||
return dbm;
|
return dbm;
|
||||||
}
|
}
|
||||||
@ -89,7 +119,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
|
|||||||
|
|
||||||
/* check if correct frame ordering; stack otherwise */
|
/* check if correct frame ordering; stack otherwise */
|
||||||
DEBUGLOG(5, "considering writing frame %u ", frameID);
|
DEBUGLOG(5, "considering writing frame %u ", frameID);
|
||||||
pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
|
PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
|
||||||
if (frameID != dstBufferManager->frameIDToWrite) {
|
if (frameID != dstBufferManager->frameIDToWrite) {
|
||||||
DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite);
|
DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite);
|
||||||
frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame };
|
frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame };
|
||||||
@ -112,7 +142,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
|
|||||||
lastFrameWritten = isLastFrame;
|
lastFrameWritten = isLastFrame;
|
||||||
|
|
||||||
/* check if more frames are stacked */
|
/* check if more frames are stacked */
|
||||||
pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
|
PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
|
||||||
unsigned frameWritten = dstBufferManager->nbStackedFrames>0;
|
unsigned frameWritten = dstBufferManager->nbStackedFrames>0;
|
||||||
while (frameWritten) {
|
while (frameWritten) {
|
||||||
unsigned u;
|
unsigned u;
|
||||||
@ -127,7 +157,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
|
|||||||
lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
|
lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
|
||||||
dstBufferManager->frameIDToWrite = frameID+1;
|
dstBufferManager->frameIDToWrite = frameID+1;
|
||||||
/* remove frame from stack */
|
/* remove frame from stack */
|
||||||
pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
|
PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
|
||||||
dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
|
dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
|
||||||
dstBufferManager->nbStackedFrames -= 1;
|
dstBufferManager->nbStackedFrames -= 1;
|
||||||
frameWritten = dstBufferManager->nbStackedFrames>0;
|
frameWritten = dstBufferManager->nbStackedFrames>0;
|
||||||
@ -166,7 +196,7 @@ typedef struct ZSTDMT_jobAgency_s {
|
|||||||
static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job)
|
static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job)
|
||||||
{
|
{
|
||||||
DEBUGLOG(5, "starting job posting ");
|
DEBUGLOG(5, "starting job posting ");
|
||||||
pthread_mutex_lock(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */
|
PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */
|
||||||
DEBUGLOG(5, "job posting mutex acquired ");
|
DEBUGLOG(5, "job posting mutex acquired ");
|
||||||
jobAgency->jobAnnounce = job; /* post job */
|
jobAgency->jobAnnounce = job; /* post job */
|
||||||
pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex); /* announce */
|
pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex); /* announce */
|
||||||
@ -175,7 +205,7 @@ static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription jo
|
|||||||
|
|
||||||
static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
|
static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&jobAgency->jobAnnounce_mutex); /* should check return code */
|
PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex); /* should check return code */
|
||||||
ZSTDMT_jobDescription const job = jobAgency->jobAnnounce;
|
ZSTDMT_jobDescription const job = jobAgency->jobAnnounce;
|
||||||
pthread_mutex_unlock(&jobAgency->jobApply_mutex);
|
pthread_mutex_unlock(&jobAgency->jobApply_mutex);
|
||||||
return job;
|
return job;
|
||||||
@ -192,7 +222,7 @@ typedef struct ZSTDMT_bufferPool_s {
|
|||||||
|
|
||||||
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
|
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&pool->bufferPool_mutex);
|
PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
|
||||||
if (pool->nbBuffers) { /* try to use an existing buffer */
|
if (pool->nbBuffers) { /* try to use an existing buffer */
|
||||||
pool->nbBuffers--;
|
pool->nbBuffers--;
|
||||||
buffer_t const buf = pool->bTable[pool->nbBuffers];
|
buffer_t const buf = pool->bTable[pool->nbBuffers];
|
||||||
@ -213,7 +243,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
|
|||||||
/* effectively store buffer for later re-use, up to pool capacity */
|
/* effectively store buffer for later re-use, up to pool capacity */
|
||||||
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
|
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&pool->bufferPool_mutex);
|
PTHREAD_MUTEX_LOCK(&pool->bufferPool_mutex);
|
||||||
if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
|
if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
|
||||||
pthread_mutex_unlock(&pool->bufferPool_mutex);
|
pthread_mutex_unlock(&pool->bufferPool_mutex);
|
||||||
free(buf.start);
|
free(buf.start);
|
||||||
@ -240,6 +270,7 @@ static void* ZSTDMT_compressionThread(void* arg)
|
|||||||
ZSTDMT_bufferPool* const pool = &mtctx->bufferPool;
|
ZSTDMT_bufferPool* const pool = &mtctx->bufferPool;
|
||||||
ZSTD_CCtx* const cctx = ZSTD_createCCtx();
|
ZSTD_CCtx* const cctx = ZSTD_createCCtx();
|
||||||
if (cctx==NULL) return NULL; /* allocation failure : thread not started */
|
if (cctx==NULL) return NULL; /* allocation failure : thread not started */
|
||||||
|
DEBUGLOG(3, "thread %li created ", (long int)pthread_self());
|
||||||
for (;;) {
|
for (;;) {
|
||||||
ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency);
|
ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency);
|
||||||
if (job.src == NULL) {
|
if (job.src == NULL) {
|
||||||
@ -254,7 +285,7 @@ static void* ZSTDMT_compressionThread(void* arg)
|
|||||||
DEBUGLOG(4, "start compressing frame %u", job.frameNumber);
|
DEBUGLOG(4, "start compressing frame %u", job.frameNumber);
|
||||||
//size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
|
//size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
|
||||||
size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
|
size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
|
||||||
if (ZSTD_isError(cSize)) return (void*)(cSize); /* error */
|
if (ZSTD_isError(cSize)) return (void*)(cSize); /* error - find a better way */
|
||||||
size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame); /* pas clair */
|
size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame); /* pas clair */
|
||||||
if (ZSTD_isError(writeError)) return (void*)writeError;
|
if (ZSTD_isError(writeError)) return (void*)writeError;
|
||||||
if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer);
|
if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer);
|
||||||
@ -269,7 +300,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
|
|||||||
/* init jobAgency */
|
/* init jobAgency */
|
||||||
pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL); /* check return value ? */
|
pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL); /* check return value ? */
|
||||||
pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
|
pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
|
||||||
pthread_mutex_lock(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */
|
PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */
|
||||||
/* init bufferPool */
|
/* init bufferPool */
|
||||||
pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL);
|
pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL);
|
||||||
/* start all workers */
|
/* start all workers */
|
||||||
@ -299,7 +330,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
|
|||||||
ZSTDMT_jobAgency* jobAgency = &cctx->jobAgency;
|
ZSTDMT_jobAgency* jobAgency = &cctx->jobAgency;
|
||||||
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
|
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
|
||||||
size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
|
size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
|
||||||
unsigned const nbFrames = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
|
unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
|
||||||
|
unsigned const nbFrames = MIN(nbFramesMax, cctx->nbThreads);
|
||||||
size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames;
|
size_t const avgFrameSize = (srcSize + (nbFrames-1)) / nbFrames;
|
||||||
size_t remainingSrcSize = srcSize;
|
size_t remainingSrcSize = srcSize;
|
||||||
const char* const srcStart = (const char*)src;
|
const char* const srcStart = (const char*)src;
|
||||||
@ -320,7 +352,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
|
|||||||
remainingSrcSize -= frameSize;
|
remainingSrcSize -= frameSize;
|
||||||
} }
|
} }
|
||||||
|
|
||||||
pthread_mutex_lock(&dbm.allFramesWritten_mutex);
|
PTHREAD_MUTEX_LOCK(&dbm.allFramesWritten_mutex);
|
||||||
DEBUGLOG(4, "compressed size : %u ", (U32)dbm.out.pos);
|
DEBUGLOG(4, "compressed size : %u ", (U32)dbm.out.pos);
|
||||||
return dbm.out.pos;
|
return dbm.out.pos;
|
||||||
}
|
}
|
||||||
|
@ -159,8 +159,6 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
|
|||||||
U32 nbBlocks;
|
U32 nbBlocks;
|
||||||
UTIL_time_t ticksPerSecond;
|
UTIL_time_t ticksPerSecond;
|
||||||
|
|
||||||
ZSTDMT_CCtx* const mtcctx = ZSTDMT_createCCtx(g_nbThreads);
|
|
||||||
|
|
||||||
/* checks */
|
/* checks */
|
||||||
if (!compressedBuffer || !resultBuffer || !blockTable || !ctx || !dctx)
|
if (!compressedBuffer || !resultBuffer || !blockTable || !ctx || !dctx)
|
||||||
EXM_THROW(31, "allocation error : not enough memory");
|
EXM_THROW(31, "allocation error : not enough memory");
|
||||||
@ -228,6 +226,8 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
|
|||||||
const char* const marks[NB_MARKS] = { " |", " /", " =", "\\" };
|
const char* const marks[NB_MARKS] = { " |", " /", " =", "\\" };
|
||||||
U32 markNb = 0;
|
U32 markNb = 0;
|
||||||
|
|
||||||
|
ZSTDMT_CCtx* const mtcctx = ZSTDMT_createCCtx(g_nbThreads);
|
||||||
|
|
||||||
UTIL_getTime(&coolTime);
|
UTIL_getTime(&coolTime);
|
||||||
DISPLAYLEVEL(2, "\r%79s\r", "");
|
DISPLAYLEVEL(2, "\r%79s\r", "");
|
||||||
while (!cCompleted || !dCompleted) {
|
while (!cCompleted || !dCompleted) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user