483 lines
8.8 KiB
C
483 lines
8.8 KiB
C
/*
|
|
* Sun RPC client.
|
|
*/
|
|
#include <u.h>
|
|
#include <libc.h>
|
|
#include <thread.h>
|
|
#include <sunrpc.h>
|
|
|
|
typedef struct Out Out;
|
|
struct Out
|
|
{
|
|
char err[ERRMAX]; /* error string */
|
|
Channel *creply; /* send to finish rpc */
|
|
uchar *p; /* pending request packet */
|
|
int n; /* size of request */
|
|
ulong tag; /* flush tag of pending request */
|
|
ulong xid; /* xid of pending request */
|
|
ulong st; /* first send time */
|
|
ulong t; /* resend time */
|
|
int nresend; /* number of resends */
|
|
SunRpc rpc; /* response rpc */
|
|
};
|
|
|
|
static void
|
|
udpThread(void *v)
|
|
{
|
|
uchar *p, *buf;
|
|
Ioproc *io;
|
|
int n;
|
|
SunClient *cli;
|
|
enum { BufSize = 65536 };
|
|
|
|
cli = v;
|
|
buf = emalloc(BufSize);
|
|
io = ioproc();
|
|
p = nil;
|
|
for(;;){
|
|
n = ioread(io, cli->fd, buf, BufSize);
|
|
if(n <= 0)
|
|
break;
|
|
p = emalloc(4+n);
|
|
memmove(p+4, buf, n);
|
|
p[0] = n>>24;
|
|
p[1] = n>>16;
|
|
p[2] = n>>8;
|
|
p[3] = n;
|
|
if(sendp(cli->readchan, p) == 0)
|
|
break;
|
|
p = nil;
|
|
}
|
|
free(p);
|
|
closeioproc(io);
|
|
while(send(cli->dying, nil) == -1)
|
|
;
|
|
}
|
|
|
|
static void
|
|
netThread(void *v)
|
|
{
|
|
uchar *p, buf[4];
|
|
Ioproc *io;
|
|
uint n, tot;
|
|
int done;
|
|
SunClient *cli;
|
|
|
|
cli = v;
|
|
io = ioproc();
|
|
tot = 0;
|
|
p = nil;
|
|
for(;;){
|
|
n = ioreadn(io, cli->fd, buf, 4);
|
|
if(n != 4)
|
|
break;
|
|
n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
|
|
if(cli->chatty)
|
|
fprint(2, "%.8ux...", n);
|
|
done = n&0x80000000;
|
|
n &= ~0x80000000;
|
|
if(tot == 0){
|
|
p = emalloc(4+n);
|
|
tot = 4;
|
|
}else
|
|
p = erealloc(p, tot+n);
|
|
if(ioreadn(io, cli->fd, p+tot, n) != n)
|
|
break;
|
|
tot += n;
|
|
if(done){
|
|
p[0] = tot>>24;
|
|
p[1] = tot>>16;
|
|
p[2] = tot>>8;
|
|
p[3] = tot;
|
|
if(sendp(cli->readchan, p) == 0)
|
|
break;
|
|
p = nil;
|
|
tot = 0;
|
|
}
|
|
}
|
|
free(p);
|
|
closeioproc(io);
|
|
while(send(cli->dying, 0) == -1)
|
|
;
|
|
}
|
|
|
|
static void
|
|
timerThread(void *v)
|
|
{
|
|
Ioproc *io;
|
|
SunClient *cli;
|
|
|
|
cli = v;
|
|
io = ioproc();
|
|
for(;;){
|
|
if(iosleep(io, 200) < 0)
|
|
break;
|
|
if(sendul(cli->timerchan, 0) == 0)
|
|
break;
|
|
}
|
|
closeioproc(io);
|
|
while(send(cli->dying, 0) == -1)
|
|
;
|
|
}
|
|
|
|
static ulong
|
|
msec(void)
|
|
{
|
|
return nsec()/1000000;
|
|
}
|
|
|
|
static ulong
|
|
twait(ulong rtt, int nresend)
|
|
{
|
|
ulong t;
|
|
|
|
t = rtt;
|
|
if(nresend <= 1)
|
|
{}
|
|
else if(nresend <= 3)
|
|
t *= 2;
|
|
else if(nresend <= 18)
|
|
t <<= nresend-2;
|
|
else
|
|
t = 60*1000;
|
|
if(t > 60*1000)
|
|
t = 60*1000;
|
|
|
|
return t;
|
|
}
|
|
|
|
static void
|
|
rpcMuxThread(void *v)
|
|
{
|
|
uchar *buf, *p, *ep;
|
|
int i, n, nout, mout;
|
|
ulong t, xidgen, tag;
|
|
Alt a[5];
|
|
Out *o, **out;
|
|
SunRpc rpc;
|
|
SunClient *cli;
|
|
|
|
cli = v;
|
|
mout = 16;
|
|
nout = 0;
|
|
out = emalloc(mout*sizeof(out[0]));
|
|
xidgen = truerand();
|
|
|
|
a[0].op = CHANRCV;
|
|
a[0].c = cli->rpcchan;
|
|
a[0].v = &o;
|
|
a[1].op = CHANNOP;
|
|
a[1].c = cli->timerchan;
|
|
a[1].v = nil;
|
|
a[2].op = CHANRCV;
|
|
a[2].c = cli->flushchan;
|
|
a[2].v = &tag;
|
|
a[3].op = CHANRCV;
|
|
a[3].c = cli->readchan;
|
|
a[3].v = &buf;
|
|
a[4].op = CHANEND;
|
|
|
|
for(;;){
|
|
switch(alt(a)){
|
|
case 0: /* o = <-rpcchan */
|
|
if(o == nil)
|
|
goto Done;
|
|
cli->nsend++;
|
|
/* set xid */
|
|
o->xid = ++xidgen;
|
|
if(cli->needcount)
|
|
p = o->p+4;
|
|
else
|
|
p = o->p;
|
|
p[0] = xidgen>>24;
|
|
p[1] = xidgen>>16;
|
|
p[2] = xidgen>>8;
|
|
p[3] = xidgen;
|
|
if(write(cli->fd, o->p, o->n) != o->n){
|
|
free(o->p);
|
|
o->p = nil;
|
|
snprint(o->err, sizeof o->err, "write: %r");
|
|
sendp(o->creply, 0);
|
|
break;
|
|
}
|
|
if(nout >= mout){
|
|
mout *= 2;
|
|
out = erealloc(out, mout*sizeof(out[0]));
|
|
}
|
|
o->st = msec();
|
|
o->nresend = 0;
|
|
o->t = o->st + twait(cli->rtt.avg, 0);
|
|
if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
|
|
out[nout++] = o;
|
|
a[1].op = CHANRCV;
|
|
break;
|
|
|
|
case 1: /* <-timerchan */
|
|
t = msec();
|
|
for(i=0; i<nout; i++){
|
|
o = out[i];
|
|
if((int)(t - o->t) > 0){
|
|
if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
|
|
if(cli->maxwait && t - o->st >= cli->maxwait){
|
|
free(o->p);
|
|
o->p = nil;
|
|
strcpy(o->err, "timeout");
|
|
sendp(o->creply, 0);
|
|
out[i--] = out[--nout];
|
|
continue;
|
|
}
|
|
cli->nresend++;
|
|
o->nresend++;
|
|
o->t = t + twait(cli->rtt.avg, o->nresend);
|
|
if(write(cli->fd, o->p, o->n) != o->n){
|
|
free(o->p);
|
|
o->p = nil;
|
|
snprint(o->err, sizeof o->err, "rewrite: %r");
|
|
sendp(o->creply, 0);
|
|
out[i--] = out[--nout];
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
/* stop ticking if no work; rpcchan will turn it back on */
|
|
if(nout == 0)
|
|
a[1].op = CHANNOP;
|
|
break;
|
|
|
|
case 2: /* tag = <-flushchan */
|
|
for(i=0; i<nout; i++){
|
|
o = out[i];
|
|
if(o->tag == tag){
|
|
out[i--] = out[--nout];
|
|
strcpy(o->err, "flushed");
|
|
free(o->p);
|
|
o->p = nil;
|
|
sendp(o->creply, 0);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case 3: /* buf = <-readchan */
|
|
p = buf;
|
|
n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
|
|
p += 4;
|
|
ep = p+n;
|
|
if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
|
|
fprint(2, "in: %.*H unpack failed\n", n, buf+4);
|
|
free(buf);
|
|
break;
|
|
}
|
|
if(cli->chatty)
|
|
fprint(2, "in: %B\n", &rpc);
|
|
if(rpc.iscall){
|
|
fprint(2, "did not get reply\n");
|
|
free(buf);
|
|
break;
|
|
}
|
|
o = nil;
|
|
for(i=0; i<nout; i++){
|
|
o = out[i];
|
|
if(o->xid == rpc.xid)
|
|
break;
|
|
}
|
|
if(i==nout){
|
|
if(cli->chatty) fprint(2, "did not find waiting request\n");
|
|
free(buf);
|
|
break;
|
|
}
|
|
out[i] = out[--nout];
|
|
free(o->p);
|
|
o->p = nil;
|
|
if(rpc.status == SunSuccess){
|
|
o->p = buf;
|
|
o->rpc = rpc;
|
|
}else{
|
|
o->p = nil;
|
|
free(buf);
|
|
sunErrstr(rpc.status);
|
|
rerrstr(o->err, sizeof o->err);
|
|
}
|
|
sendp(o->creply, 0);
|
|
break;
|
|
}
|
|
}
|
|
Done:
|
|
free(out);
|
|
sendp(cli->dying, 0);
|
|
}
|
|
|
|
SunClient*
|
|
sunDial(char *address)
|
|
{
|
|
int fd;
|
|
SunClient *cli;
|
|
|
|
if((fd = dial(address, 0, 0, 0)) < 0)
|
|
return nil;
|
|
|
|
cli = emalloc(sizeof(SunClient));
|
|
cli->fd = fd;
|
|
cli->maxwait = 15000;
|
|
cli->rtt.avg = 1000;
|
|
cli->dying = chancreate(sizeof(void*), 0);
|
|
cli->rpcchan = chancreate(sizeof(Out*), 0);
|
|
cli->timerchan = chancreate(sizeof(ulong), 0);
|
|
cli->flushchan = chancreate(sizeof(ulong), 0);
|
|
cli->readchan = chancreate(sizeof(uchar*), 0);
|
|
if(strstr(address, "udp!")){
|
|
cli->needcount = 0;
|
|
cli->nettid = threadcreate(udpThread, cli, SunStackSize);
|
|
cli->timertid = threadcreate(timerThread, cli, SunStackSize);
|
|
}else{
|
|
cli->needcount = 1;
|
|
cli->nettid = threadcreate(netThread, cli, SunStackSize);
|
|
/* assume reliable: don't need timer */
|
|
/* BUG: netThread should know how to redial */
|
|
}
|
|
threadcreate(rpcMuxThread, cli, SunStackSize);
|
|
|
|
return cli;
|
|
}
|
|
|
|
void
|
|
sunClientClose(SunClient *cli)
|
|
{
|
|
int n;
|
|
|
|
/*
|
|
* Threadints get you out of any stuck system calls
|
|
* or thread rendezvouses, but do nothing if the thread
|
|
* is in the ready state. Keep interrupting until it takes.
|
|
*/
|
|
n = 0;
|
|
if(!cli->timertid)
|
|
n++;
|
|
while(n < 2){
|
|
threadint(cli->nettid);
|
|
if(cli->timertid)
|
|
threadint(cli->timertid);
|
|
yield();
|
|
while(nbrecv(cli->dying, nil) == 1)
|
|
n++;
|
|
}
|
|
|
|
sendp(cli->rpcchan, 0);
|
|
recvp(cli->dying);
|
|
|
|
/* everyone's gone: clean up */
|
|
close(cli->fd);
|
|
chanfree(cli->flushchan);
|
|
chanfree(cli->readchan);
|
|
chanfree(cli->timerchan);
|
|
free(cli);
|
|
}
|
|
|
|
void
|
|
sunClientFlushRpc(SunClient *cli, ulong tag)
|
|
{
|
|
sendul(cli->flushchan, tag);
|
|
}
|
|
|
|
void
|
|
sunClientProg(SunClient *cli, SunProg *p)
|
|
{
|
|
if(cli->nprog%16 == 0)
|
|
cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
|
|
cli->prog[cli->nprog++] = p;
|
|
}
|
|
|
|
int
|
|
sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
|
|
{
|
|
uchar *bp, *p, *ep;
|
|
int i, n1, n2, n, nn;
|
|
Out o;
|
|
SunProg *prog;
|
|
SunStatus ok;
|
|
|
|
for(i=0; i<cli->nprog; i++)
|
|
if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
|
|
break;
|
|
if(i==cli->nprog){
|
|
werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
|
|
return -1;
|
|
}
|
|
prog = cli->prog[i];
|
|
|
|
if(cli->chatty){
|
|
fprint(2, "out: %B\n", &tx->rpc);
|
|
fprint(2, "\t%C\n", tx);
|
|
}
|
|
|
|
n1 = sunRpcSize(&tx->rpc);
|
|
n2 = sunCallSize(prog, tx);
|
|
|
|
n = n1+n2;
|
|
if(cli->needcount)
|
|
n += 4;
|
|
|
|
bp = emalloc(n);
|
|
ep = bp+n;
|
|
p = bp;
|
|
if(cli->needcount){
|
|
nn = n-4;
|
|
p[0] = (nn>>24)|0x80;
|
|
p[1] = nn>>16;
|
|
p[2] = nn>>8;
|
|
p[3] = nn;
|
|
p += 4;
|
|
}
|
|
if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
|
|
|| (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
|
|
sunErrstr(ok);
|
|
free(bp);
|
|
return -1;
|
|
}
|
|
if(p != ep){
|
|
werrstr("rpc: packet size mismatch");
|
|
free(bp);
|
|
return -1;
|
|
}
|
|
|
|
memset(&o, 0, sizeof o);
|
|
o.creply = chancreate(sizeof(void*), 0);
|
|
o.tag = tag;
|
|
o.p = bp;
|
|
o.n = n;
|
|
|
|
sendp(cli->rpcchan, &o);
|
|
recvp(o.creply);
|
|
chanfree(o.creply);
|
|
|
|
if(o.p == nil){
|
|
werrstr("%s", o.err);
|
|
return -1;
|
|
}
|
|
|
|
p = o.rpc.data;
|
|
ep = p+o.rpc.ndata;
|
|
rx->rpc = o.rpc;
|
|
rx->rpc.proc = tx->rpc.proc;
|
|
rx->rpc.prog = tx->rpc.prog;
|
|
rx->rpc.vers = tx->rpc.vers;
|
|
rx->type = (rx->rpc.proc<<1)|1;
|
|
if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
|
|
sunErrstr(ok);
|
|
werrstr("unpack: %r");
|
|
free(o.p);
|
|
return -1;
|
|
}
|
|
|
|
if(cli->chatty){
|
|
fprint(2, "in: %B\n", &rx->rpc);
|
|
fprint(2, "in:\t%C\n", rx);
|
|
}
|
|
|
|
if(tofree)
|
|
*tofree = o.p;
|
|
else
|
|
free(o.p);
|
|
|
|
return 0;
|
|
}
|