pysql/create_sensor_db.psql

147 lines
5.5 KiB
Plaintext

CREATE schema if not exists sensor;
create role sensordatain connection limit 10 password '';
create role sensordataread connection limit 10 password '';
drop table sensor.datain;
drop table sensor.data_storage;
drop table sensor.last_data;
drop table sensor.cfg_topic;
drop table sensor.topic_list;
drop table sensor.topic_def;
create table if not exists sensor.datain (timestamp bigint not null,
topic varchar(256),
value bigint not null);
create table if not exists sensor.data_storage (timestamp bigint not null,
topicid bigint not null,
value bigint not null);
create table if not exists sensor.last_data (topicid bigint not null,
value bigint not null);
create table if not exists sensor.cfg_topic (feld varchar(256),
pos smallint not null,
minlength smallint );
create table if not exists sensor.topic_list (topicid bigint not null primary key,
topic varchar(256) not null unique);
create table if not exists sensor.topic_def (topicid bigint not null,
feld varchar(256),
inhalt varchar(256));
insert into sensor.cfg_topic (feld,pos,minlength) values ('device',2,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('quantity',-1,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('place',3,NULL);
insert into sensor.cfg_topic (feld,pos,minlength) values ('internal_id',4,5);
grant select,insert,update on sensor.data_storage to akosensordatain with grant option;
grant insert,select,update on sensor.last_data to akosensordatain with grant option;
grant insert,select on sensor.topic_list to akosensordatain with grant option;
grant insert,select on sensor.topic_def to akosensordatain with grant option;
grant execute on all procedures in schema sensor to akosensordatain with grant option;
grant select,insert,update on sensor.data_storage to sensor with grant option;
grant insert,select,update on sensor.last_data to sensor with grant option;
grant insert,select on sensor.topic_list to sensor with grant option;
grant insert,select on sensor.topic_def to sensor with grant option;
grant execute on all procedures in schema sensor to sensor with grant option;
grant connect on database sensor to sensor;
grant select on akosensor.data_storage to sensordataread with grant option;
CREATE OR REPLACE PROCEDURE sensor.insert_topic(vtopic varchar(1024),newid bigint)
language plpgsql
as $$
declare
feldname varchar(1024);
tpos int;
feldinhalt varchar(64);
bcfgDone INT;
cfg_curs CURSOR FOR SELECT feld,pos FROM sensor.cfg_topic as cfgt where cfgt.minlength is null or cfgt.minlength <= (select (length(vtopic) - length(replace(vtopic,'/','')) +1));
begin
OPEN cfg_curs;
loop
fetch cfg_curs into feldname,tpos;
exit when not found;
if (tpos < 0) then
feldinhalt := reverse(split_part(reverse(vtopic),'/',abs(tpos)));
else
feldinhalt := split_part(vtopic,'/',tpos);
end if;
if (newid in (select topicid from sensor.topic_def where topic_def.feld=feldname)) = false then
insert into sensor.topic_def (topicid,feld,inhalt) values (newid,feldname,feldinhalt);
end if;
end loop;
close cfg_curs;
end
$$;
CREATE OR REPLACE PROCEDURE sensor.insert_data(intopic varchar(256),invalue bigint,tstamp bigint)
language plpgsql
as $$
declare
crctopic bigint;
BEGIN
if intopic is null then
raise 'missing Topic' using message = 'Topic is missing';
end if;
crctopic=crc32(intopic);
if invalue is null then
raise 'missing value' using message = 'Value is missing';
end if;
if tstamp is null then
raise 'missing time' using message = 'Time is missing';
end if;
if (crctopic in (select topicid from sensor.topic_list))=false then
call sensor.insert_topic(intopic,crctopic);
insert into sensor.topic_list (topicid,topic) values (crctopic,intopic);
insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
insert into sensor.last_data (topicid,value) values (crctopic,invalue);
else
if (invalue not in (select value from sensor.last_data where topicid=crctopic)) then
insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
update sensor.last_data set value=invalue where topicid=crctopic;
else
if (select count(distinct td.value) from (select ds.value from sensor.data_storage ds where ds.topicid=crctopic and (extract(epoch from current_timestamp)-300)*1000 < ds.timestamp order by ds.timestamp desc limit 2) td) = 1 then
update sensor.data_storage set timestamp=tstamp where topicid=crctopic and timestamp=(select max(timestamp) from akosensor.data_storage where topicid=crctopic);
else
insert into sensor.data_storage (timestamp,topicid,value) values (tstamp,crctopic,invalue);
end if;
end if;
end if;
END
$$;
CREATE OR REPLACE FUNCTION crc32(text_string text) RETURNS bigint AS $$
DECLARE
tmp bigint;
i int;
j int;
byte_length int;
binary_string bytea;
BEGIN
IF text_string = '' THEN
RETURN 0;
END IF;
i = 0;
tmp = 4294967295;
byte_length = bit_length(text_string) / 8;
binary_string = decode(replace(text_string, E'\\\\', E'\\\\\\\\'), 'escape');
LOOP
tmp = (tmp # get_byte(binary_string, i))::bigint;
i = i + 1;
j = 0;
LOOP
tmp = ((tmp >> 1) # (3988292384 * (tmp & 1)))::bigint;
j = j + 1;
IF j >= 8 THEN
EXIT;
END IF;
END LOOP;
IF i >= byte_length THEN
EXIT;
END IF;
END LOOP;
RETURN (tmp # 4294967295);
END
$$ IMMUTABLE LANGUAGE plpgsql;