diff options
author | Basavanagowda Kanur <gowda@gluster.com> | 2009-04-08 04:24:22 +0530 |
---|---|---|
committer | Anand V. Avati <avati@amp.gluster.com> | 2009-04-08 15:04:48 +0530 |
commit | 07fcdfebf25c30811a9313ac3d9a0fdbbceaad6c (patch) | |
tree | 2650b7b7a2f93cc5530a6033b3f1426f2f6efb85 /xlators | |
parent | abf35ff6c7a2cc94d9e1e738fb76f711bd2abc16 (diff) |
introduction of secondary index database in storage/bdb
Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
Diffstat (limited to 'xlators')
-rw-r--r-- | xlators/storage/bdb/src/bctx.c | 31 | ||||
-rw-r--r-- | xlators/storage/bdb/src/bdb-ll.c | 1295 | ||||
-rw-r--r-- | xlators/storage/bdb/src/bdb.c | 2108 | ||||
-rw-r--r-- | xlators/storage/bdb/src/bdb.h | 108 |
4 files changed, 2019 insertions, 1523 deletions
diff --git a/xlators/storage/bdb/src/bctx.c b/xlators/storage/bdb/src/bctx.c index fce78e95f..18f563fb3 100644 --- a/xlators/storage/bdb/src/bctx.c +++ b/xlators/storage/bdb/src/bctx.c @@ -75,16 +75,31 @@ bctx_table_prune (bctx_table_t *table) list_for_each_entry_safe (del, tmp, &purge, list) { list_del_init (&del->list); - if (del->dbp) { - ret = del->dbp->close (del->dbp, 0); + if (del->primary) { + ret = del->primary->close (del->primary, 0); if (ret != 0) { - gf_log (table->this->name, GF_LOG_ERROR, - "failed to close db on path (%s): %s", + gf_log (table->this->name, GF_LOG_DEBUG, + "_BCTX_TABLE_PRUNE %s: %s " + "(failed to close primary database)", del->directory, db_strerror (ret)); } else { - gf_log (table->this->name, GF_LOG_WARNING, - "close db for path %s; " - "table->lru_count = %d", + gf_log (table->this->name, GF_LOG_DEBUG, + "_BCTX_TABLE_PRUNE %s (lru=%d)" + "(closed primary database)", + del->directory, table->lru_size); + } + } + if (del->secondary) { + ret = del->secondary->close (del->secondary, 0); + if (ret != 0) { + gf_log (table->this->name, GF_LOG_DEBUG, + "_BCTX_TABLE_PRUNE %s: %s " + "(failed to close secondary database)", + del->directory, db_strerror (ret)); + } else { + gf_log (table->this->name, GF_LOG_DEBUG, + "_BCTX_TABLE_PRUNE %s (lru=%d)" + "(closed secondary database)", del->directory, table->lru_size); } } @@ -130,7 +145,7 @@ __hash_bctx (bctx_t *bctx) static inline bctx_t * __bctx_passivate (bctx_t *bctx) { - if (bctx->dbp) { + if (bctx->primary) { list_move_tail (&bctx->list, &(bctx->table->b_lru)); bctx->table->lru_size++; } else { diff --git a/xlators/storage/bdb/src/bdb-ll.c b/xlators/storage/bdb/src/bdb-ll.c index cd2d1ac49..59d431d82 100644 --- a/xlators/storage/bdb/src/bdb-ll.c +++ b/xlators/storage/bdb/src/bdb-ll.c @@ -20,6 +20,7 @@ #include <libgen.h> #include "bdb.h" #include <list.h> +#include "hashfn.h" /* * implement the procedures to interact with bdb */ @@ -31,22 +32,41 @@ ino_t bdb_inode_transform (ino_t parent, - bctx_t *bctx) + const char *name, + size_t namelen) { - struct bdb_private *private = NULL; ino_t ino = -1; + uint64_t hash = 0; - GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out); + hash = gf_dm_hashfn (name, namelen); - private = bctx->table->this->private; + ino = (((parent << 32) | 0x00000000ffffffff) + & (hash | 0xffffffff00000000)); - LOCK (&private->ino_lock); - ino = ++private->next_ino; - UNLOCK (&private->ino_lock); -out: return ino; } +static int +bdb_generate_secondary_hash (DB *secondary, + const DBT *pkey, + const DBT *data, + DBT *skey) +{ + char *primary = NULL; + uint32_t *hash = NULL; + + primary = pkey->data; + + hash = calloc (1, sizeof (uint32_t)); + + *hash = gf_dm_hashfn (primary, pkey->size); + + skey->data = hash; + skey->size = sizeof (hash); + skey->flags = DB_DBT_APPMALLOC; + + return 0; +} /*********************************************************** * @@ -63,13 +83,13 @@ out: * if (no-empty-slots), then prune open dbs and close as many as possible * if (empty-slot-available), tika muchkonDu db open maaDu * - * NOTE: illi baro munche lock hiDkobEku */ -static DB * +static int bdb_db_open (bctx_t *bctx) { - DB *storage_dbp = NULL; - int32_t op_ret = -1; + DB *primary = NULL; + DB *secondary = NULL; + int32_t ret = -1; bctx_table_t *table = NULL; GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out); @@ -78,51 +98,94 @@ bdb_db_open (bctx_t *bctx) GF_VALIDATE_OR_GOTO ("bdb-ll", table, out); /* we have to do the following, we can't deny someone of db_open ;) */ - op_ret = db_create (&storage_dbp, table->dbenv, 0); - if (op_ret != 0) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to do db_create for directory %s (%s)", - bctx->directory, db_strerror (op_ret)); - storage_dbp = NULL; + ret = db_create (&primary, table->dbenv, 0); + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_OPEN %s: %s (failed to create database object" + " for primary database)", + bctx->directory, db_strerror (ret)); + ret = -ENOMEM; goto out; } if (table->page_size) { - op_ret = storage_dbp->set_pagesize (storage_dbp, - table->page_size); - if (op_ret != 0) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to set the page_size (%"PRIu64") for " - "directory %s (%s)", - table->page_size, bctx->directory, - db_strerror (op_ret)); - } else { + ret = primary->set_pagesize (primary, + table->page_size); + if (ret < 0) { gf_log ("bdb-ll", GF_LOG_DEBUG, - "page-size (%"PRIu64") set on DB", + "_BDB_DB_OPEN %s: %s (failed to set page-size " + "to %"PRIu64")", + bctx->directory, db_strerror (ret), table->page_size); + } else { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_OPEN %s: page-size set to %"PRIu64, + bctx->directory, table->page_size); } } - op_ret = storage_dbp->open (storage_dbp, - NULL, - bctx->db_path, - NULL, - table->access_mode, - table->dbflags, - 0); - if (op_ret != 0 ) { - gf_log ("bdb-ll", - GF_LOG_ERROR, - "failed to open storage-db for directory %s (%s)", - bctx->db_path, db_strerror (op_ret)); - storage_dbp = NULL; + ret = primary->open (primary, NULL, bctx->db_path, "primary", + table->access_mode, table->dbflags, 0); + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "_BDB_DB_OPEN %s: %s " + "(failed to open primary database)", + bctx->directory, db_strerror (ret)); + ret = -1; + goto cleanup; + } + + ret = db_create (&secondary, table->dbenv, 0); + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_OPEN %s: %s (failed to create database object" + " for secondary database)", + bctx->directory, db_strerror (ret)); + ret = -ENOMEM; + goto cleanup; + } + + ret = secondary->open (secondary, NULL, bctx->db_path, "secondary", + table->access_mode, table->dbflags, 0); + if (ret != 0 ) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "_BDB_DB_OPEN %s: %s " + "(failed to open secondary database)", + bctx->directory, db_strerror (ret)); + ret = -1; + goto cleanup; + } + + ret = primary->associate (primary, NULL, secondary, + bdb_generate_secondary_hash, +#ifdef DB_IMMUTABLE_KEY + DB_IMMUTABLE_KEY); +#else + 0); +#endif + if (ret != 0 ) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "_BDB_DB_OPEN %s: %s " + "(failed to associate primary database with " + "secondary database)", + bctx->directory, db_strerror (ret)); + ret = -1; + goto cleanup; } out: - return storage_dbp; -} + bctx->primary = primary; + bctx->secondary = secondary; + return ret; +cleanup: + if (primary) + primary->close (primary, 0); + if (secondary) + secondary->close (secondary, 0); + return ret; +} int32_t bdb_cursor_close (bctx_t *bctx, @@ -140,10 +203,10 @@ bdb_cursor_close (bctx_t *bctx, #else ret = cursorp->c_close (cursorp); #endif - if ((ret != 0)) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to close db cursor for directory " - "%s (%s)", + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CURSOR_CLOSE %s: %s " + "(failed to close database cursor)", bctx->directory, db_strerror (ret)); } } @@ -165,27 +228,30 @@ bdb_cursor_open (bctx_t *bctx, LOCK (&bctx->lock); { - if (bctx->dbp) { + if (bctx->secondary) { /* do nothing, just continue */ ret = 0; } else { - bctx->dbp = bdb_db_open (bctx); - if (!bctx->dbp) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to open storage db for %s", + ret = bdb_db_open (bctx); + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CURSOR_OPEN %s: ENOMEM " + "(failed to open secondary database)", bctx->directory); - ret = -1; + ret = -ENOMEM; } else { ret = 0; } } if (ret == 0) { - /* all set, lets open cursor */ - ret = bctx->dbp->cursor (bctx->dbp, NULL, cursorpp, 0); - if (ret != 0) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to create a cursor for %s (%s)", + /* all set, open cursor */ + ret = bctx->secondary->cursor (bctx->secondary, + NULL, cursorpp, 0); + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CURSOR_OPEN %s: %s " + "(failed to open a cursor to database)", bctx->directory, db_strerror (ret)); } } @@ -245,27 +311,37 @@ bdb_cache_insert (bctx_t *bctx, /* FIXME: ugly, not supposed to disect any of the * 'struct list_head' directly */ if (!list_empty (&bctx->c_list)) { - bcache = list_entry (bctx->c_list.prev, bdb_cache_t, c_list); + bcache = list_entry (bctx->c_list.prev, + bdb_cache_t, c_list); list_del_init (&bcache->c_list); } if (bcache->key) { free (bcache->key); - bcache->key = strdup ((char *)key->data); - GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->key, unlock); + bcache->key = calloc (key->size + 1, + sizeof (char)); + GF_VALIDATE_OR_GOTO ("bdb-ll", + bcache->key, unlock); + memcpy (bcache->key, (char *)key->data, + key->size); } else { /* should never come here */ - gf_log ("bdb-ll", GF_LOG_CRITICAL, - "bcache->key (null)"); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CACHE_INSERT %s (%s) " + "(found a cache entry with empty key)", + bctx->directory, (char *)key->data); } /* if(bcache->key)...else */ if (bcache->data) { free (bcache->data); bcache->data = memdup (data->data, data->size); - GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, unlock); + GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, + unlock); bcache->size = data->size; } else { /* should never come here */ gf_log ("bdb-ll", GF_LOG_CRITICAL, - "bcache->data (null)"); + "_BDB_CACHE_INSERT %s (%s) " + "(found a cache entry with no data)", + bctx->directory, (char *)key->data); } /* if(bcache->data)...else */ list_add (&bcache->c_list, &bctx->c_list); ret = 0; @@ -273,10 +349,14 @@ bdb_cache_insert (bctx_t *bctx, /* we will be entering here very rarely */ bcache = CALLOC (1, sizeof (*bcache)); GF_VALIDATE_OR_GOTO ("bdb-ll", bcache, unlock); - bcache->key = strdup ((char *)(key->data)); + + bcache->key = calloc (key->size + 1, sizeof (char)); GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->key, unlock); + memcpy (bcache->key, key->data, key->size); + bcache->data = memdup (data->data, data->size); GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, unlock); + bcache->size = data->size; list_add (&bcache->c_list, &bctx->c_list); bctx->c_count++; @@ -291,7 +371,7 @@ out: static int32_t bdb_cache_delete (bctx_t *bctx, - char *key) + const char *key) { bdb_cache_t *bcache = NULL; bdb_cache_t *trav = NULL; @@ -333,12 +413,12 @@ bdb_db_stat (bctx_t *bctx, LOCK (&bctx->lock); { - if (bctx->dbp == NULL) { - bctx->dbp = bdb_db_open (bctx); - storage = bctx->dbp; + if (bctx->primary == NULL) { + ret = bdb_db_open (bctx); + storage = bctx->primary; } else { /* we are just fine, lets continue */ - storage = bctx->dbp; + storage = bctx->primary; } /* if(bctx->dbp==NULL)...else */ } UNLOCK (&bctx->lock); @@ -347,46 +427,48 @@ bdb_db_stat (bctx_t *bctx, ret = storage->stat (storage, txnid, &stat, flags); - if (ret != 0) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to do DB->stat() on db file %s: %s", - bctx->db_path, db_strerror (ret)); - } else { + if (ret < 0) { gf_log ("bdb-ll", GF_LOG_DEBUG, - "successfully called DB->stat() on db file %s", - bctx->db_path); + "_BDB_DB_STAT %s: %s " + "(failed to do stat database)", + bctx->directory, db_strerror (ret)); } out: return stat; } -/* bdb_storage_get - retrieve a key/value pair corresponding to @path from the corresponding - * db file. +/* bdb_storage_get - retrieve a key/value pair corresponding to @path from the + * corresponding db file. * - * @bctx: bctx_t * corresponding to the parent directory of @path. (should always be a valid - * bctx). bdb_storage_get should never be called if @bctx = NULL. - * @txnid: NULL if bdb_storage_get is not embedded in an explicit transaction or a valid - * DB_TXN *, when embedded in an explicit transaction. - * @path: path of the file to read from (translated to a database key using MAKE_KEY_FROM_PATH) - * @buf: char ** - pointer to a pointer to char. a read buffer is created in this procedure - * and pointer to the buffer is passed through @buf to the caller. + * @bctx: bctx_t * corresponding to the parent directory of @path. (should + * always be a valid bctx). bdb_storage_get should never be called if + * @bctx = NULL. + * @txnid: NULL if bdb_storage_get is not embedded in an explicit transaction + * or a valid DB_TXN *, when embedded in an explicit transaction. + * @path: path of the file to read from (translated to a database key using + * MAKE_KEY_FROM_PATH) + * @buf: char ** - pointer to a pointer to char. a read buffer is created in + * this procedure and pointer to the buffer is passed through @buf to the + * caller. * @size: size of the file content to be read. * @offset: offset from which the file content to be read. * - * NOTE: bdb_storage_get tries to open DB, if @bctx->dbp == NULL (@bctx->dbp == NULL, - * nobody has opened DB till now or DB was closed by bdb_table_prune()). + * NOTE: bdb_storage_get tries to open DB, if @bctx->dbp == NULL + * (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by + * bdb_table_prune()). * - * NOTE: if private->cache is set (bdb xlator's internal caching enabled), then bdb_storage_get - * first looks up the cache for key/value pair. if bdb_lookup_cache fails, then only - * DB->get() is called. also, inserts a newly read key/value pair to cache through - * bdb_insert_to_cache. + * NOTE: if private->cache is set (bdb xlator's internal caching enabled), then + * bdb_storage_get first looks up the cache for key/value pair. if + * bdb_lookup_cache fails, then only DB->get() is called. also, inserts a + * newly read key/value pair to cache through bdb_insert_to_cache. * * return: 'number of bytes read' on success or -1 on error. * - * also see: bdb_lookup_cache, bdb_insert_to_cache for details about bdb xlator's internal cache. + * also see: bdb_lookup_cache, bdb_insert_to_cache for details about bdb + * xlator's internal cache. */ -int32_t +static int32_t bdb_db_get (bctx_t *bctx, DB_TXN *txnid, const char *path, @@ -420,12 +502,12 @@ bdb_db_get (bctx_t *bctx, } else { LOCK (&bctx->lock); { - if (bctx->dbp == NULL) { - bctx->dbp = bdb_db_open (bctx); - storage = bctx->dbp; + if (bctx->primary == NULL) { + ret = bdb_db_open (bctx); + storage = bctx->primary; } else { /* we are just fine, lets continue */ - storage = bctx->dbp; + storage = bctx->primary; } /* if(bctx->dbp==NULL)...else */ } UNLOCK (&bctx->lock); @@ -457,22 +539,25 @@ bdb_db_get (bctx_t *bctx, if (ret == DB_NOTFOUND) { gf_log ("bdb-ll", GF_LOG_DEBUG, - "failed to do DB->get() for key: %s." - " key not found in storage DB", - key_string); + "_BDB_DB_GET %s - %s: ENOENT" + "(specified key not found in database)", + bctx->directory, key_string); ret = -1; need_break = 1; } else if (ret == DB_LOCK_DEADLOCK) { retries++; - gf_log ("bdb-ll", GF_LOG_ERROR, - "deadlock detected in DB->put. retrying" - " DB->put (%d)", retries); - }else if (ret == 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_GET %s - %s" + "(deadlock detected, retrying for %d " + "time)", + bctx->directory, key_string, retries); + } else if (ret == 0) { /* successfully read data, lets set everything * in place and return */ if (buf) { *buf = CALLOC (1, value.size); - ERR_ABORT (*buf); + GF_VALIDATE_OR_GOTO ("bdb-ll", + *buf, out); memcpy (*buf, value.data, value.size); } ret = value.size; @@ -481,10 +566,12 @@ bdb_db_get (bctx_t *bctx, free (value.data); need_break = 1; } else { - gf_log ("bdb-ll", - GF_LOG_ERROR, - "failed to do DB->get() for key %s: %s", - key_string, db_strerror (ret)); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_GET %s - %s: %s" + "(failed to retrieve specified key from" + " database)", + bctx->directory, key_string, + db_strerror (ret)); ret = -1; need_break = 1; } @@ -494,6 +581,19 @@ out: return ret; }/* bdb_db_get */ +/* TODO: handle errors here and log. propogate only the errno to caller */ +int32_t +bdb_db_fread (struct bdb_fd *bfd, char **buf, size_t size, off_t offset) +{ + return bdb_db_get (bfd->ctx, NULL, bfd->key, buf, size, offset); +} + +int32_t +bdb_db_iread (struct bdb_ctx *bctx, const char *key, char **buf) +{ + return bdb_db_get (bctx, NULL, key, buf, 0, 0); +} + /* bdb_storage_put - insert a key/value specified to the corresponding DB. * * @bctx: bctx_t * corresponding to the parent directory of @path. @@ -519,7 +619,7 @@ out: * also see: bdb_cache_delete for details on how a cached key/value pair is * removed. */ -int32_t +static int32_t bdb_db_put (bctx_t *bctx, DB_TXN *txnid, const char *key_string, @@ -537,12 +637,12 @@ bdb_db_put (bctx_t *bctx, LOCK (&bctx->lock); { - if (bctx->dbp == NULL) { - bctx->dbp = bdb_db_open (bctx); - storage = bctx->dbp; + if (bctx->primary == NULL) { + ret = bdb_db_open (bctx); + storage = bctx->primary; } else { /* we are just fine, lets continue */ - storage = bctx->dbp; + storage = bctx->primary; } } UNLOCK (&bctx->lock); @@ -582,15 +682,16 @@ bdb_db_put (bctx_t *bctx, ret = storage->put (storage, txnid, &key, &value, db_flags); if (ret == DB_LOCK_DEADLOCK) { retries++; - gf_log ("bdb-ll", GF_LOG_ERROR, - "deadlock detected in DB->put. " - "retrying DB->put (%d)", - retries); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_PUT %s - %s" + "(deadlock detected, retying for %d time)", + bctx->directory, key_string, retries); } else if (ret) { /* write failed */ - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to do DB->put() for key %s: %s", - key_string, db_strerror (ret)); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_PUT %s - %s: %s" + "(failed to put specified entry into database)", + bctx->directory, key_string, db_strerror (ret)); need_break = 1; } else { /* successfully wrote */ @@ -602,44 +703,68 @@ out: return ret; }/* bdb_db_put */ +int32_t +bdb_db_icreate (struct bdb_ctx *bctx, const char *key) +{ + return bdb_db_put (bctx, NULL, key, NULL, 0, 0, 0); +} + +/* TODO: handle errors here and log. propogate only the errno to caller */ +int32_t +bdb_db_fwrite (struct bdb_fd *bfd, char *buf, size_t size, off_t offset) +{ + return bdb_db_put (bfd->ctx, NULL, bfd->key, buf, size, offset, 0); +} + +/* TODO: handle errors here and log. propogate only the errno to caller */ +int32_t +bdb_db_iwrite (struct bdb_ctx *bctx, const char *key, char *buf, size_t size) +{ + return bdb_db_put (bctx, NULL, key, buf, size, 0, 0); +} + +int32_t +bdb_db_itruncate (struct bdb_ctx *bctx, const char *key) +{ + return bdb_db_put (bctx, NULL, key, NULL, 0, 1, 0); +} -/* bdb_storage_del - delete a key/value pair corresponding to @path from corresponding db file. +/* bdb_storage_del - delete a key/value pair corresponding to @path from + * corresponding db file. * * @bctx: bctx_t * corresponding to the parent directory of @path. * (should always be a valid bctx). bdb_storage_del should never be called * if @bctx = NULL. - * @txnid: NULL if bdb_storage_del is not embedded in an explicit transaction or a - * valid DB_TXN *, when embedded in an explicit transaction. + * @txnid: NULL if bdb_storage_del is not embedded in an explicit transaction + * or a valid DB_TXN *, when embedded in an explicit transaction. * @path: path to the file, whose key/value pair has to be deleted. * - * NOTE: bdb_storage_del tries to open DB, if @bctx->dbp == NULL (@bctx->dbp == NULL, - * nobody has opened DB till now or DB was closed by bdb_table_prune()). + * NOTE: bdb_storage_del tries to open DB, if @bctx->dbp == NULL + * (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by + * bdb_table_prune()). * * return: 0 on success or -1 on error. */ -int32_t +static int32_t bdb_db_del (bctx_t *bctx, DB_TXN *txnid, - const char *path) + const char *key_string) { DB *storage = NULL; DBT key = {0,}; - char *key_string = NULL; int32_t ret = -1; int32_t db_flags = 0; uint8_t need_break = 0; int32_t retries = 1; - MAKE_KEY_FROM_PATH (key_string, path); - LOCK (&bctx->lock); { - if (bctx->dbp == NULL) { - bctx->dbp = bdb_db_open (bctx); - storage = bctx->dbp; + if (bctx->primary == NULL) { + ret = bdb_db_open (bctx); + storage = bctx->primary; } else { /* we are just fine, lets continue */ - storage = bctx->dbp; + storage = bctx->primary; } } UNLOCK (&bctx->lock); @@ -649,7 +774,7 @@ bdb_db_del (bctx_t *bctx, ret = bdb_cache_delete (bctx, key_string); GF_VALIDATE_OR_GOTO ("bdb-ll", (ret == 0), out); - key.data = key_string; + key.data = (char *)key_string; key.size = strlen (key_string); key.flags = DB_DBT_USERMEM; @@ -658,26 +783,30 @@ bdb_db_del (bctx_t *bctx, if (ret == DB_NOTFOUND) { gf_log ("bdb-ll", GF_LOG_DEBUG, - "failed to delete %s from storage db, " - "doesn't exist in storage DB", - path); + "_BDB_DB_DEL %s - %s: ENOENT" + "(failed to delete entry, could not be " + "found in the database)", + bctx->directory, key_string); need_break = 1; } else if (ret == DB_LOCK_DEADLOCK) { retries++; - gf_log ("bdb-ll", GF_LOG_ERROR, - "deadlock detected in DB->put. " - "retrying DB->put (%d)", - retries); - }else if (ret == 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_DEL %s - %s" + "(deadlock detected, retying for %d time)", + bctx->directory, key_string, retries); + } else if (ret == 0) { /* successfully deleted the entry */ gf_log ("bdb-ll", GF_LOG_DEBUG, - "deleted %s from storage db", path); + "_BDB_DB_DEL %s - %s" + "(successfully deleted entry from database)", + bctx->directory, key_string); ret = 0; need_break = 1; } else { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to delete %s from storage db: %s", - path, db_strerror (ret)); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_DB_DEL %s - %s: %s" + "(failed to delete entry from database)", + bctx->directory, key_string, db_strerror (ret)); ret = -1; need_break = 1; } @@ -686,11 +815,18 @@ out: return ret; } +int32_t +bdb_db_iremove (bctx_t *bctx, + const char *key) +{ + return bdb_db_del (bctx, NULL, key); +} + /* NOTE: bdb version compatibility wrapper */ int32_t bdb_cursor_get (DBC *cursorp, - DBT *key, - DBT *value, + DBT *sec, DBT *pri, + DBT *val, int32_t flags) { int32_t ret = -1; @@ -698,21 +834,21 @@ bdb_cursor_get (DBC *cursorp, GF_VALIDATE_OR_GOTO ("bdb-ll", cursorp, out); #ifdef HAVE_BDB_CURSOR_GET - ret = cursorp->get (cursorp, key, value, flags); + ret = cursorp->pget (cursorp, sec, pri, val, flags); #else - ret = cursorp->c_get (cursorp, key, value, flags); + ret = cursorp->c_pget (cursorp, sec, pri, val, flags); #endif if ((ret != 0) && (ret != DB_NOTFOUND)) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to CURSOR->get() for key %s (%s)", - (char *)key->data, db_strerror (ret)); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CURSOR_GET: %s" + "(failed to retrieve entry from database cursor)", + db_strerror (ret)); } out: return ret; }/* bdb_cursor_get */ - int32_t bdb_dirent_size (DBT *key) { @@ -720,29 +856,6 @@ bdb_dirent_size (DBT *key) } -/* bdb_extract_bfd - translate a fd_t to a bfd (either a 'struct bdb_bfd' or 'struct bdb_dir') - * - * @fd->ctx is with bdb specific file handle during a successful bdb_open (also bdb_create) - * or bdb_opendir. - * - * return: 'struct bdb_bfd *' or 'struct bdb_dir *' on success, or NULL on failure. - */ -inline void * -bdb_extract_bfd (fd_t *fd, - xlator_t *this) -{ - uint64_t tmp_bfd = 0; - void *bfd = NULL; - - GF_VALIDATE_OR_GOTO ("bdb-ll", fd, out); - GF_VALIDATE_OR_GOTO ("bdb-ll", this, out); - - fd_ctx_get (fd, this, &tmp_bfd); - bfd = (void *)(long)bfd; - -out: - return bfd; -} /* bdb_dbenv_init - initialize DB_ENV * @@ -751,10 +864,10 @@ out: * NOTE: see private->envflags for flags used. * 2. DB_ENV->set_lg_dir - set log directory to be used for storing log files * (log files are the files in which transaction logs are written by db). - * 3. DB_ENV->set_flags (DB_LOG_AUTOREMOVE) - set DB_ENV to automatically clear - * the unwanted log files (flushed at each checkpoint). - * 4. DB_ENV->set_errfile - set errfile to be used by db to report detailed error logs. - * used only for debbuging purpose. + * 3. DB_ENV->set_flags (DB_LOG_AUTOREMOVE) - set DB_ENV to automatically + * clear the unwanted log files (flushed at each checkpoint). + * 4. DB_ENV->set_errfile - set errfile to be used by db to report detailed + * error logs. used only for debbuging purpose. * * return: returns a valid DB_ENV * on success or NULL on error. * @@ -769,55 +882,49 @@ bdb_dbenv_init (xlator_t *this, bdb_private_t *private = NULL; int32_t fatal_flags = 0; - VALIDATE_OR_GOTO (this, out); - VALIDATE_OR_GOTO (directory, out); + VALIDATE_OR_GOTO (this, err); + VALIDATE_OR_GOTO (directory, err); private = this->private; - VALIDATE_OR_GOTO (private, out); + VALIDATE_OR_GOTO (private, err); ret = db_env_create (&dbenv, 0); - VALIDATE_OR_GOTO ((ret == 0), out); + VALIDATE_OR_GOTO ((ret == 0), err); /* NOTE: set_errpfx returns 'void' */ dbenv->set_errpfx(dbenv, this->name); ret = dbenv->set_lk_detect (dbenv, DB_LOCK_DEFAULT); - VALIDATE_OR_GOTO ((ret == 0), out); + VALIDATE_OR_GOTO ((ret == 0), err); ret = dbenv->open(dbenv, directory, private->envflags, S_IRUSR | S_IWUSR); if ((ret != 0) && (ret != DB_RUNRECOVERY)) { gf_log (this->name, GF_LOG_CRITICAL, - "failed to open DB environment (%s)", - db_strerror (ret)); + "failed to join Berkeley DB environment at %s: %s." + "please run manual recovery and retry running " + "glusterfs", + directory, db_strerror (ret)); dbenv = NULL; - goto out; + goto err; } else if (ret == DB_RUNRECOVERY) { fatal_flags = ((private->envflags & (~DB_RECOVER)) | DB_RECOVER_FATAL); ret = dbenv->open(dbenv, directory, fatal_flags, S_IRUSR | S_IWUSR); if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to open DB environment (%s) with " - "DB_REOVER_FATAL", - db_strerror (ret)); + gf_log (this->name, GF_LOG_CRITICAL, + "failed to join Berkeley DB environment in " + "recovery mode at %s: %s. please run manual " + "recovery and retry running glusterfs", + directory, db_strerror (ret)); dbenv = NULL; - goto out; - } else { - gf_log (this->name, GF_LOG_WARNING, - "opened DB environment after DB_RECOVER_FATAL:" - " %s", db_strerror (ret)); + goto err; } - } else { - gf_log (this->name, GF_LOG_DEBUG, - "DB environment successfull opened: %s", - db_strerror (ret)); } - - + ret = 0; #if (DB_VERSION_MAJOR == 4 && \ DB_VERSION_MINOR == 7) if (private->log_auto_remove) { @@ -832,41 +939,42 @@ bdb_dbenv_init (xlator_t *this, ret = dbenv->set_flags (dbenv, DB_LOG_AUTOREMOVE, 0); } #endif - if (ret != 0) { - gf_log ("bctx", GF_LOG_ERROR, - "failed to set DB_LOG_AUTOREMOVE on dbenv: %s", + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "autoremoval of transactional log files could not be " + "configured (%s). you may have to do a manual " + "monitoring of transactional log files and remove " + "periodically.", db_strerror (ret)); - } else { - gf_log ("bctx", GF_LOG_DEBUG, - "DB_LOG_AUTOREMOVE set on dbenv"); + goto err; } if (private->transaction) { ret = dbenv->set_flags(dbenv, DB_AUTO_COMMIT, 1); if (ret != 0) { - gf_log ("bctx", GF_LOG_ERROR, - "failed to set DB_AUTO_COMMIT on dbenv: %s", + gf_log ("bdb-ll", GF_LOG_DEBUG, + "configuration of auto-commit failed for " + "database environment at %s. none of the " + "operations will be embedded in transaction " + "unless explicitly done so.", db_strerror (ret)); - } else { - gf_log ("bctx", GF_LOG_DEBUG, - "DB_AUTO_COMMIT set on dbenv"); + goto err; } if (private->txn_timeout) { - ret = dbenv->set_timeout (dbenv, - private->txn_timeout, + ret = dbenv->set_timeout (dbenv, private->txn_timeout, DB_SET_TXN_TIMEOUT); if (ret != 0) { - gf_log ("bctx", GF_LOG_ERROR, - "failed to set TXN_TIMEOUT to %d " - "milliseconds on dbenv: %s", + gf_log ("bdb-ll", GF_LOG_ERROR, + "could not configure Berkeley DB " + "transaction timeout to %d (%s). please" + " review 'option transaction-timeout %d" + "' option.", private->txn_timeout, - db_strerror (ret)); - } else { - gf_log ("bctx", GF_LOG_DEBUG, - "TXN_TIMEOUT set to %d milliseconds", + db_strerror (ret), private->txn_timeout); + goto err; } } @@ -874,32 +982,28 @@ bdb_dbenv_init (xlator_t *this, ret = dbenv->set_timeout(dbenv, private->txn_timeout, DB_SET_LOCK_TIMEOUT); - - if (ret != 0) { - gf_log ("bctx", GF_LOG_ERROR, - "failed to set LOCK_TIMEOUT to %d " - "milliseconds on dbenv: %s", + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "could not configure Berkeley DB " + "lock timeout to %d (%s). please" + " review 'option lock-timeout %d" + "' option.", private->lock_timeout, - db_strerror (ret)); - } else { - gf_log ("bctx", GF_LOG_DEBUG, - "LOCK_TIMEOUT set to %d milliseconds", + db_strerror (ret), private->lock_timeout); + goto err; } } ret = dbenv->set_lg_dir (dbenv, private->logdir); - - if (ret != 0) { - gf_log ("bctx", GF_LOG_ERROR, - "failed to set log directory for dbenv: %s", - db_strerror (ret)); - } else { - gf_log ("bctx", GF_LOG_DEBUG, - "set dbenv log dir to %s", - private->logdir); + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "failed to configure libdb transaction log " + "directory at %s. please review the " + "'option logdir %s' option.", + db_strerror (ret), private->logdir); + goto err; } - } if (private->errfile) { @@ -907,41 +1011,52 @@ bdb_dbenv_init (xlator_t *this, if (private->errfp) { dbenv->set_errfile (dbenv, private->errfp); } else { - gf_log ("bctx", GF_LOG_ERROR, - "failed to open errfile: %s", - strerror (errno)); + gf_log ("bdb-ll", GF_LOG_ERROR, + "failed to open error logging file for " + "libdb (Berkeley DB) internal logging (%s)." + "please review the 'option errfile %s' option.", + strerror (errno), private->errfile); + goto err; } } -out: return dbenv; +err: + if (dbenv) { + dbenv->close (dbenv, 0); + } + + return NULL; } #define BDB_ENV(this) ((((struct bdb_private *)this->private)->b_table)->dbenv) -/* bdb_checkpoint - during transactional usage, db does not directly write the data to db - * files, instead db writes a 'log' (similar to a journal entry) into a - * log file. db normally clears the log files during opening of an - * environment. since we expect a filesystem server to run for a pretty - * long duration and flushing 'log's during dbenv->open would prove very - * costly, if we accumulate the log entries for one complete run of - * glusterfs server. to flush the logs frequently, db provides a mechanism - * called 'checkpointing'. when we do a checkpoint, db flushes the logs to - * disk (writes changes to db files) and we can also clear the accumulated - * log files after checkpointing. NOTE: removing unwanted log files is not - * part of dbenv->txn_checkpoint() call. +/* bdb_checkpoint - during transactional usage, db does not directly write the + * data to db files, instead db writes a 'log' (similar to a journal entry) + * into a log file. db normally clears the log files during opening of an + * environment. since we expect a filesystem server to run for a pretty long + * duration and flushing 'log's during dbenv->open would prove very costly, if + * we accumulate the log entries for one complete run of glusterfs server. to + * flush the logs frequently, db provides a mechanism called 'checkpointing'. + * when we do a checkpoint, db flushes the logs to disk (writes changes to db + * files) and we can also clear the accumulated log files after checkpointing. + * NOTE: removing unwanted log files is not part of dbenv->txn_checkpoint() + * call. * * @data: xlator_t of the current instance of bdb xlator. * - * bdb_checkpoint is called in a different thread from the main glusterfs thread. bdb - * xlator creates the checkpoint thread after successfully opening the db environment. - * NOTE: bdb_checkpoint thread shares the DB_ENV handle with the filesystem thread. + * bdb_checkpoint is called in a different thread from the main glusterfs + * thread. bdb xlator creates the checkpoint thread after successfully opening + * the db environment. + * NOTE: bdb_checkpoint thread shares the DB_ENV handle with the filesystem + * thread. * * db environment checkpointing frequency is controlled by * 'option checkpoint-timeout <time-in-seconds>' in volfile. * - * NOTE: checkpointing thread is started only if 'option transaction on' specified in - * volfile. checkpointing is not valid for non-transactional environments. + * NOTE: checkpointing thread is started only if 'option transaction on' + * specified in volfile. checkpointing is not valid for non-transactional + * environments. * */ static void * @@ -965,23 +1080,29 @@ bdb_checkpoint (void *data) if (active) { ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0); if (ret) { - gf_log ("bctx", GF_LOG_ERROR, - "failed to checkpoint environment: %s", + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CHECKPOINT: %s" + "(failed to checkpoint environment)", db_strerror (ret)); } else { - gf_log ("bctx", GF_LOG_DEBUG, - "checkpointing successful"); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CHECKPOINT: successfully " + "checkpointed"); } } else { ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0); if (ret) { - gf_log ("bctx", GF_LOG_ERROR, - "failed to do final checkpoint " - "environment: %s", + gf_log ("bdb-ll", GF_LOG_ERROR, + "_BDB_CHECKPOINT: %s" + "(final checkpointing failed. might " + "need to run recovery tool manually on " + "next usage of this database " + "environment)", db_strerror (ret)); } else { - gf_log ("bctx", GF_LOG_DEBUG, - "final checkpointing successful"); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "_BDB_CHECKPOINT: final successfully " + "checkpointed"); } break; } @@ -990,449 +1111,321 @@ bdb_checkpoint (void *data) return NULL; } -static inline void -bdb_cache_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - /* cache is always on */ - private->cache = ON; -} - -static inline void -bdb_log_remove_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - private->log_auto_remove = 1; - gf_log (this->name, GF_LOG_DEBUG, - "DB_ENV will use DB_LOG_AUTO_REMOVE"); -} -static inline void -bdb_errfile_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *errfile = NULL; - - ret = dict_get_str (options, "errfile", &errfile); - if (ret == 0) { - private->errfile = strdup (errfile); - gf_log (this->name, GF_LOG_DEBUG, - "using errfile: %s", private->errfile); - } -} - -static inline void -bdb_table_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) +/* bdb_db_init - initialize bdb xlator + * + * reads the options from @options dictionary and sets appropriate values in + * @this->private. also initializes DB_ENV. + * + * return: 0 on success or -1 on error + * (with logging the error through gf_log()). + */ +int +bdb_db_init (xlator_t *this, + dict_t *options) { - bctx_table_t *table = NULL; - int32_t idx = 0; - - int ret = -1; - char *lru_limit_str = NULL; - char *page_size_str = NULL; - - table = CALLOC (1, sizeof (*table)); - if (table) { - INIT_LIST_HEAD(&(table->b_lru)); - INIT_LIST_HEAD(&(table->active)); - INIT_LIST_HEAD(&(table->purge)); - - LOCK_INIT (&table->lock); - LOCK_INIT (&table->checkpoint_lock); - - table->transaction = private->transaction; - table->access_mode = private->access_mode; - table->dbflags = private->dbflags; - table->this = this; - - { - ret = dict_get_str (options, "lru-limit", - &lru_limit_str); - - /* TODO: set max lockers and max txns to accomodate - * for more than lru_limit */ - if (ret == 0) { - ret = gf_string2uint32 (lru_limit_str, - &table->lru_limit); - gf_log ("bdb-ll", GF_LOG_DEBUG, - "setting bctx lru limit to %d", - table->lru_limit); - } else { - table->lru_limit = BDB_DEFAULT_LRU_LIMIT; - } - } - - { - ret = dict_get_str (options, "page-size", - &page_size_str); - - if (ret == 0) { - ret = gf_string2bytesize (page_size_str, - &table->page_size); - if (ret != 0) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "invalid number format \"%s\"" - " of \"option page-size\"", - page_size_str); - } + /* create a db entry for root */ + int32_t op_ret = 0; + bdb_private_t *private = NULL; + bctx_table_t *table = NULL; - if (!PAGE_SIZE_IN_RANGE(table->page_size)) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "pagesize %s is out of range." - "Allowed pagesize is between " - "%d and %d", - page_size_str, - BDB_LL_PAGE_SIZE_MIN, - BDB_LL_PAGE_SIZE_MAX); - } - } - else { - table->page_size = BDB_LL_PAGE_SIZE_DEFAULT; - } - gf_log ("bdb-ll", - GF_LOG_DEBUG, "using page-size %"PRIu64, - table->page_size); - } + char *checkpoint_interval_str = NULL; + char *page_size_str = NULL; + char *lru_limit_str = NULL; + char *timeout_str = NULL; + char *access_mode = NULL; + char *endptr = NULL; + char *errfile = NULL; + char *directory = NULL; + char *logdir = NULL; + char *mode = NULL; + char *mode_str = NULL; + int ret = -1; + int idx = 0; + struct stat stbuf = {0,}; - table->hash_size = BDB_DEFAULT_HASH_SIZE; - table->b_hash = CALLOC (BDB_DEFAULT_HASH_SIZE, - sizeof (struct list_head)); + private = this->private; - for (idx = 0; idx < table->hash_size; idx++) - INIT_LIST_HEAD(&(table->b_hash[idx])); + /* cache is always on */ + private->cache = ON; - private->b_table = table; + ret = dict_get_str (options, "access-mode", &access_mode); + if ((ret == 0) + && (!strcmp (access_mode, "btree"))) { + gf_log (this->name, GF_LOG_DEBUG, + "using BTREE access mode to access libdb " + "(Berkeley DB)"); + private->access_mode = DB_BTREE; } else { - gf_log ("bdb-ll", GF_LOG_CRITICAL, - "failed to allocate bctx table: out of memory"); + gf_log (this->name, GF_LOG_DEBUG, + "using HASH access mode to access libdb (Berkeley DB)"); + private->access_mode = DB_HASH; } -} - -static inline void -bdb_directory_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *directory = NULL; - char *logdir = NULL; - int32_t op_ret = -1; - struct stat stbuf = {0}; - ret = dict_get_str (options, "directory", &directory); + ret = dict_get_str (options, "mode", &mode); + if ((ret == 0) + && (!strcmp (mode, "cache"))) { + gf_log (this->name, GF_LOG_DEBUG, + "cache data mode selected for 'storage/bdb'. filesystem" + " operations are not transactionally protected and " + "system crash does not guarantee recoverability of " + "data"); + private->envflags = DB_CREATE | DB_INIT_LOG | + DB_INIT_MPOOL | DB_THREAD; + private->dbflags = DB_CREATE | DB_THREAD; + private->transaction = OFF; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "persistent data mode selected for 'storage/bdb'. each" + "filesystem operation is guaranteed to be Berkeley DB " + "transaction protected."); + private->transaction = ON; + private->envflags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | + DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD; + private->dbflags = DB_CREATE | DB_THREAD; - if (ret == 0) { - ret = dict_get_str (options, "logdir", &logdir); - if (ret != 0) { - gf_log ("bdb-ll", GF_LOG_DEBUG, - "using default logdir as database home"); - private->logdir = strdup (directory); + ret = dict_get_str (options, "lock-timeout", &timeout_str); - } else { - private->logdir = strdup (logdir); - gf_log ("bdb-ll", GF_LOG_DEBUG, - "using logdir: %s", - private->logdir); - umask (000); - if (mkdir (private->logdir, 0777) == 0) { - gf_log ("bdb-ll", GF_LOG_WARNING, - "logdir specified (%s) not exists, " - "created", - private->logdir); - } - - op_ret = stat (private->logdir, &stbuf); - if ((op_ret != 0) - || (!S_ISDIR (stbuf.st_mode))) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "specified logdir doesn't exist, " - "using default " - "(environment home directory: %s)", - directory); - private->logdir = strdup (directory); + if (ret == 0) { + ret = gf_string2time (timeout_str, + &private->lock_timeout); + + if (private->lock_timeout > 4260000) { + /* db allows us to DB_SET_LOCK_TIMEOUT to be + * set to a maximum of 71 mins + * (4260000 milliseconds) */ + gf_log (this->name, GF_LOG_DEBUG, + "Berkeley DB lock-timeout parameter " + "(%d) is out of range. please specify" + " a valid timeout value for " + "lock-timeout and retry.", + private->lock_timeout); + goto err; } } - - private->b_table->dbenv = bdb_dbenv_init (this, directory); - - if (!private->b_table->dbenv) { - gf_log ("bdb-ll", GF_LOG_ERROR, - "failed to initialize db environment"); - FREE (private); - op_ret = -1; - } else { - if (private->transaction) { - /* all well, start the checkpointing thread */ - LOCK_INIT (&private->active_lock); - - LOCK (&private->active_lock); - { - private->active = 1; - } - UNLOCK (&private->active_lock); - pthread_create (&private->checkpoint_thread, - NULL, bdb_checkpoint, this); + ret = dict_get_str (options, "transaction-timeout", + &timeout_str); + if (ret == 0) { + ret = gf_string2time (timeout_str, + &private->txn_timeout); + + if (private->txn_timeout > 4260000) { + /* db allows us to DB_SET_TXN_TIMEOUT to be set + * to a maximum of 71 mins + * (4260000 milliseconds) */ + gf_log (this->name, GF_LOG_DEBUG, + "Berkeley DB lock-timeout parameter " + "(%d) is out of range. please specify" + " a valid timeout value for " + "lock-timeout and retry.", + private->lock_timeout); + goto err; } } - } -} - -static inline void -bdb_dir_mode_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *mode_str = NULL; - char *endptr = NULL; - ret = dict_get_str (options, "dir-mode", &mode_str); - - if (ret == 0) { - private->dir_mode = strtol (mode_str, &endptr, 8); - if ((*endptr) || - (!IS_VALID_FILE_MODE(private->dir_mode))) { - gf_log (this->name, GF_LOG_DEBUG, - "invalid dir-mode %o. setting to default %o", - private->dir_mode, - DEFAULT_DIR_MODE); - private->dir_mode = DEFAULT_DIR_MODE; - } else { - gf_log (this->name, GF_LOG_DEBUG, - "setting dir-mode to %o", - private->dir_mode); + private->checkpoint_interval = BDB_DEFAULT_CHECKPOINT_INTERVAL; + ret = dict_get_str (options, "checkpoint-interval", + &checkpoint_interval_str); + if (ret == 0) { + ret = gf_string2time (checkpoint_interval_str, + &private->checkpoint_interval); + + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "'%"PRIu32"' is not a valid parameter " + "for checkpoint-interval option. " + "please specify a valid " + "checkpoint-interval and retry", + private->checkpoint_interval); + goto err; + } } - } else { - private->dir_mode = DEFAULT_DIR_MODE; } - private->dir_mode = private->dir_mode | S_IFDIR; -} - -static inline void -bdb_file_mode_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *mode_str = NULL; - char *endptr = NULL; - ret = dict_get_str (options, "file-mode", &mode_str); - if (ret == 0) { private->file_mode = strtol (mode_str, &endptr, 8); if ((*endptr) || (!IS_VALID_FILE_MODE(private->file_mode))) { gf_log (this->name, GF_LOG_DEBUG, - "invalid file-mode %o. setting to default %o", - private->file_mode, DEFAULT_FILE_MODE); - private->file_mode = DEFAULT_FILE_MODE; - } else { - gf_log (this->name, GF_LOG_DEBUG, - "setting file-mode to %o", + "'%o' is not a valid parameter for file-mode " + "option. please specify a valid parameter for " + "file-mode and retry.", private->file_mode); - private->file_mode = private->file_mode; + goto err; } } else { private->file_mode = DEFAULT_FILE_MODE; } - private->symlink_mode = private->file_mode | S_IFLNK; private->file_mode = private->file_mode | S_IFREG; -} - -static inline void -bdb_checkpoint_interval_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *checkpoint_interval_str = NULL; - - private->checkpoint_interval = BDB_DEFAULT_CHECKPOINT_INTERVAL; - - ret = dict_get_str (options, "checkpoint-interval", - &checkpoint_interval_str); + ret = dict_get_str (options, "dir-mode", &mode_str); if (ret == 0) { - ret = gf_string2time (checkpoint_interval_str, - &private->checkpoint_interval); - - if (ret == 0) { + private->dir_mode = strtol (mode_str, &endptr, 8); + if ((*endptr) || + (!IS_VALID_FILE_MODE(private->dir_mode))) { gf_log (this->name, GF_LOG_DEBUG, - "setting checkpoint-interval to %"PRIu32" seconds", - private->checkpoint_interval); + "'%o' is not a valid parameter for dir-mode " + "option. please specify a valid parameter for " + "dir-mode and retry.", + private->dir_mode); + goto err; } } else { - gf_log (this->name, GF_LOG_DEBUG, - "setting checkpoint-interval to default: %"PRIu32" seconds", - private->checkpoint_interval); + private->dir_mode = DEFAULT_DIR_MODE; } -} -static inline void -bdb_lock_timeout_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *timeout_str = NULL; + private->dir_mode = private->dir_mode | S_IFDIR; - ret = dict_get_str (options, "lock-timeout", &timeout_str); + table = CALLOC (1, sizeof (*table)); + if (table == NULL) { + gf_log ("bdb-ll", GF_LOG_CRITICAL, + "memory allocation for 'storage/bdb' internal " + "context table failed."); + goto err; + } - if (ret == 0) { - ret = gf_string2time (timeout_str, &private->lock_timeout); + INIT_LIST_HEAD(&(table->b_lru)); + INIT_LIST_HEAD(&(table->active)); + INIT_LIST_HEAD(&(table->purge)); - if (private->lock_timeout > 4260000) { - /* db allows us to DB_SET_LOCK_TIMEOUT to be set to a - * maximum of 71 mins (4260000 milliseconds) */ - gf_log (this->name, GF_LOG_DEBUG, - "lock-timeout %d, out of range", - private->lock_timeout); - private->lock_timeout = 0; - } else { - gf_log (this->name, GF_LOG_DEBUG, - "setting lock-timeout to %d milliseconds", - private->lock_timeout); - } - } -} + LOCK_INIT (&table->lock); + LOCK_INIT (&table->checkpoint_lock); -static inline void -bdb_transaction_timeout_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *timeout_str = NULL; + table->transaction = private->transaction; + table->access_mode = private->access_mode; + table->dbflags = private->dbflags; + table->this = this; - ret = dict_get_str (options, "transaction-timeout", &timeout_str); + ret = dict_get_str (options, "lru-limit", + &lru_limit_str); + /* TODO: set max lockers and max txns to accomodate + * for more than lru_limit */ if (ret == 0) { - ret = gf_string2time (timeout_str, &private->txn_timeout); - - if (private->txn_timeout > 4260000) { - /* db allows us to DB_SET_TXN_TIMEOUT to be set to - * a maximum of 71 mins (4260000 milliseconds) */ - gf_log (this->name, GF_LOG_DEBUG, - "transaction-timeout %d, out of range", - private->txn_timeout); - private->txn_timeout = 0; - } else { - gf_log (this->name, GF_LOG_DEBUG, - "setting transaction-timeout to %d " - "milliseconds", - private->txn_timeout); - } + ret = gf_string2uint32 (lru_limit_str, + &table->lru_limit); + gf_log ("bdb-ll", GF_LOG_DEBUG, + "setting lru limit of 'storage/bdb' internal context" + "table to %d. maximum of %d unused databases can be " + "open at any given point of time.", + table->lru_limit, table->lru_limit); + } else { + table->lru_limit = BDB_DEFAULT_LRU_LIMIT; } -} -static inline void -bdb_transaction_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *mode = NULL; + ret = dict_get_str (options, "page-size", + &page_size_str); - ret = dict_get_str (options, "mode", &mode); + if (ret == 0) { + ret = gf_string2bytesize (page_size_str, + &table->page_size); + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "\"%s\" is an invalid parameter to " + "\"option page-size\". please specify a valid " + "size and retry.", + page_size_str); + goto err; + } - if ((ret == 0) - && (!strcmp (mode, "cache"))) { - gf_log (this->name, GF_LOG_DEBUG, - "cache mode selected"); - private->envflags = DB_CREATE | DB_INIT_LOG | - DB_INIT_MPOOL | DB_THREAD; - private->dbflags = DB_CREATE | DB_THREAD; - private->transaction = OFF; + if (!PAGE_SIZE_IN_RANGE(table->page_size)) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "\"%s\" is out of range for Berkeley DB " + "page-size. allowed page-size range is %d to " + "%d. please specify a page-size value in the " + "range and retry.", + page_size_str, BDB_LL_PAGE_SIZE_MIN, + BDB_LL_PAGE_SIZE_MAX); + goto err; + } } else { - gf_log (this->name, GF_LOG_DEBUG, - "persistant mode selected"); - private->transaction = ON; - private->envflags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | - DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD; - private->dbflags = DB_CREATE | DB_THREAD; - - bdb_lock_timeout_init (this, options, private); - - bdb_transaction_timeout_init (this, options, private); - - bdb_log_remove_init (this, options, private); - - bdb_checkpoint_interval_init (this, options, private); + table->page_size = BDB_LL_PAGE_SIZE_DEFAULT; } -} -static inline void -bdb_access_mode_init (xlator_t *this, - dict_t *options, - struct bdb_private *private) -{ - int ret = -1; - char *access_mode = NULL; + table->hash_size = BDB_DEFAULT_HASH_SIZE; + table->b_hash = CALLOC (BDB_DEFAULT_HASH_SIZE, + sizeof (struct list_head)); - ret = dict_get_str (options, "access-mode", &access_mode); + for (idx = 0; idx < table->hash_size; idx++) + INIT_LIST_HEAD(&(table->b_hash[idx])); - if ((ret == 0) - && (!strcmp (access_mode, "btree"))) { - gf_log (this->name, GF_LOG_DEBUG, - "using access mode BTREE"); - private->access_mode = DB_BTREE; - } else { + private->b_table = table; + + ret = dict_get_str (options, "errfile", &errfile); + if (ret == 0) { + private->errfile = strdup (errfile); gf_log (this->name, GF_LOG_DEBUG, - "using access mode HASH"); - private->access_mode = DB_HASH; + "using %s as error logging file for libdb (Berkeley DB " + "library) internal logging.", private->errfile); } -} + ret = dict_get_str (options, "directory", &directory); -/* bdb_db_init - initialize bdb xlator - * - * reads the options from @options dictionary and sets appropriate values in - * @this->private. also initializes DB_ENV. - * - * return: 0 on success or -1 on error - * (with logging the error through gf_log()). - */ -int -bdb_db_init (xlator_t *this, - dict_t *options) -{ - /* create a db entry for root */ - int32_t op_ret = 0; - bdb_private_t *private = NULL; + if (ret == 0) { + ret = dict_get_str (options, "logdir", &logdir); - private = this->private; + if (ret < 0) { + gf_log ("bdb-ll", GF_LOG_DEBUG, + "using the database environment home " + "directory (%s) itself as transaction log " + "directory", directory); + private->logdir = strdup (directory); - bdb_cache_init (this, options, private); + } else { + private->logdir = strdup (logdir); - bdb_access_mode_init (this, options, private); + op_ret = stat (private->logdir, &stbuf); + if ((op_ret != 0) + || (!S_ISDIR (stbuf.st_mode))) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "specified logdir %s does not exist. " + "please provide a valid existing " + "directory as parameter to 'option " + "logdir'", + private->logdir); + goto err; + } + } - bdb_transaction_init (this, options, private); + private->b_table->dbenv = bdb_dbenv_init (this, directory); + if (private->b_table->dbenv == NULL) { + gf_log ("bdb-ll", GF_LOG_ERROR, + "initialization of database environment " + "failed"); + goto err; + } else { + if (private->transaction) { + /* all well, start the checkpointing thread */ + LOCK_INIT (&private->active_lock); - { - LOCK_INIT (&private->ino_lock); - private->next_ino = 2; + LOCK (&private->active_lock); + { + private->active = 1; + } + UNLOCK (&private->active_lock); + pthread_create (&private->checkpoint_thread, + NULL, bdb_checkpoint, this); + } + } } - bdb_file_mode_init (this, options, private); - - bdb_dir_mode_init (this, options, private); - - bdb_table_init (this, options, private); - - bdb_errfile_init (this, options, private); + return op_ret; +err: + if (table) { + FREE (table->b_hash); + FREE (table); + } + if (private) { + if (private->errfile) + FREE (private->errfile); - bdb_directory_init (this, options, private); + if (private->logdir) + FREE (private->logdir); + } - return op_ret; + return -1; } diff --git a/xlators/storage/bdb/src/bdb.c b/xlators/storage/bdb/src/bdb.c index a3c6c44ea..85f08ea9a 100644 --- a/xlators/storage/bdb/src/bdb.c +++ b/xlators/storage/bdb/src/bdb.c @@ -82,49 +82,57 @@ bdb_mknod (call_frame_t *frame, if (!S_ISREG(mode)) { gf_log (this->name, GF_LOG_DEBUG, - "mknod for non-regular file"); + "MKNOD %"PRId64"/%s (%s): EPERM" + "(mknod supported only for regular files. " + "file mode '%o' not supported)", + loc->parent->ino, loc->name, loc->path, mode); op_ret = -1; op_errno = EPERM; goto out; } /* if(!S_ISREG(mode)) */ bctx = bctx_parent (B_TABLE(this), loc->path); - if (bctx == NULL) { - gf_log (this->name, GF_LOG_ERROR, - "failed to get bctx for path: %s", - loc->path); - op_ret = -1; - op_errno = ENOENT; + gf_log (this->name, GF_LOG_DEBUG, + "MKNOD %"PRId64"/%s (%s): ENOMEM" + "(failed to lookup database handle)", + loc->parent->ino, loc->name, loc->path); + op_ret = -1; + op_errno = ENOMEM; goto out; - } /* if(bctx == NULL) */ + } MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory); op_ret = lstat (db_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + op_errno = EINVAL; + gf_log (this->name, GF_LOG_DEBUG, + "MKNOD %"PRId64"/%s (%s): EINVAL" + "(failed to lookup database handle)", + loc->parent->ino, loc->name, loc->path); goto out; } MAKE_KEY_FROM_PATH (key_string, loc->path); - op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 0, 0); + op_ret = bdb_db_icreate (bctx, key_string); if (op_ret > 0) { /* create successful */ - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, + key_string, + strlen (key_string)); stbuf.st_mode = mode; stbuf.st_size = 0; stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, \ stbuf.st_blksize); } else { - gf_log (this->name, GF_LOG_ERROR, - "bdb_db_get() failed for path: %s", - loc->path); - op_ret = -1; - op_errno = ENOENT; + gf_log (this->name, GF_LOG_DEBUG, + "MKNOD %"PRId64"/%s (%s): ENOMEM" + "(failed to create database entry)", + loc->parent->ino, loc->name, loc->path); + op_ret = -1; + op_errno = EINVAL; /* TODO: errno sari illa */ + goto out; }/* if (!op_ret)...else */ out: @@ -156,11 +164,7 @@ is_dir_empty (xlator_t *this, bctx = bctx_lookup (B_TABLE(this), loc->path); if (bctx == NULL) { - gf_log (this->name, GF_LOG_DEBUG, - "failed to get bctx from inode for dir: %s," - "assuming empty directory", - loc->path); - ret = 1; + ret = -ENOMEM; goto out; } @@ -180,33 +184,24 @@ is_dir_empty (xlator_t *this, break; case DB_UNKNOWN: gf_log (this->name, GF_LOG_CRITICAL, - "unknown access-mode set for db"); + "unknown access-mode set for database"); ret = 0; } } else { - gf_log (this->name, GF_LOG_ERROR, - "failed to get db stat for db at path: %s", - loc->path); - ret = 1; + ret = -EBUSY; goto out; } MAKE_REAL_PATH (real_path, this, loc->path); dir = opendir (real_path); if (dir == NULL) { - gf_log (this->name, GF_LOG_DEBUG, - "failed to opendir(%s)", - loc->path); - ret = 0; + ret = -errno; goto out; } while ((entry = readdir (dir))) { if ((!IS_BDB_PRIVATE_FILE(entry->d_name)) && (!IS_DOT_DOTDOT(entry->d_name))) { - gf_log (this->name, GF_LOG_DEBUG, - "directory (%s) not empty, has a non-db entry", - loc->path); ret = 0; break; }/* if(!IS_BDB_PRIVATE_FILE()) */ @@ -256,26 +251,19 @@ is_space_left (xlator_t *this, ret = statvfs (private->export_path, &stbuf); if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to do statvfs on %s", - private->export_path); - return 0; + ret = 0; } else { req_blocks = (size / stbuf.f_frsize) + 1; usable_blocks = (stbuf.f_bfree - BDB_ENOSPC_THRESHOLD); - gf_log (this->name, GF_LOG_DEBUG, - "requested size: %"GF_PRI_SIZET"\n" - "free blocks: %"PRIu64"\n" - "block size: %lu\nfrag size: %lu", - size, stbuf.f_bfree, stbuf.f_bsize, stbuf.f_frsize); - if (req_blocks < usable_blocks) - return 1; + ret = 1; else - return 0; + ret = 0; } + + return ret; } int32_t @@ -303,40 +291,68 @@ bdb_create (call_frame_t *frame, private = this->private; bctx = bctx_parent (B_TABLE(this), loc->path); - op_errno = ENOENT; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "CREATE %"PRId64"/%s (%s): ENOMEM" + "(failed to lookup database handle)", + loc->parent->ino, loc->name, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory); op_ret = lstat (db_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + op_errno = EINVAL; + gf_log (this->name, GF_LOG_DEBUG, + "CREATE %"PRId64"/%s (%s): EINVAL" + "(database file missing)", + loc->parent->ino, loc->name, loc->path); goto out; } MAKE_KEY_FROM_PATH (key_string, loc->path); - op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 0, 0); - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); + op_ret = bdb_db_icreate (bctx, key_string); + if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "CREATE %"PRId64"/%s (%s): ENOMEM" + "(failed to create database entry)", + loc->parent->ino, loc->name, loc->path); + op_errno = EINVAL; /* TODO: errno sari illa */ + goto out; + } /* create successful */ bfd = CALLOC (1, sizeof (*bfd)); - op_ret = -1; - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "CREATE %"PRId64"/%s (%s): ENOMEM" + "(failed to allocate memory for internal fd context)", + loc->parent->ino, loc->name, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } /* NOTE: bdb_get_bctx_from () returns bctx with a ref */ bfd->ctx = bctx; bfd->key = strdup (key_string); - op_ret = -1; - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bfd->key, out); + if (bfd->key == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "CREATE %"PRId64" (%s): ENOMEM" + "(failed to allocate memory for internal fd->key)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } - BDB_SET_BFD (this, fd, bfd); + BDB_FCTX_SET (fd, this, bfd); - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, + key_string, + strlen (key_string)); stbuf.st_mode = private->file_mode; stbuf.st_size = 0; stbuf.st_nlink = 1; @@ -377,23 +393,43 @@ bdb_open (call_frame_t *frame, GF_VALIDATE_OR_GOTO (this->name, fd, out); bctx = bctx_parent (B_TABLE(this), loc->path); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "OPEN %"PRId64" (%s): ENOMEM" + "(failed to lookup database handle)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } bfd = CALLOC (1, sizeof (*bfd)); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "OPEN %"PRId64" (%s): ENOMEM" + "(failed to allocate memory for internal fd context)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } /* NOTE: bctx_parent () returns bctx with a ref */ bfd->ctx = bctx; MAKE_KEY_FROM_PATH (key_string, loc->path); bfd->key = strdup (key_string); - op_ret = -1; - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bfd->key, out); + if (bfd->key == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "OPEN %"PRId64" (%s): ENOMEM" + "(failed to allocate memory for internal fd->key)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } - BDB_SET_BFD (this, fd, bfd); + BDB_FCTX_SET (fd, this, bfd); op_ret = 0; out: frame->root->rsp_refs = NULL; @@ -416,7 +452,6 @@ bdb_readv (call_frame_t *frame, struct bdb_fd *bfd = NULL; dict_t *reply_dict = NULL; char *buf = NULL; - data_t *buf_data = NULL; char *db_path = NULL; int32_t read_size = 0; @@ -424,29 +459,37 @@ bdb_readv (call_frame_t *frame, GF_VALIDATE_OR_GOTO ("bdb", this, out); GF_VALIDATE_OR_GOTO (this->name, fd, out); - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD" + "(internal fd not found through fd)", + fd->inode->ino, size, offset); + op_errno = EBADFD; + op_ret = -1; + goto out; + } MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bfd->ctx->directory); op_ret = lstat (db_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "READV %"PRId64" - %"PRId32",%"PRId64": EINVAL" + "(database file missing)", + fd->inode->ino, size, offset); goto out; } /* we are ready to go */ - op_ret = bdb_db_get (bfd->ctx, NULL, - bfd->key, &buf, - size, offset); + op_ret = bdb_db_fread (bfd, &buf, size, offset); read_size = op_ret; if (op_ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "failed to do db_storage_get()"); - op_ret = -1; + gf_log (this->name, GF_LOG_DEBUG, + "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD" + "(failed to find entry in database)", + fd->inode->ino, size, offset); + op_ret = -1; op_errno = ENOENT; goto out; } else if (op_ret == 0) { @@ -454,17 +497,21 @@ bdb_readv (call_frame_t *frame, } reply_dict = dict_new (); - op_ret = -1; - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, reply_dict, out); + if (reply_dict == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD" + "(failed to allocate memory for reply dictionary)", + fd->inode->ino, size, offset); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } if (size < read_size) { op_ret = size; read_size = size; } - buf_data->len = op_ret; - op_ret = dict_set_dynptr (reply_dict, NULL, buf, op_ret); if (op_ret < 0) { op_ret = -1; @@ -513,44 +560,51 @@ bdb_writev (call_frame_t *frame, GF_VALIDATE_OR_GOTO (this->name, fd, out); GF_VALIDATE_OR_GOTO (this->name, vector, out); - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "WRITEV %"PRId64" - %"PRId32",%"PRId64": EBADFD" + "(internal fd not found through fd)", + fd->inode->ino, count, offset); + op_ret = -1; + op_errno = EBADFD; + goto out; + } MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bfd->ctx->directory); op_ret = lstat (db_path, &stbuf); - op_errno = errno; if (op_ret != 0) { + op_errno = errno; gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + "WRITEV %"PRId64" - %"PRId32",%"PRId64": EINVAL" + "(database file missing)", + fd->inode->ino, count, offset); goto out; } - for (idx = 0; idx < count; idx++) total_size += vector[idx].iov_len; if (!is_space_left (this, total_size)) { gf_log (this->name, GF_LOG_ERROR, - "requested storage for %"GF_PRI_SIZET", ENOSPC", - total_size); + "WRITEV %"PRId64" - %"PRId32" (%"PRId32"),%"PRId64": " + "ENOSPC " + "(not enough space after internal measurement)", + fd->inode->ino, count, total_size, offset); op_ret = -1; op_errno = ENOSPC; goto out; } - /* we are ready to go */ for (idx = 0; idx < count; idx++) { - c_ret = bdb_db_put (bfd->ctx, NULL, - bfd->key, vector[idx].iov_base, - vector[idx].iov_len, c_off, 0); - if (c_ret != 0) { + c_ret = bdb_db_fwrite (bfd, vector[idx].iov_base, + vector[idx].iov_len, c_off); + if (c_ret < 0) { gf_log (this->name, GF_LOG_ERROR, - "failed to do bdb_db_put at offset: " - "%"PRIu64" for file: %s", - c_off, bfd->key); + "WRITEV %"PRId64" - %"PRId32",%"PRId64": EINVAL" + "(database write at %"PRId64" failed)", + fd->inode->ino, count, offset, c_off); break; } else { c_off += vector[idx].iov_len; @@ -559,16 +613,15 @@ bdb_writev (call_frame_t *frame, } /* for(idx=0;...)... */ if (c_ret) { - /* write failed */ - gf_log (this->name, GF_LOG_ERROR, - "failed to do bdb_db_put(): %s", - db_strerror (op_ret)); - op_ret = -1; - op_errno = EBADFD; /* TODO: search for a meaningful errno */ + /* write failed after a point, not an error */ + stbuf.st_size = bdb_db_fread (bfd, NULL, 0, 0); + stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, + stbuf.st_blksize); goto out; } + /* NOTE: we want to increment stbuf->st_size, as stored in db */ - stbuf.st_size = op_ret; + stbuf.st_size = op_ret; stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize); op_errno = 0; @@ -591,9 +644,16 @@ bdb_flush (call_frame_t *frame, GF_VALIDATE_OR_GOTO ("bdb", this, out); GF_VALIDATE_OR_GOTO (this->name, fd, out); - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "FLUSH %"PRId64": EBADFD" + "(internal fd not found through fd)", + fd->inode->ino); + op_ret = -1; + op_errno = EBADFD; + goto out; + } /* do nothing */ op_ret = 0; @@ -613,23 +673,27 @@ bdb_release (xlator_t *this, int32_t op_errno = EBADFD; struct bdb_fd *bfd = NULL; - if ((bfd = bdb_extract_bfd (fd, this)) == NULL){ - gf_log (this->name, GF_LOG_ERROR, - "failed to extract %s specific information from fd:%p", - this->name, fd); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "RELEASE %"PRId64": EBADFD" + "(internal fd not found through fd)", + fd->inode->ino); op_ret = -1; op_errno = EBADFD; - } else { - bctx_unref (bfd->ctx); - bfd->ctx = NULL; + goto out; + } - if (bfd->key) - free (bfd->key); /* we did strdup() in bdb_open() */ - free (bfd); - op_ret = 0; - op_errno = 0; - } /* if((fd->ctx == NULL)...)...else */ + bctx_unref (bfd->ctx); + bfd->ctx = NULL; + if (bfd->key) + FREE (bfd->key); /* we did strdup() in bdb_open() */ + FREE (bfd); + op_ret = 0; + op_errno = 0; + +out: return 0; }/* bdb_release */ @@ -656,15 +720,16 @@ bdb_lk (call_frame_t *frame, { struct flock nullock = {0, }; - gf_bdb_lk_log++; - if (!(gf_bdb_lk_log % GF_UNIVERSAL_ANSWER)) { - gf_log (this->name, GF_LOG_ERROR, - "\"features/posix-locks\" translator is not loaded, " - "you need to use it"); + if (BDB_TIMED_LOG (ENOTSUP, gf_bdb_lk_log)) { + gf_log (this->name, GF_LOG_DEBUG, + "LK %"PRId64": ENOTSUP " + "(load \"features/locks\" translator to enable " + "lock support)", + fd->inode->ino); } frame->root->rsp_refs = NULL; - STACK_UNWIND (frame, -1, ENOSYS, &nullock); + STACK_UNWIND (frame, -1, ENOTSUP, &nullock); return 0; }/* bdb_lk */ @@ -678,8 +743,8 @@ bdb_lk (call_frame_t *frame, * case 1 and 2 are handled by doing lstat() on the @loc. if the file is a * directory or symlink, lstat() succeeds. lookup continues to check if the * @loc belongs to case-3 only if lstat() fails. - * to check for case 3, bdb_lookup does a bdb_db_get() for the given @loc. - * (see description of bdb_db_get() for more details on how @loc is transformed + * to check for case 3, bdb_lookup does a bdb_db_iread() for the given @loc. + * (see description of bdb_db_iread() for more details on how @loc is transformed * into db handle and key). if check for case 1, 2 and 3 fail, we proceed to * conclude that file doesn't exist (case 4). * @@ -741,20 +806,26 @@ bdb_lookup (call_frame_t *frame, if (!strcmp (directory, loc->path)) { /* SPECIAL CASE: looking up root */ op_ret = lstat (real_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - real_path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "LOOKUP %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); goto out; } /* bctx_lookup() returns NULL only when its time to wind up, * we should shutdown functioning */ bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); - op_ret = -1; - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "LOOKUP %"PRId64" (%s): ENOMEM" + "(failed to lookup database handle)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } stbuf.st_ino = 1; stbuf.st_mode = private->dir_mode; @@ -767,80 +838,99 @@ bdb_lookup (call_frame_t *frame, op_ret = lstat (real_path, &stbuf); if ((op_ret == 0) && (S_ISDIR (stbuf.st_mode))){ bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); - op_ret = -1; - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "LOOKUP %"PRId64"/%s (%s): ENOMEM" + "(failed to lookup database handle)", + loc->parent->ino, loc->name, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } if (loc->ino) { /* revalidating directory inode */ - gf_log (this->name, GF_LOG_DEBUG, - "revalidating directory %s", - (char *)loc->path); stbuf.st_ino = loc->ino; } else { - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, + key_string, + strlen (key_string)); } stbuf.st_mode = private->dir_mode; + op_ret = 0; - op_errno = 0; goto out; + } else if (op_ret == 0) { /* a symlink */ - gf_log (this->name, GF_LOG_DEBUG, - "lookup called for symlink: %s", - loc->path); bctx = bctx_parent (B_TABLE(this), loc->path); - op_ret = -1; - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "LOOKUP %"PRId64"/%s (%s): ENOMEM" + "(failed to lookup database handle)", + loc->parent->ino, loc->name, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } if (loc->ino) { stbuf.st_ino = loc->ino; } else { - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, + key_string, + strlen (key_string)); } + stbuf.st_mode = private->symlink_mode; + op_ret = 0; - op_errno = 0; goto out; + } /* for regular files */ bctx = bctx_parent (B_TABLE(this), loc->path); - op_ret = -1; - op_errno = ENOENT; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "LOOKUP %"PRId64"/%s (%s): ENOMEM" + "(failed to lookup database handle for parent)", + loc->parent->ino, loc->name, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } if (GF_FILE_CONTENT_REQUESTED(xattr_req, &need_xattr)) { - entry_size = bdb_db_get (bctx, NULL, - loc->path, &file_content, - 0, 0); + entry_size = bdb_db_iread (bctx, key_string, &file_content); } else { - entry_size = bdb_db_get (bctx, NULL, loc->path, NULL, - 0, 0); + entry_size = bdb_db_iread (bctx, key_string, NULL); } op_ret = entry_size; - op_errno = ENOENT; if (op_ret == -1) { gf_log (this->name, GF_LOG_DEBUG, - "returning ENOENT for %s", - loc->path); + "LOOKUP %"PRId64"/%s (%s): ENOENT" + "(database entry not found)", + loc->parent->ino, loc->name, loc->path); + op_errno = ENOENT; goto out; } MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory); op_ret = lstat (db_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "LOOKUP %"PRId64"/%s (%s): %s", + loc->parent->ino, loc->name, loc->path, + strerror (op_errno)); goto out; } - if ((need_xattr >= entry_size) - && (entry_size) && (file_content)) { + if (entry_size + && (need_xattr >= entry_size) + && (file_content)) { xattr = dict_new (); op_ret = dict_set_dynptr (xattr, "glusterfs.content", file_content, entry_size); @@ -861,7 +951,9 @@ bdb_lookup (call_frame_t *frame, stbuf.st_blksize); } else { /* fresh lookup, create an inode number */ - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, + key_string, + strlen (key_string)); stbuf.st_size = entry_size; stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize); @@ -930,21 +1022,28 @@ bdb_stat (call_frame_t *frame, } bctx = bctx_parent (B_TABLE(this), loc->path); - op_ret = -1; - op_errno = ENOENT; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "STAT %"PRId64" (%s): ENOMEM" + "(no database handle for parent)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory); op_ret = lstat (db_path, &stbuf); - op_errno = errno; - if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + if (op_ret < 0) { + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "STAT %"PRId64" (%s): %s" + "(failed to stat on database file)", + loc->ino, loc->path, strerror (op_errno)); goto out; } - stbuf.st_size = bdb_db_get (bctx, NULL, loc->path, NULL, 0, 0); + stbuf.st_size = bdb_db_iread (bctx, loc->path, NULL); stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize); stbuf.st_ino = loc->inode->ino; @@ -999,34 +1098,70 @@ bdb_opendir (call_frame_t *frame, MAKE_REAL_PATH (real_path, this, loc->path); bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "OPENDIR %"PRId64" (%s): ENOMEM" + "(no database handle for directory)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } bfd = CALLOC (1, sizeof (*bfd)); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "OPENDIR %"PRId64" (%s): ENOMEM" + "(failed to allocate memory for internal fd)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto err; + } bfd->dir = opendir (real_path); - op_errno = errno; - GF_VALIDATE_OR_GOTO (this->name, bfd->dir, out); + if (bfd->dir == NULL) { + op_ret = -1; + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "OPENDIR %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); + goto err; + } /* NOTE: bctx_lookup() return bctx with ref */ bfd->ctx = bctx; bfd->path = strdup (real_path); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bfd->path, out); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "OPENDIR %"PRId64" (%s): ENOMEM" + "(failed to allocate memory for internal fd->path)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto err; + } - BDB_SET_BFD (this, fd, bfd); + BDB_FCTX_SET (fd, this, bfd); op_ret = 0; out: frame->root->rsp_refs = NULL; STACK_UNWIND (frame, op_ret, op_errno, fd); + return 0; +err: + if (bctx) + bctx_unref (bctx); + if (bfd) { + if (bfd->dir) + closedir (bfd->dir); + + FREE (bfd); + } return 0; }/* bdb_opendir */ - int32_t bdb_getdents (call_frame_t *frame, xlator_t *this, @@ -1035,192 +1170,281 @@ bdb_getdents (call_frame_t *frame, off_t off, int32_t flag) { - int32_t op_ret = -1; - int32_t op_errno = EINVAL; + struct bdb_dir *bfd = NULL; + int32_t op_ret = -1; + int32_t op_errno = EINVAL; + size_t filled = 0; + dir_entry_t entries = {0, }; + dir_entry_t *this_entry = NULL; + char *entry_path = NULL; + struct dirent *dirent = NULL; + off_t in_case = 0; + int32_t this_size = 0; + DBC *cursorp = NULL; int32_t ret = -1; int32_t real_path_len = 0; int32_t entry_path_len = 0; int32_t count = 0; - char *real_path = NULL; - char *entry_path = NULL; - char *db_path = NULL; - dir_entry_t entries = {0, }; - dir_entry_t *tmp = NULL; - DIR *dir = NULL; - struct dirent *dirent = NULL; - struct bdb_dir *bfd = NULL; + off_t offset = 0; + size_t tmp_name_len = 0; struct stat db_stbuf = {0,}; struct stat buf = {0,}; - DBC *cursorp = NULL; - size_t tmp_name_len = 0; GF_VALIDATE_OR_GOTO ("bdb", frame, out); GF_VALIDATE_OR_GOTO ("bdb", this, out); GF_VALIDATE_OR_GOTO (this->name, fd, out); - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64" %o: EBADFD " + "(failed to find internal context in fd)", + fd->inode->ino, size, off, flag); + op_errno = EBADFD; + op_ret = -1; + goto out; + } - MAKE_REAL_PATH (real_path, this, bfd->path); - dir = bfd->dir; + op_ret = bdb_cursor_open (bfd->ctx, &cursorp); + if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64": EBADFD " + "(failed to open cursor to database handle)", + fd->inode->ino, size, off); + op_errno = EBADFD; + goto out; + } - while ((dirent = readdir (dir))) { - if (!dirent) + if (off) { + DBT sec = {0,}, pri = {0,}, val = {0,}; + sec.data = &(off); + sec.size = sizeof (off); + sec.flags = DB_DBT_USERMEM; + val.dlen = 0; + val.doff = 0; + val.flags = DB_DBT_PARTIAL; + + op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_SET); + if (op_ret == DB_NOTFOUND) { + offset = off; + goto dir_read; + } + } + + while (filled <= size) { + DBT sec = {0,}, pri = {0,}, val = {0,}; + + this_entry = NULL; + + sec.flags = DB_DBT_MALLOC; + pri.flags = DB_DBT_MALLOC; + val.dlen = 0; + val.doff = 0; + val.flags = DB_DBT_PARTIAL; + op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_NEXT); + + if (op_ret == DB_NOTFOUND) { + /* we reached end of the directory */ + op_ret = 0; + op_errno = 0; + break; + } else if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64":" + "(failed to read the next entry from database)", + fd->inode->ino, size, off); + op_errno = ENOENT; break; + } /* if (op_ret == DB_NOTFOUND)...else if...else */ - if (IS_BDB_PRIVATE_FILE(dirent->d_name)) { + if (pri.data == NULL) { + /* NOTE: currently ignore when we get key.data == NULL. + * FIXME: we should not get key.data = NULL */ + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64":" + "(null key read for entry from database)", + fd->inode->ino, size, off); continue; + }/* if(key.data)...else */ + + this_entry = CALLOC (1, sizeof (*this_entry)); + if (this_entry == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" + "(failed to allocate memory for an entry)", + fd->inode->ino, size, off, strerror (errno)); + op_errno = ENOMEM; + op_ret = -1; + goto out; } + this_entry->name = CALLOC (pri.size + 1, sizeof (char)); + if (this_entry->name == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" + "(failed to allocate memory for an " + "entry->name)", + fd->inode->ino, size, off, strerror (errno)); + op_errno = ENOMEM; + op_ret = -1; + goto out; + } + + memcpy (this_entry->name, pri.data, pri.size); + this_entry->buf = db_stbuf; + this_entry->buf.st_size = bdb_db_iread (bfd->ctx, + this_entry->name, NULL); + this_entry->buf.st_blocks = BDB_COUNT_BLOCKS ( + this_entry->buf.st_size, + this_entry->buf.st_blksize); + + this_entry->buf.st_ino = bdb_inode_transform (fd->inode->ino, + pri.data, + pri.size); + count++; + + this_entry->next = entries.next; + this_entry->link = ""; + entries.next = this_entry; + /* if size is 0, count can never be = size, + * so entire dir is read */ + if (sec.data) + FREE (sec.data); + + if (pri.data) + FREE (pri.data); + + if (count == size) + break; + }/* while */ + bdb_cursor_close (bfd->ctx, cursorp); + op_ret = count; + op_errno = 0; + if (count >= size) + goto out; +dir_read: + /* hungry kyaa? */ + if (!offset) { + rewinddir (bfd->dir); + } else { + seekdir (bfd->dir, offset); + } + + while (filled <= size) { + this_entry = NULL; + this_size = 0; + + in_case = telldir (bfd->dir); + dirent = readdir (bfd->dir); + if (!dirent) + break; + + if (IS_BDB_PRIVATE_FILE(dirent->d_name)) + continue; + tmp_name_len = strlen (dirent->d_name); if (entry_path_len < (real_path_len + 1 + (tmp_name_len) + 1)) { entry_path_len = real_path_len + tmp_name_len + 1024; entry_path = realloc (entry_path, entry_path_len); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, entry_path, out); + if (entry_path == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32"," + "%"PRId64" - %s: (failed to allocate " + "memory for an entry_path)", + fd->inode->ino, size, off, + strerror (errno)); + op_errno = ENOMEM; + op_ret = -1; + goto out; + } } strncpy (&entry_path[real_path_len+1], dirent->d_name, tmp_name_len); op_ret = stat (entry_path, &buf); - op_errno = errno; - if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - entry_path, strerror (op_errno)); - goto out; + if (op_ret < 0) { + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" + " (failed to stat on an entry '%s')", + fd->inode->ino, size, off, + strerror (errno), entry_path); + goto out; /* FIXME: shouldn't we continue here */ } if ((flag == GF_GET_DIR_ONLY) && - (ret != -1 && !S_ISDIR(buf.st_mode))) { + ((ret != -1) && (!S_ISDIR(buf.st_mode)))) { continue; } - tmp = CALLOC (1, sizeof (*tmp)); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, tmp, out); + this_entry = CALLOC (1, sizeof (*this_entry)); + if (this_entry == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" + "(failed to allocate memory for an entry)", + fd->inode->ino, size, off, strerror (errno)); + op_errno = ENOMEM; + op_ret = -1; + goto out; + } - tmp->name = strdup (dirent->d_name); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, dirent->d_name, out); + this_entry->name = strdup (dirent->d_name); + if (this_entry->name == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" + "(failed to allocate memory for an " + "entry->name)", + fd->inode->ino, size, off, strerror (errno)); + op_errno = ENOMEM; + op_ret = -1; + goto out; + } - memcpy (&tmp->buf, &buf, sizeof (buf)); + this_entry->buf = buf; - tmp->buf.st_ino = -1; - if (S_ISLNK(tmp->buf.st_mode)) { + this_entry->buf.st_ino = -1; + if (S_ISLNK(this_entry->buf.st_mode)) { char linkpath[ZR_PATH_MAX] = {0,}; ret = readlink (entry_path, linkpath, ZR_PATH_MAX); if (ret != -1) { linkpath[ret] = '\0'; - tmp->link = strdup (linkpath); + this_entry->link = strdup (linkpath); } } else { - tmp->link = ""; + this_entry->link = ""; } count++; - tmp->next = entries.next; - entries.next = tmp; - /* if size is 0, count can never be = size, - so entire dir is read */ + this_entry->next = entries.next; + entries.next = this_entry; + /* if size is 0, count can never be = size, + * so entire dir is read */ if (count == size) break; } - - if ((flag != GF_GET_DIR_ONLY) && (count < size)) { - /* read from db */ - op_ret = bdb_cursor_open (bfd->ctx, &cursorp); - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); - - MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, - bfd->ctx->directory); - op_ret = lstat (db_path, &db_stbuf); - op_errno = errno; - if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); - goto out; - } - - /* read all the entries in database, one after the other and - * put into dictionary */ - while (1) { - DBT key = {0,}, value = {0,}; - - key.flags = DB_DBT_MALLOC; - value.flags = DB_DBT_MALLOC; - op_ret = bdb_cursor_get (cursorp, &key, &value, - DB_NEXT); - - if (op_ret == DB_NOTFOUND) { - gf_log (this->name, GF_LOG_DEBUG, - "end of list of key/value pair in db" - " for directory: %s", - bfd->ctx->directory); - op_ret = 0; - op_errno = 0; - break; - } else if (op_ret != 0){ - gf_log (this->name, GF_LOG_ERROR, - "failed to do cursor get for " - "directory %s: %s", - bfd->ctx->directory, - db_strerror (op_ret)); - op_ret = -1; - op_errno = ENOENT; - break; - } - /* successfully read */ - tmp = CALLOC (1, sizeof (*tmp)); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, tmp, out); - - tmp->name = CALLOC (1, key.size + 1); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, tmp->name, out); - - memcpy (tmp->name, key.data, key.size); - tmp->buf = db_stbuf; - tmp->buf.st_size = bdb_db_get (bfd->ctx, NULL, - tmp->name, NULL, - 0, 0); - tmp->buf.st_blocks = BDB_COUNT_BLOCKS (tmp->buf.st_size, \ - tmp->buf.st_blksize); - /* FIXME: wat will be the effect of this? */ - tmp->buf.st_ino = -1; - count++; - - tmp->next = entries.next; - tmp->link = ""; - entries.next = tmp; - /* if size is 0, count can never be = size, so entire dir is read */ - if (count == size) - break; - - free (key.data); - } /* while(1){ } */ - bdb_cursor_close (bfd->ctx, cursorp); - } else { - /* do nothing */ - } - FREE (entry_path); - op_ret = 0; + op_ret = filled; + op_errno = 0; out: frame->root->rsp_refs = NULL; - STACK_UNWIND (frame, op_ret, op_errno, &entries, count); + + gf_log (this->name, GF_LOG_DEBUG, + "GETDENTS %"PRId64" - %"PRId32" (%"PRId32")/%"PRId32"," + "%"PRId64":" + "(failed to read the next entry from database)", + fd->inode->ino, filled, count, size, off); + + STACK_UNWIND (frame, count, op_errno, &entries); while (entries.next) { - tmp = entries.next; + this_entry = entries.next; entries.next = entries.next->next; - FREE (tmp->name); - FREE (tmp); + FREE (this_entry->name); + FREE (this_entry); } + return 0; }/* bdb_getdents */ @@ -1233,34 +1457,43 @@ bdb_releasedir (xlator_t *this, int32_t op_errno = 0; struct bdb_dir *bfd = NULL; - if ((bfd = bdb_extract_bfd (fd, this)) == NULL) { - gf_log (this->name, GF_LOG_ERROR, - "failed to extract fd data from fd=%p", fd); - op_ret = -1; - op_errno = EBADF; + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "RELEASEDIR %"PRId64": EBADFD", + fd->inode->ino); + op_errno = EBADFD; + op_ret = -1; + goto out; + } + + if (bfd->path) { + free (bfd->path); } else { - if (bfd->path) { - free (bfd->path); - } else { - gf_log (this->name, GF_LOG_ERROR, "bfd->path was NULL. fd=%p bfd=%p", - fd, bfd); - } + gf_log (this->name, GF_LOG_DEBUG, + "RELEASEDIR %"PRId64": (bfd->path is NULL)", + fd->inode->ino); + } - if (bfd->dir) { - closedir (bfd->dir); - } else { - gf_log (this->name, GF_LOG_ERROR, - "bfd->dir is NULL."); - } - if (bfd->ctx) { - bctx_unref (bfd->ctx); - } else { - gf_log (this->name, GF_LOG_ERROR, - "bfd->ctx is NULL"); - } - free (bfd); + if (bfd->dir) { + closedir (bfd->dir); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "RELEASEDIR %"PRId64": (bfd->dir is NULL)", + fd->inode->ino); } + if (bfd->ctx) { + bctx_unref (bfd->ctx); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "RELEASEDIR %"PRId64": (bfd->ctx is NULL)", + fd->inode->ino); + } + + free (bfd); + +out: return 0; }/* bdb_releasedir */ @@ -1290,12 +1523,11 @@ bdb_readlink (call_frame_t *frame, if (op_ret > 0) dest[op_ret] = 0; - op_errno = errno; - if (op_ret == -1) { + op_errno = errno; gf_log (this->name, GF_LOG_DEBUG, - "readlink failed on %s: %s", - loc->path, strerror (op_errno)); + "READLINK %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); } out: frame->root->rsp_refs = NULL; @@ -1317,57 +1549,69 @@ bdb_mkdir (call_frame_t *frame, char *real_path = NULL; struct stat stbuf = {0, }; bctx_t *bctx = NULL; + char *key_string = NULL; GF_VALIDATE_OR_GOTO ("bdb", frame, out); GF_VALIDATE_OR_GOTO ("bdb", this, out); GF_VALIDATE_OR_GOTO (this->name, loc, out); + MAKE_KEY_FROM_PATH (key_string, loc->path); MAKE_REAL_PATH (real_path, this, loc->path); op_ret = mkdir (real_path, mode); - op_errno = errno; - if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to mkdir %s (%s)", - real_path, strerror (op_errno)); + if (op_ret < 0) { + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "MKDIR %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); goto out; } op_ret = chown (real_path, frame->root->uid, frame->root->gid); - op_errno = errno; - if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to chmod on %s (%s)", - real_path, strerror (op_errno)); + if (op_ret < 0) { + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "MKDIR %"PRId64" (%s): %s " + "(failed to do chmod)", + loc->ino, loc->path, strerror (op_errno)); goto err; } op_ret = lstat (real_path, &stbuf); - op_errno = errno; - if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - real_path, strerror (op_errno)); + if (op_ret < 0) { + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "MKDIR %"PRId64" (%s): %s " + "(failed to do lstat)", + loc->ino, loc->path, strerror (op_errno)); goto err; } bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, bctx, err); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "MKDIR %"PRId64" (%s): ENOMEM" + "(no database handle for parent)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto err; + } - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, key_string, + strlen (key_string)); goto out; err: ret = rmdir (real_path); - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to rmdir the directory created (%s)", - strerror (errno)); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "MKDIR %"PRId64" (%s): %s" + "(failed to do rmdir)", + loc->ino, loc->path, strerror (errno)); } - out: if (bctx) { /* NOTE: bctx_unref always returns success, @@ -1391,27 +1635,36 @@ bdb_unlink (call_frame_t *frame, int32_t op_errno = EINVAL; bctx_t *bctx = NULL; char *real_path = NULL; + char *key_string = NULL; GF_VALIDATE_OR_GOTO ("bdb", frame, out); GF_VALIDATE_OR_GOTO ("bdb", this, out); GF_VALIDATE_OR_GOTO (this->name, loc, out); bctx = bctx_parent (B_TABLE(this), loc->path); - op_errno = ENOENT; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "UNLINK %"PRId64" (%s): ENOMEM" + "(no database handle for parent)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } - op_ret = bdb_db_del (bctx, NULL, loc->path); + MAKE_KEY_FROM_PATH (key_string, loc->path); + op_ret = bdb_db_iremove (bctx, key_string); if (op_ret == DB_NOTFOUND) { MAKE_REAL_PATH (real_path, this, loc->path); op_ret = unlink (real_path); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to unlink on %s (%s)", - real_path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "UNLINK %"PRId64" (%s): %s" + "(symlink unlink failed)", + loc->ino, loc->path, strerror (op_errno)); goto out; } - } else if (op_ret == 0) { op_errno = 0; } @@ -1430,7 +1683,7 @@ out: -int32_t +static int32_t bdb_do_rmdir (xlator_t *this, loc_t *loc) { @@ -1448,38 +1701,46 @@ bdb_do_rmdir (xlator_t *this, MAKE_REAL_PATH (real_path, this, loc->path); bctx = bctx_lookup (B_TABLE(this), loc->path); - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + ret = -ENOMEM; + goto out; + } LOCK(&bctx->lock); { - if (bctx->dbp == NULL) { + if ((bctx->primary == NULL) + || (bctx->secondary == NULL)) { goto unlock; } - ret = bctx->dbp->close (bctx->dbp, 0); - GF_VALIDATE_OR_GOTO (this->name, (ret == 0), unlock); + ret = bctx->primary->close (bctx->primary, 0); + if (ret < 0) { + ret = -EINVAL; + } - bctx->dbp = NULL; + ret = bctx->secondary->close (bctx->secondary, 0); + if (ret < 0) { + ret = -EINVAL; + } - ret = dbenv->dbremove (dbenv, NULL, bctx->db_path, NULL, 0); + ret = dbenv->dbremove (dbenv, NULL, bctx->db_path, + "primary", 0); + if (ret < 0) { + ret = -EBUSY; + } + + ret = dbenv->dbremove (dbenv, NULL, bctx->db_path, + "secondary", 0); if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to DB_ENV->dbremove() on path %s: %s", - loc->path, db_strerror (ret)); + ret = -EBUSY; } } unlock: UNLOCK(&bctx->lock); if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to remove db %s: %s", - bctx->db_path, db_strerror (ret)); - ret = -1; goto out; } - gf_log (this->name, GF_LOG_DEBUG, - "removed db %s", bctx->db_path); ret = rmdir (real_path); out: @@ -1498,22 +1759,31 @@ bdb_rmdir (call_frame_t *frame, loc_t *loc) { int32_t op_ret = -1; - int32_t op_errno = ENOTEMPTY; + int32_t op_errno = 0; - if (!is_dir_empty (this, loc)) { + op_ret = is_dir_empty (this, loc); + if (op_ret < 0) { + op_errno = -op_ret; gf_log (this->name, GF_LOG_DEBUG, - "rmdir: directory %s not empty", - loc->path); + "RMDIR %"PRId64" (%s): %s" + "(internal rmdir routine returned error)", + loc->ino, loc->path, strerror (op_errno)); + } else if (op_ret == 0) { + op_ret = -1; op_errno = ENOTEMPTY; - op_ret = -1; + gf_log (this->name, GF_LOG_DEBUG, + "RMDIR %"PRId64" (%s): ENOTEMPTY", + loc->ino, loc->path); goto out; } op_ret = bdb_do_rmdir (this, loc); - if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to bdb_do_rmdir on %s", - loc->path); + if (op_ret < 0) { + op_errno = -op_ret; + gf_log (this->name, GF_LOG_DEBUG, + "RMDIR %"PRId64" (%s): %s" + "(internal rmdir routine returned error)", + loc->ino, loc->path, strerror (op_errno)); goto out; } @@ -1536,6 +1806,7 @@ bdb_symlink (call_frame_t *frame, struct stat stbuf = {0,}; struct bdb_private *private = NULL; bctx_t *bctx = NULL; + char *key_string = NULL; GF_VALIDATE_OR_GOTO ("bdb", frame, out); GF_VALIDATE_OR_GOTO ("bdb", this, out); @@ -1545,23 +1816,35 @@ bdb_symlink (call_frame_t *frame, private = this->private; GF_VALIDATE_OR_GOTO (this->name, private, out); + MAKE_KEY_FROM_PATH (key_string, loc->path); + MAKE_REAL_PATH (real_path, this, loc->path); op_ret = symlink (linkname, real_path); op_errno = errno; if (op_ret == 0) { op_ret = lstat (real_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - real_path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "SYMLINK %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); goto err; } bctx = bctx_parent (B_TABLE(this), loc->path); - GF_VALIDATE_OR_GOTO (this->name, bctx, err); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "SYMLINK %"PRId64" (%s): ENOMEM" + "(no database handle for parent)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto err; + } - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, + key_string, + strlen (key_string)); stbuf.st_mode = private->symlink_mode; goto out; @@ -1570,9 +1853,10 @@ err: op_ret = unlink (real_path); op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to unlink the previously created symlink (%s)", - strerror (op_errno)); + gf_log (this->name, GF_LOG_DEBUG, + "SYMLINK %"PRId64" (%s): %s" + "(failed to unlink the created symlink)", + loc->ino, loc->path, strerror (op_errno)); } op_ret = -1; op_errno = ENOENT; @@ -1608,9 +1892,14 @@ bdb_chmod (call_frame_t *frame, op_ret = lstat (real_path, &stbuf); op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - real_path, strerror (op_errno)); + if (op_errno == ENOENT) { + op_errno = EPERM; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "CHMOD %"PRId64" (%s): %s" + "(lstat failed)", + loc->ino, loc->path, strerror (op_errno)); + } goto out; } @@ -1644,11 +1933,16 @@ bdb_chown (call_frame_t *frame, MAKE_REAL_PATH (real_path, this, loc->path); op_ret = lstat (real_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - real_path, strerror (op_errno)); + op_errno = errno; + if (op_errno == ENOENT) { + op_errno = EPERM; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "CHOWN %"PRId64" (%s): %s" + "(lstat failed)", + loc->ino, loc->path, strerror (op_errno)); + } goto out; } @@ -1682,8 +1976,15 @@ bdb_truncate (call_frame_t *frame, GF_VALIDATE_OR_GOTO (this->name, loc, out); bctx = bctx_parent (B_TABLE(this), loc->path); - op_errno = ENOENT; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "TRUNCATE %"PRId64" (%s): ENOMEM" + "(no database handle for parent)", + loc->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } MAKE_REAL_PATH (real_path, this, loc->path); MAKE_KEY_FROM_PATH (key_string, loc->path); @@ -1691,26 +1992,29 @@ bdb_truncate (call_frame_t *frame, /* now truncate */ MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory); op_ret = lstat (db_path, &stbuf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "TRUNCATE %"PRId64" (%s): %s" + "(lstat on database file failed)", + loc->ino, loc->path, strerror (op_errno)); goto out; } if (loc->inode->ino) { stbuf.st_ino = loc->inode->ino; }else { - stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); + stbuf.st_ino = bdb_inode_transform (loc->parent->ino, + key_string, + strlen (key_string)); } - op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 1, 0); - if (op_ret == -1) { + op_ret = bdb_db_itruncate (bctx, key_string); + if (op_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, - "failed to do bdb_db_put: %s", - db_strerror (op_ret)); - op_ret = -1; + "TRUNCATE %"PRId64" (%s): EINVAL" + "(truncating entry in database failed - %s)", + loc->ino, loc->path, db_strerror (op_ret)); op_errno = EINVAL; /* TODO: better errno */ } @@ -1745,40 +2049,44 @@ bdb_utimens (call_frame_t *frame, GF_VALIDATE_OR_GOTO (this->name, loc, out); MAKE_REAL_PATH (real_path, this, loc->path); - op_ret = lstat (real_path, &stbuf); - op_errno = errno; + op_ret = sys_lstat (real_path, &stbuf); if (op_ret != 0) { - op_errno = EPERM; - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - real_path, strerror (op_errno)); + op_errno = errno; + if (op_errno == ENOENT) { + op_errno = EPERM; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "UTIMENS %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); + } goto out; } /* directory or symlink */ - tv[0].tv_sec = ts[0].tv_sec; + tv[0].tv_sec = ts[0].tv_sec; tv[0].tv_usec = ts[0].tv_nsec / 1000; - tv[1].tv_sec = ts[1].tv_sec; + tv[1].tv_sec = ts[1].tv_sec; tv[1].tv_usec = ts[1].tv_nsec / 1000; op_ret = lutimes (real_path, tv); - if (op_ret == -1 && errno == ENOSYS) { - op_ret = utimes (real_path, tv); + if ((op_ret == -1) && (errno == ENOSYS)) { + op_ret = sys_utimes (real_path, tv); } - op_errno = errno; + if (op_ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "utimes on %s failed: %s", - loc->path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "UTIMENS %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); goto out; } - op_ret = lstat (real_path, &stbuf); - op_errno = errno; + op_ret = sys_lstat (real_path, &stbuf); if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - real_path, strerror (op_errno)); + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "UTIMENS %"PRId64" (%s): %s", + loc->ino, loc->path, strerror (op_errno)); goto out; } @@ -1858,52 +2166,54 @@ bdb_setxattr (call_frame_t *frame, MAKE_REAL_PATH (real_path, this, loc->path); if (!S_ISDIR (loc->inode->st_mode)) { op_ret = -1; - op_errno = EPERM; + op_errno = ENOATTR; goto out; } while (trav) { - if (ZR_FILE_CONTENT_REQUEST(trav->key) ) { - bctx = bctx_lookup (B_TABLE(this), loc->path); - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (GF_FILE_CONTENT_REQUEST(trav->key) ) { + key = BDB_KEY_FROM_FREQUEST_KEY(trav->key); - key = &(trav->key[15]); + bctx = bctx_lookup (B_TABLE(this), loc->path); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "SETXATTR %"PRId64" (%s) - %s: ENOMEM" + "(no database handle for directory)", + loc->ino, loc->path, key); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } if (flags & XATTR_REPLACE) { - /* replace only if previously exists, otherwise - * error out */ - op_ret = bdb_db_get (bctx, NULL, key, - NULL, 0, 0); + op_ret = bdb_db_itruncate (bctx, key); if (op_ret == -1) { /* key doesn't exist in database */ gf_log (this->name, GF_LOG_DEBUG, - "cannot XATTR_REPLACE, xattr %s" - " doesn't exist on path %s", - key, loc->path); + "SETXATTR %"PRId64" (%s) - %s:" + " (entry not present in " + "database)", + loc->ino, loc->path, key); op_ret = -1; - op_errno = ENOENT; + op_errno = ENOATTR; break; } - op_ret = bdb_db_put (bctx, NULL, - key, trav->value->data, - trav->value->len, - op_ret, - BDB_TRUNCATE_RECORD); + op_ret = bdb_db_iwrite (bctx, key, + trav->value->data, + trav->value->len); if (op_ret != 0) { op_ret = -1; - op_errno = EINVAL; + op_errno = ENOATTR; break; } } else { /* fresh create */ - op_ret = bdb_db_put (bctx, NULL, key, - trav->value->data, - trav->value->len, - 0, 0); + op_ret = bdb_db_iwrite (bctx, key, + trav->value->data, + trav->value->len); if (op_ret != 0) { op_ret = -1; - op_errno = EINVAL; + op_errno = EEXIST; break; } else { op_ret = 0; @@ -1918,25 +2228,26 @@ bdb_setxattr (call_frame_t *frame, } else { /* do plain setxattr */ op_ret = lsetxattr (real_path, - trav->key, - trav->value->data, + trav->key, trav->value->data, trav->value->len, flags); op_errno = errno; - if ((op_ret == -1) && (op_errno != ENOENT)) { - if (op_errno == ENOTSUP) { - gf_bdb_xattr_log++; - if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER)) { - gf_log (this->name, GF_LOG_WARNING, - "Extended Attributes support not present."\ - "Please check"); - } - } else { - gf_log (this->name, GF_LOG_DEBUG, - "setxattr failed on %s (%s)", - loc->path, strerror (op_errno)); - } + + if ((op_errno == ENOATTR) || (op_errno == EEXIST)) { + /* don't log, normal behaviour */ + ; + } else if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) { + gf_log (this->name, GF_LOG_DEBUG, + "SETXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, trav->key, + strerror (op_errno)); + /* do not continue, break out */ break; + } else { + gf_log (this->name, GF_LOG_DEBUG, + "SETXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, trav->key, + strerror (op_errno)); } } /* if(ZR_FILE_CONTENT_REQUEST())...else */ trav = trav->next; @@ -1988,109 +2299,131 @@ bdb_getxattr (call_frame_t *frame, GF_VALIDATE_OR_GOTO (this->name, loc, out); GF_VALIDATE_OR_GOTO (this->name, name, out); - dict = get_new_dict (); + dict = dict_new (); GF_VALIDATE_OR_GOTO (this->name, dict, out); if (!S_ISDIR (loc->inode->st_mode)) { gf_log (this->name, GF_LOG_DEBUG, - "operation not permitted on a non-directory file: %s", - loc->path); - op_ret = -1; - op_errno = ENODATA; + "GETXATTR %"PRId64" (%s) - %s: ENOATTR " + "(not a directory)", + loc->ino, loc->path, name); + op_ret = -1; + op_errno = ENOATTR; goto out; } - if (name && ZR_FILE_CONTENT_REQUEST(name)) { + if (name && GF_FILE_CONTENT_REQUEST(name)) { bctx = bctx_lookup (B_TABLE(this), loc->path); - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "GETXATTR %"PRId64" (%s) - %s: ENOMEM" + "(no database handle for directory)", + loc->ino, loc->path, name); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } - key_string = (char *)&(name[15]); + key_string = BDB_KEY_FROM_FREQUEST_KEY(name); - op_ret = bdb_db_get (bctx, NULL, key_string, &buf, 0, 0); + op_ret = bdb_db_iread (bctx, key_string, &buf); if (op_ret == -1) { gf_log (this->name, GF_LOG_DEBUG, - "failed to db get on directory: %s for key: %s", - bctx->directory, name); - op_ret = -1; - op_errno = ENODATA; + "GETXATTR %"PRId64" (%s) - %s: ENOATTR" + "(attribute not present in database)", + loc->ino, loc->path, name); + op_errno = ENOATTR; goto out; } op_ret = dict_set_dynptr (dict, (char *)name, buf, op_ret); if (op_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, - "failed to set to dictionary"); - op_ret = -1; + "GETXATTR %"PRId64" (%s) - %s: ENOATTR" + "(attribute present in database, " + "dict set failed)", + loc->ino, loc->path, name); op_errno = ENODATA; } - } else { - MAKE_REAL_PATH (real_path, this, loc->path); - size = llistxattr (real_path, NULL, 0); - op_errno = errno; - if (size <= 0) { - /* There are no extended attributes, send an empty - * dictionary */ - if (size == -1 && op_errno != ENODATA) { - if (op_errno == ENOTSUP) { - gf_bdb_xattr_log++; - if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER)) - gf_log (this->name, GF_LOG_WARNING, - "Extended Attributes support not present."\ - "Please check"); - } else { - gf_log (this->name, GF_LOG_WARNING, - "llistxattr failed on %s (%s)", - loc->path, strerror (op_errno)); - } - } - op_ret = -1; - op_errno = ENODATA; + + goto out; + } + + MAKE_REAL_PATH (real_path, this, loc->path); + size = sys_llistxattr (real_path, NULL, 0); + op_errno = errno; + if (size < 0) { + if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) { + gf_log (this->name, GF_LOG_DEBUG, + "GETXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, name, strerror (op_errno)); } else { - list = alloca (size + 1); - op_errno = ENOMEM; - GF_VALIDATE_OR_GOTO (this->name, list, out); + gf_log (this->name, GF_LOG_DEBUG, + "GETXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, name, strerror (op_errno)); + } + op_ret = -1; + op_errno = ENOATTR; - size = llistxattr (real_path, list, size); - op_ret = size; - op_errno = errno; - if (size == -1) { - gf_log (this->name, GF_LOG_ERROR, - "llistxattr failed on %s (%s)", - loc->path, strerror (errno)); - goto out; - } - remaining_size = size; - list_offset = 0; - while (remaining_size > 0) { - if(*(list+list_offset) == '\0') - break; - strcpy (key, list + list_offset); - op_ret = lgetxattr (real_path, key, NULL, 0); - if (op_ret == -1) - break; - value = CALLOC (op_ret + 1, sizeof(char)); - GF_VALIDATE_OR_GOTO (this->name, value, out); + goto out; + } - op_ret = lgetxattr (real_path, key, value, - op_ret); - if (op_ret == -1) - break; - value [op_ret] = '\0'; - op_ret = dict_set_dynptr (dict, key, - value, op_ret); - if (op_ret < 0) { - FREE (value); - gf_log (this->name, GF_LOG_DEBUG, - "skipping key %s", key); - continue; - } - remaining_size -= strlen (key) + 1; - list_offset += strlen (key) + 1; - } /* while(remaining_size>0) */ - } /* if(size <= 0)...else */ - } /* if(name...)...else */ + if (size == 0) + goto done; + + list = alloca (size + 1); + if (list == NULL) { + op_ret = -1; + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "GETXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, name, strerror (op_errno)); + } + + size = sys_llistxattr (real_path, list, size); + op_ret = size; + if (op_ret == -1) { + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "GETXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, name, strerror (op_errno)); + goto out; + } + remaining_size = size; + list_offset = 0; + while (remaining_size > 0) { + if(*(list+list_offset) == '\0') + break; + + strcpy (key, list + list_offset); + + op_ret = sys_lgetxattr (real_path, key, NULL, 0); + if (op_ret == -1) + break; + + value = CALLOC (op_ret + 1, sizeof(char)); + GF_VALIDATE_OR_GOTO (this->name, value, out); + + op_ret = sys_lgetxattr (real_path, key, value, + op_ret); + if (op_ret == -1) + break; + value [op_ret] = '\0'; + op_ret = dict_set_dynptr (dict, key, + value, op_ret); + if (op_ret < 0) { + FREE (value); + gf_log (this->name, GF_LOG_DEBUG, + "GETXATTR %"PRId64" (%s) - %s: " + "(skipping key %s)", + loc->ino, loc->path, name, key); + continue; + } + remaining_size -= strlen (key) + 1; + list_offset += strlen (key) + 1; + } /* while(remaining_size>0) */ +done: out: if(bctx) { /* NOTE: bctx_unref always returns success, @@ -2098,9 +2431,6 @@ out: bctx_unref (bctx); } - if (dict) - dict_ref (dict); - STACK_UNWIND (frame, op_ret, op_errno, dict); if (dict) @@ -2127,45 +2457,52 @@ bdb_removexattr (call_frame_t *frame, GF_VALIDATE_OR_GOTO (this->name, name, out); if (!S_ISDIR(loc->inode->st_mode)) { - gf_log (this->name, GF_LOG_WARNING, - "operation not permitted on non-directory files"); + gf_log (this->name, GF_LOG_DEBUG, + "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR " + "(not a directory)", + loc->ino, loc->path, name); op_ret = -1; - op_errno = EPERM; + op_errno = ENOATTR; goto out; } - if (ZR_FILE_CONTENT_REQUEST(name)) { + if (GF_FILE_CONTENT_REQUEST(name)) { bctx = bctx_lookup (B_TABLE(this), loc->path); - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); - - op_ret = bdb_db_del (bctx, NULL, name); - if (op_ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "failed to delete %s from db of %s directory", - name, loc->path); - op_errno = EINVAL; /* TODO: errno */ + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR" + "(no database handle for directory)", + loc->ino, loc->path, name); + op_ret = -1; + op_errno = ENOATTR; goto out; } - } else { - MAKE_REAL_PATH(real_path, this, loc->path); - op_ret = lremovexattr (real_path, name); - op_errno = errno; + + op_ret = bdb_db_iremove (bctx, name); if (op_ret == -1) { - if (op_errno == ENOTSUP) { - gf_bdb_xattr_log++; - if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER)) - gf_log (this->name, GF_LOG_WARNING, - "Extended Attributes support not present." - "Please check"); - } else { - gf_log (this->name, GF_LOG_WARNING, - "%s: %s", - loc->path, strerror (op_errno)); - } - } /* if(op_ret == -1) */ - } /* if (ZR_FILE_CONTENT_REQUEST(name))...else */ + gf_log (this->name, GF_LOG_DEBUG, + "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR" + "(no such attribute in database)", + loc->ino, loc->path, name); + op_errno = ENOATTR; + } + goto out; + } + MAKE_REAL_PATH(real_path, this, loc->path); + op_ret = lremovexattr (real_path, name); + op_errno = errno; + if (op_ret == -1) { + if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) { + gf_log (this->name, GF_LOG_DEBUG, + "REMOVEXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, name, strerror (op_errno)); + } else { + gf_log (this->name, GF_LOG_DEBUG, + "REMOVEXATTR %"PRId64" (%s) - %s: %s", + loc->ino, loc->path, name, strerror (op_errno)); + } + } /* if(op_ret == -1) */ out: if (bctx) { /* NOTE: bctx_unref always returns success, @@ -2195,9 +2532,15 @@ bdb_fsyncdir (call_frame_t *frame, frame->root->rsp_refs = NULL; - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "FSYNCDIR %"PRId64": EBADFD" + "(failed to find internal context from fd)", + fd->inode->ino); + op_errno = EBADFD; + op_ret = -1; + } out: STACK_UNWIND (frame, op_ret, op_errno); @@ -2321,9 +2664,15 @@ bdb_setdents (call_frame_t *frame, frame->root->rsp_refs = NULL; - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "SETDENTS %"PRId64": EBADFD", + fd->inode->ino); + op_errno = EBADFD; + op_ret = -1; + goto out; + } real_path_len = strlen (bfd->path); entry_path_len = real_path_len + 256; @@ -2346,60 +2695,68 @@ bdb_setdents (call_frame_t *frame, */ ret = mkdir (pathname, trav->buf.st_mode); if ((ret == -1) && (errno != EEXIST)) { - gf_log (this->name, GF_LOG_ERROR, - "failed to created directory %s: %s", - pathname, strerror(errno)); + op_errno = errno; + op_ret = ret; + gf_log (this->name, GF_LOG_DEBUG, + "SETDENTS %"PRId64" - %s: %s " + "(mkdir failed)", + fd->inode->ino, pathname, + strerror (op_errno)); goto loop; } - gf_log (this->name, GF_LOG_DEBUG, - "Creating directory %s with mode (0%o)", - pathname, - trav->buf.st_mode); /* Change the mode * NOTE: setdents tries its best to restore the state * of storage. if chmod and chown fail, they can * be ignored now */ ret = chmod (pathname, trav->buf.st_mode); - if (ret != 0) { - op_ret = -1; + if (ret < 0) { + op_ret = -1; op_errno = errno; - gf_log (this->name, GF_LOG_ERROR, - "chmod failed on %s (%s)", - pathname, strerror (errno)); + gf_log (this->name, GF_LOG_DEBUG, + "SETDENTS %"PRId64" - %s: %s " + "(chmod failed)", + fd->inode->ino, pathname, + strerror (op_errno)); goto loop; } /* change the ownership */ ret = chown (pathname, trav->buf.st_uid, trav->buf.st_gid); if (ret != 0) { - op_ret = -1; + op_ret = -1; op_errno = errno; - gf_log (this->name, GF_LOG_ERROR, - "chown failed on %s (%s)", - pathname, strerror (errno)); + gf_log (this->name, GF_LOG_DEBUG, + "SETDENTS %"PRId64" - %s: %s " + "(chown failed)", + fd->inode->ino, pathname, + strerror (op_errno)); goto loop; } } else if ((flags == GF_SET_IF_NOT_PRESENT) || (flags != GF_SET_DIR_ONLY)) { /* Create a 0 byte file here */ if (S_ISREG (trav->buf.st_mode)) { - op_ret = bdb_db_put (bfd->ctx, NULL, - trav->name, NULL, 0, 0, 0); - if (op_ret != 0) { - /* create successful */ - gf_log (this->name, GF_LOG_ERROR, - "failed to create file %s", - pathname); - } /* if (!op_ret)...else */ + op_ret = bdb_db_icreate (bfd->ctx, + trav->name); + if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "SETDENTS %"PRId64" (%s) - %s: " + "%s (database entry creation" + " failed)", + fd->inode->ino, + bfd->ctx->directory, trav->name, + strerror (op_errno)); + } } else if (S_ISLNK (trav->buf.st_mode)) { /* TODO: impelement */; } else { - gf_log (this->name, GF_LOG_ERROR, - "storage/bdb allows to create regular" - " files only file %s (mode = %d) cannot" - " be created", - pathname, trav->buf.st_mode); + gf_log (this->name, GF_LOG_DEBUG, + "SETDENTS %"PRId64" (%s) - %s mode=%o: " + "(unsupported file type)", + fd->inode->ino, + bfd->ctx->directory, trav->name, + trav->buf.st_mode); } /* if(S_ISREG())...else */ } /* if(S_ISDIR())...else if */ loop: @@ -2431,9 +2788,16 @@ bdb_fstat (call_frame_t *frame, GF_VALIDATE_OR_GOTO ("bdb", this, out); GF_VALIDATE_OR_GOTO (this->name, fd, out); - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "FSTAT %"PRId64": EBADFD " + "(failed to find internal context in fd)", + fd->inode->ino); + op_errno = EBADFD; + op_ret = -1; + goto out; + } bctx = bfd->ctx; @@ -2441,14 +2805,15 @@ bdb_fstat (call_frame_t *frame, op_ret = lstat (db_path, &stbuf); op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to lstat on %s (%s)", - db_path, strerror (op_errno)); + gf_log (this->name, GF_LOG_DEBUG, + "FSTAT %"PRId64": %s" + "(failed to stat database file %s)", + fd->inode->ino, strerror (op_errno), db_path); goto out; } stbuf.st_ino = fd->inode->ino; - stbuf.st_size = bdb_db_get (bctx, NULL, bfd->key, NULL, 0, 0); + stbuf.st_size = bdb_db_fread (bfd, NULL, 0, 0); stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize); out: @@ -2458,6 +2823,20 @@ out: return 0; } +gf_dirent_t * +gf_dirent_for_namen (const char *name, + size_t len) +{ + char *tmp_name = NULL; + + tmp_name = alloca (len + 1); + + memcpy (tmp_name, name, len); + + tmp_name[len] = 0; + + return gf_dirent_for_name (tmp_name); +} int32_t bdb_readdir (call_frame_t *frame, @@ -2477,6 +2856,7 @@ bdb_readdir (call_frame_t *frame, int32_t this_size = 0; DBC *cursorp = NULL; int32_t count = 0; + off_t offset = 0; GF_VALIDATE_OR_GOTO ("bdb", frame, out); GF_VALIDATE_OR_GOTO ("bdb", this, out); @@ -2484,137 +2864,164 @@ bdb_readdir (call_frame_t *frame, INIT_LIST_HEAD (&entries.list); - bfd = bdb_extract_bfd (fd, this); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, bfd, out); - - op_errno = ENOMEM; - - while (filled <= size) { - this_entry = NULL; - entry = NULL; - in_case = 0; - this_size = 0; - - in_case = telldir (bfd->dir); - entry = readdir (bfd->dir); - if (!entry) - break; - - if (IS_BDB_PRIVATE_FILE(entry->d_name)) - continue; - - this_size = dirent_size (entry); - - if (this_size + filled > size) { - seekdir (bfd->dir, in_case); - break; - } - - count++; - - this_entry = gf_dirent_for_name (entry->d_name); - this_entry->d_ino = entry->d_ino; - - this_entry->d_off = -1; - - this_entry->d_type = entry->d_type; - this_entry->d_len = entry->d_reclen; - - - list_add (&this_entry->list, &entries.list); - - filled += this_size; - } - op_ret = filled; - op_errno = 0; - if (filled >= size) { + BDB_FCTX_GET (fd, this, &bfd); + if (bfd == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "READDIR %"PRId64" - %"PRId32",%"PRId64": EBADFD " + "(failed to find internal context in fd)", + fd->inode->ino, size, off); + op_errno = EBADFD; + op_ret = -1; goto out; } - /* hungry kyaa? */ op_ret = bdb_cursor_open (bfd->ctx, &cursorp); - op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); - - /* TODO: fix d_off, don't use bfd->offset. wrong method */ - if (strlen (bfd->offset)) { - DBT key = {0,}, value = {0,}; - key.data = bfd->offset; - key.size = strlen (bfd->offset); - key.flags = DB_DBT_USERMEM; - value.dlen = 0; - value.doff = 0; - value.flags = DB_DBT_PARTIAL; - - op_ret = bdb_cursor_get (cursorp, &key, &value, DB_SET); + if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "READDIR %"PRId64" - %"PRId32",%"PRId64": EBADFD " + "(failed to open cursor to database handle)", + fd->inode->ino, size, off); op_errno = EBADFD; - GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); + goto out; + } - } else { - /* first time or last time, do nothing */ + if (off) { + DBT sec = {0,}, pri = {0,}, val = {0,}; + sec.data = &(off); + sec.size = sizeof (off); + sec.flags = DB_DBT_USERMEM; + val.dlen = 0; + val.doff = 0; + val.flags = DB_DBT_PARTIAL; + + op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_SET); + if (op_ret == DB_NOTFOUND) { + offset = off; + goto dir_read; + } } while (filled <= size) { - DBT key = {0,}, value = {0,}; + DBT sec = {0,}, pri = {0,}, val = {0,}; + this_entry = NULL; - key.flags = DB_DBT_MALLOC; - value.dlen = 0; - value.doff = 0; - value.flags = DB_DBT_PARTIAL; - op_ret = bdb_cursor_get (cursorp, &key, &value, DB_NEXT); + sec.flags = DB_DBT_MALLOC; + pri.flags = DB_DBT_MALLOC; + val.dlen = 0; + val.doff = 0; + val.flags = DB_DBT_PARTIAL; + op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_NEXT); if (op_ret == DB_NOTFOUND) { /* we reached end of the directory */ op_ret = 0; op_errno = 0; break; - } else if (op_ret != 0) { + } else if (op_ret < 0) { gf_log (this->name, GF_LOG_DEBUG, - "database error during readdir"); - op_ret = -1; + "READDIR %"PRId64" - %"PRId32",%"PRId64":" + "(failed to read the next entry from database)", + fd->inode->ino, size, off); op_errno = ENOENT; break; } /* if (op_ret == DB_NOTFOUND)...else if...else */ - if (key.data == NULL) { + if (pri.data == NULL) { /* NOTE: currently ignore when we get key.data == NULL. * TODO: we should not get key.data = NULL */ gf_log (this->name, GF_LOG_DEBUG, - "null key read from db"); + "READDIR %"PRId64" - %"PRId32",%"PRId64":" + "(null key read for entry from database)", + fd->inode->ino, size, off); continue; }/* if(key.data)...else */ count++; - this_size = bdb_dirent_size (&key); + this_size = bdb_dirent_size (&pri); if (this_size + filled > size) break; /* TODO - consider endianness here */ - this_entry = gf_dirent_for_name ((const char *)key.data); - /* FIXME: bug, if someone is going to use ->d_ino */ - this_entry->d_ino = -1; - this_entry->d_off = 0; + this_entry = gf_dirent_for_namen ((const char *)pri.data, + pri.size); + + this_entry->d_ino = bdb_inode_transform (fd->inode->ino, + pri.data, + pri.size); + this_entry->d_off = *(uint32_t *)sec.data; this_entry->d_type = 0; - this_entry->d_len = key.size; + this_entry->d_len = pri.size + 1; - if (key.data) { - strncpy (bfd->offset, key.data, key.size); - bfd->offset [key.size] = '\0'; - free (key.data); + if (sec.data) { + FREE (sec.data); } - list_add (&this_entry->list, &entries.list); + if (pri.data) + FREE (pri.data); + + list_add_tail (&this_entry->list, &entries.list); filled += this_size; }/* while */ bdb_cursor_close (bfd->ctx, cursorp); op_ret = filled; op_errno = 0; + if (filled >= size) { + goto out; + } +dir_read: + /* hungry kyaa? */ + if (!offset) { + rewinddir (bfd->dir); + } else { + seekdir (bfd->dir, offset); + } + + while (filled <= size) { + this_entry = NULL; + entry = NULL; + this_size = 0; + + in_case = telldir (bfd->dir); + entry = readdir (bfd->dir); + if (!entry) + break; + + if (IS_BDB_PRIVATE_FILE(entry->d_name)) + continue; + + this_size = dirent_size (entry); + + if (this_size + filled > size) { + seekdir (bfd->dir, in_case); + break; + } + + count++; + + this_entry = gf_dirent_for_name (entry->d_name); + this_entry->d_ino = entry->d_ino; + + this_entry->d_off = entry->d_off; + + this_entry->d_type = entry->d_type; + this_entry->d_len = entry->d_reclen; + + + list_add_tail (&this_entry->list, &entries.list); + + filled += this_size; + } + op_ret = filled; + op_errno = 0; + out: frame->root->rsp_refs = NULL; + gf_log (this->name, GF_LOG_DEBUG, - "read %"GF_PRI_SIZET" bytes for %d entries", - filled, count); + "READDIR %"PRId64" - %"PRId32" (%"PRId32")/%"PRId32",%"PRId64":" + "(failed to read the next entry from database)", + fd->inode->ino, filled, count, size, off); + STACK_UNWIND (frame, count, op_errno, &entries); gf_dirent_free (&entries); @@ -2629,11 +3036,11 @@ bdb_stats (call_frame_t *frame, int32_t flags) { - int32_t op_ret = 0; + int32_t op_ret = 0; int32_t op_errno = 0; struct xlator_stats xlstats = {0, }, *stats = NULL; - struct statvfs buf; + struct statvfs buf = {0,}; struct timeval tv; struct bdb_private *private = NULL; int64_t avg_read = 0; @@ -2647,10 +3054,10 @@ bdb_stats (call_frame_t *frame, stats = &xlstats; op_ret = statvfs (private->export_path, &buf); - op_errno = errno; if (op_ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to statvfs on %s (%s)", + op_errno = errno; + gf_log (this->name, GF_LOG_DEBUG, + "STATS %s: %s", private->export_path, strerror (op_errno)); goto out; } @@ -2661,9 +3068,9 @@ bdb_stats (call_frame_t *frame, stats->nr_clients = private->stats.nr_clients; /* Number of Free block in the filesystem. */ - stats->free_disk = buf.f_bfree * buf.f_bsize; + stats->free_disk = buf.f_bfree * buf.f_bsize; stats->total_disk_size = buf.f_blocks * buf.f_bsize; /* */ - stats->disk_usage = (buf.f_blocks - buf.f_bavail) * buf.f_bsize; + stats->disk_usage = (buf.f_blocks - buf.f_bavail) * buf.f_bsize; /* Calculate read and write usage */ gettimeofday (&tv, NULL); @@ -2672,7 +3079,7 @@ bdb_stats (call_frame_t *frame, _time_ms = (tv.tv_sec - private->init_time.tv_sec) * 1000 + ((tv.tv_usec - private->init_time.tv_usec) / 1000); - avg_read = (_time_ms) ? (private->read_value / _time_ms) : 0; /* KBps */ + avg_read = (_time_ms) ? (private->read_value / _time_ms) : 0;/* KBps */ avg_write = (_time_ms) ? (private->write_value / _time_ms) : 0; _time_ms = (tv.tv_sec - private->prev_fetch_time.tv_sec) * 1000 + @@ -2706,9 +3113,10 @@ bdb_inodelk (call_frame_t *frame, xlator_t *this, { frame->root->rsp_refs = NULL; - gf_log (this->name, GF_LOG_CRITICAL, - "\"features/posix-locks\" translator is not loaded. " - "You need to use it for proper functioning of GlusterFS"); + gf_log (this->name, GF_LOG_ERROR, + "glusterfs internal locking request. please load " + "'features/locks' translator to enable glusterfs " + "support"); STACK_UNWIND (frame, -1, ENOSYS); return 0; @@ -2721,9 +3129,10 @@ bdb_finodelk (call_frame_t *frame, xlator_t *this, { frame->root->rsp_refs = NULL; - gf_log (this->name, GF_LOG_CRITICAL, - "\"features/posix-locks\" translator is not loaded. " - "You need to use it for proper functioning of GlusterFS"); + gf_log (this->name, GF_LOG_ERROR, + "glusterfs internal locking request. please load " + "'features/locks' translator to enable glusterfs " + "support"); STACK_UNWIND (frame, -1, ENOSYS); return 0; @@ -2737,9 +3146,10 @@ bdb_entrylk (call_frame_t *frame, xlator_t *this, { frame->root->rsp_refs = NULL; - gf_log (this->name, GF_LOG_CRITICAL, - "\"features/posix-locks\" translator is not loaded. " - "You need to use it for proper functioning of GlusterFS"); + gf_log (this->name, GF_LOG_ERROR, + "glusterfs internal locking request. please load " + "'features/locks' translator to enable glusterfs " + "support"); STACK_UNWIND (frame, -1, ENOSYS); return 0; @@ -2753,15 +3163,15 @@ bdb_fentrylk (call_frame_t *frame, xlator_t *this, { frame->root->rsp_refs = NULL; - gf_log (this->name, GF_LOG_CRITICAL, - "\"features/posix-locks\" translator is not loaded. " - "You need to use it for proper functioning of GlusterFS"); + gf_log (this->name, GF_LOG_ERROR, + "glusterfs internal locking request. please load " + "'features/locks' translator to enable glusterfs " + "support"); STACK_UNWIND (frame, -1, ENOSYS); return 0; } - int32_t bdb_checksum (call_frame_t *frame, xlator_t *this, @@ -2775,10 +3185,11 @@ bdb_checksum (call_frame_t *frame, uint8_t dir_checksum[ZR_FILENAME_MAX] = {0,}; int32_t op_ret = -1; int32_t op_errno = EINVAL; - int32_t i = 0, length = 0; + int32_t idx = 0, length = 0; bctx_t *bctx = NULL; DBC *cursorp = NULL; char *data = NULL; + uint8_t no_break = 1; GF_VALIDATE_OR_GOTO ("bdb", frame, out); GF_VALIDATE_OR_GOTO ("bdb", this, out); @@ -2798,55 +3209,66 @@ bdb_checksum (call_frame_t *frame, continue; length = strlen (dirent->d_name); - for (i = 0; i < length; i++) - dir_checksum[i] ^= dirent->d_name[i]; + for (idx = 0; idx < length; idx++) + dir_checksum[idx] ^= dirent->d_name[idx]; } /* while((dirent...)) */ closedir (dir); } { bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, bctx, out); + if (bctx == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "CHECKSUM %"PRId64" (%s): ENOMEM" + "(failed to lookup database handle)", + loc->inode->ino, loc->path); + op_ret = -1; + op_errno = ENOMEM; + goto out; + } op_ret = bdb_cursor_open (bctx, &cursorp); - op_errno = EINVAL; - GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); + if (op_ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "CHECKSUM %"PRId64" (%s): EBADFD" + "(failed to open cursor to database handle)", + loc->inode->ino, loc->path); + op_ret = -1; + op_errno = EBADFD; + goto out; + } - while (1) { - DBT key = {0,}, value = {0,}; + + do { + DBT key = {0,}, value = {0,}, sec = {0,}; key.flags = DB_DBT_MALLOC; value.doff = 0; value.dlen = 0; - op_ret = bdb_cursor_get (cursorp, &key, &value, - DB_NEXT); + op_ret = bdb_cursor_get (cursorp, &sec, &key, + &value, DB_NEXT); if (op_ret == DB_NOTFOUND) { - gf_log (this->name, GF_LOG_DEBUG, - "end of list of key/value pair in db" - " for directory: %s", bctx->directory); op_ret = 0; op_errno = 0; - break; + no_break = 0; } else if (op_ret == 0){ /* successfully read */ data = key.data; length = key.size; - for (i = 0; i < length; i++) - file_checksum[i] ^= data[i]; + for (idx = 0; idx < length; idx++) + file_checksum[idx] ^= data[idx]; - free (key.data); + FREE (key.data); } else { - gf_log (this->name, GF_LOG_ERROR, - "failed to do cursor get for directory" - " %s: %s", - bctx->directory, db_strerror (op_ret)); + gf_log (this->name, GF_LOG_DEBUG, + "CHECKSUM %"PRId64" (%s)", + loc->inode->ino, loc->path); op_ret = -1; - op_errno = ENOENT; - break; + op_errno = ENOENT; /* TODO: watch errno */ + no_break = 0; }/* if(op_ret == DB_NOTFOUND)...else if...else */ - } /* while(1) */ + } while (no_break); bdb_cursor_close (bctx, cursorp); } out: @@ -2904,44 +3326,77 @@ init (xlator_t *this) GF_VALIDATE_OR_GOTO ("bdb", this, out); - _private = CALLOC (1, sizeof (*_private)); - GF_VALIDATE_OR_GOTO (this->name, _private, out); - if (this->children) { gf_log (this->name, GF_LOG_ERROR, - "FATAL: storage/bdb cannot have subvolumes"); - FREE (_private); - goto out;; + "'storage/bdb' translator should be used as leaf node " + "in translator tree. please remove the subvolumes" + " specified and retry."); + goto err; } if (!this->parents) { - gf_log (this->name, GF_LOG_WARNING, - "dangling volume. check volfile "); + gf_log (this->name, GF_LOG_ERROR, + "'storage/bdb' translator needs at least one among " + "'protocol/server' or 'mount/fuse' translator as " + "parent. please add 'protocol/server' or 'mount/fuse' " + "as parent of 'storage/bdb' and retry. or you can also" + " try specifying mount-point on command-line."); + goto err; } + _private = CALLOC (1, sizeof (*_private)); + if (_private == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "could not allocate memory for 'storage/bdb' " + "configuration data-structure. cannot continue from " + "here"); + goto err; + } + + ret = dict_get_str (this->options, "directory", &directory); if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, - "export directory not specified in volfile"); - FREE (_private); - goto out; + "'storage/bdb' needs at least " + "'option directory <path-to-export-directory>' as " + "minimal configuration option. please specify an " + "export directory using " + "'option directory <path-to-export-directory>' and " + "retry."); + goto err; } + umask (000); /* umask `masking' is done at the client side */ /* Check whether the specified directory exists, if not create it. */ ret = stat (directory, &buf); - if ((ret != 0) || !S_ISDIR (buf.st_mode)) { + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "specified export path '%s' does not exist. " + "please create the export path '%s' and retry.", + directory, directory); + goto err; + } else if (!S_ISDIR (buf.st_mode)) { gf_log (this->name, GF_LOG_ERROR, - "specified directory '%s' doesn't exists, Exiting", + "specified export path '%s' is not a directory. " + "please specify a valid and existing directory as " + "export directory and retry.", directory); - FREE (_private); - goto out; + goto err; } else { ret = 0; } _private->export_path = strdup (directory); + if (_private->export_path == NULL) { + gf_log (this->name, GF_LOG_ERROR, + "could not allocate memory for 'storage/bdb' " + "configuration data-structure. cannot continue from " + "here"); + goto err; + } + _private->export_path_length = strlen (_private->export_path); { @@ -2953,27 +3408,40 @@ init (xlator_t *this) } this->private = (void *)_private; + { ret = bdb_db_init (this, this->options); - if (ret == -1){ - gf_log (this->name, GF_LOG_DEBUG, - "failed to initialize database"); - goto out; + if (ret < 0){ + gf_log (this->name, GF_LOG_ERROR, + "database environment initialisation failed. " + "manually run database recovery tool and " + "retry to run glusterfs"); + goto err; } else { bctx = bctx_lookup (_private->b_table, "/"); /* NOTE: we are not doing bctx_unref() for root bctx, * let it remain in active list forever */ - if (!bctx) { + if (bctx == NULL) { gf_log (this->name, GF_LOG_ERROR, - "failed to allocate memory for root (/)" - " bctx: out of memory"); - goto out; + "could not allocate memory for " + "'storage/bdb' configuration data-" + "structure. cannot continue from " + "here"); + goto err; } else { ret = 0; + goto out; } } } +err: + if (_private) { + if (_private->export_path) + FREE (_private->export_path); + + FREE (_private); + } out: return ret; } @@ -2984,12 +3452,17 @@ bctx_cleanup (struct list_head *head) bctx_t *trav = NULL; bctx_t *tmp = NULL; DB *storage = NULL; + DB *secondary = NULL; list_for_each_entry_safe (trav, tmp, head, list) { LOCK (&trav->lock); { - storage = trav->dbp; - trav->dbp = NULL; + storage = trav->primary; + trav->primary = NULL; + + secondary = trav->secondary; + trav->secondary = NULL; + list_del_init (&trav->list); } UNLOCK (&trav->lock); @@ -2998,6 +3471,11 @@ bctx_cleanup (struct list_head *head) storage->close (storage, 0); storage = NULL; } + + if (secondary) { + secondary->close (secondary, 0); + secondary = NULL; + } } return; } @@ -3025,7 +3503,11 @@ fini (xlator_t *this) ret = pthread_join (private->checkpoint_thread, NULL); if (ret != 0) { gf_log (this->name, GF_LOG_CRITICAL, - "failed to join checkpoint thread"); + "could not complete checkpointing " + "database environment. this might " + "result in inconsistencies in few" + " recent data and meta-data " + "operations"); } BDB_ENV(this)->close (BDB_ENV(this), 0); diff --git a/xlators/storage/bdb/src/bdb.h b/xlators/storage/bdb/src/bdb.h index c9db02c10..e25978cc6 100644 --- a/xlators/storage/bdb/src/bdb.h +++ b/xlators/storage/bdb/src/bdb.h @@ -54,6 +54,8 @@ #include "inode.h" #include "compat.h" #include "compat-errno.h" +#include "fd.h" +#include "syscall.h" #define BDB_STORAGE "/glusterfs_storage.db" @@ -73,6 +75,8 @@ #define BDB_EXPORT_PATH_LEN(_private) \ (((struct bdb_private *)_private)->export_path_length) +#define BDB_KEY_FROM_FREQUEST_KEY(_key) (&(key[15])) + #define BDB_EXPORT_PATH(_private) \ (((struct bdb_private *)_private)->export_path) /* MAKE_REAL_PATH(var,this,path) @@ -89,6 +93,12 @@ strcpy (&var[base_len], path); \ } while (0) + +#define BDB_TIMED_LOG(_errno,_counter) \ + ((_errno == ENOTSUP) && (((++_counter) % GF_UNIVERSAL_ANSWER) == 1)) + +#define GF_FILE_CONTENT_REQUEST ZR_FILE_CONTENT_REQUEST + /* MAKE_REAL_PATH_TO_STORAGE_DB(var,this,path) * make the real path to the storage-database file on file-system * @@ -119,21 +129,6 @@ key = basename (tmp); \ }while (0); -/* BDB_DO_LSTAT(path,stbuf,dirent) - * construct real-path to a dirent and do lstat on the real-path - * - * @path: path to the directory whose readdir is currently in progress - * @stbuf: a 'struct stat *' - * @dirent: a 'struct dirent *' - */ -#define BDB_DO_LSTAT(path, stbuf, dirent) do { \ - char tmp_real_path[GF_PATH_MAX]; \ - strcpy(tmp_real_path, path); \ - strcat (tmp_real_path, "/"); \ - strcat(tmp_real_path, dirent->d_name); \ - ret = lstat (tmp_real_path, stbuf); \ - } while(0); - /* IS_BDB_PRIVATE_FILE(name) * check if a given 'name' is bdb xlator's internal file name * @@ -152,8 +147,7 @@ #define IS_DOT_DOTDOT(name) \ ((!strncmp(name,".", 1)) || (!strncmp(name,"..", 2))) -/* BDB_SET_BCTX(this,inode,bctx) - * put a stamp on inode. d00d, you are using bdb.. huhaha. +/* BDB_ICTX_SET(this,inode,bctx) * pointer to 'struct bdb_ctx' is stored in inode's ctx of all directories. * this will happen either in lookup() or mkdir(). * @@ -161,29 +155,35 @@ * @inode: inode where 'struct bdb_ctx *' has to be stored. * @bctx: a 'struct bdb_ctx *' */ -#define BDB_SET_BCTX(this,inode,bctx) do{ \ - inode_ctx_put(inode, this, (uint64_t)(long)bctx); \ +#define BDB_ICTX_SET(_inode,_this,_bctx) do{ \ + inode_ctx_put(_inode, _this, (uint64_t)(long)_bctx); \ + }while (0); + +#define BDB_ICTX_GET(_inode,_this,_bctxp) do { \ + uint64_t tmp_bctx = 0; \ + inode_ctx_get (_inode, _this, &tmp_bctx); \ + *_bctxp = tmp_bctx; \ }while (0); -/* MAKE_BCTX_FROM_INODE(this,bctx,inode) - * extract bdb xlator's 'struct bdb_ctx *' from an inode's ctx. - * valid only if done for directory inodes, otherwise bctx = NULL. +/* BDB_FCTX_SET(this,fd,bctx) + * pointer to 'struct bdb_ctx' is stored in inode's ctx of all directories. + * this will happen either in lookup() or mkdir(). * * @this: pointer xlator_t of bdb xlator. + * @inode: inode where 'struct bdb_ctx *' has to be stored. * @bctx: a 'struct bdb_ctx *' - * @inode: inode from where 'struct bdb_ctx *' has to be extracted. */ -#define MAKE_BCTX_FROM_INODE(this,bctx,inode) do{ \ - uint64_t tmp_bctx = 0; \ - inode_ctx_get (inode, this, &tmp_bctx); \ - if (ret == 0) \ - bctx = (void *)(long)tmp_bctx; \ +#define BDB_FCTX_SET(_fd,_this,_bfd) do{ \ + fd_ctx_set(_fd, _this, (uint64_t)(long)_bfd); \ }while (0); -#define BDB_SET_BFD(this,fd,bfd) do{ \ - fd_ctx_set (fd, this, (uint64_t)(long)bfd); \ +#define BDB_FCTX_GET(_fd,_this,_bfdp) do { \ + uint64_t tmp_bfd = 0; \ + fd_ctx_get (_fd, _this, &tmp_bfd); \ + *_bfdp = (void *)(long)tmp_bfd; \ }while (0); + /* maximum number of open dbs that bdb xlator will ever have */ #define BDB_MAX_OPEN_DBS 100 @@ -270,7 +270,8 @@ struct bdb_ctx { char *directory; /* directory path */ /* pointer to open database, that resides inside this directory */ - DB *dbp; + DB *primary; + DB *secondary; uint32_t cache; /* cache ON or OFF */ /* per directory cache, bdb xlator's internal cache */ @@ -298,8 +299,6 @@ struct bdb_dir { /* open directory pointer, as returned by opendir() */ DIR *dir; - /* FIXME: readdir offset, too crude. must go */ - char offset[NAME_MAX]; char *path; /* path to this directory */ }; @@ -386,12 +385,6 @@ struct bdb_private { * (option checkpoint-interval <time-in-seconds>) */ uint32_t checkpoint_interval; - /* inode number allocation counter */ - ino_t next_ino; - - /* lock to protect 'next_ino' */ - gf_lock_t ino_lock; - /* environment log directory (option logdir <directory>) */ char *logdir; @@ -436,26 +429,28 @@ bdb_txn_commit (DB_TXN *txnid) return txnid->commit (txnid, 0); } -inline void * -bdb_extract_bfd (fd_t *fd, xlator_t *this); - - void * bdb_db_stat (bctx_t *bctx, DB_TXN *txnid, uint32_t flags); -int32_t +/*int32_t bdb_db_get(struct bdb_ctx *bctx, DB_TXN *txnid, const char *key_string, char **buf, size_t size, off_t offset); +*/ +int32_t +bdb_db_fread (struct bdb_fd *bfd, char **bufp, size_t size, off_t offset); + +int32_t +bdb_db_iread (struct bdb_ctx *bctx, const char *key, char **bufp); #define BDB_TRUNCATE_RECORD 0xcafebabe -int32_t +/*int32_t bdb_db_put (struct bdb_ctx *bctx, DB_TXN *txnid, const char *key_string, @@ -463,16 +458,27 @@ bdb_db_put (struct bdb_ctx *bctx, size_t size, off_t offset, int32_t flags); +*/ +int32_t +bdb_db_icreate (struct bdb_ctx *bctx, const char *key); int32_t -bdb_db_del (struct bdb_ctx *bctx, - DB_TXN *txnid, - const char *path); +bdb_db_fwrite (struct bdb_fd *bfd, char *buf, size_t size, off_t offset); + +int32_t +bdb_db_iwrite (struct bdb_ctx *bctx, const char *key, char *buf, size_t size); + +int32_t +bdb_db_itruncate (struct bdb_ctx *bctx, const char *key); + +int32_t +bdb_db_iremove (struct bdb_ctx *bctx, + const char *key); ino_t bdb_inode_transform (ino_t parent, - struct bdb_ctx *bctx); - + const char *name, + size_t namelen); int32_t bdb_cursor_open (struct bdb_ctx *bctx, @@ -480,7 +486,7 @@ bdb_cursor_open (struct bdb_ctx *bctx, int32_t bdb_cursor_get (DBC *cursorp, - DBT *key, + DBT *sec, DBT *pri, DBT *value, int32_t flags); |