diff options
Diffstat (limited to 'sync/src/sync_group.c')
-rw-r--r-- | sync/src/sync_group.c | 828 |
1 files changed, 828 insertions, 0 deletions
diff --git a/sync/src/sync_group.c b/sync/src/sync_group.c new file mode 100644 index 0000000..f9aa6f6 --- /dev/null +++ b/sync/src/sync_group.c @@ -0,0 +1,828 @@ + +#include <glib.h> +#include <opensync/opensync.h> +#include <osengine/engine.h> +#include "sync_group.h" +#include "sync_item.h" + +G_DEFINE_TYPE (SyncGroup, sync_group, G_TYPE_OBJECT); + +#define GROUP_PRIVATE(o) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((o), SYNC_TYPE_GROUP, SyncGroupPrivate)) + +/*#define DEBUG*/ + +typedef struct _SyncGroupPrivate SyncGroupPrivate; + +enum { + PROP_ITEM1 = 1, + PROP_ITEM2, + PROP_NAMESPACE, +}; + +enum { + STARTED, + PROGRESS, + CONFLICT, + FINISHED, + FAILED, + LAST_SIGNAL +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +struct _SyncGroupPrivate +{ + SyncItem *item1; + SyncItem *item2; + gchar *namespace; + OSyncGroup *group; + OSyncEngine *engine; + OSyncMapping *mapping; /* Required for conflicts */ + + gint max_changes; /* Used to work out sync progress */ + gint changes; + guint progress_id; + GMutex *mutex; +}; + +static OSyncEnv *default_env = NULL; + +static void +sync_group_finalize (GObject *object) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (object); + + /* FIXME: Guard against finalisation during syncs */ + if (priv->item1) g_object_unref (priv->item1); + if (priv->item2) g_object_unref (priv->item2); + g_mutex_free (priv->mutex); + + G_OBJECT_CLASS (sync_group_parent_class)->finalize (object); +} + +static void +sync_group_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + SyncGroup *group = SYNC_GROUP (object); + + switch (prop_id) { + case PROP_ITEM1 : + sync_group_set_item1 (group, + g_value_get_object (value)); + break; + case PROP_ITEM2 : + sync_group_set_item2 (group, + g_value_get_object (value)); + break; + case PROP_NAMESPACE : + sync_group_set_namespace (group, + g_value_get_string (value)); + break; + default : + G_OBJECT_WARN_INVALID_PROPERTY_ID ( + object, prop_id, pspec); + break; + } +} + +static void +sync_group_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + SyncGroup *group = SYNC_GROUP (object); + + switch (prop_id) { + case PROP_ITEM1 : + g_value_set_object (value, + sync_group_get_item1 (group)); + break; + case PROP_ITEM2 : + g_value_set_object (value, + sync_group_get_item2 (group)); + break; + case PROP_NAMESPACE : + g_value_set_string (value, + sync_group_get_namespace (group)); + break; + default : + G_OBJECT_WARN_INVALID_PROPERTY_ID ( + object, prop_id, pspec); + break; + } +} + +static void +sync_group_class_init (SyncGroupClass *klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS (klass); + + object_class->set_property = sync_group_set_property; + object_class->get_property = sync_group_get_property; + + g_object_class_install_property ( + object_class, + PROP_ITEM1, + g_param_spec_object ( + "item1", + ("Item1"), + ("The first item in the synchronisation pair"), + SYNC_TYPE_ITEM, + G_PARAM_READWRITE)); + g_object_class_install_property ( + object_class, + PROP_ITEM2, + g_param_spec_object ( + "item2", + ("Item2"), + ("The second item in the synchronisation pair"), + SYNC_TYPE_ITEM, + G_PARAM_READWRITE)); + g_object_class_install_property ( + object_class, + PROP_NAMESPACE, + g_param_spec_string ( + "namespace", + ("Name-space"), + ("Name-space of the synchronisation group."), + ("libsync"), + G_PARAM_READWRITE)); + + signals[STARTED] = + g_signal_new ("started", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (SyncGroupClass, started), + NULL, NULL, + g_cclosure_marshal_VOID__VOID, + G_TYPE_NONE, 0); + signals[PROGRESS] = + g_signal_new ("progress", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (SyncGroupClass, progress), + NULL, NULL, + g_cclosure_marshal_VOID__DOUBLE, + G_TYPE_NONE, 1, G_TYPE_DOUBLE); + signals[CONFLICT] = + g_signal_new ("conflict", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (SyncGroupClass, conflict), + NULL, NULL, + g_cclosure_marshal_VOID__VOID, + G_TYPE_NONE, 0); + signals[FINISHED] = + g_signal_new ("finished", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (SyncGroupClass, finished), + NULL, NULL, + g_cclosure_marshal_VOID__VOID, + G_TYPE_NONE, 0); + signals[FAILED] = + g_signal_new ("failed", + G_OBJECT_CLASS_TYPE (object_class), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (SyncGroupClass, failed), + NULL, NULL, + g_cclosure_marshal_VOID__VOID, + G_TYPE_NONE, 0); + + g_type_class_add_private (klass, sizeof (SyncGroupPrivate)); + + object_class->finalize = sync_group_finalize; +} + +static void +sync_group_init (SyncGroup *self) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (self); + + priv->mutex = g_mutex_new (); + priv->namespace = g_strdup ("libsync"); +} + +SyncGroup* +sync_group_new (void) +{ + return g_object_new (SYNC_TYPE_GROUP, NULL); +} + +SyncGroup * +sync_group_new_with_items (SyncItem *item1, SyncItem *item2) +{ + return g_object_new (SYNC_TYPE_GROUP, "item1", item1, "item2", item2, + NULL); +} + +void +sync_group_set_item1 (SyncGroup *group, SyncItem *item1) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + if (priv->item1) + sync_item_ungroup (priv->item1); + if (item1) { + priv->item1 = g_object_ref_sink (item1); + sync_item_set_group (item1, group); + } +} + +void +sync_group_set_item2 (SyncGroup *group, SyncItem *item2) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + if (priv->item2) + sync_item_ungroup (priv->item2); + if (item2) { + priv->item2 = g_object_ref_sink (item2); + sync_item_set_group (item2, group); + } +} + +void sync_group_set_namespace (SyncGroup *group, const gchar *namespace) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + g_free (priv->namespace); + priv->namespace = g_strdup (namespace); +} + +SyncItem * +sync_group_get_item1 (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + return priv->item1; +} + +SyncItem * +sync_group_get_item2 (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + return priv->item2; +} + +const gchar * +sync_group_get_namespace (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + return priv->namespace; +} + +static OSyncEnv * +sync_group_get_default_env () +{ + if (!default_env) { + OSyncError *error = NULL; + default_env = osync_env_new (); + g_debug ("Creating new environment"); + if (!osync_env_initialize (default_env, &error)) { + g_error ("Unable to initialise OSyncEnv: %s", + osync_error_print (&error)); + osync_env_free (default_env); + default_env = NULL; + } + } + + return default_env; +} + +gchar *sync_group_get_name (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + if ((!priv->item1) || (!priv->item2)) { + g_warning ("%s called on group without two items", G_STRFUNC); + return NULL; + } + + return g_strdup_printf ("%s-%s-%s", + sync_group_get_namespace (group), + sync_item_get_name (priv->item1), + sync_item_get_name (priv->item2)); +} + +gboolean +sync_group_delete (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + OSyncEnv *env; + OSyncGroup *os_group; + OSyncError *error = NULL; + gchar *name; + SyncItem *item1 = NULL; + SyncItem *item2 = NULL; + + if (priv->item1) { + item1 = g_object_ref (priv->item1); + sync_item_ungroup (priv->item1); + } + if (priv->item2) { + item2 = g_object_ref (priv->item2); + sync_item_ungroup (priv->item2); + } + + env = sync_group_get_default_env (); + name = sync_group_get_name (group); + os_group = osync_env_find_group (env, name); + g_free (name); + if (!os_group) return TRUE; + + if (!osync_group_delete (os_group, &error)) { + g_warning ("Failed to delete group: %s", + osync_error_print (&error)); + osync_error_free (&error); + return FALSE; + } + + if (item1) { + sync_group_set_item1 (group, item1); + g_object_unref (item1); + } + if (item2) { + sync_group_set_item2 (group, item2); + g_object_unref (item2); + } + + return TRUE; +} + +gboolean +sync_group_save (SyncGroup *group) +{ + OSyncError *error = NULL; + OSyncGroup *os_group = sync_group_get_osync_group (group); + + if (!os_group) { + g_warning ("Failed to retrieve OSyncGroup in %s", G_STRFUNC); + return FALSE; + } + + if (!osync_group_save (os_group, &error)) { + g_warning ("Unable to save OSyncGroup: %s", + osync_error_print (&error)); + osync_error_free (&error); + return FALSE; + } + + return TRUE; +} + +OSyncGroup * +sync_group_get_osync_group (SyncGroup *group) +{ + gchar *name; + OSyncEnv *env; + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + if ((!priv->item1) || (!priv->item2)) { + g_warning ("%s called on group without two items", G_STRFUNC); + return NULL; + } + + if (priv->group) + return priv->group; + + env = sync_group_get_default_env (); + name = sync_group_get_name (group); + priv->group = osync_env_find_group (env, name); + + if (!priv->group) { + priv->group = osync_group_new (env); + g_debug ("Creating new group"); + osync_group_set_name (priv->group, name); + + /* Create OSyncMembers (adds them to group) */ + sync_item_get_member (priv->item1); + sync_item_get_member (priv->item2); + } + g_free (name); + + return priv->group; +} + +static gboolean +sync_group_started_idle (SyncGroup *group) +{ + g_signal_emit (group, signals[STARTED], 0); + + return FALSE; +} + +static gboolean +sync_group_progress_idle (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + g_mutex_lock (priv->mutex); + g_signal_emit (group, signals[PROGRESS], 0, priv->changes / + (gdouble)priv->max_changes); + priv->progress_id = 0; + g_mutex_unlock (priv->mutex); + + return FALSE; +} + +static gboolean +sync_group_conflict_idle (SyncGroup *group) +{ + g_signal_emit (group, signals[CONFLICT], 0); + + return FALSE; +} + +static void +sync_group_free_engine (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + osengine_finalize (priv->engine); + osengine_free (priv->engine); + priv->engine = NULL; +} + +static gboolean +sync_group_finished_idle (SyncGroup *group) +{ + sync_group_free_engine (group); + g_signal_emit (group, signals[FINISHED], 0); + + return FALSE; +} + +static gboolean +sync_group_failed_idle (SyncGroup *group) +{ + sync_group_free_engine (group); + g_signal_emit (group, signals[FAILED], 0); + + return FALSE; +} + +static const char * +OSyncChangeType2String (OSyncChangeType c) +{ + switch (c) { + case CHANGE_ADDED: return "ADDED"; + case CHANGE_UNMODIFIED: return "UNMODIFIED"; + case CHANGE_DELETED: return "DELETED"; + case CHANGE_MODIFIED: return "MODIFIED"; + default: + case CHANGE_UNKNOWN: return "?"; + } +} + +static void +sync_conflict_cb (OSyncEngine *engine, OSyncMapping *mapping, + void *user_data) +{ + SyncGroup *group = (SyncGroup *)user_data; + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + priv->mapping = mapping; + g_idle_add ((GSourceFunc)sync_group_conflict_idle, group); +} + +static void +sync_changestatus_cb (OSyncEngine *engine, OSyncChangeUpdate *status, + void *user_data) +{ + SyncGroup *group = (SyncGroup *)user_data; + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + switch (status->type) { + case CHANGE_RECEIVED: + /* Shouldn't need to lock here, but just in case */ + g_mutex_lock (priv->mutex); + priv->max_changes ++; + g_mutex_unlock (priv->mutex); +#ifdef DEBUG + g_debug ("Received a entry %s with data of size %i from " + "member %i. Changetype %s", + osync_change_get_uid (status->change), + osync_change_get_datasize (status->change), + status->member_id, + OSyncChangeType2String ( + osync_change_get_changetype ( + status->change))); +#endif + break; + case CHANGE_SENT: + /* Update progress and add idle func to send progress + * signal in main thread. + */ + g_mutex_lock (priv->mutex); + if (priv->changes == 0) + g_idle_add ((GSourceFunc) + sync_group_started_idle, group); + priv->changes ++; + if (priv->progress_id == 0) + priv->progress_id = g_idle_add ((GSourceFunc) + sync_group_progress_idle, group); + g_mutex_unlock (priv->mutex); +#ifdef DEBUG + g_debug ("Sent a entry %s of size %i to member %i. " + "Changetype %s", + osync_change_get_uid (status->change), + osync_change_get_datasize (status->change), + status->member_id, + OSyncChangeType2String ( + osync_change_get_changetype ( + status->change))); +#endif + break; +#ifdef DEBUG + case CHANGE_RECEIVED_INFO: + g_debug ("Received a entry %s without data from member " + "%i. Changetype %s", + osync_change_get_uid (status->change), + status->member_id, + OSyncChangeType2String ( + osync_change_get_changetype ( + status->change))); + break; + case CHANGE_WRITE_ERROR: + g_debug ("Error writing entry %s to member %i: %s", + osync_change_get_uid (status->change), + status->member_id, + osync_error_print (&(status->error))); + break; + case CHANGE_RECV_ERROR: + g_debug ("Error reading entry %s from member %i: %s", + osync_change_get_uid (status->change), + status->member_id, + osync_error_print (&(status->error))); + break; +#endif + } +} +#ifdef DEBUG +static void +sync_mappingstatus_cb (OSyncMappingUpdate *status, void *user_data) +{ +/* SyncGroup *group = (SyncGroup *)user_data; + SyncGroupPrivate *priv = GROUP_PRIVATE (group);*/ + + switch (status->type) { + case MAPPING_SOLVED: + g_debug ("Mapping solved"); + break; + case MAPPING_SYNCED: + g_debug ("Mapping Synced"); + break; + case MAPPING_WRITE_ERROR: + g_debug ("Mapping Write Error: %s", + osync_error_print (&(status->error))); + break; + } +} +#endif +static void +sync_enginestatus_cb (OSyncEngine *engine, OSyncEngineUpdate *status, + void *user_data) +{ + SyncGroup *group = (SyncGroup *)user_data; + + switch (status->type) { +#ifdef DEBUG + case ENG_PREV_UNCLEAN: + g_debug ("The previous synchronization was unclean. " + "Slow-syncing"); + break; + case ENG_ENDPHASE_CON: + g_debug ("All clients connected or error"); + break; + case ENG_END_CONFLICTS: + g_debug ("All conflicts have been reported"); + break; + case ENG_ENDPHASE_READ: + g_debug ("All clients sent changes or error"); + break; + case ENG_ENDPHASE_WRITE: + g_debug ("All clients have written"); + break; + case ENG_ENDPHASE_DISCON: + g_debug ("All clients have disconnected"); + break; +#endif + case ENG_SYNC_SUCCESSFULL: + g_idle_add ((GSourceFunc) + sync_group_finished_idle, group); + break; + case ENG_ERROR: + g_idle_add ((GSourceFunc) + sync_group_failed_idle, group); + break; + } +} +#ifdef DEBUG +static void +sync_memberstatus_cb (OSyncMemberUpdate *status, void *user_data) +{ + switch (status->type) { + case MEMBER_CONNECTED: + g_debug ("Member %lli of type %s just connected", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member)); + break; + case MEMBER_DISCONNECTED: + g_debug ("Member %lli of type %s just disconnected", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member)); + break; + case MEMBER_SENT_CHANGES: + g_debug ("Member %lli of type %s just sent all changes", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member)); + break; + case MEMBER_COMMITTED_ALL: + g_debug ( + "Member %lli of type %s committed all changes.", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member)); + break; + case MEMBER_CONNECT_ERROR: + g_debug ("Member %lli of type %s had an " + "error while connecting: %s", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member), + osync_error_print (&(status->error))); + break; + case MEMBER_GET_CHANGES_ERROR: + g_debug ("Member %lli of type %s had an error while " + "getting changes: %s", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member), + osync_error_print (&(status->error))); + break; + case MEMBER_SYNC_DONE_ERROR: + g_debug ("Member %lli of type %s had an error while " + "calling sync done: %s", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member), + osync_error_print (&(status->error))); + break; + case MEMBER_DISCONNECT_ERROR: + g_debug ("Member %lli of type %s had an error while " + "disconnecting: %s", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member), + osync_error_print (&(status->error))); + break; + case MEMBER_COMMITTED_ALL_ERROR: + g_debug ("Member %lli of type %s had an error while " + "commiting changes: %s", + osync_member_get_id (status->member), + osync_member_get_pluginname (status->member), + osync_error_print (&(status->error))); + break; + } +} +#endif +gboolean +sync_group_start (SyncGroup *group) +{ + OSyncError *error = NULL; + OSyncGroup *os_group; + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + if (!(os_group = sync_group_get_osync_group (group))) { + g_warning ("Failed to create OSyncGroup"); + return FALSE; + } + + /* Save the group (NOTE: Won't be necessary in the future?) */ + if (!sync_group_save (group)) { + g_warning ("Save failed, not starting sync"); + return FALSE; + } + + if (!(priv->engine = osengine_new (os_group, &error))) { + g_warning ("Error while creating syncengine: %s", + osync_error_print (&error)); + osync_error_free (&error); + return FALSE; + } + + osengine_set_conflict_callback ( + priv->engine, sync_conflict_cb, group); + osengine_set_changestatus_callback ( + priv->engine, sync_changestatus_cb, group); +#ifdef DEBUG + osengine_set_mappingstatus_callback ( + priv->engine, sync_mappingstatus_cb, group); + osengine_set_memberstatus_callback ( + priv->engine, sync_memberstatus_cb, group); +#endif + osengine_set_enginestatus_callback ( + priv->engine, sync_enginestatus_cb, group); + + if (!osengine_init (priv->engine, &error)) { + g_warning ("Error while initializing syncengine: %s", + osync_error_print (&error)); + osync_error_free (&error); + sync_group_free_engine (group); + return FALSE; + } + + priv->changes = 0; + priv->max_changes = 0; + + if (!osengine_synchronize (priv->engine, &error)) { + g_warning ("Error while starting synchronization: %s", + osync_error_print (&error)); + osync_error_free (&error); + sync_group_free_engine (group); + return FALSE; + } + + return TRUE; +} + +void +sync_group_resolve_conflict (SyncGroup *group, SyncGroupConflictRes res) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + if ((!priv->mapping) || (!priv->engine)) { + g_warning ("%s called with no conflict to resolve", G_STRFUNC); + return; + } + + switch (res) { + case USE_ITEM1: + case USE_ITEM2: + case ABORT: + case IGNORE: + default: + osengine_mapping_ignore_conflict ( + priv->engine, priv->mapping); + } + + priv->mapping = NULL; +} + +void +sync_group_abort (SyncGroup *group) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + if (priv->engine) osengine_abort (priv->engine); + else g_warning ("%s called, but no sync in progress", G_STRFUNC); +} + +static void +sync_group_regroup (SyncGroup *group, gboolean item1, gboolean item2) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + + /* Regroup items so their members can be recreated */ + if (priv->group) { + SyncItem *item; + + /* Freeing groups with OpenSync removes them from the env */ + priv->group = NULL; + + if (item1) { + item = g_object_ref (priv->item1); + sync_item_ungroup (priv->item1); + sync_group_set_item1 (group, item); + g_object_unref (item); + } + if (item2) { + item = g_object_ref (priv->item2); + sync_item_ungroup (priv->item2); + sync_group_set_item2 (group, item); + g_object_unref (item); + } + } +} + +void sync_group_remove_item (SyncGroup *group, + SyncItem *item) +{ + SyncGroupPrivate *priv = GROUP_PRIVATE (group); + gboolean item1; + + if (priv->item1 == item) { + sync_group_regroup (group, FALSE, TRUE); + priv->item1 = NULL; + g_object_unref (item); + item1 = TRUE; + } else if (priv->item2 == item) { + sync_group_regroup (group, TRUE, FALSE); + priv->item2 = NULL; + g_object_unref (item); + } else { + g_warning ("%s called on group not containing item", G_STRFUNC); + return; + } +} |