working mongo connector need to add a global counter for exporter id

master
Philipp Fehre 2011-12-14 08:00:29 -08:00
parent 0cdbad66f1
commit ce425d8fbe
6 changed files with 86 additions and 60 deletions

View File

@ -73,14 +73,14 @@
</expiration>
<pollInterval unit="msec">1000</pollInterval>
<next>4</next>
<next>5</next>
</ipfixAggregator>
<ipfixDbWriterMongo id="4">
<host>127.0.0.1</host>
<collection>flows<collection>
<bufferrecords>5</bufferrecords>
<column>
<database>nasty</database>
<bufferobjects>5</bufferobjects>
<port>27017</port>
<properties>
<name>dstIP</name>
<name>srcIP</name>
<name>srcPort</name>
@ -94,10 +94,7 @@
<name>firstSwitchedMillis</name>
<name>lastSwitchedMillis</name>
<name>exporterID</name>
</column>
</properties>
</ipfixDbWriterMongo>
<ipfixPrinter id="5">
</ipfixPrinter>
</ipfixConfig>

0
configs/mongo/test.sh Normal file → Executable file
View File

View File

@ -3,7 +3,7 @@
<checkinterval>2</checkinterval>
</sensorManager>
<observer id="1">
<filename>oracletest.pcap</filename>
<filename>test.pcap</filename>
<pcap_filter>ip</pcap_filter>
<next>2</next>
</observer>

View File

