commit
be70d52a3b
|
@ -21,6 +21,24 @@
|
|||
template<class T>
|
||||
class ConcurrentQueue
|
||||
{
|
||||
private:
|
||||
|
||||
inline bool do_pop(T* res)
|
||||
{
|
||||
lock.lock();
|
||||
*res = queue.front();
|
||||
queue.pop();
|
||||
poppedCount++;
|
||||
count--;
|
||||
lock.unlock();
|
||||
|
||||
pushSemaphore.post();
|
||||
|
||||
DPRINTF_DEBUG( "(%s) element popped", ownerName.c_str());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public:
|
||||
/**
|
||||
* default queue size
|
||||
|
@ -75,26 +93,10 @@ class ConcurrentQueue
|
|||
|
||||
inline bool pop(T* res)
|
||||
{
|
||||
DPRINTF_DEBUG( "(%s) trying to pop element (%d elements in queue)",
|
||||
(ownerName.empty() ? "<owner not set>" : ownerName.c_str()),
|
||||
maxEntries-pushSemaphore.getCount());
|
||||
if (!popSemaphore.wait()) {
|
||||
DPRINTF_INFO("(%s) failed to pop element, program is being shut down?", ownerName.c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
lock.lock();
|
||||
*res = queue.front();
|
||||
queue.pop();
|
||||
poppedCount++;
|
||||
count--;
|
||||
lock.unlock();
|
||||
|
||||
pushSemaphore.post();
|
||||
|
||||
DPRINTF_DEBUG( "(%s) element popped", ownerName.c_str());
|
||||
|
||||
return true;
|
||||
return do_pop(res);
|
||||
};
|
||||
|
||||
// try to pop an entry from the queue before timeout occurs
|
||||
|
@ -102,84 +104,22 @@ class ConcurrentQueue
|
|||
// of the timeout has been reached, res will be set to NULL and false will be returned
|
||||
inline bool pop(long timeout_ms, T *res)
|
||||
{
|
||||
DPRINTF_DEBUG( "(%s) trying to pop element (%d elements in queue)", ownerName.c_str(), count);
|
||||
// try to get an item from the queue
|
||||
if(!popSemaphore.wait(timeout_ms)) {
|
||||
// timeout occured
|
||||
DPRINTF_DEBUG( "(%s) timeout", ownerName.c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
// popSemaphore.wait() succeeded, now pop the frontmost element
|
||||
lock.lock();
|
||||
*res = queue.front();
|
||||
queue.pop();
|
||||
poppedCount++;
|
||||
count--;
|
||||
lock.unlock();
|
||||
|
||||
pushSemaphore.post();
|
||||
|
||||
DPRINTF_DEBUG( "(%s) element popped", ownerName.c_str());
|
||||
|
||||
return true;
|
||||
return do_pop(res);
|
||||
}
|
||||
|
||||
// like pop above, but with absolute time instead of delta.
|
||||
// use this instead of the above, makes things easier!
|
||||
inline bool popAbs(const struct timeval &timeout, T *res)
|
||||
{
|
||||
DPRINTF_DEBUG( "(%s) trying to pop element (%d elements in queue)", ownerName.c_str(), count);
|
||||
|
||||
if (popSemaphore.waitAbs(timeout)) {
|
||||
// popSemaphore.wait() succeeded, now pop the frontmost element
|
||||
lock.lock();
|
||||
*res = queue.front();
|
||||
queue.pop();
|
||||
poppedCount++;
|
||||
count--;
|
||||
lock.unlock();
|
||||
|
||||
pushSemaphore.post();
|
||||
|
||||
DPRINTF_DEBUG( "(%s) element popped", ownerName.c_str());
|
||||
|
||||
return true;
|
||||
} else {
|
||||
// timeout occured
|
||||
DPRINTF_DEBUG( "(%s) timeout or program shutdown", ownerName.c_str());
|
||||
*res = 0;
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// like pop above, but with absolute time instead of delta.
|
||||
// use this instead of the above, makes things easier!
|
||||
inline bool popAbs(const struct timespec& timeout, T *res)
|
||||
{
|
||||
DPRINTF_DEBUG( "(%s) trying to pop element (%d elements in queue)", ownerName.c_str(), count);
|
||||
|
||||
if (popSemaphore.waitAbs(timeout)) {
|
||||
// popSemaphore.wait() succeeded, now pop the frontmost element
|
||||
lock.lock();
|
||||
*res = queue.front();
|
||||
queue.pop();
|
||||
poppedCount++;
|
||||
count--;
|
||||
lock.unlock();
|
||||
|
||||
pushSemaphore.post();
|
||||
|
||||
DPRINTF_DEBUG( "(%s) element popped", ownerName.c_str());
|
||||
|
||||
return true;
|
||||
return do_pop(res);
|
||||
}
|
||||
else {
|
||||
// timeout occured
|
||||
DPRINTF_DEBUG( "(%s) timeout or program shutdown", ownerName.c_str());
|
||||
*res = 0;
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,8 +106,6 @@ public:
|
|||
return false;
|
||||
break;
|
||||
default:
|
||||
// semaphore could not be aquired because of several reasons, but none are fatal
|
||||
DPRINTF_DEBUG( "timedwait (<0) returned with %d (%s)", errno, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -120,7 +118,6 @@ public:
|
|||
retval = sem_timedwait(sem, &timeout);
|
||||
#endif
|
||||
if (retval != 0 && errno != ETIMEDOUT) {
|
||||
DPRINTF_DEBUG( "timedwait (>=0) returned with %d: %s", errno, strerror(errno));
|
||||
switch (errno) {
|
||||
case EINVAL:
|
||||
/*
|
||||
|
@ -141,8 +138,7 @@ public:
|
|||
return false;
|
||||
break;
|
||||
default:
|
||||
// semaphore could not be aquired because of several reasons, but none are fatal
|
||||
DPRINTF_DEBUG( "timedwait (>=0) returned with %d", errno);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (errno == ETIMEDOUT) {
|
||||
|
@ -164,20 +160,6 @@ public:
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* see documentation for waitAbs(struct timespec)
|
||||
* has same functionality, just different parameter
|
||||
*/
|
||||
inline bool waitAbs(const struct timeval& timeout)
|
||||
{
|
||||
struct timespec ts;
|
||||
|
||||
TIMEVAL_TO_TIMESPEC(&timeout, &ts);
|
||||
|
||||
return waitAbs(ts);
|
||||
}
|
||||
|
||||
// like wait() but with absolute time instead of delta. makes things easier!
|
||||
// Use this instead of the above function
|
||||
inline bool waitAbs(const struct timespec& ts)
|
||||
|
@ -215,9 +197,6 @@ public:
|
|||
return false;
|
||||
break;
|
||||
default:
|
||||
// semaphore not acquired for non-fatal reasons
|
||||
DPRINTF_DEBUG( "timedwait returned with %s",
|
||||
strerror(errno));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -239,12 +239,10 @@ private:
|
|||
struct timespec nexttimeout;
|
||||
if (!processTimeouts(nexttimeout)) {
|
||||
if (!queue.pop(&element)) {
|
||||
DPRINTF_INFO("queue.pop failed - timeout?");
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
if (!queue.popAbs(nexttimeout, &element)) {
|
||||
DPRINTF_INFO("queue.pop failed - timeout?");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,15 +171,12 @@ void BaseAggregator::exporterThread()
|
|||
}
|
||||
|
||||
gettimeofday(&curtime, 0);
|
||||
DPRINTF_DEBUG("Aggregator: starting Export");
|
||||
for (size_t i = 0; i < rules->count; i++) {
|
||||
rules->rule[i]->hashtable->expireFlows();
|
||||
}
|
||||
struct timeval endtime;
|
||||
gettimeofday(&endtime, 0);
|
||||
timeval_subtract(&difftime, &endtime, &curtime);
|
||||
|
||||
DPRINTF_DEBUG("Aggregator: export took %.03f secs", (float)difftime.tv_usec/1000000+difftime.tv_sec);
|
||||
}
|
||||
|
||||
if (getShutdownProperly()) {
|
||||
|
|
Loading…
Reference in New Issue