Oracle DB Writer now uses common SQL backend

Changes compile currently, but could not yet been tested due to
lack of a working Oracle database
master
Lothar Braun 2012-10-27 18:08:27 +02:00
parent 27a352bee0
commit f906722dda
11 changed files with 117 additions and 717 deletions

View File

@ -21,6 +21,7 @@ else(ORACLE_INCLUDE_DIR AND ORACLE_CLIENT_LIBRARIES AND ORACLE_CONNECTION_LIBRAR
/usr/lib/oracle/xe/app/oracle/product/*/client/rdbms/public
/usr/include/oracle/*/client
/opt/oracle/product/*/client/rdbms/public
/opt/oracle/*/client/
)
find_library(ORACLE_CLIENT_LIBRARIES NAMES clntsh libclntsh
@ -28,6 +29,7 @@ else(ORACLE_INCLUDE_DIR AND ORACLE_CLIENT_LIBRARIES AND ORACLE_CONNECTION_LIBRAR
/usr/lib/oracle/xe/app/oracle/product/*/client/lib
/usr/lib/oracle/*/client/lib
/opt/oracle/product/*/client/lib
/opt/oracle/*/client/lib
)
find_library(ORACLE_CONNECTION_LIBRARIES NAMES occi libocci
@ -35,6 +37,7 @@ else(ORACLE_INCLUDE_DIR AND ORACLE_CLIENT_LIBRARIES AND ORACLE_CONNECTION_LIBRAR
/usr/lib/oracle/xe/app/oracle/product/*/client/lib
/usr/lib/oracle/*/client/lib
/opt/oracle/product/*/client/lib
/opt/oracle/*/client/lib
)
if(ORACLE_INCLUDE_DIR AND ORACLE_CLIENT_LIBRARIES AND ORACLE_CONNECTION_LIBRARIES)

View File

@ -118,7 +118,6 @@ ADD_LIBRARY(modules
ipfix/database/IpfixDbWriterPg.cpp
ipfix/database/IpfixDbReaderOracle.cpp
ipfix/database/IpfixDbReaderOracleCfg.cpp
ipfix/database/IpfixDbWriterOracleCfg.cpp
ipfix/database/IpfixFlowInspectorExporterCfg.cpp
ipfix/database/IpfixFlowInspectorExporter.cpp
)

View File

@ -46,7 +46,6 @@
#include "modules/ipfix/aggregator/IpfixAggregatorCfg.h"
#include "modules/ipfix/aggregator/PacketAggregatorCfg.h"
#include "modules/ipfix/database/IpfixDbReaderMySQLCfg.h"
#include "modules/ipfix/database/IpfixDbWriterOracleCfg.h"
#include "modules/ipfix/database/IpfixDbWriterCfg.h"
#include "modules/ipfix/database/IpfixDbReaderOracleCfg.h"
#include "modules/ipfix/database/IpfixDbWriterMongoCfg.h"
@ -100,11 +99,10 @@ Cfg* ConfigManager::configModules[] = {
new P2PDetectorCfg(NULL),
new HostStatisticsCfg(NULL),
new IpfixCsExporterCfg(NULL),
#if defined(DB_SUPPORT_ENABLED) || defined(PG_SUPPORT_ENABLED)
#if defined(DB_SUPPORT_ENABLED) || defined(PG_SUPPORT_ENABLED) || defined(ORACLE_SUPPORT_ENABLED)
new IpfixDbWriterCfg(NULL),
#endif
#ifdef ORACLE_SUPPORT_ENABLED
new IpfixDbWriterOracleCfg(NULL),
new IpfixDbReaderOracleCfg(NULL),
#endif
#ifdef MONGO_SUPPORT_ENABLED

View File

@ -25,7 +25,7 @@
#ifndef IPFIXDBREADERORACLE_H
#define IPFIXDBREADERORACLE_H
#include "IpfixRecord.hpp"
#include "../IpfixRecord.hpp"
#include "common/ipfixlolib/ipfix.h"
#include "common/ipfixlolib/ipfixlolib.h"
#include "core/Module.h"

View File

@ -26,7 +26,7 @@
#include <core/XMLElement.h>
#include <core/Cfg.h>
#include "modules/ipfix/IpfixDbReaderOracle.hpp"
#include "IpfixDbReaderOracle.hpp"
#include <string>

View File

@ -23,7 +23,7 @@
#include "IpfixDbWriterCfg.h"
#include "IpfixDbWriterMySQL.hpp"
#include "IpfixDbWriterPg.hpp"
#include "IpfixDbWriterOracle.hpp"
IpfixDbWriterCfg* IpfixDbWriterCfg::create(XMLElement* e)
@ -106,6 +106,8 @@ IpfixDbWriterSQL* IpfixDbWriterCfg::createInstance()
instance = new IpfixDbWriterMySQL(databaseType.c_str(), hostname.c_str(), dbname.c_str(), user.c_str(), password.c_str(), port, observationDomainId, bufferRecords, colNames);
} else if (databaseType == "postgres") {
instance = new IpfixDbWriterPg(databaseType.c_str(), hostname.c_str(), dbname.c_str(), user.c_str(), password.c_str(), port, observationDomainId, bufferRecords, colNames);
} else if (databaseType == "oracle") {
instance = new IpfixDbWriterOracle(databaseType.c_str(), hostname.c_str(), dbname.c_str(), user.c_str(), password.c_str(), port, observationDomainId, bufferRecords, colNames);
} else {
THROWEXCEPTION("Database type \"%s\" not yet implemented in IpfixDbWriterCfg ...", databaseType.c_str());
}

View File

@ -27,46 +27,10 @@
#include "IpfixDbWriterOracle.hpp"
#include "common/msg.h"
const IpfixDbWriterOracle::Column IpfixDbWriterOracle::identify [] = {
{CN_dstIP, "NUMBER(10)", 0, IPFIX_TYPEID_destinationIPv4Address, 0},
{CN_srcIP, "NUMBER(10)", 0, IPFIX_TYPEID_sourceIPv4Address, 0},
{CN_srcPort, "NUMBER(5)", 0, IPFIX_TYPEID_sourceTransportPort, 0},
{CN_dstPort, "NUMBER(5)", 0, IPFIX_TYPEID_destinationTransportPort, 0},
{CN_proto, "NUMBER(3)", 0, IPFIX_TYPEID_protocolIdentifier, 0 },
{CN_dstTos, "NUMBER(3)", 0, IPFIX_TYPEID_classOfServiceIPv4, 0},
{CN_bytes, "NUMBER(20)", 0, IPFIX_TYPEID_octetDeltaCount, 0},
{CN_pkts, "NUMBER(20)", 0, IPFIX_TYPEID_packetDeltaCount, 0},
{CN_firstSwitched, "NUMBER(10)", 0, IPFIX_TYPEID_flowStartSeconds, 0}, // default value is invalid/not used for this ent
{CN_lastSwitched, "NUMBER(10)", 0, IPFIX_TYPEID_flowEndSeconds, 0}, // default value is invalid/not used for this entry
{CN_firstSwitchedMillis, "NUMBER(5)", 0, IPFIX_TYPEID_flowStartMilliSeconds, 0},
{CN_lastSwitchedMillis, "NUMBER(5)", 0, IPFIX_TYPEID_flowEndMilliSeconds, 0},
{CN_tcpControlBits, "NUMBER(5)", 0, IPFIX_TYPEID_tcpControlBits, 0},
//TODO: use enterprise number for the following extended types (Gerhard, 12/2009)
{CN_revbytes, "NUMBER(20)", 0, IPFIX_TYPEID_octetDeltaCount, IPFIX_PEN_reverse},
{CN_revpkts, "NUMBER(20)", 0, IPFIX_TYPEID_packetDeltaCount, IPFIX_PEN_reverse},
{CN_revFirstSwitched, "NUMBER(10)", 0, IPFIX_TYPEID_flowStartSeconds, IPFIX_PEN_reverse}, // default value is invalid/not used for this entry
{CN_revLastSwitched, "NUMBER(10)", 0, IPFIX_TYPEID_flowEndSeconds, IPFIX_PEN_reverse}, // default value is invalid/not used for this entry
{CN_revFirstSwitchedMillis, "NUMBER(5)", 0, IPFIX_TYPEID_flowStartMilliSeconds, IPFIX_PEN_reverse},
{CN_revLastSwitchedMillis, "NUMBER(5)", 0, IPFIX_TYPEID_flowEndMilliSeconds, IPFIX_PEN_reverse},
{CN_revTcpControlBits, "NUMBER(5)", 0, IPFIX_TYPEID_tcpControlBits, IPFIX_PEN_reverse},
{CN_maxPacketGap, "NUMBER(20)", 0, IPFIX_ETYPEID_maxPacketGap, IPFIX_PEN_vermont|IPFIX_PEN_reverse},
{CN_exporterID, "NUMBER(5)", 0, EXPORTERID, 0},
{0} // last entry must be 0
};
/**
* Compare two source IDs and check if exporter is the same (i.e., same IP address and observationDomainId
*/
bool IpfixDbWriterOracle::equalExporter(const IpfixRecord::SourceID& a, const IpfixRecord::SourceID& b) {
return (a.observationDomainId == b.observationDomainId) &&
(a.exporterAddress.len == b.exporterAddress.len) &&
(memcmp(a.exporterAddress.ip, b.exporterAddress.ip, a.exporterAddress.len) == 0 );
}
/**
* (re)connect to database
*/
int IpfixDbWriterOracle::connectToDB()
void IpfixDbWriterOracle::connectToDB()
{
dbError = true;
@ -80,26 +44,25 @@ int IpfixDbWriterOracle::connectToDB()
} catch (oracle::occi::SQLException& ex) {
msg(MSG_FATAL, "IpfixDbWriterOracle: Error while creating environment: %s.", ex.getMessage().c_str());
msg(MSG_FATAL, "IpfixDbWriterOracle: Did you configure your Oracle environment?");
return -1;
return ;
}
msg(MSG_DEBUG, "IpfixDbWriterOracle: Trying to connect to database ...");
try
{
char dbLogon[256];
sprintf(dbLogon, "%s:%u/%s", dbHost.c_str(), dbPort, dbName.c_str());
con = env->createConnection(dbUser, dbPassword, dbLogon);
sprintf(dbLogon, "%s:%u/%s", hostName, portNum, dbName);
con = env->createConnection(userName, password, dbLogon);
} catch (oracle::occi::SQLException& ex)
{
msg(MSG_FATAL,"IpfixDbWriterOracle: Oracle connect failed. Error: %s", ex.getMessage().c_str());
return 1;
return ;
}
msg(MSG_DEBUG,"IpfixDbWriterOracle: Oracle connection successful");
if (createExporterTable()!=0) return 1;
if (createExporterTable()!=0) return ;
dbError = false;
return 0;
}
int IpfixDbWriterOracle::createExporterTable()
@ -243,272 +206,17 @@ int IpfixDbWriterOracle::createExporterTable()
return 0;
}
/**
* save record to database
*/
void IpfixDbWriterOracle::processDataDataRecord(const IpfixRecord::SourceID& sourceID,
TemplateInfo& dataTemplateInfo, uint16_t length,
IpfixRecord::Data* data)
{
string rowString;
time_t flowStartSeconds;
msg(MSG_DEBUG, "IpfixDbWriterOracle: Processing data record");
if (dbError) {
msg(MSG_DEBUG, "IpfixDbWriterOracle: reconnecting to DB");
connectToDB();
if (dbError) return;
}
/* get new insert */
if(srcId.observationDomainId != 0) {
// use default source id
rowString = getInsertString(rowString, flowStartSeconds, srcId, dataTemplateInfo, length, data);
} else {
rowString = getInsertString(rowString, flowStartSeconds, sourceID, dataTemplateInfo, length, data);
}
msg(MSG_DEBUG, "IpfixDbWriterOracle: Row: %s", rowString.c_str());
// if current table is not ok, write to db and get new table name
if(!(flowStartSeconds >= currentTable.startTime && flowStartSeconds <= currentTable.endTime)) {
if(numberOfInserts > 0) {
msg(MSG_DEBUG, "IpfixDbWriterOracle: Writing buffered records to database");
insertStatement << " SELECT * FROM dual";
writeToDb();
numberOfInserts = 0;
}
if (setCurrentTable(flowStartSeconds) != 0) {
return;
}
}
// start new insert statement if necessary
if (numberOfInserts == 0) {
// start insert statement
insertStatement.str("");
insertStatement.clear();
insertStatement << "INSERT ALL INTO " << currentTable.name << " (" << tableColumnsString << ") VALUES " << rowString;
numberOfInserts = 1;
} else {
// append insert statement
insertStatement << " INTO " << currentTable.name << " (" << tableColumnsString << ") VALUES " << rowString;
numberOfInserts++;
}
// write to db if maxInserts is reached
if(numberOfInserts == maxInserts) {
msg(MSG_DEBUG, "IpfixDbWriterOracle: Writing buffered records to database");
insertStatement << " SELECT * FROM dual";
writeToDb();
numberOfInserts = 0;
}
}
/**
* loop over table columns and template to get the IPFIX values in correct order to store in database
* The result is written into row, the firstSwitched time is returned in flowstartsec
*/
string& IpfixDbWriterOracle::getInsertString(string& row, time_t& flowstartsec, const IpfixRecord::SourceID& sourceID,
TemplateInfo& dataTemplateInfo,uint16_t length, IpfixRecord::Data* data)
{
uint64_t intdata = 0;
uint64_t intdata2 = 0;
uint32_t k;
bool notfound, notfound2;
bool first = true;
ostringstream rowStream(row);
flowstartsec = 0;
rowStream << "(";
/**loop over the columname and loop over the IPFIX_TYPEID of the record
to get the corresponding data to store and make insert statement*/
for(vector<Column>::iterator col = tableColumns.begin(); col != tableColumns.end(); col++) {
if (col->ipfixId == EXPORTERID) {
// if this is the same source ID as last time, we get the exporter id from currentExporter
if ((currentExporter != NULL) && equalExporter(sourceID, currentExporter->sourceID)) {
DPRINTF("Exporter is same as last time (ODID=%d, id=%d)", sourceID.observationDomainId, currentExporter->id);
intdata = (uint64_t)currentExporter->id;
} else {
// lookup exporter buffer to get exporterID from sourcID and expIp
intdata = (uint64_t)getExporterID(sourceID);
}
} else {
notfound = true;
// try to gather data required for the field
if(dataTemplateInfo.fieldCount > 0) {
// look inside the ipfix record
for(k=0; k < dataTemplateInfo.fieldCount; k++) {
if(dataTemplateInfo.fieldInfo[k].type.enterprise == col->enterprise && dataTemplateInfo.fieldInfo[k].type.id == col->ipfixId) {
notfound = false;
intdata = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset));
DPRINTF("IpfixDbWriterOracle::getData: really saw ipfix id %d in packet with intdata %llX, type %d, length %d and offset %X", col->ipfixId, intdata, dataTemplateInfo.fieldInfo[k].type.id, dataTemplateInfo.fieldInfo[k].type.length, dataTemplateInfo.fieldInfo[k].offset);
break;
}
}
}
if( dataTemplateInfo.dataCount > 0 && notfound) {
// look in static data fields of template for data
for(k=0; k < dataTemplateInfo.dataCount; k++) {
if(dataTemplateInfo.fieldInfo[k].type.enterprise == col->enterprise && dataTemplateInfo.dataInfo[k].type.id == col->ipfixId) {
notfound = false;
intdata = getData(dataTemplateInfo.dataInfo[k].type,(dataTemplateInfo.data+dataTemplateInfo.dataInfo[k].offset));
break;
}
}
}
if(notfound) {
notfound2 = true;
// for some Ids, we have an alternative
if(col->enterprise == 0) {
switch (col->ipfixId) {
case IPFIX_TYPEID_flowStartSeconds:
if(dataTemplateInfo.fieldCount > 0) {
for(k=0; k < dataTemplateInfo.fieldCount; k++) {
// look for alternative (flowStartMilliSeconds/1000)
if(dataTemplateInfo.fieldInfo[k].type.id == IPFIX_TYPEID_flowStartMilliSeconds) {
intdata = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset)) / 1000;
notfound = false;
break;
}
// if no flow start time is available, maybe this is is from a netflow from Cisco
// then - as a last alternative - use flowStartSysUpTime as flow start time
if(dataTemplateInfo.fieldInfo[k].type.id == IPFIX_TYPEID_flowStartSysUpTime) {
intdata2 = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset));
notfound2 = false;
}
}
if(notfound && !notfound2) {
intdata = intdata2;
notfound = false;
}
}
break;
case IPFIX_TYPEID_flowEndSeconds:
if(dataTemplateInfo.fieldCount > 0) {
for(k=0; k < dataTemplateInfo.fieldCount; k++) {
// look for alternative (flowEndMilliSeconds/1000)
if(dataTemplateInfo.fieldInfo[k].type.id == IPFIX_TYPEID_flowEndMilliSeconds) {
intdata = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset)) / 1000;
notfound = false;
break;
}
// if no flow end time is available, maybe this is is from a netflow from Cisco
// then use flowEndSysUpTime as flow start time
if(dataTemplateInfo.fieldInfo[k].type.id == IPFIX_TYPEID_flowEndSysUpTime) {
intdata2 = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset));
notfound2 = false;
}
}
if(notfound && !notfound2) {
intdata = intdata2;
notfound = false;
}
}
break;
}
} else if (col->enterprise==IPFIX_PEN_reverse) {
switch (col->ipfixId) {
case IPFIX_TYPEID_flowStartSeconds:
// look for alternative (revFlowStartMilliSeconds/1000)
if(dataTemplateInfo.fieldCount > 0) {
for(k=0; k < dataTemplateInfo.fieldCount; k++) {
if(dataTemplateInfo.fieldInfo[k].type == InformationElement::IeInfo(IPFIX_TYPEID_flowStartMilliSeconds, IPFIX_PEN_reverse)) {
intdata = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset)) / 1000;
notfound = false;
break;
}
}
}
break;
case IPFIX_TYPEID_flowEndSeconds:
// look for alternative (revFlowEndMilliSeconds/1000)
if(dataTemplateInfo.fieldCount > 0) {
for(k=0; k < dataTemplateInfo.fieldCount; k++) {
if(dataTemplateInfo.fieldInfo[k].type == InformationElement::IeInfo(IPFIX_TYPEID_flowEndMilliSeconds, IPFIX_PEN_reverse)) {
intdata = getData(dataTemplateInfo.fieldInfo[k].type,(data+dataTemplateInfo.fieldInfo[k].offset)) / 1000;
notfound = false;
break;
}
}
}
break;
}
}
// if still not found, get default value
if(notfound)
intdata = col->defaultValue;
}
// we need extra treatment for timing related fields
if(col->enterprise == 0 ) {
switch (col->ipfixId) {
case IPFIX_TYPEID_flowStartSeconds:
// save time for table access
if (flowstartsec==0) flowstartsec = intdata;
break;
case IPFIX_TYPEID_flowEndSeconds:
break;
case IPFIX_TYPEID_flowStartMilliSeconds:
// if flowStartSeconds is not stored in one of the columns, but flowStartMilliSeconds is,
// then we use flowStartMilliSeconds for table access
// This is realized by storing this value only if flowStartSeconds has not yet been seen.
// A later appearing flowStartSeconds will override this value.
if (flowstartsec==0)
flowstartsec = intdata/1000;
case IPFIX_TYPEID_flowEndMilliSeconds:
// in the database the millisecond entry is counted from last second
intdata %= 1000;
break;
}
} else if (col->enterprise==IPFIX_PEN_reverse)
switch (col->ipfixId) {
case IPFIX_TYPEID_flowStartMilliSeconds:
case IPFIX_TYPEID_flowEndMilliSeconds:
// in the database the millisecond entry is counted from last second
intdata %= 1000;
break;
}
}
DPRINTF("saw ipfix id %d in packet with intdata %llX", col->ipfixId, intdata);
if(first)
rowStream << intdata;
else
rowStream << "," << intdata;
first = false;
}
rowStream << ")";
if (flowstartsec == 0) {
msg(MSG_ERROR, "IpfixDbWriterOracle: Failed to get timing data from record. Will be saved in default table.");
}
row = rowStream.str();
DPRINTF("Insert row: %s", row.c_str());
return row;
}
/*
* Write insertStatement to database
*/
int IpfixDbWriterOracle::writeToDb()
bool IpfixDbWriterOracle::writeToDb()
{
//msg(MSG_DEBUG, "SQL Query: %s", insertStatement.str().c_str());
oracle::occi::Statement *stmt = NULL;
oracle::occi::ResultSet *rs = NULL;
try
{
stmt = con->createStatement(insertStatement.str());
stmt = con->createStatement(insertBuffer.sql);
}
catch (oracle::occi::SQLException& ex)
{
@ -530,48 +238,36 @@ int IpfixDbWriterOracle::writeToDb()
}
stmt->closeResultSet(rs);
con->terminateStatement(stmt);
insertBuffer.curRows = 0;
insertBuffer.appendPtr = insertBuffer.bodyPtr;
*insertBuffer.appendPtr = 0;
msg(MSG_DEBUG,"IpfixDbWriterOracle: Write to database is complete");
return 0;
}
insertBuffer.curRows = 0;
insertBuffer.appendPtr = insertBuffer.bodyPtr;
*insertBuffer.appendPtr = 0;
return 1;
}
/*
* Sets the current table information and creates the table in the database if necessary
*/
int IpfixDbWriterOracle::setCurrentTable(time_t flowstartsec)
bool IpfixDbWriterOracle::createDBTable(const char* partitionname, uint64_t starttime, uint64_t endtime)
{
// generate table name
ostringstream tableStream;
struct tm* flowStartTime = gmtime(&flowstartsec);
tableStream << "H_" << (flowStartTime->tm_year+1900)
<< setfill('0') << setw(2) << (flowStartTime->tm_mon+1)
<< setfill('0') << setw(2) << (flowStartTime->tm_mday) << "_"
<< setfill('0') << setw(2) << (flowStartTime->tm_hour) << "_"
<< setw(1) << (flowStartTime->tm_min<30?0:1);
uint32_t i;
currentTable.name = tableStream.str();
// calculate table boundaries
if(flowStartTime->tm_min < 30) {
flowStartTime->tm_min = 0;
flowStartTime->tm_sec = 0;
currentTable.startTime = timegm(flowStartTime);
} else {
flowStartTime->tm_min = 30;
flowStartTime->tm_sec = 0;
currentTable.startTime = timegm(flowStartTime);
}
currentTable.endTime = currentTable.startTime + 1799;
msg(MSG_DEBUG, "IpfixDbWriterOracle: flowstartsec: %d, table name: %s, start time: %d, end time: %d", flowstartsec, currentTable.name.c_str(), currentTable.startTime, currentTable.endTime);
if (find(usedPartitions.begin(), usedPartitions.end(), partitionname)!=usedPartitions.end()) {
// found cached entry!
DPRINTF("Partition '%s' already created.", partitionname);
return true;
}
// check if table exists
ostringstream sql;
oracle::occi::Statement *stmt = NULL;
oracle::occi::ResultSet *rs = NULL;
sql << "SELECT COUNT(table_name) FROM user_tables WHERE table_name='" << currentTable.name << "'";
sql << "SELECT COUNT(table_name) FROM user_tables WHERE table_name='" << partitionname<< "'";
msg(MSG_DEBUG, "IpfixDbWriterOracle: SQL Query: %s", sql.str().c_str());
try
{
@ -616,7 +312,7 @@ int IpfixDbWriterOracle::setCurrentTable(time_t flowstartsec)
// create table
sql.str("");
sql << "CREATE TABLE " << currentTable.name << " ( " << tableColumnsCreateString << ")";
sql << "CREATE TABLE " << partitionname<< " ( " << tableColumnsCreateString << ")";
msg(MSG_DEBUG, "IpfixDbWriterOracle: SQL Query: %s", sql.str().c_str());
try
{
@ -646,7 +342,7 @@ int IpfixDbWriterOracle::setCurrentTable(time_t flowstartsec)
stmt->closeResultSet(rs);
con->terminateStatement(stmt);
}
msg(MSG_DEBUG, "IpfixDbWriterOracle: Table %s created ", currentTable.name.c_str());
msg(MSG_DEBUG, "IpfixDbWriterOracle: Table %s created ", partitionname);
return 0;
}
@ -654,34 +350,31 @@ int IpfixDbWriterOracle::setCurrentTable(time_t flowstartsec)
/**
* Returns the id of the exporter table entry or 0 in the case of an error
*/
int IpfixDbWriterOracle::getExporterID(const IpfixRecord::SourceID& sourceID)
int IpfixDbWriterOracle::getExporterID(IpfixRecord::SourceID* sourceID)
{
list<ExporterCacheEntry>::iterator iter;
uint32_t i;
oracle::occi::Statement* stmt = NULL;
oracle::occi::ResultSet* rs = NULL;
int id = -1;
int exporterID = -1;
uint32_t expIp = 0;
ostringstream sql;
iter = exporterCache.begin();
while(iter != exporterCache.end()) {
if (equalExporter(iter->sourceID, sourceID)) {
// found exporter in exporterCache
DPRINTF("Exporter (ODID=%d, id=%d) found in exporter cache", sourceID.observationDomainId, iter->id);
exporterCache.push_front(*iter);
exporterCache.erase(iter);
// update current exporter
currentExporter = &exporterCache.front();
return exporterCache.front().id;
}
iter++;
}
// convert IP address (correct host byte order since 07/2010)
expIp = sourceID.exporterAddress.toUInt32();
expIp = sourceID->exporterAddress.toUInt32();
/** Is the exporterID already in exporterBuffer? */
for(i = 0; i < curExporterEntries; i++) {
if(exporterEntries[i].observationDomainId == sourceID->observationDomainId &&
exporterEntries[i].ip==expIp) {
DPRINTF("Exporter sourceID/IP with ID %d is in the exporterBuffer\n",
exporterEntries[i].Id);
return exporterEntries[i].Id;
}
}
// search exporter table
sql << "SELECT id FROM exporter WHERE sourceID=" << sourceID.observationDomainId << " AND srcIp=" << expIp;
sql << "SELECT id FROM exporter WHERE sourceID=" << sourceID->observationDomainId << " AND srcIp=" << expIp;
msg(MSG_DEBUG, "IpfixDbWriterOracle: SQL Query: %s", sql.str().c_str());
try
{
@ -702,8 +395,8 @@ int IpfixDbWriterOracle::getExporterID(const IpfixRecord::SourceID& sourceID)
{
while(rs->next())
{
id = rs->getInt(1);
msg(MSG_DEBUG, "IpfixDbWriterOracle: ExporterID %d is in exporter table", id);
exporterID = rs->getInt(1);
msg(MSG_DEBUG, "IpfixDbWriterOracle: ExporterID %d is in exporter table", exporterID);
}
stmt->closeResultSet(rs);
}
@ -717,10 +410,10 @@ int IpfixDbWriterOracle::getExporterID(const IpfixRecord::SourceID& sourceID)
}
}
// insert new entry in exporter table since it is not found
if(id == -1)
if(exporterID == -1)
{
sql.str("");
sql << "INSERT INTO exporter (sourceID,srcIP) VALUES ('" << sourceID.observationDomainId << "','" << expIp << "')";
sql << "INSERT INTO exporter (sourceID,srcIP) VALUES ('" << sourceID->observationDomainId << "','" << expIp << "')";
msg(MSG_DEBUG, "IpfixDbWriterOracle: SQL Query: %s", sql.str().c_str());
try
{
@ -770,8 +463,8 @@ int IpfixDbWriterOracle::getExporterID(const IpfixRecord::SourceID& sourceID)
{
while(rs->next())
{
id = rs->getInt(1);
DPRINTF("ExporterID %d is in exporter table", id);
exporterID = rs->getInt(1);
DPRINTF("ExporterID %d is in exporter table", exporterID);
}
stmt->closeResultSet(rs);
}
@ -783,125 +476,39 @@ int IpfixDbWriterOracle::getExporterID(const IpfixRecord::SourceID& sourceID)
con->terminateStatement(stmt);
return 0;// If a failure occurs, return 0
}
msg(MSG_INFO,"IpfixDbWriterOracle: new exporter (ODID=%d, id=%d) inserted in exporter table", sourceID.observationDomainId, id);
msg(MSG_INFO,"IpfixDbWriterOracle: new exporter (ODID=%d, id=%d) inserted in exporter table", sourceID->observationDomainId, exporterID);
}
}
// insert exporter in cache
ExporterCacheEntry tmp = {sourceID, id};
exporterCache.push_front(tmp);
// update current exporter
currentExporter = &exporterCache.front();
// pop last element if exporter cache is to long
if(exporterCache.size() > MAX_EXPORTER)
exporterCache.pop_back();
return id;
}
/**
* Get data of the record is given by the IPFIX_TYPEID
*/
uint64_t IpfixDbWriterOracle::getData(InformationElement::IeInfo type, IpfixRecord::Data* data)
{
switch (type.length) {
case 1:
return (*(uint8_t*)data);
case 2:
return ntohs(*(uint16_t*)data);
case 4:
return ntohl(*(uint32_t*)data);
case 5: // may occur in the case if IP address + mask
return ntohl(*(uint32_t*)data);
case 8:
return ntohll(*(uint64_t*)data);
default:
printf("Uint with length %d unparseable", type.length);
return 0;
if (curExporterEntries==MAX_EXP_TABLE-1) {
// maybe here we should check how often this happens and display a severe warning if too
// many parallel streams are received at once
msg(MSG_INFO, "IpfixDbWriterPg: turnover for exporter cache occurred.");
curExporterEntries = 0;
}
/**Write new exporter in the exporterBuffer*/
exporterEntries[curExporterEntries].Id = exporterID;
exporterEntries[curExporterEntries].observationDomainId = sourceID->observationDomainId;
exporterEntries[curExporterEntries++].ip = expIp;
return exporterID;
}
/***** Public Methods ****************************************************/
/**
* called on Data Record arrival
*/
void IpfixDbWriterOracle::onDataRecord(IpfixDataRecord* record)
IpfixDbWriterOracle::IpfixDbWriterOracle(const char* dbType, const char* host, const char* db,
const char* user, const char* pw,
unsigned int port, uint16_t observationDomainId,
int maxStatements, vector<string> columns)
: IpfixDbWriterSQL(dbType, host, db, user, pw, port, observationDomainId, maxStatements, columns), con(0), env(0)
{
// only treat non-Options Data Records (although we cannot be sure that there is a Flow inside)
if((record->templateInfo->setId != TemplateInfo::NetflowTemplate)
&& (record->templateInfo->setId != TemplateInfo::IpfixTemplate)
&& (record->templateInfo->setId != TemplateInfo::IpfixDataTemplate)) {
record->removeReference();
return;
}
msg(MSG_DEBUG, "IpfixDbWriterOracle: Data record received will be passed for processing");
processDataDataRecord(*record->sourceID.get(), *record->templateInfo.get(),
record->dataLength, record->data);
record->removeReference();
connectToDB();
}
/**
* Constructor
*/
IpfixDbWriterOracle::IpfixDbWriterOracle(const string& hostname, const string& dbname,
const string& username, const string& password,
unsigned port, uint32_t observationDomainId, unsigned maxStatements,
const vector<string>& columns)
: currentExporter(NULL), numberOfInserts(0), maxInserts(maxStatements),
dbHost(hostname), dbName(dbname), dbUser(username), dbPassword(password), dbPort(port), con(0)
{
int i;
// set default source id
srcId.exporterAddress.len = 0;
srcId.observationDomainId = observationDomainId;
srcId.exporterPort = 0;
srcId.receiverPort = 0;
srcId.protocol = 0;
srcId.fileDescriptor = 0;
// invalide start settings for current table (to enforce table create)
currentTable.startTime = 1;
currentTable.endTime = 0;
if(columns.empty())
THROWEXCEPTION("IpfixDbWriterOracle: cannot initiate with no columns");
/* get columns */
bool first = true;
for(vector<string>::const_iterator col = columns.begin(); col != columns.end(); col++) {
i = 0;
while(identify[i].columnName != 0) {
if(col->compare(identify[i].columnName) == 0) {
Column c = identify[i];
tableColumns.push_back(c);
// update tableColumnsString
if(!first)
tableColumnsString.append(",");
tableColumnsString.append(identify[i].columnName);
// update tableColumnsCreateString
if(!first)
tableColumnsCreateString.append(", ");
tableColumnsCreateString.append(identify[i].columnName);
tableColumnsCreateString.append(" ");
tableColumnsCreateString.append(identify[i].columnType);
first = false;
break;
}
i++;
}
}
msg(MSG_INFO, "IpfixDbWriterOracle: columns are %s", tableColumnsString.c_str());
if(connectToDB() != 0)
THROWEXCEPTION("IpfixDbWriterOracle creation failed");
}
/**
* Destructor
*/
@ -912,6 +519,4 @@ IpfixDbWriterOracle::~IpfixDbWriterOracle()
oracle::occi::Environment::terminateEnvironment(env);
}
#endif /* ORACLE_SUPPORT_ENABLED */

