mqtt2sql
parent
432ba37a75
commit
5df762895b
|
@ -0,0 +1,61 @@
|
|||
import paho.mqtt.client as mqtt
|
||||
import json,time
|
||||
import pymysql,dateutil
|
||||
import datetime
|
||||
from dateutil.parser import parse as date_parse
|
||||
|
||||
sqlinsert="insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})"
|
||||
|
||||
# The callback for when the client receives a CONNACK response from the server.
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
print("Connected with result code "+str(rc))
|
||||
|
||||
# Subscribing in on_connect() means that if we lose the connection and
|
||||
# reconnect then subscriptions will be renewed.
|
||||
client.subscribe("tele/+/STATE")
|
||||
|
||||
# The callback for when a PUBLISH message is received from the server.
|
||||
def on_message(client, userdata, msg):
|
||||
try:
|
||||
mydb=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor")
|
||||
except:
|
||||
print("Could not connect to sql server! Quitting")
|
||||
else:
|
||||
mycursor=mydb.cursor(pymysql.cursors.DictCursor)
|
||||
acttime=int(1000*time.time())
|
||||
jpl=json.loads(msg.payload)
|
||||
ts="sp/"+msg.topic.split("/")[1]
|
||||
# print(ts+" "+str(jpl))
|
||||
str_to_dt=0
|
||||
if 'Time' in jpl:
|
||||
str_to_dt = int(datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
|
||||
print(acttime)
|
||||
print(str_to_dt)
|
||||
print(jpl)
|
||||
if 'LoadAvg' in jpl:
|
||||
sqlcheck=mycursor.execute(sqlinsert.format(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg']))
|
||||
print(sqlinsert.format(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg']))
|
||||
if 'POWER' in jpl:
|
||||
power=0
|
||||
if jpl['POWER']=='OFF':
|
||||
power=1
|
||||
sqlcheck=mycursor.execute(sqlinsert.format(acttime,str_to_dt,ts+'/Power',power))
|
||||
if 'Wifi' in jpl:
|
||||
jplw=jpl['Wifi']
|
||||
if 'RSSI' in jplw:
|
||||
sqlcheck=mycursor.execute(sqlinsert.format(acttime,str_to_dt,ts+'/RSSI',jplw['RSSI']))
|
||||
mycursor.close()
|
||||
mydb.commit()
|
||||
mydb.close()
|
||||
|
||||
client = mqtt.Client()
|
||||
client.on_connect = on_connect
|
||||
client.on_message = on_message
|
||||
|
||||
client.connect("172.24.41.2", 1883, 60)
|
||||
|
||||
# Blocking call that processes network traffic, dispatches callbacks and
|
||||
# handles reconnecting.
|
||||
# Other loop*() functions are available that give a threaded interface and a
|
||||
# manual interface.
|
||||
client.loop_forever()
|
|
@ -0,0 +1,27 @@
|
|||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from socketserver import ThreadingMixIn
|
||||
import threading
|
||||
|
||||
USE_HTTPS = False
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
|
||||
def do_GET(self):
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
self.wfile.write(b'Hello world\t' + threading.currentThread().getName().encode() + b'\t' + str(threading.active_count()).encode() + b'\n')
|
||||
|
||||
|
||||
class ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
|
||||
pass
|
||||
|
||||
def run():
|
||||
server = ThreadingSimpleServer(('0.0.0.0', 24048), Handler)
|
||||
if USE_HTTPS:
|
||||
import ssl
|
||||
server.socket = ssl.wrap_socket(server.socket, keyfile='./key.pem', certfile='./cert.pem', server_side=True)
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
Loading…
Reference in New Issue