"""Ring buffer for sensor readings with optional output back-ends.

A :class:`meas_data` instance keeps the most recent readings of one measured
variable and can forward new readings to a local binary log, an HTTP ("SQL")
endpoint, an MQTT broker and openSenseMap.
"""

import _thread
import binascii
import io
import json
import os
import socket
import time
import zlib
from uuid import getnode

import numpy

# HTTP client is only needed for the SQL and openSenseMap outputs; treat it
# as optional in the same way as the gnupg and paho-mqtt back-ends below.
try:
    import requests
except ImportError:
    requests = None

has_gnupg = False
gpg = []
try:
    import gnupg
    # Creating the GPG wrapper may fail as well (presumably when no gpg
    # binary can be run), so it lives inside the guarded section.
    gpg = gnupg.GPG()
except Exception:
    has_gnupg = False
    gpg = []
else:
    has_gnupg = True

has_mqtt = False
try:
    import paho.mqtt.client as mqtt
    import paho.mqtt.publish as publish
except Exception:
    has_mqtt = False
else:
    has_mqtt = True

# Characters stripped from every user supplied identifier.
# NOTE(review): the set contains "{" twice and "]" but no "[" - possibly a
# typo, left untouched because changing it would alter existing identifiers.
_STRINGREMOVE = str.maketrans(dict.fromkeys(":;\\{}(){]%&"))

# Upper bound (seconds) for a single HTTP request made from a worker thread.
_HTTP_TIMEOUT = 10


