# Code-viewer residue kept as a comment (describes the original listing):
# 139 lines
# 5.0 KiB
# Python
import json
import socket
import threading
import time

try:  # Python 3 module names
    import socketserver
    from queue import Queue
except ImportError:  # Python 2 names this file originally imported
    import SocketServer as socketserver
    from Queue import Queue

import pymysql

# Keep the legacy spelling available for code that still refers to it.
SocketServer = socketserver


def sql_get_dict(mycursor, mytable, myvar):
    """Load a lookup table and return it as a ``{name: id}`` mapping.

    Runs ``select * from <mytable>`` on *mycursor* and maps each row's
    *myvar* column to its ``id`` column.  Rows are indexed by column name,
    so the cursor has to yield mapping-style rows.  If several rows share
    the same *myvar* value, the last row fetched wins.
    """
    mycursor.execute("select * from %s" % (mytable,))
    return {row[myvar]: row['id'] for row in mycursor.fetchall()}


def _sql_resolve_id(mydb, mycursor, cache, table, column, name):
    """Return the id of *name* in lookup table *table*, inserting it if unknown.

    *cache* maps already-known names to ids; after an insert it is refreshed
    in place from the table.  *table* and *column* are fixed identifiers
    supplied by the caller; only *name* is sent as a bound parameter.

    Raises KeyError if the name is still not present after the insert.
    """
    if name not in cache:
        # Bound parameter instead of string concatenation: the name is taken
        # from queued records, which are built from client-supplied text.
        mycursor.execute(
            "insert into " + table + " (" + column + ") values (%s)",
            (str(name),))
        mydb.commit()
        cache.clear()
        cache.update(sql_get_dict(mycursor, table, column))
    return cache[name]


def sql_insert(q):
    """Consume measurement records from *q* forever and store them in MySQL.

    Connects to the ``rasolar`` database using ``~/.my.cnf``.  If the
    connection fails, a message is printed and the function returns.

    Each queue item is a dict with the keys ``time``, ``device``,
    ``varname``, ``sensor``, ``i2c`` and ``value``; ``None`` items are
    ignored.  Device, sensor and variable names are translated to ids via
    the ``device_id``, ``sensor_id`` and ``var_id`` tables (unknown names
    are inserted first), then one row is written to ``data_storage``.

    A failing ``data_storage`` insert is appended to ``sql_missed.txt``,
    one statement per line.  Any other error while handling a record is
    printed and the record is dropped, so one bad record cannot stop the
    worker.  ``q.task_done()`` is called for every item taken off the queue.
    """
    try:
        mydb = pymysql.connect(read_default_file="~/.my.cnf", database="rasolar")
    except Exception as e:
        print("Could not connect to sql server! Quitting")
        print(e)
        return

    # The {n:d} fields only accept integers, so this statement cannot carry
    # arbitrary text; a non-integer field raises ValueError instead.
    sqlinsert = "insert into data_storage (time,device_id,var_id,sensor_id,i2c,value) values ({0:d},{1:d},{2:d},{3:d},{4:d},{5:d})"

    try:
        mycursor = mydb.cursor(pymysql.cursors.DictCursor)
        # Name -> id caches for the three lookup tables.
        svar = sql_get_dict(mycursor, "var_id", "var")
        sdev = sql_get_dict(mycursor, "device_id", "device")
        ssens = sql_get_dict(mycursor, "sensor_id", "sensor")

        while True:
            # Blocking get: waits for the next record without polling.
            indata = q.get()
            try:
                if indata is None:
                    continue
                dev_id = _sql_resolve_id(mydb, mycursor, sdev, "device_id", "device", indata['device'])
                sens_id = _sql_resolve_id(mydb, mycursor, ssens, "sensor_id", "sensor", indata['sensor'])
                var_id = _sql_resolve_id(mydb, mycursor, svar, "var_id", "var", indata['varname'])
                txt_sql = sqlinsert.format(indata['time'], dev_id, var_id, sens_id, indata['i2c'], indata['value'])
                try:
                    mycursor.execute(txt_sql)
                except Exception as e:
                    print("Eror in execute sql insert")
                    print(e)
                    # Keep the statement so it can be replayed later; the
                    # newline keeps successive entries apart.
                    with open("sql_missed.txt", "a") as errlog:
                        errlog.write(txt_sql + "\n")
                else:
                    mydb.commit()
            except Exception as e:
                # Malformed record or database error: report it and keep
                # the worker alive for the records that follow.
                print("Error while processing queued record")
                print(e)
            finally:
                q.task_done()
    finally:
        mydb.close()


