start new server; start sql queuing
parent
a8c71d0a22
commit
e4a8d1eae4
|
@ -0,0 +1,57 @@
|
|||
def sql_get_dict(mycursor,mytable,myvar):
|
||||
mycursor.execute("select * from "+str(mytable))
|
||||
tout={}
|
||||
tt=mycursor.fetchall()
|
||||
for tv in tt:
|
||||
tout[tv[myvar]]=tv['id']
|
||||
return(tout)
|
||||
|
||||
def sql_insert(q):
|
||||
mysqlmem=apsw.Connection(":memory:")
|
||||
mycursor=mysqlmem.cursor()
|
||||
mycursor.execute("create table data_storage(time integer,sensor_id integer,value integer);")
|
||||
bsql=False
|
||||
sqlinsert="insert into data_storage (time,sensor_id,value) values ({0:d},{1:d},{2:d})"
|
||||
sqldata=[]
|
||||
try:
|
||||
mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar")
|
||||
except:
|
||||
print("Could not connect to sql server! Quitting")
|
||||
else:
|
||||
bsql=True
|
||||
mycursor=mydb.cursor(pymysql.cursors.DictCursor)
|
||||
|
||||
while True:
|
||||
if q.empty():
|
||||
if not bsql:
|
||||
try:
|
||||
mydb=pymysql.connect(read_default_file="~/.my.cnf",database="rasolar")
|
||||
except:
|
||||
bsql=False
|
||||
else:
|
||||
mycursor=mydb.cursor(pymysql.cursors.DictCursor)
|
||||
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
try:
|
||||
indata=q.get()
|
||||
if indata is not None:
|
||||
q.task_done()
|
||||
except Exception as e:
|
||||
print("Error during queuing")
|
||||
print(e)
|
||||
else:
|
||||
if indata is not None:
|
||||
sqlin=sqlinsert.format(indata['time'],indata['sensorid'],indata['value'])
|
||||
try:
|
||||
mycursor.execute(sqlin)
|
||||
except:
|
||||
print("Eror in execute sql insert")
|
||||
print(sqlin,file="sql_missed.txt")
|
||||
else:
|
||||
if bsql:
|
||||
mydb.commit()
|
||||
else:
|
||||
mysqlmem.commit()
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import requests,threading,time,json,zlib,gnupg,socket,psutil,os,sys,pymysql,queue,numpy,apsw
|
||||
from flask import Flask
|
||||
import libproxysql as ps
|
||||
|
||||
app = Flask("proxysql")
|
||||
|
||||
p = HttpParser()
|
||||
gpg=gnupg.GPG()
|
||||
|
||||
pathname = os.path.dirname(sys.argv[0])
|
||||
abspath=os.path.abspath(pathname)
|
||||
configfile=abspath+"/config.json"
|
||||
try:
|
||||
cf=open(configfile,"r")
|
||||
except:
|
||||
cf=open(configfile+".template","r")
|
||||
|
||||
log_conf=json.load(cf)
|
||||
cf.close()
|
||||
|
||||
parameter={"device":socket.gethostname(),"allowed_ip":{"127.0.0.1":"25A4CF79414F10FD"},"gpg_keyid":"25A4CF79414F10FD"}
|
||||
for n in parameter:
|
||||
if n in log_conf:
|
||||
parameter[n]=log_conf[n]
|
||||
|
||||
|
||||
@app.route('/')
|
||||
def hello_world():
|
||||
return 'Hello World!'
|
||||
|
||||
|
||||
# since we're using threads, shouldn't we be able to pause execution of one?
|
||||
@app.route('/slow')
|
||||
def slow():
|
||||
import time
|
||||
time.sleep(10)
|
||||
return 'zzz'
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
q=Queue(maxsize=0)
|
||||
|
||||
sql_worker=threading.Thread(target=ps.sql_insert,args=(q,))
|
||||
sql_worker.setDaemon(True)
|
||||
sql_worker.start()
|
||||
|
||||
with open('./log', 'a+') as log:
|
||||
try:
|
||||
app.run(threaded=True)
|
||||
log.write("done adding wsgi app\n")
|
||||
except Exception, e:
|
||||
log.write(repr(e))
|
||||
|
||||
|
Loading…
Reference in New Issue