242 lines
7.7 KiB
Python
242 lines
7.7 KiB
Python
import paho.mqtt.client as mqtt
|
|
import json,time
|
|
import pymysql,dateutil
|
|
import datetime
|
|
from dateutil.parser import parse as date_parse
|
|
import threading
|
|
|
|
# Legacy string-formatted statement.  It is kept only so that any external
# reference to the name keeps working; data2sql() no longer executes it.
sqlinsert = "insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})"

# Same statement with driver-side placeholders, so the topic text is escaped
# by the database driver instead of being pasted into the SQL string.
_SQL_INSERT = "insert into datain (time,sensortime,topic,value) values (%s,%s,%s,%s)"

# Last value successfully handed to the database, keyed by topic.
lastvalue = {}


def data2sql(acttime, msgtime, topic, value, mycursor):
    """Insert one reading into ``datain`` unless it repeats the previous one.

    Args:
        acttime: Receive time, converted with ``int()`` before insertion.
        msgtime: Sensor-side time, converted with ``int()`` before insertion.
        topic: Topic string stored with the reading; also the de-duplication key.
        value: Reading; converted with ``int()`` before comparison and insertion.
        mycursor: DB-API cursor whose ``execute(query, args)`` accepts ``%s``
            placeholders.

    Raises:
        ValueError, TypeError: If ``value``, ``acttime`` or ``msgtime`` cannot
            be converted to ``int``.  Errors raised by ``mycursor.execute`` are
            printed, not propagated.
    """
    iv = int(value)

    # Skip readings identical to the last one stored for this topic.
    if topic in lastvalue and lastvalue[topic] == iv:
        return

    params = (int(acttime), int(msgtime), topic, iv)
    try:
        mycursor.execute(_SQL_INSERT, params)
    except Exception as e:
        # Best effort: report the failed statement and carry on.
        print(_SQL_INSERT, params)
        print(e)
        return

    # Remember the value only after the insert went through, so that a failed
    # insert is retried when the same value arrives again.
    lastvalue[topic] = iv
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """CONNACK callback of the STATE client: report the result and subscribe.

    The subscription is issued from this callback so that it is renewed
    whenever the connection is lost and re-established.
    """
    banner = "Connected with result code %s" % (rc,)
    print(banner)

    state_filter = "tele/+/STATE"
    client.subscribe(state_filter)