class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
    """Parse one JSON measurement message per connection and queue its values.

    The message is a JSON object read with a single ``recv(1024)``::

        {"device": "<name>", "mult": <int>,
         "payload": {"<varname>": {"value": ..., "time": ...,
                                   "sensor": "...", "i2c": ...}}}

    Only ``payload`` is required.  Without ``device`` the client's IP
    address is used; ``mult`` defaults to 1, ``value`` and ``i2c`` to 0,
    ``sensor`` to ``""`` and ``time`` to the current time in milliseconds.
    For every payload entry one dict with the keys ``time``, ``device``,
    ``varname``, ``sensor``, ``i2c`` and ``value`` is put on the server's
    queue.  Input that is not ASCII JSON of this shape is ignored.
    """

    # Deletion tables for client-supplied names.  str.translate() needs a
    # table built by str.maketrans(); passing the raw character string
    # removes nothing.
    _DEVICE_STRIP = str.maketrans('', '', ' ./:;*|')
    _NAME_STRIP = str.maketrans('', '', ' ./:;*!')

    def __init__(self, request, client_address, server):
        # The base-class constructor runs handle() itself, so the queue has
        # to be attached before calling it.
        self.queue = server.queue
        socketserver.StreamRequestHandler.__init__(self, request, client_address, server)

    @staticmethod
    def _clean(text, table, limit):
        """Return *text* as a string without the characters in *table*, cut to *limit*."""
        return str(text).translate(table)[:limit]

    def _build_record(self, datasource, multi, name, entry):
        """Turn one payload entry into the dict that is put on the queue.

        Raises TypeError, ValueError or OverflowError when the entry is not
        a JSON object or one of its numeric fields cannot be converted.
        """
        if not isinstance(entry, dict):
            raise TypeError("payload entry for %r is not an object" % (name,))
        # NOTE(review): the value is truncated to int before it is scaled by
        # mult, so fractional parts are lost -- confirm this is intended.
        value = int(entry['value']) if "value" in entry else 0
        if "time" in entry:
            datatime = int(entry['time'])
        else:
            datatime = int(1000 * time.time())
        sensor = ""
        if "sensor" in entry:
            sensor = self._clean(entry['sensor'], self._NAME_STRIP, 32)
        i2c = int(entry['i2c']) if "i2c" in entry else 0
        return {
            "time": datatime,
            "device": datasource,
            "varname": self._clean(name, self._NAME_STRIP, 64),
            "sensor": sensor,
            "i2c": i2c,
            "value": int(multi * value),
        }

    def handle(self):
        """Read one message from the connection and queue its payload entries."""
        # NOTE: a single recv() is used, so a message longer than 1024 bytes
        # (or one delivered in several pieces) is cut off and then rejected
        # by the JSON parser.
        try:
            message = json.loads(self.request.recv(1024).decode('ascii'))
        except ValueError:
            # Covers both non-ASCII bytes and invalid JSON: not for us.
            return
        if not isinstance(message, dict) or not isinstance(message.get('payload'), dict):
            return

        datasource = self.client_address[0]
        if "device" in message:
            datasource = self._clean(message['device'], self._DEVICE_STRIP, 64)

        try:
            multi = int(message['mult']) if "mult" in message else 1
        except (TypeError, ValueError, OverflowError) as e:
            print("Ignoring message with invalid mult")
            print(e)
            return

        for name, entry in message['payload'].items():
            try:
                record = self._build_record(datasource, multi, name, entry)
            except (TypeError, ValueError, OverflowError) as e:
                # One bad entry must not discard the rest of the message.
                print("Skipping malformed payload entry")
                print(e)
                continue
            # Use the queue handed over by the server rather than a global.
            self.queue.put(record, block=False)


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each request in its own thread.

    The extra *queue* argument is stored as ``self.queue`` so request
    handlers can reach it through their ``server`` argument.  All other
    arguments are passed on to ``socketserver.TCPServer``.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, queue=None):
        # Attach the queue before the base constructor runs, so it is
        # already present once the server is bound and activated.
        self.queue = queue
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass,
                                        bind_and_activate=bind_and_activate)


if __name__ == "__main__":
    # Unbounded queue connecting the request handlers to the database writer.
    q = Queue(maxsize=0)

    # Single background worker that writes queued records to the database.
    sql_worker = threading.Thread(target=sql_insert, args=(q,))
    sql_worker.daemon = True
    sql_worker.start()

    # Listen on all interfaces on a fixed port.
    HOST, PORT = "", 24048
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler, queue=q)

    # Serve in the main thread; the server starts one more thread for each
    # request.  serve_forever() is called exactly once and without extra
    # arguments: its only parameter is the poll interval, so passing the
    # queue to it makes the call fail.
    print("Server loop running in thread:", threading.current_thread().name)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket on the way out.
        server.server_close()