summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpc/rpc-lib/src/rpc-drc.c94
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c50
2 files changed, 55 insertions, 89 deletions
diff --git a/rpc/rpc-lib/src/rpc-drc.c b/rpc/rpc-lib/src/rpc-drc.c
index 5f54baf42e0..6adadd85b0e 100644
--- a/rpc/rpc-lib/src/rpc-drc.c
+++ b/rpc/rpc-lib/src/rpc-drc.c
@@ -150,39 +150,6 @@ drc_compare_reqs (const void *item, const void *rb_node_data, void *param)
}
/**
- * drc_rb_calloc - used by rbtree api to allocate memory for nodes
- *
- * @param allocator - the libavl_allocator structure used by rbtree
- * @param size - not needed by this function
- * @return pointer to new cached reply (node in rbtree)
- */
-static void *
-drc_rb_calloc (struct libavl_allocator *allocator, size_t size)
-{
- rpcsvc_drc_globals_t *drc = NULL;
-
- /* get the drc pointer by simple typecast, since allocator
- * is the first member of rpcsvc_drc_globals_t
- */
- drc = (rpcsvc_drc_globals_t *)allocator;
-
- return mem_get (drc->mempool);
-}
-
-/**
- * drc_rb_free - used by rbtree api to free a node
- *
- * @param a - the libavl_allocator structure used by rbtree api
- * @param block - node that needs to be freed
- * @return void
- */
-static void
-drc_rb_free (struct libavl_allocator *a, void *block)
-{
- mem_put (block);
-}
-
-/**
* drc_init_client_cache - initialize a drc client and its rb tree
*
* @param drc - the main drc structure
@@ -195,11 +162,7 @@ drc_init_client_cache (rpcsvc_drc_globals_t *drc, drc_client_t *client)
GF_ASSERT (drc);
GF_ASSERT (client);
- drc->allocator.libavl_malloc = drc_rb_calloc;
- drc->allocator.libavl_free = drc_rb_free;
-
- client->rbtree = rb_create (drc_compare_reqs, drc,
- (struct libavl_allocator *)drc);
+ client->rbtree = rb_create (drc_compare_reqs, drc, NULL);
if (!client->rbtree) {
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "rb tree creation failed");
return -1;
@@ -238,6 +201,7 @@ rpcsvc_get_drc_client (rpcsvc_drc_globals_t *drc,
client->ref = 0;
client->sock_union = (union gf_sock_union)*sockaddr;
client->op_count = 0;
+ INIT_LIST_HEAD (&client->client_list);
if (drc_init_client_cache (drc, client)) {
gf_log (GF_RPCSVC, GF_LOG_DEBUG,
@@ -345,10 +309,12 @@ rpcsvc_drc_lookup (rpcsvc_request_t *req)
&req->trans->peerinfo.sockaddr);
if (!client)
goto out;
- req->trans->drc_client = client;
+
+ req->trans->drc_client
+ = rpcsvc_drc_client_ref (client);
}
- client = rpcsvc_drc_client_ref (req->trans->drc_client);
+ client = req->trans->drc_client;
if (client->op_count == 0)
goto out;
@@ -356,9 +322,6 @@ rpcsvc_drc_lookup (rpcsvc_request_t *req)
reply = rb_find (client->rbtree, &new);
out:
- if (client)
- rpcsvc_drc_client_unref (req->svc->drc, client);
-
return reply;
}
@@ -540,7 +503,7 @@ rpcsvc_cache_request (rpcsvc_request_t *req)
goto out;
}
- reply = mem_get (drc->mempool);
+ reply = mem_get0 (drc->mempool);
if (!reply)
goto out;
@@ -551,6 +514,7 @@ rpcsvc_cache_request (rpcsvc_request_t *req)
reply->procnum = req->procnum;
reply->state = DRC_OP_IN_TRANSIT;
req->reply = reply;
+ INIT_LIST_HEAD (&reply->global_list);
ret = rpcsvc_add_op_to_cache (drc, reply);
if (ret) {
@@ -669,32 +633,32 @@ rpcsvc_drc_notify (rpcsvc_t *svc, void *xl,
return 0;
LOCK (&drc->lock);
+ {
+ trans = (rpc_transport_t *)data;
+ client = rpcsvc_get_drc_client (drc, &trans->peerinfo.sockaddr);
+ if (!client)
+ goto unlock;
- trans = (rpc_transport_t *)data;
- client = rpcsvc_get_drc_client (drc, &trans->peerinfo.sockaddr);
- if (!client)
- goto out;
-
- switch (event) {
- case RPCSVC_EVENT_ACCEPT:
- trans->drc_client = rpcsvc_drc_client_ref (client);
- ret = 0;
- break;
+ switch (event) {
+ case RPCSVC_EVENT_ACCEPT:
+ trans->drc_client = rpcsvc_drc_client_ref (client);
+ ret = 0;
+ break;
- case RPCSVC_EVENT_DISCONNECT:
- ret = 0;
- if (list_empty (&drc->clients_head))
+ case RPCSVC_EVENT_DISCONNECT:
+ ret = 0;
+ if (list_empty (&drc->clients_head))
+ break;
+ /* should be the last unref */
+ trans->drc_client = NULL;
+ rpcsvc_drc_client_unref (drc, client);
break;
- /* should be the last unref */
- rpcsvc_drc_client_unref (drc, client);
- trans->drc_client = NULL;
- break;
- default:
- break;
+ default:
+ break;
+ }
}
-
- out:
+unlock:
UNLOCK (&drc->lock);
return ret;
}
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 814af05f7b6..a10f182647f 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -624,30 +624,32 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
drc = req->svc->drc;
LOCK (&drc->lock);
- reply = rpcsvc_drc_lookup (req);
-
- /* retransmission of completed request, send cached reply */
- if (reply && reply->state == DRC_OP_CACHED) {
- gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:"
- " XID: 0x%x", req->xid);
- ret = rpcsvc_send_cached_reply (req, reply);
- drc->cache_hits++;
- UNLOCK (&drc->lock);
- goto out;
-
- } /* retransmitted request, original op in transit, drop it */
- else if (reply && reply->state == DRC_OP_IN_TRANSIT) {
- gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit,"
- " discarding. XID: 0x%x", req->xid);
- ret = 0;
- drc->intransit_hits++;
- rpcsvc_request_destroy (req);
- UNLOCK (&drc->lock);
- goto out;
-
- } /* fresh request, cache it as in-transit and proceed */
- else {
- ret = rpcsvc_cache_request (req);
+ {
+ reply = rpcsvc_drc_lookup (req);
+
+ /* retransmission of completed request, send cached reply */
+ if (reply && reply->state == DRC_OP_CACHED) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:"
+ " XID: 0x%x", req->xid);
+ ret = rpcsvc_send_cached_reply (req, reply);
+ drc->cache_hits++;
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* retransmitted request, original op in transit, drop it */
+ else if (reply && reply->state == DRC_OP_IN_TRANSIT) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit,"
+ " discarding. XID: 0x%x", req->xid);
+ ret = 0;
+ drc->intransit_hits++;
+ rpcsvc_request_destroy (req);
+ UNLOCK (&drc->lock);
+ goto out;
+
+ } /* fresh request, cache it as in-transit and proceed */
+ else {
+ ret = rpcsvc_cache_request (req);
+ }
}
UNLOCK (&drc->lock);
}