fix refresh interruption, fix #40

This commit is contained in:
Thomas Debesse 2014-10-22 02:19:57 +02:00
parent e9e5da8931
commit fb22c3bb14
2 changed files with 98 additions and 91 deletions

View File

@ -53,6 +53,8 @@ static const char savage_master_header[5] = { 0x7E, 0x41, 0x03, 0x00, 0x00 };
static void stat_next (struct stat_job *job);
static void parse_qstat_record (struct stat_conn *conn);
static GSList* stat_buffer_to_strings(gchar buffer[], gsize bufsize);
typedef void (*server_unref_void)(void*);
typedef void (*userver_unref_void)(void*);
@ -123,6 +125,17 @@ static void stat_master_update_done(
// check if master output is in savage format, if yes parse it and return true.
// otherwise return false so the normal parse function can do the work
#if 0
/*
* needs
* conn->buf = g_malloc (BUFFER_MINSIZE);
* conn->bufsize = BUFFER_MINSIZE;
* but currently
* conn->buf = NULL;
* conn->bufsize = 0;
*
*/
static void parse_savage_master_output (struct stat_conn *conn)
{
struct stat_job *job = conn->job;
@ -267,6 +280,7 @@ static void parse_savage_master_output (struct stat_conn *conn)
}
// infinite loop end, next lines are never reached
}
#endif
/**
parse qstat output line str, in ip:port format. return true if
@ -431,6 +445,7 @@ static gboolean stat_master_input_callback (GIOChannel *chan, GIOCondition condi
debug_increase_indent();
debug(3,"stat_master_input_callback(%p,%d,...)",conn,chan);
#if 0
#warning ugly hack for savage, make master handling more generic!
if(conn->master->type == SAS_SERVER && conn->master->master_type == MASTER_HTTP) {
/* if first time, check it */
@ -448,6 +463,7 @@ static gboolean stat_master_input_callback (GIOChannel *chan, GIOCondition condi
}
/* if you are here, savage is NOT detected, do your job */
#endif
/* return TRUE when there is nothing (more) to do */
while (1) {
first_used = 0;
@ -457,7 +473,7 @@ static gboolean stat_master_input_callback (GIOChannel *chan, GIOCondition condi
stat_master_update_done (conn, job, conn->master, SOURCE_ERROR);
stat_update_masters (job);
debug_decrease_indent();
return TRUE;
return FALSE;
}
status = g_io_channel_read_chars(chan, conn->buf + conn->pos, conn->bufsize - conn->pos, &res, &err);
@ -466,12 +482,12 @@ static gboolean stat_master_input_callback (GIOChannel *chan, GIOCondition condi
stat_master_update_done (conn, job, conn->master, SOURCE_UP);
stat_update_masters (job);
debug_decrease_indent();
return TRUE;
return FALSE;
}
else if (status == G_IO_STATUS_AGAIN) {
debug(3,"stat_master_input_callback -- unavailable");
debug_decrease_indent();
return TRUE;
return FALSE;
}
else if (status == G_IO_STATUS_ERROR) {
debug(3,"stat_master_input_callback -- error");
@ -498,7 +514,7 @@ static gboolean stat_master_input_callback (GIOChannel *chan, GIOCondition condi
stat_master_update_done (conn, job, conn->master, conn->master->state);
stat_update_masters (job);
debug_decrease_indent();
return TRUE;
return FALSE;
}
first_used = tmp - conn->buf;
@ -820,8 +836,7 @@ static void parse_qstat_record (struct stat_conn *conn) {
count as does the prepend below. So we need to decrement it
one just for fun.
*/
job->delayed.queued_servers =
server_list_prepend (job->delayed.queued_servers, server);
job->delayed.queued_servers = server_list_prepend (job->delayed.queued_servers, server);
job->progress.done++;
server_unref (server);
@ -829,8 +844,9 @@ static void parse_qstat_record (struct stat_conn *conn) {
parse_qstat_record_part2 (conn->strings->next, server);
for (list = job->server_handlers; list; list = list->next)
for (list = job->server_handlers; list; list = list->next) {
(* (server_func) list->data) (job, server);
}
}
}
@ -896,14 +912,6 @@ void parse_saved_server (GSList *strings) {
}
static void adjust_pointers (GSList *list, gpointer new, gpointer old) {
while (list) {
list->data = (gpointer) ((char *)new + ((char *)list->data - (char *)old));
list = list->next;
}
}
static void stat_servers_update_done (struct stat_conn *conn) {
debug (3, "stat_servers_update_done() -- Conn %lx server list %lx", conn, conn->job->servers);
server_list_free (conn->job->servers);
@ -911,7 +919,29 @@ static void stat_servers_update_done (struct stat_conn *conn) {
stat_free_conn (conn);
}
static GSList* stat_buffer_to_strings(gchar buffer[], gsize bufsize) {
GSList *strings = NULL;
gchar token[4096];
gsize i, last;
for (i = 0, last = 0; i < bufsize; i++) {
if (i - last == 4096) {
return strings; // error : too long line
}
else if (buffer[i] == '\n' || buffer[i] == '\0') {
token[i - last] = '\0';
debug(6, "stat_buffer_to_strings - token: %s", token);
strings = g_slist_append(strings, strdup(token));
last = i + 1;
}
else {
token[i - last] = buffer[i];
}
}
return strings;
}
/*
stat_servers_input_callback -- as data is returned from the qstat
@ -920,38 +950,53 @@ static void stat_servers_update_done (struct stat_conn *conn) {
*/
static gboolean stat_servers_input_callback (GIOChannel *chan, GIOCondition condition, struct stat_conn *conn) {
struct stat_job *job = conn->job;
int first_used = 0;
int blocked = FALSE;
char *tmp;
gchar *buf = g_malloc(sizeof(gchar*) * BUFFER_MINSIZE);
gsize res = 0;
GError *err = NULL;
GIOStatus status;
GSList *strings, *current;
/* debug (3, "stat_servers_input_callback() -- Conn %lx", conn); */
while (1) {
first_used = 0;
blocked = FALSE;
debug (3, "stat_servers_input_callback() -- Conn %lx", conn);
while (TRUE) {
status = g_io_channel_read_chars(chan, buf, BUFFER_MINSIZE, &res, &err);
if (conn->bufsize - conn->pos < BUFFER_TRESHOLD) {
if (conn->bufsize >= BUFFER_MAXSIZE) {
fprintf (stderr, "server record is too large\n");
stat_servers_update_done (conn);
stat_next (job);
return TRUE;
}
conn->bufsize += conn->bufsize;
tmp = conn->buf;
conn->buf = g_realloc (conn->buf, conn->bufsize);
adjust_pointers (conn->strings, conn->buf, tmp);
conn->bufsize += res;
if (conn->buf == NULL) {
conn->buf = g_malloc(sizeof(gchar*) * conn->bufsize);
}
else {
conn->buf = g_realloc(conn->buf, sizeof(gchar*) * conn->bufsize);
}
status = g_io_channel_read_chars(chan, conn->buf + conn->pos, conn->bufsize - conn->pos, &res, &err);
strncpy(conn->buf + (conn->bufsize - res), buf, res);
if (status == G_IO_STATUS_EOF) {
debug(3,"stat_servers_input_callback -- eof");
debug (3, "Conn %ld Sub Process Done with server list %lx", conn, conn->job->servers);
debug(3, "stat_servers_input_callback -- eof");
debug(3, "Conn %ld Sub Process Done with server list %lx", conn, conn->job->servers);
strings = stat_buffer_to_strings(conn->buf, conn->bufsize);
current = strings;
conn->strings = NULL;
while (current) {
while (strlen(current->data)) {
conn->strings = g_slist_append(conn->strings, strdup(current->data));
current = current->next;
}
if (conn->strings) {
parse_qstat_record (conn);
g_slist_free (conn->strings);
conn->strings = NULL;
}
current = current->next;
}
g_slist_free(strings);
stat_servers_update_done (conn);
stat_next (job);
return TRUE;
return FALSE;
}
else if (status == G_IO_STATUS_AGAIN) {
debug(3,"stat_servers_input_callback -- unavailable");
@ -959,6 +1004,8 @@ static gboolean stat_servers_input_callback (GIOChannel *chan, GIOCondition cond
}
else if (status == G_IO_STATUS_ERROR) {
debug(3,"stat_servers_input_callback -- error");
failed ("read", NULL);
stat_servers_update_done (conn);
stat_next (job);
@ -966,54 +1013,7 @@ static gboolean stat_servers_input_callback (GIOChannel *chan, GIOCondition cond
}
/* G_IO_STATUS_NORMAL */
debug(3,"stat_servers_input_callback -- chars read");
tmp = conn->buf + conn->pos;
conn->pos += res;
while (res && (tmp = memchr (tmp, '\n', res)) != NULL) {
debug(3,"stat_servers_input_callback -- parse new line");
*tmp++ = '\0';
if (conn->buf[conn->lastnl] == '\0') {
debug(3,"stat_servers_input_callback -- blocked");
blocked = TRUE;
parse_qstat_record (conn);
g_slist_free (conn->strings);
conn->strings = NULL;
first_used = conn->lastnl + 1;
}
else {
conn->strings = g_slist_append (conn->strings, conn->buf + conn->lastnl);
}
conn->lastnl = tmp - conn->buf;
res = conn->buf + conn->pos - tmp;
}
if (first_used) {
if (first_used == conn->pos) {
conn->pos = 0;
conn->lastnl = 0;
}
else {
tmp = conn->buf + first_used;
g_memmove (conn->buf, conn->buf + first_used, conn->pos - first_used);
conn->pos -= first_used;
conn->lastnl -= first_used;
adjust_pointers (conn->strings, conn->buf, tmp);
}
}
if (blocked) {
debug(3,"stat_servers_input_callback -- if blocked");
g_source_remove (conn->tag);
debug(3,"stat_servers_input_callback -- new watch");
conn->tag = g_io_add_watch (conn->chan, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_PRI, conn->input_callback, conn);
}
/* loop */
}
// infinite loop end, next lines are never read
}
@ -1049,8 +1049,10 @@ static struct stat_conn *new_file_conn (struct stat_job *job, const char* file,
conn->chan = g_io_channel_unix_new (conn->fd);
conn->pos = 0;
conn->lastnl = 0;
#if 0
conn->first = TRUE;
conn->is_savage = FALSE;
#endif
conn->strings = NULL;
conn->servers = NULL;
@ -1106,16 +1108,18 @@ static struct stat_conn *start_qstat (struct stat_job *job, char *argv[], GIOFun
conn = g_malloc (sizeof (struct stat_conn));
conn->buf = g_malloc (BUFFER_MINSIZE);
conn->bufsize = BUFFER_MINSIZE;
conn->buf = NULL;
conn->bufsize = 0;
conn->tmpfile = NULL;
conn->pid = pid;
conn->fd = pipefds[0];
conn->chan = g_io_channel_unix_new (conn->fd);
conn->pos = 0;
conn->lastnl = 0;
#if 0
conn->first = TRUE;
conn->is_savage = FALSE;
#endif
conn->strings = NULL;
conn->servers = NULL;
@ -1525,7 +1529,8 @@ out:
}
#define MAX_SERVERS_IN_CMDLINE 8
// THD #define MAX_SERVERS_IN_CMDLINE 8
#define MAX_SERVERS_IN_CMDLINE 1
static struct stat_conn *stat_open_conn_qstat (struct stat_job *job) {
struct server *s = NULL;

View File

@ -55,8 +55,8 @@ struct stat_conn {
guint tag;
GIOFunc input_callback;
char *buf;
int bufsize;
gchar *buf;
gsize bufsize;
int pos;
int lastnl;
@ -70,9 +70,11 @@ struct stat_conn {
char *tmpfile;
#if 0
// UGLY HACKS
gboolean first; // marker for savage check header
gboolean is_savage; // true if file is in savage format
boolean is_savage; // true if file is in savage format
#endif
};
struct delayed_refresh {