[lustre-devel] [PATCH 20/34] LU-7734 lnet: protect peer_ni credits

NeilBrown neilb at suse.com
Mon Sep 24 18:07:15 PDT 2018


From: Amir Shehata <amir.shehata at intel.com>

Currently multiple NIs can talk to the same peer_ni, so the per-CPT
lnet_net_lock no longer protects the lnet_peer_ni (lpni) against
concurrent updates. To resolve this issue, a spinlock (lpni_lock) is
added to the lnet_peer_ni; it must be held whenever the peer NI
credits, the delayed message queue, or the delayed routed message
queue are modified. The lock is not taken when reporting credits.

Signed-off-by: Amir Shehata <amir.shehata at intel.com>
Signed-off-by: Olaf Weber <olaf at sgi.com>
Change-Id: I52153680a74d43e595314b63487026cc3f6a5a8f
Reviewed-on: http://review.whamcloud.com/20702
Signed-off-by: NeilBrown <neilb at suse.com>
---
 drivers/staging/lustre/lnet/lnet/lib-move.c |   40 ++++++++++++++++++++-------
 drivers/staging/lustre/lnet/lnet/peer.c     |    8 ++++-
 drivers/staging/lustre/lnet/lnet/router.c   |    3 +-
 3 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/drivers/staging/lustre/lnet/lnet/lib-move.c b/drivers/staging/lustre/lnet/lnet/lib-move.c
index 51224a4cb218..b4c7c8aa33a7 100644
--- a/drivers/staging/lustre/lnet/lnet/lib-move.c
+++ b/drivers/staging/lustre/lnet/lnet/lib-move.c
@@ -657,6 +657,7 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
 	}
 
 	if (!msg->msg_peertxcredit) {
+		spin_lock(&lp->lpni_lock);
 		LASSERT((lp->lpni_txcredits < 0) ==
 			!list_empty(&lp->lpni_txq));
 
@@ -670,8 +671,10 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
 		if (lp->lpni_txcredits < 0) {
 			msg->msg_tx_delayed = 1;
 			list_add_tail(&msg->msg_list, &lp->lpni_txq);
+			spin_unlock(&lp->lpni_lock);
 			return LNET_CREDIT_WAIT;
 		}
+		spin_unlock(&lp->lpni_lock);
 	}
 
 	if (!msg->msg_txcredit) {
@@ -744,6 +747,7 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
 	LASSERT(!do_recv || msg->msg_rx_delayed);
 
 	if (!msg->msg_peerrtrcredit) {
+		spin_lock(&lp->lpni_lock);
 		LASSERT((lp->lpni_rtrcredits < 0) ==
 			!list_empty(&lp->lpni_rtrq));
 
@@ -757,8 +761,10 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
 			LASSERT(msg->msg_rx_ready_delay);
 			msg->msg_rx_delayed = 1;
 			list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
+			spin_unlock(&lp->lpni_lock);
 			return LNET_CREDIT_WAIT;
 		}
+		spin_unlock(&lp->lpni_lock);
 	}
 
 	rbp = lnet_msg2bufpool(msg);
@@ -822,6 +828,7 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
 
 			LASSERT(msg2->msg_txni == ni);
 			LASSERT(msg2->msg_tx_delayed);
+			LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);
 
 			(void)lnet_post_send_locked(msg2, 1);
 		}
@@ -831,6 +838,7 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
 		/* give back peer txcredits */
 		msg->msg_peertxcredit = 0;
 
+		spin_lock(&txpeer->lpni_lock);
 		LASSERT((txpeer->lpni_txcredits < 0) ==
 			!list_empty(&txpeer->lpni_txq));
 
@@ -842,11 +850,22 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
 			msg2 = list_entry(txpeer->lpni_txq.next,
 					  struct lnet_msg, msg_list);
 			list_del(&msg2->msg_list);
+			spin_unlock(&txpeer->lpni_lock);
 
 			LASSERT(msg2->msg_txpeer == txpeer);
 			LASSERT(msg2->msg_tx_delayed);
 
+			if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+				lnet_net_unlock(msg->msg_tx_cpt);
+				lnet_net_lock(msg2->msg_tx_cpt);
+			}
 			(void)lnet_post_send_locked(msg2, 1);
