pysql/pysqltq.py

139 lines
5.0 KiB
Python

import json
import socket
import threading
import time

try:
    # Python 3 module names, bound to the Python 2 names used in this file.
    import socketserver as SocketServer
    from queue import Queue
except ImportError:
    # Python 2 module names.
    import SocketServer
    from Queue import Queue

import pymysql
def sql_get_dict(mycursor, mytable, myvar):
    """Build a lookup dict from one table: value of column *myvar* -> ``id``.

    Executes ``select * from <mytable>`` on *mycursor* and reads each fetched
    row by column name, so the cursor has to return mapping-style rows.
    If several rows share the same *myvar* value, the last one fetched wins.
    """
    query = "select * from " + str(mytable)
    mycursor.execute(query)
    return {row[myvar]: row['id'] for row in mycursor.fetchall()}
def _sql_ensure_id(mydb, mycursor, known, table, column, name):
    """Return the id of *name* in a lookup table, inserting the name if needed.

    *known* is the cached ``name -> id`` dict for *table*.  When *name* is
    missing, it is inserted, the insert is committed and *known* is refreshed
    in place from the database.  Raises KeyError if the name still cannot be
    found after the refresh.
    """
    if name not in known:
        # *table* and *column* are fixed identifiers passed by sql_insert;
        # only *name* originates from the queue, so it is bound as a query
        # parameter instead of being pasted into the statement text.
        mycursor.execute(
            "insert into " + table + " (" + column + ") values (%s)",
            (name,),
        )
        mydb.commit()
        known.clear()
        known.update(sql_get_dict(mycursor, table, column))
    return known[name]


def sql_insert(q):
    """Consume measurement records from queue *q* and store them in MySQL.

    Each queue item is either ``None`` (ignored) or a dict with the keys
    ``time``, ``device``, ``varname``, ``sensor``, ``i2c`` and ``value``.
    Device, sensor and variable names are translated to ids through the
    ``device_id``, ``sensor_id`` and ``var_id`` tables (unknown names are
    added), then one row is inserted into ``data_storage``.

    The function returns immediately if the database connection cannot be
    opened; otherwise it loops forever.  A record that cannot be stored is
    reported on stdout and skipped; if the final insert statement fails, its
    text is appended to ``sql_missed.txt``.
    """
    try:
        mydb = pymysql.connect(read_default_file="~/.my.cnf", database="rasolar")
    except Exception as e:
        print("Could not connect to sql server! Quitting")
        print(e)
        return

    mycursor = mydb.cursor(pymysql.cursors.DictCursor)
    # Every placeholder uses the ``d`` format, so only integers can end up in
    # the statement text.
    sqlinsert = "insert into data_storage (time,device_id,var_id,sensor_id,i2c,value) values ({0:d},{1:d},{2:d},{3:d},{4:d},{5:d})"
    # get variable id out of sql
    svar = sql_get_dict(mycursor, "var_id", "var")
    sdev = sql_get_dict(mycursor, "device_id", "device")
    ssens = sql_get_dict(mycursor, "sensor_id", "sensor")

    while True:
        # Blocks until a producer has queued something.
        indata = q.get()
        try:
            if indata is None:
                continue
            try:
                dev_id = _sql_ensure_id(mydb, mycursor, sdev, "device_id", "device", indata['device'])
                sens_id = _sql_ensure_id(mydb, mycursor, ssens, "sensor_id", "sensor", indata['sensor'])
                var_id = _sql_ensure_id(mydb, mycursor, svar, "var_id", "var", indata['varname'])
                txt_sql = sqlinsert.format(indata['time'], dev_id, var_id, sens_id, indata['i2c'], indata['value'])
            except Exception as e:
                # One bad record must not terminate the worker thread.
                print("Error preparing sql insert")
                print(e)
                continue
            try:
                mycursor.execute(txt_sql)
            except Exception as e:
                print("Eror in execute sql insert")
                print(e)
                # One statement per line so the file can be inspected later.
                with open("sql_missed.txt", "a") as errlog:
                    errlog.write(txt_sql + "\n")
            else:
                mydb.commit()
        finally:
            # Mark the item as handled only after the work is finished.
            q.task_done()
