new full view of all data; new event to move from data_storage to retent_data

master
ademant 2019-12-22 17:09:12 +01:00
parent 4f93218cd8
commit fd9eef4f16
3 changed files with 306 additions and 25 deletions

View File

@ -1,36 +1,62 @@
CREATE DATABASE if not exists sensor;
create user if not exists 'sensordatain'@'127.0.0.1' identified by '';
create user if not exists 'sensordatain'@'localhost' identified by '';
create user if not exists 'sensordataread'@'172.24.13.5' identified by '';
CREATE DATABASE if not exists tsensor;
create user if not exists 'sensor'@'localhost' identified by 'Whillmqq';
create user if not exists 'sensordataread'@'localhost' identified by 'Whill';
create table if not exists akosensor.datain (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten',
create table if not exists tsensor.datain (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten',
topic varchar(256) comment 'topic, unter den dieses Datum verwaltet wird',
value bigint not null comment 'Sensordaten als Integer');
create table if not exists akosensor.data_storage (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten als epoch',
create table if not exists tsensor.data_storage (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten als epoch',
topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
value bigint not null comment 'Sensordaten als Integer', index (timestamp,topicid));
create table if not exists akosensor.last_data (topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
create table if not exists tsensor.data_retent as select * from tsensor.data_storage;
create table if not exists tsensor.last_data (topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
value bigint not null comment 'Sensordaten als Integer', index (topicid));
create table if not exists akosensor.cfg_topic (feld varchar(256) comment 'Bezeichnung des topic-Anteils',
create table if not exists tsensor.cfg_topic (feld varchar(256) comment 'Bezeichnung des topic-Anteils',
pos tinyint not null comment 'Position innerhalb des Topics',
minlength tinyint comment 'Mindestanzahl an elementen, die das Topic haben muss, damit die Regel greift');
create table if not exists akosensor.topic_list (topicid int unsigned not null primary key comment 'numerische ID des topics',
create table if not exists tsensor.topic_list (topicid int unsigned not null primary key comment 'numerische ID des topics',
topic varchar(256) not null unique comment 'Topic');
create table if not exists akosensor.topic_def (topicid int unsigned not null comment 'Interne ID eines topics',
create table if not exists tsensor.topic_def (topicid int unsigned not null comment 'Interne ID eines topics',
feld varchar(256) comment 'Bezeichnung des Datums',
inhalt varchar(256) comment 'Inhalt des Datums',index (topicid,feld));
create or replace view akosensor.data_view as select ds.timestamp/1000 as time,td.topic as topic,ds.value from akosensor.data_storage ds join akosensor.topic_list td on td.topicid=ds.topicid;
create or replace view tsensor.data_view as select utd.*,tl.topic,tdq.inhalt as quantity,tdd.inhalt as device,tdi.inhalt as internal_id from ((select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_storage ds) union (select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_retent ds)) utd
join tsensor.topic_list tl on utd.topicid=tl.topicid
left join tsensor.topic_def tdq on tdq.topicid=utd.topicid and tdq.feld='quantity'
left join tsensor.topic_def tdd on tdd.topicid=utd.topicid and tdq.feld='device'
left join tsensor.topic_def tdi on tdi.topicid=utd.topicid and tdq.feld='internal_id';
grant select,insert,update on sensor.data_storage to 'sensordatain'@'localhost' with grant option;
grant insert,select,update on sensor.last_data to 'sensordatain'@'localhost' with grant option;
grant insert,select on sensor.topic_list to 'sensordatain'@'localhost' with grant option;
grant insert,select on sensor.topic_def to 'sensordatain'@'localhost' with grant option;
grant execute on sensor.* to 'sensordatain'@'localhost' with grant option;
grant select on sensor.data_storage to 'sensordataread'@'172.24.13.5' with grant option;
grant select on sensor.data_view to 'sensordataread'@'172.24.13.5' with grant option;
grant select,insert,update on tsensor.data_storage to 'sensor'@'localhost' with grant option;
grant insert,select,update on tsensor.last_data to 'sensor'@'localhost' with grant option;
grant insert,select on tsensor.topic_list to 'sensor'@'localhost' with grant option;
grant insert,select on tsensor.topic_def to 'sensor'@'localhost' with grant option;
grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
grant select on tsensor.data_storage to 'sensordataread'@'localhost' with grant option;
grant select on tsensor.data_view to 'sensordataread'@'localhost' with grant option;
delimiter //
create or replace procedure tsensor.retent_data()
begin
declare tid bigint;
declare btopicdone int;
declare topic_curs cursor for select distinct topic_list.topicid from topic_list;
declare continue handler for not found set btopicdone = 1;
open topic_curs;
repeat
fetch topic_curs into tid;
select count(*) into @actcount from data_storage where topicid=tid;
if (@actcount > 2) then
select min(timestamp) into @mts from (select timestamp from data_storage where topicid=tid order by timestamp desc limit 2) td ;
insert into data_retent select * from data_storage where topicid=tid and timestamp < @mts;
delete from data_storage where topicid=tid and timestamp < @mts;
end if;
until btopicdone end repeat;
close topic_curs;
end
//
delimiter ;
DELIMITER //
CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024),newid int unsigned)
CREATE OR REPLACE PROCEDURE tsensor.insert_topic(vtopic varchar(1024),newid int unsigned)
begin
declare tpos tinyint;
declare feldname varchar(1024);
@ -55,7 +81,7 @@ end
DELIMITER ;
DELIMITER //
CREATE OR REPLACE PROCEDURE sensor.insert_data(IN intopic varchar(256),IN invalue bigint,IN tstamp bigint unsigned)
CREATE OR REPLACE PROCEDURE tsensor.insert_data(IN intopic varchar(256),IN invalue bigint,IN tstamp bigint unsigned)
BEGIN
select crc32(intopic) into @crctopic;
if (@crctopic is null) then
@ -90,10 +116,12 @@ END
//
DELIMITER ;
grant execute on sensor.* to 'akosensordatain'@'localhost' with grant option;
grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
set global event_scheduler=ON;
insert into sensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);
create event if not exists tsensor.retent_old_data on schedule every 1 hour do call tsensor.retent_data();
insert into tsensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
insert into tsensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
insert into tsensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
insert into tsensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);

242
mqtt2call_sql.py Normal file
View File

@ -0,0 +1,242 @@
import paho.mqtt.client as mqtt
import json,time
import pymysql,dateutil
import datetime
from dateutil.parser import parse as date_parse
import threading
#sqlinsert="insert into datain (time,sensortime,topic,value) values ({0:d},{1:d},'{2}',{3:d})"
sqlinsert="call insert_data('{2}',{3:d},{1:d})"
lastvalue={}
# check values and insert into database
def data2sql(acttime,msgtime,topic,value,mycursor):
bsend=True
iv=int(value)
if topic in lastvalue:
if iv == lastvalue[topic]:
bsend=False
if bsend:
lastvalue[topic]=iv
# print(sqlinsert.format(acttime,msgtime,topic,iv))
try:
sqlcheck=mycursor.execute(sqlinsert.format(acttime,msgtime,topic,iv))
# print(sqlcheck)
except Exception as e:
print(sqlinsert.format(acttime,msgtime,topic,iv))
print(e)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/STATE")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
try:
mydb=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
except:
print("Could not connect to sql server! Quitting")
else:
mycursor=mydb.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
jpl=json.loads(msg.payload)
ts="sp/"+msg.topic.split("/")[1]
# print(ts+" "+str(jpl))
str_to_dt=0
if 'Time' in jpl:
str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
if 'LoadAvg' in jpl:
try:
data2sql(acttime,str_to_dt,ts+'/LoadAvg',jpl['LoadAvg'],mycursor)
except Exception as e:
print(e)
if 'POWER' in jpl:
power=0
if jpl['POWER']=='OFF':
power=1
try:
data2sql(acttime,str_to_dt,ts+'/PowerStatus',power,mycursor)
except Exception as e:
print(e)
if 'Wifi' in jpl:
jplw=jpl['Wifi']
if 'RSSI' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/RSSI',jplw['RSSI'],mycursor)
except Exception as e:
print(e)
if 'Signal' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Signal',jplw['Signal'],mycursor)
except Exception as e:
print(e)
if 'Channel' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Channel',jplw['Channel'],mycursor)
except Exception as e:
print(e)
mycursor.close()
mydb.commit()
mydb.close()
# The callback for when the client receives a CONNACK response from the server.
def on_connect_sens(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/SENSOR")
# The callback for when a PUBLISH message is received from the server.
def on_message_sens(client, userdata, msg):
try:
mydb_sens=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
except:
print("Could not connect to sql server! Quitting")
else:
mycursor_sens=mydb_sens.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
jpl=json.loads(msg.payload)
ts="sp/"+msg.topic.split("/")[1]
# print(ts+" "+str(jpl))
str_to_dt=0
if 'Time' in jpl:
str_to_dt = int(1000*datetime.datetime.strptime(jpl['Time'], '%Y-%m-%dT%H:%M:%S').timestamp())
if 'ENERGY' in jpl:
jplw=jpl['ENERGY']
if 'Power' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Power',int(1000*jplw['Power']),mycursor_sens)
except Exception as e:
print(e)
if 'ApparentPower' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/ApparentPower',int(1000*jplw['ApparentPower']),mycursor_sens)
except Exception as e:
print(e)
if 'ReactivePower' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/ReactivePower',int(1000*jplw['ReactivePower']),mycursor_sens)
except Exception as e:
print(e)
if 'Factor' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/PowerFactor',int(1000*jplw['Factor']),mycursor_sens)
except Exception as e:
print(e)
if 'Voltage' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Voltage',int(1000*jplw['Voltage']),mycursor_sens)
except Exception as e:
print(e)
if 'Total' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Total',int(1000*jplw['Total']),mycursor_sens)
except Exception as e:
print(e)
if 'Current' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Current',int(1000*jplw['Current']),mycursor_sens)
except Exception as e:
print(e)
if 'Period' in jplw:
try:
data2sql(acttime,str_to_dt,ts+'/Period',int(1000*jplw['Period']),mycursor_sens)
except Exception as e:
print(e)
mycursor_sens.close()
mydb_sens.commit()
mydb_sens.close()
# The callback for when the client receives a CONNACK response from the server.
def on_connect_sns(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/sens/#")
# The callback for when a PUBLISH message is received from the server.
def on_message_sns(client, userdata, msg):
try:
mydb_sns=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
except:
print("Could not connect to sql server! Quitting")
else:
print(int(msg.payload.decode("utf-8")))
mycursor_sns=mydb_sns.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
ts=msg.topic.replace("tele","sp")
print(ts)
try:
data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_sns)
except Exception as e:
print(e)
mycursor_sns.close()
mydb_sns.commit()
mydb_sns.close()
# The callback for when the client receives a CONNACK response from the server.
def on_connect_net(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("tele/+/net/#")
# The callback for when a PUBLISH message is received from the server.
def on_message_net(client, userdata, msg):
try:
mydb_net=pymysql.connect(read_default_file="~/.my.cnf",database="tsensor")
except:
print("Could not connect to sql server! Quitting")
else:
print(int(msg.payload.decode("utf-8")))
mycursor_net=mydb_net.cursor(pymysql.cursors.DictCursor)
acttime=int(1000*time.time())
ts=msg.topic.replace("tele","sp")
print(ts)
try:
data2sql(acttime,acttime,ts,int(msg.payload.decode("utf-8")),mycursor_net)
except Exception as e:
print(e)
mycursor_net.close()
mydb_net.commit()
mydb_net.close()
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client_sens = mqtt.Client()
client_sens.on_connect = on_connect_sens
client_sens.on_message = on_message_sens
#client_sns = mqtt.Client()
#client_sns.on_connect = on_connect_sns
#client_sns.on_message = on_message_sns
client_net = mqtt.Client()
client_net.on_connect = on_connect_net
client_net.on_message = on_message_net
client.connect("172.24.41.2", 1883, 60)
client_sens.connect("172.24.41.2", 1883, 60)
#client_sns.connect("172.24.41.2", 1883, 60)
client_net.connect("172.24.41.2", 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
mq_state=threading.Thread(target=client.loop_forever)
mq_sens=threading.Thread(target=client_sens.loop_forever)
#mq_sns=threading.Thread(target=client_sns.loop_forever)
mq_net=threading.Thread(target=client_net.loop_forever)
#client_sns.loop_forever()
#client.loop_forever()
mq_state.start()
mq_sens.start()
#mq_sns.start()
mq_net.start()
while True:
time.sleep(1)

11
mqtt2call_sql.sh Executable file
View File

@ -0,0 +1,11 @@
#!/bin/sh
mosquitto_sub -h 172.24.42.2 -R -t 'tele/+/sens/#' -F "%t %p %U" | while read TOPIC VALU EPOCH; do
SEPOCH=$(echo $EPOCH|sed -e 's/\.//g'|cut -b -13)
LEPOCH=$(date +%s%N|cut -b -13)
TOPI=$(echo $TOPIC|sed -e 's/tele/sp/g')
IVAL=$(echo $VALU|cut -d\. -f1)
echo "call insert_data('$TOPI',$IVAL,$SEPOCH);"|mysql -t tsensor
echo "call insert_data('$TOPI',$IVAL,$EPOCH);"
done