+			if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+				lnet_net_unlock(msg2->msg_tx_cpt);
+				lnet_net_lock(msg->msg_tx_cpt);
+			}
+		} else {
+			spin_unlock(&txpeer->lpni_lock);
 		}
 	}
 
@@ -887,17 +906,12 @@ lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp)
 void
 lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
 {
-	struct list_head drop;
 	struct lnet_msg *msg;
 
-	INIT_LIST_HEAD(&drop);
-
-	list_splice_init(list, &drop);
-
 	lnet_net_unlock(cpt);
 
-	while(!list_empty(&drop)) {
-		msg = list_first_entry(&drop, struct lnet_msg, msg_list);
+	while (!list_empty(list)) {
+		msg = list_first_entry(list, struct lnet_msg, msg_list);
 		lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
 			     0, 0, 0, msg->msg_hdr.payload_length);
 		list_del_init(&msg->msg_list);
@@ -968,6 +982,7 @@ lnet_return_rx_credits_locked(struct lnet_msg *msg)
 		/* give back peer router credits */
 		msg->msg_peerrtrcredit = 0;
 
+		spin_lock(&rxpeer->lpni_lock);
 		LASSERT((rxpeer->lpni_rtrcredits < 0) ==
 			!list_empty(&rxpeer->lpni_rtrq));
 
@@ -977,14 +992,19 @@ lnet_return_rx_credits_locked(struct lnet_msg *msg)
 		 * peer.
 		 */
 		if (!the_lnet.ln_routing) {
-			lnet_drop_routed_msgs_locked(&rxpeer->lpni_rtrq,
-						     msg->msg_rx_cpt);
+			LIST_HEAD(drop);
+
+			list_splice_init(&rxpeer->lpni_rtrq, &drop);
+			spin_unlock(&rxpeer->lpni_lock);
+			lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
 		} else if (rxpeer->lpni_rtrcredits <= 0) {
 			msg2 = list_entry(rxpeer->lpni_rtrq.next,
 					  struct lnet_msg, msg_list);
 			list_del(&msg2->msg_list);
-
+			spin_unlock(&rxpeer->lpni_lock);
 			(void)lnet_post_routed_recv_locked(msg2, 1);
+		} else {
+			spin_unlock(&rxpeer->lpni_lock);
 		}
 	}
 	if (rxni) {
diff --git a/drivers/staging/lustre/lnet/lnet/peer.c b/drivers/staging/lustre/lnet/lnet/peer.c
index dc4527f86113..3555e9bd1db1 100644
--- a/drivers/staging/lustre/lnet/lnet/peer.c
+++ b/drivers/staging/lustre/lnet/lnet/peer.c
@@ -56,12 +56,16 @@ lnet_peer_net_added(struct lnet_net *net)
 				 lpni_on_remote_peer_ni_list) {
 		if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
 			lpni->lpni_net = net;
+
+			spin_lock(&lpni->lpni_lock);
 			lpni->lpni_txcredits =
-				lpni->lpni_mintxcredits =
 				lpni->lpni_net->net_tunables.lct_peer_tx_credits;
+			lpni->lpni_mintxcredits =
+				lpni->lpni_txcredits;
 			lpni->lpni_rtrcredits =
-				lpni->lpni_minrtrcredits =
 				lnet_peer_buffer_credits(lpni->lpni_net);
+			lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
+			spin_unlock(&lpni->lpni_lock);
 
 			lnet_peer_remove_from_remote_list(lpni);
 		}
diff --git a/drivers/staging/lustre/lnet/lnet/router.c b/drivers/staging/lustre/lnet/lnet/router.c
index d3c41f5664a4..6c50e8cc7833 100644
--- a/drivers/staging/lustre/lnet/lnet/router.c
+++ b/drivers/staging/lustre/lnet/lnet/router.c
@@ -1358,7 +1358,8 @@ lnet_rtrpool_free_bufs(struct lnet_rtrbufpool *rbp, int cpt)
 	INIT_LIST_HEAD(&tmp);
 
 	lnet_net_lock(cpt);
-	lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
+	list_splice_init(&rbp->rbp_msgs, &tmp);
+	lnet_drop_routed_msgs_locked(&tmp, cpt);
 	list_splice_init(&rbp->rbp_bufs, &tmp);
 	rbp->rbp_req_nbuffers = 0;
 	rbp->rbp_nbuffers = 0;




More information about the lustre-devel mailing list