add sql script
parent
f32e1d21c4
commit
1c09e18f1b
|
@ -0,0 +1,146 @@
|
|||
CREATE schema if not exists sensor;
|
||||
create role sensordatain connection limit 10 password '';
|
||||
create role sensordataread connection limit 10 password '';
|
||||
|
||||
drop table sensor.datain;
|
||||
drop table sensor.data_storage;
|
||||
drop table sensor.last_data;
|
||||
drop table sensor.cfg_topic;
|
||||
drop table sensor.topic_list;
|
||||
drop table sensor.topic_def;
|
||||
create table if not exists sensor.datain (timestamp bigint not null,
|
||||
topic varchar(256),
|
||||
value bigint not null);
|
||||
create table if not exists sensor.data_storage (timestamp bigint not null,
|
||||
topicid bigint not null,
|
||||
value bigint not null);
|
||||
create table if not exists sensor.last_data (topicid bigint not null,
|
||||
value bigint not null);
|
||||
create table if not exists sensor.cfg_topic (feld varchar(256),
|
||||
pos smallint not null,
|
||||
minlength smallint );
|
||||
create table if not exists sensor.topic_list (topicid bigint not null primary key,
|
||||
topic varchar(256) not null unique);
|
||||
create table if not exists sensor.topic_def (topicid bigint not null,
|
||||
feld varchar(256),
|
||||
inhalt varchar(256));
|
||||
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);
|
||||
|
||||
grant select,insert,update on sensor.data_storage to akosensordatain with grant option;
|
||||
grant insert,select,update on sensor.last_data to akosensordatain with grant option;
|
||||
grant insert,select on sensor.topic_list to akosensordatain with grant option;
|
||||
grant insert,select on sensor.topic_def to akosensordatain with grant option;
|
||||
grant execute on all procedures in schema sensor to akosensordatain with grant option;
|
||||
grant select,insert,update on sensor.data_storage to sensor with grant option;
|
||||
grant insert,select,update on sensor.last_data to sensor with grant option;
|
||||
grant insert,select on sensor.topic_list to sensor with grant option;
|
||||
grant insert,select on sensor.topic_def to sensor with grant option;
|
||||
grant execute on all procedures in schema sensor to sensor with grant option;
|
||||
grant connect on database sensor to sensor;
|
||||
grant select on akosensor.data_storage to sensordataread with grant option;
|
||||
|
||||
CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024),newid bigint)
|
||||
language plpgsql
|
||||
as $$
|
||||
declare
|
||||
feldname varchar(1024);
|
||||
tpos int;
|
||||
feldinhalt varchar(64);
|
||||
bcfgDone INT;
|
||||
cfg_curs CURSOR FOR SELECT feld,pos FROM sensor.cfg_topic as cfgt where cfgt.minlength is null or cfgt.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
|
||||
begin
|
||||
OPEN cfg_curs;
|
||||
loop
|
||||
fetch cfg_curs into feldname,tpos;
|
||||
exit when not found;
|
||||
|
||||
if (tpos < 0) then
|
||||
feldinhalt := reverse(split_part(reverse(vtopic),'/',abs(tpos)));
|
||||
else
|
||||
feldinhalt := split_part(vtopic,'/',tpos);
|
||||
end if;
|
||||
if (newid in (select topicid from sensor.topic_def where topic_def.feld=feldname)) = false then
|
||||
insert into sensor.topic_def (topicid,feld,inhalt) values (newid,feldname,feldinhalt);
|
||||
end if;
|
||||
end loop;
|
||||
close cfg_curs;
|
||||
end
|
||||
$$;
|
||||
|
||||
|
||||
CREATE OR REPLACE PROCEDURE sensor.insert_data(intopic varchar(256),invalue bigint,tstamp bigint)
|
||||
language plpgsql
|
||||
as $$
|
||||
declare
|
||||
crctopic bigint;
|
||||
BEGIN
|
||||
if intopic is null then
|
||||
raise 'missing Topic' using message = 'Topic is missing';
|
||||
end if;
|
||||
crctopic=crc32(intopic);
|
||||
if invalue is null then
|
||||
raise 'missing value' using message = 'Value is missing';
|
||||
end if;
|
||||
if tstamp is null then
|
||||
raise 'missing time' using message = 'Time is missing';
|
||||
end if;
|
||||
|
||||
if (crctopic in (select topicid from sensor.topic_list))=false then
|
||||
call sensor.insert_topic(intopic,crctopic);
|
||||
insert into sensor.topic_list (topicid,topic) values (crctopic,intopic);
|
||||
insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
|
||||
insert into sensor.last_data (topicid,value) values (crctopic,invalue);
|
||||
else
|
||||
if (invalue not in (select value from sensor.last_data where topicid=crctopic)) then
|
||||
insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
|
||||
update sensor.last_data set value=invalue where topicid=crctopic;
|
||||
else
|
||||
if (select count(distinct td.value) from (select ds.value from sensor.data_storage ds where ds.topicid=crctopic and (extract(epoch from current_timestamp)-300)*1000 < ds.timestamp order by ds.timestamp desc limit 2) td) = 1 then
|
||||
update sensor.data_storage set timestamp=tstamp where topicid=crctopic and timestamp=(select max(timestamp) from akosensor.data_storage where topicid=crctopic);
|
||||
else
|
||||
insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
|
||||
end if;
|
||||
end if;
|
||||
end if;
|
||||
END
|
||||
$$;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION crc32(text_string text) RETURNS bigint AS $$
|
||||
DECLARE
|
||||
tmp bigint;
|
||||
i int;
|
||||
j int;
|
||||
byte_length int;
|
||||
binary_string bytea;
|
||||
BEGIN
|
||||
IF text_string = '' THEN
|
||||
RETURN 0;
|
||||
END IF;
|
||||
|
||||
i = 0;
|
||||
tmp = 4294967295;
|
||||
byte_length = bit_length(text_string) / 8;
|
||||
binary_string = decode(replace(text_string, E'\\\\', E'\\\\\\\\'), 'escape');
|
||||
LOOP
|
||||
tmp = (tmp # get_byte(binary_string, i))::bigint;
|
||||
i = i + 1;
|
||||
j = 0;
|
||||
LOOP
|
||||
tmp = ((tmp >> 1) # (3988292384 * (tmp & 1)))::bigint;
|
||||
j = j + 1;
|
||||
IF j >= 8 THEN
|
||||
EXIT;
|
||||
END IF;
|
||||
END LOOP;
|
||||
IF i >= byte_length THEN
|
||||
EXIT;
|
||||
END IF;
|
||||
END LOOP;
|
||||
RETURN (tmp # 4294967295);
|
||||
END
|
||||
$$ IMMUTABLE LANGUAGE plpgsql;
|
|
@ -0,0 +1,99 @@
|
|||
CREATE DATABASE if not exists sensor;
|
||||
create user if not exists 'sensordatain'@'127.0.0.1' identified by '';
|
||||
create user if not exists 'sensordatain'@'localhost' identified by '';
|
||||
create user if not exists 'sensordataread'@'172.24.13.5' identified by '';
|
||||
|
||||
create table if not exists akosensor.datain (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten',
|
||||
topic varchar(256) comment 'topic, unter den dieses Datum verwaltet wird',
|
||||
value bigint not null comment 'Sensordaten als Integer');
|
||||
create table if not exists akosensor.data_storage (timestamp bigint unsigned not null comment 'Zeitstempel der Sensordaten als epoch',
|
||||
topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
|
||||
value bigint not null comment 'Sensordaten als Integer', index (timestamp,topicid));
|
||||
create table if not exists akosensor.last_data (topicid int unsigned not null comment 'numerische ID des Topics aus der Topic-Tabelle',
|
||||
value bigint not null comment 'Sensordaten als Integer', index (topicid));
|
||||
create table if not exists akosensor.cfg_topic (feld varchar(256) comment 'Bezeichnung des topic-Anteils',
|
||||
pos tinyint not null comment 'Position innerhalb des Topics',
|
||||
minlength tinyint comment 'Mindestanzahl an elementen, die das Topic haben muss, damit die Regel greift');
|
||||
create table if not exists akosensor.topic_list (topicid int unsigned not null primary key comment 'numerische ID des topics',
|
||||
topic varchar(256) not null unique comment 'Topic');
|
||||
create table if not exists akosensor.topic_def (topicid int unsigned not null comment 'Interne ID eines topics',
|
||||
feld varchar(256) comment 'Bezeichnung des Datums',
|
||||
inhalt varchar(256) comment 'Inhalt des Datums',index (topicid,feld));
|
||||
create or replace view akosensor.data_view as select ds.timestamp/1000 as time,td.topic as topic,ds.value from akosensor.data_storage ds join akosensor.topic_list td on td.topicid=ds.topicid;
|
||||
|
||||
grant select,insert,update on sensor.data_storage to 'sensordatain'@'localhost' with grant option;
|
||||
grant insert,select,update on sensor.last_data to 'sensordatain'@'localhost' with grant option;
|
||||
grant insert,select on sensor.topic_list to 'sensordatain'@'localhost' with grant option;
|
||||
grant insert,select on sensor.topic_def to 'sensordatain'@'localhost' with grant option;
|
||||
grant execute on sensor.* to 'sensordatain'@'localhost' with grant option;
|
||||
grant select on sensor.data_storage to 'sensordataread'@'172.24.13.5' with grant option;
|
||||
grant select on sensor.data_view to 'sensordataread'@'172.24.13.5' with grant option;
|
||||
|
||||
DELIMITER //
|
||||
CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024),newid int unsigned)
|
||||
begin
|
||||
declare tpos tinyint;
|
||||
declare feldname varchar(1024);
|
||||
DECLARE bcfgDone INT;
|
||||
DECLARE cfg_curs CURSOR FOR SELECT cfg_topic.feld,cfg_topic.pos FROM cfg_topic where cfg_topic.minlength is null or cfg_topic.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
|
||||
DECLARE CONTINUE HANDLER FOR NOT FOUND SET bcfgDone = 1;
|
||||
OPEN cfg_curs;
|
||||
REPEAT
|
||||
fetch cfg_curs into feldname,tpos;
|
||||
if (tpos < 0) then
|
||||
select substring_index(substring_index(vtopic,'/',tpos),'/',1) into @feldinhalt;
|
||||
else
|
||||
select substring_index(substring_index(vtopic,'/',tpos),'/',-1) into @feldinhalt;
|
||||
end if;
|
||||
if (select count(*) from topic_def where topic_def.topicid = newid and topic_def.feld=feldname) = 0 then
|
||||
insert into topic_def (topicid,feld,inhalt) values (newid,feldname,@feldinhalt);
|
||||
end if;
|
||||
UNTIL bcfgDone END REPEAT;
|
||||
close cfg_curs;
|
||||
end
|
||||
//
|
||||
DELIMITER ;
|
||||
|
||||
DELIMITER //
|
||||
CREATE OR REPLACE PROCEDURE sensor.insert_data(IN intopic varchar(256),IN invalue bigint,IN tstamp bigint unsigned)
|
||||
BEGIN
|
||||
select crc32(intopic) into @crctopic;
|
||||
if (@crctopic is null) then
|
||||
signal sqlstate '45000' set mysql_errno=32100,message_text='Missing topic';
|
||||
end if;
|
||||
if (select invalue is null) then
|
||||
signal sqlstate '45000' set mysql_errno=32100,message_text='Missing value';
|
||||
end if;
|
||||
if (select tstamp is null) then
|
||||
signal sqlstate '45000' set mysql_errno=32100,message_text='Missing timestamp';
|
||||
end if;
|
||||
if (@crctopic in (select topicid from topic_list))=0 then
|
||||
# new topic means no value existent
|
||||
call insert_topic(intopic,@crctopic);
|
||||
insert into topic_list (topicid,topic) values (@crctopic,intopic);
|
||||
insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
|
||||
insert into last_data (topicid,value) values (@crctopic,invalue);
|
||||
else
|
||||
# topic exists and values should be available
|
||||
if (invalue not in (select value from last_data where topicid=@crctopic)) then
|
||||
insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
|
||||
update last_data set value=invalue where topicid=@crctopic;
|
||||
else
|
||||
if (select count(distinct td.value) from (select data_storage.value from data_storage where topicid=@crctopic and (data_storage.timestamp > (unix_timestamp(now)-300)*1000) order by timestamp desc limit 2) td) = 1 then
|
||||
update data_storage set timestamp=tstamp where topicid=@crctopic and timestamp=(select max(timestamp) from data_storage where topicid=@crctopic);
|
||||
else
|
||||
insert into data_storage (timestamp,topicid,value) values (tstamp,@crctopic,invalue);
|
||||
end if; # check if second last match
|
||||
end if; # compare with last value
|
||||
end if; # topicid is null
|
||||
END
|
||||
//
|
||||
DELIMITER ;
|
||||
|
||||
grant execute on sensor.* to 'akosensordatain'@'localhost' with grant option;
|
||||
set global event_scheduler=ON;
|
||||
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
|
||||
insert into sensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);
|
Loading…
Reference in New Issue