132 lines
7.0 KiB
SQL
132 lines
7.0 KiB
SQL
CREATE DATABASE if not exists tsensor;
|
|
create user if not exists 'sensor'@'localhost' identified by 'Whillmqq';
|
|
create user if not exists 'sensordataread'@'localhost' identified by 'Whill';
|
|
|
|
create table if not exists tsensor.datain (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten',
|
|
topic varchar(256) comment 'topic, unter den dieses Datum verwaltet wird',
|
|
value bigint not null comment 'Sensordaten als Integer');
|
|
create table if not exists tsensor.data_storage (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten als epoch',
|
|
topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
|
|
value bigint not null comment 'Sensordaten als Integer', index (timestamp,topicid));
|
|
create table if not exists tsensor.data_retent as select * from tsensor.data_storage;
|
|
create table if not exists tsensor.last_data (topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
|
|
value bigint not null comment 'Sensordaten als Integer', index (topicid));
|
|
create table if not exists tsensor.cfg_topic (feld varchar(256) comment 'Bezeichnung des topic-Anteils',
|
|
pos tinyint not null comment 'Position innerhalb des Topics',
|
|
minlength tinyint comment 'Mindestanzahl an elementen, die das Topic haben muss, damit die Regel greift');
|
|
create table if not exists tsensor.topic_list (topicid int unsigned not null primary key comment 'numerische ID des topics',
|
|
topic varchar(256) not null unique comment 'Topic');
|
|
create table if not exists tsensor.topic_def (topicid int unsigned not null comment 'Interne ID eines topics',
|
|
feld varchar(256) comment 'Bezeichnung des Datums',
|
|
inhalt varchar(256) comment 'Inhalt des Datums',index (topicid,feld));
|
|
create or replace view tsensor.data_view as select utd.*,tl.topic,tdq.inhalt as quantity,tdd.inhalt as device,tdi.inhalt as internal_id from ((select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_storage ds) union (select ds.timestamp/1000 as time,ds.topicid,ds.value from tsensor.data_retent ds)) utd
|
|
join tsensor.topic_list tl on utd.topicid=tl.topicid
|
|
left join tsensor.topic_def tdq on tdq.topicid=utd.topicid and tdq.feld='quantity'
|
|
left join tsensor.topic_def tdd on tdd.topicid=utd.topicid and tdq.feld='device'
|
|
left join tsensor.topic_def tdi on tdi.topicid=utd.topicid and tdq.feld='internal_id';
|
|
|
|
grant select,insert,update on tsensor.data_storage to 'sensor'@'localhost' with grant option;
|
|
grant insert,select,update on tsensor.last_data to 'sensor'@'localhost' with grant option;
|
|
grant insert,select on tsensor.topic_list to 'sensor'@'localhost' with grant option;
|
|
grant insert,select on tsensor.topic_def to 'sensor'@'localhost' with grant option;
|
|
grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
|
|
grant select on tsensor.data_storage to 'sensordataread'@'localhost' with grant option;
|
|
grant select on tsensor.data_view to 'sensordataread'@'localhost' with grant option;
|
|
|
|
delimiter //
|
|
create or replace procedure tsensor.retent_data()
|
|
begin
|
|
declare tid bigint;
|
|
declare btopicdone int;
|
|
declare topic_curs cursor for select distinct topic_list.topicid from topic_list;
|
|
declare continue handler for not found set btopicdone = 1;
|
|
open topic_curs;
|
|
repeat
|
|
fetch topic_curs into tid;
|
|
select count(*) into @actcount from data_storage where topicid=tid;
|
|
if (@actcount > 2) then
|
|
select min(timestamp) into @mts from (select timestamp from data_storage where topicid=tid order by timestamp desc limit 2) td ;
|
|
insert into data_retent select * from data_storage where topicid=tid and timestamp < @mts;
|
|
delete from data_storage where topicid=tid and timestamp < @mts;
|
|
commit;
|
|
end if;
|
|
until btopicdone end repeat;
|
|
close topic_curs;
|
|
end
|
|
//
|
|
delimiter ;
|
|
|
|
DELIMITER //
|
|
CREATE OR REPLACE PROCEDURE tsensor.insert_topic(vtopic varchar(1024),newid int unsigned)
|
|
begin
|
|
declare tpos tinyint;
|
|
declare feldname varchar(1024);
|
|
DECLARE bcfgDone INT;
|
|
DECLARE cfg_curs CURSOR FOR SELECT cfg_topic.feld,cfg_topic.pos FROM cfg_topic where cfg_topic.minlength is null or cfg_topic.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
|
|
DECLARE CONTINUE HANDLER FOR NOT FOUND SET bcfgDone = 1;
|
|
OPEN cfg_curs;
|
|
REPEAT
|
|
fetch cfg_curs into feldname,tpos;
|
|
if (tpos < 0) then
|
|
select substring_index(substring_index(vtopic,'/',tpos),'/',1) into @feldinhalt;
|
|
else
|
|
select substring_index(substring_index(vtopic,'/',tpos),'/',-1) into @feldinhalt;
|
|
end if;
|
|
if (select count(*) from topic_def where topic_def.topicid = newid and topic_def.feld=feldname) = 0 then
|
|
insert into topic_def (topicid,feld,inhalt) values (newid,feldname,@feldinhalt);
|
|
commit;
|
|
end if;
|
|
UNTIL bcfgDone END REPEAT;
|
|
close cfg_curs;
|
|
end
|
|
//
|
|
DELIMITER ;
|
|
|
|
DELIMITER //
|
|
CREATE OR REPLACE PROCEDURE tsensor.insert_data(IN intopic varchar(256),IN invalue bigint,IN tstamp bigint unsigned)
|
|
BEGIN
|
|
select crc32(intopic) into @crctopic;
|
|
if (@crctopic is null) then
|
|
signal sqlstate '45000' set mysql_errno=32100,message_text='Missing topic';
|
|
end if;
|
|
if (select invalue is null) then
|
|
signal sqlstate '45000' set mysql_errno=32100,message_text='Missing value';
|
|
end if;
|
|
if (select tstamp is null) then
|
|
signal sqlstate '45000' set mysql_errno=32100,message_text='Missing timestamp';
|
|
end if;
|
|
if (@crctopic in (select topicid from topic_list))=0 then
|
|
# new topic means no value existent
|
|
call insert_topic(intopic,@crctopic);
|
|
insert into topic_list (topicid,topic) values (@crctopic,intopic);
|
|
insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
|
|
insert into last_data (topicid,value) values (@crctopic,invalue);
|
|
else
|
|
# topic exists and values should be available
|
|
if (invalue not in (select value from last_data where topicid=@crctopic)) then
|
|
insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
|
|
update last_data set value=invalue where topicid=@crctopic;
|
|
else
|
|
if (select count(distinct td.value) from (select data_storage.value from data_storage where topicid=@crctopic and (data_storage.timestamp > (unix_timestamp(now())-300)*1000) order by timestamp desc limit 2) td) = 1 then
|
|
update data_storage set timestamp=tstamp where topicid=@crctopic and timestamp=(select max(timestamp) from data_storage where topicid=@crctopic);
|
|
else
|
|
insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
|
|
end if; # check if second last match
|
|
end if; # compare with last value
|
|
end if; # topicid is null
|
|
commit;
|
|
END
|
|
//
|
|
DELIMITER ;
|
|
|
|
grant execute on tsensor.* to 'sensor'@'localhost' with grant option;
|
|
set global event_scheduler=ON;
|
|
|
|
create event if not exists tsensor.retent_old_data on schedule every 1 hour do call tsensor.retent_data();
|
|
|
|
insert into tsensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
|
|
insert into tsensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
|
|
insert into tsensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
|
|
insert into tsensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);
|
|
commit;
|