mod for mqtt2sql
parent
301b318358
commit
f32e1d21c4
65
mqtt2sql.py
65
mqtt2sql.py
|
@ -11,16 +11,18 @@ lastvalue={}
|
|||
# check values and insert into database
|
||||
def data2sql(acttime,msgtime,topic,value,mycursor):
|
||||
bsend=True
|
||||
iv=int(value)
|
||||
if topic in lastvalue:
|
||||
if value == lastvalue[topic]:
|
||||
if iv == lastvalue[topic]:
|
||||
bsend=False
|
||||
if bsend:
|
||||
lastvalue[topic]=value
|
||||
print(sqlinsert.format(acttime,msgtime,topic,value))
|
||||
lastvalue[topic]=iv
|
||||
# print(sqlinsert.format(acttime,msgtime,topic,iv))
|
||||
try:
|
||||
sqlcheck=mycursor.execute(sqlinsert.format(acttime,msgtime,topic,value))
|
||||
print(sqlcheck)
|
||||
sqlcheck=mycursor.execute(sqlinsert.format(acttime,msgtime,topic,iv))
|
||||
# print(sqlcheck)
|
||||
except Exception as e:
|
||||
print(sqlinsert.format(acttime,msgtime,topic,iv))
|
||||
print(e)
|
||||
|
||||
# The callback for when the client receives a CONNACK response from the server.
|
||||
|
@ -118,11 +120,16 @@ def on_message_sens(client, userdata, msg):
|
|||
data2sql(acttime,str_to_dt,ts+'/PowerFactor',int(1000*jplw['Factor']),mycursor_sens)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if 'Voltags' in jplw:
|
||||
if 'Voltage' in jplw:
|
||||
try:
|
||||
data2sql(acttime,str_to_dt,ts+'/Voltage',int(1000*jplw['Voltage']),mycursor_sens)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if 'Total' in jplw:
|
||||
try:
|
||||
data2sql(acttime,str_to_dt,ts+'/Total',int(1000*jplw['Total']),mycursor_sens)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if 'Current' in jplw:
|
||||
try:
|
||||
data2sql(acttime,str_to_dt,ts+'/Current',int(1000*jplw['Current']),mycursor_sens)
|
||||
|
@ -164,6 +171,33 @@ def on_message_sns(client, userdata, msg):
|
|||
mydb_sns.commit()
|
||||
mydb_sns.close()
|
||||
|
||||
# The callback for when the client receives a CONNACK response from the server.
|
||||
def on_connect_net(client, userdata, flags, rc):
|
||||
print("Connected with result code "+str(rc))
|
||||
# Subscribing in on_connect() means that if we lose the connection and
|
||||
# reconnect then subscriptions will be renewed.
|
||||
client.subscribe("tele/+/net/#")
|
||||
|
||||
# The callback for when a PUBLISH message is received from the server.
|
||||
def on_message_net(client, userdata, msg):
|
||||
try:
|
||||
mydb_net=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
|
||||
except:
|
||||
print("Could not connect to sql server! Quitting")
|
||||
else:
|
||||
print(int(msg.payload.decode("utf-8")))
|
||||
mycursor_net=mydb_net.cursor(pymysql.cursors.DictCursor)
|
||||
acttime=int(1000*time.time())
|
||||
ts=msg.topic.replace("tele","sp")
|
||||
print(ts)
|
||||
try:
|
||||
data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_net)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
mycursor_net.close()
|
||||
mydb_net.commit()
|
||||
mydb_net.close()
|
||||
|
||||
client = mqtt.Client()
|
||||
client.on_connect = on_connect
|
||||
client.on_message = on_message
|
||||
|
@ -172,24 +206,31 @@ client_sens = mqtt.Client()
|
|||
client_sens.on_connect = on_connect_sens
|
||||
client_sens.on_message = on_message_sens
|
||||
|
||||
client_sns = mqtt.Client()
|
||||
client_sns.on_connect = on_connect_sns
|
||||
client_sns.on_message = on_message_sns
|
||||
#client_sns = mqtt.Client()
|
||||
#client_sns.on_connect = on_connect_sns
|
||||
#client_sns.on_message = on_message_sns
|
||||
|
||||
client_net = mqtt.Client()
|
||||
client_net.on_connect = on_connect_net
|
||||
client_net.on_message = on_message_net
|
||||
|
||||
client.connect("172.24.41.2", 1883, 60)
|
||||
client_sens.connect("172.24.41.2", 1883, 60)
|
||||
client_sns.connect("172.24.41.2", 1883, 60)
|
||||
#client_sns.connect("172.24.41.2", 1883, 60)
|
||||
client_net.connect("172.24.41.2", 1883, 60)
|
||||
# Blocking call that processes network traffic, dispatches callbacks and
|
||||
# handles reconnecting.
|
||||
# Other loop*() functions are available that give a threaded interface and a
|
||||
# manual interface.
|
||||
mq_state=threading.Thread(target=client.loop_forever)
|
||||
mq_sens=threading.Thread(target=client_sens.loop_forever)
|
||||
mq_sns=threading.Thread(target=client_sns.loop_forever)
|
||||
#mq_sns=threading.Thread(target=client_sns.loop_forever)
|
||||
mq_net=threading.Thread(target=client_net.loop_forever)
|
||||
#client_sns.loop_forever()
|
||||
#client.loop_forever()
|
||||
mq_state.start()
|
||||
mq_sens.start()
|
||||
mq_sns.start()
|
||||
#mq_sns.start()
|
||||
mq_net.start()
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
#!/bin/sh
|
||||
|
||||
mosquitto_sub -h 172.24.42.2 -R -t 'tele/+/sens/#' -F "%t %p %U" | while read TOPIC VALU EPOCH; do
|
||||
SEPOCH=$(echo $EPOCH|sed -e 's/\.//g'|cut -b -13)
|
||||
LEPOCH=$(date +%s%N|cut -b -13)
|
||||
TOPI=$(echo $TOPIC|sed -e 's/tele/sp/g')
|
||||
IVAL=$(echo $VALU|cut -d\. -f1)
|
||||
echo "insert into datain (time,sensortime,topic,value) values ($LEPOCH,$SEPOCH,'$TOPI',$IVAL);"|mysql -t Sensor
|
||||
echo "insert into datain (time,sensortime,topic,value) values ($LEPOCH,$SEPOCH,'$TOPI',$IVAL);"
|
||||
done
|
||||
|
|
@ -0,0 +1,72 @@
|
|||
DROP PROCEDURE IF EXISTS insert_topic;
|
||||
DELIMITER //
|
||||
CREATE PROCEDURE insert_topic(vtopic varchar(64),newid smallint unsigned)
|
||||
begin
|
||||
declare tpos tinyint;
|
||||
declare feldname varchar(64);
|
||||
DECLARE bcfgDone INT;
|
||||
DECLARE cfg_curs CURSOR FOR SELECT cfg_topic.feld,cfg_topic.pos FROM cfg_topic where cfg_topic.minlength is null or cfg_topic.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
|
||||
DECLARE CONTINUE HANDLER FOR NOT FOUND SET bcfgDone = 1;
|
||||
OPEN cfg_curs;
|
||||
REPEAT
|
||||
fetch cfg_curs into feldname,tpos;
|
||||
if (tpos < 0) then
|
||||
select substring_index(substring_index(vtopic,'/',tpos),'/',1) into @feldinhalt;
|
||||
else
|
||||
select substring_index(substring_index(vtopic,'/',tpos),'/',-1) into @feldinhalt;
|
||||
end if;
|
||||
if (select count(*) from topic where topic.tid = newid and topic.feld=feldname) = 0 then
|
||||
insert into topic (tid,feld,inhalt) values (newid,feldname,@feldinhalt);
|
||||
end if;
|
||||
UNTIL bcfgDone END REPEAT;
|
||||
close cfg_curs;
|
||||
end
|
||||
//
|
||||
DELIMITER ;
|
||||
|
||||
|
||||
|
||||
DROP PROCEDURE IF EXISTS import_data;
|
||||
DELIMITER //
|
||||
CREATE PROCEDURE import_data()
|
||||
BEGIN
|
||||
DECLARE bDone INT;
|
||||
DECLARE vtopic VARCHAR(64); -- or approriate type
|
||||
DECLARE curs CURSOR FOR SELECT DISTINCT topic FROM datatmp;
|
||||
DECLARE CONTINUE HANDLER FOR NOT FOUND SET bDone = 1;
|
||||
OPEN curs;
|
||||
SET bDone = 0;
|
||||
SELECT if(max(tid) is null,1,max(tid)+1) into @newid from topic;
|
||||
SELECT @newid;
|
||||
REPEAT
|
||||
FETCH curs INTO vtopic;
|
||||
select vtopic;
|
||||
IF (select max(topic.tid) from topic where feld = 'topic' and inhalt = vtopic) is null THEN
|
||||
call insert_topic(vtopic,@newid);
|
||||
INSERT INTO topic (tid,feld,inhalt) VALUES (@newid,'topic',vtopic);
|
||||
select @newid+1 into @newid;
|
||||
ELSE
|
||||
select max(topic.tid) into @actid from topic where feld='topic' and inhalt = vtopic;
|
||||
END IF;
|
||||
insert into data_storage (time,sensortime,tid,value) select tmp.time,tmp.sensortime,@actid as tid,tmp.value from datatmp tmp where tmp.topic=vtopic;
|
||||
COMMIT;
|
||||
select @actid;
|
||||
UNTIL bDone END REPEAT;
|
||||
CLOSE curs;
|
||||
END
|
||||
//
|
||||
DELIMITER ;
|
||||
|
||||
DROP PROCEDURE IF EXISTS imd;
|
||||
DELIMITER //
|
||||
CREATE PROCEDURE imd()
|
||||
BEGIN
|
||||
create table datatmp as select * from datain;
|
||||
call import_data();
|
||||
delete from datain where datain.time <= (select max(dt.time) from datatmp dt);
|
||||
drop table datatmp;
|
||||
END
|
||||
//
|
||||
DELIMITER ;
|
||||
|
||||
CREATE EVENT import_data ON SCHEDULE EVERY 1 MINUTE DO CALL imd();
|
Loading…
Reference in New Issue