summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/syncop.c
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r--libglusterfs/src/syncop.c294
1 files changed, 260 insertions, 34 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index 9cceaf55c..c1620bb70 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -15,6 +15,160 @@
#include "syncop.h"
+int
+syncopctx_setfsuid (void *uid)
+{
+ struct syncopctx *opctx = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (!uid) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+out:
+ if (opctx && uid) {
+ opctx->uid = *(uid_t *)uid;
+ opctx->valid |= SYNCOPCTX_UID;
+ }
+
+ return ret;
+}
+
+int
+syncopctx_setfsgid (void *gid)
+{
+ struct syncopctx *opctx = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (!gid) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+out:
+ if (opctx && gid) {
+ opctx->gid = *(gid_t *)gid;
+ opctx->valid |= SYNCOPCTX_GID;
+ }
+
+ return ret;
+}
+
+int
+syncopctx_setfsgroups (int count, const void *groups)
+{
+ struct syncopctx *opctx = NULL;
+ gid_t *tmpgroups = NULL;
+ int ret = 0;
+
+ /* In args check */
+ if (count != 0 && !groups) {
+ ret = -1;
+ errno = EINVAL;
+ goto out;
+ }
+
+ opctx = syncopctx_getctx ();
+
+ /* alloc for this thread the first time */
+ if (!opctx) {
+ opctx = GF_CALLOC (1, sizeof (*opctx), gf_common_mt_syncopctx);
+ if (!opctx) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncopctx_setctx (opctx);
+ if (ret != 0) {
+ GF_FREE (opctx);
+ opctx = NULL;
+ goto out;
+ }
+ }
+
+ /* resize internal groups as required */
+ if (count && opctx->grpsize < count) {
+ if (opctx->groups) {
+ tmpgroups = GF_REALLOC (opctx->groups,
+ (sizeof (gid_t) * count));
+ /* NOTE: Not really required to zero the reallocation,
+ * as ngrps controls the validity of data,
+ * making a note irrespective */
+ if (tmpgroups == NULL) {
+ opctx->grpsize = 0;
+ GF_FREE (opctx->groups);
+ opctx->groups = NULL;
+ ret = -1;
+ goto out;
+ }
+ }
+ else {
+ tmpgroups = GF_CALLOC (count, sizeof (gid_t),
+ gf_common_mt_syncopctx);
+ if (tmpgroups == NULL) {
+ opctx->grpsize = 0;
+ ret = -1;
+ goto out;
+ }
+ }
+
+ opctx->groups = tmpgroups;
+ opctx->grpsize = count;
+ }
+
+ /* copy out the groups passed */
+ if (count)
+ memcpy (opctx->groups, groups, (sizeof (gid_t) * count));
+
+ /* set/reset the ngrps, this is where reset of groups is handled */
+ opctx->ngrps = count;
+ opctx->valid |= SYNCOPCTX_GROUPS;
+
+out:
+ return ret;
+}
+
static void
__run (struct synctask *task)
{
@@ -28,7 +182,7 @@ __run (struct synctask *task)
case SYNCTASK_SUSPEND:
break;
case SYNCTASK_RUN:
- gf_log (task->xl->name, GF_LOG_WARNING,
+ gf_log (task->xl->name, GF_LOG_DEBUG,
"re-running already running task");
env->runcount--;
break;
@@ -38,7 +192,11 @@ __run (struct synctask *task)
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
- break;
+ return;
+ case SYNCTASK_ZOMBIE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "attempted to wake up zombie!!");
+ return;
}
list_add_tail (&task->all_tasks, &env->runq);
@@ -70,7 +228,11 @@ __wait (struct synctask *task)
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
- break;
+ return;
+ case SYNCTASK_ZOMBIE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "attempted to sleep a zombie!!");
+ return;
}
list_add_tail (&task->all_tasks, &env->waitq);
@@ -168,6 +330,7 @@ synctask_done (struct synctask *task)
pthread_mutex_lock (&task->mutex);
{
+ task->state = SYNCTASK_ZOMBIE;
task->done = 1;
pthread_cond_broadcast (&task->cond);
}
@@ -191,20 +354,19 @@ synctask_setid (struct synctask *task, uid_t uid, gid_t gid)
}
-int
-synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
- call_frame_t *frame, void *opaque)
+struct synctask *
+synctask_create (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
{
struct synctask *newtask = NULL;
xlator_t *this = THIS;
- int ret = 0;
VALIDATE_OR_GOTO (env, err);
VALIDATE_OR_GOTO (fn, err);
newtask = CALLOC (1, sizeof (*newtask));
if (!newtask)
- return -ENOMEM;
+ return NULL;
newtask->frame = frame;
if (!frame) {
@@ -263,21 +425,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
*/
syncenv_scale(env);
- if (!cbk) {
- pthread_mutex_lock (&newtask->mutex);
- {
- while (!newtask->done) {
- pthread_cond_wait (&newtask->cond, &newtask->mutex);
- }
- }
- pthread_mutex_unlock (&newtask->mutex);
-
- ret = newtask->ret;
-
- synctask_destroy (newtask);
- }
-
- return ret;
+ return newtask;
err:
if (newtask) {
FREE (newtask->stack);
@@ -285,7 +433,46 @@ err:
STACK_DESTROY (newtask->opframe->root);
FREE (newtask);
}
- return -1;
+
+ return NULL;
+}
+
+
+int
+synctask_join (struct synctask *task)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&task->mutex);
+ {
+ while (!task->done)
+ pthread_cond_wait (&task->cond, &task->mutex);
+ }
+ pthread_mutex_unlock (&task->mutex);
+
+ ret = task->ret;
+
+ synctask_destroy (task);
+
+ return ret;
+}
+
+
+int
+synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
+{
+ struct synctask *newtask = NULL;
+ int ret = 0;
+
+ newtask = synctask_create (env, fn, cbk, frame, opaque);
+ if (!newtask)
+ return -1;
+
+ if (!cbk)
+ ret = synctask_join (newtask);
+
+ return ret;
}
@@ -308,7 +495,7 @@ syncenv_task (struct syncproc *proc)
if (!list_empty (&env->runq))
break;
if ((ret == ETIMEDOUT) &&
- (env->procs > SYNCENV_PROC_MIN)) {
+ (env->procs > env->procmin)) {
task = NULL;
env->procs--;
memset (proc, 0, sizeof (*proc));
@@ -408,20 +595,20 @@ syncenv_scale (struct syncenv *env)
goto unlock;
scale = env->runcount;
- if (scale > SYNCENV_PROC_MAX)
- scale = SYNCENV_PROC_MAX;
+ if (scale > env->procmax)
+ scale = env->procmax;
if (scale > env->procs)
diff = scale - env->procs;
while (diff) {
diff--;
- for (; (i < SYNCENV_PROC_MAX); i++) {
+ for (; (i < env->procmax); i++) {
if (env->proc[i].processor == 0)
break;
}
env->proc[i].env = env;
- ret = pthread_create (&env->proc[i].processor, NULL,
- syncenv_processor, &env->proc[i]);
+ ret = gf_thread_create (&env->proc[i].processor, NULL,
+ syncenv_processor, &env->proc[i]);
if (ret)
break;
env->procs++;
@@ -441,12 +628,20 @@ syncenv_destroy (struct syncenv *env)
struct syncenv *
-syncenv_new (size_t stacksize)
+syncenv_new (size_t stacksize, int procmin, int procmax)
{
struct syncenv *newenv = NULL;
int ret = 0;
int i = 0;
+ if (!procmin || procmin < 0)
+ procmin = SYNCENV_PROC_MIN;
+ if (!procmax || procmax > SYNCENV_PROC_MAX)
+ procmax = SYNCENV_PROC_MAX;
+
+ if (procmin > procmax)
+ return NULL;
+
newenv = CALLOC (1, sizeof (*newenv));
if (!newenv)
@@ -461,11 +656,13 @@ syncenv_new (size_t stacksize)
newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE;
if (stacksize)
newenv->stacksize = stacksize;
+ newenv->procmin = procmin;
+ newenv->procmax = procmax;
- for (i = 0; i < SYNCENV_PROC_MIN; i++) {
+ for (i = 0; i < newenv->procmin; i++) {
newenv->proc[i].env = newenv;
- ret = pthread_create (&newenv->proc[i].processor, NULL,
- syncenv_processor, &newenv->proc[i]);
+ ret = gf_thread_create (&newenv->proc[i].processor, NULL,
+ syncenv_processor, &newenv->proc[i]);
if (ret)
break;
newenv->procs++;
@@ -1976,6 +2173,35 @@ syncop_discard(xlator_t *subvol, fd_t *fd, off_t offset, size_t len)
return args.op_ret;
}
+int
+syncop_zerofill_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+
+ return 0;
+}
+
+int
+syncop_zerofill(xlator_t *subvol, fd_t *fd, off_t offset, size_t len)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_zerofill_cbk, subvol->fops->zerofill,
+ fd, offset, len, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
int
syncop_lk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,