@ -28,30 +28,34 @@
#include "IpfixDbWriterMongo.hpp"
#include "common/msg.h"
int IpfixDbWriterMongo::GEID = 0;
IpfixDbWriterMongo::Property identify [] = {
{CN_dstIP, "number", 0, IPFIX_TYPEID_destinationIPv4Address, 0},
{CN_srcIP, "number", 0, IPFIX_TYPEID_sourceIPv4Address, 0},
{CN_srcPort, "number", 0, IPFIX_TYPEID_sourceTransportPort, 0},
{CN_dstPort, "number", 0, IPFIX_TYPEID_destinationTransportPort, 0},
{CN_proto, "number", 0, IPFIX_TYPEID_protocolIdentifier, 0 },
{CN_dstTos, "number", 0, IPFIX_TYPEID_classOfServiceIPv4, 0},
{CN_bytes, "number", 0, IPFIX_TYPEID_octetDeltaCount, 0},
{CN_pkts, "number", 0, IPFIX_TYPEID_packetDeltaCount, 0},
{CN_firstSwitched, "number", 0, IPFIX_TYPEID_flowStartSeconds, 0}, // default value is invalid/not used for this ent
{CN_lastSwitched, "number", 0, IPFIX_TYPEID_flowEndSeconds, 0}, // default value is invalid/not used for this entry
{CN_firstSwitchedMillis, "number", 0, IPFIX_TYPEID_flowStartMilliSeconds, 0},
{CN_lastSwitchedMillis, "number", 0, IPFIX_TYPEID_flowEndMilliSeconds, 0},
{CN_tcpControlBits, "number", 0, IPFIX_TYPEID_tcpControlBits, 0},
{CN_dstIP, 0, IPFIX_TYPEID_destinationIPv4Address, 0},
{CN_srcIP, 0, IPFIX_TYPEID_sourceIPv4Address, 0},
{CN_srcPort, 0, IPFIX_TYPEID_sourceTransportPort, 0},
{CN_dstPort, 0, IPFIX_TYPEID_destinationTransportPort, 0},
{CN_proto, 0, IPFIX_TYPEID_protocolIdentifier, 0 },
{CN_dstTos, 0, IPFIX_TYPEID_classOfServiceIPv4, 0},
{CN_bytes, 0, IPFIX_TYPEID_octetDeltaCount, 0},
{CN_pkts, 0, IPFIX_TYPEID_packetDeltaCount, 0},
{CN_firstSwitched, 0, IPFIX_TYPEID_flowStartSeconds, 0}, // default value is invalid/not used for this ent
{CN_lastSwitched, 0, IPFIX_TYPEID_flowEndSeconds, 0}, // default value is invalid/not used for this entry
{CN_firstSwitchedMillis, 0, IPFIX_TYPEID_flowStartMilliSeconds, 0},
{CN_lastSwitchedMillis, 0, IPFIX_TYPEID_flowEndMilliSeconds, 0},
{CN_tcpControlBits, 0, IPFIX_TYPEID_tcpControlBits, 0},
//TODO: use enterprise number for the following extended types (Gerhard, 12/2009)
{CN_revbytes, "number", 0, IPFIX_TYPEID_octetDeltaCount, IPFIX_PEN_reverse},
{CN_revpkts, "number", 0, IPFIX_TYPEID_packetDeltaCount, IPFIX_PEN_reverse},
{CN_revFirstSwitched, "number", 0, IPFIX_TYPEID_flowStartSeconds, IPFIX_PEN_reverse}, // default value is invalid/not used for this entry
{CN_revLastSwitched, "number", 0, IPFIX_TYPEID_flowEndSeconds, IPFIX_PEN_reverse}, // default value is invalid/not used for this entry
{CN_revFirstSwitchedMillis, "number", 0, IPFIX_TYPEID_flowStartMilliSeconds, IPFIX_PEN_reverse},
{CN_revLastSwitchedMillis, "number", 0, IPFIX_TYPEID_flowEndMilliSeconds, IPFIX_PEN_reverse},
{CN_revTcpControlBits, "number", 0, IPFIX_TYPEID_tcpControlBits, IPFIX_PEN_reverse},
{CN_maxPacketGap, "number", 0, IPFIX_ETYPEID_maxPacketGap, IPFIX_PEN_vermont|IPFIX_PEN_reverse},
{CN_exporterID, "number", 0, EXPORTERID, 0},
{CN_revbytes, 0, IPFIX_TYPEID_octetDeltaCount, IPFIX_PEN_reverse},
{CN_revpkts, 0, IPFIX_TYPEID_packetDeltaCount, IPFIX_PEN_reverse},
{CN_revFirstSwitched, 0, IPFIX_TYPEID_flowStartSeconds, IPFIX_PEN_reverse},
// default value is invalid/not used for this entry
{CN_revLastSwitched, 0, IPFIX_TYPEID_flowEndSeconds, IPFIX_PEN_reverse},
// default value is invalid/not used for this entry
{CN_revFirstSwitchedMillis, 0, IPFIX_TYPEID_flowStartMilliSeconds, IPFIX_PEN_reverse},
{CN_revLastSwitchedMillis, 0, IPFIX_TYPEID_flowEndMilliSeconds, IPFIX_PEN_reverse},
{CN_revTcpControlBits, 0, IPFIX_TYPEID_tcpControlBits, IPFIX_PEN_reverse},
{CN_maxPacketGap, 0, IPFIX_ETYPEID_maxPacketGap, IPFIX_PEN_vermont|IPFIX_PEN_reverse},
{CN_exporterID, 0, EXPORTERID, 0},
{0} // last entry must be 0
};
@ -95,6 +99,17 @@ int IpfixDbWriterMongo::connectToDB()
}
}
//FIXME We need to identify the max Global Exporter Counter to insert new exporters with higher ID
/* mongo::BSONObj info;
ostringstream command;
command << "find_max = [];";
command << dbCollectionExporter << ".find([], {id : 1}).map(function(item){ ";
command << "if(item.id){ find_max.push(parseFloat(item.id)); }});";
command << "return Math.max.apply(Math, find_max);";
string cmd = command.str();
con.simpleCommand(dbCollectionExporter, &info, cmd);
GEID = info.getIntField("retval");
*/
msg(MSG_DEBUG,"IpfixDbWriterMongo: Mongo connection successful");
dbError = false;
return 0;
@ -108,7 +123,6 @@ void IpfixDbWriterMongo::processDataDataRecord(const IpfixRecord::SourceID& sour
IpfixRecord::Data* data)
{
mongo::BSONObj obj;
time_t flowStartSeconds;
msg(MSG_DEBUG, "IpfixDbWriter: Processing data record");
if (dbError) {
@ -120,11 +134,12 @@ void IpfixDbWriterMongo::processDataDataRecord(const IpfixRecord::SourceID& sour
/* get new insert */
if(srcId.observationDomainId != 0) {
// use default source id
obj = getInsertObj(flowStartSeconds, srcId, dataTemplateInfo, length, data);
obj = getInsertObj(srcId, dataTemplateInfo, length, data);
} else {
obj = getInsertObj(flowStartSeconds, sourceID, dataTemplateInfo, length, data);
obj = getInsertObj(sourceID, dataTemplateInfo, length, data);
}
// start new insert statement if necessary
if (numberOfInserts == 0) {
// start insert statement
@ -149,7 +164,7 @@ void IpfixDbWriterMongo::processDataDataRecord(const IpfixRecord::SourceID& sour
* loop over properties and template to get the IPFIX values in correct order to store in database
* The result is written to BSON Object, and flowstart is returned
*/
mongo::BSONObj IpfixDbWriterMongo::getInsertObj(time_t& flowstartsec, const IpfixRecord::SourceID& sourceID,
mongo::BSONObj IpfixDbWriterMongo::getInsertObj(const IpfixRecord::SourceID& sourceID,
TemplateInfo& dataTemplateInfo,uint16_t length, IpfixRecord::Data* data)
{
uint64_t intdata = 0;
@ -158,7 +173,7 @@ mongo::BSONObj IpfixDbWriterMongo::getInsertObj(time_t& flowstartsec, const Ipfi
bool notfound, notfound2;
mongo::BSONObjBuilder obj;
flowstartsec = 0;
time_t flowstartsec = 0;
/**loop over the properties and loop over the IPFIX_TYPEID of the record
to get the corresponding data to store and make insert statement*/
@ -178,10 +193,13 @@ mongo::BSONObj IpfixDbWriterMongo::getInsertObj(time_t& flowstartsec, const Ipfi
if(dataTemplateInfo.fieldCount > 0) {
// look inside the ipfix record
for(k=0; k < dataTemplateInfo.fieldCount; k++) {
if(dataTemplateInfo.fieldInfo[k].type.enterprise == prop->enterprise && dataTemplateInfo.fieldInfo[k].type.id == prop->ipfixId) {
if( dataTemplateInfo.fieldInfo[k].type.enterprise == prop->enterprise &&
dataTemplateInfo.fieldInfo[k].type.id == prop->ipfixId) {
notfound = false;
intdata = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset));
DPRINTF("IpfixDbWriterMongo::getData: really saw ipfix id %d in packet with intdata %llX, type %d, length %d and offset %X", prop->ipfixId, intdata, dataTemplateInfo.fieldInfo[k].type.id, dataTemplateInfo.fieldInfo[k].type.length, dataTemplateInfo.fieldInfo[k].offset);
DPRINTF("IpfixDbWriterMongo::getData: really saw ipfix id %d in packet with intdata %llX, type %d, length %d and offset %X",
prop->ipfixId, intdata, dataTemplateInfo.fieldInfo[k].type.id, dataTemplateInfo.fieldInfo[k].type.length,
dataTemplateInfo.fieldInfo[k].offset);
break;
}
}
@ -313,11 +331,9 @@ mongo::BSONObj IpfixDbWriterMongo::getInsertObj(time_t& flowstartsec, const Ipfi
}
}
DPRINTF("saw ipfix id %d in packet with intdata %llX", prop->ipfixId, intdata);
std::ostringstream o;
o << intdata;
obj << prop->propertyName << o.str();
o.str("");
msg(MSG_DEBUG, "saw ipfix id %s in packet with intdata %llX", prop->propertyName,
static_cast<int64_t>(intdata));
obj << prop->propertyName << static_cast<int64_t>(intdata);
}
if (flowstartsec == 0) {
@ -341,7 +357,6 @@ int IpfixDbWriterMongo::writeToDb()
return 0;
}
// FIXME
/**
* Returns the id of the exporter collection entry or 0 in the case of an error
*/
@ -367,18 +382,19 @@ int IpfixDbWriterMongo::getExporterID(const IpfixRecord::SourceID& sourceID)
// convert IP address (correct host byte order since 07/2010)
expIp = sourceID.exporterAddress.toUInt32();
mongo::BSONObj exporter = con.findOne(dbCollectionExporter, QUERY("sourceID" << sourceID.observationDomainId << "srcIp" << expIp));
// search exporter collection
// sql << "SELECT id FROM exporter WHERE sourceID=" << sourceID.observationDomainId << " AND srcIp=" << expIp;
// msg(MSG_DEBUG, "IpfixDbWriterMongo: SQL Query: %s", sql.str().c_str());
if(exporter.isEmpty()){
mongo::BSONObjBuilder b;
id = GEID++;
b << "sourceID" << sourceID.observationDomainId << "srcIP" << expIp << "id" << id;
mongo::BSONObj obj = b.obj();
con.insert(dbCollectionExporter, obj);
} else {
id = exporter.getIntField("id");
}
// insert new entry in exporter table since it is not found
if(id == -1)
{
//sql << "INSERT INTO exporter (ID,sourceID,srcIP) VALUES ( 0 ,'" << sourceID.observationDomainId << "','" << expIp << "')";
//msg(MSG_DEBUG, "IpfixDbWriterMongo: SQL Query: %s", sql.str().c_str());
}
// insert exporter in cache
// insert exporter in cache
ExporterCacheEntry tmp = {sourceID, id};
exporterCache.push_front(tmp);
@ -460,7 +476,20 @@ IpfixDbWriterMongo::IpfixDbWriterMongo(const string& hostname, const string& dat
dbCollectionFlows.append(dbName).append(".flows");
dbCollectionExporter.append(dbName).append(".exporter");
if(propertyNames.empty())
/* get properties */
for(vector<string>::const_iterator prop = propertyNames.begin(); prop != propertyNames.end(); prop++) {
i = 0;
while(identify[i].propertyName != 0) {
if(prop->compare(identify[i].propertyName) == 0) {
Property p = identify[i];
documentProperties.push_back(p);
break;
}
i++;
}
}
if(propertyNames.empty())
THROWEXCEPTION("IpfixDbWriterMongo: cannot initiate with no properties");
if(connectToDB() != 0)

View File

@ -75,7 +75,6 @@ class IpfixDbWriterMongo
*/
struct Property {
const char* propertyName; /** column name */
const char* propertyType; /** column data type in database */
uint64_t defaultValue; /** default value */
InformationElement::IeId ipfixId; /** IPFIX_TYPEID */
InformationElement::IeEnterpriseNumber enterprise; /** enterprise number */
@ -83,13 +82,14 @@ class IpfixDbWriterMongo
private:
static const unsigned MAX_EXPORTER = 10; // maximum numbers of cached exporters
static int GEID;
/**
* Struct buffers ODID, IP address and row index of an exporter
*/
struct ExporterCacheEntry {
IpfixRecord::SourceID sourceID;/** source id of the exporter */
int id; /** Id entry of sourcID and expIP in the ExporterTable */
int id; /** Id entry of sourcID and expIP in the ExporterTable */
};
@ -108,7 +108,7 @@ class IpfixDbWriterMongo
unsigned dbPort;
mongo::DBClientConnection con;
bool dbError; // db error flag
mongo::BSONObj getInsertObj(time_t& flowstartsec, const IpfixRecord::SourceID& sourceID,
mongo::BSONObj getInsertObj(const IpfixRecord::SourceID& sourceID,
TemplateInfo& dataTemplateInfo,uint16_t length, IpfixRecord::Data* data);
int writeToDb();
int getExporterID(const IpfixRecord::SourceID& sourceID);

View File

@ -33,7 +33,7 @@ IpfixDbWriterMongoCfg* IpfixDbWriterMongoCfg::create(XMLElement* e)
IpfixDbWriterMongoCfg::IpfixDbWriterMongoCfg(XMLElement* elem)
: CfgHelper<IpfixDbWriterMongo, IpfixDbWriterMongoCfg>(elem, "ipfixDbWriter"),
: CfgHelper<IpfixDbWriterMongo, IpfixDbWriterMongoCfg>(elem, "ipfixDbWriterMongo"),
port(27017), bufferObjects(30), observationDomainId(0)
{
if (!elem) return;