summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBasavanagowda Kanur <gowda@gluster.com>2009-04-08 04:24:22 +0530
committerAnand V. Avati <avati@amp.gluster.com>2009-04-08 15:04:48 +0530
commit07fcdfebf25c30811a9313ac3d9a0fdbbceaad6c (patch)
tree2650b7b7a2f93cc5530a6033b3f1426f2f6efb85
parentabf35ff6c7a2cc94d9e1e738fb76f711bd2abc16 (diff)
introduction of secondary index database in storage/bdb
Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
-rw-r--r--xlators/storage/bdb/src/bctx.c31
-rw-r--r--xlators/storage/bdb/src/bdb-ll.c1295
-rw-r--r--xlators/storage/bdb/src/bdb.c2108
-rw-r--r--xlators/storage/bdb/src/bdb.h108
4 files changed, 2019 insertions, 1523 deletions
diff --git a/xlators/storage/bdb/src/bctx.c b/xlators/storage/bdb/src/bctx.c
index fce78e95f..18f563fb3 100644
--- a/xlators/storage/bdb/src/bctx.c
+++ b/xlators/storage/bdb/src/bctx.c
@@ -75,16 +75,31 @@ bctx_table_prune (bctx_table_t *table)
list_for_each_entry_safe (del, tmp, &purge, list) {
list_del_init (&del->list);
- if (del->dbp) {
- ret = del->dbp->close (del->dbp, 0);
+ if (del->primary) {
+ ret = del->primary->close (del->primary, 0);
if (ret != 0) {
- gf_log (table->this->name, GF_LOG_ERROR,
- "failed to close db on path (%s): %s",
+ gf_log (table->this->name, GF_LOG_DEBUG,
+ "_BCTX_TABLE_PRUNE %s: %s "
+ "(failed to close primary database)",
del->directory, db_strerror (ret));
} else {
- gf_log (table->this->name, GF_LOG_WARNING,
- "close db for path %s; "
- "table->lru_count = %d",
+ gf_log (table->this->name, GF_LOG_DEBUG,
+ "_BCTX_TABLE_PRUNE %s (lru=%d)"
+ "(closed primary database)",
+ del->directory, table->lru_size);
+ }
+ }
+ if (del->secondary) {
+ ret = del->secondary->close (del->secondary, 0);
+ if (ret != 0) {
+ gf_log (table->this->name, GF_LOG_DEBUG,
+ "_BCTX_TABLE_PRUNE %s: %s "
+ "(failed to close secondary database)",
+ del->directory, db_strerror (ret));
+ } else {
+ gf_log (table->this->name, GF_LOG_DEBUG,
+ "_BCTX_TABLE_PRUNE %s (lru=%d)"
+ "(closed secondary database)",
del->directory, table->lru_size);
}
}
@@ -130,7 +145,7 @@ __hash_bctx (bctx_t *bctx)
static inline bctx_t *
__bctx_passivate (bctx_t *bctx)
{
- if (bctx->dbp) {
+ if (bctx->primary) {
list_move_tail (&bctx->list, &(bctx->table->b_lru));
bctx->table->lru_size++;
} else {
diff --git a/xlators/storage/bdb/src/bdb-ll.c b/xlators/storage/bdb/src/bdb-ll.c
index cd2d1ac49..59d431d82 100644
--- a/xlators/storage/bdb/src/bdb-ll.c
+++ b/xlators/storage/bdb/src/bdb-ll.c
@@ -20,6 +20,7 @@
#include <libgen.h>
#include "bdb.h"
#include <list.h>
+#include "hashfn.h"
/*
* implement the procedures to interact with bdb */
@@ -31,22 +32,41 @@
ino_t
bdb_inode_transform (ino_t parent,
- bctx_t *bctx)
+ const char *name,
+ size_t namelen)
{
- struct bdb_private *private = NULL;
ino_t ino = -1;
+ uint64_t hash = 0;
- GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
+ hash = gf_dm_hashfn (name, namelen);
- private = bctx->table->this->private;
+ ino = (((parent << 32) | 0x00000000ffffffff)
+ & (hash | 0xffffffff00000000));
- LOCK (&private->ino_lock);
- ino = ++private->next_ino;
- UNLOCK (&private->ino_lock);
-out:
return ino;
}
+static int
+bdb_generate_secondary_hash (DB *secondary,
+ const DBT *pkey,
+ const DBT *data,
+ DBT *skey)
+{
+ char *primary = NULL;
+ uint32_t *hash = NULL;
+
+ primary = pkey->data;
+
+ hash = calloc (1, sizeof (uint32_t));
+
+ *hash = gf_dm_hashfn (primary, pkey->size);
+
+ skey->data = hash;
+ skey->size = sizeof (hash);
+ skey->flags = DB_DBT_APPMALLOC;
+
+ return 0;
+}
/***********************************************************
*
@@ -63,13 +83,13 @@ out:
* if (no-empty-slots), then prune open dbs and close as many as possible
* if (empty-slot-available), tika muchkonDu db open maaDu
*
- * NOTE: illi baro munche lock hiDkobEku
*/
-static DB *
+static int
bdb_db_open (bctx_t *bctx)
{
- DB *storage_dbp = NULL;
- int32_t op_ret = -1;
+ DB *primary = NULL;
+ DB *secondary = NULL;
+ int32_t ret = -1;
bctx_table_t *table = NULL;
GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
@@ -78,51 +98,94 @@ bdb_db_open (bctx_t *bctx)
GF_VALIDATE_OR_GOTO ("bdb-ll", table, out);
/* we have to do the following, we can't deny someone of db_open ;) */
- op_ret = db_create (&storage_dbp, table->dbenv, 0);
- if (op_ret != 0) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to do db_create for directory %s (%s)",
- bctx->directory, db_strerror (op_ret));
- storage_dbp = NULL;
+ ret = db_create (&primary, table->dbenv, 0);
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_OPEN %s: %s (failed to create database object"
+ " for primary database)",
+ bctx->directory, db_strerror (ret));
+ ret = -ENOMEM;
goto out;
}
if (table->page_size) {
- op_ret = storage_dbp->set_pagesize (storage_dbp,
- table->page_size);
- if (op_ret != 0) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to set the page_size (%"PRIu64") for "
- "directory %s (%s)",
- table->page_size, bctx->directory,
- db_strerror (op_ret));
- } else {
+ ret = primary->set_pagesize (primary,
+ table->page_size);
+ if (ret < 0) {
gf_log ("bdb-ll", GF_LOG_DEBUG,
- "page-size (%"PRIu64") set on DB",
+ "_BDB_DB_OPEN %s: %s (failed to set page-size "
+ "to %"PRIu64")",
+ bctx->directory, db_strerror (ret),
table->page_size);
+ } else {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_OPEN %s: page-size set to %"PRIu64,
+ bctx->directory, table->page_size);
}
}
- op_ret = storage_dbp->open (storage_dbp,
- NULL,
- bctx->db_path,
- NULL,
- table->access_mode,
- table->dbflags,
- 0);
- if (op_ret != 0 ) {
- gf_log ("bdb-ll",
- GF_LOG_ERROR,
- "failed to open storage-db for directory %s (%s)",
- bctx->db_path, db_strerror (op_ret));
- storage_dbp = NULL;
+ ret = primary->open (primary, NULL, bctx->db_path, "primary",
+ table->access_mode, table->dbflags, 0);
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "_BDB_DB_OPEN %s: %s "
+ "(failed to open primary database)",
+ bctx->directory, db_strerror (ret));
+ ret = -1;
+ goto cleanup;
+ }
+
+ ret = db_create (&secondary, table->dbenv, 0);
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_OPEN %s: %s (failed to create database object"
+ " for secondary database)",
+ bctx->directory, db_strerror (ret));
+ ret = -ENOMEM;
+ goto cleanup;
+ }
+
+ ret = secondary->open (secondary, NULL, bctx->db_path, "secondary",
+ table->access_mode, table->dbflags, 0);
+ if (ret != 0 ) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "_BDB_DB_OPEN %s: %s "
+ "(failed to open secondary database)",
+ bctx->directory, db_strerror (ret));
+ ret = -1;
+ goto cleanup;
+ }
+
+ ret = primary->associate (primary, NULL, secondary,
+ bdb_generate_secondary_hash,
+#ifdef DB_IMMUTABLE_KEY
+ DB_IMMUTABLE_KEY);
+#else
+ 0);
+#endif
+ if (ret != 0 ) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "_BDB_DB_OPEN %s: %s "
+ "(failed to associate primary database with "
+ "secondary database)",
+ bctx->directory, db_strerror (ret));
+ ret = -1;
+ goto cleanup;
}
out:
- return storage_dbp;
-}
+ bctx->primary = primary;
+ bctx->secondary = secondary;
+ return ret;
+cleanup:
+ if (primary)
+ primary->close (primary, 0);
+ if (secondary)
+ secondary->close (secondary, 0);
+ return ret;
+}
int32_t
bdb_cursor_close (bctx_t *bctx,
@@ -140,10 +203,10 @@ bdb_cursor_close (bctx_t *bctx,
#else
ret = cursorp->c_close (cursorp);
#endif
- if ((ret != 0)) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to close db cursor for directory "
- "%s (%s)",
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CURSOR_CLOSE %s: %s "
+ "(failed to close database cursor)",
bctx->directory, db_strerror (ret));
}
}
@@ -165,27 +228,30 @@ bdb_cursor_open (bctx_t *bctx,
LOCK (&bctx->lock);
{
- if (bctx->dbp) {
+ if (bctx->secondary) {
/* do nothing, just continue */
ret = 0;
} else {
- bctx->dbp = bdb_db_open (bctx);
- if (!bctx->dbp) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to open storage db for %s",
+ ret = bdb_db_open (bctx);
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CURSOR_OPEN %s: ENOMEM "
+ "(failed to open secondary database)",
bctx->directory);
- ret = -1;
+ ret = -ENOMEM;
} else {
ret = 0;
}
}
if (ret == 0) {
- /* all set, lets open cursor */
- ret = bctx->dbp->cursor (bctx->dbp, NULL, cursorpp, 0);
- if (ret != 0) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to create a cursor for %s (%s)",
+ /* all set, open cursor */
+ ret = bctx->secondary->cursor (bctx->secondary,
+ NULL, cursorpp, 0);
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CURSOR_OPEN %s: %s "
+ "(failed to open a cursor to database)",
bctx->directory, db_strerror (ret));
}
}
@@ -245,27 +311,37 @@ bdb_cache_insert (bctx_t *bctx,
/* FIXME: ugly, not supposed to disect any of the
* 'struct list_head' directly */
if (!list_empty (&bctx->c_list)) {
- bcache = list_entry (bctx->c_list.prev, bdb_cache_t, c_list);
+ bcache = list_entry (bctx->c_list.prev,
+ bdb_cache_t, c_list);
list_del_init (&bcache->c_list);
}
if (bcache->key) {
free (bcache->key);
- bcache->key = strdup ((char *)key->data);
- GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->key, unlock);
+ bcache->key = calloc (key->size + 1,
+ sizeof (char));
+ GF_VALIDATE_OR_GOTO ("bdb-ll",
+ bcache->key, unlock);
+ memcpy (bcache->key, (char *)key->data,
+ key->size);
} else {
/* should never come here */
- gf_log ("bdb-ll", GF_LOG_CRITICAL,
- "bcache->key (null)");
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CACHE_INSERT %s (%s) "
+ "(found a cache entry with empty key)",
+ bctx->directory, (char *)key->data);
} /* if(bcache->key)...else */
if (bcache->data) {
free (bcache->data);
bcache->data = memdup (data->data, data->size);
- GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, unlock);
+ GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data,
+ unlock);
bcache->size = data->size;
} else {
/* should never come here */
gf_log ("bdb-ll", GF_LOG_CRITICAL,
- "bcache->data (null)");
+ "_BDB_CACHE_INSERT %s (%s) "
+ "(found a cache entry with no data)",
+ bctx->directory, (char *)key->data);
} /* if(bcache->data)...else */
list_add (&bcache->c_list, &bctx->c_list);
ret = 0;
@@ -273,10 +349,14 @@ bdb_cache_insert (bctx_t *bctx,
/* we will be entering here very rarely */
bcache = CALLOC (1, sizeof (*bcache));
GF_VALIDATE_OR_GOTO ("bdb-ll", bcache, unlock);
- bcache->key = strdup ((char *)(key->data));
+
+ bcache->key = calloc (key->size + 1, sizeof (char));
GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->key, unlock);
+ memcpy (bcache->key, key->data, key->size);
+
bcache->data = memdup (data->data, data->size);
GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, unlock);
+
bcache->size = data->size;
list_add (&bcache->c_list, &bctx->c_list);
bctx->c_count++;
@@ -291,7 +371,7 @@ out:
static int32_t
bdb_cache_delete (bctx_t *bctx,
- char *key)
+ const char *key)
{
bdb_cache_t *bcache = NULL;
bdb_cache_t *trav = NULL;
@@ -333,12 +413,12 @@ bdb_db_stat (bctx_t *bctx,
LOCK (&bctx->lock);
{
- if (bctx->dbp == NULL) {
- bctx->dbp = bdb_db_open (bctx);
- storage = bctx->dbp;
+ if (bctx->primary == NULL) {
+ ret = bdb_db_open (bctx);
+ storage = bctx->primary;
} else {
/* we are just fine, lets continue */
- storage = bctx->dbp;
+ storage = bctx->primary;
} /* if(bctx->dbp==NULL)...else */
}
UNLOCK (&bctx->lock);
@@ -347,46 +427,48 @@ bdb_db_stat (bctx_t *bctx,
ret = storage->stat (storage, txnid, &stat, flags);
- if (ret != 0) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to do DB->stat() on db file %s: %s",
- bctx->db_path, db_strerror (ret));
- } else {
+ if (ret < 0) {
gf_log ("bdb-ll", GF_LOG_DEBUG,
- "successfully called DB->stat() on db file %s",
- bctx->db_path);
+ "_BDB_DB_STAT %s: %s "
+ "(failed to do stat database)",
+ bctx->directory, db_strerror (ret));
}
out:
return stat;
}
-/* bdb_storage_get - retrieve a key/value pair corresponding to @path from the corresponding
- * db file.
+/* bdb_storage_get - retrieve a key/value pair corresponding to @path from the
+ * corresponding db file.
*
- * @bctx: bctx_t * corresponding to the parent directory of @path. (should always be a valid
- * bctx). bdb_storage_get should never be called if @bctx = NULL.
- * @txnid: NULL if bdb_storage_get is not embedded in an explicit transaction or a valid
- * DB_TXN *, when embedded in an explicit transaction.
- * @path: path of the file to read from (translated to a database key using MAKE_KEY_FROM_PATH)
- * @buf: char ** - pointer to a pointer to char. a read buffer is created in this procedure
- * and pointer to the buffer is passed through @buf to the caller.
+ * @bctx: bctx_t * corresponding to the parent directory of @path. (should
+ * always be a valid bctx). bdb_storage_get should never be called if
+ * @bctx = NULL.
+ * @txnid: NULL if bdb_storage_get is not embedded in an explicit transaction
+ * or a valid DB_TXN *, when embedded in an explicit transaction.
+ * @path: path of the file to read from (translated to a database key using
+ * MAKE_KEY_FROM_PATH)
+ * @buf: char ** - pointer to a pointer to char. a read buffer is created in
+ * this procedure and pointer to the buffer is passed through @buf to the
+ * caller.
* @size: size of the file content to be read.
* @offset: offset from which the file content to be read.
*
- * NOTE: bdb_storage_get tries to open DB, if @bctx->dbp == NULL (@bctx->dbp == NULL,
- * nobody has opened DB till now or DB was closed by bdb_table_prune()).
+ * NOTE: bdb_storage_get tries to open DB, if @bctx->dbp == NULL
+ * (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by
+ * bdb_table_prune()).
*
- * NOTE: if private->cache is set (bdb xlator's internal caching enabled), then bdb_storage_get
- * first looks up the cache for key/value pair. if bdb_lookup_cache fails, then only
- * DB->get() is called. also, inserts a newly read key/value pair to cache through
- * bdb_insert_to_cache.
+ * NOTE: if private->cache is set (bdb xlator's internal caching enabled), then
+ * bdb_storage_get first looks up the cache for key/value pair. if
+ * bdb_lookup_cache fails, then only DB->get() is called. also, inserts a
+ * newly read key/value pair to cache through bdb_insert_to_cache.
*
* return: 'number of bytes read' on success or -1 on error.
*
- * also see: bdb_lookup_cache, bdb_insert_to_cache for details about bdb xlator's internal cache.
+ * also see: bdb_lookup_cache, bdb_insert_to_cache for details about bdb
+ * xlator's internal cache.
*/
-int32_t
+static int32_t
bdb_db_get (bctx_t *bctx,
DB_TXN *txnid,
const char *path,
@@ -420,12 +502,12 @@ bdb_db_get (bctx_t *bctx,
} else {
LOCK (&bctx->lock);
{
- if (bctx->dbp == NULL) {
- bctx->dbp = bdb_db_open (bctx);
- storage = bctx->dbp;
+ if (bctx->primary == NULL) {
+ ret = bdb_db_open (bctx);
+ storage = bctx->primary;
} else {
/* we are just fine, lets continue */
- storage = bctx->dbp;
+ storage = bctx->primary;
} /* if(bctx->dbp==NULL)...else */
}
UNLOCK (&bctx->lock);
@@ -457,22 +539,25 @@ bdb_db_get (bctx_t *bctx,
if (ret == DB_NOTFOUND) {
gf_log ("bdb-ll", GF_LOG_DEBUG,
- "failed to do DB->get() for key: %s."
- " key not found in storage DB",
- key_string);
+ "_BDB_DB_GET %s - %s: ENOENT"
+ "(specified key not found in database)",
+ bctx->directory, key_string);
ret = -1;
need_break = 1;
} else if (ret == DB_LOCK_DEADLOCK) {
retries++;
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "deadlock detected in DB->put. retrying"
- " DB->put (%d)", retries);
- }else if (ret == 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_GET %s - %s"
+ "(deadlock detected, retrying for %d "
+ "time)",
+ bctx->directory, key_string, retries);
+ } else if (ret == 0) {
/* successfully read data, lets set everything
* in place and return */
if (buf) {
*buf = CALLOC (1, value.size);
- ERR_ABORT (*buf);
+ GF_VALIDATE_OR_GOTO ("bdb-ll",
+ *buf, out);
memcpy (*buf, value.data, value.size);
}
ret = value.size;
@@ -481,10 +566,12 @@ bdb_db_get (bctx_t *bctx,
free (value.data);
need_break = 1;
} else {
- gf_log ("bdb-ll",
- GF_LOG_ERROR,
- "failed to do DB->get() for key %s: %s",
- key_string, db_strerror (ret));
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_GET %s - %s: %s"
+ "(failed to retrieve specified key from"
+ " database)",
+ bctx->directory, key_string,
+ db_strerror (ret));
ret = -1;
need_break = 1;
}
@@ -494,6 +581,19 @@ out:
return ret;
}/* bdb_db_get */
+/* TODO: handle errors here and log. propogate only the errno to caller */
+int32_t
+bdb_db_fread (struct bdb_fd *bfd, char **buf, size_t size, off_t offset)
+{
+ return bdb_db_get (bfd->ctx, NULL, bfd->key, buf, size, offset);
+}
+
+int32_t
+bdb_db_iread (struct bdb_ctx *bctx, const char *key, char **buf)
+{
+ return bdb_db_get (bctx, NULL, key, buf, 0, 0);
+}
+
/* bdb_storage_put - insert a key/value specified to the corresponding DB.
*
* @bctx: bctx_t * corresponding to the parent directory of @path.
@@ -519,7 +619,7 @@ out:
* also see: bdb_cache_delete for details on how a cached key/value pair is
* removed.
*/
-int32_t
+static int32_t
bdb_db_put (bctx_t *bctx,
DB_TXN *txnid,
const char *key_string,
@@ -537,12 +637,12 @@ bdb_db_put (bctx_t *bctx,
LOCK (&bctx->lock);
{
- if (bctx->dbp == NULL) {
- bctx->dbp = bdb_db_open (bctx);
- storage = bctx->dbp;
+ if (bctx->primary == NULL) {
+ ret = bdb_db_open (bctx);
+ storage = bctx->primary;
} else {
/* we are just fine, lets continue */
- storage = bctx->dbp;
+ storage = bctx->primary;
}
}
UNLOCK (&bctx->lock);
@@ -582,15 +682,16 @@ bdb_db_put (bctx_t *bctx,
ret = storage->put (storage, txnid, &key, &value, db_flags);
if (ret == DB_LOCK_DEADLOCK) {
retries++;
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "deadlock detected in DB->put. "
- "retrying DB->put (%d)",
- retries);
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_PUT %s - %s"
+ "(deadlock detected, retying for %d time)",
+ bctx->directory, key_string, retries);
} else if (ret) {
/* write failed */
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to do DB->put() for key %s: %s",
- key_string, db_strerror (ret));
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_PUT %s - %s: %s"
+ "(failed to put specified entry into database)",
+ bctx->directory, key_string, db_strerror (ret));
need_break = 1;
} else {
/* successfully wrote */
@@ -602,44 +703,68 @@ out:
return ret;
}/* bdb_db_put */
+int32_t
+bdb_db_icreate (struct bdb_ctx *bctx, const char *key)
+{
+ return bdb_db_put (bctx, NULL, key, NULL, 0, 0, 0);
+}
+
+/* TODO: handle errors here and log. propogate only the errno to caller */
+int32_t
+bdb_db_fwrite (struct bdb_fd *bfd, char *buf, size_t size, off_t offset)
+{
+ return bdb_db_put (bfd->ctx, NULL, bfd->key, buf, size, offset, 0);
+}
+
+/* TODO: handle errors here and log. propogate only the errno to caller */
+int32_t
+bdb_db_iwrite (struct bdb_ctx *bctx, const char *key, char *buf, size_t size)
+{
+ return bdb_db_put (bctx, NULL, key, buf, size, 0, 0);
+}
+
+int32_t
+bdb_db_itruncate (struct bdb_ctx *bctx, const char *key)
+{
+ return bdb_db_put (bctx, NULL, key, NULL, 0, 1, 0);
+}
-/* bdb_storage_del - delete a key/value pair corresponding to @path from corresponding db file.
+/* bdb_storage_del - delete a key/value pair corresponding to @path from
+ * corresponding db file.
*
* @bctx: bctx_t * corresponding to the parent directory of @path.
* (should always be a valid bctx). bdb_storage_del should never be called
* if @bctx = NULL.
- * @txnid: NULL if bdb_storage_del is not embedded in an explicit transaction or a
- * valid DB_TXN *, when embedded in an explicit transaction.
+ * @txnid: NULL if bdb_storage_del is not embedded in an explicit transaction
+ * or a valid DB_TXN *, when embedded in an explicit transaction.
* @path: path to the file, whose key/value pair has to be deleted.
*
- * NOTE: bdb_storage_del tries to open DB, if @bctx->dbp == NULL (@bctx->dbp == NULL,
- * nobody has opened DB till now or DB was closed by bdb_table_prune()).
+ * NOTE: bdb_storage_del tries to open DB, if @bctx->dbp == NULL
+ * (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by
+ * bdb_table_prune()).
*
* return: 0 on success or -1 on error.
*/
-int32_t
+static int32_t
bdb_db_del (bctx_t *bctx,
DB_TXN *txnid,
- const char *path)
+ const char *key_string)
{
DB *storage = NULL;
DBT key = {0,};
- char *key_string = NULL;
int32_t ret = -1;
int32_t db_flags = 0;
uint8_t need_break = 0;
int32_t retries = 1;
- MAKE_KEY_FROM_PATH (key_string, path);
-
LOCK (&bctx->lock);
{
- if (bctx->dbp == NULL) {
- bctx->dbp = bdb_db_open (bctx);
- storage = bctx->dbp;
+ if (bctx->primary == NULL) {
+ ret = bdb_db_open (bctx);
+ storage = bctx->primary;
} else {
/* we are just fine, lets continue */
- storage = bctx->dbp;
+ storage = bctx->primary;
}
}
UNLOCK (&bctx->lock);
@@ -649,7 +774,7 @@ bdb_db_del (bctx_t *bctx,
ret = bdb_cache_delete (bctx, key_string);
GF_VALIDATE_OR_GOTO ("bdb-ll", (ret == 0), out);
- key.data = key_string;
+ key.data = (char *)key_string;
key.size = strlen (key_string);
key.flags = DB_DBT_USERMEM;
@@ -658,26 +783,30 @@ bdb_db_del (bctx_t *bctx,
if (ret == DB_NOTFOUND) {
gf_log ("bdb-ll", GF_LOG_DEBUG,
- "failed to delete %s from storage db, "
- "doesn't exist in storage DB",
- path);
+ "_BDB_DB_DEL %s - %s: ENOENT"
+ "(failed to delete entry, could not be "
+ "found in the database)",
+ bctx->directory, key_string);
need_break = 1;
} else if (ret == DB_LOCK_DEADLOCK) {
retries++;
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "deadlock detected in DB->put. "
- "retrying DB->put (%d)",
- retries);
- }else if (ret == 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_DEL %s - %s"
+ "(deadlock detected, retying for %d time)",
+ bctx->directory, key_string, retries);
+ } else if (ret == 0) {
/* successfully deleted the entry */
gf_log ("bdb-ll", GF_LOG_DEBUG,
- "deleted %s from storage db", path);
+ "_BDB_DB_DEL %s - %s"
+ "(successfully deleted entry from database)",
+ bctx->directory, key_string);
ret = 0;
need_break = 1;
} else {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to delete %s from storage db: %s",
- path, db_strerror (ret));
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_DB_DEL %s - %s: %s"
+ "(failed to delete entry from database)",
+ bctx->directory, key_string, db_strerror (ret));
ret = -1;
need_break = 1;
}
@@ -686,11 +815,18 @@ out:
return ret;
}
+int32_t
+bdb_db_iremove (bctx_t *bctx,
+ const char *key)
+{
+ return bdb_db_del (bctx, NULL, key);
+}
+
/* NOTE: bdb version compatibility wrapper */
int32_t
bdb_cursor_get (DBC *cursorp,
- DBT *key,
- DBT *value,
+ DBT *sec, DBT *pri,
+ DBT *val,
int32_t flags)
{
int32_t ret = -1;
@@ -698,21 +834,21 @@ bdb_cursor_get (DBC *cursorp,
GF_VALIDATE_OR_GOTO ("bdb-ll", cursorp, out);
#ifdef HAVE_BDB_CURSOR_GET
- ret = cursorp->get (cursorp, key, value, flags);
+ ret = cursorp->pget (cursorp, sec, pri, val, flags);
#else
- ret = cursorp->c_get (cursorp, key, value, flags);
+ ret = cursorp->c_pget (cursorp, sec, pri, val, flags);
#endif
if ((ret != 0) && (ret != DB_NOTFOUND)) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to CURSOR->get() for key %s (%s)",
- (char *)key->data, db_strerror (ret));
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CURSOR_GET: %s"
+ "(failed to retrieve entry from database cursor)",
+ db_strerror (ret));
}
out:
return ret;
}/* bdb_cursor_get */
-
int32_t
bdb_dirent_size (DBT *key)
{
@@ -720,29 +856,6 @@ bdb_dirent_size (DBT *key)
}
-/* bdb_extract_bfd - translate a fd_t to a bfd (either a 'struct bdb_bfd' or 'struct bdb_dir')
- *
- * @fd->ctx is with bdb specific file handle during a successful bdb_open (also bdb_create)
- * or bdb_opendir.
- *
- * return: 'struct bdb_bfd *' or 'struct bdb_dir *' on success, or NULL on failure.
- */
-inline void *
-bdb_extract_bfd (fd_t *fd,
- xlator_t *this)
-{
- uint64_t tmp_bfd = 0;
- void *bfd = NULL;
-
- GF_VALIDATE_OR_GOTO ("bdb-ll", fd, out);
- GF_VALIDATE_OR_GOTO ("bdb-ll", this, out);
-
- fd_ctx_get (fd, this, &tmp_bfd);
- bfd = (void *)(long)bfd;
-
-out:
- return bfd;
-}
/* bdb_dbenv_init - initialize DB_ENV
*
@@ -751,10 +864,10 @@ out:
* NOTE: see private->envflags for flags used.
* 2. DB_ENV->set_lg_dir - set log directory to be used for storing log files
* (log files are the files in which transaction logs are written by db).
- * 3. DB_ENV->set_flags (DB_LOG_AUTOREMOVE) - set DB_ENV to automatically clear
- * the unwanted log files (flushed at each checkpoint).
- * 4. DB_ENV->set_errfile - set errfile to be used by db to report detailed error logs.
- * used only for debbuging purpose.
+ * 3. DB_ENV->set_flags (DB_LOG_AUTOREMOVE) - set DB_ENV to automatically
+ * clear the unwanted log files (flushed at each checkpoint).
+ * 4. DB_ENV->set_errfile - set errfile to be used by db to report detailed
+ * error logs. used only for debbuging purpose.
*
* return: returns a valid DB_ENV * on success or NULL on error.
*
@@ -769,55 +882,49 @@ bdb_dbenv_init (xlator_t *this,
bdb_private_t *private = NULL;
int32_t fatal_flags = 0;
- VALIDATE_OR_GOTO (this, out);
- VALIDATE_OR_GOTO (directory, out);
+ VALIDATE_OR_GOTO (this, err);
+ VALIDATE_OR_GOTO (directory, err);
private = this->private;
- VALIDATE_OR_GOTO (private, out);
+ VALIDATE_OR_GOTO (private, err);
ret = db_env_create (&dbenv, 0);
- VALIDATE_OR_GOTO ((ret == 0), out);
+ VALIDATE_OR_GOTO ((ret == 0), err);
/* NOTE: set_errpfx returns 'void' */
dbenv->set_errpfx(dbenv, this->name);
ret = dbenv->set_lk_detect (dbenv, DB_LOCK_DEFAULT);
- VALIDATE_OR_GOTO ((ret == 0), out);
+ VALIDATE_OR_GOTO ((ret == 0), err);
ret = dbenv->open(dbenv, directory,
private->envflags,
S_IRUSR | S_IWUSR);
if ((ret != 0) && (ret != DB_RUNRECOVERY)) {
gf_log (this->name, GF_LOG_CRITICAL,
- "failed to open DB environment (%s)",
- db_strerror (ret));
+ "failed to join Berkeley DB environment at %s: %s."
+ "please run manual recovery and retry running "
+ "glusterfs",
+ directory, db_strerror (ret));
dbenv = NULL;
- goto out;
+ goto err;
} else if (ret == DB_RUNRECOVERY) {
fatal_flags = ((private->envflags & (~DB_RECOVER))
| DB_RECOVER_FATAL);
ret = dbenv->open(dbenv, directory, fatal_flags,
S_IRUSR | S_IWUSR);
if (ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to open DB environment (%s) with "
- "DB_REOVER_FATAL",
- db_strerror (ret));
+ gf_log (this->name, GF_LOG_CRITICAL,
+ "failed to join Berkeley DB environment in "
+ "recovery mode at %s: %s. please run manual "
+ "recovery and retry running glusterfs",
+ directory, db_strerror (ret));
dbenv = NULL;
- goto out;
- } else {
- gf_log (this->name, GF_LOG_WARNING,
- "opened DB environment after DB_RECOVER_FATAL:"
- " %s", db_strerror (ret));
+ goto err;
}
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "DB environment successfull opened: %s",
- db_strerror (ret));
}
-
-
+ ret = 0;
#if (DB_VERSION_MAJOR == 4 && \
DB_VERSION_MINOR == 7)
if (private->log_auto_remove) {
@@ -832,41 +939,42 @@ bdb_dbenv_init (xlator_t *this,
ret = dbenv->set_flags (dbenv, DB_LOG_AUTOREMOVE, 0);
}
#endif
- if (ret != 0) {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to set DB_LOG_AUTOREMOVE on dbenv: %s",
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "autoremoval of transactional log files could not be "
+ "configured (%s). you may have to do a manual "
+ "monitoring of transactional log files and remove "
+ "periodically.",
db_strerror (ret));
- } else {
- gf_log ("bctx", GF_LOG_DEBUG,
- "DB_LOG_AUTOREMOVE set on dbenv");
+ goto err;
}
if (private->transaction) {
ret = dbenv->set_flags(dbenv, DB_AUTO_COMMIT, 1);
if (ret != 0) {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to set DB_AUTO_COMMIT on dbenv: %s",
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "configuration of auto-commit failed for "
+ "database environment at %s. none of the "
+ "operations will be embedded in transaction "
+ "unless explicitly done so.",
db_strerror (ret));
- } else {
- gf_log ("bctx", GF_LOG_DEBUG,
- "DB_AUTO_COMMIT set on dbenv");
+ goto err;
}
if (private->txn_timeout) {
- ret = dbenv->set_timeout (dbenv,
- private->txn_timeout,
+ ret = dbenv->set_timeout (dbenv, private->txn_timeout,
DB_SET_TXN_TIMEOUT);
if (ret != 0) {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to set TXN_TIMEOUT to %d "
- "milliseconds on dbenv: %s",
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "could not configure Berkeley DB "
+ "transaction timeout to %d (%s). please"
+ " review 'option transaction-timeout %d"
+ "' option.",
private->txn_timeout,
- db_strerror (ret));
- } else {
- gf_log ("bctx", GF_LOG_DEBUG,
- "TXN_TIMEOUT set to %d milliseconds",
+ db_strerror (ret),
private->txn_timeout);
+ goto err;
}
}
@@ -874,32 +982,28 @@ bdb_dbenv_init (xlator_t *this,
ret = dbenv->set_timeout(dbenv,
private->txn_timeout,
DB_SET_LOCK_TIMEOUT);
-
- if (ret != 0) {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to set LOCK_TIMEOUT to %d "
- "milliseconds on dbenv: %s",
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "could not configure Berkeley DB "
+ "lock timeout to %d (%s). please"
+ " review 'option lock-timeout %d"
+ "' option.",
private->lock_timeout,
- db_strerror (ret));
- } else {
- gf_log ("bctx", GF_LOG_DEBUG,
- "LOCK_TIMEOUT set to %d milliseconds",
+ db_strerror (ret),
private->lock_timeout);
+ goto err;
}
}
ret = dbenv->set_lg_dir (dbenv, private->logdir);
-
- if (ret != 0) {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to set log directory for dbenv: %s",
- db_strerror (ret));
- } else {
- gf_log ("bctx", GF_LOG_DEBUG,
- "set dbenv log dir to %s",
- private->logdir);
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "failed to configure libdb transaction log "
+ "directory at %s. please review the "
+ "'option logdir %s' option.",
+ db_strerror (ret), private->logdir);
+ goto err;
}
-
}
if (private->errfile) {
@@ -907,41 +1011,52 @@ bdb_dbenv_init (xlator_t *this,
if (private->errfp) {
dbenv->set_errfile (dbenv, private->errfp);
} else {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to open errfile: %s",
- strerror (errno));
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "failed to open error logging file for "
+ "libdb (Berkeley DB) internal logging (%s)."
+ "please review the 'option errfile %s' option.",
+ strerror (errno), private->errfile);
+ goto err;
}
}
-out:
return dbenv;
+err:
+ if (dbenv) {
+ dbenv->close (dbenv, 0);
+ }
+
+ return NULL;
}
#define BDB_ENV(this) ((((struct bdb_private *)this->private)->b_table)->dbenv)
-/* bdb_checkpoint - during transactional usage, db does not directly write the data to db
- * files, instead db writes a 'log' (similar to a journal entry) into a
- * log file. db normally clears the log files during opening of an
- * environment. since we expect a filesystem server to run for a pretty
- * long duration and flushing 'log's during dbenv->open would prove very
- * costly, if we accumulate the log entries for one complete run of
- * glusterfs server. to flush the logs frequently, db provides a mechanism
- * called 'checkpointing'. when we do a checkpoint, db flushes the logs to
- * disk (writes changes to db files) and we can also clear the accumulated
- * log files after checkpointing. NOTE: removing unwanted log files is not
- * part of dbenv->txn_checkpoint() call.
+/* bdb_checkpoint - during transactional usage, db does not directly write the
+ * data to db files, instead db writes a 'log' (similar to a journal entry)
+ * into a log file. db normally clears the log files during opening of an
+ * environment. since we expect a filesystem server to run for a pretty long
+ * duration and flushing 'log's during dbenv->open would prove very costly, if
+ * we accumulate the log entries for one complete run of glusterfs server. to
+ * flush the logs frequently, db provides a mechanism called 'checkpointing'.
+ * when we do a checkpoint, db flushes the logs to disk (writes changes to db
+ * files) and we can also clear the accumulated log files after checkpointing.
+ * NOTE: removing unwanted log files is not part of dbenv->txn_checkpoint()
+ * call.
*
* @data: xlator_t of the current instance of bdb xlator.
*
- * bdb_checkpoint is called in a different thread from the main glusterfs thread. bdb
- * xlator creates the checkpoint thread after successfully opening the db environment.
- * NOTE: bdb_checkpoint thread shares the DB_ENV handle with the filesystem thread.
+ * bdb_checkpoint is called in a different thread from the main glusterfs
+ * thread. bdb xlator creates the checkpoint thread after successfully opening
+ * the db environment.
+ * NOTE: bdb_checkpoint thread shares the DB_ENV handle with the filesystem
+ * thread.
*
* db environment checkpointing frequency is controlled by
* 'option checkpoint-timeout <time-in-seconds>' in volfile.
*
- * NOTE: checkpointing thread is started only if 'option transaction on' specified in
- * volfile. checkpointing is not valid for non-transactional environments.
+ * NOTE: checkpointing thread is started only if 'option transaction on'
+ * specified in volfile. checkpointing is not valid for non-transactional
+ * environments.
*
*/
static void *
@@ -965,23 +1080,29 @@ bdb_checkpoint (void *data)
if (active) {
ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0);
if (ret) {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to checkpoint environment: %s",
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CHECKPOINT: %s"
+ "(failed to checkpoint environment)",
db_strerror (ret));
} else {
- gf_log ("bctx", GF_LOG_DEBUG,
- "checkpointing successful");
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CHECKPOINT: successfully "
+ "checkpointed");
}
} else {
ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0);
if (ret) {
- gf_log ("bctx", GF_LOG_ERROR,
- "failed to do final checkpoint "
- "environment: %s",
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "_BDB_CHECKPOINT: %s"
+ "(final checkpointing failed. might "
+ "need to run recovery tool manually on "
+ "next usage of this database "
+ "environment)",
db_strerror (ret));
} else {
- gf_log ("bctx", GF_LOG_DEBUG,
- "final checkpointing successful");
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "_BDB_CHECKPOINT: final successfully "
+ "checkpointed");
}
break;
}
@@ -990,449 +1111,321 @@ bdb_checkpoint (void *data)
return NULL;
}
-static inline void
-bdb_cache_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- /* cache is always on */
- private->cache = ON;
-}
-
-static inline void
-bdb_log_remove_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- private->log_auto_remove = 1;
- gf_log (this->name, GF_LOG_DEBUG,
- "DB_ENV will use DB_LOG_AUTO_REMOVE");
-}
-static inline void
-bdb_errfile_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *errfile = NULL;
-
- ret = dict_get_str (options, "errfile", &errfile);
- if (ret == 0) {
- private->errfile = strdup (errfile);
- gf_log (this->name, GF_LOG_DEBUG,
- "using errfile: %s", private->errfile);
- }
-}
-
-static inline void
-bdb_table_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
+/* bdb_db_init - initialize bdb xlator
+ *
+ * reads the options from @options dictionary and sets appropriate values in
+ * @this->private. also initializes DB_ENV.
+ *
+ * return: 0 on success or -1 on error
+ * (with logging the error through gf_log()).
+ */
+int
+bdb_db_init (xlator_t *this,
+ dict_t *options)
{
- bctx_table_t *table = NULL;
- int32_t idx = 0;
-
- int ret = -1;
- char *lru_limit_str = NULL;
- char *page_size_str = NULL;
-
- table = CALLOC (1, sizeof (*table));
- if (table) {
- INIT_LIST_HEAD(&(table->b_lru));
- INIT_LIST_HEAD(&(table->active));
- INIT_LIST_HEAD(&(table->purge));
-
- LOCK_INIT (&table->lock);
- LOCK_INIT (&table->checkpoint_lock);
-
- table->transaction = private->transaction;
- table->access_mode = private->access_mode;
- table->dbflags = private->dbflags;
- table->this = this;
-
- {
- ret = dict_get_str (options, "lru-limit",
- &lru_limit_str);
-
- /* TODO: set max lockers and max txns to accomodate
- * for more than lru_limit */
- if (ret == 0) {
- ret = gf_string2uint32 (lru_limit_str,
- &table->lru_limit);
- gf_log ("bdb-ll", GF_LOG_DEBUG,
- "setting bctx lru limit to %d",
- table->lru_limit);
- } else {
- table->lru_limit = BDB_DEFAULT_LRU_LIMIT;
- }
- }
-
- {
- ret = dict_get_str (options, "page-size",
- &page_size_str);
-
- if (ret == 0) {
- ret = gf_string2bytesize (page_size_str,
- &table->page_size);
- if (ret != 0) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "invalid number format \"%s\""
- " of \"option page-size\"",
- page_size_str);
- }
+ /* create a db entry for root */
+ int32_t op_ret = 0;
+ bdb_private_t *private = NULL;
+ bctx_table_t *table = NULL;
- if (!PAGE_SIZE_IN_RANGE(table->page_size)) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "pagesize %s is out of range."
- "Allowed pagesize is between "
- "%d and %d",
- page_size_str,
- BDB_LL_PAGE_SIZE_MIN,
- BDB_LL_PAGE_SIZE_MAX);
- }
- }
- else {
- table->page_size = BDB_LL_PAGE_SIZE_DEFAULT;
- }
- gf_log ("bdb-ll",
- GF_LOG_DEBUG, "using page-size %"PRIu64,
- table->page_size);
- }
+ char *checkpoint_interval_str = NULL;
+ char *page_size_str = NULL;
+ char *lru_limit_str = NULL;
+ char *timeout_str = NULL;
+ char *access_mode = NULL;
+ char *endptr = NULL;
+ char *errfile = NULL;
+ char *directory = NULL;
+ char *logdir = NULL;
+ char *mode = NULL;
+ char *mode_str = NULL;
+ int ret = -1;
+ int idx = 0;
+ struct stat stbuf = {0,};
- table->hash_size = BDB_DEFAULT_HASH_SIZE;
- table->b_hash = CALLOC (BDB_DEFAULT_HASH_SIZE,
- sizeof (struct list_head));
+ private = this->private;
- for (idx = 0; idx < table->hash_size; idx++)
- INIT_LIST_HEAD(&(table->b_hash[idx]));
+ /* cache is always on */
+ private->cache = ON;
- private->b_table = table;
+ ret = dict_get_str (options, "access-mode", &access_mode);
+ if ((ret == 0)
+ && (!strcmp (access_mode, "btree"))) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "using BTREE access mode to access libdb "
+ "(Berkeley DB)");
+ private->access_mode = DB_BTREE;
} else {
- gf_log ("bdb-ll", GF_LOG_CRITICAL,
- "failed to allocate bctx table: out of memory");
+ gf_log (this->name, GF_LOG_DEBUG,
+ "using HASH access mode to access libdb (Berkeley DB)");
+ private->access_mode = DB_HASH;
}
-}
-
-static inline void
-bdb_directory_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *directory = NULL;
- char *logdir = NULL;
- int32_t op_ret = -1;
- struct stat stbuf = {0};
- ret = dict_get_str (options, "directory", &directory);
+ ret = dict_get_str (options, "mode", &mode);
+ if ((ret == 0)
+ && (!strcmp (mode, "cache"))) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "cache data mode selected for 'storage/bdb'. filesystem"
+ " operations are not transactionally protected and "
+ "system crash does not guarantee recoverability of "
+ "data");
+ private->envflags = DB_CREATE | DB_INIT_LOG |
+ DB_INIT_MPOOL | DB_THREAD;
+ private->dbflags = DB_CREATE | DB_THREAD;
+ private->transaction = OFF;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "persistent data mode selected for 'storage/bdb'. each"
+ "filesystem operation is guaranteed to be Berkeley DB "
+ "transaction protected.");
+ private->transaction = ON;
+ private->envflags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
+ DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD;
+ private->dbflags = DB_CREATE | DB_THREAD;
- if (ret == 0) {
- ret = dict_get_str (options, "logdir", &logdir);
- if (ret != 0) {
- gf_log ("bdb-ll", GF_LOG_DEBUG,
- "using default logdir as database home");
- private->logdir = strdup (directory);
+ ret = dict_get_str (options, "lock-timeout", &timeout_str);
- } else {
- private->logdir = strdup (logdir);
- gf_log ("bdb-ll", GF_LOG_DEBUG,
- "using logdir: %s",
- private->logdir);
- umask (000);
- if (mkdir (private->logdir, 0777) == 0) {
- gf_log ("bdb-ll", GF_LOG_WARNING,
- "logdir specified (%s) not exists, "
- "created",
- private->logdir);
- }
-
- op_ret = stat (private->logdir, &stbuf);
- if ((op_ret != 0)
- || (!S_ISDIR (stbuf.st_mode))) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "specified logdir doesn't exist, "
- "using default "
- "(environment home directory: %s)",
- directory);
- private->logdir = strdup (directory);
+ if (ret == 0) {
+ ret = gf_string2time (timeout_str,
+ &private->lock_timeout);
+
+ if (private->lock_timeout > 4260000) {
+ /* db allows us to DB_SET_LOCK_TIMEOUT to be
+ * set to a maximum of 71 mins
+ * (4260000 milliseconds) */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Berkeley DB lock-timeout parameter "
+ "(%d) is out of range. please specify"
+ " a valid timeout value for "
+ "lock-timeout and retry.",
+ private->lock_timeout);
+ goto err;
}
}
-
- private->b_table->dbenv = bdb_dbenv_init (this, directory);
-
- if (!private->b_table->dbenv) {
- gf_log ("bdb-ll", GF_LOG_ERROR,
- "failed to initialize db environment");
- FREE (private);
- op_ret = -1;
- } else {
- if (private->transaction) {
- /* all well, start the checkpointing thread */
- LOCK_INIT (&private->active_lock);
-
- LOCK (&private->active_lock);
- {
- private->active = 1;
- }
- UNLOCK (&private->active_lock);
- pthread_create (&private->checkpoint_thread,
- NULL, bdb_checkpoint, this);
+ ret = dict_get_str (options, "transaction-timeout",
+ &timeout_str);
+ if (ret == 0) {
+ ret = gf_string2time (timeout_str,
+ &private->txn_timeout);
+
+ if (private->txn_timeout > 4260000) {
+ /* db allows us to DB_SET_TXN_TIMEOUT to be set
+ * to a maximum of 71 mins
+ * (4260000 milliseconds) */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Berkeley DB lock-timeout parameter "
+ "(%d) is out of range. please specify"
+ " a valid timeout value for "
+ "lock-timeout and retry.",
+ private->lock_timeout);
+ goto err;
}
}
- }
-}
-
-static inline void
-bdb_dir_mode_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *mode_str = NULL;
- char *endptr = NULL;
- ret = dict_get_str (options, "dir-mode", &mode_str);
-
- if (ret == 0) {
- private->dir_mode = strtol (mode_str, &endptr, 8);
- if ((*endptr) ||
- (!IS_VALID_FILE_MODE(private->dir_mode))) {
- gf_log (this->name, GF_LOG_DEBUG,
- "invalid dir-mode %o. setting to default %o",
- private->dir_mode,
- DEFAULT_DIR_MODE);
- private->dir_mode = DEFAULT_DIR_MODE;
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "setting dir-mode to %o",
- private->dir_mode);
+ private->checkpoint_interval = BDB_DEFAULT_CHECKPOINT_INTERVAL;
+ ret = dict_get_str (options, "checkpoint-interval",
+ &checkpoint_interval_str);
+ if (ret == 0) {
+ ret = gf_string2time (checkpoint_interval_str,
+ &private->checkpoint_interval);
+
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "'%"PRIu32"' is not a valid parameter "
+ "for checkpoint-interval option. "
+ "please specify a valid "
+ "checkpoint-interval and retry",
+ private->checkpoint_interval);
+ goto err;
+ }
}
- } else {
- private->dir_mode = DEFAULT_DIR_MODE;
}
- private->dir_mode = private->dir_mode | S_IFDIR;
-}
-
-static inline void
-bdb_file_mode_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *mode_str = NULL;
- char *endptr = NULL;
-
ret = dict_get_str (options, "file-mode", &mode_str);
-
if (ret == 0) {
private->file_mode = strtol (mode_str, &endptr, 8);
if ((*endptr) ||
(!IS_VALID_FILE_MODE(private->file_mode))) {
gf_log (this->name, GF_LOG_DEBUG,
- "invalid file-mode %o. setting to default %o",
- private->file_mode, DEFAULT_FILE_MODE);
- private->file_mode = DEFAULT_FILE_MODE;
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "setting file-mode to %o",
+ "'%o' is not a valid parameter for file-mode "
+ "option. please specify a valid parameter for "
+ "file-mode and retry.",
private->file_mode);
- private->file_mode = private->file_mode;
+ goto err;
}
} else {
private->file_mode = DEFAULT_FILE_MODE;
}
-
private->symlink_mode = private->file_mode | S_IFLNK;
private->file_mode = private->file_mode | S_IFREG;
-}
-
-static inline void
-bdb_checkpoint_interval_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *checkpoint_interval_str = NULL;
-
- private->checkpoint_interval = BDB_DEFAULT_CHECKPOINT_INTERVAL;
-
- ret = dict_get_str (options, "checkpoint-interval",
- &checkpoint_interval_str);
+ ret = dict_get_str (options, "dir-mode", &mode_str);
if (ret == 0) {
- ret = gf_string2time (checkpoint_interval_str,
- &private->checkpoint_interval);
-
- if (ret == 0) {
+ private->dir_mode = strtol (mode_str, &endptr, 8);
+ if ((*endptr) ||
+ (!IS_VALID_FILE_MODE(private->dir_mode))) {
gf_log (this->name, GF_LOG_DEBUG,
- "setting checkpoint-interval to %"PRIu32" seconds",
- private->checkpoint_interval);
+ "'%o' is not a valid parameter for dir-mode "
+ "option. please specify a valid parameter for "
+ "dir-mode and retry.",
+ private->dir_mode);
+ goto err;
}
} else {
- gf_log (this->name, GF_LOG_DEBUG,
- "setting checkpoint-interval to default: %"PRIu32" seconds",
- private->checkpoint_interval);
+ private->dir_mode = DEFAULT_DIR_MODE;
}
-}
-static inline void
-bdb_lock_timeout_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *timeout_str = NULL;
+ private->dir_mode = private->dir_mode | S_IFDIR;
- ret = dict_get_str (options, "lock-timeout", &timeout_str);
+ table = CALLOC (1, sizeof (*table));
+ if (table == NULL) {
+ gf_log ("bdb-ll", GF_LOG_CRITICAL,
+ "memory allocation for 'storage/bdb' internal "
+ "context table failed.");
+ goto err;
+ }
- if (ret == 0) {
- ret = gf_string2time (timeout_str, &private->lock_timeout);
+ INIT_LIST_HEAD(&(table->b_lru));
+ INIT_LIST_HEAD(&(table->active));
+ INIT_LIST_HEAD(&(table->purge));
- if (private->lock_timeout > 4260000) {
- /* db allows us to DB_SET_LOCK_TIMEOUT to be set to a
- * maximum of 71 mins (4260000 milliseconds) */
- gf_log (this->name, GF_LOG_DEBUG,
- "lock-timeout %d, out of range",
- private->lock_timeout);
- private->lock_timeout = 0;
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "setting lock-timeout to %d milliseconds",
- private->lock_timeout);
- }
- }
-}
+ LOCK_INIT (&table->lock);
+ LOCK_INIT (&table->checkpoint_lock);
-static inline void
-bdb_transaction_timeout_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *timeout_str = NULL;
+ table->transaction = private->transaction;
+ table->access_mode = private->access_mode;
+ table->dbflags = private->dbflags;
+ table->this = this;
- ret = dict_get_str (options, "transaction-timeout", &timeout_str);
+ ret = dict_get_str (options, "lru-limit",
+ &lru_limit_str);
+ /* TODO: set max lockers and max txns to accomodate
+ * for more than lru_limit */
if (ret == 0) {
- ret = gf_string2time (timeout_str, &private->txn_timeout);
-
- if (private->txn_timeout > 4260000) {
- /* db allows us to DB_SET_TXN_TIMEOUT to be set to
- * a maximum of 71 mins (4260000 milliseconds) */
- gf_log (this->name, GF_LOG_DEBUG,
- "transaction-timeout %d, out of range",
- private->txn_timeout);
- private->txn_timeout = 0;
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "setting transaction-timeout to %d "
- "milliseconds",
- private->txn_timeout);
- }
+ ret = gf_string2uint32 (lru_limit_str,
+ &table->lru_limit);
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "setting lru limit of 'storage/bdb' internal context"
+ "table to %d. maximum of %d unused databases can be "
+ "open at any given point of time.",
+ table->lru_limit, table->lru_limit);
+ } else {
+ table->lru_limit = BDB_DEFAULT_LRU_LIMIT;
}
-}
-static inline void
-bdb_transaction_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *mode = NULL;
+ ret = dict_get_str (options, "page-size",
+ &page_size_str);
- ret = dict_get_str (options, "mode", &mode);
+ if (ret == 0) {
+ ret = gf_string2bytesize (page_size_str,
+ &table->page_size);
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "\"%s\" is an invalid parameter to "
+ "\"option page-size\". please specify a valid "
+ "size and retry.",
+ page_size_str);
+ goto err;
+ }
- if ((ret == 0)
- && (!strcmp (mode, "cache"))) {
- gf_log (this->name, GF_LOG_DEBUG,
- "cache mode selected");
- private->envflags = DB_CREATE | DB_INIT_LOG |
- DB_INIT_MPOOL | DB_THREAD;
- private->dbflags = DB_CREATE | DB_THREAD;
- private->transaction = OFF;
+ if (!PAGE_SIZE_IN_RANGE(table->page_size)) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "\"%s\" is out of range for Berkeley DB "
+ "page-size. allowed page-size range is %d to "
+ "%d. please specify a page-size value in the "
+ "range and retry.",
+ page_size_str, BDB_LL_PAGE_SIZE_MIN,
+ BDB_LL_PAGE_SIZE_MAX);
+ goto err;
+ }
} else {
- gf_log (this->name, GF_LOG_DEBUG,
- "persistant mode selected");
- private->transaction = ON;
- private->envflags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
- DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD;
- private->dbflags = DB_CREATE | DB_THREAD;
-
- bdb_lock_timeout_init (this, options, private);
-
- bdb_transaction_timeout_init (this, options, private);
-
- bdb_log_remove_init (this, options, private);
-
- bdb_checkpoint_interval_init (this, options, private);
+ table->page_size = BDB_LL_PAGE_SIZE_DEFAULT;
}
-}
-static inline void
-bdb_access_mode_init (xlator_t *this,
- dict_t *options,
- struct bdb_private *private)
-{
- int ret = -1;
- char *access_mode = NULL;
+ table->hash_size = BDB_DEFAULT_HASH_SIZE;
+ table->b_hash = CALLOC (BDB_DEFAULT_HASH_SIZE,
+ sizeof (struct list_head));
- ret = dict_get_str (options, "access-mode", &access_mode);
+ for (idx = 0; idx < table->hash_size; idx++)
+ INIT_LIST_HEAD(&(table->b_hash[idx]));
- if ((ret == 0)
- && (!strcmp (access_mode, "btree"))) {
- gf_log (this->name, GF_LOG_DEBUG,
- "using access mode BTREE");
- private->access_mode = DB_BTREE;
- } else {
+ private->b_table = table;
+
+ ret = dict_get_str (options, "errfile", &errfile);
+ if (ret == 0) {
+ private->errfile = strdup (errfile);
gf_log (this->name, GF_LOG_DEBUG,
- "using access mode HASH");
- private->access_mode = DB_HASH;
+ "using %s as error logging file for libdb (Berkeley DB "
+ "library) internal logging.", private->errfile);
}
-}
+ ret = dict_get_str (options, "directory", &directory);
-/* bdb_db_init - initialize bdb xlator
- *
- * reads the options from @options dictionary and sets appropriate values in
- * @this->private. also initializes DB_ENV.
- *
- * return: 0 on success or -1 on error
- * (with logging the error through gf_log()).
- */
-int
-bdb_db_init (xlator_t *this,
- dict_t *options)
-{
- /* create a db entry for root */
- int32_t op_ret = 0;
- bdb_private_t *private = NULL;
+ if (ret == 0) {
+ ret = dict_get_str (options, "logdir", &logdir);
- private = this->private;
+ if (ret < 0) {
+ gf_log ("bdb-ll", GF_LOG_DEBUG,
+ "using the database environment home "
+ "directory (%s) itself as transaction log "
+ "directory", directory);
+ private->logdir = strdup (directory);
- bdb_cache_init (this, options, private);
+ } else {
+ private->logdir = strdup (logdir);
- bdb_access_mode_init (this, options, private);
+ op_ret = stat (private->logdir, &stbuf);
+ if ((op_ret != 0)
+ || (!S_ISDIR (stbuf.st_mode))) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "specified logdir %s does not exist. "
+ "please provide a valid existing "
+ "directory as parameter to 'option "
+ "logdir'",
+ private->logdir);
+ goto err;
+ }
+ }
- bdb_transaction_init (this, options, private);
+ private->b_table->dbenv = bdb_dbenv_init (this, directory);
+ if (private->b_table->dbenv == NULL) {
+ gf_log ("bdb-ll", GF_LOG_ERROR,
+ "initialization of database environment "
+ "failed");
+ goto err;
+ } else {
+ if (private->transaction) {
+ /* all well, start the checkpointing thread */
+ LOCK_INIT (&private->active_lock);
- {
- LOCK_INIT (&private->ino_lock);
- private->next_ino = 2;
+ LOCK (&private->active_lock);
+ {
+ private->active = 1;
+ }
+ UNLOCK (&private->active_lock);
+ pthread_create (&private->checkpoint_thread,
+ NULL, bdb_checkpoint, this);
+ }
+ }
}
- bdb_file_mode_init (this, options, private);
-
- bdb_dir_mode_init (this, options, private);
-
- bdb_table_init (this, options, private);
-
- bdb_errfile_init (this, options, private);
+ return op_ret;
+err:
+ if (table) {
+ FREE (table->b_hash);
+ FREE (table);
+ }
+ if (private) {
+ if (private->errfile)
+ FREE (private->errfile);
- bdb_directory_init (this, options, private);
+ if (private->logdir)
+ FREE (private->logdir);
+ }
- return op_ret;
+ return -1;
}
diff --git a/xlators/storage/bdb/src/bdb.c b/xlators/storage/bdb/src/bdb.c
index a3c6c44ea..85f08ea9a 100644
--- a/xlators/storage/bdb/src/bdb.c
+++ b/xlators/storage/bdb/src/bdb.c
@@ -82,49 +82,57 @@ bdb_mknod (call_frame_t *frame,
if (!S_ISREG(mode)) {
gf_log (this->name, GF_LOG_DEBUG,
- "mknod for non-regular file");
+ "MKNOD %"PRId64"/%s (%s): EPERM"
+ "(mknod supported only for regular files. "
+ "file mode '%o' not supported)",
+ loc->parent->ino, loc->name, loc->path, mode);
op_ret = -1;
op_errno = EPERM;
goto out;
} /* if(!S_ISREG(mode)) */
bctx = bctx_parent (B_TABLE(this), loc->path);
-
if (bctx == NULL) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to get bctx for path: %s",
- loc->path);
- op_ret = -1;
- op_errno = ENOENT;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKNOD %"PRId64"/%s (%s): ENOMEM"
+ "(failed to lookup database handle)",
+ loc->parent->ino, loc->name, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
goto out;
- } /* if(bctx == NULL) */
+ }
MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);
op_ret = lstat (db_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ op_errno = EINVAL;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKNOD %"PRId64"/%s (%s): EINVAL"
+ "(failed to lookup database handle)",
+ loc->parent->ino, loc->name, loc->path);
goto out;
}
MAKE_KEY_FROM_PATH (key_string, loc->path);
- op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 0, 0);
+ op_ret = bdb_db_icreate (bctx, key_string);
if (op_ret > 0) {
/* create successful */
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino,
+ key_string,
+ strlen (key_string));
stbuf.st_mode = mode;
stbuf.st_size = 0;
stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, \
stbuf.st_blksize);
} else {
- gf_log (this->name, GF_LOG_ERROR,
- "bdb_db_get() failed for path: %s",
- loc->path);
- op_ret = -1;
- op_errno = ENOENT;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKNOD %"PRId64"/%s (%s): ENOMEM"
+ "(failed to create database entry)",
+ loc->parent->ino, loc->name, loc->path);
+ op_ret = -1;
+ op_errno = EINVAL; /* TODO: errno sari illa */
+ goto out;
}/* if (!op_ret)...else */
out:
@@ -156,11 +164,7 @@ is_dir_empty (xlator_t *this,
bctx = bctx_lookup (B_TABLE(this), loc->path);
if (bctx == NULL) {
- gf_log (this->name, GF_LOG_DEBUG,
- "failed to get bctx from inode for dir: %s,"
- "assuming empty directory",
- loc->path);
- ret = 1;
+ ret = -ENOMEM;
goto out;
}
@@ -180,33 +184,24 @@ is_dir_empty (xlator_t *this,
break;
case DB_UNKNOWN:
gf_log (this->name, GF_LOG_CRITICAL,
- "unknown access-mode set for db");
+ "unknown access-mode set for database");
ret = 0;
}
} else {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to get db stat for db at path: %s",
- loc->path);
- ret = 1;
+ ret = -EBUSY;
goto out;
}
MAKE_REAL_PATH (real_path, this, loc->path);
dir = opendir (real_path);
if (dir == NULL) {
- gf_log (this->name, GF_LOG_DEBUG,
- "failed to opendir(%s)",
- loc->path);
- ret = 0;
+ ret = -errno;
goto out;
}
while ((entry = readdir (dir))) {
if ((!IS_BDB_PRIVATE_FILE(entry->d_name)) &&
(!IS_DOT_DOTDOT(entry->d_name))) {
- gf_log (this->name, GF_LOG_DEBUG,
- "directory (%s) not empty, has a non-db entry",
- loc->path);
ret = 0;
break;
}/* if(!IS_BDB_PRIVATE_FILE()) */
@@ -256,26 +251,19 @@ is_space_left (xlator_t *this,
ret = statvfs (private->export_path, &stbuf);
if (ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to do statvfs on %s",
- private->export_path);
- return 0;
+ ret = 0;
} else {
req_blocks = (size / stbuf.f_frsize) + 1;
usable_blocks = (stbuf.f_bfree - BDB_ENOSPC_THRESHOLD);
- gf_log (this->name, GF_LOG_DEBUG,
- "requested size: %"GF_PRI_SIZET"\n"
- "free blocks: %"PRIu64"\n"
- "block size: %lu\nfrag size: %lu",
- size, stbuf.f_bfree, stbuf.f_bsize, stbuf.f_frsize);
-
if (req_blocks < usable_blocks)
- return 1;
+ ret = 1;
else
- return 0;
+ ret = 0;
}
+
+ return ret;
}
int32_t
@@ -303,40 +291,68 @@ bdb_create (call_frame_t *frame,
private = this->private;
bctx = bctx_parent (B_TABLE(this), loc->path);
- op_errno = ENOENT;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CREATE %"PRId64"/%s (%s): ENOMEM"
+ "(failed to lookup database handle)",
+ loc->parent->ino, loc->name, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);
op_ret = lstat (db_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ op_errno = EINVAL;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CREATE %"PRId64"/%s (%s): EINVAL"
+ "(database file missing)",
+ loc->parent->ino, loc->name, loc->path);
goto out;
}
MAKE_KEY_FROM_PATH (key_string, loc->path);
- op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 0, 0);
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out);
+ op_ret = bdb_db_icreate (bctx, key_string);
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CREATE %"PRId64"/%s (%s): ENOMEM"
+ "(failed to create database entry)",
+ loc->parent->ino, loc->name, loc->path);
+ op_errno = EINVAL; /* TODO: errno sari illa */
+ goto out;
+ }
/* create successful */
bfd = CALLOC (1, sizeof (*bfd));
- op_ret = -1;
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CREATE %"PRId64"/%s (%s): ENOMEM"
+ "(failed to allocate memory for internal fd context)",
+ loc->parent->ino, loc->name, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
/* NOTE: bdb_get_bctx_from () returns bctx with a ref */
bfd->ctx = bctx;
bfd->key = strdup (key_string);
- op_ret = -1;
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bfd->key, out);
+ if (bfd->key == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CREATE %"PRId64" (%s): ENOMEM"
+ "(failed to allocate memory for internal fd->key)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
- BDB_SET_BFD (this, fd, bfd);
+ BDB_FCTX_SET (fd, this, bfd);
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino,
+ key_string,
+ strlen (key_string));
stbuf.st_mode = private->file_mode;
stbuf.st_size = 0;
stbuf.st_nlink = 1;
@@ -377,23 +393,43 @@ bdb_open (call_frame_t *frame,
GF_VALIDATE_OR_GOTO (this->name, fd, out);
bctx = bctx_parent (B_TABLE(this), loc->path);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OPEN %"PRId64" (%s): ENOMEM"
+ "(failed to lookup database handle)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
bfd = CALLOC (1, sizeof (*bfd));
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OPEN %"PRId64" (%s): ENOMEM"
+ "(failed to allocate memory for internal fd context)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
/* NOTE: bctx_parent () returns bctx with a ref */
bfd->ctx = bctx;
MAKE_KEY_FROM_PATH (key_string, loc->path);
bfd->key = strdup (key_string);
- op_ret = -1;
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bfd->key, out);
+ if (bfd->key == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OPEN %"PRId64" (%s): ENOMEM"
+ "(failed to allocate memory for internal fd->key)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
- BDB_SET_BFD (this, fd, bfd);
+ BDB_FCTX_SET (fd, this, bfd);
op_ret = 0;
out:
frame->root->rsp_refs = NULL;
@@ -416,7 +452,6 @@ bdb_readv (call_frame_t *frame,
struct bdb_fd *bfd = NULL;
dict_t *reply_dict = NULL;
char *buf = NULL;
- data_t *buf_data = NULL;
char *db_path = NULL;
int32_t read_size = 0;
@@ -424,29 +459,37 @@ bdb_readv (call_frame_t *frame,
GF_VALIDATE_OR_GOTO ("bdb", this, out);
GF_VALIDATE_OR_GOTO (this->name, fd, out);
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD"
+ "(internal fd not found through fd)",
+ fd->inode->ino, size, offset);
+ op_errno = EBADFD;
+ op_ret = -1;
+ goto out;
+ }
MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bfd->ctx->directory);
op_ret = lstat (db_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "READV %"PRId64" - %"PRId32",%"PRId64": EINVAL"
+ "(database file missing)",
+ fd->inode->ino, size, offset);
goto out;
}
/* we are ready to go */
- op_ret = bdb_db_get (bfd->ctx, NULL,
- bfd->key, &buf,
- size, offset);
+ op_ret = bdb_db_fread (bfd, &buf, size, offset);
read_size = op_ret;
if (op_ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to do db_storage_get()");
- op_ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD"
+ "(failed to find entry in database)",
+ fd->inode->ino, size, offset);
+ op_ret = -1;
op_errno = ENOENT;
goto out;
} else if (op_ret == 0) {
@@ -454,17 +497,21 @@ bdb_readv (call_frame_t *frame,
}
reply_dict = dict_new ();
- op_ret = -1;
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, reply_dict, out);
+ if (reply_dict == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD"
+ "(failed to allocate memory for reply dictionary)",
+ fd->inode->ino, size, offset);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
if (size < read_size) {
op_ret = size;
read_size = size;
}
- buf_data->len = op_ret;
-
op_ret = dict_set_dynptr (reply_dict, NULL, buf, op_ret);
if (op_ret < 0) {
op_ret = -1;
@@ -513,44 +560,51 @@ bdb_writev (call_frame_t *frame,
GF_VALIDATE_OR_GOTO (this->name, fd, out);
GF_VALIDATE_OR_GOTO (this->name, vector, out);
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "WRITEV %"PRId64" - %"PRId32",%"PRId64": EBADFD"
+ "(internal fd not found through fd)",
+ fd->inode->ino, count, offset);
+ op_ret = -1;
+ op_errno = EBADFD;
+ goto out;
+ }
MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bfd->ctx->directory);
op_ret = lstat (db_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
+ op_errno = errno;
gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ "WRITEV %"PRId64" - %"PRId32",%"PRId64": EINVAL"
+ "(database file missing)",
+ fd->inode->ino, count, offset);
goto out;
}
-
for (idx = 0; idx < count; idx++)
total_size += vector[idx].iov_len;
if (!is_space_left (this, total_size)) {
gf_log (this->name, GF_LOG_ERROR,
- "requested storage for %"GF_PRI_SIZET", ENOSPC",
- total_size);
+ "WRITEV %"PRId64" - %"PRId32" (%"PRId32"),%"PRId64": "
+ "ENOSPC "
+ "(not enough space after internal measurement)",
+ fd->inode->ino, count, total_size, offset);
op_ret = -1;
op_errno = ENOSPC;
goto out;
}
-
/* we are ready to go */
for (idx = 0; idx < count; idx++) {
- c_ret = bdb_db_put (bfd->ctx, NULL,
- bfd->key, vector[idx].iov_base,
- vector[idx].iov_len, c_off, 0);
- if (c_ret != 0) {
+ c_ret = bdb_db_fwrite (bfd, vector[idx].iov_base,
+ vector[idx].iov_len, c_off);
+ if (c_ret < 0) {
gf_log (this->name, GF_LOG_ERROR,
- "failed to do bdb_db_put at offset: "
- "%"PRIu64" for file: %s",
- c_off, bfd->key);
+ "WRITEV %"PRId64" - %"PRId32",%"PRId64": EINVAL"
+ "(database write at %"PRId64" failed)",
+ fd->inode->ino, count, offset, c_off);
break;
} else {
c_off += vector[idx].iov_len;
@@ -559,16 +613,15 @@ bdb_writev (call_frame_t *frame,
} /* for(idx=0;...)... */
if (c_ret) {
- /* write failed */
- gf_log (this->name, GF_LOG_ERROR,
- "failed to do bdb_db_put(): %s",
- db_strerror (op_ret));
- op_ret = -1;
- op_errno = EBADFD; /* TODO: search for a meaningful errno */
+ /* write failed after a point, not an error */
+ stbuf.st_size = bdb_db_fread (bfd, NULL, 0, 0);
+ stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size,
+ stbuf.st_blksize);
goto out;
}
+
/* NOTE: we want to increment stbuf->st_size, as stored in db */
- stbuf.st_size = op_ret;
+ stbuf.st_size = op_ret;
stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize);
op_errno = 0;
@@ -591,9 +644,16 @@ bdb_flush (call_frame_t *frame,
GF_VALIDATE_OR_GOTO ("bdb", this, out);
GF_VALIDATE_OR_GOTO (this->name, fd, out);
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "FLUSH %"PRId64": EBADFD"
+ "(internal fd not found through fd)",
+ fd->inode->ino);
+ op_ret = -1;
+ op_errno = EBADFD;
+ goto out;
+ }
/* do nothing */
op_ret = 0;
@@ -613,23 +673,27 @@ bdb_release (xlator_t *this,
int32_t op_errno = EBADFD;
struct bdb_fd *bfd = NULL;
- if ((bfd = bdb_extract_bfd (fd, this)) == NULL){
- gf_log (this->name, GF_LOG_ERROR,
- "failed to extract %s specific information from fd:%p",
- this->name, fd);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "RELEASE %"PRId64": EBADFD"
+ "(internal fd not found through fd)",
+ fd->inode->ino);
op_ret = -1;
op_errno = EBADFD;
- } else {
- bctx_unref (bfd->ctx);
- bfd->ctx = NULL;
+ goto out;
+ }
- if (bfd->key)
- free (bfd->key); /* we did strdup() in bdb_open() */
- free (bfd);
- op_ret = 0;
- op_errno = 0;
- } /* if((fd->ctx == NULL)...)...else */
+ bctx_unref (bfd->ctx);
+ bfd->ctx = NULL;
+ if (bfd->key)
+ FREE (bfd->key); /* we did strdup() in bdb_open() */
+ FREE (bfd);
+ op_ret = 0;
+ op_errno = 0;
+
+out:
return 0;
}/* bdb_release */
@@ -656,15 +720,16 @@ bdb_lk (call_frame_t *frame,
{
struct flock nullock = {0, };
- gf_bdb_lk_log++;
- if (!(gf_bdb_lk_log % GF_UNIVERSAL_ANSWER)) {
- gf_log (this->name, GF_LOG_ERROR,
- "\"features/posix-locks\" translator is not loaded, "
- "you need to use it");
+ if (BDB_TIMED_LOG (ENOTSUP, gf_bdb_lk_log)) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "LK %"PRId64": ENOTSUP "
+ "(load \"features/locks\" translator to enable "
+ "lock support)",
+ fd->inode->ino);
}
frame->root->rsp_refs = NULL;
- STACK_UNWIND (frame, -1, ENOSYS, &nullock);
+ STACK_UNWIND (frame, -1, ENOTSUP, &nullock);
return 0;
}/* bdb_lk */
@@ -678,8 +743,8 @@ bdb_lk (call_frame_t *frame,
* case 1 and 2 are handled by doing lstat() on the @loc. if the file is a
* directory or symlink, lstat() succeeds. lookup continues to check if the
* @loc belongs to case-3 only if lstat() fails.
- * to check for case 3, bdb_lookup does a bdb_db_get() for the given @loc.
- * (see description of bdb_db_get() for more details on how @loc is transformed
+ * to check for case 3, bdb_lookup does a bdb_db_iread() for the given @loc.
+ * (see description of bdb_db_iread() for more details on how @loc is transformed
* into db handle and key). if check for case 1, 2 and 3 fail, we proceed to
* conclude that file doesn't exist (case 4).
*
@@ -741,20 +806,26 @@ bdb_lookup (call_frame_t *frame,
if (!strcmp (directory, loc->path)) {
/* SPECIAL CASE: looking up root */
op_ret = lstat (real_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- real_path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "LOOKUP %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
/* bctx_lookup() returns NULL only when its time to wind up,
* we should shutdown functioning */
bctx = bctx_lookup (B_TABLE(this), (char *)loc->path);
- op_ret = -1;
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "LOOKUP %"PRId64" (%s): ENOMEM"
+ "(failed to lookup database handle)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
stbuf.st_ino = 1;
stbuf.st_mode = private->dir_mode;
@@ -767,80 +838,99 @@ bdb_lookup (call_frame_t *frame,
op_ret = lstat (real_path, &stbuf);
if ((op_ret == 0) && (S_ISDIR (stbuf.st_mode))){
bctx = bctx_lookup (B_TABLE(this), (char *)loc->path);
- op_ret = -1;
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "LOOKUP %"PRId64"/%s (%s): ENOMEM"
+ "(failed to lookup database handle)",
+ loc->parent->ino, loc->name, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
if (loc->ino) {
/* revalidating directory inode */
- gf_log (this->name, GF_LOG_DEBUG,
- "revalidating directory %s",
- (char *)loc->path);
stbuf.st_ino = loc->ino;
} else {
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino,
+ key_string,
+ strlen (key_string));
}
stbuf.st_mode = private->dir_mode;
+
op_ret = 0;
- op_errno = 0;
goto out;
+
} else if (op_ret == 0) {
/* a symlink */
- gf_log (this->name, GF_LOG_DEBUG,
- "lookup called for symlink: %s",
- loc->path);
bctx = bctx_parent (B_TABLE(this), loc->path);
- op_ret = -1;
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "LOOKUP %"PRId64"/%s (%s): ENOMEM"
+ "(failed to lookup database handle)",
+ loc->parent->ino, loc->name, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
if (loc->ino) {
stbuf.st_ino = loc->ino;
} else {
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino,
+ key_string,
+ strlen (key_string));
}
+
stbuf.st_mode = private->symlink_mode;
+
op_ret = 0;
- op_errno = 0;
goto out;
+
}
/* for regular files */
bctx = bctx_parent (B_TABLE(this), loc->path);
- op_ret = -1;
- op_errno = ENOENT;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "LOOKUP %"PRId64"/%s (%s): ENOMEM"
+ "(failed to lookup database handle for parent)",
+ loc->parent->ino, loc->name, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
if (GF_FILE_CONTENT_REQUESTED(xattr_req, &need_xattr)) {
- entry_size = bdb_db_get (bctx, NULL,
- loc->path, &file_content,
- 0, 0);
+ entry_size = bdb_db_iread (bctx, key_string, &file_content);
} else {
- entry_size = bdb_db_get (bctx, NULL, loc->path, NULL,
- 0, 0);
+ entry_size = bdb_db_iread (bctx, key_string, NULL);
}
op_ret = entry_size;
- op_errno = ENOENT;
if (op_ret == -1) {
gf_log (this->name, GF_LOG_DEBUG,
- "returning ENOENT for %s",
- loc->path);
+ "LOOKUP %"PRId64"/%s (%s): ENOENT"
+ "(database entry not found)",
+ loc->parent->ino, loc->name, loc->path);
+ op_errno = ENOENT;
goto out;
}
MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);
op_ret = lstat (db_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "LOOKUP %"PRId64"/%s (%s): %s",
+ loc->parent->ino, loc->name, loc->path,
+ strerror (op_errno));
goto out;
}
- if ((need_xattr >= entry_size)
- && (entry_size) && (file_content)) {
+ if (entry_size
+ && (need_xattr >= entry_size)
+ && (file_content)) {
xattr = dict_new ();
op_ret = dict_set_dynptr (xattr, "glusterfs.content",
file_content, entry_size);
@@ -861,7 +951,9 @@ bdb_lookup (call_frame_t *frame,
stbuf.st_blksize);
} else {
/* fresh lookup, create an inode number */
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino,
+ key_string,
+ strlen (key_string));
stbuf.st_size = entry_size;
stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size,
stbuf.st_blksize);
@@ -930,21 +1022,28 @@ bdb_stat (call_frame_t *frame,
}
bctx = bctx_parent (B_TABLE(this), loc->path);
- op_ret = -1;
- op_errno = ENOENT;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "STAT %"PRId64" (%s): ENOMEM"
+ "(no database handle for parent)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);
op_ret = lstat (db_path, &stbuf);
- op_errno = errno;
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ if (op_ret < 0) {
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "STAT %"PRId64" (%s): %s"
+ "(failed to stat on database file)",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
- stbuf.st_size = bdb_db_get (bctx, NULL, loc->path, NULL, 0, 0);
+ stbuf.st_size = bdb_db_iread (bctx, loc->path, NULL);
stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize);
stbuf.st_ino = loc->inode->ino;
@@ -999,34 +1098,70 @@ bdb_opendir (call_frame_t *frame,
MAKE_REAL_PATH (real_path, this, loc->path);
bctx = bctx_lookup (B_TABLE(this), (char *)loc->path);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OPENDIR %"PRId64" (%s): ENOMEM"
+ "(no database handle for directory)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
bfd = CALLOC (1, sizeof (*bfd));
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OPENDIR %"PRId64" (%s): ENOMEM"
+ "(failed to allocate memory for internal fd)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto err;
+ }
bfd->dir = opendir (real_path);
- op_errno = errno;
- GF_VALIDATE_OR_GOTO (this->name, bfd->dir, out);
+ if (bfd->dir == NULL) {
+ op_ret = -1;
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OPENDIR %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
+ goto err;
+ }
/* NOTE: bctx_lookup() return bctx with ref */
bfd->ctx = bctx;
bfd->path = strdup (real_path);
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bfd->path, out);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OPENDIR %"PRId64" (%s): ENOMEM"
+ "(failed to allocate memory for internal fd->path)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto err;
+ }
- BDB_SET_BFD (this, fd, bfd);
+ BDB_FCTX_SET (fd, this, bfd);
op_ret = 0;
out:
frame->root->rsp_refs = NULL;
STACK_UNWIND (frame, op_ret, op_errno, fd);
+ return 0;
+err:
+ if (bctx)
+ bctx_unref (bctx);
+ if (bfd) {
+ if (bfd->dir)
+ closedir (bfd->dir);
+
+ FREE (bfd);
+ }
return 0;
}/* bdb_opendir */
-
int32_t
bdb_getdents (call_frame_t *frame,
xlator_t *this,
@@ -1035,192 +1170,281 @@ bdb_getdents (call_frame_t *frame,
off_t off,
int32_t flag)
{
- int32_t op_ret = -1;
- int32_t op_errno = EINVAL;
+ struct bdb_dir *bfd = NULL;
+ int32_t op_ret = -1;
+ int32_t op_errno = EINVAL;
+ size_t filled = 0;
+ dir_entry_t entries = {0, };
+ dir_entry_t *this_entry = NULL;
+ char *entry_path = NULL;
+ struct dirent *dirent = NULL;
+ off_t in_case = 0;
+ int32_t this_size = 0;
+ DBC *cursorp = NULL;
int32_t ret = -1;
int32_t real_path_len = 0;
int32_t entry_path_len = 0;
int32_t count = 0;
- char *real_path = NULL;
- char *entry_path = NULL;
- char *db_path = NULL;
- dir_entry_t entries = {0, };
- dir_entry_t *tmp = NULL;
- DIR *dir = NULL;
- struct dirent *dirent = NULL;
- struct bdb_dir *bfd = NULL;
+ off_t offset = 0;
+ size_t tmp_name_len = 0;
struct stat db_stbuf = {0,};
struct stat buf = {0,};
- DBC *cursorp = NULL;
- size_t tmp_name_len = 0;
GF_VALIDATE_OR_GOTO ("bdb", frame, out);
GF_VALIDATE_OR_GOTO ("bdb", this, out);
GF_VALIDATE_OR_GOTO (this->name, fd, out);
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64" %o: EBADFD "
+ "(failed to find internal context in fd)",
+ fd->inode->ino, size, off, flag);
+ op_errno = EBADFD;
+ op_ret = -1;
+ goto out;
+ }
- MAKE_REAL_PATH (real_path, this, bfd->path);
- dir = bfd->dir;
+ op_ret = bdb_cursor_open (bfd->ctx, &cursorp);
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64": EBADFD "
+ "(failed to open cursor to database handle)",
+ fd->inode->ino, size, off);
+ op_errno = EBADFD;
+ goto out;
+ }
- while ((dirent = readdir (dir))) {
- if (!dirent)
+ if (off) {
+ DBT sec = {0,}, pri = {0,}, val = {0,};
+ sec.data = &(off);
+ sec.size = sizeof (off);
+ sec.flags = DB_DBT_USERMEM;
+ val.dlen = 0;
+ val.doff = 0;
+ val.flags = DB_DBT_PARTIAL;
+
+ op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_SET);
+ if (op_ret == DB_NOTFOUND) {
+ offset = off;
+ goto dir_read;
+ }
+ }
+
+ while (filled <= size) {
+ DBT sec = {0,}, pri = {0,}, val = {0,};
+
+ this_entry = NULL;
+
+ sec.flags = DB_DBT_MALLOC;
+ pri.flags = DB_DBT_MALLOC;
+ val.dlen = 0;
+ val.doff = 0;
+ val.flags = DB_DBT_PARTIAL;
+ op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_NEXT);
+
+ if (op_ret == DB_NOTFOUND) {
+ /* we reached end of the directory */
+ op_ret = 0;
+ op_errno = 0;
+ break;
+ } else if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64":"
+ "(failed to read the next entry from database)",
+ fd->inode->ino, size, off);
+ op_errno = ENOENT;
break;
+ } /* if (op_ret == DB_NOTFOUND)...else if...else */
- if (IS_BDB_PRIVATE_FILE(dirent->d_name)) {
+ if (pri.data == NULL) {
+ /* NOTE: currently ignore when we get key.data == NULL.
+ * FIXME: we should not get key.data = NULL */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64":"
+ "(null key read for entry from database)",
+ fd->inode->ino, size, off);
continue;
+ }/* if(key.data)...else */
+
+ this_entry = CALLOC (1, sizeof (*this_entry));
+ if (this_entry == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:"
+ "(failed to allocate memory for an entry)",
+ fd->inode->ino, size, off, strerror (errno));
+ op_errno = ENOMEM;
+ op_ret = -1;
+ goto out;
}
+ this_entry->name = CALLOC (pri.size + 1, sizeof (char));
+ if (this_entry->name == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:"
+ "(failed to allocate memory for an "
+ "entry->name)",
+ fd->inode->ino, size, off, strerror (errno));
+ op_errno = ENOMEM;
+ op_ret = -1;
+ goto out;
+ }
+
+ memcpy (this_entry->name, pri.data, pri.size);
+ this_entry->buf = db_stbuf;
+ this_entry->buf.st_size = bdb_db_iread (bfd->ctx,
+ this_entry->name, NULL);
+ this_entry->buf.st_blocks = BDB_COUNT_BLOCKS (
+ this_entry->buf.st_size,
+ this_entry->buf.st_blksize);
+
+ this_entry->buf.st_ino = bdb_inode_transform (fd->inode->ino,
+ pri.data,
+ pri.size);
+ count++;
+
+ this_entry->next = entries.next;
+ this_entry->link = "";
+ entries.next = this_entry;
+ /* if size is 0, count can never be = size,
+ * so entire dir is read */
+ if (sec.data)
+ FREE (sec.data);
+
+ if (pri.data)
+ FREE (pri.data);
+
+ if (count == size)
+ break;
+ }/* while */
+ bdb_cursor_close (bfd->ctx, cursorp);
+ op_ret = count;
+ op_errno = 0;
+ if (count >= size)
+ goto out;
+dir_read:
+ /* hungry kyaa? */
+ if (!offset) {
+ rewinddir (bfd->dir);
+ } else {
+ seekdir (bfd->dir, offset);
+ }
+
+ while (filled <= size) {
+ this_entry = NULL;
+ this_size = 0;
+
+ in_case = telldir (bfd->dir);
+ dirent = readdir (bfd->dir);
+ if (!dirent)
+ break;
+
+ if (IS_BDB_PRIVATE_FILE(dirent->d_name))
+ continue;
+
tmp_name_len = strlen (dirent->d_name);
if (entry_path_len < (real_path_len + 1 + (tmp_name_len) + 1)) {
entry_path_len = real_path_len + tmp_name_len + 1024;
entry_path = realloc (entry_path, entry_path_len);
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, entry_path, out);
+ if (entry_path == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32","
+ "%"PRId64" - %s: (failed to allocate "
+ "memory for an entry_path)",
+ fd->inode->ino, size, off,
+ strerror (errno));
+ op_errno = ENOMEM;
+ op_ret = -1;
+ goto out;
+ }
}
strncpy (&entry_path[real_path_len+1], dirent->d_name,
tmp_name_len);
op_ret = stat (entry_path, &buf);
- op_errno = errno;
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- entry_path, strerror (op_errno));
- goto out;
+ if (op_ret < 0) {
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:"
+ " (failed to stat on an entry '%s')",
+ fd->inode->ino, size, off,
+ strerror (errno), entry_path);
+ goto out; /* FIXME: shouldn't we continue here */
}
if ((flag == GF_GET_DIR_ONLY) &&
- (ret != -1 && !S_ISDIR(buf.st_mode))) {
+ ((ret != -1) && (!S_ISDIR(buf.st_mode)))) {
continue;
}
- tmp = CALLOC (1, sizeof (*tmp));
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, tmp, out);
+ this_entry = CALLOC (1, sizeof (*this_entry));
+ if (this_entry == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:"
+ "(failed to allocate memory for an entry)",
+ fd->inode->ino, size, off, strerror (errno));
+ op_errno = ENOMEM;
+ op_ret = -1;
+ goto out;
+ }
- tmp->name = strdup (dirent->d_name);
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, dirent->d_name, out);
+ this_entry->name = strdup (dirent->d_name);
+ if (this_entry->name == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:"
+ "(failed to allocate memory for an "
+ "entry->name)",
+ fd->inode->ino, size, off, strerror (errno));
+ op_errno = ENOMEM;
+ op_ret = -1;
+ goto out;
+ }
- memcpy (&tmp->buf, &buf, sizeof (buf));
+ this_entry->buf = buf;
- tmp->buf.st_ino = -1;
- if (S_ISLNK(tmp->buf.st_mode)) {
+ this_entry->buf.st_ino = -1;
+ if (S_ISLNK(this_entry->buf.st_mode)) {
char linkpath[ZR_PATH_MAX] = {0,};
ret = readlink (entry_path, linkpath, ZR_PATH_MAX);
if (ret != -1) {
linkpath[ret] = '\0';
- tmp->link = strdup (linkpath);
+ this_entry->link = strdup (linkpath);
}
} else {
- tmp->link = "";
+ this_entry->link = "";
}
count++;
- tmp->next = entries.next;
- entries.next = tmp;
- /* if size is 0, count can never be = size,
- so entire dir is read */
+ this_entry->next = entries.next;
+ entries.next = this_entry;
+ /* if size is 0, count can never be = size,
+ * so entire dir is read */
if (count == size)
break;
}
-
- if ((flag != GF_GET_DIR_ONLY) && (count < size)) {
- /* read from db */
- op_ret = bdb_cursor_open (bfd->ctx, &cursorp);
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out);
-
- MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this,
- bfd->ctx->directory);
- op_ret = lstat (db_path, &db_stbuf);
- op_errno = errno;
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
- goto out;
- }
-
- /* read all the entries in database, one after the other and
- * put into dictionary */
- while (1) {
- DBT key = {0,}, value = {0,};
-
- key.flags = DB_DBT_MALLOC;
- value.flags = DB_DBT_MALLOC;
- op_ret = bdb_cursor_get (cursorp, &key, &value,
- DB_NEXT);
-
- if (op_ret == DB_NOTFOUND) {
- gf_log (this->name, GF_LOG_DEBUG,
- "end of list of key/value pair in db"
- " for directory: %s",
- bfd->ctx->directory);
- op_ret = 0;
- op_errno = 0;
- break;
- } else if (op_ret != 0){
- gf_log (this->name, GF_LOG_ERROR,
- "failed to do cursor get for "
- "directory %s: %s",
- bfd->ctx->directory,
- db_strerror (op_ret));
- op_ret = -1;
- op_errno = ENOENT;
- break;
- }
- /* successfully read */
- tmp = CALLOC (1, sizeof (*tmp));
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, tmp, out);
-
- tmp->name = CALLOC (1, key.size + 1);
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, tmp->name, out);
-
- memcpy (tmp->name, key.data, key.size);
- tmp->buf = db_stbuf;
- tmp->buf.st_size = bdb_db_get (bfd->ctx, NULL,
- tmp->name, NULL,
- 0, 0);
- tmp->buf.st_blocks = BDB_COUNT_BLOCKS (tmp->buf.st_size, \
- tmp->buf.st_blksize);
- /* FIXME: wat will be the effect of this? */
- tmp->buf.st_ino = -1;
- count++;
-
- tmp->next = entries.next;
- tmp->link = "";
- entries.next = tmp;
- /* if size is 0, count can never be = size, so entire dir is read */
- if (count == size)
- break;
-
- free (key.data);
- } /* while(1){ } */
- bdb_cursor_close (bfd->ctx, cursorp);
- } else {
- /* do nothing */
- }
- FREE (entry_path);
- op_ret = 0;
+ op_ret = filled;
+ op_errno = 0;
out:
frame->root->rsp_refs = NULL;
- STACK_UNWIND (frame, op_ret, op_errno, &entries, count);
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETDENTS %"PRId64" - %"PRId32" (%"PRId32")/%"PRId32","
+ "%"PRId64":"
+ "(failed to read the next entry from database)",
+ fd->inode->ino, filled, count, size, off);
+
+ STACK_UNWIND (frame, count, op_errno, &entries);
while (entries.next) {
- tmp = entries.next;
+ this_entry = entries.next;
entries.next = entries.next->next;
- FREE (tmp->name);
- FREE (tmp);
+ FREE (this_entry->name);
+ FREE (this_entry);
}
+
return 0;
}/* bdb_getdents */
@@ -1233,34 +1457,43 @@ bdb_releasedir (xlator_t *this,
int32_t op_errno = 0;
struct bdb_dir *bfd = NULL;
- if ((bfd = bdb_extract_bfd (fd, this)) == NULL) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to extract fd data from fd=%p", fd);
- op_ret = -1;
- op_errno = EBADF;
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "RELEASEDIR %"PRId64": EBADFD",
+ fd->inode->ino);
+ op_errno = EBADFD;
+ op_ret = -1;
+ goto out;
+ }
+
+ if (bfd->path) {
+ free (bfd->path);
} else {
- if (bfd->path) {
- free (bfd->path);
- } else {
- gf_log (this->name, GF_LOG_ERROR, "bfd->path was NULL. fd=%p bfd=%p",
- fd, bfd);
- }
+ gf_log (this->name, GF_LOG_DEBUG,
+ "RELEASEDIR %"PRId64": (bfd->path is NULL)",
+ fd->inode->ino);
+ }
- if (bfd->dir) {
- closedir (bfd->dir);
- } else {
- gf_log (this->name, GF_LOG_ERROR,
- "bfd->dir is NULL.");
- }
- if (bfd->ctx) {
- bctx_unref (bfd->ctx);
- } else {
- gf_log (this->name, GF_LOG_ERROR,
- "bfd->ctx is NULL");
- }
- free (bfd);
+ if (bfd->dir) {
+ closedir (bfd->dir);
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "RELEASEDIR %"PRId64": (bfd->dir is NULL)",
+ fd->inode->ino);
}
+ if (bfd->ctx) {
+ bctx_unref (bfd->ctx);
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "RELEASEDIR %"PRId64": (bfd->ctx is NULL)",
+ fd->inode->ino);
+ }
+
+ free (bfd);
+
+out:
return 0;
}/* bdb_releasedir */
@@ -1290,12 +1523,11 @@ bdb_readlink (call_frame_t *frame,
if (op_ret > 0)
dest[op_ret] = 0;
- op_errno = errno;
-
if (op_ret == -1) {
+ op_errno = errno;
gf_log (this->name, GF_LOG_DEBUG,
- "readlink failed on %s: %s",
- loc->path, strerror (op_errno));
+ "READLINK %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
}
out:
frame->root->rsp_refs = NULL;
@@ -1317,57 +1549,69 @@ bdb_mkdir (call_frame_t *frame,
char *real_path = NULL;
struct stat stbuf = {0, };
bctx_t *bctx = NULL;
+ char *key_string = NULL;
GF_VALIDATE_OR_GOTO ("bdb", frame, out);
GF_VALIDATE_OR_GOTO ("bdb", this, out);
GF_VALIDATE_OR_GOTO (this->name, loc, out);
+ MAKE_KEY_FROM_PATH (key_string, loc->path);
MAKE_REAL_PATH (real_path, this, loc->path);
op_ret = mkdir (real_path, mode);
- op_errno = errno;
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to mkdir %s (%s)",
- real_path, strerror (op_errno));
+ if (op_ret < 0) {
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKDIR %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
op_ret = chown (real_path, frame->root->uid, frame->root->gid);
- op_errno = errno;
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to chmod on %s (%s)",
- real_path, strerror (op_errno));
+ if (op_ret < 0) {
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKDIR %"PRId64" (%s): %s "
+ "(failed to do chmod)",
+ loc->ino, loc->path, strerror (op_errno));
goto err;
}
op_ret = lstat (real_path, &stbuf);
- op_errno = errno;
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- real_path, strerror (op_errno));
+ if (op_ret < 0) {
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKDIR %"PRId64" (%s): %s "
+ "(failed to do lstat)",
+ loc->ino, loc->path, strerror (op_errno));
goto err;
}
bctx = bctx_lookup (B_TABLE(this), (char *)loc->path);
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, bctx, err);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKDIR %"PRId64" (%s): ENOMEM"
+ "(no database handle for parent)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto err;
+ }
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino, key_string,
+ strlen (key_string));
goto out;
err:
ret = rmdir (real_path);
- if (ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to rmdir the directory created (%s)",
- strerror (errno));
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "MKDIR %"PRId64" (%s): %s"
+ "(failed to do rmdir)",
+ loc->ino, loc->path, strerror (errno));
}
-
out:
if (bctx) {
/* NOTE: bctx_unref always returns success,
@@ -1391,27 +1635,36 @@ bdb_unlink (call_frame_t *frame,
int32_t op_errno = EINVAL;
bctx_t *bctx = NULL;
char *real_path = NULL;
+ char *key_string = NULL;
GF_VALIDATE_OR_GOTO ("bdb", frame, out);
GF_VALIDATE_OR_GOTO ("bdb", this, out);
GF_VALIDATE_OR_GOTO (this->name, loc, out);
bctx = bctx_parent (B_TABLE(this), loc->path);
- op_errno = ENOENT;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "UNLINK %"PRId64" (%s): ENOMEM"
+ "(no database handle for parent)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
- op_ret = bdb_db_del (bctx, NULL, loc->path);
+ MAKE_KEY_FROM_PATH (key_string, loc->path);
+ op_ret = bdb_db_iremove (bctx, key_string);
if (op_ret == DB_NOTFOUND) {
MAKE_REAL_PATH (real_path, this, loc->path);
op_ret = unlink (real_path);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to unlink on %s (%s)",
- real_path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "UNLINK %"PRId64" (%s): %s"
+ "(symlink unlink failed)",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
-
} else if (op_ret == 0) {
op_errno = 0;
}
@@ -1430,7 +1683,7 @@ out:
-int32_t
+static int32_t
bdb_do_rmdir (xlator_t *this,
loc_t *loc)
{
@@ -1448,38 +1701,46 @@ bdb_do_rmdir (xlator_t *this,
MAKE_REAL_PATH (real_path, this, loc->path);
bctx = bctx_lookup (B_TABLE(this), loc->path);
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ ret = -ENOMEM;
+ goto out;
+ }
LOCK(&bctx->lock);
{
- if (bctx->dbp == NULL) {
+ if ((bctx->primary == NULL)
+ || (bctx->secondary == NULL)) {
goto unlock;
}
- ret = bctx->dbp->close (bctx->dbp, 0);
- GF_VALIDATE_OR_GOTO (this->name, (ret == 0), unlock);
+ ret = bctx->primary->close (bctx->primary, 0);
+ if (ret < 0) {
+ ret = -EINVAL;
+ }
- bctx->dbp = NULL;
+ ret = bctx->secondary->close (bctx->secondary, 0);
+ if (ret < 0) {
+ ret = -EINVAL;
+ }
- ret = dbenv->dbremove (dbenv, NULL, bctx->db_path, NULL, 0);
+ ret = dbenv->dbremove (dbenv, NULL, bctx->db_path,
+ "primary", 0);
+ if (ret < 0) {
+ ret = -EBUSY;
+ }
+
+ ret = dbenv->dbremove (dbenv, NULL, bctx->db_path,
+ "secondary", 0);
if (ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to DB_ENV->dbremove() on path %s: %s",
- loc->path, db_strerror (ret));
+ ret = -EBUSY;
}
}
unlock:
UNLOCK(&bctx->lock);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to remove db %s: %s",
- bctx->db_path, db_strerror (ret));
- ret = -1;
goto out;
}
- gf_log (this->name, GF_LOG_DEBUG,
- "removed db %s", bctx->db_path);
ret = rmdir (real_path);
out:
@@ -1498,22 +1759,31 @@ bdb_rmdir (call_frame_t *frame,
loc_t *loc)
{
int32_t op_ret = -1;
- int32_t op_errno = ENOTEMPTY;
+ int32_t op_errno = 0;
- if (!is_dir_empty (this, loc)) {
+ op_ret = is_dir_empty (this, loc);
+ if (op_ret < 0) {
+ op_errno = -op_ret;
gf_log (this->name, GF_LOG_DEBUG,
- "rmdir: directory %s not empty",
- loc->path);
+ "RMDIR %"PRId64" (%s): %s"
+ "(internal rmdir routine returned error)",
+ loc->ino, loc->path, strerror (op_errno));
+ } else if (op_ret == 0) {
+ op_ret = -1;
op_errno = ENOTEMPTY;
- op_ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "RMDIR %"PRId64" (%s): ENOTEMPTY",
+ loc->ino, loc->path);
goto out;
}
op_ret = bdb_do_rmdir (this, loc);
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to bdb_do_rmdir on %s",
- loc->path);
+ if (op_ret < 0) {
+ op_errno = -op_ret;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "RMDIR %"PRId64" (%s): %s"
+ "(internal rmdir routine returned error)",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
@@ -1536,6 +1806,7 @@ bdb_symlink (call_frame_t *frame,
struct stat stbuf = {0,};
struct bdb_private *private = NULL;
bctx_t *bctx = NULL;
+ char *key_string = NULL;
GF_VALIDATE_OR_GOTO ("bdb", frame, out);
GF_VALIDATE_OR_GOTO ("bdb", this, out);
@@ -1545,23 +1816,35 @@ bdb_symlink (call_frame_t *frame,
private = this->private;
GF_VALIDATE_OR_GOTO (this->name, private, out);
+ MAKE_KEY_FROM_PATH (key_string, loc->path);
+
MAKE_REAL_PATH (real_path, this, loc->path);
op_ret = symlink (linkname, real_path);
op_errno = errno;
if (op_ret == 0) {
op_ret = lstat (real_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- real_path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SYMLINK %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
goto err;
}
bctx = bctx_parent (B_TABLE(this), loc->path);
- GF_VALIDATE_OR_GOTO (this->name, bctx, err);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SYMLINK %"PRId64" (%s): ENOMEM"
+ "(no database handle for parent)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto err;
+ }
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino,
+ key_string,
+ strlen (key_string));
stbuf.st_mode = private->symlink_mode;
goto out;
@@ -1570,9 +1853,10 @@ err:
op_ret = unlink (real_path);
op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to unlink the previously created symlink (%s)",
- strerror (op_errno));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SYMLINK %"PRId64" (%s): %s"
+ "(failed to unlink the created symlink)",
+ loc->ino, loc->path, strerror (op_errno));
}
op_ret = -1;
op_errno = ENOENT;
@@ -1608,9 +1892,14 @@ bdb_chmod (call_frame_t *frame,
op_ret = lstat (real_path, &stbuf);
op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- real_path, strerror (op_errno));
+ if (op_errno == ENOENT) {
+ op_errno = EPERM;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CHMOD %"PRId64" (%s): %s"
+ "(lstat failed)",
+ loc->ino, loc->path, strerror (op_errno));
+ }
goto out;
}
@@ -1644,11 +1933,16 @@ bdb_chown (call_frame_t *frame,
MAKE_REAL_PATH (real_path, this, loc->path);
op_ret = lstat (real_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- real_path, strerror (op_errno));
+ op_errno = errno;
+ if (op_errno == ENOENT) {
+ op_errno = EPERM;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CHOWN %"PRId64" (%s): %s"
+ "(lstat failed)",
+ loc->ino, loc->path, strerror (op_errno));
+ }
goto out;
}
@@ -1682,8 +1976,15 @@ bdb_truncate (call_frame_t *frame,
GF_VALIDATE_OR_GOTO (this->name, loc, out);
bctx = bctx_parent (B_TABLE(this), loc->path);
- op_errno = ENOENT;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "TRUNCATE %"PRId64" (%s): ENOMEM"
+ "(no database handle for parent)",
+ loc->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
MAKE_REAL_PATH (real_path, this, loc->path);
MAKE_KEY_FROM_PATH (key_string, loc->path);
@@ -1691,26 +1992,29 @@ bdb_truncate (call_frame_t *frame,
/* now truncate */
MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);
op_ret = lstat (db_path, &stbuf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "TRUNCATE %"PRId64" (%s): %s"
+ "(lstat on database file failed)",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
if (loc->inode->ino) {
stbuf.st_ino = loc->inode->ino;
}else {
- stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx);
+ stbuf.st_ino = bdb_inode_transform (loc->parent->ino,
+ key_string,
+ strlen (key_string));
}
- op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 1, 0);
- if (op_ret == -1) {
+ op_ret = bdb_db_itruncate (bctx, key_string);
+ if (op_ret < 0) {
gf_log (this->name, GF_LOG_DEBUG,
- "failed to do bdb_db_put: %s",
- db_strerror (op_ret));
- op_ret = -1;
+ "TRUNCATE %"PRId64" (%s): EINVAL"
+ "(truncating entry in database failed - %s)",
+ loc->ino, loc->path, db_strerror (op_ret));
op_errno = EINVAL; /* TODO: better errno */
}
@@ -1745,40 +2049,44 @@ bdb_utimens (call_frame_t *frame,
GF_VALIDATE_OR_GOTO (this->name, loc, out);
MAKE_REAL_PATH (real_path, this, loc->path);
- op_ret = lstat (real_path, &stbuf);
- op_errno = errno;
+ op_ret = sys_lstat (real_path, &stbuf);
if (op_ret != 0) {
- op_errno = EPERM;
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- real_path, strerror (op_errno));
+ op_errno = errno;
+ if (op_errno == ENOENT) {
+ op_errno = EPERM;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "UTIMENS %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
+ }
goto out;
}
/* directory or symlink */
- tv[0].tv_sec = ts[0].tv_sec;
+ tv[0].tv_sec = ts[0].tv_sec;
tv[0].tv_usec = ts[0].tv_nsec / 1000;
- tv[1].tv_sec = ts[1].tv_sec;
+ tv[1].tv_sec = ts[1].tv_sec;
tv[1].tv_usec = ts[1].tv_nsec / 1000;
op_ret = lutimes (real_path, tv);
- if (op_ret == -1 && errno == ENOSYS) {
- op_ret = utimes (real_path, tv);
+ if ((op_ret == -1) && (errno == ENOSYS)) {
+ op_ret = sys_utimes (real_path, tv);
}
- op_errno = errno;
+
if (op_ret == -1) {
- gf_log (this->name, GF_LOG_WARNING,
- "utimes on %s failed: %s",
- loc->path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "UTIMENS %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
- op_ret = lstat (real_path, &stbuf);
- op_errno = errno;
+ op_ret = sys_lstat (real_path, &stbuf);
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- real_path, strerror (op_errno));
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "UTIMENS %"PRId64" (%s): %s",
+ loc->ino, loc->path, strerror (op_errno));
goto out;
}
@@ -1858,52 +2166,54 @@ bdb_setxattr (call_frame_t *frame,
MAKE_REAL_PATH (real_path, this, loc->path);
if (!S_ISDIR (loc->inode->st_mode)) {
op_ret = -1;
- op_errno = EPERM;
+ op_errno = ENOATTR;
goto out;
}
while (trav) {
- if (ZR_FILE_CONTENT_REQUEST(trav->key) ) {
- bctx = bctx_lookup (B_TABLE(this), loc->path);
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (GF_FILE_CONTENT_REQUEST(trav->key) ) {
+ key = BDB_KEY_FROM_FREQUEST_KEY(trav->key);
- key = &(trav->key[15]);
+ bctx = bctx_lookup (B_TABLE(this), loc->path);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETXATTR %"PRId64" (%s) - %s: ENOMEM"
+ "(no database handle for directory)",
+ loc->ino, loc->path, key);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
if (flags & XATTR_REPLACE) {
- /* replace only if previously exists, otherwise
- * error out */
- op_ret = bdb_db_get (bctx, NULL, key,
- NULL, 0, 0);
+ op_ret = bdb_db_itruncate (bctx, key);
if (op_ret == -1) {
/* key doesn't exist in database */
gf_log (this->name, GF_LOG_DEBUG,
- "cannot XATTR_REPLACE, xattr %s"
- " doesn't exist on path %s",
- key, loc->path);
+ "SETXATTR %"PRId64" (%s) - %s:"
+ " (entry not present in "
+ "database)",
+ loc->ino, loc->path, key);
op_ret = -1;
- op_errno = ENOENT;
+ op_errno = ENOATTR;
break;
}
- op_ret = bdb_db_put (bctx, NULL,
- key, trav->value->data,
- trav->value->len,
- op_ret,
- BDB_TRUNCATE_RECORD);
+ op_ret = bdb_db_iwrite (bctx, key,
+ trav->value->data,
+ trav->value->len);
if (op_ret != 0) {
op_ret = -1;
- op_errno = EINVAL;
+ op_errno = ENOATTR;
break;
}
} else {
/* fresh create */
- op_ret = bdb_db_put (bctx, NULL, key,
- trav->value->data,
- trav->value->len,
- 0, 0);
+ op_ret = bdb_db_iwrite (bctx, key,
+ trav->value->data,
+ trav->value->len);
if (op_ret != 0) {
op_ret = -1;
- op_errno = EINVAL;
+ op_errno = EEXIST;
break;
} else {
op_ret = 0;
@@ -1918,25 +2228,26 @@ bdb_setxattr (call_frame_t *frame,
} else {
/* do plain setxattr */
op_ret = lsetxattr (real_path,
- trav->key,
- trav->value->data,
+ trav->key, trav->value->data,
trav->value->len,
flags);
op_errno = errno;
- if ((op_ret == -1) && (op_errno != ENOENT)) {
- if (op_errno == ENOTSUP) {
- gf_bdb_xattr_log++;
- if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER)) {
- gf_log (this->name, GF_LOG_WARNING,
- "Extended Attributes support not present."\
- "Please check");
- }
- } else {
- gf_log (this->name, GF_LOG_DEBUG,
- "setxattr failed on %s (%s)",
- loc->path, strerror (op_errno));
- }
+
+ if ((op_errno == ENOATTR) || (op_errno == EEXIST)) {
+ /* don't log, normal behaviour */
+ ;
+ } else if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, trav->key,
+ strerror (op_errno));
+ /* do not continue, break out */
break;
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, trav->key,
+ strerror (op_errno));
}
} /* if(ZR_FILE_CONTENT_REQUEST())...else */
trav = trav->next;
@@ -1988,109 +2299,131 @@ bdb_getxattr (call_frame_t *frame,
GF_VALIDATE_OR_GOTO (this->name, loc, out);
GF_VALIDATE_OR_GOTO (this->name, name, out);
- dict = get_new_dict ();
+ dict = dict_new ();
GF_VALIDATE_OR_GOTO (this->name, dict, out);
if (!S_ISDIR (loc->inode->st_mode)) {
gf_log (this->name, GF_LOG_DEBUG,
- "operation not permitted on a non-directory file: %s",
- loc->path);
- op_ret = -1;
- op_errno = ENODATA;
+ "GETXATTR %"PRId64" (%s) - %s: ENOATTR "
+ "(not a directory)",
+ loc->ino, loc->path, name);
+ op_ret = -1;
+ op_errno = ENOATTR;
goto out;
}
- if (name && ZR_FILE_CONTENT_REQUEST(name)) {
+ if (name && GF_FILE_CONTENT_REQUEST(name)) {
bctx = bctx_lookup (B_TABLE(this), loc->path);
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETXATTR %"PRId64" (%s) - %s: ENOMEM"
+ "(no database handle for directory)",
+ loc->ino, loc->path, name);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
- key_string = (char *)&(name[15]);
+ key_string = BDB_KEY_FROM_FREQUEST_KEY(name);
- op_ret = bdb_db_get (bctx, NULL, key_string, &buf, 0, 0);
+ op_ret = bdb_db_iread (bctx, key_string, &buf);
if (op_ret == -1) {
gf_log (this->name, GF_LOG_DEBUG,
- "failed to db get on directory: %s for key: %s",
- bctx->directory, name);
- op_ret = -1;
- op_errno = ENODATA;
+ "GETXATTR %"PRId64" (%s) - %s: ENOATTR"
+ "(attribute not present in database)",
+ loc->ino, loc->path, name);
+ op_errno = ENOATTR;
goto out;
}
op_ret = dict_set_dynptr (dict, (char *)name, buf, op_ret);
if (op_ret < 0) {
gf_log (this->name, GF_LOG_DEBUG,
- "failed to set to dictionary");
- op_ret = -1;
+ "GETXATTR %"PRId64" (%s) - %s: ENOATTR"
+ "(attribute present in database, "
+ "dict set failed)",
+ loc->ino, loc->path, name);
op_errno = ENODATA;
}
- } else {
- MAKE_REAL_PATH (real_path, this, loc->path);
- size = llistxattr (real_path, NULL, 0);
- op_errno = errno;
- if (size <= 0) {
- /* There are no extended attributes, send an empty
- * dictionary */
- if (size == -1 && op_errno != ENODATA) {
- if (op_errno == ENOTSUP) {
- gf_bdb_xattr_log++;
- if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER))
- gf_log (this->name, GF_LOG_WARNING,
- "Extended Attributes support not present."\
- "Please check");
- } else {
- gf_log (this->name, GF_LOG_WARNING,
- "llistxattr failed on %s (%s)",
- loc->path, strerror (op_errno));
- }
- }
- op_ret = -1;
- op_errno = ENODATA;
+
+ goto out;
+ }
+
+ MAKE_REAL_PATH (real_path, this, loc->path);
+ size = sys_llistxattr (real_path, NULL, 0);
+ op_errno = errno;
+ if (size < 0) {
+ if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, name, strerror (op_errno));
} else {
- list = alloca (size + 1);
- op_errno = ENOMEM;
- GF_VALIDATE_OR_GOTO (this->name, list, out);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, name, strerror (op_errno));
+ }
+ op_ret = -1;
+ op_errno = ENOATTR;
- size = llistxattr (real_path, list, size);
- op_ret = size;
- op_errno = errno;
- if (size == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "llistxattr failed on %s (%s)",
- loc->path, strerror (errno));
- goto out;
- }
- remaining_size = size;
- list_offset = 0;
- while (remaining_size > 0) {
- if(*(list+list_offset) == '\0')
- break;
- strcpy (key, list + list_offset);
- op_ret = lgetxattr (real_path, key, NULL, 0);
- if (op_ret == -1)
- break;
- value = CALLOC (op_ret + 1, sizeof(char));
- GF_VALIDATE_OR_GOTO (this->name, value, out);
+ goto out;
+ }
- op_ret = lgetxattr (real_path, key, value,
- op_ret);
- if (op_ret == -1)
- break;
- value [op_ret] = '\0';
- op_ret = dict_set_dynptr (dict, key,
- value, op_ret);
- if (op_ret < 0) {
- FREE (value);
- gf_log (this->name, GF_LOG_DEBUG,
- "skipping key %s", key);
- continue;
- }
- remaining_size -= strlen (key) + 1;
- list_offset += strlen (key) + 1;
- } /* while(remaining_size>0) */
- } /* if(size <= 0)...else */
- } /* if(name...)...else */
+ if (size == 0)
+ goto done;
+
+ list = alloca (size + 1);
+ if (list == NULL) {
+ op_ret = -1;
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, name, strerror (op_errno));
+ }
+
+ size = sys_llistxattr (real_path, list, size);
+ op_ret = size;
+ if (op_ret == -1) {
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, name, strerror (op_errno));
+ goto out;
+ }
+ remaining_size = size;
+ list_offset = 0;
+ while (remaining_size > 0) {
+ if(*(list+list_offset) == '\0')
+ break;
+
+ strcpy (key, list + list_offset);
+
+ op_ret = sys_lgetxattr (real_path, key, NULL, 0);
+ if (op_ret == -1)
+ break;
+
+ value = CALLOC (op_ret + 1, sizeof(char));
+ GF_VALIDATE_OR_GOTO (this->name, value, out);
+
+ op_ret = sys_lgetxattr (real_path, key, value,
+ op_ret);
+ if (op_ret == -1)
+ break;
+ value [op_ret] = '\0';
+ op_ret = dict_set_dynptr (dict, key,
+ value, op_ret);
+ if (op_ret < 0) {
+ FREE (value);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "GETXATTR %"PRId64" (%s) - %s: "
+ "(skipping key %s)",
+ loc->ino, loc->path, name, key);
+ continue;
+ }
+ remaining_size -= strlen (key) + 1;
+ list_offset += strlen (key) + 1;
+ } /* while(remaining_size>0) */
+done:
out:
if(bctx) {
/* NOTE: bctx_unref always returns success,
@@ -2098,9 +2431,6 @@ out:
bctx_unref (bctx);
}
- if (dict)
- dict_ref (dict);
-
STACK_UNWIND (frame, op_ret, op_errno, dict);
if (dict)
@@ -2127,45 +2457,52 @@ bdb_removexattr (call_frame_t *frame,
GF_VALIDATE_OR_GOTO (this->name, name, out);
if (!S_ISDIR(loc->inode->st_mode)) {
- gf_log (this->name, GF_LOG_WARNING,
- "operation not permitted on non-directory files");
+ gf_log (this->name, GF_LOG_DEBUG,
+ "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR "
+ "(not a directory)",
+ loc->ino, loc->path, name);
op_ret = -1;
- op_errno = EPERM;
+ op_errno = ENOATTR;
goto out;
}
- if (ZR_FILE_CONTENT_REQUEST(name)) {
+ if (GF_FILE_CONTENT_REQUEST(name)) {
bctx = bctx_lookup (B_TABLE(this), loc->path);
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
-
- op_ret = bdb_db_del (bctx, NULL, name);
- if (op_ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to delete %s from db of %s directory",
- name, loc->path);
- op_errno = EINVAL; /* TODO: errno */
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR"
+ "(no database handle for directory)",
+ loc->ino, loc->path, name);
+ op_ret = -1;
+ op_errno = ENOATTR;
goto out;
}
- } else {
- MAKE_REAL_PATH(real_path, this, loc->path);
- op_ret = lremovexattr (real_path, name);
- op_errno = errno;
+
+ op_ret = bdb_db_iremove (bctx, name);
if (op_ret == -1) {
- if (op_errno == ENOTSUP) {
- gf_bdb_xattr_log++;
- if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER))
- gf_log (this->name, GF_LOG_WARNING,
- "Extended Attributes support not present."
- "Please check");
- } else {
- gf_log (this->name, GF_LOG_WARNING,
- "%s: %s",
- loc->path, strerror (op_errno));
- }
- } /* if(op_ret == -1) */
- } /* if (ZR_FILE_CONTENT_REQUEST(name))...else */
+ gf_log (this->name, GF_LOG_DEBUG,
+ "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR"
+ "(no such attribute in database)",
+ loc->ino, loc->path, name);
+ op_errno = ENOATTR;
+ }
+ goto out;
+ }
+ MAKE_REAL_PATH(real_path, this, loc->path);
+ op_ret = lremovexattr (real_path, name);
+ op_errno = errno;
+ if (op_ret == -1) {
+ if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "REMOVEXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, name, strerror (op_errno));
+ } else {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "REMOVEXATTR %"PRId64" (%s) - %s: %s",
+ loc->ino, loc->path, name, strerror (op_errno));
+ }
+ } /* if(op_ret == -1) */
out:
if (bctx) {
/* NOTE: bctx_unref always returns success,
@@ -2195,9 +2532,15 @@ bdb_fsyncdir (call_frame_t *frame,
frame->root->rsp_refs = NULL;
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "FSYNCDIR %"PRId64": EBADFD"
+ "(failed to find internal context from fd)",
+ fd->inode->ino);
+ op_errno = EBADFD;
+ op_ret = -1;
+ }
out:
STACK_UNWIND (frame, op_ret, op_errno);
@@ -2321,9 +2664,15 @@ bdb_setdents (call_frame_t *frame,
frame->root->rsp_refs = NULL;
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETDENTS %"PRId64": EBADFD",
+ fd->inode->ino);
+ op_errno = EBADFD;
+ op_ret = -1;
+ goto out;
+ }
real_path_len = strlen (bfd->path);
entry_path_len = real_path_len + 256;
@@ -2346,60 +2695,68 @@ bdb_setdents (call_frame_t *frame,
*/
ret = mkdir (pathname, trav->buf.st_mode);
if ((ret == -1) && (errno != EEXIST)) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to created directory %s: %s",
- pathname, strerror(errno));
+ op_errno = errno;
+ op_ret = ret;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETDENTS %"PRId64" - %s: %s "
+ "(mkdir failed)",
+ fd->inode->ino, pathname,
+ strerror (op_errno));
goto loop;
}
- gf_log (this->name, GF_LOG_DEBUG,
- "Creating directory %s with mode (0%o)",
- pathname,
- trav->buf.st_mode);
/* Change the mode
* NOTE: setdents tries its best to restore the state
* of storage. if chmod and chown fail, they can
* be ignored now */
ret = chmod (pathname, trav->buf.st_mode);
- if (ret != 0) {
- op_ret = -1;
+ if (ret < 0) {
+ op_ret = -1;
op_errno = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "chmod failed on %s (%s)",
- pathname, strerror (errno));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETDENTS %"PRId64" - %s: %s "
+ "(chmod failed)",
+ fd->inode->ino, pathname,
+ strerror (op_errno));
goto loop;
}
/* change the ownership */
ret = chown (pathname, trav->buf.st_uid,
trav->buf.st_gid);
if (ret != 0) {
- op_ret = -1;
+ op_ret = -1;
op_errno = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "chown failed on %s (%s)",
- pathname, strerror (errno));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETDENTS %"PRId64" - %s: %s "
+ "(chown failed)",
+ fd->inode->ino, pathname,
+ strerror (op_errno));
goto loop;
}
} else if ((flags == GF_SET_IF_NOT_PRESENT) ||
(flags != GF_SET_DIR_ONLY)) {
/* Create a 0 byte file here */
if (S_ISREG (trav->buf.st_mode)) {
- op_ret = bdb_db_put (bfd->ctx, NULL,
- trav->name, NULL, 0, 0, 0);
- if (op_ret != 0) {
- /* create successful */
- gf_log (this->name, GF_LOG_ERROR,
- "failed to create file %s",
- pathname);
- } /* if (!op_ret)...else */
+ op_ret = bdb_db_icreate (bfd->ctx,
+ trav->name);
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETDENTS %"PRId64" (%s) - %s: "
+ "%s (database entry creation"
+ " failed)",
+ fd->inode->ino,
+ bfd->ctx->directory, trav->name,
+ strerror (op_errno));
+ }
} else if (S_ISLNK (trav->buf.st_mode)) {
/* TODO: impelement */;
} else {
- gf_log (this->name, GF_LOG_ERROR,
- "storage/bdb allows to create regular"
- " files only file %s (mode = %d) cannot"
- " be created",
- pathname, trav->buf.st_mode);
+ gf_log (this->name, GF_LOG_DEBUG,
+ "SETDENTS %"PRId64" (%s) - %s mode=%o: "
+ "(unsupported file type)",
+ fd->inode->ino,
+ bfd->ctx->directory, trav->name,
+ trav->buf.st_mode);
} /* if(S_ISREG())...else */
} /* if(S_ISDIR())...else if */
loop:
@@ -2431,9 +2788,16 @@ bdb_fstat (call_frame_t *frame,
GF_VALIDATE_OR_GOTO ("bdb", this, out);
GF_VALIDATE_OR_GOTO (this->name, fd, out);
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "FSTAT %"PRId64": EBADFD "
+ "(failed to find internal context in fd)",
+ fd->inode->ino);
+ op_errno = EBADFD;
+ op_ret = -1;
+ goto out;
+ }
bctx = bfd->ctx;
@@ -2441,14 +2805,15 @@ bdb_fstat (call_frame_t *frame,
op_ret = lstat (db_path, &stbuf);
op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to lstat on %s (%s)",
- db_path, strerror (op_errno));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "FSTAT %"PRId64": %s"
+ "(failed to stat database file %s)",
+ fd->inode->ino, strerror (op_errno), db_path);
goto out;
}
stbuf.st_ino = fd->inode->ino;
- stbuf.st_size = bdb_db_get (bctx, NULL, bfd->key, NULL, 0, 0);
+ stbuf.st_size = bdb_db_fread (bfd, NULL, 0, 0);
stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize);
out:
@@ -2458,6 +2823,20 @@ out:
return 0;
}
+gf_dirent_t *
+gf_dirent_for_namen (const char *name,
+ size_t len)
+{
+ char *tmp_name = NULL;
+
+ tmp_name = alloca (len + 1);
+
+ memcpy (tmp_name, name, len);
+
+ tmp_name[len] = 0;
+
+ return gf_dirent_for_name (tmp_name);
+}
int32_t
bdb_readdir (call_frame_t *frame,
@@ -2477,6 +2856,7 @@ bdb_readdir (call_frame_t *frame,
int32_t this_size = 0;
DBC *cursorp = NULL;
int32_t count = 0;
+ off_t offset = 0;
GF_VALIDATE_OR_GOTO ("bdb", frame, out);
GF_VALIDATE_OR_GOTO ("bdb", this, out);
@@ -2484,137 +2864,164 @@ bdb_readdir (call_frame_t *frame,
INIT_LIST_HEAD (&entries.list);
- bfd = bdb_extract_bfd (fd, this);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, bfd, out);
-
- op_errno = ENOMEM;
-
- while (filled <= size) {
- this_entry = NULL;
- entry = NULL;
- in_case = 0;
- this_size = 0;
-
- in_case = telldir (bfd->dir);
- entry = readdir (bfd->dir);
- if (!entry)
- break;
-
- if (IS_BDB_PRIVATE_FILE(entry->d_name))
- continue;
-
- this_size = dirent_size (entry);
-
- if (this_size + filled > size) {
- seekdir (bfd->dir, in_case);
- break;
- }
-
- count++;
-
- this_entry = gf_dirent_for_name (entry->d_name);
- this_entry->d_ino = entry->d_ino;
-
- this_entry->d_off = -1;
-
- this_entry->d_type = entry->d_type;
- this_entry->d_len = entry->d_reclen;
-
-
- list_add (&this_entry->list, &entries.list);
-
- filled += this_size;
- }
- op_ret = filled;
- op_errno = 0;
- if (filled >= size) {
+ BDB_FCTX_GET (fd, this, &bfd);
+ if (bfd == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "READDIR %"PRId64" - %"PRId32",%"PRId64": EBADFD "
+ "(failed to find internal context in fd)",
+ fd->inode->ino, size, off);
+ op_errno = EBADFD;
+ op_ret = -1;
goto out;
}
- /* hungry kyaa? */
op_ret = bdb_cursor_open (bfd->ctx, &cursorp);
- op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out);
-
- /* TODO: fix d_off, don't use bfd->offset. wrong method */
- if (strlen (bfd->offset)) {
- DBT key = {0,}, value = {0,};
- key.data = bfd->offset;
- key.size = strlen (bfd->offset);
- key.flags = DB_DBT_USERMEM;
- value.dlen = 0;
- value.doff = 0;
- value.flags = DB_DBT_PARTIAL;
-
- op_ret = bdb_cursor_get (cursorp, &key, &value, DB_SET);
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "READDIR %"PRId64" - %"PRId32",%"PRId64": EBADFD "
+ "(failed to open cursor to database handle)",
+ fd->inode->ino, size, off);
op_errno = EBADFD;
- GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out);
+ goto out;
+ }
- } else {
- /* first time or last time, do nothing */
+ if (off) {
+ DBT sec = {0,}, pri = {0,}, val = {0,};
+ sec.data = &(off);
+ sec.size = sizeof (off);
+ sec.flags = DB_DBT_USERMEM;
+ val.dlen = 0;
+ val.doff = 0;
+ val.flags = DB_DBT_PARTIAL;
+
+ op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_SET);
+ if (op_ret == DB_NOTFOUND) {
+ offset = off;
+ goto dir_read;
+ }
}
while (filled <= size) {
- DBT key = {0,}, value = {0,};
+ DBT sec = {0,}, pri = {0,}, val = {0,};
+
this_entry = NULL;
- key.flags = DB_DBT_MALLOC;
- value.dlen = 0;
- value.doff = 0;
- value.flags = DB_DBT_PARTIAL;
- op_ret = bdb_cursor_get (cursorp, &key, &value, DB_NEXT);
+ sec.flags = DB_DBT_MALLOC;
+ pri.flags = DB_DBT_MALLOC;
+ val.dlen = 0;
+ val.doff = 0;
+ val.flags = DB_DBT_PARTIAL;
+ op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_NEXT);
if (op_ret == DB_NOTFOUND) {
/* we reached end of the directory */
op_ret = 0;
op_errno = 0;
break;
- } else if (op_ret != 0) {
+ } else if (op_ret < 0) {
gf_log (this->name, GF_LOG_DEBUG,
- "database error during readdir");
- op_ret = -1;
+ "READDIR %"PRId64" - %"PRId32",%"PRId64":"
+ "(failed to read the next entry from database)",
+ fd->inode->ino, size, off);
op_errno = ENOENT;
break;
} /* if (op_ret == DB_NOTFOUND)...else if...else */
- if (key.data == NULL) {
+ if (pri.data == NULL) {
/* NOTE: currently ignore when we get key.data == NULL.
* TODO: we should not get key.data = NULL */
gf_log (this->name, GF_LOG_DEBUG,
- "null key read from db");
+ "READDIR %"PRId64" - %"PRId32",%"PRId64":"
+ "(null key read for entry from database)",
+ fd->inode->ino, size, off);
continue;
}/* if(key.data)...else */
count++;
- this_size = bdb_dirent_size (&key);
+ this_size = bdb_dirent_size (&pri);
if (this_size + filled > size)
break;
/* TODO - consider endianness here */
- this_entry = gf_dirent_for_name ((const char *)key.data);
- /* FIXME: bug, if someone is going to use ->d_ino */
- this_entry->d_ino = -1;
- this_entry->d_off = 0;
+ this_entry = gf_dirent_for_namen ((const char *)pri.data,
+ pri.size);
+
+ this_entry->d_ino = bdb_inode_transform (fd->inode->ino,
+ pri.data,
+ pri.size);
+ this_entry->d_off = *(uint32_t *)sec.data;
this_entry->d_type = 0;
- this_entry->d_len = key.size;
+ this_entry->d_len = pri.size + 1;
- if (key.data) {
- strncpy (bfd->offset, key.data, key.size);
- bfd->offset [key.size] = '\0';
- free (key.data);
+ if (sec.data) {
+ FREE (sec.data);
}
- list_add (&this_entry->list, &entries.list);
+ if (pri.data)
+ FREE (pri.data);
+
+ list_add_tail (&this_entry->list, &entries.list);
filled += this_size;
}/* while */
bdb_cursor_close (bfd->ctx, cursorp);
op_ret = filled;
op_errno = 0;
+ if (filled >= size) {
+ goto out;
+ }
+dir_read:
+ /* hungry kyaa? */
+ if (!offset) {
+ rewinddir (bfd->dir);
+ } else {
+ seekdir (bfd->dir, offset);
+ }
+
+ while (filled <= size) {
+ this_entry = NULL;
+ entry = NULL;
+ this_size = 0;
+
+ in_case = telldir (bfd->dir);
+ entry = readdir (bfd->dir);
+ if (!entry)
+ break;
+
+ if (IS_BDB_PRIVATE_FILE(entry->d_name))
+ continue;
+
+ this_size = dirent_size (entry);
+
+ if (this_size + filled > size) {
+ seekdir (bfd->dir, in_case);
+ break;
+ }
+
+ count++;
+
+ this_entry = gf_dirent_for_name (entry->d_name);
+ this_entry->d_ino = entry->d_ino;
+
+ this_entry->d_off = entry->d_off;
+
+ this_entry->d_type = entry->d_type;
+ this_entry->d_len = entry->d_reclen;
+
+
+ list_add_tail (&this_entry->list, &entries.list);
+
+ filled += this_size;
+ }
+ op_ret = filled;
+ op_errno = 0;
+
out:
frame->root->rsp_refs = NULL;
+
gf_log (this->name, GF_LOG_DEBUG,
- "read %"GF_PRI_SIZET" bytes for %d entries",
- filled, count);
+ "READDIR %"PRId64" - %"PRId32" (%"PRId32")/%"PRId32",%"PRId64":"
+ "(failed to read the next entry from database)",
+ fd->inode->ino, filled, count, size, off);
+
STACK_UNWIND (frame, count, op_errno, &entries);
gf_dirent_free (&entries);
@@ -2629,11 +3036,11 @@ bdb_stats (call_frame_t *frame,
int32_t flags)
{
- int32_t op_ret = 0;
+ int32_t op_ret = 0;
int32_t op_errno = 0;
struct xlator_stats xlstats = {0, }, *stats = NULL;
- struct statvfs buf;
+ struct statvfs buf = {0,};
struct timeval tv;
struct bdb_private *private = NULL;
int64_t avg_read = 0;
@@ -2647,10 +3054,10 @@ bdb_stats (call_frame_t *frame,
stats = &xlstats;
op_ret = statvfs (private->export_path, &buf);
- op_errno = errno;
if (op_ret != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to statvfs on %s (%s)",
+ op_errno = errno;
+ gf_log (this->name, GF_LOG_DEBUG,
+ "STATS %s: %s",
private->export_path, strerror (op_errno));
goto out;
}
@@ -2661,9 +3068,9 @@ bdb_stats (call_frame_t *frame,
stats->nr_clients = private->stats.nr_clients;
/* Number of Free block in the filesystem. */
- stats->free_disk = buf.f_bfree * buf.f_bsize;
+ stats->free_disk = buf.f_bfree * buf.f_bsize;
stats->total_disk_size = buf.f_blocks * buf.f_bsize; /* */
- stats->disk_usage = (buf.f_blocks - buf.f_bavail) * buf.f_bsize;
+ stats->disk_usage = (buf.f_blocks - buf.f_bavail) * buf.f_bsize;
/* Calculate read and write usage */
gettimeofday (&tv, NULL);
@@ -2672,7 +3079,7 @@ bdb_stats (call_frame_t *frame,
_time_ms = (tv.tv_sec - private->init_time.tv_sec) * 1000 +
((tv.tv_usec - private->init_time.tv_usec) / 1000);
- avg_read = (_time_ms) ? (private->read_value / _time_ms) : 0; /* KBps */
+ avg_read = (_time_ms) ? (private->read_value / _time_ms) : 0;/* KBps */
avg_write = (_time_ms) ? (private->write_value / _time_ms) : 0;
_time_ms = (tv.tv_sec - private->prev_fetch_time.tv_sec) * 1000 +
@@ -2706,9 +3113,10 @@ bdb_inodelk (call_frame_t *frame, xlator_t *this,
{
frame->root->rsp_refs = NULL;
- gf_log (this->name, GF_LOG_CRITICAL,
- "\"features/posix-locks\" translator is not loaded. "
- "You need to use it for proper functioning of GlusterFS");
+ gf_log (this->name, GF_LOG_ERROR,
+ "glusterfs internal locking request. please load "
+ "'features/locks' translator to enable glusterfs "
+ "support");
STACK_UNWIND (frame, -1, ENOSYS);
return 0;
@@ -2721,9 +3129,10 @@ bdb_finodelk (call_frame_t *frame, xlator_t *this,
{
frame->root->rsp_refs = NULL;
- gf_log (this->name, GF_LOG_CRITICAL,
- "\"features/posix-locks\" translator is not loaded. "
- "You need to use it for proper functioning of GlusterFS");
+ gf_log (this->name, GF_LOG_ERROR,
+ "glusterfs internal locking request. please load "
+ "'features/locks' translator to enable glusterfs "
+ "support");
STACK_UNWIND (frame, -1, ENOSYS);
return 0;
@@ -2737,9 +3146,10 @@ bdb_entrylk (call_frame_t *frame, xlator_t *this,
{
frame->root->rsp_refs = NULL;
- gf_log (this->name, GF_LOG_CRITICAL,
- "\"features/posix-locks\" translator is not loaded. "
- "You need to use it for proper functioning of GlusterFS");
+ gf_log (this->name, GF_LOG_ERROR,
+ "glusterfs internal locking request. please load "
+ "'features/locks' translator to enable glusterfs "
+ "support");
STACK_UNWIND (frame, -1, ENOSYS);
return 0;
@@ -2753,15 +3163,15 @@ bdb_fentrylk (call_frame_t *frame, xlator_t *this,
{
frame->root->rsp_refs = NULL;
- gf_log (this->name, GF_LOG_CRITICAL,
- "\"features/posix-locks\" translator is not loaded. "
- "You need to use it for proper functioning of GlusterFS");
+ gf_log (this->name, GF_LOG_ERROR,
+ "glusterfs internal locking request. please load "
+ "'features/locks' translator to enable glusterfs "
+ "support");
STACK_UNWIND (frame, -1, ENOSYS);
return 0;
}
-
int32_t
bdb_checksum (call_frame_t *frame,
xlator_t *this,
@@ -2775,10 +3185,11 @@ bdb_checksum (call_frame_t *frame,
uint8_t dir_checksum[ZR_FILENAME_MAX] = {0,};
int32_t op_ret = -1;
int32_t op_errno = EINVAL;
- int32_t i = 0, length = 0;
+ int32_t idx = 0, length = 0;
bctx_t *bctx = NULL;
DBC *cursorp = NULL;
char *data = NULL;
+ uint8_t no_break = 1;
GF_VALIDATE_OR_GOTO ("bdb", frame, out);
GF_VALIDATE_OR_GOTO ("bdb", this, out);
@@ -2798,55 +3209,66 @@ bdb_checksum (call_frame_t *frame,
continue;
length = strlen (dirent->d_name);
- for (i = 0; i < length; i++)
- dir_checksum[i] ^= dirent->d_name[i];
+ for (idx = 0; idx < length; idx++)
+ dir_checksum[idx] ^= dirent->d_name[idx];
} /* while((dirent...)) */
closedir (dir);
}
{
bctx = bctx_lookup (B_TABLE(this), (char *)loc->path);
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, bctx, out);
+ if (bctx == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CHECKSUM %"PRId64" (%s): ENOMEM"
+ "(failed to lookup database handle)",
+ loc->inode->ino, loc->path);
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto out;
+ }
op_ret = bdb_cursor_open (bctx, &cursorp);
- op_errno = EINVAL;
- GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out);
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CHECKSUM %"PRId64" (%s): EBADFD"
+ "(failed to open cursor to database handle)",
+ loc->inode->ino, loc->path);
+ op_ret = -1;
+ op_errno = EBADFD;
+ goto out;
+ }
- while (1) {
- DBT key = {0,}, value = {0,};
+
+ do {
+ DBT key = {0,}, value = {0,}, sec = {0,};
key.flags = DB_DBT_MALLOC;
value.doff = 0;
value.dlen = 0;
- op_ret = bdb_cursor_get (cursorp, &key, &value,
- DB_NEXT);
+ op_ret = bdb_cursor_get (cursorp, &sec, &key,
+ &value, DB_NEXT);
if (op_ret == DB_NOTFOUND) {
- gf_log (this->name, GF_LOG_DEBUG,
- "end of list of key/value pair in db"
- " for directory: %s", bctx->directory);
op_ret = 0;
op_errno = 0;
- break;
+ no_break = 0;
} else if (op_ret == 0){
/* successfully read */
data = key.data;
length = key.size;
- for (i = 0; i < length; i++)
- file_checksum[i] ^= data[i];
+ for (idx = 0; idx < length; idx++)
+ file_checksum[idx] ^= data[idx];
- free (key.data);
+ FREE (key.data);
} else {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to do cursor get for directory"
- " %s: %s",
- bctx->directory, db_strerror (op_ret));
+ gf_log (this->name, GF_LOG_DEBUG,
+ "CHECKSUM %"PRId64" (%s)",
+ loc->inode->ino, loc->path);
op_ret = -1;
- op_errno = ENOENT;
- break;
+ op_errno = ENOENT; /* TODO: watch errno */
+ no_break = 0;
}/* if(op_ret == DB_NOTFOUND)...else if...else */
- } /* while(1) */
+ } while (no_break);
bdb_cursor_close (bctx, cursorp);
}
out:
@@ -2904,44 +3326,77 @@ init (xlator_t *this)
GF_VALIDATE_OR_GOTO ("bdb", this, out);
- _private = CALLOC (1, sizeof (*_private));
- GF_VALIDATE_OR_GOTO (this->name, _private, out);
-
if (this->children) {
gf_log (this->name, GF_LOG_ERROR,
- "FATAL: storage/bdb cannot have subvolumes");
- FREE (_private);
- goto out;;
+ "'storage/bdb' translator should be used as leaf node "
+ "in translator tree. please remove the subvolumes"
+ " specified and retry.");
+ goto err;
}
if (!this->parents) {
- gf_log (this->name, GF_LOG_WARNING,
- "dangling volume. check volfile ");
+ gf_log (this->name, GF_LOG_ERROR,
+ "'storage/bdb' translator needs at least one among "
+ "'protocol/server' or 'mount/fuse' translator as "
+ "parent. please add 'protocol/server' or 'mount/fuse' "
+ "as parent of 'storage/bdb' and retry. or you can also"
+ " try specifying mount-point on command-line.");
+ goto err;
}
+ _private = CALLOC (1, sizeof (*_private));
+ if (_private == NULL) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not allocate memory for 'storage/bdb' "
+ "configuration data-structure. cannot continue from "
+ "here");
+ goto err;
+ }
+
+
ret = dict_get_str (this->options, "directory", &directory);
if (ret < 0) {
gf_log (this->name, GF_LOG_ERROR,
- "export directory not specified in volfile");
- FREE (_private);
- goto out;
+ "'storage/bdb' needs at least "
+ "'option directory <path-to-export-directory>' as "
+ "minimal configuration option. please specify an "
+ "export directory using "
+ "'option directory <path-to-export-directory>' and "
+ "retry.");
+ goto err;
}
+
umask (000); /* umask `masking' is done at the client side */
/* Check whether the specified directory exists, if not create it. */
ret = stat (directory, &buf);
- if ((ret != 0) || !S_ISDIR (buf.st_mode)) {
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "specified export path '%s' does not exist. "
+ "please create the export path '%s' and retry.",
+ directory, directory);
+ goto err;
+ } else if (!S_ISDIR (buf.st_mode)) {
gf_log (this->name, GF_LOG_ERROR,
- "specified directory '%s' doesn't exists, Exiting",
+ "specified export path '%s' is not a directory. "
+ "please specify a valid and existing directory as "
+ "export directory and retry.",
directory);
- FREE (_private);
- goto out;
+ goto err;
} else {
ret = 0;
}
_private->export_path = strdup (directory);
+ if (_private->export_path == NULL) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "could not allocate memory for 'storage/bdb' "
+ "configuration data-structure. cannot continue from "
+ "here");
+ goto err;
+ }
+
_private->export_path_length = strlen (_private->export_path);
{
@@ -2953,27 +3408,40 @@ init (xlator_t *this)
}
this->private = (void *)_private;
+
{
ret = bdb_db_init (this, this->options);
- if (ret == -1){
- gf_log (this->name, GF_LOG_DEBUG,
- "failed to initialize database");
- goto out;
+ if (ret < 0){
+ gf_log (this->name, GF_LOG_ERROR,
+ "database environment initialisation failed. "
+ "manually run database recovery tool and "
+ "retry to run glusterfs");
+ goto err;
} else {
bctx = bctx_lookup (_private->b_table, "/");
/* NOTE: we are not doing bctx_unref() for root bctx,
* let it remain in active list forever */
- if (!bctx) {
+ if (bctx == NULL) {
gf_log (this->name, GF_LOG_ERROR,
- "failed to allocate memory for root (/)"
- " bctx: out of memory");
- goto out;
+ "could not allocate memory for "
+ "'storage/bdb' configuration data-"
+ "structure. cannot continue from "
+ "here");
+ goto err;
} else {
ret = 0;
+ goto out;
}
}
}
+err:
+ if (_private) {
+ if (_private->export_path)
+ FREE (_private->export_path);
+
+ FREE (_private);
+ }
out:
return ret;
}
@@ -2984,12 +3452,17 @@ bctx_cleanup (struct list_head *head)
bctx_t *trav = NULL;
bctx_t *tmp = NULL;
DB *storage = NULL;
+ DB *secondary = NULL;
list_for_each_entry_safe (trav, tmp, head, list) {
LOCK (&trav->lock);
{
- storage = trav->dbp;
- trav->dbp = NULL;
+ storage = trav->primary;
+ trav->primary = NULL;
+
+ secondary = trav->secondary;
+ trav->secondary = NULL;
+
list_del_init (&trav->list);
}
UNLOCK (&trav->lock);
@@ -2998,6 +3471,11 @@ bctx_cleanup (struct list_head *head)
storage->close (storage, 0);
storage = NULL;
}
+
+ if (secondary) {
+ secondary->close (secondary, 0);
+ secondary = NULL;
+ }
}
return;
}
@@ -3025,7 +3503,11 @@ fini (xlator_t *this)
ret = pthread_join (private->checkpoint_thread, NULL);
if (ret != 0) {
gf_log (this->name, GF_LOG_CRITICAL,
- "failed to join checkpoint thread");
+ "could not complete checkpointing "
+ "database environment. this might "
+ "result in inconsistencies in few"
+ " recent data and meta-data "
+ "operations");
}
BDB_ENV(this)->close (BDB_ENV(this), 0);
diff --git a/xlators/storage/bdb/src/bdb.h b/xlators/storage/bdb/src/bdb.h
index c9db02c10..e25978cc6 100644
--- a/xlators/storage/bdb/src/bdb.h
+++ b/xlators/storage/bdb/src/bdb.h
@@ -54,6 +54,8 @@
#include "inode.h"
#include "compat.h"
#include "compat-errno.h"
+#include "fd.h"
+#include "syscall.h"
#define BDB_STORAGE "/glusterfs_storage.db"
@@ -73,6 +75,8 @@
#define BDB_EXPORT_PATH_LEN(_private) \
(((struct bdb_private *)_private)->export_path_length)
+#define BDB_KEY_FROM_FREQUEST_KEY(_key) (&(key[15]))
+
#define BDB_EXPORT_PATH(_private) \
(((struct bdb_private *)_private)->export_path)
/* MAKE_REAL_PATH(var,this,path)
@@ -89,6 +93,12 @@
strcpy (&var[base_len], path); \
} while (0)
+
+#define BDB_TIMED_LOG(_errno,_counter) \
+ ((_errno == ENOTSUP) && (((++_counter) % GF_UNIVERSAL_ANSWER) == 1))
+
+#define GF_FILE_CONTENT_REQUEST ZR_FILE_CONTENT_REQUEST
+
/* MAKE_REAL_PATH_TO_STORAGE_DB(var,this,path)
* make the real path to the storage-database file on file-system
*
@@ -119,21 +129,6 @@
key = basename (tmp); \
}while (0);
-/* BDB_DO_LSTAT(path,stbuf,dirent)
- * construct real-path to a dirent and do lstat on the real-path
- *
- * @path: path to the directory whose readdir is currently in progress
- * @stbuf: a 'struct stat *'
- * @dirent: a 'struct dirent *'
- */
-#define BDB_DO_LSTAT(path, stbuf, dirent) do { \
- char tmp_real_path[GF_PATH_MAX]; \
- strcpy(tmp_real_path, path); \
- strcat (tmp_real_path, "/"); \
- strcat(tmp_real_path, dirent->d_name); \
- ret = lstat (tmp_real_path, stbuf); \
- } while(0);
-
/* IS_BDB_PRIVATE_FILE(name)
* check if a given 'name' is bdb xlator's internal file name
*
@@ -152,8 +147,7 @@
#define IS_DOT_DOTDOT(name) \
((!strncmp(name,".", 1)) || (!strncmp(name,"..", 2)))
-/* BDB_SET_BCTX(this,inode,bctx)
- * put a stamp on inode. d00d, you are using bdb.. huhaha.
+/* BDB_ICTX_SET(this,inode,bctx)
* pointer to 'struct bdb_ctx' is stored in inode's ctx of all directories.
* this will happen either in lookup() or mkdir().
*
@@ -161,29 +155,35 @@
* @inode: inode where 'struct bdb_ctx *' has to be stored.
* @bctx: a 'struct bdb_ctx *'
*/
-#define BDB_SET_BCTX(this,inode,bctx) do{ \
- inode_ctx_put(inode, this, (uint64_t)(long)bctx); \
+#define BDB_ICTX_SET(_inode,_this,_bctx) do{ \
+ inode_ctx_put(_inode, _this, (uint64_t)(long)_bctx); \
+ }while (0);
+
+#define BDB_ICTX_GET(_inode,_this,_bctxp) do { \
+ uint64_t tmp_bctx = 0; \
+ inode_ctx_get (_inode, _this, &tmp_bctx); \
+ *_bctxp = tmp_bctx; \
}while (0);
-/* MAKE_BCTX_FROM_INODE(this,bctx,inode)
- * extract bdb xlator's 'struct bdb_ctx *' from an inode's ctx.
- * valid only if done for directory inodes, otherwise bctx = NULL.
+/* BDB_FCTX_SET(this,fd,bctx)
+ * pointer to 'struct bdb_ctx' is stored in inode's ctx of all directories.
+ * this will happen either in lookup() or mkdir().
*
* @this: pointer xlator_t of bdb xlator.
+ * @inode: inode where 'struct bdb_ctx *' has to be stored.
* @bctx: a 'struct bdb_ctx *'
- * @inode: inode from where 'struct bdb_ctx *' has to be extracted.
*/
-#define MAKE_BCTX_FROM_INODE(this,bctx,inode) do{ \
- uint64_t tmp_bctx = 0; \
- inode_ctx_get (inode, this, &tmp_bctx); \
- if (ret == 0) \
- bctx = (void *)(long)tmp_bctx; \
+#define BDB_FCTX_SET(_fd,_this,_bfd) do{ \
+ fd_ctx_set(_fd, _this, (uint64_t)(long)_bfd); \
}while (0);
-#define BDB_SET_BFD(this,fd,bfd) do{ \
- fd_ctx_set (fd, this, (uint64_t)(long)bfd); \
+#define BDB_FCTX_GET(_fd,_this,_bfdp) do { \
+ uint64_t tmp_bfd = 0; \
+ fd_ctx_get (_fd, _this, &tmp_bfd); \
+ *_bfdp = (void *)(long)tmp_bfd; \
}while (0);
+
/* maximum number of open dbs that bdb xlator will ever have */
#define BDB_MAX_OPEN_DBS 100
@@ -270,7 +270,8 @@ struct bdb_ctx {
char *directory; /* directory path */
/* pointer to open database, that resides inside this directory */
- DB *dbp;
+ DB *primary;
+ DB *secondary;
uint32_t cache; /* cache ON or OFF */
/* per directory cache, bdb xlator's internal cache */
@@ -298,8 +299,6 @@ struct bdb_dir {
/* open directory pointer, as returned by opendir() */
DIR *dir;
- /* FIXME: readdir offset, too crude. must go */
- char offset[NAME_MAX];
char *path; /* path to this directory */
};
@@ -386,12 +385,6 @@ struct bdb_private {
* (option checkpoint-interval <time-in-seconds>) */
uint32_t checkpoint_interval;
- /* inode number allocation counter */
- ino_t next_ino;
-
- /* lock to protect 'next_ino' */
- gf_lock_t ino_lock;
-
/* environment log directory (option logdir <directory>) */
char *logdir;
@@ -436,26 +429,28 @@ bdb_txn_commit (DB_TXN *txnid)
return txnid->commit (txnid, 0);
}
-inline void *
-bdb_extract_bfd (fd_t *fd, xlator_t *this);
-
-
void *
bdb_db_stat (bctx_t *bctx,
DB_TXN *txnid,
uint32_t flags);
-int32_t
+/*int32_t
bdb_db_get(struct bdb_ctx *bctx,
DB_TXN *txnid,
const char *key_string,
char **buf,
size_t size,
off_t offset);
+*/
+int32_t
+bdb_db_fread (struct bdb_fd *bfd, char **bufp, size_t size, off_t offset);
+
+int32_t
+bdb_db_iread (struct bdb_ctx *bctx, const char *key, char **bufp);
#define BDB_TRUNCATE_RECORD 0xcafebabe
-int32_t
+/*int32_t
bdb_db_put (struct bdb_ctx *bctx,
DB_TXN *txnid,
const char *key_string,
@@ -463,16 +458,27 @@ bdb_db_put (struct bdb_ctx *bctx,
size_t size,
off_t offset,
int32_t flags);
+*/
+int32_t
+bdb_db_icreate (struct bdb_ctx *bctx, const char *key);
int32_t
-bdb_db_del (struct bdb_ctx *bctx,
- DB_TXN *txnid,
- const char *path);
+bdb_db_fwrite (struct bdb_fd *bfd, char *buf, size_t size, off_t offset);
+
+int32_t
+bdb_db_iwrite (struct bdb_ctx *bctx, const char *key, char *buf, size_t size);
+
+int32_t
+bdb_db_itruncate (struct bdb_ctx *bctx, const char *key);
+
+int32_t
+bdb_db_iremove (struct bdb_ctx *bctx,
+ const char *key);
ino_t
bdb_inode_transform (ino_t parent,
- struct bdb_ctx *bctx);
-
+ const char *name,
+ size_t namelen);
int32_t
bdb_cursor_open (struct bdb_ctx *bctx,
@@ -480,7 +486,7 @@ bdb_cursor_open (struct bdb_ctx *bctx,
int32_t
bdb_cursor_get (DBC *cursorp,
- DBT *key,
+ DBT *sec, DBT *pri,
DBT *value,
int32_t flags);