rasolar/meas_data.py

267 lines
7.6 KiB
Python
Raw Normal View History

2019-06-25 07:32:44 -07:00
import socket,numpy,time,thread,json,requests
2019-07-02 05:17:52 -07:00
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
2019-07-29 03:31:59 -07:00
from uuid import getnode
import zlib
2019-06-23 21:26:41 -07:00
class meas_data:
2019-07-29 03:31:59 -07:00
def __init__(self,var_name,ring_length=100,sigma=2,device=socket.gethostname(),sensor="CPU",i2c=0,store_file="/home/pi/data",multiplicator=1,digits=5,check_last=0,mac=0,sql_min_wait=5):
2019-07-01 01:43:38 -07:00
self.set_varname(var_name)
2019-06-23 21:26:41 -07:00
self.value=[]
2019-06-30 10:19:49 -07:00
self.mval=[]
2019-06-23 21:26:41 -07:00
self.sensebox=""
self.senseid=""
self.sense_url="https://ingress.opensensemap.org/"
self.sense_intervall=300
self.sense_last_time=0
2019-07-29 03:31:59 -07:00
self.sql_min_wait=sql_min_wait
2019-06-23 21:26:41 -07:00
self.sqlhost=""
self.sqlport=0
2019-07-29 03:31:59 -07:00
self.sql_last_transmit=0
2019-06-23 21:26:41 -07:00
self.bsense=False
self.bsql=False
self.bchanged=False
self.bfile=False
self.store_file=store_file
2019-06-24 07:08:14 -07:00
self.sigma=sigma
2019-07-01 01:43:38 -07:00
self.set_device(device)
self.set_sensor(sensor)
2019-07-03 20:44:45 -07:00
self.set_digits(digits)
2019-07-23 12:42:45 -07:00
self.set_check_last(check_last)
2019-07-29 03:31:59 -07:00
self.i2c=set_i2c(i2c)
2019-07-01 01:43:38 -07:00
self.set_multiplicator(multiplicator)
2019-07-02 05:17:52 -07:00
self.mqtt_broker=""
self.mqtt_port=1883
self.mqtt_topic=""
self.mqtt_bool=False
2019-07-29 03:31:59 -07:00
self.hash=0
self.set_id(self)
self.json_out={"hash":self.hash,"signature":"","payload":{"device":self.device,"varname":self.var_name,,"i2c":self.i2c,"sensor":self.sensor,"mac":self.mac,"measures":{}}}
def set_id(self):
idstring=str(hex(getnode()))[2:-1]+device+str(self.i2c)+sensor+var_name
self.hash=zlib.crc32(idstring) % (1<<32)
def get_id(self):
print(self.hash)
def set_i2c(self,i2c):
if (isinstance(i2c,(int,long,float)) and not isinstance(i2c,bool)):
self.i2c=i2c
else:
self.i2c=0
def set_mac(self,mac):
if mac > 0:
self.mac=mac
else:
self.mac=getnode()
2019-07-01 01:43:38 -07:00
def set_varname(self,var_name):
tvn=var_name.translate(None,"/:;\\{}(){]%&")
if len(tvn)==0:
tvn="var"
self.var_name=tvn
def set_check_last(self,check_last):
if check_last > 0:
self.bchecklast=True
self.checklast=check_last
else:
self.bchecklast=False
self.ckecklast=0
2019-07-03 20:44:45 -07:00
def set_digits(self,digits):
2019-07-03 20:52:52 -07:00
self.digits=1
2019-07-03 20:44:45 -07:00
try:
td=int(digits)-1
2019-07-03 20:52:52 -07:00
except:
print("could not set digits")
else:
if td<0:
td=1
if td>20:
td=20
self.digits=td
2019-07-01 01:43:38 -07:00
def set_ring_length(self,ring_length):
trl=int(ring_length)
if trl<0:
trl=(-1)*trl
if trl==0:
trl = 60
self.ring_length=trl
def set_mean_count(self,mean_count):
tmc=int(mean_count)
if tmc < 0:
tmc=(-1)*tmc
if tmc == 0:
tmc=self.ring_length
if mean_count>self.ring_length:
mean_count=self.ring_length
self.mean_count=mean_count
2019-06-23 21:26:41 -07:00
def set_sensebox(self,sensebox,senseid,intervall=300):
tsb=sensebox
2019-06-25 07:32:44 -07:00
tsb=tsb[0:24]
2019-06-23 21:26:41 -07:00
self.sensebox=tsb
tsi=senseid
2019-06-25 07:32:44 -07:00
tsi=tsi[0:24]
2019-06-23 21:26:41 -07:00
self.senseid=tsi
self.sense_intervall=int(intervall)
self.bsense=True
self.sense_url="https://ingress.opensensemap.org/boxes/%s/%s" % (tsb,tsi)
2019-06-25 07:32:44 -07:00
print(self.sense_url)
2019-06-23 21:26:41 -07:00
def set_sql(self,host,port=24049):
th=host
self.sqlhost=th
self.sqlport=int(port)
self.bsql=True
2019-07-29 03:31:59 -07:00
def set_mqtt(self,broker="localhost",port=1883,topic=""):
2019-07-02 05:17:52 -07:00
mt=topic
if len(mt)==0:
mt=self.device+"/"+self.sensor+"/"+self.var_name
else:
2019-07-02 07:28:15 -07:00
mt=mt.translate(":;\\{}(){]%&")
2019-07-02 19:42:33 -07:00
mb=broker
2019-07-02 05:17:52 -07:00
mport=int(port)
2019-07-02 08:11:41 -07:00
if mport < 1024:
2019-07-02 05:17:52 -07:00
mport=1883
try:
mqtt.connect(host=mb,port=mport)
except:
print("no connection to broker")
2019-07-02 07:28:15 -07:00
#else:
2019-07-02 08:11:41 -07:00
self.mqtt_broker=mb
self.mqtt_port=mport
2019-07-02 07:28:15 -07:00
self.mqtt_topic=mt
self.mqtt_bool=True
2019-06-23 21:26:41 -07:00
def set_file_log(self,store_file="/home/pi/data"):
if len(store_file)>1:
self.store_file=store_file+"_{:d}.txt"
try:
f1=open(self.store_file+"_{:d}.txt".format(int(time.time()/3600)),"a")
except:
print("could not open data storage")
self.bfile=False
else:
self.bfile=True
def show_def(self):
print(self.var_name)
print(self.value)
2019-07-01 01:43:38 -07:00
def set_device(self,device):
self.device=device
def set_sensor(self,sensor):
2019-07-02 07:28:15 -07:00
ts=sensor
2019-07-01 01:43:38 -07:00
self.sensor=ts
2019-06-30 12:23:04 -07:00
def set_multiplicator(self,multiplicator):
2019-07-01 01:43:38 -07:00
tmult=float(multiplicator)
if tmult < 1:
tmult=tmult*(-1)
self.mult=tmult
2019-06-23 21:26:41 -07:00
def append(self,value):
2019-07-04 07:20:30 -07:00
try:
tdig=10**(round(numpy.log10(value))-self.digits)
except:
2019-07-04 07:26:00 -07:00
print("")
2019-07-04 07:22:52 -07:00
else:
try:
tv=round(value/tdig)*tdig
except:
tv=value
2019-07-23 12:46:33 -07:00
btv=True
if self.bchecklast and (len(self.value)>0):
testvalue=self.value[-1*min(self.checklast,len(self.value)):]
if isinstance(testvalue,list):
if tv in testvalue:
btv=False
else:
if tv==testvalue:
btv=False
2019-07-23 12:46:33 -07:00
if btv:
self.value.append(tv)
if self.mqtt_bool:
thread.start_new_thread(self.send_mqtt,(0,))
if self.bsql:
thread.start_new_thread(self.send_sql,(0,))
if (len(self.mval)==0) and (len(self.value)>1):
self.stat_mean=numpy.mean(self.value)
self.stat_std=numpy.std(self.value)/numpy.sqrt(len(self.value)-1)
self.mval.append(value)
if self.bsense:
if len(self.mval)==self.mean_count:
mmean=numpy.mean(self.mval)
nstd=self.stat_std+numpy.std(self.mval)/numpy.sqrt(len(self.mval)-1)
bsave=self.bstoreeach
if abs(mmean-self.stat_mean) > (self.sigma*nstd):
bsave=True
if bsave:
if self.stat_val_std==0:
self.stat_val_std=abs(self.stat_val_mean/100)
nstd=10**(round(numpy.log10(numpy.maximum(0.001,numpy.std(self.mval))))-1)
mmean=round(mmean/nstd,0)*nstd
self.act_value=int(self.mult*mmean)
self.act_std=int(self.mult*numpy.std(self.mval))
self.act_time=int(1000*time.time())
if self.bfile:
thread.start_new_thread(self.send_file,(0,))
if self.bsense:
thread.start_new_thread(self.upload_osm,(0,))
self.mval=[]
if len(self.value)>=self.ring_length:
self.value=self.value[((-1)*self.ring_length):]
2019-06-23 21:26:41 -07:00
def get_ring(self):
return(self.value)
def get_act_value(self):
return(self.act_value)
def send_file(self,trigger=0):
try:
f1=open(self.store_file.format(int(time.time()/3600)),"a")
except:
print("could not open log file")
else:
f1.write("time:{0};".format(int(self.act_time))+self.var_name+":{0}".format(self.act_value)+"\n")
f1.close()
def send_sql(self,trigger=0):
2019-07-04 21:59:29 -07:00
sv=(-99)
if len(self.value)>0:
if len(self.value)>1:
if self.value[-1]!=self.value[-2]:
sv=self.value[-1]
else:
sv=self.value[-1]
2019-07-29 03:31:59 -07:00
# self.json_out={"hash":self.hash,signature="","payload":{"device":self.device,"varname":self.var_name,,"i2c":self.i2c,"sensor":self.sensor,"mac":self.mac,"measures":{"time":0,"value":0}}}
2019-07-04 21:59:29 -07:00
if sv >(-99):
act_time=int(1000*time.time())
2019-07-29 03:31:59 -07:00
self.json_out['payload']['measures'][act_time]=int(sv*self.mult)
if ((act_time-self.sql_last_transmit)>(self.sql_min_wait*1000)):
2019-07-04 21:59:29 -07:00
try:
2019-07-29 03:31:59 -07:00
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2019-07-04 21:59:29 -07:00
except:
print("{}: could not connect to database".format(time.time()))
else:
2019-07-29 03:31:59 -07:00
try:
s.connect((self.sqlhost, self.sqlport))
except:
print("{}: could not connect to database".format(time.time()))
s.close()
else:
signatur=int(sha3.sha3_224(json.dumps(self.json_out['payload'])).hexdigest(),16)
s.sendall(json.dumps(self.json_out))
s.close()
2019-06-23 21:26:41 -07:00
def upload_osm(self,trigger=0):
if (self.act_time-self.sense_last_time)>(self.sense_intervall*1000):
2019-06-30 12:23:04 -07:00
r = requests.post(self.sense_url,json={'value': float(self.act_value)/self.mult})
if (r.status_code != requests.codes.ok) & (r.status_code != 201):
print("Error %d: %s" % (r.status_code,r.text))
else:
self.sense_last_time=self.act_time
2019-07-03 07:49:05 -07:00
def send_mqtt(self,trigger=0):
2019-07-04 21:59:29 -07:00
sv=(-99)
if len(self.value)>0:
if len(self.value)>1:
if self.value[-1]!=self.value[-2]:
2019-07-03 10:10:40 -07:00
sv=self.value[-1]
2019-07-04 21:59:29 -07:00
else:
sv=self.value[-1]
if sv >(-99):
try:
publish.single(self.mqtt_topic,sv,hostname=self.mqtt_broker,port=self.mqtt_port)
except:
print("could not send mqtt")
2019-06-23 21:26:41 -07:00
2019-06-24 06:57:54 -07:00
#test=meas_data("temp",5)
#test.set_sql("localhost")
#test.set_file_log("/home/ademant/data")