|
|
|
|
def on_message(client, userdata, msg):
    """PUBLISH callback of the STATE client: store the fields of one message.

    The payload is parsed as a JSON object.  ``LoadAvg``, ``POWER`` and the
    ``RSSI`` / ``Signal`` / ``Channel`` entries of ``Wifi`` are written through
    ``data2sql`` under ``sp/<second topic level>/<name>``.  A payload that
    cannot be parsed is reported and dropped, so the exception does not
    propagate to whoever invoked the callback.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    acttime = int(1000 * time.time())

    # Parse everything that depends on the (untrusted) payload before any
    # database resource is opened.
    try:
        jpl = json.loads(msg.payload)
        if not isinstance(jpl, dict):
            raise TypeError("STATE payload is not a JSON object")
        ts = "sp/" + msg.topic.split("/")[1]
        str_to_dt = 0
        if 'Time' in jpl:
            # The format carries no zone information, so the naive datetime
            # is interpreted as local time by timestamp().
            sensor_time = datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S')
            str_to_dt = int(1000 * sensor_time.timestamp())
    except (ValueError, TypeError, IndexError, OverflowError, OSError) as e:
        print(e)
        return

    # (topic suffix, value) pairs in the order they are written.
    readings = []
    if 'LoadAvg' in jpl:
        readings.append(('/LoadAvg', jpl['LoadAvg']))
    if 'POWER' in jpl:
        # NOTE(review): 'OFF' is stored as 1 and anything else as 0.  This
        # looks inverted but is kept as-is -- confirm against the consumers
        # of PowerStatus before changing it.
        readings.append(('/PowerStatus', 1 if jpl['POWER'] == 'OFF' else 0))
    wifi = jpl.get('Wifi')
    if isinstance(wifi, dict):
        for key in ('RSSI', 'Signal', 'Channel'):
            if key in wifi:
                readings.append(('/' + key, wifi[key]))

    try:
        mydb = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Only this message is dropped; the process keeps running.
        print("Could not connect to sql server! Quitting")
        print(e)
        return

    try:
        mycursor = mydb.cursor(pymysql.cursors.DictCursor)
        try:
            for suffix, value in readings:
                # One bad field must not prevent the others from being stored.
                try:
                    data2sql(acttime, str_to_dt, ts + suffix, value, mycursor)
                except Exception as e:
                    print(e)
        finally:
            mycursor.close()
        mydb.commit()
    finally:
        # Always release the connection, even if cursor creation or commit fails.
        mydb.close()
|
|
|
|
def on_connect_sens(client, userdata, flags, rc):
    """CONNACK callback of the SENSOR client: report the result and subscribe.

    The subscription is issued from this callback so that it is renewed
    whenever the connection is lost and re-established.
    """
    banner = "Connected with result code %s" % (rc,)
    print(banner)

    sensor_filter = "tele/+/SENSOR"
    client.subscribe(sensor_filter)
|
|
|
|
def on_message_sens(client, userdata, msg):
    """PUBLISH callback of the SENSOR client: store the ENERGY readings.

    The payload is parsed as a JSON object.  Each known entry of its
    ``ENERGY`` object is multiplied by 1000, rounded to the nearest integer
    and written through ``data2sql`` under ``sp/<second topic level>/<name>``.
    A payload that cannot be parsed is reported and dropped, so the exception
    does not propagate to whoever invoked the callback.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # (key inside ENERGY, topic suffix) in the order the readings are written.
    energy_fields = (
        ('Power', '/Power'),
        ('ApparentPower', '/ApparentPower'),
        ('ReactivePower', '/ReactivePower'),
        ('Factor', '/PowerFactor'),
        ('Voltage', '/Voltage'),
        ('Total', '/Total'),
        ('Current', '/Current'),
        ('Period', '/Period'),
    )

    acttime = int(1000 * time.time())

    # Parse everything that depends on the (untrusted) payload before any
    # database resource is opened.
    try:
        jpl = json.loads(msg.payload)
        if not isinstance(jpl, dict):
            raise TypeError("SENSOR payload is not a JSON object")
        ts = "sp/" + msg.topic.split("/")[1]
        str_to_dt = 0
        if 'Time' in jpl:
            # The format carries no zone information, so the naive datetime
            # is interpreted as local time by timestamp().
            sensor_time = datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S')
            str_to_dt = int(1000 * sensor_time.timestamp())
    except (ValueError, TypeError, IndexError, OverflowError, OSError) as e:
        print(e)
        return

    energy = jpl.get('ENERGY')
    if not isinstance(energy, dict):
        energy = {}

    try:
        mydb_sens = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Only this message is dropped; the process keeps running.
        print("Could not connect to sql server! Quitting")
        print(e)
        return

    try:
        mycursor_sens = mydb_sens.cursor(pymysql.cursors.DictCursor)
        try:
            for key, suffix in energy_fields:
                if key not in energy:
                    continue
                # One bad field must not prevent the others from being stored.
                try:
                    # round(), not int(): the float product can land just
                    # below the intended integer and truncation would then
                    # store a value that is one too small.
                    scaled = int(round(1000 * energy[key]))
                    data2sql(acttime, str_to_dt, ts + suffix, scaled, mycursor_sens)
                except Exception as e:
                    print(e)
        finally:
            mycursor_sens.close()
        mydb_sens.commit()
    finally:
        # Always release the connection, even if cursor creation or commit fails.
        mydb_sens.close()
|
|
|
|
def on_connect_sns(client, userdata, flags, rc):
    """CONNACK callback of the ``sens`` client: report the result and subscribe.

    The subscription is issued from this callback so that it is renewed
    whenever the connection is lost and re-established.
    """
    banner = "Connected with result code %s" % (rc,)
    print(banner)

    sens_filter = "tele/+/sens/#"
    client.subscribe(sens_filter)