def _is_number(value):
    """Return True for ``int``/``float`` values, explicitly excluding ``bool``."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _std_error(values):
    """Return ``std(values) / sqrt(n - 1)``, or 0.0 for fewer than two values."""
    count = len(values)
    if count < 2:
        return 0.0
    return numpy.std(values) / numpy.sqrt(count - 1)


class meas_data:
    """Container for the readings of a single measured variable.

    The constructor takes one dictionary. Recognised keys (all optional):
    ``var_name``, ``ring_length``, ``mean_count``, ``store_each_cycle``,
    ``sql_min_wait``, ``sigma``, ``device``, ``deviceid``, ``sensor``,
    ``digits``, ``check_last`` together with ``check_last_shrinking``,
    ``i2c``, ``multiplicator``, ``sens_id``, ``rsa_key`` and ``store_dir``.
    """

    def __init__(self, var_config=None):
        """Initialise all state from ``var_config``.

        ``var_config`` is a mapping; ``None`` (or any empty container) means
        "use defaults only".
        """
        cfg = dict(var_config) if var_config else {}

        self.set_varname(cfg.get("var_name", "dummy"))

        # Measurement state.
        self.value = []
        self.act_value = 0
        self.act_std = 0
        self.act_time = 0
        self.mval = []
        self.stat_std = 0
        self.stat_mean = 0
        self.stat_val_std = 0
        self.stat_val_mean = 0

        # Feature flags and defaults. They are assigned BEFORE any setter
        # runs so that a configured value is not overwritten afterwards.
        self.bsense = False
        self.bsql = False
        self.bchecklast = False
        self.bchanged = False
        self.bfile = False
        self.bstoreeach = False
        self.checklast = 0
        self.checklast_min = 0
        self.checklast_max = 0
        self.checklast_shrinking = 1
        self.channel = 0

        self.set_ring_length(cfg.get("ring_length", 60))
        self.set_mean_count(cfg.get("mean_count", 5))
        if "store_each_cycle" in cfg:
            self.set_store_cycle(cfg["store_each_cycle"])

        # openSenseMap settings.
        self.sensebox = ""
        self.senseid = ""
        self.sense_url = "https://ingress.opensensemap.org/"
        self.sense_intervall = 300
        self.sense_last_time = 0

        # SQL (HTTP) settings.
        min_wait = cfg.get("sql_min_wait", 5)
        self.sql_min_wait = min_wait if _is_number(min_wait) else 5
        self.sqlhost = ""
        self.sqlport = 0
        self.sql_last_transmit = 0

        self.sigma = cfg.get("sigma", 2)

        if "device" in cfg:
            self.set_device(cfg["device"])
        else:
            self.set_device(socket.gethostname())
        # NOTE(review): the previous code called set_device() here when
        # "deviceid" was missing, which replaced the device name by the MAC
        # address and left ``deviceid`` undefined. Fixing this changes the
        # CRC32 id of configurations that never supplied "deviceid".
        self.set_deviceid(cfg.get("deviceid", getnode()))

        self.set_sensor(cfg.get("sensor", "local"))
        self.set_digits(cfg.get("digits", 2))
        if "check_last" in cfg and "check_last_shrinking" in cfg:
            self.set_check_last(cfg["check_last"], cfg["check_last_shrinking"])
        self.set_i2c(cfg.get("i2c", 0))
        self.set_multiplicator(cfg.get("multiplicator", 1))
        if "sens_id" in cfg:
            self.set_sensid(cfg["sens_id"])

        # MQTT settings.
        self.mqtt_broker = ""
        self.mqtt_port = 1883
        self.mqtt_topic = ""
        self.mqtt_bool = False

        self.hash = 0
        self.ids = ""
        self.set_mac()
        self.set_rsa(cfg.get("rsa_key", ""))
        self.sql_out = ""
        self.set_id()
        self.json_out = {
            "hash": self.hash,
            "signature": "",
            "payload": {
                "device": self.device,
                "varname": self.var_name,
                "i2c": self.i2c,
                "sensor": self.sensor,
                "mac": self.mac,
                "measures": {},
            },
        }

        # Without "store_dir" file logging stays disabled (the old code
        # raised on ``var_config[""]`` in that case).
        self.set_file_log(cfg.get("store_dir", ""))

        # NOTE(review): this socket is not used anywhere in this class; it is
        # kept only so the attribute stays available to external code.
        try:
            self.tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except OSError:
            self.tcpsock = None

    def set_sensid(self, sens_id):
        """Apply the optional keys ``varname``, ``sensor``, ``i2c``, ``channel``."""
        if "varname" in sens_id:
            self.set_varname(sens_id["varname"])
        if "sensor" in sens_id:
            self.set_sensor(sens_id["sensor"])
        if "i2c" in sens_id:
            self.set_i2c(sens_id["i2c"])
        if "channel" in sens_id:
            self.set_channel(sens_id["channel"])

    def set_store_cycle(self, store_each_cycle):
        """Store every mean window when ``store_each_cycle`` is ``True``.

        Non-bool arguments disable the option.
        """
        if isinstance(store_each_cycle, bool):
            self.bstoreeach = store_each_cycle
        else:
            self.bstoreeach = False

    def sign(self):
        """Prepare ``sql_out`` from ``json_out``.

        The payload is GPG-signed when a local key was found by
        :meth:`set_rsa` and no server key was found by :meth:`set_sql`;
        otherwise ``sql_out`` is the unsigned ``json_out``.
        """
        self.sql_out = self.json_out
        if self.brsa and not self.brsa_sql:
            signed = gpg.sign(json.dumps(self.json_out), keyid=self.rsa['keyid'])
            self.sql_out = {
                "signed_gpg": signed.data.decode('utf-8'),
                "hash": self.hash,
            }

    def set_rsa(self, rsa_key):
        """Look up ``rsa_key`` in the GPG keyring and enable signing if found."""
        self.brsa = False
        self.brsa_sql = False
        if len(rsa_key) == 0:
            return
        try:
            found = gpg.list_keys(keys=rsa_key.translate(_STRINGREMOVE))
        except Exception:
            # No usable gnupg (``gpg`` is a plain list then) or lookup failed.
            return
        if len(found) > 0:
            self.rsa = found[0]
            self.brsa = True

    def set_id(self):
        """Derive ``ids`` and the CRC32 ``hash`` from device, i2c, sensor, name."""
        idstring = self.device + str(self.i2c) + self.sensor + self.var_name
        self.ids = idstring
        self.hash = zlib.crc32(idstring.encode("utf-8")) % (1 << 32)

    def get_id(self):
        """Print the id hash and return it."""
        print(self.hash)
        return self.hash

    def set_mac(self):
        """Store the node id returned by ``uuid.getnode()`` as hex string."""
        self.mac = format(getnode(), "x")

    def set_i2c(self, i2c):
        """Set the i2c address; non-numeric input falls back to 0."""
        self.i2c = i2c if _is_number(i2c) else 0

    def set_channel(self, channel):
        """Set the channel; non-numeric input falls back to 0."""
        self.channel = channel if _is_number(channel) else 0

    def get_varname(self):
        """Return the sanitised variable name."""
        return self.var_name

    def set_varname(self, var_name):
        """Set the variable name; an empty result becomes ``"var"``."""
        cleaned = var_name.translate(_STRINGREMOVE)
        self.var_name = cleaned if cleaned else "var"

    def set_check_last(self, check_last, check_last_shrinking):
        """Configure suppression of readings that repeat recent values.

        ``check_last`` is the number of recent values to compare with (values
        <= 0 or non-numeric input disable the check). ``check_last_shrinking``
        must be in the open interval (0, 1) to take effect; it is the factor
        by which the accepted min/max window shrinks on every suppressed
        reading.
        """
        self.bchecklast = False
        self.checklast = 0
        self.checklast_min = 0
        self.checklast_max = 0
        self.checklast_shrinking = 1
        if not (_is_number(check_last) and _is_number(check_last_shrinking)):
            return
        try:
            # Must be an int: it is used as a slice bound in append().
            count = int(check_last)
        except (ValueError, OverflowError):
            return
        if count <= 0:
            return
        self.bchecklast = True
        self.checklast = count
        if 0 < check_last_shrinking < 1:
            self.checklast_shrinking = check_last_shrinking

    def set_digits(self, digits):
        """Set the number of significant digits kept by :meth:`append`.

        Internally ``digits - 1`` is stored, limited to 20; results below 0
        and invalid input fall back to 1.
        """
        self.digits = 1
        if not _is_number(digits):
            return
        try:
            reduced = int(digits) - 1
        except (ValueError, OverflowError):
            print("could not set digits")
            return
        if reduced < 0:
            reduced = 1
        if reduced > 20:
            reduced = 20
        self.digits = reduced

    def set_ring_length(self, ring_length):
        """Set the ring size (absolute value; 0 or invalid input gives 60)."""
        self.ring_length = 60
        if not _is_number(ring_length):
            return
        try:
            length = abs(int(ring_length))
        except (ValueError, OverflowError):
            return
        self.ring_length = length if length else 60

    def set_mean_count(self, mean_count):
        """Set the size of the averaging window.

        The absolute integer value is used, 0 selects ``ring_length`` and the
        result never exceeds ``ring_length``. Invalid input gives 5.
        """
        self.mean_count = 5
        if not _is_number(mean_count):
            return
        try:
            count = abs(int(mean_count))
        except (ValueError, OverflowError):
            return
        if count == 0 or count > self.ring_length:
            count = self.ring_length
        self.mean_count = count

    def set_sensebox(self, sensebox, senseid, intervall=300):
        """Enable openSenseMap upload for the given box and sensor id.

        Both ids are sanitised and cut to 24 characters; ``intervall`` is the
        minimum number of seconds between two uploads.
        """
        box = sensebox.translate(_STRINGREMOVE)[0:24]
        sensor_id = senseid.translate(_STRINGREMOVE)[0:24]
        self.sensebox = box
        self.senseid = sensor_id
        self.sense_intervall = int(intervall)
        self.bsense = True
        self.sense_url = "https://ingress.opensensemap.org/boxes/%s/%s" % (box, sensor_id)
        print(self.sense_url)

    def set_sql(self, host, port=8080, min_wait=5):
        """Configure the HTTP endpoint used by :meth:`send_sql`.

        Ports outside 1024..65535 disable the output. ``min_wait`` is the
        minimum number of seconds between two transmissions.
        """
        self.bsql = True
        cleaned_host = host.translate(_STRINGREMOVE)
        self.sqlhost = cleaned_host
        self.sqlport = 8080
        if _is_number(port):
            self.sqlport = int(port)
        if self.sqlport < 1024 or self.sqlport > 65535:
            self.bsql = False
        self.sqlurl = "http://" + self.sqlhost + ":" + str(self.sqlport) + "/data/" + str(self.hash)
        self.sql_min_wait = min_wait if _is_number(min_wait) else 0

        # Look for a key belonging to the server; if one exists sign() does
        # not produce the locally signed payload.
        self.brsa_sql = False
        if self.brsa:
            try:
                server_keys = gpg.list_keys(keys="@" + cleaned_host)
            except Exception:
                server_keys = []
            if len(server_keys) > 0:
                self.brsa_sql = True
                self.sql_rsa = server_keys[0]

    def set_mqtt(self, broker="localhost", port=1883, topic=""):
        """Configure MQTT publishing.

        An empty ``topic`` selects ``tele/<device>/sens/<sensor>/<var_name>``.
        Ports below 1024 fall back to 1883. Publishing is enabled only when
        the paho-mqtt package could be imported.
        """
        cleaned_topic = topic.translate(_STRINGREMOVE)
        if len(cleaned_topic) == 0:
            cleaned_topic = 'tele/' + self.device + "/sens/" + self.sensor + "/" + self.var_name
        cleaned_broker = broker.translate(_STRINGREMOVE)
        mport = 1883
        if _is_number(port):
            mport = int(port)
        if mport < 1024:
            mport = 1883
        self.mqtt_broker = cleaned_broker
        self.mqtt_port = mport
        self.mqtt_topic = cleaned_topic
        # The former connection probe called a function that the paho client
        # module does not provide, so it always reported a failure; delivery
        # errors are reported by send_mqtt() instead.
        self.mqtt_bool = has_mqtt

    def set_file_log(self, store_dir="/home/pi/data/"):
        """Enable binary file logging below ``store_dir``.

        The data directory is ``store_dir + <hash> + '/'``; it is created when
        missing. A ``store_dir`` shorter than two characters disables logging.
        """
        self.file_time = io.BytesIO()
        self.file_value = io.BytesIO()
        self.bfile = False
        if len(store_dir) <= 1:
            return
        # NOTE(review): plain concatenation - a store_dir without trailing
        # separator yields ".../data<hash>/". Kept for path compatibility.
        self.store_dir = store_dir + str(self.hash) + '/'
        try:
            os.makedirs(self.store_dir, exist_ok=True)
        except OSError:
            self.bfile = False
        else:
            self.bfile = True

    def show_def(self):
        """Print the variable name and the current ring content."""
        print(self.var_name)
        print(self.value)

    def get_device(self):
        """Return the sanitised device name."""
        return self.device

    def set_device(self, device):
        """Set the device name (sanitised)."""
        self.device = device.translate(_STRINGREMOVE)

    def get_deviceid(self):
        """Return the device id."""
        return self.deviceid

    def set_deviceid(self, deviceid):
        """Set the device id; non-numeric input selects ``uuid.getnode()``."""
        self.deviceid = deviceid if _is_number(deviceid) else getnode()

    def get_sensor(self):
        """Return the sanitised sensor name."""
        return self.sensor

    def set_sensor(self, sensor):
        """Set the sensor name (sanitised)."""
        self.sensor = sensor.translate(_STRINGREMOVE)

    def set_multiplicator(self, multiplicator):
        """Set the positive scale factor applied before integer conversion.

        The absolute value is used; 0 falls back to 1 because the factor is
        also used as a divisor in :meth:`upload_osm`.
        """
        factor = abs(float(multiplicator))
        if factor == 0:
            factor = 1.0
        self.mult = factor

    def _quantize(self, value):
        """Round ``value`` to the configured number of significant digits."""
        try:
            magnitude = abs(float(value))
            step = 1
            if magnitude > 0:
                step = 10 ** (round(numpy.log10(magnitude)) - self.digits)
            return round(value / step) * step
        except (ValueError, OverflowError, ZeroDivisionError):
            # nan/inf or a step that underflowed to zero: keep the raw value.
            return value

    def _is_new_value(self, tv):
        """Return True when ``tv`` should be stored in the ring.

        With check-last enabled a reading is dropped when it lies inside the
        current min/max window (which then shrinks) or equals one of the last
        ``checklast`` stored values.
        """
        if not (self.bchecklast and self.value):
            return True
        if self.checklast_min <= tv <= self.checklast_max:
            margin = (self.checklast_max - self.checklast_min) * (1 - self.checklast_shrinking) / 2
            if margin > 0:
                self.checklast_max = self.checklast_max - margin
                self.checklast_min = self.checklast_min + margin
                if self.checklast_min > self.checklast_max:
                    self.checklast_min = self.checklast_max
            return False
        recent = self.value[-min(self.checklast, len(self.value)):]
        return tv not in recent

    def _update_check_last_range(self):
        """Recompute the min/max window from the most recent unique values."""
        recent = []
        for item in reversed(self.value):
            if item not in recent:
                recent.append(item)
            if len(recent) > self.checklast:
                break
        if len(recent) >= 3:
            # Simple outlier test: an extreme value is ignored when it is more
            # than three standard deviations away from the mean of the others.
            lowest = min(recent)
            highest = max(recent)
            without_low = list(recent)
            without_low.remove(lowest)
            without_high = list(recent)
            without_high.remove(highest)
            low_limit = numpy.mean(without_low) - numpy.std(without_low) * 3
            high_limit = numpy.mean(without_high) + numpy.std(without_high) * 3
            if lowest < low_limit:
                recent.remove(lowest)
            if highest > high_limit:
                recent.remove(highest)
        self.checklast_min = min(recent)
        self.checklast_max = max(recent)

    def _evaluate_mean_window(self):
        """Publish the mean of ``mval`` when it is significant or forced.

        The mean is stored when ``bstoreeach`` is set or when it differs from
        ``stat_mean`` by more than ``sigma`` times the combined standard error.
        """
        window = self.mval
        window_std = numpy.std(window)
        mmean = numpy.mean(window)
        nstd = self.stat_std + _std_error(window)
        bsave = self.bstoreeach
        if abs(mmean - self.stat_mean) > (self.sigma * nstd):
            bsave = True
        if not bsave:
            return
        if self.stat_val_std == 0:
            self.stat_val_std = abs(self.stat_val_mean / 100)
        step = 10 ** (round(numpy.log10(numpy.maximum(0.001, window_std))) - 1)
        mmean = round(mmean / step, 0) * step
        # NOTE(review): act_value is stored scaled here but unscaled in
        # append(); send_file() multiplies by ``mult`` again - confirm intent.
        self.act_value = int(self.mult * mmean)
        self.act_std = int(self.mult * window_std)
        self.act_time = int(1000 * time.time())
        if self.bfile:
            _thread.start_new_thread(self.send_file, (0,))
        if self.bsense:
            _thread.start_new_thread(self.upload_osm, (0,))

    def append(self, value):
        """Add one reading.

        Non-numeric input is ignored. The reading is rounded, filtered by the
        optional check-last logic, stored in the ring and handed to the
        enabled outputs (each in its own thread). Every numeric reading also
        enters the averaging window of ``mean_count`` values.
        """
        if not _is_number(value):
            return

        tv = self._quantize(value)
        if self._is_new_value(tv):
            self.value.append(tv)
            self.act_value = tv
            self.act_time = int(1000 * time.time())
            if self.bchecklast:
                self._update_check_last_range()
            if self.mqtt_bool:
                _thread.start_new_thread(self.send_mqtt, (0,))
            if self.bsql:
                _thread.start_new_thread(self.send_sql, (0,))
            if self.bfile:
                _thread.start_new_thread(self.send_file, (0,))

        # Reference statistics are refreshed at the start of each window.
        if (len(self.mval) == 0) and (len(self.value) > 1):
            self.stat_mean = numpy.mean(self.value)
            self.stat_std = _std_error(self.value)
        self.mval.append(value)
        if len(self.mval) >= self.mean_count:
            if self.bsense:
                self._evaluate_mean_window()
            # Always start a new window, otherwise the list grows without
            # bound while openSenseMap upload is disabled.
            self.mval = []

        if len(self.value) >= self.ring_length:
            self.value = self.value[-self.ring_length:]

    def get_ring(self):
        """Return the list of stored readings."""
        return self.value

    def get_act_value(self):
        """Return the most recently stored value."""
        return self.act_value

    def send_file(self, trigger=0):
        """Buffer the current reading and flush to disk once >64 bytes queued.

        Timestamps are written as 8-byte and values as 3-byte unsigned
        big-endian integers into the files ``timestamp`` and ``value``.
        Readings that do not fit this format are skipped. The buffers are
        cleared after every flush attempt, successful or not.
        """
        try:
            stamp = int(self.act_time).to_bytes(8, byteorder="big", signed=False)
            reading = int(self.act_value * self.mult).to_bytes(3, byteorder="big", signed=False)
        except (OverflowError, ValueError):
            # Convert both fields before buffering so the two streams can
            # never get out of step.
            print("could not store value: out of range for file format")
            return
        self.file_time.write(stamp)
        self.file_value.write(reading)
        if len(self.file_time.getvalue()) > 64:
            try:
                with open(self.store_dir + 'timestamp', 'ab+') as ft, \
                        open(self.store_dir + 'value', 'ab+') as fv:
                    ft.write(self.file_time.getvalue())
                    fv.write(self.file_value.getvalue())
            except OSError:
                print("could not write data files")
            self.file_time.close()
            self.file_value.close()
            self.file_time = io.BytesIO()
            self.file_value = io.BytesIO()

    def _latest_changed_value(self):
        """Return the newest ring value, or None if it repeats its predecessor
        or the ring is empty."""
        values = self.value
        if not values:
            return None
        if len(values) > 1 and values[-1] == values[-2]:
            return None
        return values[-1]

    def send_sql(self, trigger=0):
        """Queue the newest reading and PUT all queued readings to ``sqlurl``.

        A transmission happens at most every ``sql_min_wait`` seconds; the
        queue is cleared only after an HTTP 200 answer.
        """
        sv = self._latest_changed_value()
        if sv is not None:
            self.json_out['payload']['measures'][self.act_time] = int(sv * self.mult)
        if len(self.json_out['payload']['measures']) == 0:
            return
        if (self.act_time - self.sql_last_transmit) <= (self.sql_min_wait * 1000):
            return
        if requests is None:
            print("requests module not available")
            return
        self.sign()
        try:
            # NOTE(review): the payload is JSON-encoded here and again by
            # requests' ``json=`` argument; kept because the receiving
            # service presumably expects exactly this format - confirm.
            self._r = requests.put(self.sqlurl, json=json.dumps(self.sql_out),
                                   timeout=_HTTP_TIMEOUT)
        except Exception:
            # Worker-thread boundary: never let a network error escape.
            self._r = {"status_code": 404}
        else:
            if self._r.status_code == 200:
                self.json_out['payload']['measures'] = {}
                self.sql_last_transmit = time.time() * 1000

    def upload_osm(self, trigger=0):
        """POST the current value to openSenseMap, honouring ``sense_intervall``."""
        if (self.act_time - self.sense_last_time) <= (self.sense_intervall * 1000):
            return
        if requests is None:
            print("requests module not available")
            return
        try:
            r = requests.post(self.sense_url,
                              json={'value': float(self.act_value) / self.mult},
                              timeout=_HTTP_TIMEOUT)
        except requests.RequestException as err:
            print("could not upload to opensensemap: %s" % err)
            return
        if (r.status_code != requests.codes.ok) and (r.status_code != 201):
            print("Error %d: %s" % (r.status_code, r.text))
        else:
            self.sense_last_time = self.act_time

    def send_mqtt(self, trigger=0):
        """Publish the newest reading (scaled by ``mult``) as retained message."""
        sv = self._latest_changed_value()
        if sv is None:
            return
        try:
            publish.single(self.mqtt_topic, self.mult * sv,
                           hostname=self.mqtt_broker, port=self.mqtt_port,
                           retain=True)
        except Exception:
            print("could not send mqtt")