pysql/mqtt2sql.py

242 lines
7.7 KiB
Python

import paho.mqtt.client as mqtt
import json,time
import pymysql,dateutil
import datetime
from dateutil.parser import parse as date_parse
import threading
sqlinsert="insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})"
lastvalue={}
# check values and insert into database
def data2sql(acttime,msgtime,topic,value,mycursor):
bsend=True
iv=int(value)
if topic in lastvalue:
if iv == lastvalue[topic]:
bsend=False
if bsend:
lastvalue[topic]=iv
# print(sqlinsert.format(acttime,msgtime,topic,iv))
try:
sqlcheck=mycursor.execute(sqlinsert.format(acttime,msgtime,topic,iv))
# print(sqlcheck)
except Exception as e:
print(sqlinsert.format(acttime,msgtime,topic,iv))
print(e)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/STATE")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
try:
mydb=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
except:
print("Could not connect to sql server! Quitting")
else:
mycursor=mydb.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
jpl=json.loads(msg.payload)
ts="sp/"+msg.topic.split("/")[1]
# print(ts+" "+str(jpl))
str_to_dt=0
if 'Time' in jpl:
str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
if 'LoadAvg' in jpl:
try:
data2sql(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg'],mycursor)
except Exception as e:
print(e)
if 'POWER' in jpl:
power=0
if jpl['POWER']=='OFF':
power=1
try:
data2sql(acttime,str_to_dt,ts+'/PowerStatus',power,mycursor)
except Exception as e:
print(e)
if 'Wifi' in jpl:
jplw=jpl['Wifi']
if 'RSSI' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/RSSI',jplw['RSSI'],mycursor)
except Exception as e:
print(e)
if 'Signal' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Signal',jplw['Signal'],mycursor)
except Exception as e:
print(e)
if 'Channel' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Channel',jplw['Channel'],mycursor)
except Exception as e:
print(e)
mycursor.close()
mydb.commit()
mydb.close()
# The callback for when the client receives a CONNACK response from the server.
def on_connect_sens(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/SENSOR")
# The callback for when a PUBLISH message is received from the server.
def on_message_sens(client, userdata, msg):
try:
mydb_sens=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
except:
print("Could not connect to sql server! Quitting")
else:
mycursor_sens=mydb_sens.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
jpl=json.loads(msg.payload)
ts="sp/"+msg.topic.split("/")[1]
# print(ts+" "+str(jpl))
str_to_dt=0
if 'Time' in jpl:
str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
if 'ENERGY' in jpl:
jplw=jpl['ENERGY']
if 'Power' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Power',int(1000*jplw['Power']),mycursor_sens)
except Exception as e:
print(e)
if 'ApparentPower' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/ApparentPower',int(1000*jplw['ApparentPower']),mycursor_sens)
except Exception as e:
print(e)
if 'ReactivePower' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/ReactivePower',int(1000*jplw['ReactivePower']),mycursor_sens)
except Exception as e:
print(e)
if 'Factor' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/PowerFactor',int(1000*jplw['Factor']),mycursor_sens)
except Exception as e:
print(e)
if 'Voltage' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Voltage',int(1000*jplw['Voltage']),mycursor_sens)
except Exception as e:
print(e)
if 'Total' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Total',int(1000*jplw['Total']),mycursor_sens)
except Exception as e:
print(e)
if 'Current' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Current',int(1000*jplw['Current']),mycursor_sens)
except Exception as e:
print(e)
if 'Period' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Period',int(1000*jplw['Period']),mycursor_sens)
except Exception as e:
print(e)
mycursor_sens.close()
mydb_sens.commit()
mydb_sens.close()
# The callback for when the client receives a CONNACK response from the server.
def on_connect_sns(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/sens/#")
# The callback for when a PUBLISH message is received from the server.
def on_message_sns(client, userdata, msg):
try:
mydb_sns=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
except:
print("Could not connect to sql server! Quitting")
else:
print(int(msg.payload.decode("utf-8")))
mycursor_sns=mydb_sns.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
ts=msg.topic.replace("tele","sp")
print(ts)
try:
data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_sns)
except Exception as e:
print(e)
mycursor_sns.close()
mydb_sns.commit()
mydb_sns.close()
# The callback for when the client receives a CONNACK response from the server.
def on_connect_net(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/net/#")
# The callback for when a PUBLISH message is received from the server.
def on_message_net(client, userdata, msg):
try:
mydb_net=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
except:
print("Could not connect to sql server! Quitting")
else:
print(int(msg.payload.decode("utf-8")))
mycursor_net=mydb_net.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
ts=msg.topic.replace("tele","sp")
print(ts)
try:
data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_net)
except Exception as e:
print(e)
mycursor_net.close()
mydb_net.commit()
mydb_net.close()
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client_sens = mqtt.Client()
client_sens.on_connect = on_connect_sens
client_sens.on_message = on_message_sens
#client_sns = mqtt.Client()
#client_sns.on_connect = on_connect_sns
#client_sns.on_message = on_message_sns
client_net = mqtt.Client()
client_net.on_connect = on_connect_net
client_net.on_message = on_message_net
client.connect("172.24.41.2", 1883, 60)
client_sens.connect("172.24.41.2", 1883, 60)
#client_sns.connect("172.24.41.2", 1883, 60)
client_net.connect("172.24.41.2", 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
mq_state=threading.Thread(target=client.loop_forever)
mq_sens=threading.Thread(target=client_sens.loop_forever)
#mq_sns=threading.Thread(target=client_sns.loop_forever)
mq_net=threading.Thread(target=client_net.loop_forever)
#client_sns.loop_forever()
#client.loop_forever()
mq_state.start()
mq_sens.start()
#mq_sns.start()
mq_net.start()
while True:
time.sleep(1)