From a53024bf182eb5a6e590195e7184afc50e926e5a Mon Sep 17 00:00:00 2001 From: ademant Date: Sun, 17 Nov 2019 22:13:21 +0100 Subject: [PATCH] finished first version of mqtt2sql --- mqtt2sql.py | 109 ++++++++++- pysqltq.py | 242 ++++++++++++----------- pyweb.py | 549 ++++++++++++++++++++++++++-------------------------- 3 files changed, 494 insertions(+), 406 deletions(-) diff --git a/mqtt2sql.py b/mqtt2sql.py index 7044e7d..1c3cc26 100644 --- a/mqtt2sql.py +++ b/mqtt2sql.py @@ -3,13 +3,27 @@ import json,time import pymysql,dateutil import datetime from dateutil.parser import parse as date_parse +import threading sqlinsert="insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})" +lastvalue={} + +# check values and insert into database +def data2sql(acttime,msgtime,topic,value,mycursor): + bsend=True + if topic in lastvalue: + if value == lastvalue[topic]: + bsend=False + if bsend: + lastvalue[topic]=value + try: + sqlcheck=mycursor.execute(sqlinsert.format(acttime,msgtime,topic,value)) + except Exception as e: + print(e) # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) - # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe("tele/+/STATE") @@ -28,34 +42,109 @@ def on_message(client, userdata, msg): # print(ts+" "+str(jpl)) str_to_dt=0 if 'Time' in jpl: - str_to_dt = int(datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp()) - print(acttime) - print(str_to_dt) - print(jpl) + str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp()) if 'LoadAvg' in jpl: - sqlcheck=mycursor.execute(sqlinsert.format(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg'])) - print(sqlinsert.format(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg'])) + try: + data2sql(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg'],mycursor) + except Exception as e: + print(e) if 'POWER' in jpl: power=0 if jpl['POWER']=='OFF': power=1 - sqlcheck=mycursor.execute(sqlinsert.format(acttime,str_to_dt,ts+'/Power',power)) + try: + data2sql(acttime,str_to_dt,ts+'/Power',power,mycursor) + except Exception as e: + print(e) if 'Wifi' in jpl: jplw=jpl['Wifi'] if 'RSSI' in jplw: - sqlcheck=mycursor.execute(sqlinsert.format(acttime,str_to_dt,ts+'/RSSI',jplw['RSSI'])) + try: + data2sql(acttime,str_to_dt,ts+'/RSSI',jplw['RSSI'],mycursor) + except Exception as e: + print(e) mycursor.close() mydb.commit() mydb.close() +# The callback for when the client receives a CONNACK response from the server. +def on_connect_sens(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe("tele/+/SENSOR") + +# The callback for when a PUBLISH message is received from the server. +def on_message_sens(client, userdata, msg): + try: + mydb_sens=pymysql.connect(read_default_file="~/.my.cnf",database="Sensor") + except: + print("Could not connect to sql server! Quitting") + else: + mycursor_sens=mydb_sens.cursor(pymysql.cursors.DictCursor) + acttime=int(1000*time.time()) + jpl=json.loads(msg.payload) + ts="sp/"+msg.topic.split("/")[1] +# print(ts+" "+str(jpl)) + str_to_dt=0 + if 'Time' in jpl: + str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp()) + if 'ENERGY' in jpl: + jplw=jpl['ENERGY'] + if 'Power' in jplw: + try: + data2sql(acttime,str_to_dt,ts+'/Power',int(1000*jplw['Power']),mycursor_sens) + except Exception as e: + print(e) + if 'ApparentPower' in jplw: + try: + data2sql(acttime,str_to_dt,ts+'/ApparentPower',int(1000*jplw['ApparentPower']),mycursor_sens) + except Exception as e: + print(e) + if 'ReactivePower' in jplw: + try: + data2sql(acttime,str_to_dt,ts+'/ReactivePower',int(1000*jplw['ReactivePower']),mycursor_sens) + except Exception as e: + print(e) + if 'Factor' in jplw: + try: + data2sql(acttime,str_to_dt,ts+'/PowerFactor',int(1000*jplw['Factor']),mycursor_sens) + except Exception as e: + print(e) + if 'Voltags' in jplw: + try: + data2sql(acttime,str_to_dt,ts+'/Voltage',int(1000*jplw['Voltage']),mycursor_sens) + except Exception as e: + print(e) + if 'Current' in jplw: + try: + data2sql(acttime,str_to_dt,ts+'/Current',int(1000*jplw['Current']),mycursor_sens) + except Exception as e: + print(e) + mycursor_sens.close() + mydb_sens.commit() + mydb_sens.close() + client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("172.24.41.2", 1883, 60) +client_sens = mqtt.Client() +client_sens.on_connect = on_connect_sens +client_sens.on_message = on_message_sens + +client.connect("172.24.41.2", 1883, 60) +client_sens.connect("172.24.41.2", 1883, 60) # Blocking call that processes network traffic, dispatches callbacks and # handles reconnecting. # Other loop*() functions are available that give a threaded interface and a # manual interface. -client.loop_forever() +mq_state=threading.Thread(target=client.loop_forever) +mq_sens=threading.Thread(target=client_sens.loop_forever) +#client.loop_forever() +mq_state.start() +mq_sens.start() +while True: + time.sleep(1) diff --git a/pysqltq.py b/pysqltq.py index 79c0565..5b71328 100644 --- a/pysqltq.py +++ b/pysqltq.py @@ -1,6 +1,6 @@ -import socket,threading,socketserver,json,time +import socket,threading,SocketServer,json,time import pymysql -from queue import Queue +from Queue import Queue def sql_get_dict(mycursor,mytable,myvar): mycursor.execute("select * from "+str(mytable)) @@ -11,130 +11,128 @@ def sql_get_dict(mycursor,mytable,myvar): return(tout) def sql_insert(q): - try: - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - except: - print("Could not connect to sql server! Quitting") - else: - mycursor=mydb.cursor(pymysql.cursors.DictCursor) - sqlinsert="insert into data_storage (time,device_id,var_id,sensor_id,i2c,value) values ({0:d},{1:d},{2:d},{3:d},{4:d},{5:d})" - # get variable id out of sql - svar=sql_get_dict(mycursor,"var_id","var") - sdev=sql_get_dict(mycursor,"device_id","device") - ssens=sql_get_dict(mycursor,"sensor_id","sensor") - sqldata=[] - while True: - if q.empty(): - time.sleep(0.1) - else: - try: - indata=q.get() - if indata is not None: - q.task_done() - except Exception as e: - print("Error during queuing") - print(e) - else: - if indata is not None: - if indata['device'] not in sdev: - mycursor.execute("insert into device_id (device) values('"+str(indata['device'])+"')") - mydb.commit() - sdev=sql_get_dict(mycursor,"device_id","device") - if indata['sensor'] not in ssens: - mycursor.execute("insert into sensor_id (sensor) values('"+str(indata['sensor'])+"')") - mydb.commit() - ssens=sql_get_dict(mycursor,"sensor_id","sensor") - if indata['varname'] not in svar: - mycursor.execute("insert into var_id (var) values('"+str(indata['varname'])+"')") - mydb.commit() - svar=sql_get_dict(mycursor,"var_id","var") - try: - mycursor.execute(sqlinsert.format(indata['time'],sdev[indata['device']],svar[indata['varname']],ssens[indata['sensor']],indata['i2c'],indata['value'])) - except: - print("Eror in execute sql insert") - print(sqlinsert.format(indata['time'],sdev[indata['device']],svar[indata['varname']],ssens[indata['sensor']],indata['i2c'],indata['value']),file="sql_missed.txt") - else: - mydb.commit() - - -class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler): - def __init__(self, request, client_address, server): - self.queue = server.queue - socketserver.StreamRequestHandler.__init__(self, request, client_address, server) - def handle(self): - indata = str(self.request.recv(1024), 'ascii') - cur_thread = threading.current_thread() - #indata=self.data - bjson=True - # try if indata is in json format. - # only process indata, if in json + try: + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + except: + print("Could not connect to sql server! Quitting") + else: + mycursor=mydb.cursor(pymysql.cursors.DictCursor) + sqlinsert="insert into data_storage (time,device_id,var_id,sensor_id,i2c,value) values ({0:d},{1:d},{2:d},{3:d},{4:d},{5:d})" + # get variable id out of sql + svar=sql_get_dict(mycursor,"var_id","var") + sdev=sql_get_dict(mycursor,"device_id","device") + ssens=sql_get_dict(mycursor,"sensor_id","sensor") + sqldata=[] + while True: + if q.empty(): + time.sleep(0.1) + else: try: - test=json.loads(indata) - except: - bjson=False + indata=q.get() + if indata is not None: + q.task_done() + except Exception as e: + print("Error during queuing") + print(e) else: - # indata must have a payload entry - if "payload" in test: - # get credential for sql server and open connection - # only process if sql connection could be open + if indata is not None: + if indata['device'] not in sdev: + mycursor.execute("insert into device_id (device) values('"+str(indata['device'])+"')") + mydb.commit() + sdev=sql_get_dict(mycursor,"device_id","device") + if indata['sensor'] not in ssens: + mycursor.execute("insert into sensor_id (sensor) values('"+str(indata['sensor'])+"')") + mydb.commit() + ssens=sql_get_dict(mycursor,"sensor_id","sensor") + if indata['varname'] not in svar: + mycursor.execute("insert into var_id (var) values('"+str(indata['varname'])+"')") + mydb.commit() + svar=sql_get_dict(mycursor,"var_id","var") - datasource=self.client_address[0] - if "device" in test: - datasource=test['device'].translate(' ./:;*|') - datasource=datasource[:64] - multi=1 - if "mult" in test: - multi=int(test['mult']) - payload=test['payload'] - for x,y in payload.items(): - # remove unwanted characters from variable name - varx=x.translate(' ./:;*!') - varx=varx[:64] - value=0 - if "value" in y: - value=int(y['value']) - sensor="" - if "time" in y: - datatime=int(y['time']) - else: - datatime=int(1000*time.time()) - if "sensor" in y: - sensor=y['sensor'] - sensor=sensor.translate(' ./:;*!') - sensor=sensor[:32] - i2c=0 - if "i2c" in y: - i2c=int(y['i2c']) - q.put({"time":datatime,"device":datasource,"varname":varx,"sensor":sensor,"i2c":i2c,"value":int(multi*value)},block=False) + txt_sql=sqlinsert.format(indata['time'],sdev[indata['device']],svar[indata['varname']],ssens[indata['sensor']],indata['i2c'],indata['value']) + try: + mycursor.execute(txt_sql) + except: + print("Eror in execute sql insert") + errlog=open("sql_missed.txt","a") + errlog.write(txt_sql) + errlog.close() + else: + mydb.commit() -class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): - def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,queue=None): - self.queue = queue - socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, + +class ThreadedTCPRequestHandler(SocketServer.StreamRequestHandler): + def __init__(self, request, client_address, server): + self.queue = server.queue + socketserver.StreamRequestHandler.__init__(self, request, client_address, server) + def handle(self): + indata = str(self.request.recv(1024), 'ascii') + cur_thread = threading.current_thread() + #indata=self.data + bjson=True + # try if indata is in json format. + # only process indata, if in json + try: + test=json.loads(indata) + except: + bjson=False + else: + # indata must have a payload entry + if "payload" in test: + # get credential for sql server and open connection + # only process if sql connection could be open + datasource=self.client_address[0] + if "device" in test: + datasource=test['device'].translate(' ./:;*|') + datasource=datasource[:64] + multi=1 + if "mult" in test: + multi=int(test['mult']) + payload=test['payload'] + for x,y in payload.items(): + # remove unwanted characters from variable name + varx=x.translate(' ./:;*!') + varx=varx[:64] + value=0 + if "value" in y: + value=int(y['value']) + sensor="" + if "time" in y: + datatime=int(y['time']) + else: + datatime=int(1000*time.time()) + if "sensor" in y: + sensor=y['sensor'] + sensor=sensor.translate(' ./:;*!') + sensor=sensor[:32] + i2c=0 + if "i2c" in y: + i2c=int(y['i2c']) + q.put({"time":datatime,"device":datasource,"varname":varx,"sensor":sensor,"i2c":i2c,"value":int(multi*value)},block=False) + +class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,queue=None): + self.queue = queue + SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) if __name__ == "__main__": - - q=Queue(maxsize=0) - - sql_worker=threading.Thread(target=sql_insert,args=(q,)) - sql_worker.setDaemon(True) - sql_worker.start() - - # Port 0 means to select an arbitrary unused port - - HOST, PORT = "", 24048 - server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler,queue=q) - - # Start a thread with the server -- that thread will then start one - # more thread for each request - server_thread = threading.Thread(target=server.serve_forever,args=(q,)) - # Exit the server thread when the main thread terminates - server_thread.daemon = True - server_thread.start() - print("Server loop running in thread:", server_thread.name) - - server.serve_forever() - - server.shutdown() - server.server_close() + q=Queue(maxsize=0) + sql_worker=threading.Thread(target=sql_insert,args=(q,)) + sql_worker.setDaemon(True) + sql_worker.start() +# Port 0 means to select an arbitrary unused port + HOST, PORT = "", 24048 + server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler,queue=q) + server.timeout=None + # Start a thread with the server -- that thread will then start one + # more thread for each request + server_thread = threading.Thread(target=server.serve_forever,args=(q,)) + # Exit the server thread when the main thread terminates + server_thread.daemon = True + server_thread.timeout=None + server_thread.start() + print("Server loop running in thread:", server_thread.name) + server.serve_forever() + server.shutdown() + server.server_close() diff --git a/pyweb.py b/pyweb.py index 2af4a11..11cd5cf 100755 --- a/pyweb.py +++ b/pyweb.py @@ -10,9 +10,9 @@ pathname = os.path.dirname(sys.argv[0]) abspath=os.path.abspath(pathname) configfile=abspath+"/config.json" try: - cf=open(configfile,"r") + cf=open(configfile,"r") except: - cf=open(configfile+".template","r") + cf=open(configfile+".template","r") log_conf=json.load(cf) cf.close() @@ -24,28 +24,28 @@ for n in parameter: if "sqlserver" in log_conf: hostname="banana" if "host" in log_conf['sqlserver']: - hostname=log_conf['sqlserver']['host'] + hostname=log_conf['sqlserver']['host'] port=24049 if "port" in log_conf['sqlserver']: - port=int(log_conf['sqlserver']['port']) + port=int(log_conf['sqlserver']['port']) clientlist=parameter['allowed_ip'] try: - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") except: - clientlist=parameter['allowed_ip'] + clientlist=parameter['allowed_ip'] else: - mycursor=mydb.cursor(pymysql.cursors.DictCursor) - mycursor.execute("select ip,keyid from clients;") - myclientlist=mycursor.fetchall() - if len(myclientlist)>0: - tcl={} - for i in myclientlist: - if len(gpg.list_keys(i['keyid']))>0: - tcl[i['ip']]=i['keyid'] - if len(tcl)>0: - clientlist=tcl - + mycursor=mydb.cursor(pymysql.cursors.DictCursor) + mycursor.execute("select ip,keyid from clients;") + myclientlist=mycursor.fetchall() + if len(myclientlist)>0: + tcl={} + for i in myclientlist: + if len(gpg.list_keys(i['keyid']))>0: + tcl[i['ip']]=i['keyid'] + if len(tcl)>0: + clientlist=tcl + #sqlinsert="insert into measures (time,id,value) values ({0:d},{1:d},{2:d})" sqlinsert="insert into measures (time,id,value) values " hsqlinsert="insert into hourly_measures (time,id,value) values " @@ -67,105 +67,106 @@ _JSONSIGNEDDATA="signed_data" _JSONENCRYPTDATA="encrypted_data" def analyse_jsonin(json_in,hash_id): - measdata={} - if _JSONDATA in json_in: - measdata=json_in[_JSONDATA] - if _JSONSIGNEDDATA in json_in: - vpgp=gpg.verify(json_in[_JSONSIGNEDDATA]) - if hash_id != vpgp.key_id: - print("signature does not fit hash id") - else: - signed_in=json_in[_JSONSIGNEDDATA].split("\n") - signed_in[signed_in.index(_BEGINSIGNATURE):]="" - del signed_in[signed_in.index(_BEGINMESSAGE)] - del signed_in[signed_in.index("")] - for h in signed_in: - if _BEGINHASH in h: - del signed_in[signed_in.index(h)] - if len(signed_in)>0: - measdata=json.loads(signed_in[0]) - if _JSONENCRYPTDATA in json_in: - dpgp=gpg.decrypt(json_in[_JSONENCRYPTDATA]) - if hash_id != dpgp.key_id: - print("signature of encrypted data does not fit hash id") - else: - measdata=json.loads(dpgp.data) - if len(measdata)==0: - print("no data available") - else: - _thread.start_new_thread(insert_sql,(measdata,)) + measdata={} + if _JSONDATA in json_in: + measdata=json_in[_JSONDATA] + if _JSONSIGNEDDATA in json_in: + vpgp=gpg.verify(json_in[_JSONSIGNEDDATA]) + if hash_id != vpgp.key_id: + print("signature does not fit hash id") + else: + signed_in=json_in[_JSONSIGNEDDATA].split("\n") + signed_in[signed_in.index(_BEGINSIGNATURE):]="" + del signed_in[signed_in.index(_BEGINMESSAGE)] + del signed_in[signed_in.index("")] + for h in signed_in: + if _BEGINHASH in h: + del signed_in[signed_in.index(h)] + if len(signed_in)>0: + measdata=json.loads(signed_in[0]) + if _JSONENCRYPTDATA in json_in: + dpgp=gpg.decrypt(json_in[_JSONENCRYPTDATA]) + if hash_id != dpgp.key_id: + print("signature of encrypted data does not fit hash id") + else: + measdata=json.loads(dpgp.data) + if len(measdata)==0: + print("no data available") + else: + _thread.start_new_thread(insert_sql,(measdata,)) def insert_sql(measdata): -# tsi=sqlinsert - sqlu=[] - tsi="" - tsi_count=0 - atsi="" - for h in measdata: # iterate over each variable stored in json - md=measdata[h] - sqlu.append("delete from last_measures where id = '"+str(h)+"';") - mmax=max(md['measures']) - atsi=atsi+'('+str(mmax)+','+h+','+str(md['measures'][mmax])+'),' - for m in md['measures']: # iterate over each measurement for given variable - try: - stime=int(m) - except: - print("wrong entry") - else: - ttsi='('+str(m)+','+h+','+str(md['measures'][m])+'),' - tsi=tsi+ttsi - tsi_count=tsi_count+1 - tsi=tsi[:-1]+';' - atsi=atsi[:-1]+';' - try: - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - except: - print("could not connect to mysql") - with open("missed.sql","a") as sqlstore: - sqlstore.write(tsi) - else: - mycursor=mydb.cursor(pymysql.cursors.DictCursor) - for h in measdata: - mycursor.execute('select id from ids where id='+h+';') - t=mycursor.fetchall() - if len(t)==0: - hsql='insert into ids (id,device,varname,sensor,sensorsub,i2c) values ('+h+',' - hm=measdata[h] - ht={} - for i in ['device','varname','sensor','sensorsub']: - if i in hm: - hsql=hsql+"'"+hm[i]+"'"+',' - else: - hsql=hsql+"\'\'," - if 'i2c' in hm: - try: - hsql=hsql+"'"+str(hm['i2c'])+"'" - except: - hsql=hsql+"'0'" - else: - hsql=hsql+0 - hsql=hsql+');' - try: - mycursor.execute(hsql) - except: - print("could not insert new var_id") - print(hsql) - for dsql in sqlu: - mycursor.execute(dsql) - mycursor.execute(sqlinsert+tsi) - mycursor.execute(hsqlinsert+tsi) - mycursor.execute(dsqlinsert+tsi) - mycursor.execute(asqlinsert+atsi) - delhtime=int((time.time()-3600)*1000) - deldtime=int((time.time()-86400)*1000) - mycursor.execute("delete from hourly_measures where time < "+str(delhtime)) - mycursor.execute("delete from daily_measures where time < "+str(deldtime)) - print(str(tsi_count)+" new measures inserted") - mycursor.close() - mydb.commit() - mydb.close() -# print(tsi) -# print(measdata) +# tsi=sqlinsert + sqlu=[] + tsi="" + tsi_count=0 + atsi="" + for h in measdata: # iterate over each variable stored in json + md=measdata[h] + sqlu.append("delete from last_measures where id = '"+str(h)+"';") + mmax=max(md['measures']) + atsi=atsi+'('+str(mmax)+','+h+','+str(md['measures'][mmax])+'),' + for m in md['measures']: # iterate over each measurement for given variable + try: + stime=int(m) + except: + print("wrong entry") + else: + ttsi='('+str(m)+','+h+','+str(md['measures'][m])+'),' + tsi=tsi+ttsi + tsi_count=tsi_count+1 + tsi=tsi[:-1]+';' + atsi=atsi[:-1]+';' + try: + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + except: + print("could not connect to mysql") + with open("missed.sql","a") as sqlstore: + sqlstore.write(tsi) + else: + mycursor=mydb.cursor(pymysql.cursors.DictCursor) + for h in measdata: + mycursor.execute('select id from ids where id='+h+';') + t=mycursor.fetchall() + if len(t)==0: + hsql='insert into ids (id,device,varname,sensor,sensorsub,i2c) values ('+h+',' + hm=measdata[h] + ht={} + for i in ['device','varname','sensor','sensorsub']: + if i in hm: + hsql=hsql+"'"+hm[i]+"'"+',' + else: + hsql=hsql+"\'\'," + if 'i2c' in hm: + try: + hsql=hsql+"'"+str(hm['i2c'])+"'" + except: + hsql=hsql+"'0'" + else: + hsql=hsql+0 + hsql=hsql+');' + try: + mycursor.execute(hsql) + except: + print("could not insert new var_id") + print(hsql) + for dsql in sqlu: + mycursor.execute(dsql) + mycursor.execute(sqlinsert+tsi) + print(sqlinsert+tsi) + mycursor.execute(hsqlinsert+tsi) + mycursor.execute(dsqlinsert+tsi) + mycursor.execute(asqlinsert+atsi) + delhtime=int((time.time()-3600)*1000) + deldtime=int((time.time()-86400)*1000) + mycursor.execute("delete from hourly_measures where time < "+str(delhtime)) + mycursor.execute("delete from daily_measures where time < "+str(deldtime)) + print(str(tsi_count)+" new measures inserted") + mycursor.close() + mydb.commit() + mydb.close() +# print(tsi) +# print(measdata) @@ -175,198 +176,198 @@ app=Bottle() @app.get('/') def approot(): - return ''' - Uebersicht
- Solarübersicht
- CPU-Temperaturen
- ''' + return ''' + Uebersicht
+ Solarübersicht
+ CPU-Temperaturen
+ ''' @app.get('/ids') def show_ids(): - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - mycursor=mydb.cursor(pymysql.cursors.DictCursor) - starttime=time.time() - mycursor.execute('select gm.*,last_measures.value,ids.varname,ids.device,ids.sensor,ids.i2c,ids.sensorsub from (select id, max(time) as time,count(time) as count from last_measures group by id) gm join last_measures on last_measures.id=gm.id and last_measures.time=gm.time join ids on ids.id=gm.id;') - print("ids sql duration:"+str(time.time()-starttime)) - row=mycursor.fetchall() - mycursor.close() - mydb.close() - return template('ids.tpl',measdata=row) + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + mycursor=mydb.cursor(pymysql.cursors.DictCursor) + starttime=time.time() + mycursor.execute('select gm.*,last_measures.value,ids.varname,ids.device,ids.sensor,ids.i2c,ids.sensorsub from (select id, max(time) as time,count(time) as count from last_measures group by id) gm join last_measures on last_measures.id=gm.id and last_measures.time=gm.time join ids on ids.id=gm.id;') + print("ids sql duration:"+str(time.time()-starttime)) + row=mycursor.fetchall() + mycursor.close() + mydb.close() + return template('ids.tpl',measdata=row) @app.get('/solar') def show_solar(): - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - myc=mydb.cursor() - myc.execute("select gm.time,last_measures.value/1000,ids.varname from (select id, max(time) as time from last_measures group by id) gm join last_measures on last_measures.id=gm.id and last_measures.time=gm.time join ids on ids.id=gm.id where ids.sensor='tristar';") - ts=myc.fetchall() - myc.close() - mydb.close() - tt={"kwh_tot":0,"volt_bat":0,"volt_bat_sens":0,"volt_sweep_mc":0,"volt_sweep_oc":0,"amp_bat":0,"temp_heatsink":0,"temp_bat":0} - zs=0 - for tims,value,varname in ts: - tt[varname]=value - if int(tims)>zs: - zs=int(tims) - return template('solar.tpl',solardata=tt,zeitstempel=time.strftime('%H:%M:%S %Y-%m-%d', time.localtime(zs/1000))) + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + myc=mydb.cursor() + myc.execute("select gm.time,last_measures.value/1000,ids.varname from (select id, max(time) as time from last_measures group by id) gm join last_measures on last_measures.id=gm.id and last_measures.time=gm.time join ids on ids.id=gm.id where ids.sensor='tristar';") + ts=myc.fetchall() + myc.close() + mydb.close() + tt={"kwh_tot":0,"volt_bat":0,"volt_bat_sens":0,"volt_sweep_mc":0,"volt_sweep_oc":0,"amp_bat":0,"temp_heatsink":0,"temp_bat":0} + zs=0 + for tims,value,varname in ts: + tt[varname]=value + if int(tims)>zs: + zs=int(tims) + return template('solar.tpl',solardata=tt,zeitstempel=time.strftime('%H:%M:%S %Y-%m-%d', time.localtime(zs/1000))) @app.get('/temp') def show_temperature(): - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - myc=mydb.cursor() - myc.execute("select gm.time,last_measures.value/1000,ids.varname,ids.device,short_names.short from (select id, max(time) as time from last_measures group by id) gm join last_measures on last_measures.id=gm.id and last_measures.time=gm.time join ids on ids.id=gm.id join short_names on short_names.id=gm.id;") - ts=myc.fetchall() - myc.close() - mydb.close() - tt={"temp_garten":0,"temp_dach":0,"temp_carport":0,"hum_garten":0,"hum_dach":0,"hum_carport":0,"press_garten":0,"press_dach":0,"press_carport":0,"lux_dach":0,"uv_dach":0,"temp_solar":0,"temp_bat":0,"temp_wohnen":0,"hum_wohnen":0,"press_wohnen":0} - zs=0 - thp={"rasolar":{"temperature":0,"humidity":0,"pressure":0,"time":0},"ragps":{"temperature":0,"humidity":0,"pressure":0,"time":0},"ragarden":{"temperature":0,"humidity":0,"pressure":0,"time":0}} - - for tims,value,varname,device,short in ts: - tt[short]=value - try: - thp[device][varname]=value - except: - thp[device]={varname:value} - try: - if thp[device]['time']zs: - zs=int(tims) - return template('temperature.tpl',tempdata=tt,zeitstempel=time.strftime('%H:%M:%S %Y-%m-%d', time.localtime(zs/1000))) + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + myc=mydb.cursor() + myc.execute("select gm.time,last_measures.value/1000,ids.varname,ids.device,short_names.short from (select id, max(time) as time from last_measures group by id) gm join last_measures on last_measures.id=gm.id and last_measures.time=gm.time join ids on ids.id=gm.id join short_names on short_names.id=gm.id;") + ts=myc.fetchall() + myc.close() + mydb.close() + tt={"temp_garten":0,"temp_dach":0,"temp_carport":0,"hum_garten":0,"hum_dach":0,"hum_carport":0,"press_garten":0,"press_dach":0,"press_carport":0,"lux_dach":0,"uv_dach":0,"temp_solar":0,"temp_bat":0,"temp_wohnen":0,"hum_wohnen":0,"press_wohnen":0} + zs=0 + thp={"rasolar":{"temperature":0,"humidity":0,"pressure":0,"time":0},"ragps":{"temperature":0,"humidity":0,"pressure":0,"time":0},"ragarden":{"temperature":0,"humidity":0,"pressure":0,"time":0}} + + for tims,value,varname,device,short in ts: + tt[short]=value + try: + thp[device][varname]=value + except: + thp[device]={varname:value} + try: + if thp[device]['time']zs: + zs=int(tims) + return template('temperature.tpl',tempdata=tt,zeitstempel=time.strftime('%H:%M:%S %Y-%m-%d', time.localtime(zs/1000))) @app.get('/clients') def show_clients(): - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - myc=mydb.cursor() - no_clients=myc.execute('select distinct ids.device from ids join hourly_measures on hourly_measures.id=ids.id;') - if no_clients > 0: - t_cl=myc.fetchall() - client_list=[] - for cl in t_cl: - client_list.append(cl[0]) - return template('clients.tpl',clientdata=client_list) - else: - return ''' - - Übersicht Tristar - -

