147 lines
5.5 KiB
Plaintext
147 lines
5.5 KiB
Plaintext
-- Schema that holds every table and procedure of this script.
CREATE SCHEMA IF NOT EXISTS sensor;

-- Roles used by this schema.
-- CREATE ROLE has no IF NOT EXISTS clause, so existence is checked against
-- pg_roles first; this keeps the script re-runnable like the rest of the file.
-- NOTE(review): the empty password looks like a placeholder to be filled in at
-- deployment (PostgreSQL 10+ stores '' as "no password") - confirm.
-- NOTE(review): CREATE ROLE defaults to NOLOGIN, so the password and the
-- connection limit only matter once LOGIN is granted - confirm the intent.
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'sensordatain') THEN
        CREATE ROLE sensordatain CONNECTION LIMIT 10 PASSWORD '';
    END IF;

    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'sensordataread') THEN
        CREATE ROLE sensordataread CONNECTION LIMIT 10 PASSWORD '';
    END IF;
END
$$;
|
|
|
|
-- Remove any previous version of the tables so the CREATE TABLE statements
-- below always start from an empty structure.
-- WARNING: this discards all stored data every time the script is run.
-- IF EXISTS keeps the script from aborting on a fresh database where the
-- tables have not been created yet.
DROP TABLE IF EXISTS sensor.datain;
DROP TABLE IF EXISTS sensor.data_storage;
DROP TABLE IF EXISTS sensor.last_data;
DROP TABLE IF EXISTS sensor.cfg_topic;
DROP TABLE IF EXISTS sensor.topic_list;
DROP TABLE IF EXISTS sensor.topic_def;
|
|
-- Raw (timestamp, topic, value) rows; not referenced by the procedures in this file.
CREATE TABLE IF NOT EXISTS sensor.datain (
    timestamp BIGINT NOT NULL,
    topic     VARCHAR(256),
    value     BIGINT NOT NULL
);

-- Value history per topic, written by sensor.insert_data.
-- insert_data compares "timestamp" against epoch milliseconds.
CREATE TABLE IF NOT EXISTS sensor.data_storage (
    timestamp BIGINT NOT NULL,
    topicid   BIGINT NOT NULL,
    value     BIGINT NOT NULL
);

-- Most recent value per topic, maintained by sensor.insert_data.
CREATE TABLE IF NOT EXISTS sensor.last_data (
    topicid BIGINT NOT NULL,
    value   BIGINT NOT NULL
);

-- Rules used by sensor.insert_topic to split a '/'-separated topic:
--   feld      name of the field to extract
--   pos       1-based segment position; negative values count from the end
--   minlength rule applies only to topics with at least this many segments
--             (NULL = always applies)
CREATE TABLE IF NOT EXISTS sensor.cfg_topic (
    feld      VARCHAR(256),
    pos       SMALLINT NOT NULL,
    minlength SMALLINT
);

-- Known topics; sensor.insert_data uses crc32(topic) as topicid.
CREATE TABLE IF NOT EXISTS sensor.topic_list (
    topicid BIGINT NOT NULL PRIMARY KEY,
    topic   VARCHAR(256) NOT NULL UNIQUE
);

-- Field values (feld -> inhalt) extracted from each topic by sensor.insert_topic.
CREATE TABLE IF NOT EXISTS sensor.topic_def (
    topicid BIGINT NOT NULL,
    feld    VARCHAR(256),
    inhalt  VARCHAR(256)
);
|
|
|
|
-- Default topic-splitting rules (see sensor.insert_topic for how pos and
-- minlength are interpreted).
INSERT INTO sensor.cfg_topic (feld, pos, minlength)
VALUES
    ('device',      2,  NULL),  -- 2nd segment
    ('quantity',    -1, NULL),  -- last segment
    ('place',       3,  NULL),  -- 3rd segment
    ('internal_id', 4,  5);     -- 4th segment, only for topics with >= 5 segments