|
|
|
|
def on_message_sns(client, userdata, msg):
    """PUBLISH callback of the ``sens`` client: store one integer reading.

    The payload is decoded as UTF-8 text and parsed as an integer.  It is
    written through ``data2sql`` under the message topic with its leading
    ``tele`` replaced by ``sp``; the receive time is used for both time
    columns.  A payload that is not an integer is reported and dropped.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # Validate the (untrusted) payload before any database resource is opened.
    try:
        value = int(msg.payload.decode("utf-8"))
    except ValueError as e:
        # Covers undecodable bytes (UnicodeDecodeError) and non-integer text.
        print(e)
        return

    try:
        mydb_sns = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Only this message is dropped; the process keeps running.
        print("Could not connect to sql server! Quitting")
        print(e)
        return

    try:
        print(value)
        mycursor_sns = mydb_sns.cursor(pymysql.cursors.DictCursor)
        try:
            acttime = int(1000 * time.time())
            # count=1: rewrite only the first "tele" (the topic prefix), not
            # a "tele" that happens to occur in a later topic level.
            ts = msg.topic.replace("tele", "sp", 1)
            print(ts)
            try:
                data2sql(acttime, acttime, ts, value, mycursor_sns)
            except Exception as e:
                print(e)
        finally:
            mycursor_sns.close()
        mydb_sns.commit()
    finally:
        # Always release the connection, even if cursor creation or commit fails.
        mydb_sns.close()
|
|
|
|
def on_connect_net(client, userdata, flags, rc):
    """CONNACK callback of the ``net`` client: report the result and subscribe.

    The subscription is issued from this callback so that it is renewed
    whenever the connection is lost and re-established.
    """
    banner = "Connected with result code %s" % (rc,)
    print(banner)

    net_filter = "tele/+/net/#"
    client.subscribe(net_filter)
|
|
|
|
def on_message_net(client, userdata, msg):
    """PUBLISH callback of the ``net`` client: store one integer reading.

    The payload is decoded as UTF-8 text and parsed as an integer.  It is
    written through ``data2sql`` under the message topic with its leading
    ``tele`` replaced by ``sp``; the receive time is used for both time
    columns.  A payload that is not an integer is reported and dropped.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # Validate the (untrusted) payload before any database resource is opened.
    try:
        value = int(msg.payload.decode("utf-8"))
    except ValueError as e:
        # Covers undecodable bytes (UnicodeDecodeError) and non-integer text.
        print(e)
        return

    try:
        mydb_net = pymysql.connect(read_default_file="~/.my.cnf", database="Sensor")
    except Exception as e:
        # Only this message is dropped; the process keeps running.
        print("Could not connect to sql server! Quitting")
        print(e)
        return

    try:
        print(value)
        mycursor_net = mydb_net.cursor(pymysql.cursors.DictCursor)
        try:
            acttime = int(1000 * time.time())
            # count=1: rewrite only the first "tele" (the topic prefix), not
            # a "tele" that happens to occur in a later topic level.
            ts = msg.topic.replace("tele", "sp", 1)
            print(ts)
            try:
                data2sql(acttime, acttime, ts, value, mycursor_net)
            except Exception as e:
                print(e)
        finally:
            mycursor_net.close()
        mydb_net.commit()
    finally:
        # Always release the connection, even if cursor creation or commit fails.
        mydb_net.close()
|
|
|
|
# Broker shared by all subscribers: host, port, keepalive seconds.
BROKER_HOST = "172.24.41.2"
BROKER_PORT = 1883
BROKER_KEEPALIVE = 60

# One client per subscription, each wired to its own pair of callbacks.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client_sens = mqtt.Client()
client_sens.on_connect = on_connect_sens
client_sens.on_message = on_message_sens

client_net = mqtt.Client()
client_net.on_connect = on_connect_net
client_net.on_message = on_message_net

# NOTE: the "tele/+/sens/#" subscriber (on_connect_sns / on_message_sns) is
# currently disabled.  To enable it, create a client wired to those two
# callbacks, connect it and start a loop thread for it like the ones below.

client.connect(BROKER_HOST, BROKER_PORT, BROKER_KEEPALIVE)
client_sens.connect(BROKER_HOST, BROKER_PORT, BROKER_KEEPALIVE)
client_net.connect(BROKER_HOST, BROKER_PORT, BROKER_KEEPALIVE)

# loop_forever() blocks while it processes network traffic, dispatches the
# callbacks and handles reconnecting, so each client gets its own thread.
# The threads are daemonic so that they cannot keep the interpreter alive
# once the main thread has been interrupted.
mq_state = threading.Thread(target=client.loop_forever, daemon=True)
mq_sens = threading.Thread(target=client_sens.loop_forever, daemon=True)
mq_net = threading.Thread(target=client_net.loop_forever, daemon=True)

mq_state.start()
mq_sens.start()
mq_net.start()

# Keep the main thread alive until interrupted (Ctrl-C), then shut down.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # disconnect() lets each loop_forever() call return.
    for _client in (client, client_sens, client_net):
        _client.disconnect()
    for _worker in (mq_state, mq_sens, mq_net):
        _worker.join(timeout=5)
|