Aktuell keine Rechner verbunden

- - ''' + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + myc=mydb.cursor() + no_clients=myc.execute('select distinct ids.device from ids join hourly_measures on hourly_measures.id=ids.id;') + if no_clients > 0: + t_cl=myc.fetchall() + client_list=[] + for cl in t_cl: + client_list.append(cl[0]) + return template('clients.tpl',clientdata=client_list) + else: + return ''' + + Übersicht Tristar + +

Aktuell keine Rechner verbunden

+ + ''' @app.get('/client/') def forward_client(client): - response=requests.get('http://'+client+':8080/') - print(requests.get('http://'+client+':8080/')) - if response.status_code == 200: - return response.content - else: - return ''' - - Keine Verbindung - -

Kein Anschluss unter dieser Himbeere

- - ''' + response=requests.get('http://'+client+':8080/') + print(requests.get('http://'+client+':8080/')) + if response.status_code == 200: + return response.content + else: + return ''' + + Keine Verbindung + +

Kein Anschluss unter dieser Himbeere

+ + ''' @app.get('/cpu') def show_ids(): - starttime=time.time() - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - myc=mydb.cursor(pymysql.cursors.DictCursor) - myc.execute('select * from ids where sensor="CPU";') - channels=myc.fetchall() - myc.close() - cpu_col=cm.rainbow(numpy.linspace(0,1,len(channels))) - plt.figure(figsize=[6,8]) - mycursor=mydb.cursor() - for j in range(len(channels)): - mycursor.execute('select ((select max(time) from hourly_measures)-hourly_measures.time)/1000 as time,hourly_measures.value/1000 as value from hourly_measures join ids on ids.id=hourly_measures.id where ids.sensor="CPU" and ids.id='+str(channels[j]['id'])+';') - row=numpy.array(mycursor.fetchall()) - plt.plot(row[:,0],row[:,1],color=cpu_col[j],label=channels[j]['varname']+'; '+channels[j]['device']) - plt.legend() - print(str(time.time()-starttime)+"s for fetching and display") - plt.title('CPU Verlauf') - plt.savefig("svg/cpu.svg") - mycursor.close() - mydb.close() - if len(row)>0: - return template('cputemp.tpl',measdata=row) + starttime=time.time() + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + myc=mydb.cursor(pymysql.cursors.DictCursor) + myc.execute('select * from ids where sensor="CPU";') + channels=myc.fetchall() + myc.close() + cpu_col=cm.rainbow(numpy.linspace(0,1,len(channels))) + plt.figure(figsize=[6,8]) + mycursor=mydb.cursor() + for j in range(len(channels)): + mycursor.execute('select ((select max(time) from hourly_measures)-hourly_measures.time)/1000 as time,hourly_measures.value/1000 as value from hourly_measures join ids on ids.id=hourly_measures.id where ids.sensor="CPU" and ids.id='+str(channels[j]['id'])+';') + row=numpy.array(mycursor.fetchall()) + plt.plot(row[:,0],row[:,1],color=cpu_col[j],label=channels[j]['varname']+'; '+channels[j]['device']) + plt.legend() + print(str(time.time()-starttime)+"s for fetching and display") + plt.title('CPU Verlauf') + plt.savefig("svg/cpu.svg") + mycursor.close() + mydb.close() + if len(row)>0: + return template('cputemp.tpl',measdata=row) @app.get('/svg/') def show_svg(svgfile): - return static_file("svg/"+svgfile,root=abspath) + return static_file("svg/"+svgfile,root=abspath) @app.get('/graph//') def show_graph(mid,kind): - sqltable="hourly_measures" - if(kind=="daily"): - sqltable="daily_measures" - starttime=time.time() - mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") - mycursor=mydb.cursor() - print(str(time.time()-starttime)+' sql open') - nrow=mycursor.execute('select ((select max(time) from '+sqltable+')-'+sqltable+'.time)/3600000 as time,'+sqltable+'.value/1000 as value from '+sqltable+' where id=%s order by time',str(mid)) - if (nrow>0): - row=numpy.array(mycursor.fetchall()) - print(str(time.time()-starttime)+' sql fetched') - nirow=mycursor.execute('select id,varname,device,sensor,i2c from ids where id='+str(mid)) - gdata=mycursor.fetchall() - mycursor.close() - mydb.close() - kanal_info={"varname":"","device":"","sensor":"","i2c":0,"count":nrow} - for ids,varname,device,sensor,i2c in gdata: - kanal_info["varname"]=varname - kanal_info["device"]=device - kanal_info["sensor"]=sensor - kanal_info["i2c"]=int(i2c) - print(str(time.time()-starttime)+' sql closed') - plt.figure(figsize=[6,8]) - plt.plot(row[:,0],row[:,1]) - print(str(time.time()-starttime)+' picture') - plt.savefig("svg/"+mid+".svg") - print(str(time.time()-starttime)+' saved') - if len(row)>0: - return template('verlauf.tpl',measdata=row,mid=mid,kanal_info=kanal_info) - else: - return template(''' - - Keine Daten - -

