"""Ring-buffered measurement values with optional MQTT, TCP/JSON, file and
openSenseMap output.

The code is kept compatible with both Python 2.7 and Python 3.
"""
import json
import math
import socket
import time

import numpy

try:
    import thread  # Python 2
except ImportError:
    import _thread as thread  # the same module under its Python 3 name

# The HTTP and MQTT transports are optional: without the packages the class
# still works as a ring buffer / file logger, only the transport stays off.
try:
    import requests
except ImportError:
    requests = None

try:
    import paho.mqtt.client as mqtt
    import paho.mqtt.publish as publish
except ImportError:
    mqtt = None
    publish = None

# Characters removed from variable and device names.
_NAME_FORBIDDEN = "/:;\\{}(){]%&"
# Same set without "/", which is the level separator inside MQTT topics.
_TOPIC_FORBIDDEN = ":;\\{}(){]%&"


def _strip_chars(text, forbidden):
    """Return *text* without any character listed in *forbidden*."""
    return "".join(ch for ch in text if ch not in forbidden)


class meas_data(object):
    """Collect readings of one measured variable and forward them.

    Every accepted reading is rounded to a number of significant digits and
    stored in a ring of at most ``ring_length`` entries.  Raw readings are
    additionally gathered in windows of ``mean_count`` values; when a window
    is full its mean is published as ``act_value`` (scaled by the
    multiplicator and truncated to ``int``) if ``store_each_cycle`` is set or
    the mean deviates from the ring statistics by more than ``sigma``
    standard errors.

    Optional outputs, each enabled by its ``set_*`` method:

    * ``set_mqtt``      - publish every changed ring value via MQTT
    * ``set_sql``       - send every changed ring value as JSON over TCP
    * ``set_file_log``  - append every published window mean to an hourly file
    * ``set_sensebox``  - upload published window means to openSenseMap
    """

    def __init__(self, var_name, ring_length=100, sigma=2,
                 device=socket.gethostname(), sensor="CPU", i2c=0,
                 store_file="/home/pi/data", mean_count=5,
                 store_each_cycle=True, multiplicator=1, digits=5):
        """Create the buffer; all outputs start disabled.

        Args:
            var_name: name of the variable (sanitized, see ``set_varname``).
            ring_length: maximum number of rounded values kept in the ring.
            sigma: deviation threshold, in standard errors, that forces a
                window mean to be published.
            device: device name; defaults to the host name determined when
                the module was imported.
            sensor: sensor name, stored unchanged.
            i2c: value stored unchanged and copied into the JSON payload.
            store_file: base path kept until ``set_file_log`` is called.
            mean_count: number of readings per averaging window.
            store_each_cycle: publish every window mean, not only outliers.
            multiplicator: scale factor applied before truncating to int.
            digits: number of significant digits kept in the ring.
        """
        self.set_varname(var_name)
        self.value = []   # ring of rounded readings
        self.mval = []    # raw readings of the current averaging window
        self.stat_val_mean = 0
        self.stat_val_std = 0
        self.set_ring_length(ring_length)
        self.set_mean_count(mean_count)  # needs ring_length to be set
        self.sensebox = ""
        self.senseid = ""
        self.sense_url = "https://ingress.opensensemap.org/"
        self.sense_intervall = 300
        self.sense_last_time = 0
        self.sqlhost = ""
        self.sqlport = 0
        self.bsense = False
        self.bsql = False
        self.bchanged = False
        self.bfile = False
        self.store_file = store_file
        self.act_value = 0
        self.act_std = 0
        self.act_time = 0
        self.stat_mean = 0
        self.stat_std = 0
        self.sigma = sigma
        self.set_device(device)
        self.set_sensor(sensor)
        self.set_digits(digits)
        self.i2c = i2c
        self.set_multiplicator(multiplicator)
        self.bstoreeach = store_each_cycle
        self.mqtt_broker = ""
        self.mqtt_port = 1883
        self.mqtt_topic = ""
        self.mqtt_bool = False
        # Keyed by the *sanitized* names: send_sql() looks the entry up with
        # self.var_name, so the raw constructor argument must not be used.
        self.json_out = {
            "device": self.device,
            "payload": {
                self.var_name: {
                    "time": 0,
                    "sensor": self.sensor,
                    "value": 0,
                    "i2c": i2c,
                },
            },
        }

    def set_varname(self, var_name):
        """Store *var_name* without forbidden characters ("var" if empty)."""
        cleaned = _strip_chars(var_name, _NAME_FORBIDDEN)
        self.var_name = cleaned if cleaned else "var"

    def set_digits(self, digits):
        """Set the number of significant digits kept by ``append``.

        Stored as ``digits - 1`` (the exponent offset used when rounding),
        capped at 20.  A result below 0 is replaced by 1; a value that cannot
        be converted to ``int`` leaves the offset at 1.
        """
        self.digits = 1
        try:
            offset = int(digits) - 1
        except (TypeError, ValueError):
            print("could not set digits")
            return
        if offset < 0:
            offset = 1
        self.digits = min(offset, 20)

    def set_ring_length(self, ring_length):
        """Set the ring capacity: absolute value of the input, 0 becomes 60."""
        length = abs(int(ring_length))
        self.ring_length = length if length else 60

    def set_mean_count(self, mean_count):
        """Set the averaging window size.

        The absolute value is used, 0 selects the ring length, and the result
        never exceeds the ring length.
        """
        count = abs(int(mean_count))
        if count == 0:
            count = self.ring_length
        self.mean_count = min(count, self.ring_length)

    def set_sensebox(self, sensebox, senseid, intervall=300):
        """Configure the openSenseMap upload.

        Args:
            sensebox: box id, truncated to 24 characters.
            senseid: sensor id, truncated to 24 characters.
            intervall: minimum number of seconds between two uploads.
        """
        box = sensebox[0:24]
        sensor_id = senseid[0:24]
        self.sensebox = box
        self.senseid = sensor_id
        self.sense_intervall = int(intervall)
        self.sense_url = "https://ingress.opensensemap.org/boxes/%s/%s" % (
            box, sensor_id)
        print(self.sense_url)
        if requests is None:
            print("requests is not installed, upload stays disabled")
            self.bsense = False
        else:
            self.bsense = True

    def set_sql(self, host, port=24049):
        """Enable sending changed values as JSON to *host*:*port* over TCP."""
        self.sqlhost = host
        self.sqlport = int(port)
        self.bsql = True

    def set_mqtt(self, broker="banana", port=1883, topic=""):
        """Configure MQTT publishing.

        Args:
            broker: host name of the broker.
            port: broker port; values below 1024 are replaced by 1883.
            topic: topic to publish to.  Forbidden characters are removed;
                an empty topic becomes ``device/sensor/var_name``.

        A short TCP connection attempt reports whether the broker is
        reachable.  As before, publishing is enabled even if that attempt
        fails, so a broker that comes up later is still used.
        """
        cleaned = _strip_chars(topic, _TOPIC_FORBIDDEN)
        if not cleaned:
            cleaned = self.device + "/" + self.sensor + "/" + self.var_name
        mport = int(port)
        if mport < 1024:
            mport = 1883
        self.mqtt_broker = broker
        self.mqtt_port = mport
        self.mqtt_topic = cleaned
        if publish is None:
            print("paho-mqtt is not installed, mqtt stays disabled")
            self.mqtt_bool = False
            return
        try:
            probe = socket.create_connection((broker, mport), 5)
        except socket.error:
            print("no connection to broker")
        else:
            probe.close()
        self.mqtt_bool = True

    def set_file_log(self, store_file="/home/pi/data"):
        """Enable file logging to ``<store_file>_<hour>.txt``.

        ``<hour>`` is the number of hours since the epoch, so a new file is
        started every hour.  The current file is opened once in append mode
        to check that it is writable; logging is enabled only if that works.
        """
        if len(store_file) > 1:
            self.store_file = store_file + "_{:d}.txt"
        try:
            # Same path expression as send_file(), so the probe touches the
            # file that will really be written.
            with open(self.store_file.format(int(time.time() / 3600)), "a"):
                pass
        except (IOError, OSError):
            print("could not open data storage")
            self.bfile = False
        else:
            self.bfile = True

    def show_def(self):
        """Print the variable name and the current ring content."""
        print(self.var_name)
        print(self.value)

    def set_device(self, device):
        """Store *device* without forbidden characters."""
        self.device = _strip_chars(device, _NAME_FORBIDDEN)

    def set_sensor(self, sensor):
        """Store the sensor name unchanged."""
        self.sensor = sensor

    def set_multiplicator(self, multiplicator):
        """Set the scale factor: absolute value of the input, 0 becomes 1.

        Zero is rejected because ``upload_osm`` divides by the factor.
        """
        factor = abs(float(multiplicator))
        self.mult = factor if factor else 1.0

    def append(self, value):
        """Add one reading.

        The reading is rounded to the configured significant digits and
        appended to the ring, the enabled per-value outputs (MQTT, TCP/JSON)
        are started in background threads, and the raw reading is added to
        the averaging window.  Readings that are not finite numbers are
        reported and ignored.
        """
        try:
            reading = float(value)
        except (TypeError, ValueError):
            print("could not append non-numeric value")
            return
        if math.isnan(reading) or math.isinf(reading):
            print("could not append non-finite value")
            return

        self.value.append(self._round_significant(reading))
        if self.mqtt_bool:
            thread.start_new_thread(self.send_mqtt, (0,))
        if self.bsql:
            thread.start_new_thread(self.send_sql, (0,))

        # Reference statistics are refreshed at the start of each window.
        if not self.mval and len(self.value) > 1:
            self.stat_mean = numpy.mean(self.value)
            self.stat_std = (numpy.std(self.value)
                             / numpy.sqrt(len(self.value) - 1))
        self.mval.append(reading)
        # Evaluated independently of the sensebox flag: the file log needs
        # the window mean too, and the window must be emptied in any case.
        if len(self.mval) >= self.mean_count:
            self._finish_window()

        if len(self.value) >= self.ring_length:
            self.value = self.value[-self.ring_length:]

    def _round_significant(self, reading):
        """Round *reading* to ``self.digits + 1`` significant digits."""
        if reading == 0:
            return 0.0  # log10 is undefined at 0, nothing to round
        try:
            # abs() keeps negative readings usable with log10.
            exponent = int(round(math.log10(abs(reading)))) - self.digits
            step = 10.0 ** exponent
            return round(reading / step) * step
        except (OverflowError, ZeroDivisionError):
            return reading

    def _finish_window(self):
        """Evaluate the full averaging window, publish its mean, reset it."""
        window = self.mval
        self.mval = []
        mmean = numpy.mean(window)
        spread = numpy.std(window)
        # Standard error of the window mean; undefined for a single value.
        if len(window) > 1:
            sem = spread / numpy.sqrt(len(window) - 1)
        else:
            sem = 0.0
        limit = self.sigma * (self.stat_std + sem)
        if not (self.bstoreeach or abs(mmean - self.stat_mean) > limit):
            return

        if self.stat_val_std == 0:
            # stat_val_mean is never changed in this file, so this stays 0.
            self.stat_val_std = abs(self.stat_val_mean / 100)
        # Round the mean to one decade below the order of magnitude of the
        # spread, but never finer than 1e-4.
        exponent = int(round(math.log10(max(0.001, float(spread))))) - 1
        step = 10.0 ** exponent
        mmean = round(mmean / step, 0) * step
        self.act_value = int(self.mult * mmean)
        self.act_std = int(self.mult * spread)
        self.act_time = int(1000 * time.time())
        if self.bfile:
            thread.start_new_thread(self.send_file, (0,))
        if self.bsense:
            thread.start_new_thread(self.upload_osm, (0,))

    def _changed_value(self):
        """Return the newest ring value, or None if there is nothing to send.

        Nothing is sent when the ring is empty or the newest value equals its
        predecessor.  ``None`` is used instead of a numeric marker so that
        every reading, however small, can be sent.
        """
        ring = self.value
        if not ring:
            return None
        if len(ring) > 1 and ring[-1] == ring[-2]:
            return None
        return ring[-1]

    def get_ring(self):
        """Return the list of rounded values currently in the ring."""
        return self.value

    def get_act_value(self):
        """Return the last published window mean (scaled, as int)."""
        return self.act_value

    def send_file(self, trigger=0):
        """Append ``time:<ms>;<var_name>:<act_value>`` to the hourly file.

        *trigger* is unused; it only exists so the method can be started with
        ``thread.start_new_thread``.
        """
        path = self.store_file.format(int(time.time() / 3600))
        try:
            with open(path, "a") as log:
                log.write("time:{0};{1}:{2}\n".format(
                    int(self.act_time), self.var_name, self.act_value))
        except (IOError, OSError):
            print("could not open log file")

    def send_sql(self, trigger=0):
        """Send the newest changed value as JSON to the configured TCP host.

        *trigger* is unused (thread entry point signature).
        """
        newest = self._changed_value()
        if newest is None:
            return
        entry = self.json_out['payload'][self.var_name]
        entry['time'] = int(1000 * time.time())
        entry['value'] = int(newest * self.mult)
        try:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            print("{}: could not connect to database".format(time.time()))
            return
        try:
            try:
                conn.connect((self.sqlhost, self.sqlport))
            except socket.error:
                print("{}: could not connect to database".format(time.time()))
                return
            try:
                # sendall() needs bytes on Python 3.
                conn.sendall(json.dumps(self.json_out).encode("utf-8"))
            except socket.error:
                print("{}: could not send to database".format(time.time()))
        finally:
            conn.close()

    def upload_osm(self, trigger=0):
        """Post ``act_value / multiplicator`` to openSenseMap.

        Nothing is sent while less than ``sense_intervall`` seconds have
        passed since the last successful upload.  *trigger* is unused
        (thread entry point signature).
        """
        elapsed = self.act_time - self.sense_last_time
        if elapsed <= self.sense_intervall * 1000:
            return
        if requests is None:
            print("requests is not installed, could not upload")
            return
        try:
            r = requests.post(
                self.sense_url,
                json={'value': float(self.act_value) / self.mult},
                timeout=30)
        except requests.RequestException as err:
            print("could not upload: {}".format(err))
            return
        if r.status_code not in (requests.codes.ok, 201):
            print("Error %d: %s" % (r.status_code, r.text))
        else:
            self.sense_last_time = self.act_time

    def send_mqtt(self, trigger=0):
        """Publish the newest changed value to the configured MQTT topic.

        *trigger* is unused (thread entry point signature).
        """
        newest = self._changed_value()
        if newest is None:
            return
        if publish is None:
            print("could not send mqtt")
            return
        try:
            publish.single(self.mqtt_topic, newest,
                           hostname=self.mqtt_broker, port=self.mqtt_port)
        except Exception:  # best effort: a failed publish must not kill the thread
            print("could not send mqtt")


# Usage example:
#   test = meas_data("temp", 5)
#   test.set_sql("localhost")
#   test.set_file_log("/home/ademant/data")