dtracy: add support for aggregations

front
aiju 2018-12-08 15:07:53 +00:00
parent 03e60450c2
commit 58fa29447b
15 changed files with 703 additions and 22 deletions

View File

@ -9,7 +9,14 @@
enum {
DTSTRMAX = 256,
DTRECMAX = 1024,
DTMAXAGGBUF = 16,
DTBUFSZ = 65536,
DTANUMBUCKETS = 1024,
DTABUCKETS = DTBUFSZ - 4 * DTANUMBUCKETS,
};
#define DTANIL ((u32int)-1)
typedef struct DTName DTName;
typedef struct DTProbe DTProbe;
@ -21,6 +28,7 @@ typedef struct DTEnab DTEnab;
typedef struct DTChan DTChan;
typedef struct DTExpr DTExpr;
typedef struct DTProvider DTProvider;
typedef struct DTAgg DTAgg;
typedef struct DTBuf DTBuf;
struct DTName {
@ -30,7 +38,7 @@ struct DTName {
};
/*
we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID.
we assign all pairs (probe,action-group) (called an enabling or DTEnab) a unique ID called EPID.
we could also use probe IDs and action group IDs but using a single 32-bit ID for both is more flexible/efficient.
*/
struct DTEnab {
@ -123,14 +131,52 @@ struct DTExpr {
u32int *b;
};
/*
aggregation buffers are hashtables and use a different record format.
there are DTANUMBUCKETS 4-byte buckets at the end of the buffer.
each entry is (link,id,key,val) with a 4-byte link field for the hash chains and a 4-byte aggregation id.
the aggregation id actually contains all the data in the DTAgg struct:
4-bit type
12-bit keysize in qwords
16-bit unique id
the struct is just for kernel convenience
*/
enum {
AGGCNT,
AGGSUM,
AGGAVG,
AGGSTD,
AGGMIN,
AGGMAX,
};
struct DTAgg {
int id;
u16int keysize; /* in bytes */
u16int recsize;
uchar type;
};
/* an action is an expression, plus info about what to do with the result */
struct DTAct {
enum {
ACTTRACE, /* record the result. size is the number of bytes used. 0 <= size <= 8 */
ACTTRACESTR, /* take the result to be a pointer to a null-terminated string. store it as zero-padded char[size]. */
/*
ACTAGGKEY and ACTAGGVAL together record a value in an aggregation.
they must occur as a pair and targ must point to an already allocated aggregation buffer.
currently 0 <= size <= 8.
*/
ACTAGGKEY,
ACTAGGVAL,
ACTCANCEL, /* (must be last action) don't write anything into the main buffer. used to avoid pointless records when using aggregations. */
} type;
DTExpr *p;
int size;
DTAgg agg;
};
/* an action group is an optional predicate and a set of actions. */
@ -144,14 +190,13 @@ struct DTActGr {
int reclen; /* record size, including 12-byte header */
};
/* a clause list probe wildcard expressions and an action group. only used during set-up. */
/* a clause lists probe wildcard expressions and an action group. only used during set-up. */
struct DTClause {
int nprob;
char **probs;
DTActGr *gr;
};
enum { DTBUFSZ = 65536 };
struct DTBuf {
int wr;
uchar data[DTBUFSZ];
@ -170,6 +215,9 @@ struct DTChan {
/* we have 2 buffers per cpu, one for writing and one for reading. dtcread() swaps them if empty. */
DTBuf **wrbufs;
DTBuf **rdbufs;
/* aggregations use separate buffers */
DTBuf **aggwrbufs;
DTBuf **aggrdbufs;
/* list of enablings. */
DTEnab *enab;
@ -191,7 +239,7 @@ int dtefmt(Fmt *);
/* action group functions */
void dtgpack(Fmt *, DTActGr *);
char *dtgunpack(char *, DTActGr **);
int dtgverify(DTActGr *);
int dtgverify(DTChan *, DTActGr *);
void dtgfree(DTActGr *);
/* clause functions */
@ -205,9 +253,14 @@ void dtcfree(DTChan *);
int dtcaddgr(DTChan *, DTName, DTActGr *);
int dtcaddcl(DTChan *, DTClause *);
int dtcread(DTChan *, void *, int);
int dtcaggread(DTChan *, void *, int);
void dtcreset(DTChan *);
void dtcrun(DTChan *, int);
/* aggbuf functions */
int dtaunpackid(DTAgg *);
void dtarecord(DTChan *, int, DTAgg *, uchar *, int, s64int);
extern DTProvider *dtproviders[];
extern int dtnmach;

View File

@ -38,7 +38,7 @@ prog(DTKChan *p, char *s)
dtclfree(c);
if(rc < 0){
dtcreset(p->ch);
error("failed to add clause");
error(up->syserrstr);
}
}
}
@ -54,6 +54,7 @@ enum {
Qprog,
Qbuf,
Qepid,
Qaggbuf,
};
static Dirtab dtracydir[] = {
@ -61,6 +62,7 @@ static Dirtab dtracydir[] = {
"prog", { Qprog, 0, 0 }, 0, 0660,
"buf", { Qbuf, 0, 0, }, 0, 0440,
"epid", { Qepid, 0, 0 }, 0, 0440,
"aggbuf", { Qaggbuf, 0, 0 }, 0, 0440,
};
enum {
@ -269,11 +271,50 @@ epidread(DTKAux *aux, DTChan *c, char *a, long n, vlong off)
return readstr(off, a, n, aux->str);
}
static long
lockedread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int))
{
long rc;
if(waserror()){
qunlock(&dtracylock);
nexterror();
}
eqlock(&dtracylock);
rc = readf(c, a, n);
qunlock(&dtracylock);
poperror();
return rc;
}
static long
handleread(DTChan *c, void *a, long n, int(*readf)(DTChan *, void *, int))
{
long rc, m;
int i;
for(;;){
rc = lockedread(c, a, n, readf);
if(rc < 0) return -1;
if(rc > 0) break;
tsleep(&up->sleep, return0, 0, 250);
}
m = rc;
for(i = 0; i < 3 && m < n/2; i++){
tsleep(&up->sleep, return0, 0, 50);
rc = lockedread(c, (uchar *)a + m, n - m, readf);
if(rc < 0) break;
m += rc;
}
return m;
}
static long
dtracyread(Chan *c, void *a, long n, vlong off)
{
int rc;
DTKChan *p;
DTChan *ch;
eqlock(&dtracylock);
if(waserror()){
@ -299,9 +340,15 @@ dtracyread(Chan *c, void *a, long n, vlong off)
rc = readstr(off, a, n, up->genbuf);
break;
case Qbuf:
while(rc = dtcread(p->ch, a, n), rc == 0)
tsleep(&up->sleep, return0, 0, 250);
break;
ch = p->ch;
qunlock(&dtracylock);
poperror();
return handleread(ch, a, n, dtcread);
case Qaggbuf:
ch = p->ch;
qunlock(&dtracylock);
poperror();
return handleread(ch, a, n, dtcaggread);
case Qepid:
rc = epidread(c->aux, p->ch, a, n, off);
break;
@ -460,8 +507,6 @@ dtgetvar(int v)
switch(v){
case DTV_PID:
return up != nil ? up->pid : 0;
case DTV_MACHNO:
return m->machno;
default:
return 0;
}

View File

@ -55,6 +55,27 @@ addprobe(char *s)
clause->probs[clause->nprob++] = strdup(s);
}
static char *aggtypes[] = {
[AGGCNT] "count",
[AGGMIN] "min",
[AGGMAX] "max",
[AGGSUM] "sum",
[AGGAVG] "avg",
[AGGSTD] "std",
};
int
aggtype(Symbol *s)
{
int i;
for(i = 0; i < nelem(aggtypes); i++)
if(strcmp(s->name, aggtypes[i]) == 0)
return i;
error("%s unknown aggregation type", s->name);
return 0;
}
void
addstat(int type, ...)
{
@ -73,6 +94,19 @@ addstat(int type, ...)
case STATPRINT:
case STATPRINTF:
break;
case STATAGG:
s->agg.name = va_arg(va, Symbol *);
s->agg.key = va_arg(va, Node *);
s->agg.type = aggtype(va_arg(va, Symbol *));
s->agg.value = va_arg(va, Node *);
if(s->agg.type == AGGCNT){
if(s->agg.value != nil)
error("too many arguments for count()");
}else{
if(s->agg.value == nil)
error("need argument for %s()", aggtypes[s->agg.type]);
}
break;
default:
sysfatal("addstat: unknown type %d", type);
}
@ -158,12 +192,26 @@ prepprintf(Node **arg, int narg, DTActGr *g, int *recoff)
(*arg)->str = fmtstrflush(&f);
}
int aggid;
int
allagg(Clause *c)
{
Stat *s;
for(s = c->stats; s < c->stats + c->nstats; s++)
if(s->type != STATAGG)
return 0;
return 1;
}
DTClause *
mkdtclause(Clause *c)
{
DTClause *d;
Stat *s;
int recoff, i;
Node *n;
d = emalloc(sizeof(DTClause));
d->nprob = c->nprob;
@ -175,7 +223,7 @@ mkdtclause(Clause *c)
for(s = c->stats; s < c->stats + c->nstats; s++)
switch(s->type){
case STATEXPR:
actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0});
actgradd(d->gr, (DTAct){ACTTRACE, codegen(s->n), 0, noagg});
break;
case STATPRINT:
for(i = 0; i < s->narg; i++)
@ -184,7 +232,22 @@ mkdtclause(Clause *c)
case STATPRINTF:
prepprintf(s->arg, s->narg, d->gr, &recoff);
break;
case STATAGG: {
DTAgg agg = {.id = s->agg.type << 28 | 1 << 16 | aggid++};
assert(dtaunpackid(&agg) >= 0);
aggs = realloc(aggs, sizeof(Agg) * aggid);
memset(&aggs[aggid-1], 0, sizeof(Agg));
aggs[aggid-1].DTAgg = agg;
aggs[aggid-1].name = strdup(s->agg.name == nil ? "" : s->agg.name->name);
actgradd(d->gr, (DTAct){ACTAGGKEY, codegen(s->agg.key), 8, agg});
n = s->agg.value;
if(n == nil) n = node(ONUM, 0ULL);
actgradd(d->gr, (DTAct){ACTAGGVAL, codegen(n), 8, agg});
break;
}
}
if(allagg(c))
actgradd(d->gr, (DTAct){ACTCANCEL, codegen(node(ONUM, 0)), 0, noagg});
return d;
}
@ -392,6 +455,7 @@ parseclause(Clause *cl, uchar *p, uchar *e, Enab *en, Biobuf *bp)
case STATPRINTF:
execprintf(s->arg, s->narg, p, e, en);
break;
case STATAGG: break;
default:
sysfatal("parseclause: unknown type %d", s->type);
}
@ -546,6 +610,17 @@ dump(void)
print("\t\ttrace string (%d bytes)\n", a->size);
dumpexpr(a->p, "\t\t\t");
break;
case ACTAGGKEY:
print("\t\taggregation key (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id);
dumpexpr(a->p, "\t\t\t");
break;
case ACTAGGVAL:
print("\t\taggregation value (%s,%d,%d)\n", a->agg.type >= nelem(aggtypes) ? "???" : aggtypes[a->agg.type], a->agg.keysize, (u16int)a->agg.id);
dumpexpr(a->p, "\t\t\t");
break;
case ACTCANCEL:
print("\t\tcancel record\n");
break;
default:
print("\t\t??? %d\n", a->type);
}
@ -564,6 +639,8 @@ dump(void)
for(j = 0; j < s->narg; j++)
print("\t\t\targ %ε\n", s->arg[j]);
break;
case STATAGG:
break;
default:
print("\t\t??? %d\n", s->type);
}