View File

@ -25,6 +25,7 @@
#define IPFIXDBWRITERORACLE_H_
#include "IpfixDbCommon.hpp"
#include "IpfixDbWriterSQL.hpp"
#include "common/ipfixlolib/ipfix.h"
#include "common/ipfixlolib/ipfixlolib.h"
#include <occi.h>
@ -41,96 +42,33 @@ using namespace std;
#define EXPORTERID 0
/**
* IpfixDbWriterOracle powered the communication to the oracle database server
* IpfixDbWriterMySQL powered the communication to the database server
* also between the other structs
*/
class IpfixDbWriterOracle
: public IpfixRecordDestination, public Module, public Source<NullEmitable*>
: public IpfixDbWriterSQL
{
public:
IpfixDbWriterOracle(const string& hostname, const string& dbname,
const string& username, const string& password,
unsigned port, uint32_t observationDomainId, unsigned maxStatements,
const vector<string>& columns);
IpfixDbWriterOracle(const char* dbType, const char* host, const char* db,
const char* user, const char* pw,
unsigned int port, uint16_t observationDomainId, // FIXME: observationDomainId
int maxStatements, vector<string> columns);
~IpfixDbWriterOracle();
void onDataRecord(IpfixDataRecord* record);
/**
* Struct to identify the relationship between columns names and
* IPFIX_TYPEID, column type and default value
*/
struct Column {
const char* columnName; /** column name */
const char* columnType; /** column data type in database */
uint64_t defaultValue; /** default value */
InformationElement::IeId ipfixId; /** IPFIX_TYPEID */
InformationElement::IeEnterpriseNumber enterprise; /** enterprise number */
};
virtual void connectToDB();
virtual bool writeToDb();
virtual int createExporterTable();
virtual int getExporterID(IpfixRecord::SourceID* sourceID);
virtual bool createDBTable(const char* partitionname, uint64_t starttime, uint64_t endtime);
private:
static const unsigned MAX_EXPORTER = 10; // maximum numbers of cached exporters
/**
* Struct buffers start and end time and tablename for the different tables
*/
struct TableCacheEntry {
time_t startTime; // smallest flow start second timestamp in the table
time_t endTime; // largest flow start second timestamp in the table
string name; // name of the table
};
/**
* Struct buffers ODID, IP address and row index of an exporter
*/
struct ExporterCacheEntry {
IpfixRecord::SourceID sourceID;/** source id of the exporter */
int id; /** Id entry of sourcID and expIP in the ExporterTable */
};
TableCacheEntry currentTable; // current table in tableCache
list<ExporterCacheEntry> exporterCache; // cached tables names, key=observationDomainId
ExporterCacheEntry* currentExporter; // pointer to current exporter in exporterCache
IpfixRecord::SourceID srcId; // default source ID
ostringstream insertStatement; // insert statement string
int numberOfInserts; // number of inserts in statement
int maxInserts; // maximum number of inserts per statement
vector<Column> tableColumns; // table columns
string tableColumnsString; // table columns as string for INSERT statements
string tableColumnsCreateString; // table columns as string for CREATE statements
// database data
string dbHost, dbName, dbUser, dbPassword;
unsigned dbPort;
oracle::occi::Environment *env;
oracle::occi::Connection *con;
bool dbError; // db error flag
int createDB();
int setCurrentTable(time_t flowstartsec);
string& getInsertString(string& row, time_t& flowstartsec, const IpfixRecord::SourceID& sourceID,
TemplateInfo& dataTemplateInfo,uint16_t length, IpfixRecord::Data* data);
int writeToDb();
int getExporterID(const IpfixRecord::SourceID& sourceID);
int connectToDB();
int createExporterTable();
void processDataDataRecord(const IpfixRecord::SourceID& sourceID,
TemplateInfo& dataTemplateInfo, uint16_t length,
IpfixRecord::Data* data);
uint64_t getData(InformationElement::IeInfo type, IpfixRecord::Data* data);
bool equalExporter(const IpfixRecord::SourceID& a, const IpfixRecord::SourceID& b);
const static Column identify[];
};
#endif
#endif

