lib9p: added toilet queues

front
aiju 2011-08-16 22:00:34 +02:00
parent 2f2c930669
commit c65100ffa0
3 changed files with 138 additions and 0 deletions

View File

@ -27,10 +27,28 @@ typedef struct Filelist Filelist;
typedef struct Tree Tree;
typedef struct Readdir Readdir;
typedef struct Srv Srv;
typedef struct Reqqueue Reqqueue;
typedef struct Queueelem Queueelem;
#pragma incomplete Filelist
#pragma incomplete Readdir
struct Queueelem
{
Queueelem *prev, *next;
void (*f)(Req *);
};
struct Reqqueue
{
QLock;
Rendez;
Queueelem;
int pid;
Req *cur;
jmp_buf flush;
};
struct Fid
{
ulong fid;
@ -60,6 +78,8 @@ struct Req
Fid* afid;
Fid* newfid;
Srv* srv;
Queueelem qu;
/* below is implementation-specific; don't use */
QLock lk;
@ -255,3 +275,7 @@ int authattach(Req*);
extern void (*_forker)(void (*)(void*), void*, int);
Reqqueue* reqqueuecreate(void);
void reqqueuepush(Reqqueue*, Req*, void (*)(Req *));
void reqqueueflush(Reqqueue*, Req*);
int reqqueueflushed(void);

View File

@ -12,6 +12,7 @@ OFILES=\
req.$O\
parse.$O\
post.$O\
queue.$O\
rfork.$O\
srv.$O\
thread.$O\

113
sys/src/lib9p/queue.c Normal file
View File

@ -0,0 +1,113 @@
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <fcall.h>
#include <9p.h>
static int
_reqqueuenote(void *uregs, char *note)
{
Reqqueue *q;
if(strcmp(note, "flush") != 0)
return 0;
q = *threaddata();
if(q != nil){
q->cur = nil;
notejmp(uregs, q->flush, 1);
}
return 1;
}
static void
_reqqueueproc(void *v)
{
Reqqueue *q;
Req *r;
void (*f)(Req *);
q = v;
*threaddata() = q;
rfork(RFNOTEG);
threadnotify(_reqqueuenote, 1);
for(;;){
qlock(q);
q->cur = nil;
while(q->next == q)
rsleep(q);
r = (Req*)(((char*)q->next) - ((char*)&((Req*)0)->qu));
r->qu.next->prev = r->qu.prev;
r->qu.prev->next = r->qu.next;
f = r->qu.f;
qlock(&r->lk);
memset(&r->qu, 0, sizeof(r->qu));
qunlock(&r->lk);
q->cur = r;
if(setjmp(q->flush)){
respond(r, "interrupted");
continue;
}
qunlock(q);
f(r);
}
}
Reqqueue *
reqqueuecreate(void)
{
Reqqueue *q;
q = emalloc9p(sizeof(*q));
memset(q, 0, sizeof(*q));
q->l = q;
q->next = q->prev = q;
q->pid = threadpid(proccreate(_reqqueueproc, q, mainstacksize));
print("%d\n", q->pid);
return q;
}
void
reqqueuepush(Reqqueue *q, Req *r, void (*f)(Req *))
{
qlock(q);
r->qu.f = f;
r->qu.next = q;
r->qu.prev = q->prev;
q->prev->next = &r->qu;
q->prev = &r->qu;
rwakeupall(q);
qunlock(q);
}
void
reqqueueflush(Reqqueue *q, Req *r)
{
qlock(q);
if(q->cur == r){
postnote(PNPROC, q->pid, "flush");
qunlock(q);
}else{
if(r->qu.next != nil){
r->qu.next->prev = r->qu.prev;
r->qu.prev->next = r->qu.next;
}
qlock(&r->lk);
memset(&r->qu, 0, sizeof(r->qu));
qunlock(&r->lk);
qunlock(q);
respond(r, "interrupted");
}
}
int
reqqueueflushed(void)
{
Reqqueue *q;
q = *threaddata();
qlock(q);
if(setjmp(q->flush))
return 1;
qunlock(q);
return 0;
}