199
sys/src/cmd/dtracy/agg.c Normal file
View File

@ -0,0 +1,199 @@
#include <u.h>
#include <libc.h>
#include <dtracy.h>
#include <bio.h>
#include <avl.h>
#include <mp.h>
#include "dat.h"
#include "fns.h"
typedef struct ANode ANode;
struct ANode {
Avl;
s64int val, cnt;
u64int sq[2];
int keysize;
uchar key[1];
};
Agg *aggs;
static Avltree **trees;
static ANode *key;
int interrupted;
static int
aggcmp(Avl *ap, Avl *bp)
{
ANode *a, *b;
a = (ANode *) ap;
b = (ANode *) bp;
return memcmp(a->key, b->key, a->keysize);
}
static void
createrecord(int type, ANode *n, s64int *q)
{
switch(type){
case AGGCNT: n->cnt = q[0]; break;
case AGGSUM: case AGGMIN: case AGGMAX: n->val = q[0]; break;
case AGGAVG: n->cnt = q[1]; n->val = q[0]; break;
case AGGSTD: n->cnt = q[1]; n->val = q[0]; n->sq[0] = q[2]; n->sq[1] = q[3]; break;
default: abort();
}
}
static void
updaterecord(int type, ANode *n, s64int *q)
{
u64int r;
switch(type){
case AGGCNT: n->cnt += q[0]; break;
case AGGSUM: n->val += q[0]; break;
case AGGAVG: n->cnt += q[1]; n->val += q[0]; break;
case AGGSTD:
n->cnt += q[1];
n->val += q[0];
r = n->sq[0] + q[2];
if(r < q[2]) n->sq[1]++;
n->sq[0] = r;
n->sq[1] += q[3];
break;
default: abort();
}
}
int
aggparsebuf(uchar *p, int n)
{
uchar *e;
Agg *a;
u32int id;
Avltree *tp;
ANode *np;
e = p + n;
for(; p + 8 < e; p += a->recsize){
id = *(u32int*)&p[4];
if((u16int)id >= aggid){
inval:
fprint(2, "invalid record in aggregation buffer\n");
return -1;
}
a = &aggs[(u16int)id];
if(a->type != id>>28) goto inval;
if(a->keysize != (id>>13&0x7ff8)) goto inval;
if(p + a->recsize > e) goto inval;
tp = trees[(u16int)id];
key->keysize = a->keysize;
memcpy(key->key, &p[8], a->keysize);
np = (ANode *) avllookup(tp, key, 0);
if(np == nil){
np = emalloc(sizeof(ANode) - 1 + a->keysize);
*np = *key;
createrecord(a->type, np, (s64int*)&p[8+a->keysize]);
avlinsert(tp, np);
}else
updaterecord(a->type, np, (s64int*)&p[8+a->keysize]);
}
return 0;
}
void
agginit(void)
{
int i, m;
trees = emalloc(sizeof(Avltree *) * aggid);
m = 0;
for(i = 0; i < aggid; i++){
trees[i] = avlcreate(aggcmp);
if(aggs[i].keysize > m)
m = aggs[i].keysize;
}
key = emalloc(sizeof(ANode) - 1 + m);
}
int
aggnote(void *, char *note)
{
if(strcmp(note, "interrupt") != 0 || interrupted)
return 0;
interrupted = 1;
return 1;
}
void
aggkeyprint(Fmt *f, Agg *, ANode *a)
{
fmtprint(f, "%20lld ", *(u64int*)a->key);
}
static double
variance(ANode *a)
{
mpint *x, *y, *z;
double r;
x = vtomp(a->val, nil);
y = uvtomp(a->sq[0], nil);
z = vtomp(a->sq[1], nil);
mpleft(z, 64, z);
mpadd(z, y, y);
vtomp(a->cnt, z);
mpmul(x, x, x);
mpmul(y, z, y);
mpsub(y, x, x);
r = mptod(x) / a->cnt;
mpfree(x);
mpfree(y);
mpfree(z);
return r;
}
void
aggvalprint(Fmt *f, int type, ANode *a)
{
double x, s;
switch(type){
case AGGCNT: fmtprint(f, "%20lld", a->cnt); break;
case AGGSUM: case AGGMIN: case AGGMAX: fmtprint(f, "%20lld", a->val); break;
case AGGAVG: fmtprint(f, "%20g", (double)a->val / a->cnt); break;
case AGGSTD:
x = (double)a->val / a->cnt;
s = variance(a);
if(s < 0)
fmtprint(f, "%20g %20s", x, "NaN");
else{
fmtprint(f, "%20g %20g", x, sqrt(s));
}
break;
default:
abort();
}
}
void
aggdump(void)
{
Fmt f;
char buf[8192];
int i;
ANode *a;
fmtfdinit(&f, 1, buf, sizeof(buf));
for(i = 0; i < aggid; i++){
a = (ANode *) avlmin(trees[i]);
for(; a != nil; a = (ANode *) avlnext(a)){
fmtprint(&f, "%s\t", aggs[i].name);
aggkeyprint(&f, &aggs[i], a);
aggvalprint(&f, aggs[i].type, a);
fmtprint(&f, "\n");
}
}
fmtfdflush(&f);
}