View File

@ -1,108 +0,0 @@
/*
* IPFIX Database Reader/Writer Oracle Connector Configuration
* Copyright (C) 2011 Philipp Fehre <philipp.fehre@googlemail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#ifdef ORACLE_SUPPORT_ENABLED
#include "IpfixDbWriterOracleCfg.h"
IpfixDbWriterOracleCfg* IpfixDbWriterOracleCfg::create(XMLElement* e)
{
assert(e);
assert(e->getName() == getName());
return new IpfixDbWriterOracleCfg(e);
}
IpfixDbWriterOracleCfg::IpfixDbWriterOracleCfg(XMLElement* elem)
: CfgHelper<IpfixDbWriterOracle, IpfixDbWriterOracleCfg>(elem, "ipfixDbWriterOracle"),
port(0), bufferRecords(30), observationDomainId(0)
{
msg(MSG_DEBUG, "Starting configuration for Oracle connection");
if (!elem) return;
XMLNode::XMLSet<XMLElement*> set = _elem->getElementChildren();
for (XMLNode::XMLSet<XMLElement*>::iterator it = set.begin(); it != set.end(); it++) {
XMLElement* e = *it;
if (e->matches("host")) {
hostname = e->getFirstText();
} else if (e->matches("port")) {
port = getInt("port");
} else if (e->matches("dbname")) {
dbname = e->getFirstText();
} else if (e->matches("username")) {
user = e->getFirstText();
} else if (e->matches("password")) {
password = e->getFirstText();
} else if (e->matches("bufferrecords")) {
bufferRecords = getInt("bufferrecords");
} else if (e->matches("columns")) {
readColumns(e);
} else if (e->matches("next")) { // ignore next
} else {
msg(MSG_FATAL, "Unknown IpfixDbWriterOracle config statement %s\n", e->getName().c_str());
continue;
}
}
if (hostname=="") THROWEXCEPTION("IpfixDbWriterOracleCfg: host not set in configuration!");
if (port==0) THROWEXCEPTION("IpfixDbWriterOracleCfg: port not set in configuration!");
if (dbname=="") THROWEXCEPTION("IpfixDbWriterOracleCfg: dbname not set in configuration!");
if (user=="") THROWEXCEPTION("IpfixDbWriterOracleCfg: username not set in configuration!");
if (password=="") THROWEXCEPTION("IpfixDbWriterOracleCfg: password not set in configuration!");
}
void IpfixDbWriterOracleCfg::readColumns(XMLElement* elem) {
colNames.clear();
XMLNode::XMLSet<XMLElement*> set = elem->getElementChildren();
for (XMLNode::XMLSet<XMLElement*>::iterator it = set.begin();
it != set.end();
it++) {
XMLElement* e = *it;
if (e->matches("name")) {
colNames.push_back(e->getFirstText());
msg(MSG_DEBUG, "Row: %s", e->getFirstText().c_str());
} else {
msg(MSG_FATAL, "Unknown IpfixDbWriterOracle config statement %s\n", e->getName().c_str());
continue;
}
}
}
IpfixDbWriterOracleCfg::~IpfixDbWriterOracleCfg()
{
}
IpfixDbWriterOracle* IpfixDbWriterOracleCfg::createInstance()
{
instance = new IpfixDbWriterOracle(hostname, dbname, user, password, port, observationDomainId, bufferRecords, colNames);
msg(MSG_DEBUG, "IpfixDbWriterOracle configuration host %s db %s user %s password %s port %i observationDomainId %i bufferRecords %i\n",
hostname.c_str(), dbname.c_str(), user.c_str(), password.c_str(), port, observationDomainId, bufferRecords);
return instance;
}
bool IpfixDbWriterOracleCfg::deriveFrom(IpfixDbWriterOracleCfg* old)
{
return false;
}
#endif /* ORACLE_SUPPORT_ENABLED */