Die Daten verstecken sich.

+ sqltable="hourly_measures" + if(kind=="daily"): + sqltable="daily_measures" + starttime=time.time() + mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar") + mycursor=mydb.cursor() + print(str(time.time()-starttime)+' sql open') + nrow=mycursor.execute('select ((select max(time) from '+sqltable+')-'+sqltable+'.time)/3600000 as time,'+sqltable+'.value/1000 as value from '+sqltable+' where id=%s order by time',str(mid)) + if (nrow>0): + row=numpy.array(mycursor.fetchall()) + print(str(time.time()-starttime)+' sql fetched') + nirow=mycursor.execute('select id,varname,device,sensor,i2c from ids where id='+str(mid)) + gdata=mycursor.fetchall() + mycursor.close() + mydb.close() + kanal_info={"varname":"","device":"","sensor":"","i2c":0,"count":nrow} + for ids,varname,device,sensor,i2c in gdata: + kanal_info["varname"]=varname + kanal_info["device"]=device + kanal_info["sensor"]=sensor + kanal_info["i2c"]=int(i2c) + print(str(time.time()-starttime)+' sql closed') + plt.figure(figsize=[6,8]) + plt.plot(row[:,0],row[:,1]) + print(str(time.time()-starttime)+' picture') + plt.savefig("svg/"+mid+".svg") + print(str(time.time()-starttime)+' saved') + if len(row)>0: + return template('verlauf.tpl',measdata=row,mid=mid,kanal_info=kanal_info) + else: + return template(''' + + Keine Daten + +