View File

@ -296,10 +296,10 @@ tracegen(Node *n, DTActGr *g, int *recoff)
case ORECORD:
switch(n->typ->type){
case TYPINT:
actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size});
actgradd(g, (DTAct){ACTTRACE, codegen(n->n1), n->typ->size, noagg});
break;
case TYPSTRING:
actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size});
actgradd(g, (DTAct){ACTTRACESTR, codegen(n->n1), n->typ->size, noagg});
break;
default:
sysfatal("tracegen: don't know how to record %τ", n->typ);

View File

@ -5,6 +5,7 @@ typedef struct Clause Clause;
typedef struct Enab Enab;
typedef struct Stat Stat;
typedef struct Type Type;
typedef struct Agg Agg;
enum {
SYMHASH = 256,
@ -89,10 +90,19 @@ struct Stat {
STATEXPR,
STATPRINT,
STATPRINTF,
STATAGG,
} type;
/* STATEXPR */
Node *n;
/* STATPRINT, STATPRINTF */
int narg;
Node **arg;
/* STATAGG */
struct {
Symbol *name;
int type;
Node *key, *value;
} agg;
};
struct Clause {
@ -112,6 +122,11 @@ struct Enab {
Enab *next;
};
struct Agg {
DTAgg;
char *name;
};
extern int errors;
#pragma varargck type "α" int
@ -121,3 +136,6 @@ extern int errors;
#pragma varargck argpos error 1
extern int dflag;
extern DTAgg noagg;
extern int aggid;
extern Agg *aggs;

View File

@ -5,6 +5,8 @@
#include "dat.h"
#include "fns.h"
DTAgg noagg;
char *dtracyroot = "";
int dtracyno;
int ctlfd, buffd;
@ -160,23 +162,56 @@ err:
}
void
int
bufread(Biobuf *bp)
{
static uchar buf[65536];
int n;
n = read(buffd, buf, sizeof(buf));
if(n < 0) sysfatal("bufread: %r");
if(n < 0)
sysfatal("bufread: %r");
if(parsebuf(buf, n, bp) < 0)
sysfatal("parsebuf: %r");
Bflush(bp);
return 0;
}
void
aggproc(void)
{
char buf[65536];
int buffd, n;
extern int interrupted;
switch(rfork(RFPROC|RFMEM)){
case -1: sysfatal("rfork: %r");
case 0: return;
default: break;
}
snprint(buf, sizeof(buf), "%s/%d/aggbuf", dtracyroot, dtracyno);
buffd = open(buf, OREAD);
if(buffd < 0) sysfatal("open: %r");
agginit();
atnotify(aggnote, 1);
while(!interrupted){
n = read(buffd, buf, sizeof(buf));
if(n < 0){
if(interrupted)
break;
sysfatal("aggbufread: %r");
}
if(aggparsebuf((uchar *) buf, n) < 0)
exits("error");
}
aggdump();
exits(nil);
}
static void
usage(void)
{
fprint(2, "usage: %s [ -cd ] script\n", argv0);
fprint(2, "usage: %s [ -d ] script\n", argv0);
exits("usage");
}
@ -217,6 +252,8 @@ main(int argc, char **argv)
fprint(ctlfd, "go");
out = Bfdopen(1, OWRITE);
if(out == nil) sysfatal("Bfdopen: %r");
if(aggid > 0)
aggproc();
for(;;)
bufread(out);
}