View File

@ -1,64 +0,0 @@
/*
* IPFIX Database Reader/Writer Oracle Connector Configuration
* Copyright (C) 2011 Philipp Fehre <philipp.fehre@googlemail.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#ifndef IPFIXDBWRITERORACLECFG_H_
#define IPFIXDBWRITERORACLECFG_H_
#ifdef ORACLE_SUPPORT_ENABLED
#include <core/XMLElement.h>
#include <core/Cfg.h>
#include "IpfixDbWriterOracle.hpp"
#include <string>
using namespace std;
class IpfixDbWriterOracleCfg
: public CfgHelper<IpfixDbWriterOracle, IpfixDbWriterOracleCfg>
{
public:
friend class ConfigManager;
virtual IpfixDbWriterOracleCfg* create(XMLElement* e);
virtual ~IpfixDbWriterOracleCfg();
virtual IpfixDbWriterOracle* createInstance();
virtual bool deriveFrom(IpfixDbWriterOracleCfg* old);
protected:
string hostname; /**< hostname of database host */
uint16_t port; /**< port of database */
string dbname; /**< database name */
string user; /**< user name for login to database */
string password; /**< password for login to database */
uint16_t bufferRecords; /**< amount of records to buffer until they are written to database */
uint32_t observationDomainId; /**< default observation domain id (overrides the one received in the records */
vector<string> colNames; /**< column names */
void readColumns(XMLElement*);
IpfixDbWriterOracleCfg(XMLElement*);
};
#endif /* ORACLE_SUPPORT_ENABLED */
#endif /* IPFIXDBWRITERORACLECFG_H_ */

