diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
| -rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog.c | 859 | 
1 files changed, 426 insertions, 433 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index 8198560e736..c7791c62950 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -45,291 +45,291 @@   */  xlator_t *master = NULL; -static inline -gf_private_t *gf_changelog_alloc_priv () +static inline gf_private_t * +gf_changelog_alloc_priv()  { -        int ret = 0; -        gf_private_t *priv = NULL; - -        priv = GF_CALLOC (1, sizeof (*priv), gf_changelog_mt_priv_t); -        if (!priv) -                goto error_return; -        INIT_LIST_HEAD (&priv->connections); -        INIT_LIST_HEAD (&priv->cleanups); - -        ret = pthread_mutex_init (&priv->lock, NULL); -        if (ret != 0) -                goto free_priv; -        ret = pthread_cond_init (&priv->cond, NULL); -        if (ret != 0) -                goto cleanup_mutex; - -        priv->api = NULL; -        return priv; - - cleanup_mutex: -        (void) pthread_mutex_destroy (&priv->lock); - free_priv: -        GF_FREE (priv); - error_return: -        return NULL; +    int ret = 0; +    gf_private_t *priv = NULL; + +    priv = GF_CALLOC(1, sizeof(*priv), gf_changelog_mt_priv_t); +    if (!priv) +        goto error_return; +    INIT_LIST_HEAD(&priv->connections); +    INIT_LIST_HEAD(&priv->cleanups); + +    ret = pthread_mutex_init(&priv->lock, NULL); +    if (ret != 0) +        goto free_priv; +    ret = pthread_cond_init(&priv->cond, NULL); +    if (ret != 0) +        goto cleanup_mutex; + +    priv->api = NULL; +    return priv; + +cleanup_mutex: +    (void)pthread_mutex_destroy(&priv->lock); +free_priv: +    GF_FREE(priv); +error_return: +    return NULL;  } -#define GF_CHANGELOG_EVENT_POOL_SIZE   16384 +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384  #define GF_CHANGELOG_EVENT_THREAD_COUNT 4  static int -gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx) +gf_changelog_ctx_defaults_init(glusterfs_ctx_t *ctx)  { -        cmd_args_t    *cmd_args = NULL; -        struct rlimit  lim = {0, }; -        call_pool_t   *pool = NULL; -        int            ret         = -1; - -        ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); -        if (ret != 0) -                return -1; +    cmd_args_t *cmd_args = NULL; +    struct rlimit lim = { +        0, +    }; +    call_pool_t *pool = NULL; +    int ret = -1; + +    ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); +    if (ret != 0) +        return -1; -        ctx->process_uuid = generate_glusterfs_ctx_id (); -        if (!ctx->process_uuid) -                return -1; +    ctx->process_uuid = generate_glusterfs_ctx_id(); +    if (!ctx->process_uuid) +        return -1; -        ctx->page_size  = 128 * GF_UNIT_KB; +    ctx->page_size = 128 * GF_UNIT_KB; -        ctx->iobuf_pool = iobuf_pool_new (); -        if (!ctx->iobuf_pool) -                return -1; +    ctx->iobuf_pool = iobuf_pool_new(); +    if (!ctx->iobuf_pool) +        return -1; -        ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE, -                                          GF_CHANGELOG_EVENT_THREAD_COUNT); -        if (!ctx->event_pool) -                return -1; +    ctx->event_pool = event_pool_new(GF_CHANGELOG_EVENT_POOL_SIZE, +                                     GF_CHANGELOG_EVENT_THREAD_COUNT); +    if (!ctx->event_pool) +        return -1; -        pool = GF_CALLOC (1, sizeof (call_pool_t), -                          gf_changelog_mt_libgfchangelog_call_pool_t); -        if (!pool) -                return -1; +    pool = GF_CALLOC(1, sizeof(call_pool_t), +                     gf_changelog_mt_libgfchangelog_call_pool_t); +    if (!pool) +        return -1; -        /* frame_mem_pool size 112 * 64 */ -        pool->frame_mem_pool = mem_pool_new (call_frame_t, 32); -        if (!pool->frame_mem_pool) -                return -1; +    /* frame_mem_pool size 112 * 64 */ +    pool->frame_mem_pool = mem_pool_new(call_frame_t, 32); +    if (!pool->frame_mem_pool) +        return -1; -        /* stack_mem_pool size 256 * 128 */ -        pool->stack_mem_pool = mem_pool_new (call_stack_t, 16); +    /* stack_mem_pool size 256 * 128 */ +    pool->stack_mem_pool = mem_pool_new(call_stack_t, 16); -        if (!pool->stack_mem_pool) -                return -1; +    if (!pool->stack_mem_pool) +        return -1; -        ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16); -        if (!ctx->stub_mem_pool) -                return -1; +    ctx->stub_mem_pool = mem_pool_new(call_stub_t, 16); +    if (!ctx->stub_mem_pool) +        return -1; -        ctx->dict_pool = mem_pool_new (dict_t, 32); -        if (!ctx->dict_pool) -                return -1; +    ctx->dict_pool = mem_pool_new(dict_t, 32); +    if (!ctx->dict_pool) +        return -1; -        ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512); -        if (!ctx->dict_pair_pool) -                return -1; +    ctx->dict_pair_pool = mem_pool_new(data_pair_t, 512); +    if (!ctx->dict_pair_pool) +        return -1; -        ctx->dict_data_pool = mem_pool_new (data_t, 512); -        if (!ctx->dict_data_pool) -                return -1; +    ctx->dict_data_pool = mem_pool_new(data_t, 512); +    if (!ctx->dict_data_pool) +        return -1; -        ctx->logbuf_pool = mem_pool_new (log_buf_t, 256); -        if (!ctx->logbuf_pool) -                return -1; +    ctx->logbuf_pool = mem_pool_new(log_buf_t, 256); +    if (!ctx->logbuf_pool) +        return -1; -        INIT_LIST_HEAD (&pool->all_frames); -        LOCK_INIT (&pool->lock); -        ctx->pool = pool; +    INIT_LIST_HEAD(&pool->all_frames); +    LOCK_INIT(&pool->lock); +    ctx->pool = pool; -        LOCK_INIT (&ctx->lock); +    LOCK_INIT(&ctx->lock); -        cmd_args = &ctx->cmd_args; +    cmd_args = &ctx->cmd_args; -        INIT_LIST_HEAD (&cmd_args->xlator_options); +    INIT_LIST_HEAD(&cmd_args->xlator_options); -        lim.rlim_cur = RLIM_INFINITY; -        lim.rlim_max = RLIM_INFINITY; -        setrlimit (RLIMIT_CORE, &lim); +    lim.rlim_cur = RLIM_INFINITY; +    lim.rlim_max = RLIM_INFINITY; +    setrlimit(RLIMIT_CORE, &lim); -        return 0; +    return 0;  }  /* TODO: cleanup ctx defaults */  void -gf_changelog_cleanup_this (xlator_t *this) +gf_changelog_cleanup_this(xlator_t *this)  { -        glusterfs_ctx_t *ctx = NULL; +    glusterfs_ctx_t *ctx = NULL; -        if (!this) -                return; +    if (!this) +        return; -        ctx = this->ctx; -        syncenv_destroy (ctx->env); -        free (ctx); +    ctx = this->ctx; +    syncenv_destroy(ctx->env); +    free(ctx); -        this->private = NULL; -        this->ctx = NULL; +    this->private = NULL; +    this->ctx = NULL; -        mem_pools_fini (); +    mem_pools_fini();  }  static int -gf_changelog_init_context () +gf_changelog_init_context()  { -        glusterfs_ctx_t *ctx = NULL; +    glusterfs_ctx_t *ctx = NULL; -        ctx = glusterfs_ctx_new (); -        if (!ctx) -                goto error_return; +    ctx = glusterfs_ctx_new(); +    if (!ctx) +        goto error_return; -        if (glusterfs_globals_init (ctx)) -                goto free_ctx; +    if (glusterfs_globals_init(ctx)) +        goto free_ctx; -        THIS->ctx = ctx; -        if (gf_changelog_ctx_defaults_init (ctx)) -                goto free_ctx; +    THIS->ctx = ctx; +    if (gf_changelog_ctx_defaults_init(ctx)) +        goto free_ctx; -        ctx->env = syncenv_new (0, 0, 0); -        if (!ctx->env) -                goto free_ctx; -        return 0; +    ctx->env = syncenv_new(0, 0, 0); +    if (!ctx->env) +        goto free_ctx; +    return 0; - free_ctx: -        free (ctx); -        THIS->ctx = NULL; - error_return: -        return -1; +free_ctx: +    free(ctx); +    THIS->ctx = NULL; +error_return: +    return -1;  }  static int -gf_changelog_init_master () +gf_changelog_init_master()  { -        int              ret = 0; +    int ret = 0; -        mem_pools_init_early (); -        ret = gf_changelog_init_context (); -        mem_pools_init_late (); +    mem_pools_init_early(); +    ret = gf_changelog_init_context(); +    mem_pools_init_late(); -        return ret; +    return ret;  }  /* TODO: cleanup clnt/svc on failure */  int -gf_changelog_setup_rpc (xlator_t *this, -                        gf_changelog_t *entry, int proc) +gf_changelog_setup_rpc(xlator_t *this, gf_changelog_t *entry, int proc)  { -        int              ret = 0; -        rpcsvc_t        *svc = NULL; -        struct rpc_clnt *rpc = NULL; - -        /** -         * Initialize a connect back socket. A probe() RPC call to the server -         * triggers a reverse connect. -         */ -        svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick, -                                                    RPC_SOCK (entry), entry); -        if (!svc) -                goto error_return; -        RPC_REBORP (entry) = svc; - -        /* Initialize an RPC client */ -        rpc = gf_changelog_rpc_init (this, entry); -        if (!rpc) -                goto error_return; -        RPC_PROBER (entry) = rpc; - -        /** -         * @FIXME -         * till we have connection state machine, let's delay the RPC call -         * for now.. -         */ -        sleep (2); - -        /** -         * Probe changelog translator for reverse connection. After a successful -         * call, there's less use of the client and can be disconnected, but -         * let's leave the connection active for any future RPC calls. -         */ -        ret = gf_changelog_invoke_rpc (this, entry, proc); -        if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, -                        "Could not initiate probe RPC, bailing out!!!"); -                goto error_return; -        } - -        return 0; - - error_return: -        return -1; +    int ret = 0; +    rpcsvc_t *svc = NULL; +    struct rpc_clnt *rpc = NULL; + +    /** +     * Initialize a connect back socket. A probe() RPC call to the server +     * triggers a reverse connect. +     */ +    svc = gf_changelog_reborp_init_rpc_listner(this, entry->brick, +                                               RPC_SOCK(entry), entry); +    if (!svc) +        goto error_return; +    RPC_REBORP(entry) = svc; + +    /* Initialize an RPC client */ +    rpc = gf_changelog_rpc_init(this, entry); +    if (!rpc) +        goto error_return; +    RPC_PROBER(entry) = rpc; + +    /** +     * @FIXME +     * till we have connection state machine, let's delay the RPC call +     * for now.. +     */ +    sleep(2); + +    /** +     * Probe changelog translator for reverse connection. After a successful +     * call, there's less use of the client and can be disconnected, but +     * let's leave the connection active for any future RPC calls. +     */ +    ret = gf_changelog_invoke_rpc(this, entry, proc); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_LIB_MSG_INVOKE_RPC_FAILED, +               "Could not initiate probe RPC, bailing out!!!"); +        goto error_return; +    } + +    return 0; + +error_return: +    return -1;  }  int -gf_cleanup_event (xlator_t *this, struct gf_event_list *ev) +gf_cleanup_event(xlator_t *this, struct gf_event_list *ev)  { -        int ret = 0; - -        ret = gf_thread_cleanup (this, ev->invoker); -        if (ret) { -                gf_msg (this->name, GF_LOG_WARNING, -ret, -                        CHANGELOG_LIB_MSG_CLEANUP_ERROR, -                        "cannot cleanup callback invoker thread." -                        " Not freeing resources"); -                return -1; -        } +    int ret = 0; + +    ret = gf_thread_cleanup(this, ev->invoker); +    if (ret) { +        gf_msg(this->name, GF_LOG_WARNING, -ret, +               CHANGELOG_LIB_MSG_CLEANUP_ERROR, +               "cannot cleanup callback invoker thread." +               " Not freeing resources"); +        return -1; +    } -        ev->entry = NULL; +    ev->entry = NULL; -        return 0; +    return 0;  }  static int -gf_init_event (gf_changelog_t *entry) +gf_init_event(gf_changelog_t *entry)  { -        int ret = 0; -        struct gf_event_list *ev = NULL; - -        ev = &entry->event; -        ev->entry = entry; - -        ret = pthread_mutex_init (&ev->lock, NULL); -        if (ret != 0) -                goto error_return; -        ret = pthread_cond_init (&ev->cond, NULL); -        if (ret != 0) -                goto cleanup_mutex; -        INIT_LIST_HEAD (&ev->events); - -        ev->next_seq = 0;  /* bootstrap sequencing */ - -        if (GF_NEED_ORDERED_EVENTS (entry)) { -                entry->pickevent  = pick_event_ordered; -                entry->queueevent = queue_ordered_event; -        } else { -                entry->pickevent  = pick_event_unordered; -                entry->queueevent = queue_unordered_event; -        } - -        ret = gf_thread_create (&ev->invoker, NULL, -                                gf_changelog_callback_invoker, ev, "clogcbki"); -        if (ret != 0) { -                entry->pickevent = NULL; -                entry->queueevent = NULL; -                goto cleanup_cond; -        } - -        return 0; - - cleanup_cond: -        (void) pthread_cond_destroy (&ev->cond); - cleanup_mutex: -        (void) pthread_mutex_destroy (&ev->lock); - error_return: -        return -1; +    int ret = 0; +    struct gf_event_list *ev = NULL; + +    ev = &entry->event; +    ev->entry = entry; + +    ret = pthread_mutex_init(&ev->lock, NULL); +    if (ret != 0) +        goto error_return; +    ret = pthread_cond_init(&ev->cond, NULL); +    if (ret != 0) +        goto cleanup_mutex; +    INIT_LIST_HEAD(&ev->events); + +    ev->next_seq = 0; /* bootstrap sequencing */ + +    if (GF_NEED_ORDERED_EVENTS(entry)) { +        entry->pickevent = pick_event_ordered; +        entry->queueevent = queue_ordered_event; +    } else { +        entry->pickevent = pick_event_unordered; +        entry->queueevent = queue_unordered_event; +    } + +    ret = gf_thread_create(&ev->invoker, NULL, gf_changelog_callback_invoker, +                           ev, "clogcbki"); +    if (ret != 0) { +        entry->pickevent = NULL; +        entry->queueevent = NULL; +        goto cleanup_cond; +    } + +    return 0; + +cleanup_cond: +    (void)pthread_cond_destroy(&ev->cond); +cleanup_mutex: +    (void)pthread_mutex_destroy(&ev->lock); +error_return: +    return -1;  }  /** @@ -339,251 +339,242 @@ gf_init_event (gf_changelog_t *entry)   *  - destroy rpc{-clnt, svc}   */  int -gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry) +gf_cleanup_brick_connection(xlator_t *this, gf_changelog_t *entry)  { -        return 0; +    return 0;  }  int -gf_cleanup_connections (xlator_t *this) +gf_cleanup_connections(xlator_t *this)  { -        return 0; +    return 0;  }  static int -gf_setup_brick_connection (xlator_t *this, -                           struct gf_brick_spec *brick, -                           gf_boolean_t ordered, void *xl) +gf_setup_brick_connection(xlator_t *this, struct gf_brick_spec *brick, +                          gf_boolean_t ordered, void *xl)  { -        int ret = 0; -        gf_private_t *priv = NULL; -        gf_changelog_t *entry = NULL; - -        priv = this->private; - -        if (!brick->callback || !brick->init || !brick->fini) -                goto error_return; - -        entry = GF_CALLOC (1, sizeof (*entry), -                           gf_changelog_mt_libgfchangelog_t); -        if (!entry) -                goto error_return; -        INIT_LIST_HEAD (&entry->list); - -        LOCK_INIT (&entry->statelock); -        entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; - -        entry->notify = brick->filter; -        if (snprintf (entry->brick, PATH_MAX, "%s", brick->brick_path) -            >= PATH_MAX) -                goto free_entry; - -        entry->this = this; -        entry->invokerxl = xl; - -        entry->ordered = ordered; -        ret = gf_init_event (entry); -        if (ret) -                goto free_entry; - -        entry->fini         = brick->fini; -        entry->callback     = brick->callback; -        entry->connected    = brick->connected; -        entry->disconnected = brick->disconnected; - -        entry->ptr = brick->init (this, brick); -        if (!entry->ptr) -                goto cleanup_event; -        priv->api = entry->ptr;  /* pointer to API, if required */ - -        pthread_mutex_lock (&priv->lock); -        { -                list_add_tail (&entry->list, &priv->connections); -        } -        pthread_mutex_unlock (&priv->lock); - -        ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); -        if (ret) -                goto cleanup_event; -        return 0; - - cleanup_event: -        (void) gf_cleanup_event (this, &entry->event); - free_entry: -        gf_msg_debug (this->name, 0, "freeing entry %p", entry); -        list_del (&entry->list); /* FIXME: kludge for now */ -        GF_FREE (entry); - error_return: -        return -1; +    int ret = 0; +    gf_private_t *priv = NULL; +    gf_changelog_t *entry = NULL; + +    priv = this->private; + +    if (!brick->callback || !brick->init || !brick->fini) +        goto error_return; + +    entry = GF_CALLOC(1, sizeof(*entry), gf_changelog_mt_libgfchangelog_t); +    if (!entry) +        goto error_return; +    INIT_LIST_HEAD(&entry->list); + +    LOCK_INIT(&entry->statelock); +    entry->connstate = GF_CHANGELOG_CONN_STATE_PENDING; + +    entry->notify = brick->filter; +    if (snprintf(entry->brick, PATH_MAX, "%s", brick->brick_path) >= PATH_MAX) +        goto free_entry; + +    entry->this = this; +    entry->invokerxl = xl; + +    entry->ordered = ordered; +    ret = gf_init_event(entry); +    if (ret) +        goto free_entry; + +    entry->fini = brick->fini; +    entry->callback = brick->callback; +    entry->connected = brick->connected; +    entry->disconnected = brick->disconnected; + +    entry->ptr = brick->init(this, brick); +    if (!entry->ptr) +        goto cleanup_event; +    priv->api = entry->ptr; /* pointer to API, if required */ + +    pthread_mutex_lock(&priv->lock); +    { +        list_add_tail(&entry->list, &priv->connections); +    } +    pthread_mutex_unlock(&priv->lock); + +    ret = gf_changelog_setup_rpc(this, entry, CHANGELOG_RPC_PROBE_FILTER); +    if (ret) +        goto cleanup_event; +    return 0; + +cleanup_event: +    (void)gf_cleanup_event(this, &entry->event); +free_entry: +    gf_msg_debug(this->name, 0, "freeing entry %p", entry); +    list_del(&entry->list); /* FIXME: kludge for now */ +    GF_FREE(entry); +error_return: +    return -1;  }  int -gf_changelog_register_brick (xlator_t *this, -                             struct gf_brick_spec *brick, -                             gf_boolean_t ordered, void *xl) +gf_changelog_register_brick(xlator_t *this, struct gf_brick_spec *brick, +                            gf_boolean_t ordered, void *xl)  { -        return gf_setup_brick_connection (this, brick, ordered, xl); +    return gf_setup_brick_connection(this, brick, ordered, xl);  }  static int -gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel) +gf_changelog_setup_logging(xlator_t *this, char *logfile, int loglevel)  { -        /* passing ident as NULL means to use default ident for syslog */ -        if (gf_log_init (this->ctx, logfile, NULL)) -                return -1; +    /* passing ident as NULL means to use default ident for syslog */ +    if (gf_log_init(this->ctx, logfile, NULL)) +        return -1; -        gf_log_set_loglevel (this->ctx, (loglevel == -1) ? GF_LOG_INFO : -                             loglevel); -        return 0; +    gf_log_set_loglevel(this->ctx, (loglevel == -1) ? GF_LOG_INFO : loglevel); +    return 0;  }  static int -gf_changelog_set_master (xlator_t *master, void *xl) +gf_changelog_set_master(xlator_t *master, void *xl)  { -        int32_t ret = 0; -        xlator_t *this = NULL; -        xlator_t *old_this = NULL; -        gf_private_t *priv = NULL; - -        this = xl; -        if (!this || !this->ctx) { -                ret = gf_changelog_init_master (); -                if (ret) -                        return -1; -                this = THIS; -        } +    int32_t ret = 0; +    xlator_t *this = NULL; +    xlator_t *old_this = NULL; +    gf_private_t *priv = NULL; + +    this = xl; +    if (!this || !this->ctx) { +        ret = gf_changelog_init_master(); +        if (ret) +            return -1; +        this = THIS; +    } -        master->ctx = this->ctx; +    master->ctx = this->ctx; -        INIT_LIST_HEAD (&master->volume_options); -        SAVE_THIS (THIS); +    INIT_LIST_HEAD(&master->volume_options); +    SAVE_THIS(THIS); -        ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); -        if (ret != 0) -                goto restore_this; +    ret = xlator_mem_acct_init(THIS, gf_changelog_mt_end); +    if (ret != 0) +        goto restore_this; -        priv = gf_changelog_alloc_priv (); -        if (!priv) { -                ret = -1; -                goto restore_this; -        } +    priv = gf_changelog_alloc_priv(); +    if (!priv) { +        ret = -1; +        goto restore_this; +    } -        if (!xl) { -                /* poller thread */ -                ret = gf_thread_create (&priv->poller, -                                        NULL, changelog_rpc_poller, THIS, -                                        "clogpoll"); -                if (ret != 0) { -                        GF_FREE (priv); -                        gf_msg (master->name, GF_LOG_ERROR, 0, -                                CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, -                                "failed to spawn poller thread"); -                        goto restore_this; -                } +    if (!xl) { +        /* poller thread */ +        ret = gf_thread_create(&priv->poller, NULL, changelog_rpc_poller, THIS, +                               "clogpoll"); +        if (ret != 0) { +            GF_FREE(priv); +            gf_msg(master->name, GF_LOG_ERROR, 0, +                   CHANGELOG_LIB_MSG_THREAD_CREATION_FAILED, +                   "failed to spawn poller thread"); +            goto restore_this;          } +    } -        master->private = priv; +    master->private = priv; - restore_this: -        RESTORE_THIS (); +restore_this: +    RESTORE_THIS(); -        return ret; +    return ret;  }  int -gf_changelog_init (void *xl) +gf_changelog_init(void *xl)  { -        int ret = 0; -        gf_private_t *priv = NULL; - -        if (master) -                return 0; - -        master = calloc (1, sizeof (*master)); -        if (!master) -                goto error_return; - -        master->name = strdup ("gfchangelog"); -        if (!master->name) -                goto dealloc_master; - -        ret = gf_changelog_set_master (master, xl); -        if (ret) -                goto dealloc_name; - -        priv = master->private; -        ret = gf_thread_create (&priv->connectionjanitor, NULL, -                                gf_changelog_connection_janitor, master, -                                "clogjan"); -        if (ret != 0) { -                /* TODO: cleanup priv, mutex (poller thread for !xl) */ -                goto dealloc_name; -        } +    int ret = 0; +    gf_private_t *priv = NULL; +    if (master)          return 0; - dealloc_name: -        free (master->name); - dealloc_master: -        free (master); -        master = NULL; - error_return: -        return -1; +    master = calloc(1, sizeof(*master)); +    if (!master) +        goto error_return; + +    master->name = strdup("gfchangelog"); +    if (!master->name) +        goto dealloc_master; + +    ret = gf_changelog_set_master(master, xl); +    if (ret) +        goto dealloc_name; + +    priv = master->private; +    ret = gf_thread_create(&priv->connectionjanitor, NULL, +                           gf_changelog_connection_janitor, master, "clogjan"); +    if (ret != 0) { +        /* TODO: cleanup priv, mutex (poller thread for !xl) */ +        goto dealloc_name; +    } + +    return 0; + +dealloc_name: +    free(master->name); +dealloc_master: +    free(master); +    master = NULL; +error_return: +    return -1;  }  int -gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, -                               int ordered, char *logfile, int lvl, void *xl) +gf_changelog_register_generic(struct gf_brick_spec *bricks, int count, +                              int ordered, char *logfile, int lvl, void *xl)  { -        int                   ret        = 0; -        xlator_t             *this       = NULL; -        xlator_t             *old_this   = NULL; -        struct gf_brick_spec *brick      = NULL; -        gf_boolean_t          need_order = _gf_false; +    int ret = 0; +    xlator_t *this = NULL; +    xlator_t *old_this = NULL; +    struct gf_brick_spec *brick = NULL; +    gf_boolean_t need_order = _gf_false; -        SAVE_THIS (xl); +    SAVE_THIS(xl); -        this = THIS; -        if (!this) -                goto error_return; +    this = THIS; +    if (!this) +        goto error_return; -        ret = gf_changelog_setup_logging (this, logfile, lvl); -        if (ret) -                goto error_return; - -        need_order = (ordered) ? _gf_true : _gf_false; - -        brick = bricks; -        while (count--) { -                gf_smsg (this->name, GF_LOG_INFO, 0, -                         CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, -                         "Registering brick", -                         "brick=%s", brick->brick_path, -                         "notify_filter=%d", brick->filter, -                         NULL); - -                ret = gf_changelog_register_brick (this, brick, need_order, xl); -                if (ret != 0) { -                        gf_msg (this->name, GF_LOG_ERROR, 0, -                                CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, -                                "Error registering with changelog xlator"); -                        break; -                } - -                brick++; +    ret = gf_changelog_setup_logging(this, logfile, lvl); +    if (ret) +        goto error_return; + +    need_order = (ordered) ? _gf_true : _gf_false; + +    brick = bricks; +    while (count--) { +        gf_smsg(this->name, GF_LOG_INFO, 0, +                CHANGELOG_LIB_MSG_NOTIFY_REGISTER_INFO, "Registering brick", +                "brick=%s", brick->brick_path, "notify_filter=%d", +                brick->filter, NULL); + +        ret = gf_changelog_register_brick(this, brick, need_order, xl); +        if (ret != 0) { +            gf_msg(this->name, GF_LOG_ERROR, 0, +                   CHANGELOG_LIB_MSG_NOTIFY_REGISTER_FAILED, +                   "Error registering with changelog xlator"); +            break;          } -        if (ret != 0) -                goto cleanup_inited_bricks; +        brick++; +    } -        RESTORE_THIS(); -        return 0; +    if (ret != 0) +        goto cleanup_inited_bricks; - cleanup_inited_bricks: -        gf_cleanup_connections (this); - error_return: -        RESTORE_THIS(); -        return -1; +    RESTORE_THIS(); +    return 0; + +cleanup_inited_bricks: +    gf_cleanup_connections(this); +error_return: +    RESTORE_THIS(); +    return -1;  }  /** @@ -610,27 +601,29 @@ gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,   * For generic API, refer gf_changelog_register_generic().   */  int -gf_changelog_register (char *brick_path, char *scratch_dir, -                       char *log_file, int log_level, int max_reconnects) +gf_changelog_register(char *brick_path, char *scratch_dir, char *log_file, +                      int log_level, int max_reconnects)  { -        struct gf_brick_spec brick = {0,}; +    struct gf_brick_spec brick = { +        0, +    }; -        if (master) -                THIS = master; -        else -                return -1; +    if (master) +        THIS = master; +    else +        return -1; -        brick.brick_path = brick_path; -        brick.filter     = CHANGELOG_OP_TYPE_JOURNAL; +    brick.brick_path = brick_path; +    brick.filter = CHANGELOG_OP_TYPE_JOURNAL; -        brick.init         = gf_changelog_journal_init; -        brick.fini         = gf_changelog_journal_fini; -        brick.callback     = gf_changelog_handle_journal; -        brick.connected    = gf_changelog_journal_connect; -        brick.disconnected = gf_changelog_journal_disconnect; +    brick.init = gf_changelog_journal_init; +    brick.fini = gf_changelog_journal_fini; +    brick.callback = gf_changelog_handle_journal; +    brick.connected = gf_changelog_journal_connect; +    brick.disconnected = gf_changelog_journal_disconnect; -        brick.ptr = scratch_dir; +    brick.ptr = scratch_dir; -        return gf_changelog_register_generic (&brick, 1, 1, -                                              log_file, log_level, NULL); +    return gf_changelog_register_generic(&brick, 1, 1, log_file, log_level, +                                         NULL);  }  | 