View File

@ -33,3 +33,7 @@ Type *type(int, ...);
int min(int, int);
int max(int, int);
Node *addtype(Type *, Node *);
int aggparsebuf(uchar *, int);
int aggnote(void *, char *);
void aggdump(void);
void agginit(void);

View File

@ -9,6 +9,7 @@ OFILES=\
cgen.$O\
act.$O\
type.$O\
agg.$O\
YFILES=parse.y

View File

@ -16,7 +16,8 @@
Type *t;
}
%type <n> expr
%type <n> expr optexpr
%type <sym> optsym
%type <t> type
%token <sym> TSYM
@ -63,7 +64,9 @@ stats0: stat | stats0 ';' stat
stat: expr { addstat(STATEXPR, exprcheck($1, 0)); }
| TPRINT { addstat(STATPRINT); } pelist
| TPRINTF { addstat(STATPRINTF); } pelist
| '@' optsym '[' expr ']' '=' TSYM '(' optexpr ')' { addstat(STATAGG, $2, $4, $7, $9); }
optsym: TSYM | { $$ = nil; }
optexpr: expr | { $$ = nil; }
pelist:
'(' ')'

138
sys/src/libdtracy/agg.c Normal file
View File

@ -0,0 +1,138 @@
#include <u.h>
#include <libc.h>
#include <dtracy.h>
int
dtaunpackid(DTAgg *a)
{
a->type = a->id >> 28 & 15;
a->keysize = a->id >> 13 & 0x7ff8;
switch(a->type){
case AGGCNT:
case AGGSUM:
case AGGMIN:
case AGGMAX:
a->recsize = 8 + a->keysize + 8;
return 0;
case AGGAVG:
a->recsize = 8 + a->keysize + 16;
return 0;
case AGGSTD:
a->recsize = 8 + a->keysize + 32;
return 0;
default:
return -1;
}
}
static u64int
hash(uchar *s, int n, int m)
{
u64int h;
int i;
h = 0xcbf29ce484222325ULL;
for(i = 0; i < n; i++){
h ^= s[i];
h *= 0x100000001b3ULL;
}
for(; i < m; i++)
h *= 0x100000001b3ULL;
return h;
}
static int
keyeq(uchar *a, uchar *b, int n, int m)
{
int i;
for(i = 0; i < n; i++)
if(a[i] != b[i])
return 0;
for(; i < m; i++)
if(a[i] != 0)
return 0;
return 1;
}
/* calculate v*v with 128 bits precision and add it to the 128-bit word at q */
static void
addsquare(u64int *q, s64int v)
{
u32int v0;
s32int v1;
s64int s0, s1, s2;
u64int r;
v0 = v;
v1 = v>>32;
s0 = (s64int)v0 * (s64int)v0;
s1 = (s64int)v0 * (s64int)v1;
s2 = (s64int)v1 * (s64int)v1;
r = s0 + (s1<<33);
if(r < (u64int)s0) q[1]++;
q[0] += r;
if(q[0] < r) q[1]++;
q[1] += s2 + (s1>>31);
}
static void
updaterecord(int type, u64int *q, s64int val)
{
switch(type){
case AGGCNT: q[0] += 1; break;
case AGGSUM: q[0] += val; break;
case AGGAVG: q[0] += val; q[1]++; break;
case AGGMIN: if(val < q[0]) q[0] = val; break;
case AGGMAX: if(val > q[0]) q[0] = val; break;
case AGGSTD: q[0] += val; q[1]++; addsquare(&q[2], val); break;
}
}
static void
createrecord(int type, u64int *q, s64int val)
{
switch(type){
case AGGCNT: q[0] = 1; break;
case AGGSUM: case AGGMIN: case AGGMAX: q[0] = val; break;
case AGGAVG: q[0] = val; q[1] = 1; break;
case AGGSTD: q[0] = val; q[1] = 1; q[2] = 0; q[3] = 0; addsquare(&q[2], val); break;
}
}
/* runs in probe context */
void
dtarecord(DTChan *ch, int mach, DTAgg *a, uchar *key, int nkey, s64int val)
{
u64int h;
u32int *p, *q;
DTBuf *c;
c = ch->aggwrbufs[mach];
h = hash(key, nkey, a->keysize);
p = (u32int*)(c->data + DTABUCKETS + (h % DTANUMBUCKETS) * 4);
while(*p != DTANIL){
assert((uint)*p < DTABUCKETS);
q = (u32int*)(c->data + *p);
if(q[1] == a->id && keyeq((uchar*)(q + 2), key, nkey, a->keysize) == 0){
updaterecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val);
return;
}
p = q;
}
if(c->wr + a->recsize > DTABUCKETS)
return;
*p = c->wr;
q = (u32int*)(c->data + c->wr);
q[0] = DTANIL;
q[1] = a->id;
if(nkey == a->keysize)
memmove(&q[2], key, nkey);
else if(nkey > a->keysize){
memmove(&q[2], key, nkey);
memset((uchar*)q + 8 + nkey, 0, a->keysize - nkey);
}else
memmove(&q[2], key, a->keysize);
createrecord(a->type, (u64int*)(q + 2 + a->keysize / 4), val);
c->wr += a->recsize;
}

