ragps/gps2sqlite.r

252 lines
9.9 KiB
R
Executable File

#!/usr/bin/r
# Collect the GPS log files waiting in /tmp.
# list.files() interprets `pattern` as a regular expression, not a shell glob:
# the former "gps*" meant "gp" followed by any number of "s", anywhere in the
# name, so unrelated files (e.g. "gpg-agent.log") were selected as well -- and
# this script deletes every selected file at the end. Anchor on the prefix.
gpsf <- list.files("/tmp", pattern = "^gps")
# Keep only files whose modification time is more than 100 seconds in the
# past. which() drops files whose mtime is NA (e.g. removed in the meantime)
# instead of turning them into NA entries of gpsf.
gpsf <- gpsf[which(
  as.numeric(Sys.time()) - as.numeric(file.mtime(file.path("/tmp", gpsf))) > 100
)]
#' Convert coordinate strings of the form "<d..d><mm>.<fraction>" to a number.
#'
#' The part before the dot is split into its last two digits (minutes) and the
#' leading digits (multiplied by 60, i.e. presumably degrees -- the callers
#' pass NMEA latitude/longitude fields). The part after the dot is truncated to
#' five digits and scaled to six, so the result is
#' `(minutes + 60 * leading) * 1e6 + fraction_as_6_digits`.
#'
#' @param indata Character vector of coordinate strings.
#' @return Numeric (double) vector of the same length as `indata`; `NA` where
#'   a piece is not numeric or the integer part has exactly two characters.
#'   A missing or one-character piece is treated as "000".
txt2bl <- function(indata) {
  pieces <- strsplit(indata, "\\.")
  # vapply() also works for zero-length input, where the previous
  # index-based extraction (1:length(x)) failed.
  whole <- vapply(pieces, function(p) p[1], character(1))
  frac <- vapply(pieces, function(p) p[2], character(1))

  # Missing or too-short pieces count as zero.
  whole[is.na(whole)] <- "000"
  frac[is.na(frac)] <- "000"
  whole[nchar(whole) < 2] <- "000"
  frac[nchar(frac) < 2] <- "000"

  # Keep at most five fractional digits, then right-pad with zeros to six.
  frac <- substr(frac, 1, 5)
  frac_units <- as.integer(substr(paste0(frac, "000000"), 1, 6))

  # Last two characters are minutes, everything before them counts 60 each.
  len <- nchar(whole)
  minutes <- as.integer(substr(whole, len - 1, len)) +
    60 * as.integer(substr(whole, 1, len - 2))

  minutes * 1e6 + frac_units
}
#' Append a data frame to a database table in chunks.
#'
#' Calls `dbWriteTable(db, table, <chunk>, append = TRUE)` once per block of at
#' most `nfr` consecutive rows.
#'
#' @param db Database connection passed on to `dbWriteTable()`.
#' @param table Name of the target table.
#' @param indata Data frame to write.
#' @param nfr Maximum number of rows per `dbWriteTable()` call.
#' @return `NULL`, invisibly.
dbCatchWriteTable <- function(db, table, indata, nfr = 500) {
  total <- nrow(indata)
  # Nothing to write. The previous 1:nrow() indexing selected row 1 of an
  # empty frame here and appended a row consisting only of NA.
  if (total == 0L) {
    return(invisible(NULL))
  }
  # Chunk start positions; unlike 0:floor(n / nfr) this never yields an empty
  # trailing chunk when n is an exact multiple of nfr.
  for (first in seq(1, total, by = nfr)) {
    rows <- first:min(first + nfr - 1, total)
    # drop = FALSE keeps a one-column input a data frame.
    dbWriteTable(db, table, indata[rows, , drop = FALSE], append = TRUE)
  }
  invisible(NULL)
}
if(length(gpsf)>0){
# Index of the column that holds the sentence identifier (e.g. "GPRMC");
# sentence fields are addressed relative to it as col_tel + k.
col_tel <- 4
# Result tables, one per sentence type. NULL means "nothing to write".
# ggadf was previously not initialised (only the unused ggsdf was), so the
# write step failed with "object 'ggadf' not found" when no GGA rows existed.
rmcdf <- NULL
ggadf <- NULL
ggsdf <- NULL
gsvdf <- NULL
gsadf <- NULL
gstdf <- NULL
grsdf <- NULL
gbsdf <- NULL
vtgdf <- NULL
library(foreach)
library(doParallel)
library(parallel)
library(RMySQL)
library(RSQLite)
library(int64)
# Use all cores but one, and at least one.
cores <- max(c(1, detectCores() - 1), na.rm = TRUE)
if (is.infinite(cores)) cores <- 1
registerDoParallel(cores)
cl <- makeCluster(cores)
# Read all selected files in parallel and split every line at the commas.
indata <- unlist(parLapply(cl, paste("/tmp/", gpsf, sep = ""), readLines))
sid <- strsplit(indata, ",")
# Width of the table: the longest line, but never fewer columns than the
# sections below address by fixed index (up to col_tel + 17). Without the
# lower bound a batch containing only short lines aborted later with
# "undefined columns selected". The extra columns are all NA.
maxcol <- max(col_tel + 17, lengths(sid))
# Indexing past the end of a vector pads it with NA up to maxcol fields.
padded <- lapply(sid, function(fields) fields[seq_len(maxcol)])
inmatrix <- as.data.frame(
  matrix(as.character(unlist(padded)),
    ncol = maxcol, nrow = length(sid), byrow = TRUE
  ),
  stringsAsFactors = FALSE
)
# Keep rows that have a sentence identifier and a third column ...
inmatrix <- inmatrix[!is.na(inmatrix[, col_tel]) & !is.na(inmatrix[, 3]), ]
# ... more than five non-missing fields ...
inmatrix <- inmatrix[rowSums(is.na(inmatrix)) < (ncol(inmatrix) - 5), ]
# ... a five-character identifier and a third column longer than one char ...
inmatrix <- inmatrix[(nchar(inmatrix[, col_tel]) == 5) & (nchar(inmatrix[, 3]) > 1), ]
# ... and an identifier starting with "G".
inmatrix <- inmatrix[grep("^G", inmatrix[, col_tel]), ]
inmatrix[, 1] <- as.integer(inmatrix[, 1])
# Columns 2 and 3 become int64 (previously converted twice in a row).
for (i in 2:3) inmatrix[, i] <- as.int64(inmatrix[, i])
gc()
# ---- RMC sentences -> rmcdf ----
inrmc <- inmatrix[grep("RMC", inmatrix[, col_tel]), ]
# Keep rows with status "A" and valid hemisphere letters. %in% never yields
# NA, so rows with a missing status are dropped directly.
inrmc <- inrmc[(inrmc[, col_tel + 2] %in% "A") &
  (inrmc[, col_tel + 4] %in% c("N", "S")) &
  (inrmc[, col_tel + 6] %in% c("E", "W")), ]
# Time (col_tel + 1) and date (col_tel + 9) are both required.
inrmc <- inrmc[!is.na(inrmc[, col_tel + 1]) & !is.na(inrmc[, col_tel + 9]), ]
# Both coordinate fields must consist of exactly two parts around a dot.
inrmc <- inrmc[
  (lengths(strsplit(as.character(inrmc[, col_tel + 3]), ".", fixed = TRUE)) == 2) &
    (lengths(strsplit(as.character(inrmc[, col_tel + 5]), ".", fixed = TRUE)) == 2),
]
if (nrow(inrmc) > 0) {
  # Left-pad strings shorter than six characters with zeros.
  zero_pad6 <- function(x) {
    paste0(substring("000000", 1L, pmax(0L, 6L - nchar(x))), x)
  }
  rmcdf <- data.frame(
    uid = as.integer(inrmc[, 1]),
    epoch = as.int64(inrmc[, 3]),
    id = as.int64(inrmc[, 2]),
    breite = txt2bl(inrmc[, col_tel + 3]),
    laenge = txt2bl(inrmc[, col_tel + 5]),
    velocity = round(as.numeric(inrmc[, col_tel + 7]) * 1000),
    fix = as.numeric(inrmc[, col_tel + 1]),
    system = substr(inrmc[, col_tel], 2, 2),
    # Date (ddmmyy) and time (hhmmss) parsed together as GMT. strptime() is
    # vectorised, so the former per-row parSapply() -- which first copied the
    # whole table to every worker -- is not needed.
    fix_epoch = as.numeric(strptime(
      paste0(zero_pad6(inrmc[, col_tel + 9]), zero_pad6(inrmc[, col_tel + 1])),
      format = "%d%m%y%H%M%S", tz = "GMT"
    )),
    stringsAsFactors = FALSE
  )
  # Range check: 5400000000 = 90 * 60 * 1e6 and 10800000000 = 180 * 60 * 1e6
  # in txt2bl() units. which() drops rows whose coordinate is NA; a plain
  # logical index turned them into rows consisting only of NA.
  rmcdf <- rmcdf[which((rmcdf$breite <= 5400000000) & (rmcdf$laenge <= 10800000000)), ]
  rm(inrmc, zero_pad6)
  gc()
}
# ---- GGA sentences -> ggadf ----
ingga <- inmatrix[grep("GGA", inmatrix[, col_tel]), ]
# Keep rows with unit "M" at col_tel + 10, valid hemisphere letters and a
# time field of five or six characters. %in% never yields NA.
ingga <- ingga[(ingga[, col_tel + 10] %in% "M") &
  (ingga[, col_tel + 3] %in% c("N", "S")) &
  (ingga[, col_tel + 5] %in% c("E", "W")) &
  !is.na(ingga[, col_tel + 1]), ]
ingga <- ingga[!is.na(ingga[, col_tel + 1]) & (nchar(ingga[, col_tel + 1]) %in% c(5, 6)), ]
if (nrow(ingga) > 0) {
  # Left-pad strings shorter than six characters with zeros.
  zero_pad6 <- function(x) {
    paste0(substring("000000", 1L, pmax(0L, 6L - nchar(x))), x)
  }
  # The sentence carries no date, so the date is taken from column 3, which
  # is read as seconds since 1970-01-01. It is formatted in GMT because the
  # combined string is parsed as GMT below; the previous
  # strptime(x, "%s") used the local time zone, which shifts the date by one
  # day around midnight on hosts that do not run in UTC.
  # NOTE(review): assumes as.character() of the int64 column gives the
  # decimal representation -- confirm against the int64 package.
  gga_date <- format(
    as.POSIXct(as.numeric(as.character(ingga[, 3])), origin = "1970-01-01", tz = "GMT"),
    "%d%m%y",
    tz = "GMT"
  )
  ggadf <- data.frame(
    uid = as.integer(ingga[, 1]),
    epoch = as.int64(ingga[, 3]),
    id = as.int64(ingga[, 2]),
    fix = as.numeric(ingga[, col_tel + 1]),
    breite = txt2bl(ingga[, col_tel + 2]),
    laenge = txt2bl(ingga[, col_tel + 4]),
    sats = as.numeric(ingga[, col_tel + 7]),
    hdop = as.numeric(ingga[, col_tel + 8]),
    hoehe = as.numeric(ingga[, col_tel + 9]) * 1000,
    hoehe_datum = as.numeric(ingga[, col_tel + 11]) * 1000,
    # Vectorised; unparsable combinations become NA instead of aborting.
    fix_epoch = as.numeric(strptime(
      paste0(gga_date, zero_pad6(ingga[, col_tel + 1])),
      format = "%d%m%y%H%M%S", tz = "GMT"
    )),
    stringsAsFactors = FALSE
  )
  # Same range check as for RMC; which() drops rows with NA coordinates
  # instead of turning them into all-NA rows.
  ggadf <- ggadf[which((ggadf$breite <= 5400000000) & (ggadf$laenge <= 10800000000)), ]
  rm(ingga, gga_date, zero_pad6)
  gc()
}
# ---- GSV sentences -> gsvdf (one row per satellite) ----
ingsv <- inmatrix[grep("GSV", inmatrix[, col_tel]), ]
# The three fields after the identifier are numeric. Note that `:` binds
# tighter than `+`, so this is columns col_tel + 1 .. col_tel + 3.
for (i in col_tel + 1:3) ingsv[, i] <- as.numeric(ingsv[, i])
ingsv <- ingsv[(nchar(ingsv[, col_tel]) == 5) & !is.na(ingsv[, col_tel + 1]) & !is.na(ingsv[, col_tel + 2]) & !is.na(ingsv[, col_tel + 3]), ]
ingsv <- ingsv[(ingsv[, col_tel + 1] <= 6) & (ingsv[, col_tel + 2] <= 6) & (ingsv[, col_tel + 3] > 0), ]
# At least one of the fields col_tel + 4 .. col_tel + 16 must be present.
ingsv <- ingsv[rowSums(!is.na(ingsv[, col_tel + 4:16])) > 0, ]
if (nrow(ingsv) > 0) {
  # The per-satellite groups of four start at col_tel + 4, the same column
  # the filter above treats as the start of the satellite data. The former
  # hard-coded 9 skipped the first field, so every value ended up one column
  # to the left of its name. Determined before "system" is appended.
  sat_cols <- (col_tel + 4):ncol(ingsv)
  ingsv$system <- substr(ingsv[, col_tel], 2, 2)
  # NOTE(review): epoch/id are taken from columns 2/3 here, whereas the RMC
  # and GGA tables use columns 3/2 -- confirm which mapping is intended.
  gsvdf <- foreach(
    i = seq_len(nrow(ingsv)), .combine = rbind, .inorder = FALSE,
    .multicombine = TRUE, .packages = c("int64")
  ) %dopar% {
    library(int64)
    tinv <- unlist(ingsv[i, sat_cols], use.names = FALSE)
    tinv <- tinv[!is.na(tinv)]
    # Empty fields count as 0.
    tinv[nchar(tinv) == 0] <- 0
    tinv <- as.numeric(tinv)
    out <- data.frame(matrix(tinv, ncol = 4, byrow = TRUE))
    names(out) <- c("PRN", "elevation", "azimuth", "SNR")
    out$uid <- as.integer(ingsv[i, 1])
    out$epoch <- as.int64(ingsv[i, 2])
    out$id <- as.int64(ingsv[i, 3])
    out$system <- ingsv[i, "system"]
    # Running number across the sentences of one group: four entries per
    # sentence, offset by the value in col_tel + 2.
    out$folge <- 1:nrow(out) + (ingsv[i, col_tel + 2] - 1) * 4
    out
  }
  rm(ingsv, sat_cols)
  gc()
}
# ---- GSA sentences -> gsadf ----
ingsa <- inmatrix[grep("GSA", inmatrix[, col_tel]), ]
ingsa <- ingsa[!is.na(ingsa[, col_tel + 13]), ]
ingsa <- ingsa[!is.na(ingsa[, col_tel]), ]
# The field at col_tel + 4 must be numeric.
ingsa <- ingsa[!is.na(as.numeric(ingsa[, col_tel + 4])), ]
if (nrow(ingsa) > 0) {
  # NOTE(review): epoch/id are taken from columns 2/3 here, whereas the RMC
  # and GGA tables use columns 3/2 -- confirm which mapping is intended.
  gsadf <- data.frame(
    uid = as.numeric(ingsa[, 1]),
    epoch = as.numeric(ingsa[, 2]),
    id = as.numeric(ingsa[, 3]),
    pdop = as.numeric(ingsa[, col_tel + 15]),
    hdop = as.numeric(ingsa[, col_tel + 16]),
    vdop = as.numeric(ingsa[, col_tel + 17]),
    # Second character of the sentence identifier, as in the RMC and GSV
    # tables. This used to read character 3 of hard-coded column 5, which is
    # the field after the identifier and therefore always gave "".
    system = substr(ingsa[, col_tel], 2, 2),
    # Numeric entries of the twelve fields col_tel + 3 .. col_tel + 14,
    # sorted and joined with "_".
    sats = vapply(seq_len(nrow(ingsa)), function(i) {
      tg <- unlist(ingsa[i, col_tel + 3:14], use.names = FALSE)
      tg <- as.numeric(tg[!is.na(tg) & (nchar(tg) > 0)])
      paste(sort(tg[!is.na(tg)]), collapse = "_")
    }, character(1)),
    stringsAsFactors = FALSE
  )
  # Drop rows without any usable entry.
  gsadf <- gsadf[nchar(gsadf$sats) > 0, ]
  rm(ingsa)
  gc()
}
# ---- GBS sentences -> gbsdf ----
ingbs <- inmatrix[grep("GBS", inmatrix[, col_tel]), ]
if (nrow(ingbs) > 0) {
  # NOTE(review): epoch/id are taken from columns 2/3 here, whereas the RMC
  # and GGA tables use columns 3/2 -- confirm which mapping is intended.
  gbsdf <- data.frame(
    uid = as.numeric(ingbs[, 1]),
    epoch = as.numeric(ingbs[, 2]),
    id = as.numeric(ingbs[, 3]),
    # Second character of the identifier, consistent with the RMC, GSV, GRS
    # and VTG tables. Identifiers are five characters starting with "G", so
    # the previously used third character of "..GBS" was always "G".
    system = substr(ingbs[, col_tel], 2, 2),
    lat_err = as.numeric(ingbs[, col_tel + 2]),
    lon_err = as.numeric(ingbs[, col_tel + 3]),
    alt_err = as.numeric(ingbs[, col_tel + 4]),
    failing_prn = as.numeric(ingbs[, col_tel + 5]),
    prob_fail = as.numeric(ingbs[, col_tel + 6]),
    bias_fail = as.numeric(ingbs[, col_tel + 7]),
    stringsAsFactors = FALSE
  )
  # Keep rows with fewer than four missing values.
  gbsdf <- gbsdf[rowSums(is.na(gbsdf), na.rm = TRUE) < 4, ]
  rm(ingbs)
  gc()
}
# ---- GRS sentences -> grsdf ----
ingrs <- inmatrix[grep("GRS", inmatrix[, col_tel]), ]
if (nrow(ingrs) > 0) {
  # NOTE(review): receiver/uid/epoch/id are read from columns 1-4, but
  # column 4 is the sentence identifier (col_tel), so `id` is always NA, and
  # the RMC/GGA tables read uid/id/epoch from columns 1/2/3. This looks like
  # a leftover from an older record layout -- confirm before changing, as it
  # affects the table schema.
  grsdf <- data.frame(
    receiver = ingrs[, 1],
    uid = as.numeric(ingrs[, 2]),
    epoch = as.numeric(ingrs[, 3]),
    id = as.numeric(ingrs[, 4]),
    system = substr(ingrs[, col_tel], 2, 2),
    fix = as.numeric(ingrs[, col_tel + 2]),
    fgga = as.numeric(ingrs[, col_tel + 1]),
    stringsAsFactors = FALSE
  )
  # The twelve values S1..S12 follow directly after the two fields used
  # above, i.e. they start at col_tel + 3. The former hard-coded 7 + ig
  # started one column too late for col_tel = 4.
  for (ig in 1:12) grsdf[, paste("S", ig, sep = "")] <- as.numeric(ingrs[, col_tel + 2 + ig])
  # Keep rows with fewer than ten missing values.
  grsdf <- grsdf[rowSums(is.na(grsdf), na.rm = TRUE) < 10, ]
  rm(ingrs)
  gc()
}
# ---- GST sentences -> gstdf ----
ingst <- inmatrix[grep("GST", inmatrix[, col_tel]), ]
if (nrow(ingst) > 0) {
  # NOTE(review): receiver/uid/epoch/id are read from columns 1-4, but
  # column 4 is the sentence identifier (col_tel), so `id` is always NA, and
  # the RMC/GGA tables read uid/id/epoch from columns 1/2/3 -- confirm the
  # intended mapping before changing, as it affects the table schema.
  gstdf <- data.frame(
    receiver = ingst[, 1],
    uid = as.numeric(ingst[, 2]),
    epoch = as.numeric(ingst[, 3]),
    id = as.numeric(ingst[, 4]),
    # Second character of the identifier, consistent with the RMC, GSV, GRS
    # and VTG tables. Identifiers are five characters starting with "G", so
    # the previously used third character of "..GST" was always "G".
    system = substr(ingst[, col_tel], 2, 2),
    RMS = as.numeric(ingst[, col_tel + 2]),
    lat_sigma = as.numeric(ingst[, col_tel + 6]),
    lon_sigma = as.numeric(ingst[, col_tel + 7]),
    height_sigma = as.numeric(ingst[, col_tel + 8]),
    stringsAsFactors = FALSE
  )
  # Rows without a numeric RMS value are dropped.
  gstdf <- gstdf[!is.na(gstdf$RMS), ]
  rm(ingst)
  gc()
}
# ---- VTG sentences -> vtgdf ----
# Select the records whose sentence identifier contains "VTG".
invtg <- inmatrix[grepl("VTG", inmatrix[, col_tel]), ]
if (nrow(invtg) > 0) {
  # One output row per record. The first four input columns are copied
  # (2-4 as numbers), the remaining values are every second field after the
  # identifier.
  # NOTE(review): column 4 is also the identifier column (col_tel), so `id`
  # is presumably always NA here -- confirm the intended column mapping.
  vtgdf <- data.frame(
    receiver = invtg[, 1],
    uid = as.numeric(invtg[, 2]),
    epoch = as.numeric(invtg[, 3]),
    id = as.numeric(invtg[, 4]),
    # Second character of the sentence identifier.
    system = substr(invtg[, col_tel], 2, 2),
    course_true = as.numeric(invtg[, col_tel + 1]),
    course_mag = as.numeric(invtg[, col_tel + 3]),
    speed_kn = as.numeric(invtg[, col_tel + 5]),
    speed_kmh = as.numeric(invtg[, col_tel + 7]),
    stringsAsFactors = FALSE
  )
  # Release the intermediate table.
  rm(invtg)
  gc()
}
# ---- Write all result tables to the database ----
# con = dbConnect(MySQL(), user='gps', password='gps', dbname='gps', host='localhost')
# NOTE(review): with the default ":memory:" target everything written below
# is discarded by dbDisconnect(), while the source files are deleted right
# afterwards. Set the environment variable GPS_SQLITE_DB to a file path to
# keep the data; unset (or empty) preserves the previous in-memory behaviour.
gps_db_path <- Sys.getenv("GPS_SQLITE_DB", unset = ":memory:")
if (!nzchar(gps_db_path)) gps_db_path <- ":memory:"
con <- dbConnect(RSQLite::SQLite(), gps_db_path)
# Target table -> name of the data frame holding its rows, in write order.
gps_tables <- c(
  gps_rmc = "rmcdf", gps_gga = "ggadf", gps_gbs = "gbsdf", gps_gsv = "gsvdf",
  gps_gsa = "gsadf", gps_gst = "gstdf", gps_grs = "grsdf", gps_vtg = "vtgdf"
)
tryCatch(
  {
    for (gps_table in names(gps_tables)) {
      # A table that was never created counts the same as an empty (NULL) one.
      gps_df <- get0(gps_tables[[gps_table]], ifnotfound = NULL)
      if (is.null(gps_df)) next
      # Column names are written in upper case.
      names(gps_df) <- toupper(names(gps_df))
      dbCatchWriteTable(con, gps_table, gps_df)
    }
  },
  # Close the connection even if a write fails; the error still propagates
  # and stops the script before any source file is deleted.
  finally = dbDisconnect(con)
)
# ---- Cleanup ----
# The worker processes are no longer needed.
if (exists("cl")) stopCluster(cl)
# Delete the processed files in batches of at most 1000 per rm call.
# system2() hands its arguments to a shell, and the names come from the
# world-writable /tmp, so every path is quoted; "--" ends option parsing.
# split() never produces an empty batch, so rm is no longer called on the
# bare "/tmp/" when the file count is an exact multiple of 1000.
gps_paths <- file.path("/tmp", gpsf)
for (rgpsf in split(gps_paths, ceiling(seq_along(gps_paths) / 1000))) {
  system2("sudo", c("rm", "--", shQuote(rgpsf)))
}
}