View File

@ -101,6 +101,33 @@ const static IpfixDbWriterSQL::Column identifyPg [] = {
{ 0} // last entry must be 0
};
/** Oracle **/
const IpfixDbWriterSQL::Column identifyOracle [] = {
{ CN_dstIP, IPFIX_TYPEID_destinationIPv4Address, "NUMBER(10)", 0, 0},
{ CN_srcIP, IPFIX_TYPEID_sourceIPv4Address, "NUMBER(10)", 0, 0},
{ CN_srcPort, IPFIX_TYPEID_sourceTransportPort, "NUMBER(5)", 0, 0},
{ CN_dstPort, IPFIX_TYPEID_destinationTransportPort, "NUMBER(5)", 0, 0},
{ CN_proto, IPFIX_TYPEID_protocolIdentifier, "NUMBER(3)", 0, 0 },
{ CN_dstTos, IPFIX_TYPEID_classOfServiceIPv4, "NUMBER(3)", 0, 0},
{ CN_bytes, IPFIX_TYPEID_octetDeltaCount, "NUMBER(20)", 0, 0},
{ CN_pkts, IPFIX_TYPEID_packetDeltaCount, "NUMBER(20)", 0, 0},
{ CN_firstSwitched, IPFIX_TYPEID_flowStartMilliSeconds, "NUMBER(15)", 0, 0}, // default value is invalid/not used for this ent
{ CN_lastSwitched, IPFIX_TYPEID_flowEndMilliSeconds, "NUMBER(15)", 0, 0}, // default value is invalid/not used for this entry
{ CN_tcpControlBits, IPFIX_TYPEID_tcpControlBits, "NUMBER(5)", 0, 0},
//TODO: use enterprise number for the following extended types (Gerhard, 12/2009)
{ CN_revbytes, IPFIX_TYPEID_octetDeltaCount, "NUMBER(20)", IPFIX_PEN_reverse, 0},
{ CN_revpkts, IPFIX_TYPEID_packetDeltaCount, "NUMBER(20)", IPFIX_PEN_reverse, 0},
{ CN_revFirstSwitched, IPFIX_TYPEID_flowStartMilliSeconds, "NUMBER(15)", IPFIX_PEN_reverse, 0}, // default value is invalid/not used for this entry
{ CN_revLastSwitched, IPFIX_TYPEID_flowEndMilliSeconds, "NUMBER(15)", IPFIX_PEN_reverse, 0}, // default value is invalid/not used for this entry
{ CN_revTcpControlBits, IPFIX_TYPEID_tcpControlBits, "NUMBER(5)", IPFIX_PEN_reverse, 0},
{ CN_maxPacketGap, IPFIX_ETYPEID_maxPacketGap, "NUMBER(20)", IPFIX_PEN_vermont|IPFIX_PEN_reverse},
{ CN_exporterID, EXPORTERID, "NUMBER(5)", 0, 0},
{ CN_flowStartSysUpTime, IPFIX_TYPEID_flowStartSysUpTime, "NUMBER(10)", 0, 0},
{ CN_flowEndSysUpTime, IPFIX_TYPEID_flowEndSysUpTime, "NUMBER(10)", 0, 0},
{ 0 } // last entry must be 0
};
/***** Global Variables ******************************************************/