View File

@ -44,6 +44,14 @@ dtcnew(void)
c->rdbufs[i] = dtmalloc(sizeof(DTBuf));
c->wrbufs[i] = dtmalloc(sizeof(DTBuf));
}
c->aggrdbufs = dtmalloc(sizeof(DTBuf *) * dtnmach);
c->aggwrbufs = dtmalloc(sizeof(DTBuf *) * dtnmach);
for(i = 0; i < dtnmach; i++){
c->aggrdbufs[i] = dtmalloc(sizeof(DTBuf));
c->aggwrbufs[i] = dtmalloc(sizeof(DTBuf));
memset(c->aggrdbufs[i]->data, -1, DTBUFSZ);
memset(c->aggwrbufs[i]->data, -1, DTBUFSZ);
}
return c;
}
@ -63,6 +71,12 @@ dtcfree(DTChan *ch)
}
free(ch->rdbufs);
free(ch->wrbufs);
for(i = 0; i < dtnmach; i++){
free(ch->aggrdbufs[i]);
free(ch->aggwrbufs[i]);
}
free(ch->aggrdbufs);
free(ch->aggwrbufs);
free(ch);
}
@ -73,7 +87,7 @@ dtcaddgr(DTChan *c, DTName name, DTActGr *gr)
DTEnab *ep;
int i, nl, n;
if(dtgverify(gr) < 0)
if(dtgverify(c, gr) < 0)
return -1;
gr->chan = c;
@ -194,13 +208,54 @@ dtcread(DTChan *c, void *buf, int n)
return 0;
}
static void
dtcaggbufswap(DTChan *c, int n)
{
DTBuf *z;
dtmachlock(n);
z = c->aggrdbufs[n];
c->aggrdbufs[n] = c->aggwrbufs[n];
c->aggwrbufs[n] = z;
dtmachunlock(n);
}
int
dtcaggread(DTChan *c, void *buf, int n)
{
int i, swapped;
if(c->state == DTCFAULT){
werrstr("%s", c->errstr);
return -1;
}
for(i = 0; i < dtnmach; i++){
if(swapped = c->aggrdbufs[i]->wr == 0)
dtcaggbufswap(c, i);
if(c->aggrdbufs[i]->wr != 0){
if(c->aggrdbufs[i]->wr > n){
werrstr("short read");
return -1;
}
n = c->aggrdbufs[i]->wr;
memmove(buf, c->aggrdbufs[i]->data, n);
c->aggrdbufs[i]->wr = 0;
memset(c->aggrdbufs[i]->data + DTABUCKETS, -1, 4 * DTANUMBUCKETS);
if(!swapped)
dtcaggbufswap(c, i);
return n;
}
}
return 0;
}
void
dtcreset(DTChan *c)
{
DTEnab *ep, *eq;
for(ep = c->enab; ep != nil; ep = ep->channext){
/* careful! has to look atomic for etptrigger */
/* careful! has to look atomic for dtptrigger */
ep->probprev->probnext = ep->probnext;
ep->probnext->probprev = ep->probprev;
}

View File

@ -8,6 +8,7 @@ OFILES=\
dtefmt.$O\
pack.$O\
chan.$O\
agg.$O\
HFILES=\
/sys/include/dtracy.h\

View File

@ -27,6 +27,12 @@ dtgpack(Fmt *f, DTActGr *g)
fmtprint(f, "t%d\n", g->acts[i].type);
fmtprint(f, "s%d\n", g->acts[i].size);
dtepack(f, g->acts[i].p);
switch(g->acts[i].type){
case ACTAGGKEY:
case ACTAGGVAL:
fmtprint(f, "A%#.8ux\n", g->acts[i].agg.id);
break;
}
}
fmtprint(f, "G");
}
@ -132,6 +138,18 @@ dtgunpack(char *s, DTActGr **rp)
case ACTTRACESTR:
g->reclen += g->acts[i].size;
break;
case ACTAGGKEY:
if(*s++ != 'A') goto fail;
s = u32unpack(s, (u32int *) &g->acts[i].agg.id);
if(s == nil) goto fail;
break;
case ACTAGGVAL:
if(*s++ != 'A') goto fail;
s = u32unpack(s, (u32int *) &g->acts[i].agg.id);
if(s == nil) goto fail;
break;
case ACTCANCEL:
break;
default:
goto fail;
}
@ -182,7 +200,7 @@ dtclfree(DTClause *c)
int i;
if(c == nil) return;
if(--c->gr->ref == 0)
if(c->gr != nil && --c->gr->ref == 0)
dtgfree(c->gr);
for(i = 0; i < c->nprob; i++)
free(c->probs[i]);

