#include #include #include #include "sync_group.h" #include "sync_item.h" G_DEFINE_TYPE (SyncGroup, sync_group, G_TYPE_OBJECT); #define GROUP_PRIVATE(o) \ (G_TYPE_INSTANCE_GET_PRIVATE ((o), SYNC_TYPE_GROUP, SyncGroupPrivate)) #define DEBUG typedef struct _SyncGroupPrivate SyncGroupPrivate; enum { PROP_ITEM1 = 1, PROP_ITEM2, PROP_NAMESPACE, }; enum { STARTED, PROGRESS, CONFLICT, FINISHED, FAILED, LAST_SIGNAL }; static guint signals[LAST_SIGNAL] = { 0 }; struct _SyncGroupPrivate { SyncItem *item1; SyncItem *item2; gchar *namespace; OSyncGroup *group; OSyncEngine *engine; OSyncMapping *mapping; /* Required for conflicts */ gint max_changes; /* Used to work out sync progress */ gint changes; guint progress_id; gchar *error; GMutex *mutex; }; static OSyncEnv *default_env = NULL; static void sync_group_finalize (GObject *object) { SyncGroupPrivate *priv = GROUP_PRIVATE (object); /* FIXME: Guard against finalisation during syncs */ if (priv->item1) g_object_unref (priv->item1); if (priv->item2) g_object_unref (priv->item2); g_mutex_free (priv->mutex); G_OBJECT_CLASS (sync_group_parent_class)->finalize (object); } static void sync_group_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { SyncGroup *group = SYNC_GROUP (object); switch (prop_id) { case PROP_ITEM1 : sync_group_set_item1 (group, g_value_get_object (value)); break; case PROP_ITEM2 : sync_group_set_item2 (group, g_value_get_object (value)); break; case PROP_NAMESPACE : sync_group_set_namespace (group, g_value_get_string (value)); break; default : G_OBJECT_WARN_INVALID_PROPERTY_ID ( object, prop_id, pspec); break; } } static void sync_group_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { SyncGroup *group = SYNC_GROUP (object); switch (prop_id) { case PROP_ITEM1 : g_value_set_object (value, sync_group_get_item1 (group)); break; case PROP_ITEM2 : g_value_set_object (value, sync_group_get_item2 (group)); break; case PROP_NAMESPACE : g_value_set_string (value, sync_group_get_namespace (group)); break; default : G_OBJECT_WARN_INVALID_PROPERTY_ID ( object, prop_id, pspec); break; } } static void sync_group_class_init (SyncGroupClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS (klass); object_class->set_property = sync_group_set_property; object_class->get_property = sync_group_get_property; g_object_class_install_property ( object_class, PROP_ITEM1, g_param_spec_object ( "item1", ("Item1"), ("The first item in the synchronisation pair"), SYNC_TYPE_ITEM, G_PARAM_READWRITE)); g_object_class_install_property ( object_class, PROP_ITEM2, g_param_spec_object ( "item2", ("Item2"), ("The second item in the synchronisation pair"), SYNC_TYPE_ITEM, G_PARAM_READWRITE)); g_object_class_install_property ( object_class, PROP_NAMESPACE, g_param_spec_string ( "namespace", ("Name-space"), ("Name-space of the synchronisation group."), ("libsync"), G_PARAM_READWRITE)); signals[STARTED] = g_signal_new ("started", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (SyncGroupClass, started), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); signals[PROGRESS] = g_signal_new ("progress", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (SyncGroupClass, progress), NULL, NULL, g_cclosure_marshal_VOID__DOUBLE, G_TYPE_NONE, 1, G_TYPE_DOUBLE); signals[CONFLICT] = g_signal_new ("conflict", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (SyncGroupClass, conflict), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); signals[FINISHED] = g_signal_new ("finished", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (SyncGroupClass, finished), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); signals[FAILED] = g_signal_new ("failed", G_OBJECT_CLASS_TYPE (object_class), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (SyncGroupClass, failed), NULL, NULL, g_cclosure_marshal_VOID__STRING, G_TYPE_NONE, 1, G_TYPE_STRING); g_type_class_add_private (klass, sizeof (SyncGroupPrivate)); object_class->finalize = sync_group_finalize; } static void sync_group_init (SyncGroup *self) { SyncGroupPrivate *priv = GROUP_PRIVATE (self); priv->mutex = g_mutex_new (); priv->namespace = g_strdup ("libsync"); } SyncGroup* sync_group_new (void) { return g_object_new (SYNC_TYPE_GROUP, NULL); } SyncGroup * sync_group_new_with_items (SyncItem *item1, SyncItem *item2) { return g_object_new (SYNC_TYPE_GROUP, "item1", item1, "item2", item2, NULL); } void sync_group_set_item1 (SyncGroup *group, SyncItem *item1) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); if (priv->item1) { g_object_unref (priv->item1); priv->item1 = NULL; } if (item1) priv->item1 = g_object_ref (item1); } void sync_group_set_item2 (SyncGroup *group, SyncItem *item2) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); if (priv->item2) { g_object_unref (priv->item2); priv->item2 = NULL; } if (item2) priv->item2 = g_object_ref (item2); } void sync_group_set_namespace (SyncGroup *group, const gchar *namespace) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); g_free (priv->namespace); priv->namespace = g_strdup (namespace); } SyncItem * sync_group_get_item1 (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); return priv->item1; } SyncItem * sync_group_get_item2 (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); return priv->item2; } const gchar * sync_group_get_namespace (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); return priv->namespace; } static OSyncEnv * sync_group_get_default_env () { if (!default_env) { OSyncError *error = NULL; default_env = osync_env_new (); g_debug ("Creating new environment"); if (!osync_env_initialize (default_env, &error)) { g_error ("Unable to initialise OSyncEnv: %s", osync_error_print (&error)); osync_env_free (default_env); default_env = NULL; } } return default_env; } gchar *sync_group_get_name (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); if ((!priv->item1) || (!priv->item2)) { g_warning ("%s called on group without two items", G_STRFUNC); return NULL; } return g_strdup_printf ("%s-%s-%s", sync_group_get_namespace (group), sync_item_get_name (priv->item1), sync_item_get_name (priv->item2)); } gboolean sync_group_delete (SyncGroup *group) { OSyncEnv *env; OSyncGroup *os_group; OSyncError *error = NULL; gchar *name; env = sync_group_get_default_env (); name = sync_group_get_name (group); os_group = osync_env_find_group (env, name); g_free (name); if (!os_group) return TRUE; if (!osync_group_delete (os_group, &error)) { g_warning ("Failed to delete group: %s", osync_error_print (&error)); osync_error_free (&error); return FALSE; } return TRUE; } gboolean sync_group_save (SyncGroup *group) { OSyncError *error = NULL; OSyncGroup *os_group = sync_group_get_osync_group (group); if (!os_group) { g_warning ("Failed to retrieve OSyncGroup in %s", G_STRFUNC); return FALSE; } if (!osync_group_save (os_group, &error)) { g_warning ("Unable to save OSyncGroup: %s", osync_error_print (&error)); osync_error_free (&error); return FALSE; } return TRUE; } OSyncGroup * sync_group_get_osync_group (SyncGroup *group) { gchar *name; OSyncEnv *env; SyncGroupPrivate *priv = GROUP_PRIVATE (group); if ((!priv->item1) || (!priv->item2)) { g_warning ("%s called on group without two items", G_STRFUNC); return NULL; } if (priv->group) return priv->group; env = sync_group_get_default_env (); name = sync_group_get_name (group); priv->group = osync_env_find_group (env, name); if (!priv->group) { priv->group = osync_group_new (env); g_debug ("Creating new group"); osync_group_set_name (priv->group, name); /* Create OSyncMembers (adds them to group) */ sync_item_create_member (priv->item1, group); sync_item_create_member (priv->item2, group); } g_free (name); return priv->group; } static gboolean sync_group_started_idle (SyncGroup *group) { g_signal_emit (group, signals[STARTED], 0); return FALSE; } static gboolean sync_group_progress_idle (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); g_mutex_lock (priv->mutex); g_signal_emit (group, signals[PROGRESS], 0, priv->changes / (gdouble)priv->max_changes); priv->progress_id = 0; g_mutex_unlock (priv->mutex); return FALSE; } static gboolean sync_group_conflict_idle (SyncGroup *group) { g_signal_emit (group, signals[CONFLICT], 0); return FALSE; } static void sync_group_free_engine (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); osengine_finalize (priv->engine); osengine_free (priv->engine); priv->engine = NULL; } static gboolean sync_group_finished_idle (SyncGroup *group) { sync_group_free_engine (group); g_signal_emit (group, signals[FINISHED], 0); return FALSE; } static gboolean sync_group_failed_idle (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); sync_group_free_engine (group); g_signal_emit (group, signals[FAILED], 0, priv->error ? priv->error : ""); g_free (priv->error); priv->error = NULL; return FALSE; } static const char * OSyncChangeType2String (OSyncChangeType c) { switch (c) { case CHANGE_ADDED: return "ADDED"; case CHANGE_UNMODIFIED: return "UNMODIFIED"; case CHANGE_DELETED: return "DELETED"; case CHANGE_MODIFIED: return "MODIFIED"; default: case CHANGE_UNKNOWN: return "?"; } } static void sync_append_error (SyncGroup *group, gchar *text) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); if (!priv->error) { priv->error = text; } else { gchar *new_string = g_strconcat (priv->error, "\n", text); g_free (text); g_free (priv->error); priv->error = new_string; } } static void sync_conflict_cb (OSyncEngine *engine, OSyncMapping *mapping, void *user_data) { SyncGroup *group = (SyncGroup *)user_data; SyncGroupPrivate *priv = GROUP_PRIVATE (group); priv->mapping = mapping; g_idle_add ((GSourceFunc)sync_group_conflict_idle, group); } static void sync_changestatus_cb (OSyncEngine *engine, OSyncChangeUpdate *status, void *user_data) { SyncGroup *group = (SyncGroup *)user_data; SyncGroupPrivate *priv = GROUP_PRIVATE (group); switch (status->type) { case CHANGE_RECEIVED: /* Shouldn't need to lock here, but just in case */ g_mutex_lock (priv->mutex); priv->max_changes ++; g_mutex_unlock (priv->mutex); #ifdef DEBUG g_debug ("Received a entry %s with data of size %i from " "member %i. Changetype %s", osync_change_get_uid (status->change), osync_change_get_datasize (status->change), status->member_id, OSyncChangeType2String ( osync_change_get_changetype ( status->change))); #endif break; case CHANGE_SENT: /* Update progress and add idle func to send progress * signal in main thread. */ g_mutex_lock (priv->mutex); if (priv->changes == 0) g_idle_add ((GSourceFunc) sync_group_started_idle, group); priv->changes ++; if (priv->progress_id == 0) priv->progress_id = g_idle_add ((GSourceFunc) sync_group_progress_idle, group); g_mutex_unlock (priv->mutex); #ifdef DEBUG g_debug ("Sent a entry %s of size %i to member %i. " "Changetype %s", osync_change_get_uid (status->change), osync_change_get_datasize (status->change), status->member_id, OSyncChangeType2String ( osync_change_get_changetype ( status->change))); #endif break; #ifdef DEBUG case CHANGE_RECEIVED_INFO: g_debug ("Received a entry %s without data from member " "%i. Changetype %s", osync_change_get_uid (status->change), status->member_id, OSyncChangeType2String ( osync_change_get_changetype ( status->change))); break; case CHANGE_WRITE_ERROR: g_debug ("Error writing entry %s to member %i: %s", osync_change_get_uid (status->change), status->member_id, osync_error_print (&(status->error))); break; case CHANGE_RECV_ERROR: g_debug ("Error reading entry %s from member %i: %s", osync_change_get_uid (status->change), status->member_id, osync_error_print (&(status->error))); break; #endif } } #ifdef DEBUG static void sync_mappingstatus_cb (OSyncMappingUpdate *status, void *user_data) { /* SyncGroup *group = (SyncGroup *)user_data; SyncGroupPrivate *priv = GROUP_PRIVATE (group);*/ switch (status->type) { case MAPPING_SOLVED: g_debug ("Mapping solved"); break; case MAPPING_SYNCED: g_debug ("Mapping Synced"); break; case MAPPING_WRITE_ERROR: g_debug ("Mapping Write Error: %s", osync_error_print (&(status->error))); break; } } #endif static void sync_enginestatus_cb (OSyncEngine *engine, OSyncEngineUpdate *status, void *user_data) { SyncGroup *group = (SyncGroup *)user_data; switch (status->type) { #ifdef DEBUG case ENG_PREV_UNCLEAN: g_debug ("The previous synchronization was unclean. " "Slow-syncing"); break; case ENG_ENDPHASE_CON: g_debug ("All clients connected or error"); break; case ENG_END_CONFLICTS: g_debug ("All conflicts have been reported"); break; case ENG_ENDPHASE_READ: g_debug ("All clients sent changes or error"); break; case ENG_ENDPHASE_WRITE: g_debug ("All clients have written"); break; case ENG_ENDPHASE_DISCON: g_debug ("All clients have disconnected"); break; #endif case ENG_SYNC_SUCCESSFULL: g_idle_add ((GSourceFunc) sync_group_finished_idle, group); break; case ENG_ERROR: g_idle_add ((GSourceFunc) sync_group_failed_idle, group); break; } } static void sync_memberstatus_cb (OSyncMemberUpdate *status, void *user_data) { SyncGroup *group = (SyncGroup *)user_data; #ifdef DEBUG switch (status->type) { case MEMBER_CONNECTED: g_debug ("Member %lli of type %s just connected", osync_member_get_id (status->member), osync_member_get_pluginname (status->member)); break; case MEMBER_DISCONNECTED: g_debug ("Member %lli of type %s just disconnected", osync_member_get_id (status->member), osync_member_get_pluginname (status->member)); break; case MEMBER_SENT_CHANGES: g_debug ("Member %lli of type %s just sent all changes", osync_member_get_id (status->member), osync_member_get_pluginname (status->member)); break; case MEMBER_COMMITTED_ALL: g_debug ( "Member %lli of type %s committed all changes.", osync_member_get_id (status->member), osync_member_get_pluginname (status->member)); break; #endif case MEMBER_CONNECT_ERROR: sync_append_error (group, g_strdup_printf ( "Member %lli of type %s had an " "error while connecting: %s", osync_member_get_id (status->member), osync_member_get_pluginname (status->member), osync_error_print (&(status->error)))); break; case MEMBER_GET_CHANGES_ERROR: sync_append_error (group, g_strdup_printf ( "Member %lli of type %s had an error while " "getting changes: %s", osync_member_get_id (status->member), osync_member_get_pluginname (status->member), osync_error_print (&(status->error)))); break; case MEMBER_SYNC_DONE_ERROR: sync_append_error (group, g_strdup_printf ( "Member %lli of type %s had an error while " "calling sync done: %s", osync_member_get_id (status->member), osync_member_get_pluginname (status->member), osync_error_print (&(status->error)))); break; case MEMBER_DISCONNECT_ERROR: sync_append_error (group, g_strdup_printf ( "Member %lli of type %s had an error while " "disconnecting: %s", osync_member_get_id (status->member), osync_member_get_pluginname (status->member), osync_error_print (&(status->error)))); break; case MEMBER_COMMITTED_ALL_ERROR: sync_append_error (group, g_strdup_printf ( "Member %lli of type %s had an error while " "commiting changes: %s", osync_member_get_id (status->member), osync_member_get_pluginname (status->member), osync_error_print (&(status->error)))); break; } } gboolean sync_group_start (SyncGroup *group) { OSyncError *error = NULL; OSyncGroup *os_group; SyncGroupPrivate *priv = GROUP_PRIVATE (group); if (!(os_group = sync_group_get_osync_group (group))) { g_warning ("Failed to create OSyncGroup"); return FALSE; } /* Save the group (NOTE: Won't be necessary in the future?) */ if (!sync_group_save (group)) { g_warning ("Save failed, not starting sync"); return FALSE; } if (!(priv->engine = osengine_new (os_group, &error))) { g_warning ("Error while creating syncengine: %s", osync_error_print (&error)); osync_error_free (&error); return FALSE; } osengine_set_conflict_callback ( priv->engine, sync_conflict_cb, group); osengine_set_changestatus_callback ( priv->engine, sync_changestatus_cb, group); #ifdef DEBUG osengine_set_mappingstatus_callback ( priv->engine, sync_mappingstatus_cb, group); osengine_set_memberstatus_callback ( priv->engine, sync_memberstatus_cb, group); #endif osengine_set_enginestatus_callback ( priv->engine, sync_enginestatus_cb, group); if (!osengine_init (priv->engine, &error)) { g_warning ("Error while initializing syncengine: %s", osync_error_print (&error)); osync_error_free (&error); /* sync_group_free_engine (group);*/ return FALSE; } priv->changes = 0; priv->max_changes = 0; if (!osengine_synchronize (priv->engine, &error)) { g_warning ("Error while starting synchronization: %s", osync_error_print (&error)); osync_error_free (&error); sync_group_free_engine (group); return FALSE; } return TRUE; } void sync_group_resolve_conflict (SyncGroup *group, SyncGroupConflictRes res) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); if ((!priv->mapping) || (!priv->engine)) { g_warning ("%s called with no conflict to resolve", G_STRFUNC); return; } switch (res) { case ABORT: sync_group_abort (group); break; case USE_ITEM1: case USE_ITEM2: case IGNORE: osengine_mapping_ignore_conflict ( priv->engine, priv->mapping); break; default: g_warning ("Invalid conflict resolution code"); return; } priv->mapping = NULL; } void sync_group_abort (SyncGroup *group) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); if (priv->engine) osengine_abort (priv->engine); else g_warning ("%s called, but no sync in progress", G_STRFUNC); } void sync_group_remove_item (SyncGroup *group, SyncItem *item) { SyncGroupPrivate *priv = GROUP_PRIVATE (group); if (priv->item1 == item) { sync_group_set_item1 (group, NULL); } else if (priv->item2 == item) { sync_group_set_item2 (group, NULL); } else { g_warning ("%s called on group not containing item", G_STRFUNC); } }