summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/syncop.c
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r--libglusterfs/src/syncop.c429
1 files changed, 364 insertions, 65 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index 7dcdf3fef..c1620bb70 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -15,6 +15,160 @@
#include "syncop.h"
+int
+syncopctx_setfsuid (void *uid)
+{
+ struct syncopctx *opctx = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (!uid) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+out:
+ if (opctx && uid) {
+ opctx->uid = *(uid_t *)uid;
+ opctx->valid |= SYNCOPCTX_UID;
+ }
+
+ return ret;
+}
+
+int
+syncopctx_setfsgid (void *gid)
+{
+ struct syncopctx *opctx = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (!gid) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+out:
+ if (opctx && gid) {
+ opctx->gid = *(gid_t *)gid;
+ opctx->valid |= SYNCOPCTX_GID;
+ }
+
+ return ret;
+}
+
+int
+syncopctx_setfsgroups (int count, const void *groups)
+{
+ struct syncopctx *opctx = NULL;
+ gid_t *tmpgroups = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (count != 0 && !groups) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+ /* resize internal groups as required */
+ if (count && opctx->grpsize < count) {
+ if (opctx->groups) {
+ tmpgroups = GF_REALLOC (opctx->groups,
+ (sizeof (gid_t) * count));
+ /* NOTE: Not really required to zero the reallocation,
+ * as ngrps controls the validity of data,
+ * making a note irrespective */
+ if (tmpgroups == NULL) {
+ opctx->grpsize = 0;
+ GF_FREE (opctx->groups);
+ opctx->groups = NULL;
+ ret = -1;
+ goto out;
+ }
+ }
+ else {
+ tmpgroups = GF_CALLOC (count, sizeof (gid_t),
+ gf_common_mt_syncopctx);
+ if (tmpgroups == NULL) {
+ opctx->grpsize = 0;
+ ret = -1;
+ goto out;
+ }
+ }
+
+ opctx->groups = tmpgroups;
+ opctx->grpsize = count;
+ }
+
+ /* copy out the groups passed */
+ if (count)
+ memcpy (opctx->groups, groups, (sizeof (gid_t) * count));
+
+ /* set/reset the ngrps, this is where reset of groups is handled */
+ opctx->ngrps = count;
+ opctx->valid |= SYNCOPCTX_GROUPS;
+
+out:
+ return ret;
+}
+
static void
__run (struct synctask *task)
{
@@ -28,7 +182,7 @@ __run (struct synctask *task)
case SYNCTASK_SUSPEND:
break;
case SYNCTASK_RUN:
- gf_log (task->xl->name, GF_LOG_WARNING,
+ gf_log (task->xl->name, GF_LOG_DEBUG,
"re-running already running task");
env->runcount--;
break;
@@ -38,7 +192,11 @@ __run (struct synctask *task)
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
- break;
+ return;
+ case SYNCTASK_ZOMBIE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "attempted to wake up zombie!!");
+ return;
}
list_add_tail (&task->all_tasks, &env->runq);
@@ -70,7 +228,11 @@ __wait (struct synctask *task)
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
- break;
+ return;
+ case SYNCTASK_ZOMBIE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "attempted to sleep a zombie!!");
+ return;
}
list_add_tail (&task->all_tasks, &env->waitq);
@@ -89,6 +251,8 @@ synctask_yield (struct synctask *task)
task->proc->sched.uc_flags &= ~_UC_TLSBASE;
#endif
+ if (task->state != SYNCTASK_DONE)
+ task->state = SYNCTASK_SUSPEND;
if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
@@ -99,21 +263,6 @@ synctask_yield (struct synctask *task)
void
-synctask_yawn (struct synctask *task)
-{
- struct syncenv *env = NULL;
-
- env = task->env;
-
- pthread_mutex_lock (&env->mutex);
- {
- task->woken = 0;
- }
- pthread_mutex_unlock (&env->mutex);
-}
-
-
-void
synctask_wake (struct synctask *task)
{
struct syncenv *env = NULL;
@@ -181,6 +330,7 @@ synctask_done (struct synctask *task)
pthread_mutex_lock (&task->mutex);
{
+ task->state = SYNCTASK_ZOMBIE;
task->done = 1;
pthread_cond_broadcast (&task->cond);
}
@@ -204,20 +354,19 @@ synctask_setid (struct synctask *task, uid_t uid, gid_t gid)
}
-int
-synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
- call_frame_t *frame, void *opaque)
+struct synctask *
+synctask_create (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
{
struct synctask *newtask = NULL;
xlator_t *this = THIS;
- int ret = 0;
VALIDATE_OR_GOTO (env, err);
VALIDATE_OR_GOTO (fn, err);
newtask = CALLOC (1, sizeof (*newtask));
if (!newtask)
- return -ENOMEM;
+ return NULL;
newtask->frame = frame;
if (!frame) {
@@ -238,6 +387,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
newtask->gid = newtask->opframe->root->gid;
INIT_LIST_HEAD (&newtask->all_tasks);
+ INIT_LIST_HEAD (&newtask->waitq);
if (getcontext (&newtask->ctx) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
@@ -275,21 +425,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
*/
syncenv_scale(env);
- if (!cbk) {
- pthread_mutex_lock (&newtask->mutex);
- {
- while (!newtask->done) {
- pthread_cond_wait (&newtask->cond, &newtask->mutex);
- }
- }
- pthread_mutex_unlock (&newtask->mutex);
-
- ret = newtask->ret;
-
- synctask_destroy (newtask);
- }
-
- return ret;
+ return newtask;
err:
if (newtask) {
FREE (newtask->stack);
@@ -297,7 +433,46 @@ err:
STACK_DESTROY (newtask->opframe->root);
FREE (newtask);
}
- return -1;
+
+ return NULL;
+}
+
+
+int
+synctask_join (struct synctask *task)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&task->mutex);
+ {
+ while (!task->done)
+ pthread_cond_wait (&task->cond, &task->mutex);
+ }
+ pthread_mutex_unlock (&task->mutex);
+
+ ret = task->ret;
+
+ synctask_destroy (task);
+
+ return ret;
+}
+
+
+int
+synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
+{
+ struct synctask *newtask = NULL;
+ int ret = 0;
+
+ newtask = synctask_create (env, fn, cbk, frame, opaque);
+ if (!newtask)
+ return -1;
+
+ if (!cbk)
+ ret = synctask_join (newtask);
+
+ return ret;
}
@@ -320,7 +495,7 @@ syncenv_task (struct syncproc *proc)
if (!list_empty (&env->runq))
break;
if ((ret == ETIMEDOUT) &&
- (env->procs > SYNCENV_PROC_MIN)) {
+ (env->procs > env->procmin)) {
task = NULL;
env->procs--;
memset (proc, 0, sizeof (*proc));
@@ -420,20 +595,20 @@ syncenv_scale (struct syncenv *env)
goto unlock;
scale = env->runcount;
- if (scale > SYNCENV_PROC_MAX)
- scale = SYNCENV_PROC_MAX;
+ if (scale > env->procmax)
+ scale = env->procmax;
if (scale > env->procs)
diff = scale - env->procs;
while (diff) {
diff--;
- for (; (i < SYNCENV_PROC_MAX); i++) {
+ for (; (i < env->procmax); i++) {
if (env->proc[i].processor == 0)
break;
}
env->proc[i].env = env;
- ret = pthread_create (&env->proc[i].processor, NULL,
- syncenv_processor, &env->proc[i]);
+ ret = gf_thread_create (&env->proc[i].processor, NULL,
+ syncenv_processor, &env->proc[i]);
if (ret)
break;
env->procs++;
@@ -453,12 +628,20 @@ syncenv_destroy (struct syncenv *env)
struct syncenv *
-syncenv_new (size_t stacksize)
+syncenv_new (size_t stacksize, int procmin, int procmax)
{
struct syncenv *newenv = NULL;
int ret = 0;
int i = 0;
+ if (!procmin || procmin < 0)
+ procmin = SYNCENV_PROC_MIN;
+ if (!procmax || procmax > SYNCENV_PROC_MAX)
+ procmax = SYNCENV_PROC_MAX;
+
+ if (procmin > procmax)
+ return NULL;
+
newenv = CALLOC (1, sizeof (*newenv));
if (!newenv)
@@ -473,11 +656,13 @@ syncenv_new (size_t stacksize)
newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE;
if (stacksize)
newenv->stacksize = stacksize;
+ newenv->procmin = procmin;
+ newenv->procmax = procmax;
- for (i = 0; i < SYNCENV_PROC_MIN; i++) {
+ for (i = 0; i < newenv->procmin; i++) {
newenv->proc[i].env = newenv;
- ret = pthread_create (&newenv->proc[i].processor, NULL,
- syncenv_processor, &newenv->proc[i]);
+ ret = gf_thread_create (&newenv->proc[i].processor, NULL,
+ syncenv_processor, &newenv->proc[i]);
if (ret)
break;
newenv->procs++;
@@ -529,12 +714,11 @@ __synclock_lock (struct synclock *lock)
if (task) {
/* called within a synctask */
list_add_tail (&task->waitq, &lock->waitq);
- {
- pthread_mutex_unlock (&lock->guard);
- synctask_yield (task);
- pthread_mutex_lock (&lock->guard);
- }
- list_del_init (&task->waitq);
+ pthread_mutex_unlock (&lock->guard);
+ synctask_yield (task);
+ /* task is removed from waitq in unlock,
+ * under lock->guard.*/
+ pthread_mutex_lock (&lock->guard);
} else {
/* called by a non-synctask */
pthread_cond_wait (&lock->cond, &lock->guard);
@@ -616,6 +800,7 @@ __synclock_unlock (synclock_t *lock)
pthread_cond_signal (&lock->cond);
if (!list_empty (&lock->waitq)) {
task = list_entry (lock->waitq.next, struct synctask, waitq);
+ list_del_init (&task->waitq);
synctask_wake (task);
}
@@ -684,12 +869,9 @@ __syncbarrier_wait (struct syncbarrier *barrier, int waitfor)
if (task) {
/* called within a synctask */
list_add_tail (&task->waitq, &barrier->waitq);
- {
- pthread_mutex_unlock (&barrier->guard);
- synctask_yield (task);
- pthread_mutex_lock (&barrier->guard);
- }
- list_del_init (&task->waitq);
+ pthread_mutex_unlock (&barrier->guard);
+ synctask_yield (task);
+ pthread_mutex_lock (&barrier->guard);
} else {
/* called by a non-synctask */
pthread_cond_wait (&barrier->cond, &barrier->guard);
@@ -732,6 +914,7 @@ __syncbarrier_wake (struct syncbarrier *barrier)
pthread_cond_signal (&barrier->cond);
if (!list_empty (&barrier->waitq)) {
task = list_entry (barrier->waitq.next, struct synctask, waitq);
+ list_del_init (&task->waitq);
synctask_wake (task);
}
@@ -816,6 +999,8 @@ entry_copy (gf_dirent_t *source)
sink->d_type = source->d_type;
sink->d_stat = source->d_stat;
+ if (source->inode)
+ sink->inode = inode_ref (source->inode);
return sink;
}
@@ -1445,6 +1630,9 @@ syncop_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
+ if (buf)
+ args->iatt1 = *buf;
+
__wake (args);
return 0;
@@ -1452,7 +1640,7 @@ syncop_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
syncop_create (xlator_t *subvol, loc_t *loc, int32_t flags, mode_t mode,
- fd_t *fd, dict_t *xdata)
+ fd_t *fd, dict_t *xdata, struct iatt *iatt)
{
struct syncargs args = {0, };
@@ -1460,6 +1648,9 @@ syncop_create (xlator_t *subvol, loc_t *loc, int32_t flags, mode_t mode,
loc, flags, mode, 0, fd, xdata);
errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
return args.op_ret;
}
@@ -1756,6 +1947,8 @@ syncop_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
+ if (buf)
+ args->iatt1 = *buf;
__wake (args);
@@ -1763,7 +1956,8 @@ syncop_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int
-syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, dict_t *dict)
+syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, dict_t *dict,
+ struct iatt *iatt)
{
struct syncargs args = {0, };
@@ -1771,6 +1965,9 @@ syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, dict_t *dict)
newpath, loc, 0, dict);
errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
return args.op_ret;
}
@@ -1824,6 +2021,9 @@ syncop_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
+ if (buf)
+ args->iatt1 = *buf;
+
__wake (args);
return 0;
@@ -1831,7 +2031,7 @@ syncop_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,
- dict_t *dict)
+ dict_t *dict, struct iatt *iatt)
{
struct syncargs args = {0, };
@@ -1839,6 +2039,9 @@ syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,
loc, mode, rdev, 0, dict);
errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
return args.op_ret;
}
@@ -1856,6 +2059,8 @@ syncop_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
args->op_ret = op_ret;
args->op_errno = op_errno;
+ if (buf)
+ args->iatt1 = *buf;
__wake (args);
@@ -1864,7 +2069,8 @@ syncop_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
-syncop_mkdir (xlator_t *subvol, loc_t *loc, mode_t mode, dict_t *dict)
+syncop_mkdir (xlator_t *subvol, loc_t *loc, mode_t mode, dict_t *dict,
+ struct iatt *iatt)
{
struct syncargs args = {0, };
@@ -1872,6 +2078,9 @@ syncop_mkdir (xlator_t *subvol, loc_t *loc, mode_t mode, dict_t *dict)
loc, mode, 0, dict);
errno = args.op_errno;
+ if (iatt)
+ *iatt = args.iatt1;
+
return args.op_ret;
}
@@ -1905,6 +2114,96 @@ syncop_access (xlator_t *subvol, loc_t *loc, int32_t mask)
int
+syncop_fallocate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_fallocate(xlator_t *subvol, fd_t *fd, int32_t keep_size, off_t offset,
+ size_t len)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_fallocate_cbk, subvol->fops->fallocate,
+ fd, keep_size, offset, len, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+int
+syncop_discard_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_discard(xlator_t *subvol, fd_t *fd, off_t offset, size_t len)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_discard_cbk, subvol->fops->discard,
+ fd, offset, len, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+int
+syncop_zerofill_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_zerofill(xlator_t *subvol, fd_t *fd, off_t offset, size_t len)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_zerofill_cbk, subvol->fops->zerofill,
+ fd, offset, len, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+int
syncop_lk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, struct gf_flock *flock,
dict_t *xdata)