View File

@ -80,7 +80,7 @@ invalid:
}
int
dtgverify(DTActGr *g)
dtgverify(DTChan *, DTActGr *g)
{
int i;
@ -96,6 +96,26 @@ dtgverify(DTActGr *g)
if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > DTRECMAX)
return -1;
break;
case ACTAGGKEY:
if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8)
return -1;
if(i == g->nact - 1 || g->acts[i+1].type != ACTAGGVAL || g->acts[i+1].agg.id != g->acts[i].agg.id)
return -1;
break;
case ACTAGGVAL:
if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0 || (uint)g->acts[i].size > 8)
return -1;
if(i == 0 || g->acts[i-1].type != ACTAGGKEY)
return -1;
if(dtaunpackid(&g->acts[i].agg) < 0)
return -1;
break;
case ACTCANCEL:
if(g->acts[i].p == nil || dteverify(g->acts[i].p) < 0)
return -1;
if(i != g->nact - 1)
return -1;
break;
default:
return -1;
}
@ -225,6 +245,7 @@ dtgexec(DTActGr *g, ExecInfo *info)
DTBuf *b;
u8int *bp;
s64int v;
uchar aggkey[8];
int i, j;
b = g->chan->wrbufs[info->machno];
@ -240,6 +261,8 @@ dtgexec(DTActGr *g, ExecInfo *info)
PUT4(info->epid);
PUT8(info->ts);
for(i = 0; i < g->nact; i++){
if(g->acts[i].type == ACTCANCEL)
return 0;
if(dteexec(g->acts[i].p, info, &v) < 0)
return -1;
switch(g->acts[i].type){
@ -256,6 +279,15 @@ dtgexec(DTActGr *g, ExecInfo *info)
}
bp += g->acts[i].size;
break;
case ACTAGGKEY:
for(j = 0; j < g->acts[i].size; j++){
aggkey[j] = v;
v >>= 8;
}
break;
case ACTAGGVAL:
dtarecord(g->chan, info->machno, &g->acts[i].agg, aggkey, g->acts[i-1].size, v);
break;
}
}
assert(bp - b->data - b->wr == g->reclen);