class ThreadedTCPRequestHandler(SocketServer.StreamRequestHandler):
    """Read one JSON message from a client and queue its measurements.

    Message layout (every key except ``payload`` is optional)::

        {"device": "name", "mult": 1,
         "payload": {"varname": {"value": 1, "time": 0,
                                 "sensor": "name", "i2c": 0}}}

    One dict per payload entry is put on the server's queue with the keys
    ``time``, ``device``, ``varname``, ``sensor``, ``i2c`` and ``value``.
    Input that is not ASCII-encoded JSON with a ``payload`` object is
    ignored, and so is any single payload entry that cannot be converted.
    """

    # Characters removed from device names, and from variable/sensor names.
    DEVICE_UNWANTED = ' ./:;*|'
    NAME_UNWANTED = ' ./:;*!'

    def __init__(self, request, client_address, server):
        # The base-class constructor already runs handle(), so the queue has
        # to be stored before it is called.
        self.queue = server.queue
        SocketServer.StreamRequestHandler.__init__(self, request, client_address, server)

    def _clean(self, text, unwanted, max_len):
        """Return *text* without the *unwanted* characters, cut to *max_len*.

        Raises TypeError if *text* is not a text string.
        """
        if not isinstance(text, type(u"")):
            raise TypeError("expected a string, got %r" % (text,))
        return "".join(ch for ch in text if ch not in unwanted)[:max_len]

    def _make_record(self, varname, entry, datasource, multi):
        """Convert one payload entry into the dict that is put on the queue.

        Raises TypeError or ValueError if a field cannot be converted.
        """
        value = 0
        if "value" in entry:
            # NOTE(review): the value is truncated to int before it is
            # multiplied by ``mult``; confirm this is intended for floats.
            value = int(entry['value'])
        if "time" in entry:
            datatime = int(entry['time'])
        else:
            # No timestamp supplied: use the current time in milliseconds.
            datatime = int(1000 * time.time())
        sensor = ""
        if "sensor" in entry:
            sensor = self._clean(entry['sensor'], self.NAME_UNWANTED, 32)
        i2c = 0
        if "i2c" in entry:
            i2c = int(entry['i2c'])
        return {
            "time": datatime,
            "device": datasource,
            # remove unwanted characters from variable name
            "varname": self._clean(varname, self.NAME_UNWANTED, 64),
            "sensor": sensor,
            "i2c": i2c,
            "value": int(multi * value),
        }

    def handle(self):
        # Only process the input if it is JSON.  A UnicodeDecodeError and
        # the JSON decoding error are both ValueError subclasses.
        try:
            message = json.loads(self.request.recv(1024).decode('ascii'))
        except ValueError:
            return
        # The message must have a payload entry holding named measurements.
        if not isinstance(message, dict) or "payload" not in message:
            return
        payload = message['payload']
        if not isinstance(payload, dict):
            return

        # The sender's address identifies the data source unless the message
        # names a device itself.
        datasource = self.client_address[0]
        multi = 1
        try:
            if "device" in message:
                datasource = self._clean(message['device'], self.DEVICE_UNWANTED, 64)
            if "mult" in message:
                multi = int(message['mult'])
        except (TypeError, ValueError):
            return
        datasource = datasource[:64]

        for varname, entry in payload.items():
            if not isinstance(entry, dict):
                continue
            try:
                record = self._make_record(varname, entry, datasource, multi)
            except (TypeError, ValueError):
                # Skip only the malformed entry, keep the rest of the message.
                continue
            self.queue.put(record, block=False)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server with the threading mix-in that also carries a ``queue``.

    The extra ``queue`` keyword is stored on the server instance as
    ``self.queue``; all other arguments are handed to ``TCPServer``.
    """

    def __init__(
        self,
        server_address,
        RequestHandlerClass,
        bind_and_activate=True,
        queue=None,
    ):
        # Store the queue before the base class sets up the socket.
        self.queue = queue
        base_init = SocketServer.TCPServer.__init__
        base_init(
            self,
            server_address,
            RequestHandlerClass,
            bind_and_activate=bind_and_activate,
        )
if __name__ == "__main__":
    # Script entry point: start the SQL writer thread, then serve TCP
    # requests in the main thread until the process is interrupted.

    # Unbounded queue shared by the request handlers and the SQL writer.
    q = Queue(maxsize=0)
    sql_worker = threading.Thread(target=sql_insert, args=(q,))
    # Daemon thread: it must not keep the process alive on exit.
    sql_worker.daemon = True
    sql_worker.start()

    # Listen on all interfaces on a fixed port.
    HOST, PORT = "", 24048
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler, queue=q)
    server.timeout = None

    print("Server loop running in thread: " + threading.current_thread().name)
    try:
        # Runs in this thread and starts one more thread for each request.
        # serve_forever() takes only an optional poll interval, so the queue
        # is not passed to it.
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket however the loop ended.
        server.server_close()