554 lines
11 KiB
C
554 lines
11 KiB
C
/*
|
|
* mooutils-thread.c
|
|
*
|
|
* Copyright (C) 2004-2008 by Yevgen Muntyan <muntyan@tamu.edu>
|
|
*
|
|
* This file is part of medit. medit is free software; you can
|
|
* redistribute it and/or modify it under the terms of the
|
|
* GNU Lesser General Public License as published by the
|
|
* Free Software Foundation; either version 2.1 of the License,
|
|
* or (at your option) any later version.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
* License along with medit. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "config.h"
|
|
#include "mooutils/mooutils-thread.h"
|
|
#include "mooutils/mooutils-misc.h"
|
|
#include "mooutils/mooutils-debug.h"
|
|
#include "mooutils/mootype-macros.h"
|
|
|
|
#include <stdio.h>
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#ifdef HAVE_UNISTD_H
|
|
#include <unistd.h>
|
|
#endif
|
|
#ifdef __WIN32__
|
|
#include <windows.h>
|
|
#include <io.h>
|
|
#ifndef pipe
|
|
#define pipe(phandles) _pipe (phandles, 4096, _O_BINARY)
|
|
#endif
|
|
#endif
|
|
|
|
MOO_DEBUG_INIT(threads, FALSE)
|
|
|
|
typedef struct {
|
|
MooEventQueueCallback callback;
|
|
gpointer callback_data;
|
|
GDestroyNotify notify;
|
|
guint id;
|
|
} QueueClient;
|
|
|
|
typedef struct {
|
|
gpointer data;
|
|
GDestroyNotify destroy;
|
|
guint id;
|
|
} EventData;
|
|
|
|
typedef struct {
|
|
GSList *clients;
|
|
guint last_id;
|
|
|
|
int pipe_in;
|
|
int pipe_out;
|
|
GIOChannel *io;
|
|
|
|
GHashTable *data;
|
|
|
|
volatile gboolean init;
|
|
} EventQueue;
|
|
|
|
|
|
static GStaticMutex queue_lock = G_STATIC_MUTEX_INIT;
|
|
static EventQueue queue;
|
|
|
|
|
|
static QueueClient *
|
|
get_event_client (guint id)
|
|
{
|
|
GSList *l;
|
|
|
|
g_return_val_if_fail (queue.init, NULL);
|
|
|
|
for (l = queue.clients; l != NULL; l = l->next)
|
|
{
|
|
QueueClient *s = l->data;
|
|
|
|
if (s->id == id)
|
|
return s;
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
|
|
static void
|
|
invoke_callback (gpointer id,
|
|
GQueue *events)
|
|
{
|
|
GList *l;
|
|
QueueClient *client;
|
|
|
|
moo_dmsg ("processing events for id %u", GPOINTER_TO_UINT (id));
|
|
client = get_event_client (GPOINTER_TO_UINT (id));
|
|
|
|
if (client)
|
|
{
|
|
GList *data_list = NULL;
|
|
|
|
for (l = events->head; l != NULL; l = l->next)
|
|
{
|
|
EventData *data = l->data;
|
|
data_list = g_list_prepend (data_list, data->data);
|
|
}
|
|
|
|
data_list = g_list_reverse (data_list);
|
|
|
|
client->callback (data_list, client->callback_data);
|
|
|
|
g_list_free (data_list);
|
|
}
|
|
|
|
for (l = events->head; l != NULL; l = l->next)
|
|
{
|
|
EventData *data = l->data;
|
|
|
|
if (data->destroy)
|
|
data->destroy (data->data);
|
|
|
|
g_free (data);
|
|
}
|
|
|
|
g_queue_free (events);
|
|
}
|
|
|
|
|
|
static gboolean
|
|
got_data (GIOChannel *io)
|
|
{
|
|
GHashTable *data;
|
|
char buf[1];
|
|
|
|
g_static_mutex_lock (&queue_lock);
|
|
data = queue.data;
|
|
queue.data = NULL;
|
|
g_io_channel_read_chars (io, buf, 1, NULL, NULL);
|
|
g_static_mutex_unlock (&queue_lock);
|
|
|
|
g_hash_table_foreach (data, (GHFunc) invoke_callback, NULL);
|
|
g_hash_table_destroy (data);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
|
|
void
|
|
_moo_event_queue_do_events (guint event_id)
|
|
{
|
|
GQueue *events = NULL;
|
|
|
|
g_return_if_fail (queue.init);
|
|
|
|
g_static_mutex_lock (&queue_lock);
|
|
|
|
if (queue.data)
|
|
{
|
|
events = g_hash_table_lookup (queue.data, GUINT_TO_POINTER (event_id));
|
|
|
|
if (events)
|
|
g_hash_table_remove (queue.data, GUINT_TO_POINTER (event_id));
|
|
}
|
|
|
|
g_static_mutex_unlock (&queue_lock);
|
|
|
|
if (events)
|
|
invoke_callback (GUINT_TO_POINTER (event_id), events);
|
|
}
|
|
|
|
|
|
static void
|
|
init_queue (void)
|
|
{
|
|
if (g_atomic_int_get (&queue.init))
|
|
return;
|
|
|
|
g_static_mutex_lock (&queue_lock);
|
|
|
|
if (!queue.init)
|
|
{
|
|
int fds[2];
|
|
GSource *source;
|
|
|
|
if (pipe (fds) != 0)
|
|
{
|
|
perror ("pipe");
|
|
goto out;
|
|
}
|
|
|
|
queue.clients = NULL;
|
|
|
|
queue.pipe_in = fds[1];
|
|
queue.pipe_out = fds[0];
|
|
|
|
#ifdef __WIN32__
|
|
queue.io = g_io_channel_win32_new_fd (queue.pipe_out);
|
|
#else
|
|
queue.io = g_io_channel_unix_new (queue.pipe_out);
|
|
#endif
|
|
|
|
source = g_io_create_watch (queue.io, G_IO_IN);
|
|
g_source_set_callback (source, (GSourceFunc) got_data, NULL, NULL);
|
|
g_source_set_can_recurse (source, TRUE);
|
|
g_source_attach (source, NULL);
|
|
|
|
queue.data = NULL;
|
|
|
|
queue.init = TRUE;
|
|
}
|
|
|
|
out:
|
|
g_static_mutex_unlock (&queue_lock);
|
|
}
|
|
|
|
|
|
guint
|
|
_moo_event_queue_connect (MooEventQueueCallback callback,
|
|
gpointer data,
|
|
GDestroyNotify notify)
|
|
{
|
|
QueueClient *client;
|
|
|
|
g_return_val_if_fail (callback != NULL, 0);
|
|
|
|
init_queue ();
|
|
|
|
client = g_new0 (QueueClient, 1);
|
|
client->callback = callback;
|
|
client->callback_data = data;
|
|
client->notify = notify;
|
|
|
|
g_static_mutex_lock (&queue_lock);
|
|
client->id = ++queue.last_id;
|
|
queue.clients = g_slist_prepend (queue.clients, client);
|
|
g_static_mutex_unlock (&queue_lock);
|
|
|
|
return client->id;
|
|
}
|
|
|
|
|
|
void
|
|
_moo_event_queue_disconnect (guint event_id)
|
|
{
|
|
QueueClient *client;
|
|
|
|
g_return_if_fail (event_id != 0);
|
|
g_return_if_fail (queue.init);
|
|
|
|
g_static_mutex_lock (&queue_lock);
|
|
|
|
client = get_event_client (event_id);
|
|
|
|
if (!client)
|
|
g_warning ("%s: no client with id %d", G_STRLOC, event_id);
|
|
else
|
|
queue.clients = g_slist_remove (queue.clients, client);
|
|
|
|
g_static_mutex_unlock (&queue_lock);
|
|
|
|
if (client && client->notify)
|
|
client->notify (client->callback_data);
|
|
|
|
g_free (client);
|
|
}
|
|
|
|
|
|
/* called from a thread */
|
|
void
|
|
_moo_event_queue_push (guint event_id,
|
|
gpointer data,
|
|
GDestroyNotify data_destroy)
|
|
{
|
|
char c = 'd';
|
|
EventData *event_data;
|
|
GQueue *events;
|
|
|
|
event_data = g_new (EventData, 1);
|
|
event_data->data = data;
|
|
event_data->destroy = data_destroy;
|
|
event_data->id = event_id;
|
|
|
|
g_static_mutex_lock (&queue_lock);
|
|
|
|
if (!queue.data)
|
|
{
|
|
(void) write (queue.pipe_in, &c, 1);
|
|
queue.data = g_hash_table_new (g_direct_hash, g_direct_equal);
|
|
}
|
|
|
|
events = g_hash_table_lookup (queue.data, GUINT_TO_POINTER (event_id));
|
|
|
|
if (!events)
|
|
{
|
|
events = g_queue_new ();
|
|
g_hash_table_insert (queue.data, GUINT_TO_POINTER (event_id), events);
|
|
}
|
|
|
|
g_queue_push_tail (events, event_data);
|
|
|
|
g_static_mutex_unlock (&queue_lock);
|
|
}
|
|
|
|
|
|
/****************************************************************************/
|
|
/* Messages
|
|
*/
|
|
|
|
static GStaticMutex message_lock = G_STATIC_MUTEX_INIT;
|
|
static volatile int message_event_id = 0;
|
|
static volatile int print_event_id = 0;
|
|
|
|
static void
|
|
message_callback (GList *events, G_GNUC_UNUSED gpointer data)
|
|
{
|
|
while (events)
|
|
{
|
|
gdk_threads_enter ();
|
|
_moo_message ("%s", (char*) events->data);
|
|
gdk_threads_leave ();
|
|
events = events->next;
|
|
}
|
|
}
|
|
|
|
static void
|
|
print_callback (GList *events, G_GNUC_UNUSED gpointer data)
|
|
{
|
|
while (events)
|
|
{
|
|
gdk_threads_enter ();
|
|
g_print ("%s", (char*) events->data);
|
|
gdk_threads_leave ();
|
|
events = events->next;
|
|
}
|
|
}
|
|
|
|
static void
|
|
init_message_queue (void)
|
|
{
|
|
g_static_mutex_lock (&message_lock);
|
|
|
|
if (!message_event_id)
|
|
message_event_id = _moo_event_queue_connect (message_callback, NULL, NULL);
|
|
if (!print_event_id)
|
|
print_event_id = _moo_event_queue_connect (print_callback, NULL, NULL);
|
|
|
|
g_static_mutex_unlock (&message_lock);
|
|
}
|
|
|
|
void
|
|
_moo_print_async (const char *format,
|
|
...)
|
|
{
|
|
char *msg;
|
|
va_list args;
|
|
|
|
va_start (args, format);
|
|
msg = g_strdup_vprintf (format, args);
|
|
va_end (args);
|
|
|
|
if (msg)
|
|
{
|
|
init_message_queue ();
|
|
_moo_event_queue_push (print_event_id, msg, g_free);
|
|
}
|
|
}
|
|
|
|
void
|
|
_moo_message_async (const char *format,
|
|
...)
|
|
{
|
|
char *msg;
|
|
va_list args;
|
|
|
|
va_start (args, format);
|
|
msg = g_strdup_vprintf (format, args);
|
|
va_end (args);
|
|
|
|
if (msg)
|
|
{
|
|
init_message_queue ();
|
|
_moo_event_queue_push (message_event_id, msg, g_free);
|
|
}
|
|
}
|
|
|
|
|
|
struct MooAsyncJob {
|
|
GObject base;
|
|
|
|
MooAsyncJobCallback callback;
|
|
gpointer data;
|
|
GDestroyNotify data_notify;
|
|
|
|
GThread *thread;
|
|
GMutex *mutex;
|
|
guint cancelled : 1;
|
|
};
|
|
|
|
typedef struct {
|
|
GObjectClass base_class;
|
|
} MooAsyncJobClass;
|
|
|
|
MOO_DEFINE_TYPE_STATIC (MooAsyncJob, moo_async_job, G_TYPE_OBJECT)
|
|
|
|
static void
|
|
moo_async_job_dispose (GObject *object)
|
|
{
|
|
MooAsyncJob *job = (MooAsyncJob*) object;
|
|
|
|
if (job->data_notify)
|
|
{
|
|
GDestroyNotify notify = job->data_notify;
|
|
job->data_notify = NULL;
|
|
notify (job->data);
|
|
job->data = NULL;
|
|
}
|
|
|
|
g_assert (!job->thread);
|
|
|
|
if (job->mutex)
|
|
{
|
|
g_mutex_free (job->mutex);
|
|
job->mutex = NULL;
|
|
}
|
|
|
|
G_OBJECT_CLASS (moo_async_job_parent_class)->dispose (object);
|
|
}
|
|
|
|
static void
|
|
moo_async_job_class_init (MooAsyncJobClass *klass)
|
|
{
|
|
G_OBJECT_CLASS (klass)->dispose = moo_async_job_dispose;
|
|
}
|
|
|
|
static void
|
|
moo_async_job_init (MooAsyncJob *job)
|
|
{
|
|
job->callback = NULL;
|
|
job->data = NULL;
|
|
job->data_notify = NULL;
|
|
|
|
job->thread = NULL;
|
|
job->mutex = g_mutex_new ();
|
|
job->cancelled = FALSE;
|
|
}
|
|
|
|
MooAsyncJob *
|
|
moo_async_job_new (MooAsyncJobCallback callback,
|
|
gpointer data,
|
|
GDestroyNotify data_notify)
|
|
{
|
|
MooAsyncJob *job;
|
|
|
|
g_return_val_if_fail (callback != NULL, NULL);
|
|
|
|
job = g_object_new (moo_async_job_get_type (), NULL);
|
|
job->callback = callback;
|
|
job->data = data;
|
|
job->data_notify = data_notify;
|
|
|
|
return job;
|
|
}
|
|
|
|
static gpointer
|
|
moo_async_job_thread_func (MooAsyncJob *job)
|
|
{
|
|
gboolean proceed = TRUE;
|
|
|
|
while (proceed)
|
|
{
|
|
g_mutex_lock (job->mutex);
|
|
|
|
if (job->cancelled)
|
|
{
|
|
_moo_print_async ("%s: job cancelled\n", G_STRFUNC);
|
|
g_mutex_unlock (job->mutex);
|
|
break;
|
|
}
|
|
|
|
g_mutex_unlock (job->mutex);
|
|
|
|
proceed = job->callback (job->data);
|
|
|
|
if (!proceed)
|
|
_moo_print_async ("%s: job finished\n", G_STRFUNC);
|
|
|
|
if (proceed)
|
|
g_usleep (1000);
|
|
}
|
|
|
|
g_mutex_lock (job->mutex);
|
|
|
|
if (job->data_notify)
|
|
{
|
|
GDestroyNotify notify = job->data_notify;
|
|
job->data_notify = NULL;
|
|
notify (job->data);
|
|
}
|
|
|
|
job->thread = NULL;
|
|
|
|
g_mutex_unlock (job->mutex);
|
|
|
|
g_object_unref (job);
|
|
return NULL;
|
|
}
|
|
|
|
void
|
|
moo_async_job_start (MooAsyncJob *job)
|
|
{
|
|
GError *error = NULL;
|
|
|
|
g_return_if_fail (job != NULL);
|
|
g_return_if_fail (job->thread == NULL);
|
|
|
|
g_mutex_lock (job->mutex);
|
|
|
|
job->thread = g_thread_create ((GThreadFunc) moo_async_job_thread_func,
|
|
g_object_ref (job),
|
|
FALSE, &error);
|
|
if (!job->thread)
|
|
{
|
|
g_critical ("%s: could not start thread: %s", G_STRLOC, error->message);
|
|
g_error_free (error);
|
|
goto out;
|
|
}
|
|
|
|
out:
|
|
g_mutex_unlock (job->mutex);
|
|
}
|
|
|
|
void
|
|
moo_async_job_cancel (MooAsyncJob *job)
|
|
{
|
|
g_return_if_fail (job != NULL);
|
|
g_mutex_lock (job->mutex);
|
|
job->cancelled = TRUE;
|
|
g_mutex_unlock (job->mutex);
|
|
}
|
|
|
|
void
|
|
moo_async_job_ref (MooAsyncJob *job)
|
|
{
|
|
g_return_if_fail (job != NULL);
|
|
g_object_ref (job);
|
|
}
|
|
|
|
void
|
|
moo_async_job_unref (MooAsyncJob *job)
|
|
{
|
|
g_return_if_fail (job != NULL);
|
|
g_object_unref (job);
|
|
}
|