|
|
|
|
-- Privileges for the role sensordatain.
-- The original statements targeted "akosensordatain", a role this script never
-- creates; the role created at the top of the file is "sensordatain".
-- NOTE(review): no USAGE on schema sensor and no SELECT on sensor.cfg_topic
-- (read by sensor.insert_topic) is granted anywhere in this script - confirm
-- whether these are granted elsewhere.
GRANT SELECT, INSERT, UPDATE ON sensor.data_storage TO sensordatain WITH GRANT OPTION;
GRANT INSERT, SELECT, UPDATE ON sensor.last_data TO sensordatain WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_list TO sensordatain WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_def TO sensordatain WITH GRANT OPTION;
-- NOTE(review): ON ALL PROCEDURES IN SCHEMA only covers procedures that exist
-- when the statement runs; the procedures of this file are created further
-- down, so on a fresh database this grant matches nothing.
GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA sensor TO sensordatain WITH GRANT OPTION;

-- Privileges for the role "sensor".
-- NOTE(review): neither the role "sensor" nor the database "sensor" is created
-- by this script; both must already exist.
GRANT SELECT, INSERT, UPDATE ON sensor.data_storage TO sensor WITH GRANT OPTION;
GRANT INSERT, SELECT, UPDATE ON sensor.last_data TO sensor WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_list TO sensor WITH GRANT OPTION;
GRANT INSERT, SELECT ON sensor.topic_def TO sensor WITH GRANT OPTION;
GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA sensor TO sensor WITH GRANT OPTION;
GRANT CONNECT ON DATABASE sensor TO sensor;

-- Read access to the value history for sensordataread.
-- The original referenced the non-existent schema "akosensor".
GRANT SELECT ON sensor.data_storage TO sensordataread WITH GRANT OPTION;
|
|
|
|
-- Splits a '/'-separated topic into named fields according to the rules in
-- sensor.cfg_topic and stores them in sensor.topic_def for the given topic id.
--
-- Parameters:
--   vtopic  topic string whose segments are separated by '/'
--   newid   topic id under which the extracted fields are stored
--
-- For every rule whose minlength is NULL or not greater than the number of
-- segments in vtopic, the segment at position pos is taken (a negative pos
-- counts from the end of the topic). A (newid, feld) pair that already exists
-- in sensor.topic_def is left untouched.
CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024),newid bigint)
language plpgsql
as $$
declare
    -- number of '/'-separated segments = number of slashes + 1
    segment_count int := length(vtopic) - length(replace(vtopic, '/', '')) + 1;
    cfg record;
    -- unbounded on purpose: the length limit is enforced by the inhalt column,
    -- not by a narrower local variable
    feldinhalt text;
begin
    for cfg in
        select cfgt.feld, cfgt.pos
        from sensor.cfg_topic as cfgt
        where cfgt.minlength is null
           or cfgt.minlength <= segment_count
    loop
        if cfg.pos < 0 then
            -- count from the end: reverse the topic, pick the segment, reverse it back
            feldinhalt := reverse(split_part(reverse(vtopic), '/', abs(cfg.pos)));
        else
            feldinhalt := split_part(vtopic, '/', cfg.pos);
        end if;

        if not exists (
            select 1
            from sensor.topic_def as td
            where td.topicid = newid
              and td.feld = cfg.feld
        ) then
            insert into sensor.topic_def (topicid, feld, inhalt)
            values (newid, cfg.feld, feldinhalt);
        end if;
    end loop;
end
$$;
|
|
|
|
|
|
-- Stores one measurement for a topic.
--
-- Parameters:
--   intopic  topic string; its crc32 is used as topic id
--   invalue  measured value
--   tstamp   time of the measurement; compared against epoch milliseconds below
--
-- Raises an exception when any of the three arguments is NULL.
--
-- Behaviour:
--   * unknown topic   -> register it (topic_def, topic_list) and store the value
--   * value changed   -> append a history row and update last_data
--   * value unchanged -> if the (up to two) newest history rows of the last
--                        300 seconds hold a single distinct value, move the
--                        newest row's timestamp to tstamp instead of adding a
--                        row; otherwise append a new history row
CREATE OR REPLACE PROCEDURE sensor.insert_data(intopic varchar(256),invalue bigint,tstamp bigint)
language plpgsql
as $$
declare
    crctopic bigint;