Die Daten verstecken sich.

Letzte Stunde
Letzter Tag
- - ''',mid=mid) + + ''',mid=mid) @app.get('/graph/') def show_graph_short(mid): - show_graph(mid,'hourly') + show_graph(mid,'hourly') @app.post('/data/') def dataimport(hash_id): -# print(hash_id) - timestart=time.time() - # check if request comes from allowed ip - if (request.remote_addr in clientlist): - # hash must be the used gpg key id - if (hash_id in clientlist[request.remote_addr]): -# print("correct id") - # check, if json is transmitted - try: - json_in=json.loads(request.json) -# print(json_in) - except: - print("no json") - else: - print(time.time()-timestart) - _thread.start_new_thread(analyse_jsonin,(json_in,hash_id,)) - else: - print("wrong id") - else: - print("not allowed client address") +# print(hash_id) + timestart=time.time() + # check if request comes from allowed ip + if (request.remote_addr in clientlist): + # hash must be the used gpg key id + if (hash_id in clientlist[request.remote_addr]): +# print("correct id") + # check, if json is transmitted + try: + json_in=json.loads(request.json) +# print(json_in) + except: + print("no json") + else: + print(time.time()-timestart) + _thread.start_new_thread(analyse_jsonin,(json_in,hash_id,)) + else: + print("wrong id") + else: + print("not allowed client address")