added transportOctetDeltaCount (counts number of transport layer bytes transferred using TCP's sequence numbers)

git-svn-id: file:///Users/braun/svn/vermont/branches/vermont/new-template@2371 aef3b71b-58ee-0310-9ba9-8811b9f0742f
This commit is contained in:
limmer 2010-05-20 12:23:35 +00:00
parent 2419c78469
commit 88adca5fdb
10 changed files with 124 additions and 21 deletions

View File

@ -88,6 +88,12 @@
*/
#define HT_DEFAULT_BITSIZE 17
/**
* defines maximum window size of TCP connections (used in PacketHashtable for tracking
* number of transferred bytes)
*/
#define HT_MAX_TCP_WINDOW_SIZE (1<<26)
/**
* defines interval in milliseconds, how often DelayedDeleter is called

View File

@ -154,6 +154,7 @@ int string2typelength(const char*s);
#define IPFIX_ETYPEID_dpaFlowCount (IPFIX_VERMONT_SPECIFIC_TYPE | 0x84)
#define IPFIX_ETYPEID_dpaForcedExport (IPFIX_VERMONT_SPECIFIC_TYPE | 0x85)
#define IPFIX_ETYPEID_dpaReverseStart (IPFIX_VERMONT_SPECIFIC_TYPE | 0x86)
#define IPFIX_ETYPEID_transportOctetDeltaCount (IPFIX_VERMONT_SPECIFIC_TYPE | 0x87)
// information elements for biflows
#define IPFIX_REVERSE_TYPE 0x0400 // temporary solution until enterprise number 29305 is used
@ -170,6 +171,7 @@ int string2typelength(const char*s);
#define IPFIX_ETYPEID_revFrontPayloadLen (IPFIX_ETYPEID_frontPayloadLen | IPFIX_REVERSE_TYPE)
#define IPFIX_ETYPEID_revFrontPayloadPktCount (IPFIX_ETYPEID_frontPayloadPktCount | IPFIX_REVERSE_TYPE)
#define IPFIX_ETYPEID_revMaxPacketGap (IPFIX_ETYPEID_maxPacketGap | IPFIX_REVERSE_TYPE)
#define IPFIX_ETYPEID_revTransportOctetDeltaCount (IPFIX_ETYPEID_transportOctetDeltaCount | IPFIX_REVERSE_TYPE)
#define IPFIX_LENGTH_ipVersion IPFIX_LENGTH_octet
@ -272,6 +274,7 @@ int string2typelength(const char*s);
#define IPFIX_ELENGTH_dpaFlowCount IPFIX_LENGTH_unsigned32
#define IPFIX_ELENGTH_dpaForcedExport IPFIX_LENGTH_octet
#define IPFIX_ELENGTH_dpaReverseStart IPFIX_LENGTH_octet
#define IPFIX_ELENGTH_transportOctetDeltaCount IPFIX_LENGTH_unsigned64
// lengths for biflows elements
@ -289,6 +292,7 @@ int string2typelength(const char*s);
#define IPFIX_ELENGTH_revMaxPacketGap IPFIX_LENGTH_unsigned32
#define IPFIX_ELENGTH_frontPayloadPktCount IPFIX_LENGTH_unsigned32
#define IPFIX_ELENGTH_revFrontPayloadPktCount IPFIX_LENGTH_unsigned32
#define IPFIX_ELENGTH_revTransportOctetDeltaCount IPFIX_LENGTH_unsigned64
#define MAX_MSG_LEN 65536

View File

@ -128,7 +128,9 @@ struct ipfix_identifier ipfixids[] = {
{ IPFIX_ETYPEID_revFrontPayload, 0, "revFrontPayload" },
{ IPFIX_ETYPEID_revFrontPayloadLen, IPFIX_ELENGTH_revFrontPayloadLen, "revFrontPayloadLen" },
{ IPFIX_ETYPEID_revFrontPayloadPktCount, IPFIX_ELENGTH_revFrontPayloadPktCount, "revFrontPayloadPktCount" },
{ IPFIX_ETYPEID_revMaxPacketGap, IPFIX_ELENGTH_revMaxPacketGap, "revMaxPacketGap" }
{ IPFIX_ETYPEID_revMaxPacketGap, IPFIX_ELENGTH_revMaxPacketGap, "revMaxPacketGap" },
{ IPFIX_ETYPEID_transportOctetDeltaCount, IPFIX_ELENGTH_transportOctetDeltaCount, "transportOctetDeltaCount" },
{ IPFIX_ETYPEID_revTransportOctetDeltaCount, IPFIX_ELENGTH_revTransportOctetDeltaCount, "revTransportOctetDeltaCount" }
};

View File

@ -188,6 +188,10 @@ Connection::Connection(IpfixDataRecord* record)
if (fi != 0) dpaReverseStart = *(uint8_t*)(record->data + fi->offset);
fi = record->templateInfo->getFieldInfo(IPFIX_ETYPEID_dpaFlowCount, 0);
if (fi != 0) dpaFlowCount = ntohl(*(uint32_t*)(record->data + fi->offset));
fi = record->templateInfo->getFieldInfo(IPFIX_ETYPEID_transportOctetDeltaCount, 0);
if (fi != 0) srcTransOctets = ntohll(*(uint64_t*)(record->data + fi->offset));
fi = record->templateInfo->getFieldInfo(IPFIX_ETYPEID_revTransportOctetDeltaCount, 0);
if (fi != 0) dstTransOctets = ntohll(*(uint64_t*)(record->data + fi->offset));
}
Connection::~Connection()

View File

@ -54,6 +54,8 @@ class Connection
uint64_t dstTimeEnd; /**< milliseconds since 1970, host-byte order */
uint64_t srcOctets; /**< network-byte order! **/
uint64_t dstOctets; /**< network-byte order! **/
uint64_t srcTransOctets; /**< host-byte order! **/
uint64_t dstTransOctets; /**< host-byte order! **/
uint64_t srcPackets; /**< network-byte order! **/
uint64_t dstPackets; /**< network-byte order! **/
uint8_t srcTcpControlBits;

View File

@ -293,7 +293,7 @@ IpfixPrinter::IpfixPrinter(OutputType outputtype, string filename)
}
if (outputtype==TABLE)
fprintf(fh, "srcip\tdstip\tsrcport\tdstport\tprot\tsrcpkts\tdstpkts\tsrcoct\tdstoct\tsrcstart\tsrcend\tdststart\tdstend\tsrcplen\tdstplen\tforcedexp\trevstart\tflowcnt\n");
fprintf(fh, "srcip\tdstip\tsrcport\tdstport\tprot\tsrcpkts\tdstpkts\tsrcoct\tdstoct\tsrcstart\tsrcend\tdststart\tdstend\tsrcplen\tdstplen\tforcedexp\trevstart\tflowcnt\ttranoct\trevtranoct\n");
}
/**
@ -642,11 +642,11 @@ void IpfixPrinter::printTableRecord(IpfixDataRecord* record)
Connection c(record);
//fprintf(fh, "%llu\t%llu\t%u\t%u\t%llu\n", ntohll(c.srcOctets), ntohll(c.srcPackets), c.srcPayloadLen, c.srcPayloadPktCount, c.srcTimeEnd-c.srcTimeStart);
fprintf(fh, "%s\t%s\t%hu\t%hu\t%hhu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%u\t%u\t%hhu\t%hhu\t%u\n",
fprintf(fh, "%s\t%s\t%hu\t%hu\t%hhu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%llu\t%u\t%u\t%hhu\t%hhu\t%u\t%llu\t%llu\n",
IPToString(c.srcIP).c_str(), IPToString(c.dstIP).c_str(), ntohs(c.srcPort), ntohs(c.dstPort), c.protocol,
(long long unsigned)ntohll(c.srcPackets), (long long unsigned)ntohll(c.dstPackets), (long long unsigned)ntohll(c.srcOctets), (long long unsigned)ntohll(c.dstOctets),
(long long unsigned)c.srcTimeStart, (long long unsigned)c.srcTimeEnd, (long long unsigned)c.dstTimeStart, (long long unsigned)c.dstTimeEnd,
c.srcPayloadLen, c.dstPayloadLen, c.dpaForcedExport, c.dpaReverseStart, c.dpaFlowCount);
c.srcPayloadLen, c.dstPayloadLen, c.dpaForcedExport, c.dpaReverseStart, c.dpaFlowCount, c.srcTransOctets, c.dstTransOctets);
}

View File

@ -6,12 +6,12 @@
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
@ -190,6 +190,8 @@ namespace InformationElement {
case IPFIX_ETYPEID_dpaForcedExport:
case IPFIX_ETYPEID_dpaFlowCount:
case IPFIX_ETYPEID_dpaReverseStart:
case IPFIX_ETYPEID_transportOctetDeltaCount:
case IPFIX_ETYPEID_revTransportOctetDeltaCount:
return Packet::IPProtocolType(Packet::UDP|Packet::TCP);
case IPFIX_TYPEID_tcpControlBits:
@ -206,9 +208,9 @@ namespace InformationElement {
/* Methods of TemplateInfo class */
TemplateInfo::TemplateInfo() : templateId(0), setId(UnknownSetId), fieldCount(0), fieldInfo(NULL),
TemplateInfo::TemplateInfo() : templateId(0), setId(UnknownSetId), fieldCount(0), fieldInfo(NULL),
freePointers(true),
scopeCount(0), scopeInfo(NULL), dataCount(0), dataInfo(NULL), preceding(0), dataLength(0), data(NULL),
scopeCount(0), scopeInfo(NULL), dataCount(0), dataInfo(NULL), preceding(0), dataLength(0), data(NULL),
uniqueId(0)
{
setUniqueId();
@ -269,7 +271,7 @@ TemplateInfo::~TemplateInfo() {
mutex().unlock();
}
void TemplateInfo::setUniqueId()
void TemplateInfo::setUniqueId()
{
mutex().lock();
uint16_t oldId = uniqueId;

View File

@ -81,6 +81,8 @@ uint32_t BaseHashtable::getPrivateDataLength(const InformationElement::IeInfo& t
switch (type.id) {
case IPFIX_ETYPEID_frontPayload:
case IPFIX_ETYPEID_revFrontPayload:
case IPFIX_ETYPEID_transportOctetDeltaCount:
case IPFIX_ETYPEID_revTransportOctetDeltaCount:
return sizeof(PayloadPrivateData);
case IPFIX_ETYPEID_dpaForcedExport:
@ -381,6 +383,8 @@ int BaseHashtable::isToBeAggregated(const InformationElement::IeInfo& type)
case IPFIX_ETYPEID_dpaForcedExport:
case IPFIX_ETYPEID_dpaFlowCount:
case IPFIX_ETYPEID_dpaReverseStart:
case IPFIX_ETYPEID_transportOctetDeltaCount:
case IPFIX_ETYPEID_revTransportOctetDeltaCount:
return 1;
case IPFIX_TYPEID_octetTotalCount:

View File

@ -123,6 +123,27 @@ void PacketHashtable::copyDataNanoseconds(CopyFuncParameters* cfp)
}
#endif
}
void PacketHashtable::copyDataTransportOctets(CopyFuncParameters* cfp)
{
const Packet* p = cfp->packet;
uint16_t plen = p->data_length-p->payloadOffset;
if (p->payloadOffset==0 || p->payloadOffset==p->transportHeaderOffset) plen = 0;
*reinterpret_cast<uint64_t*>(cfp->dst+cfp->efd->dstIndex) = htonll(plen);
PayloadPrivateData* ppd;
switch (cfp->packet->ipProtocolType) {
case Packet::TCP:
ppd = reinterpret_cast<PayloadPrivateData*>(cfp->dst+cfp->efd->privDataOffset);
ppd->seq = ntohl(*reinterpret_cast<const uint32_t*>(p->data+p->transportHeaderOffset+4))+plen+(p->data[p->transportHeaderOffset+13] & 0x02 ? 1 : 0);
ppd->initialized = true;
break;
default:
break;
}
DPRINTFL(MSG_VDEBUG, "%s=%llu, ppd->seq=%u", typeid2string(cfp->efd->typeId), ntohll(*reinterpret_cast<uint64_t*>(cfp->dst+cfp->efd->dstIndex)), ntohl(ppd->seq));
}
/**
@ -294,6 +315,7 @@ void (*PacketHashtable::getCopyDataFunction(const ExpFieldData* efd))(CopyFuncPa
case IPFIX_TYPEID_flowEndNanoSeconds:
case IPFIX_TYPEID_octetDeltaCount:
case IPFIX_TYPEID_packetDeltaCount:
case IPFIX_ETYPEID_transportOctetDeltaCount:
if (efd->dstLength != 8) {
THROWEXCEPTION("unsupported length %d for type %d (\"%s\")", efd->dstLength, efd->typeId, typeid2string(efd->typeId));
}
@ -323,6 +345,8 @@ void (*PacketHashtable::getCopyDataFunction(const ExpFieldData* efd))(CopyFuncPa
return copyDataFrontPayloadNoInit;
} else if (efd->typeId == IPFIX_ETYPEID_frontPayloadLen) {
return copyDataDummy;
} else if (efd->typeId == IPFIX_ETYPEID_transportOctetDeltaCount) {
return copyDataTransportOctets;
} else if (efd->typeId == IPFIX_ETYPEID_maxPacketGap) {
return copyDataMaxPacketGap;
} else if (efd->typeId == IPFIX_ETYPEID_frontPayloadPktCount ||
@ -406,6 +430,7 @@ uint8_t PacketHashtable::getRawPacketFieldLength(const InformationElement::IeInf
case IPFIX_TYPEID_flowEndNanoSeconds:
//case IPFIX_ETYPEID_revFlowStartNanoSeconds:
//case IPFIX_ETYPEID_revFlowEndNanoSeconds:
case IPFIX_ETYPEID_transportOctetDeltaCount:
return 8;
case IPFIX_ETYPEID_frontPayload:
@ -551,6 +576,7 @@ bool PacketHashtable::isRawPacketPtrVariable(const InformationElement::IeInfo& t
case IPFIX_TYPEID_tcpControlBits:
case IPFIX_ETYPEID_frontPayload:
case IPFIX_ETYPEID_frontPayloadLen:
case IPFIX_ETYPEID_transportOctetDeltaCount:
return true;
}
@ -651,6 +677,7 @@ bool PacketHashtable::typeAvailable(const InformationElement::IeInfo& type)
case IPFIX_ETYPEID_dpaForcedExport:
case IPFIX_ETYPEID_dpaFlowCount:
case IPFIX_ETYPEID_dpaReverseStart:
case IPFIX_ETYPEID_transportOctetDeltaCount:
return true;
}
@ -727,6 +754,7 @@ void PacketHashtable::buildExpHelperTable()
THROWEXCEPTION("Type '%s' is not contained in raw packet. Please remove it from PacketAggregator rule.", typeid2string(hfi->type.id));
}
if (!isToBeAggregated(hfi->type)) continue;
DPRINTF("including type %s.", typeid2string(hfi->type.id));
ExpFieldData* efd = &expHelperTable.aggFields[expHelperTable.noAggFields++];
fillExpFieldData(efd, hfi, fieldModifier[i], expHelperTable.noAggFields-1);
expagg2field.push_back(i);
@ -878,10 +906,12 @@ boost::shared_array<IpfixRecord::Data> PacketHashtable::buildBucketData(const Pa
bzero(data, fieldLength+privDataLength);
CopyFuncParameters cfp;
cfp.dst = data;
cfp.packet = p;
// copy all data ...
for (vector<ExpFieldData*>::const_iterator iter=expHelperTable.allFields.begin(); iter!=expHelperTable.allFields.end(); iter++) {
ExpFieldData* efd = *iter;
cfp.dst = data;
cfp.src = reinterpret_cast<IpfixRecord::Data*>(p->netHeader)+efd->srcIndex;
cfp.efd = efd;
efd->copyDataFunc(&cfp);
@ -901,6 +931,9 @@ void PacketHashtable::aggregateField(const ExpFieldData* efd, HashtableBucket* h
uint64_t ntptime;
uint64_t ntp2;
PayloadPrivateData* ppd;
const Packet* p;
uint16_t plen;
switch (efd->typeId) {
case IPFIX_TYPEID_flowStartSeconds:
@ -991,11 +1024,6 @@ void PacketHashtable::aggregateField(const ExpFieldData* efd, HashtableBucket* h
aggregateFrontPayload(data, hbucket, reinterpret_cast<const Packet*>(deltaData), efd, false, false);
break;
/*case IPFIX_ETYPEID_frontPayloadLen:
case IPFIX_ETYPEID_revFrontPayloadLen:
*(uint32_t*)baseData = htonl(reinterpret_cast<PayloadPrivateData*>(bucket+efd->privDataOffset)->byteCount);
break;*/
case IPFIX_ETYPEID_maxPacketGap:
case IPFIX_ETYPEID_revMaxPacketGap:
gap = (int64_t)ntohll(*(int64_t*)deltaData)-(int64_t)ntohll(*reinterpret_cast<const uint64_t*>(data+efd->privDataOffset));
@ -1011,6 +1039,51 @@ void PacketHashtable::aggregateField(const ExpFieldData* efd, HashtableBucket* h
// ignore these fields, as FPA aggregation does everything needed
break;
case IPFIX_ETYPEID_transportOctetDeltaCount:
case IPFIX_ETYPEID_revTransportOctetDeltaCount:
{
p = reinterpret_cast<const Packet*>(deltaData);
if (!(p->ipProtocolType==Packet::TCP || p->ipProtocolType==Packet::UDP)) break;
plen = p->data_length-p->payloadOffset;
if (p->payloadOffset==0 || p->payloadOffset==p->transportHeaderOffset || plen==0) break;
uint64_t seq;
switch (p->ipProtocolType) {
case Packet::TCP:
ppd = reinterpret_cast<PayloadPrivateData*>(data+efd->privDataOffset);
seq = ntohl(*reinterpret_cast<const uint32_t*>(p->data+p->transportHeaderOffset+4));
if (!ppd->initialized) {
ppd->seq = ntohl(*reinterpret_cast<const uint32_t*>(p->data+p->transportHeaderOffset+4))+plen+(p->data[p->transportHeaderOffset+13] & 0x02 ? 1 : 0);
*reinterpret_cast<uint64_t*>(baseData) = htonll(plen);
ppd->initialized = true;
break;
}
if (seq+plen>ppd->seq && seq+plen<ppd->seq+HT_MAX_TCP_WINDOW_SIZE) {
*reinterpret_cast<uint64_t*>(baseData) = htonll(seq-ppd->seq+plen+ntohll(*reinterpret_cast<uint64_t*>(baseData)));
ppd->seq = seq+plen;
} else if (0x100000000LL+seq+plen>ppd->seq && 0x100000000LL+seq+plen<ppd->seq+HT_MAX_TCP_WINDOW_SIZE) { // wrap-around
*reinterpret_cast<uint64_t*>(baseData) = htonll(0x100000000LL+seq-ppd->seq+plen+ntohll(*reinterpret_cast<uint64_t*>(baseData)));
ppd->seq = seq+plen;
}
DPRINTFL(MSG_VDEBUG, "%s=%llu, ppd->seq=%u", typeid2string(efd->typeId), ntohll(*reinterpret_cast<uint64_t*>(baseData)), ntohl(ppd->seq));
break;
case Packet::UDP:
DPRINTF("blub udp");
*reinterpret_cast<uint64_t*>(baseData) = htonll(plen+ntohll(*reinterpret_cast<uint64_t*>(baseData)));
break;
default:
DPRINTF("blub default");
break;
}
break;
}
// no other types needed, as this is only for raw field input
default:
DPRINTF("non-aggregatable type: %d", efd->typeId);
@ -1025,23 +1098,25 @@ void PacketHashtable::aggregateFlow(HashtableBucket* bucket, const Packet* p, bo
{
IpfixRecord::Data* data = bucket->data.get();
if (!reverse) {
for (int i=0; i<expHelperTable.noAggFields; i++) {
for (int i=0; i<expHelperTable.noAggFields && !bucket->forceExpiry; i++) {
ExpFieldData* efd = &expHelperTable.aggFields[i];
aggregateField(efd, bucket, p->netHeader+efd->srcIndex, data);
}
} else {
for (int i=0; i<expHelperTable.noRevAggFields; i++) {
for (int i=0; i<expHelperTable.noRevAggFields && !bucket->forceExpiry; i++) {
ExpFieldData* efd = &expHelperTable.revAggFields[i];
aggregateField(efd, bucket, p->netHeader+efd->srcIndex, data);
}
}
// TODO: tobi_optimize
// replace call of time() with access to a static variable which is updated regularly (such as every 100ms)
bucket->expireTime = time(0) + minBufferTime;
if (!bucket->forceExpiry) {
bucket->expireTime = time(0) + minBufferTime;
if (bucket->forceExpireTime>bucket->expireTime) {
exportList.remove(bucket->listNode);
exportList.push(bucket->listNode);
if (bucket->forceExpireTime>bucket->expireTime) {
exportList.remove(bucket->listNode);
exportList.push(bucket->listNode);
}
}
}
@ -1163,6 +1238,8 @@ void PacketHashtable::updatePointers(const Packet* p)
// pointing to packet structure
case IPFIX_ETYPEID_frontPayload:
case IPFIX_ETYPEID_revFrontPayload:
case IPFIX_ETYPEID_transportOctetDeltaCount:
case IPFIX_ETYPEID_revTransportOctetDeltaCount:
efd->srcIndex = reinterpret_cast<uintptr_t>(p)-reinterpret_cast<uintptr_t>(p->netHeader);
break;

View File

@ -51,6 +51,7 @@ private:
*/
struct ExpFieldData;
struct CopyFuncParameters {
const Packet* packet;
IpfixRecord::Data* dst;
IpfixRecord::Data* src;
ExpFieldData* efd;
@ -151,6 +152,7 @@ private:
static void copyDataMaxPacketGap(CopyFuncParameters* cfp);
static void copyDataNanoseconds(CopyFuncParameters* cfp);
static void copyDataDummy(CopyFuncParameters* cfp);
static void copyDataTransportOctets(CopyFuncParameters* cfp);
static void aggregateFrontPayload(IpfixRecord::Data* bucket, HashtableBucket* hbucket, const Packet* src,
const ExpFieldData* efd, bool firstpacket, bool onlyinit);
void (*getCopyDataFunction(const ExpFieldData* efd))(CopyFuncParameters*);