BEGIN
    -- RAISE with a format string must not also carry USING MESSAGE; the
    -- original combination failed with "RAISE option already specified".
    if intopic is null then
        raise exception 'Topic is missing';
    end if;
    if invalue is null then
        raise exception 'Value is missing';
    end if;
    if tstamp is null then
        raise exception 'Time is missing';
    end if;

    crctopic := crc32(intopic);

    if not exists (select 1 from sensor.topic_list as tl where tl.topicid = crctopic) then
        -- first sighting of this topic
        call sensor.insert_topic(intopic, crctopic);
        insert into sensor.topic_list (topicid, topic) values (crctopic, intopic);
        insert into sensor.data_storage (timestamp, topicid, value) values (tstamp, crctopic, invalue);
        insert into sensor.last_data (topicid, value) values (crctopic, invalue);

    elsif not exists (
        select 1
        from sensor.last_data as ld
        where ld.topicid = crctopic
          and ld.value = invalue
    ) then
        -- value differs from the last stored one
        insert into sensor.data_storage (timestamp, topicid, value) values (tstamp, crctopic, invalue);
        update sensor.last_data set value = invalue where topicid = crctopic;

    elsif (
        select count(distinct td.value)
        from (
            select ds.value
            from sensor.data_storage as ds
            where ds.topicid = crctopic
              and (extract(epoch from current_timestamp) - 300) * 1000 < ds.timestamp
            order by ds.timestamp desc
            limit 2
        ) as td
    ) = 1 then
        -- unchanged value with a recent history row: extend that row
        -- (the original read max(timestamp) from the non-existent schema akosensor)
        update sensor.data_storage
        set timestamp = tstamp
        where topicid = crctopic
          and timestamp = (
              select max(ds.timestamp)
              from sensor.data_storage as ds
              where ds.topicid = crctopic
          );

    else
        insert into sensor.data_storage (timestamp, topicid, value) values (tstamp, crctopic, invalue);
    end if;
END
$$;
|
|
|
|
|
|
-- Computes the CRC-32 checksum (reflected polynomial 0xEDB88320, initial value
-- and final XOR 0xFFFFFFFF) over the bytes of text_string.
--
-- Parameters:
--   text_string  text to hash
-- Returns:
--   the checksum as a non-negative bigint; 0 for the empty string;
--   NULL for NULL input
CREATE OR REPLACE FUNCTION crc32(text_string text) RETURNS bigint AS $$
DECLARE
    tmp bigint := 4294967295;  -- 0xFFFFFFFF
    binary_string bytea;
BEGIN
    -- Without this guard a NULL input never satisfies the loop exit condition.
    IF text_string IS NULL THEN
        RETURN NULL;
    END IF;

    IF text_string = '' THEN
        RETURN 0;
    END IF;

    -- decode(..., 'escape') treats a backslash as an escape introducer, so
    -- every single backslash is doubled first. E'\\' is one backslash and
    -- E'\\\\' is two, independent of standard_conforming_strings.
    binary_string := decode(replace(text_string, E'\\', E'\\\\'), 'escape');

    FOR i IN 0 .. length(binary_string) - 1 LOOP
        tmp := tmp # get_byte(binary_string, i);
        -- process the byte bit by bit, least significant bit first
        FOR j IN 1 .. 8 LOOP
            -- 3988292384 = 0xEDB88320; XORed in only when the low bit is set
            tmp := (tmp >> 1) # (3988292384 * (tmp & 1));
        END LOOP;
    END LOOP;

    RETURN tmp # 4294967295;
END
$$ IMMUTABLE LANGUAGE plpgsql;
|