commit
32e684a36b
|
@ -55,7 +55,7 @@ IpfixReceiverZmq::IpfixReceiverZmq(std::vector<std::string> endpoints,
|
|||
THROWEXCEPTION("Could not create ZMQ poller, cannot start ZMQ Receiver");
|
||||
}
|
||||
|
||||
for (std::vector<std::string>::iterator i = endpoints.begin(); i != endpoints.end(); i++) {
|
||||
for (const auto& endpoint : endpoints) {
|
||||
zsock_t *sock = zsock_new(ZMQ_SUB);
|
||||
if (!sock) {
|
||||
THROWEXCEPTION("Could not create ZMQ socket");
|
||||
|
@ -64,7 +64,7 @@ IpfixReceiverZmq::IpfixReceiverZmq(std::vector<std::string> endpoints,
|
|||
zsock_set_sndhwm(sock, zmq_high_watermark);
|
||||
zsock_set_rcvhwm(sock, zmq_high_watermark);
|
||||
|
||||
if (zsock_connect(sock, "%s", (*i).c_str())) {
|
||||
if (zsock_connect(sock, "%s", (endpoint).c_str())) {
|
||||
THROWEXCEPTION("Could not connect ZMQ socket");
|
||||
}
|
||||
|
||||
|
@ -72,18 +72,18 @@ IpfixReceiverZmq::IpfixReceiverZmq(std::vector<std::string> endpoints,
|
|||
if (channels.empty()) {
|
||||
zsock_set_subscribe(sock, "");
|
||||
} else {
|
||||
for (std::vector<std::string>::iterator j = channels.begin(); j != channels.end(); j++) {
|
||||
zsock_set_subscribe(sock, (*j).c_str());
|
||||
for (const auto& channel : channels) {
|
||||
zsock_set_subscribe(sock, (channel).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
if (zpoller_add(zpoller, sock)) {
|
||||
THROWEXCEPTION("Could not add %s ZMQ socket to ZMQ poller", (*i).c_str());
|
||||
THROWEXCEPTION("Could not add %s ZMQ socket to ZMQ poller", (endpoint).c_str());
|
||||
}
|
||||
|
||||
zmq_sockets.push_back(sock);
|
||||
|
||||
msg(LOG_NOTICE, "ZMQ Receiver listening on %s", (*i).c_str());
|
||||
msg(LOG_NOTICE, "ZMQ Receiver listening on %s", (endpoint).c_str());
|
||||
}
|
||||
|
||||
SensorManager::getInstance().addSensor(this, "IpfixReceiverZMQ", moduleId);
|
||||
|
@ -96,8 +96,8 @@ IpfixReceiverZmq::~IpfixReceiverZmq()
|
|||
{
|
||||
zpoller_destroy(&zpoller);
|
||||
|
||||
for (std::vector<zsock_t *>::iterator i = zmq_sockets.begin(); i != zmq_sockets.end(); i++) {
|
||||
zsock_destroy(&(*i));
|
||||
for (auto& zmq_socket : zmq_sockets) {
|
||||
zsock_destroy(&zmq_socket);
|
||||
}
|
||||
|
||||
msg(LOG_NOTICE, "Ipfix Receiver ZMQ poller and sockets destroyed");
|
||||
|
@ -120,12 +120,14 @@ void IpfixReceiverZmq::run()
|
|||
sourceID->fileDescriptor = 0;
|
||||
|
||||
while (!zsys_interrupted && !exitFlag) {
|
||||
DPRINTF_DEBUG("ZMQ Receiver: Waiting on Poller");
|
||||
void *sock = zpoller_wait(zpoller, zmq_poll_timeout);
|
||||
if (!sock) {
|
||||
if (zpoller_terminated(zpoller)) {
|
||||
msg(LOG_INFO, "ZMQ Receiver: ZMQ termination signal received");
|
||||
break;
|
||||
} else {
|
||||
DPRINTF_DEBUG("ZMQ Receiver: Poller Timeout");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -151,8 +153,8 @@ void IpfixReceiverZmq::run()
|
|||
|
||||
// send packet to all packet processors
|
||||
mutex.lock();
|
||||
for (std::list<IpfixPacketProcessor*>::iterator i = packetProcessors.begin(); i != packetProcessors.end(); i++) {
|
||||
(*i)->processPacket(data, zframe_size(current_frame), sourceID);
|
||||
for (auto& packetProcessor : packetProcessors) {
|
||||
(packetProcessor)->processPacket(data, zframe_size(current_frame), sourceID);
|
||||
}
|
||||
mutex.unlock();
|
||||
|
||||
|
|
Loading…
Reference in New Issue