IpfixParser: return if message is too short to contain message header
IpfixReceiverSctpIpV4, TemplateBuffer, IpfixParser, IpfixRecord: destroy buffered templates of disconnected SCTP exporter git-svn-id: file:///Users/braun/svn/vermont/trunk/vermont@2125 aef3b71b-58ee-0310-9ba9-8811b9f0742fmaster
parent
46d1b3e009
commit
95fca71578
|
@ -3,4 +3,5 @@ SUBDIRS(
|
|||
core
|
||||
common
|
||||
modules
|
||||
tests
|
||||
)
|
||||
|
|
|
@ -749,7 +749,13 @@ uint32_t IpfixParser::processDataSet(boost::shared_ptr<IpfixRecord::SourceID> so
|
|||
* Process a NetflowV9 Packet
|
||||
* @return 0 on success
|
||||
*/
|
||||
int IpfixParser::processNetflowV9Packet(boost::shared_array<uint8_t> message, uint16_t length, boost::shared_ptr<IpfixRecord::SourceID> sourceId) {
|
||||
int IpfixParser::processNetflowV9Packet(boost::shared_array<uint8_t> message, uint16_t length, boost::shared_ptr<IpfixRecord::SourceID> sourceId)
|
||||
{
|
||||
if (length < sizeof(NetflowV9Header)) {
|
||||
msg(MSG_ERROR, "IpfixParser: Invalide NetFlowV9 message - message too short to contain header!");
|
||||
return -1;
|
||||
}
|
||||
|
||||
NetflowV9Header* header = (NetflowV9Header*)message.get();
|
||||
|
||||
/* pointer to first set */
|
||||
|
@ -792,9 +798,14 @@ int IpfixParser::processNetflowV9Packet(boost::shared_array<uint8_t> message, ui
|
|||
* Process an IPFIX Packet
|
||||
* @return 0 on success
|
||||
*/
|
||||
int IpfixParser::processIpfixPacket(boost::shared_array<uint8_t> message, uint16_t length, boost::shared_ptr<IpfixRecord::SourceID> sourceId) {
|
||||
int IpfixParser::processIpfixPacket(boost::shared_array<uint8_t> message, uint16_t length, boost::shared_ptr<IpfixRecord::SourceID> sourceId)
|
||||
{
|
||||
if (length < sizeof(IpfixHeader)) {
|
||||
msg(MSG_ERROR, "IpfixParser: Invalide IPFIX message - message too short to contain header!");
|
||||
return -1;
|
||||
}
|
||||
IpfixHeader* header = (IpfixHeader*)message.get();
|
||||
sourceId->observationDomainId = ntohl(header->observationDomainId);
|
||||
sourceId->observationDomainId = ntohl(header->observationDomainId);
|
||||
|
||||
if (ntohs(header->length) != length) {
|
||||
msg(MSG_ERROR, "IpfixParser: Bad message length - packet length is %#06x, header length field is %#06x\n", length, ntohs(header->length));
|
||||
|
@ -853,6 +864,11 @@ int IpfixParser::processIpfixPacket(boost::shared_array<uint8_t> message, uint16
|
|||
int IpfixParser::processPacket(boost::shared_array<uint8_t> message, uint16_t length, boost::shared_ptr<IpfixRecord::SourceID> sourceId)
|
||||
{
|
||||
pthread_mutex_lock(&mutex);
|
||||
if (length == 0) {
|
||||
templateBuffer->destroyBufferedTemplate(sourceId, 0, true);
|
||||
pthread_mutex_unlock(&mutex);
|
||||
return 0;
|
||||
}
|
||||
IpfixHeader* header = (IpfixHeader*)message.get();
|
||||
if (ntohs(header->version) == 0x000a) {
|
||||
int r = processIpfixPacket(message, length, sourceId);
|
||||
|
@ -875,7 +891,6 @@ int IpfixParser::processPacket(boost::shared_array<uint8_t> message, uint16_t le
|
|||
#endif
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new @c IpfixParser.
|
||||
* @return handle to created instance
|
||||
|
|
|
@ -132,10 +132,15 @@ void IpfixReceiverSctpIpV4::run() {
|
|||
rfd = accept(listen_socket, (struct sockaddr*)&clientAddress, &clientAddressLen);
|
||||
|
||||
if (rfd >= 0){
|
||||
FD_SET(rfd, &fd_array); // add new client to fd_array
|
||||
msg(MSG_DEBUG, "IpfixReceiverSctpIpV4: Client connected from %s:%d, FD=%d", inet_ntoa(clientAddress.sin_addr), ntohs(clientAddress.sin_port), rfd);
|
||||
if (rfd > maxfd){
|
||||
maxfd = rfd;
|
||||
if (isHostAuthorized(&clientAddress.sin_addr, sizeof(clientAddress.sin_addr))) {
|
||||
FD_SET(rfd, &fd_array); // add new client to fd_array
|
||||
msg(MSG_DEBUG, "IpfixReceiverSctpIpV4: Client connected from %s:%d, FD=%d", inet_ntoa(clientAddress.sin_addr), ntohs(clientAddress.sin_port), rfd);
|
||||
if (rfd > maxfd){
|
||||
maxfd = rfd;
|
||||
}
|
||||
} else {
|
||||
msg(MSG_DEBUG, "IpfixReceiverSctpIpV4: Connection from unwanted client %s:%d, FD=%d rejected.", inet_ntoa(clientAddress.sin_addr), ntohs(clientAddress.sin_port), rfd);
|
||||
close(rfd);
|
||||
}
|
||||
}else{
|
||||
msg(MSG_ERROR ,"accept() in ipfixReceiver failed");
|
||||
|
@ -146,31 +151,30 @@ void IpfixReceiverSctpIpV4::run() {
|
|||
for (rfd = listen_socket + 1; rfd <= maxfd; ++rfd) {
|
||||
if (FD_ISSET(rfd, &readfds)) {
|
||||
boost::shared_array<uint8_t> data(new uint8_t[MAX_MSG_LEN]);
|
||||
ret = recvfrom(rfd, data.get(), MAX_MSG_LEN, 0,
|
||||
(struct sockaddr*)&clientAddress, &clientAddressLen);
|
||||
if (ret == 0) { // shut down initiated
|
||||
ret = recvfrom(rfd, data.get(), MAX_MSG_LEN, 0, (struct sockaddr*)&clientAddress, &clientAddressLen);
|
||||
if (ret < 0) { // error
|
||||
msg(MSG_ERROR, "IpfixReceiverSctpIpV4: Client error (%s), close connection.", inet_ntoa(clientAddress.sin_addr));
|
||||
close(rfd);
|
||||
// we treat an error like a shut down, so overwrite return value to zero
|
||||
ret = 0;
|
||||
}
|
||||
// create sourceId
|
||||
boost::shared_ptr<IpfixRecord::SourceID> sourceID(new IpfixRecord::SourceID);
|
||||
memcpy(sourceID->exporterAddress.ip, &clientAddress.sin_addr.s_addr, 4);
|
||||
sourceID->exporterAddress.len = 4;
|
||||
sourceID->exporterPort = ntohs(clientAddress.sin_port);
|
||||
sourceID->protocol = IPFIX_protocolIdentifier_SCTP;
|
||||
sourceID->receiverPort = receiverPort;
|
||||
sourceID->fileDescriptor = rfd;
|
||||
// send packet to all packet processors
|
||||
mutex.lock();
|
||||
for (std::list<IpfixPacketProcessor*>::iterator i = packetProcessors.begin(); i != packetProcessors.end(); ++i) {
|
||||
(*i)->processPacket(data, ret, sourceID);
|
||||
}
|
||||
mutex.unlock();
|
||||
if (ret == 0) { // this was a shut down (or error)
|
||||
FD_CLR(rfd, &fd_array); // delete dead client
|
||||
msg(MSG_DEBUG, "IpfixReceiverSctpIpV4: Client disconnected");
|
||||
}else{
|
||||
if (isHostAuthorized(&clientAddress.sin_addr,
|
||||
sizeof(clientAddress.sin_addr))) {
|
||||
boost::shared_ptr<IpfixRecord::SourceID> sourceID(new IpfixRecord::SourceID);
|
||||
|
||||
memcpy(sourceID->exporterAddress.ip, &clientAddress.sin_addr.s_addr, 4);
|
||||
sourceID->exporterAddress.len = 4;
|
||||
sourceID->exporterPort = ntohs(clientAddress.sin_port);
|
||||
sourceID->protocol = IPFIX_protocolIdentifier_SCTP;
|
||||
sourceID->receiverPort = receiverPort;
|
||||
sourceID->fileDescriptor = rfd;
|
||||
mutex.lock();
|
||||
for (std::list<IpfixPacketProcessor*>::iterator i = packetProcessors.begin(); i != packetProcessors.end(); ++i) {
|
||||
(*i)->processPacket(data, ret, sourceID);
|
||||
}
|
||||
mutex.unlock();
|
||||
}
|
||||
else{
|
||||
msg(MSG_DEBUG, "packet from unauthorized host %s discarded", inet_ntoa(clientAddress.sin_addr));
|
||||
}
|
||||
msg(MSG_DEBUG, "IpfixReceiverSctpIpV4: Client %s disconnected", inet_ntoa(clientAddress.sin_addr));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -486,6 +486,19 @@ class IpfixRecord
|
|||
(memcmp(exporterAddress.ip, x.exporterAddress.ip, exporterAddress.len) == 0 );
|
||||
|
||||
}
|
||||
// compare two SourceIDs without considering Observation Domain ID
|
||||
bool equalIgnoringODID(const struct SourceID & x) const {
|
||||
if(protocol == 132) /* compare file descriptors instead of IP addresses because of possible multihoming */
|
||||
return (fileDescriptor == x.fileDescriptor);
|
||||
else /* UDP: fileDescriptor only specifies the Collector endpoint*/
|
||||
return (exporterPort == x.exporterPort) &&
|
||||
//(receiverPort == x.receiverPort) &&
|
||||
(fileDescriptor == x.fileDescriptor) &&
|
||||
//(protocol == x.protocol) &&
|
||||
(exporterAddress.len == x.exporterAddress.len) &&
|
||||
(memcmp(exporterAddress.ip, x.exporterAddress.ip, exporterAddress.len) == 0 );
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
boost::shared_ptr<IpfixRecord::SourceID> sourceID;
|
||||
|
|
|
@ -73,14 +73,16 @@ void TemplateBuffer::bufferTemplate(TemplateBuffer::BufferedTemplate* bt) {
|
|||
/**
|
||||
* Frees memory, marks Template unused.
|
||||
*/
|
||||
void TemplateBuffer::destroyBufferedTemplate(boost::shared_ptr<IpfixRecord::SourceID> sourceId, TemplateID templateId)
|
||||
void TemplateBuffer::destroyBufferedTemplate(boost::shared_ptr<IpfixRecord::SourceID> sourceId, TemplateID templateId, bool all)
|
||||
{
|
||||
TemplateBuffer::BufferedTemplate* predecessor = 0;
|
||||
TemplateBuffer::BufferedTemplate* bt = head;
|
||||
bool found = false;
|
||||
while (bt != 0) {
|
||||
/* templateId == setID means that all templates of this set type shall be removed */
|
||||
if ((*(bt->sourceID.get()) == *(sourceId.get())) && ((bt->templateID == templateId) || (bt->setID == templateId))) {
|
||||
/* templateId == setID means that all templates of this set type shall be removed for given sourceID */
|
||||
/* all == true means that all templates of given sourceID shall be removed */
|
||||
if (((*(bt->sourceID.get()) == *(sourceId.get())) && ((bt->templateID == templateId) || (bt->setID == templateId))
|
||||
|| (all && sourceId->equalIgnoringODID(*(bt->sourceID.get()))))) {
|
||||
found = true;
|
||||
if (predecessor != 0) {
|
||||
predecessor->next = bt->next;
|
||||
|
@ -90,7 +92,7 @@ void TemplateBuffer::destroyBufferedTemplate(boost::shared_ptr<IpfixRecord::Sour
|
|||
if (bt->setID == IPFIX_SetId_Template) {
|
||||
/* Invoke all registered callback functions */
|
||||
IpfixTemplateDestructionRecord* ipfixRecord = ipfixParser->templateDestructionRecordIM.getNewInstance();
|
||||
ipfixRecord->sourceID = sourceId;
|
||||
ipfixRecord->sourceID = bt->sourceID;
|
||||
ipfixRecord->templateInfo = bt->templateInfo;
|
||||
ipfixParser->ipfixRecordSender->send(ipfixRecord);
|
||||
} else
|
||||
|
@ -98,7 +100,7 @@ void TemplateBuffer::destroyBufferedTemplate(boost::shared_ptr<IpfixRecord::Sour
|
|||
if (bt->setID == NetflowV9_SetId_Template) {
|
||||
/* Invoke all registered callback functions */
|
||||
IpfixTemplateDestructionRecord* ipfixRecord = ipfixParser->templateDestructionRecordIM.getNewInstance();
|
||||
ipfixRecord->sourceID = sourceId;
|
||||
ipfixRecord->sourceID = bt->sourceID;
|
||||
ipfixRecord->templateInfo = bt->templateInfo;
|
||||
ipfixParser->ipfixRecordSender->send(ipfixRecord);
|
||||
} else
|
||||
|
@ -106,13 +108,13 @@ void TemplateBuffer::destroyBufferedTemplate(boost::shared_ptr<IpfixRecord::Sour
|
|||
if (bt->setID == IPFIX_SetId_OptionsTemplate) {
|
||||
/* Invoke all registered callback functions */
|
||||
IpfixOptionsTemplateDestructionRecord* ipfixRecord = ipfixParser->optionsTemplateDestructionRecordIM.getNewInstance();
|
||||
ipfixRecord->sourceID = sourceId;
|
||||
ipfixRecord->sourceID = bt->sourceID;
|
||||
ipfixRecord->optionsTemplateInfo = bt->optionsTemplateInfo;
|
||||
ipfixParser->ipfixRecordSender->send(ipfixRecord);
|
||||
} else if (bt->setID == IPFIX_SetId_DataTemplate) {
|
||||
/* Invoke all registered callback functions */
|
||||
IpfixDataTemplateDestructionRecord* ipfixRecord = ipfixParser->dataTemplateDestructionRecordIM.getNewInstance();
|
||||
ipfixRecord->sourceID = sourceId;
|
||||
ipfixRecord->sourceID = bt->sourceID;
|
||||
ipfixRecord->dataTemplateInfo = bt->dataTemplateInfo;
|
||||
ipfixParser->ipfixRecordSender->send(ipfixRecord);
|
||||
|
||||
|
|
|
@ -55,7 +55,9 @@ class TemplateBuffer {
|
|||
~TemplateBuffer();
|
||||
|
||||
TemplateBuffer::BufferedTemplate* getBufferedTemplate(boost::shared_ptr<IpfixRecord::SourceID> sourceId, TemplateID templateId);
|
||||
void destroyBufferedTemplate(boost::shared_ptr<IpfixRecord::SourceID> sourceId, TemplateID id);
|
||||
void destroyBufferedTemplate(boost::shared_ptr<IpfixRecord::SourceID> sourceId, TemplateID templateId, bool all = false);
|
||||
// templateId=2,3,4 means that all Templates, Option Templates, or Data Templates of given sourceID are destroyed
|
||||
// all=true overrides templateId parameter, so all Templates of given sourceID will be deleted
|
||||
void bufferTemplate(TemplateBuffer::BufferedTemplate* bt);
|
||||
TemplateBuffer::BufferedTemplate* getFirstBufferedTemplate();
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ int main(int argc, char *argv[])
|
|||
#endif
|
||||
case 'u':
|
||||
// add UDP collector
|
||||
ret=ipfix_add_collector(my_exporter, "127.0.0.1", 4711, UDP);
|
||||
ret=ipfix_add_collector(my_exporter, "127.0.0.1", 1500, UDP);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "ipfix_add_collector failed!\n");
|
||||
exit(-1);
|
||||
|
|
Loading…
Reference in New Issue