[lustre-devel] [PATCH 18/24] lustre: lnet: implement Peer Discovery

James Simmons jsimmons at infradead.org
Sun Oct 14 16:33:36 PDT 2018


> From: Olaf Weber <olaf at sgi.com>
> 
> Implement Peer Discovery.
> 
> A peer is queued for discovery by lnet_peer_queue_for_discovery().
> This sets LNET_PEER_DISCOVERING to indicate that discovery is in
> progress.
> 
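> As a sketch (using the names introduced by this patch, though not
> quoted verbatim from it), callers arrange for discovery by queueing
> the peer under the exclusive net lock and waking the discovery
> thread:
> 
>     lnet_net_lock(LNET_LOCK_EX);
>     /* A non-zero return means the peer was already queued, so its
>      * state has changed and the discovery thread must look again.
>      */
>     if (lnet_peer_queue_for_discovery(lp))
>             wake_up(&the_lnet.ln_dc_waitq);
>     lnet_net_unlock(LNET_LOCK_EX);
> 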
> The discovery thread lnet_peer_discovery() checks the peer and
> updates its state as appropriate.
> 
> If LNET_PEER_DATA_PRESENT is set, then a valid Push message or
> Ping reply has been received. The peer is updated in accordance
> with the data, and LNET_PEER_NIDS_UPTODATE is set.
> 
> If LNET_PEER_PING_FAILED is set, then an attempt to send a Ping
> message failed, and the peer state is updated accordingly. The
> discovery thread then performs cleanup, such as unlinking an MD,
> that cannot be done from the message event handler.
> 
> If LNET_PEER_PUSH_FAILED is set, then an attempt to send a Push
> message failed, and the peer state is updated accordingly. The
> discovery thread then performs cleanup, such as unlinking an MD,
> that cannot be done from the message event handler.
> 
> If LNET_PEER_PING_REQUIRED is set, we must Ping the peer in order to
> correctly update our knowledge of it. This is set, for example, if
> we receive a Push message for a peer, but cannot handle it because
> the Push target was too small. In such a case we know that the
> state of the peer is incorrect, but need to do extra work to obtain
> the required information.
> 
> If discovery is not enabled, then the discovery process stops here
> and the peer is marked with LNET_PEER_UNDISCOVERED. This tells the
> discovery process that it doesn't need to revisit the peer while
> discovery remains disabled.
> 
> If LNET_PEER_NIDS_UPTODATE is not set, then we have reason to think
> the lnet_peer is not up to date, and will Ping it.
> 
> The peer needs a Push if it is Multi-Rail and this node's ping
> buffer sequence number is newer than the sequence number the peer
> last acknowledged in the Ack of a Push (see lnet_peer_needs_push()
> in the patch below).
> 
> If none of the above is true, then discovery has completed its work
> on the peer.
> 
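> In code terms, one pass of the discovery thread over a queued peer
> dispatches on the state flags roughly as below. This is a condensed
> sketch, not the verbatim implementation; error handling is omitted,
> and the patch spells the PING_REQUIRED and UNDISCOVERED states
> described above as LNET_PEER_FORCE_PING and LNET_PEER_REDISCOVER.
> 
>     spin_lock(&lp->lp_lock);
>     if (lp->lp_state & LNET_PEER_DATA_PRESENT)
>             rc = lnet_peer_data_present(lp);  /* merge received data */
>     else if (lp->lp_state & LNET_PEER_PING_FAILED)
>             rc = lnet_peer_ping_failed(lp);   /* unlink MD, force ping */
>     else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
>             rc = lnet_peer_push_failed(lp);   /* unlink MD */
>     else if (lp->lp_state & LNET_PEER_FORCE_PING)
>             rc = lnet_peer_send_ping(lp);
>     else if (lnet_peer_discovery_disabled)
>             rc = lnet_peer_rediscover(lp);
>     else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
>             rc = lnet_peer_send_ping(lp);
>     else if (lnet_peer_needs_push(lp))
>             rc = lnet_peer_send_push(lp);
>     else
>             rc = lnet_peer_discovered(lp);
>     spin_unlock(&lp->lp_lock);
> 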
> Discovery signals that it is done with a peer by clearing the
> LNET_PEER_DISCOVERING flag, and setting LNET_PEER_DISCOVERED or
> LNET_PEER_UNDISCOVERED as appropriate. It then dequeues the peer
> and clears the LNET_PEER_QUEUED flag.
> 
> When the local node is discovered via the loopback network, the
> peer structure that is created will have an lnet_peer_ni for the
> local loopback interface. Subsequent traffic from this node to
> itself will use the loopback net.

Reviewed-by: James Simmons <jsimmons at infradead.org>
 
> WC-bug-id: https://jira.whamcloud.com/browse/LU-9480
> Signed-off-by: Olaf Weber <olaf at sgi.com>
> Reviewed-on: https://review.whamcloud.com/25789
> Reviewed-by: Olaf Weber <olaf.weber at hpe.com>
> Reviewed-by: Amir Shehata <amir.shehata at intel.com>
> Tested-by: Amir Shehata <amir.shehata at intel.com>
> Signed-off-by: NeilBrown <neilb at suse.com>
> ---
>  .../staging/lustre/include/linux/lnet/lib-lnet.h   |   20 
>  .../staging/lustre/include/linux/lnet/lib-types.h  |   39 +
>  drivers/staging/lustre/lnet/lnet/api-ni.c          |   59 +
>  drivers/staging/lustre/lnet/lnet/lib-move.c        |   18 
>  drivers/staging/lustre/lnet/lnet/peer.c            | 1499 +++++++++++++++++++-
>  5 files changed, 1543 insertions(+), 92 deletions(-)
> 
> diff --git a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
> index 5632e5aadf41..f82a699371f2 100644
> --- a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
> +++ b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
> @@ -76,6 +76,9 @@ extern struct lnet the_lnet;	/* THE network */
>  #define LNET_ACCEPTOR_MIN_RESERVED_PORT    512
>  #define LNET_ACCEPTOR_MAX_RESERVED_PORT    1023
>  
> +/* Discovery timeout - same as default peer_timeout */
> +#define DISCOVERY_TIMEOUT	180
> +
>  static inline int lnet_is_route_alive(struct lnet_route *route)
>  {
>  	/* gateway is down */
> @@ -713,9 +716,10 @@ struct lnet_peer_ni *lnet_nid2peerni_ex(lnet_nid_t nid, int cpt);
>  struct lnet_peer_ni *lnet_find_peer_ni_locked(lnet_nid_t nid);
>  void lnet_peer_net_added(struct lnet_net *net);
>  lnet_nid_t lnet_peer_primary_nid_locked(lnet_nid_t nid);
> -int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt);
> +int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block);
>  int lnet_peer_discovery_start(void);
>  void lnet_peer_discovery_stop(void);
> +void lnet_push_update_to_peers(int force);
>  void lnet_peer_tables_cleanup(struct lnet_net *net);
>  void lnet_peer_uninit(void);
>  int lnet_peer_tables_create(void);
> @@ -805,4 +809,18 @@ lnet_peer_ni_is_primary(struct lnet_peer_ni *lpni)
>  
>  bool lnet_peer_is_uptodate(struct lnet_peer *lp);
>  
> +static inline bool
> +lnet_peer_needs_push(struct lnet_peer *lp)
> +{
> +	if (!(lp->lp_state & LNET_PEER_MULTI_RAIL))
> +		return false;
> +	if (lp->lp_state & LNET_PEER_FORCE_PUSH)
> +		return true;
> +	if (lp->lp_state & LNET_PEER_NO_DISCOVERY)
> +		return false;
> +	if (lp->lp_node_seqno < atomic_read(&the_lnet.ln_ping_target_seqno))
> +		return true;
> +	return false;
> +}
> +
>  #endif
> diff --git a/drivers/staging/lustre/include/linux/lnet/lib-types.h b/drivers/staging/lustre/include/linux/lnet/lib-types.h
> index e00c13355d43..07baa86e61ab 100644
> --- a/drivers/staging/lustre/include/linux/lnet/lib-types.h
> +++ b/drivers/staging/lustre/include/linux/lnet/lib-types.h
> @@ -67,6 +67,13 @@ struct lnet_msg {
>  	lnet_nid_t		msg_from;
>  	__u32			msg_type;
>  
> +	/*
> +	 * hold parameters in case the message is withheld
> +	 * due to discovery
> +	 */
> +	lnet_nid_t		msg_src_nid_param;
> +	lnet_nid_t		msg_rtr_nid_param;
> +
>  	/* committed for sending */
>  	unsigned int		msg_tx_committed:1;
>  	/* CPT # this message committed for sending */
> @@ -395,6 +402,8 @@ struct lnet_ping_buffer {
>  #define LNET_PING_BUFFER_LONI(PBUF)	((PBUF)->pb_info.pi_ni[0].ns_nid)
>  #define LNET_PING_BUFFER_SEQNO(PBUF)	((PBUF)->pb_info.pi_ni[0].ns_status)
>  
> +#define LNET_PING_INFO_TO_BUFFER(PINFO)	\
> +	container_of((PINFO), struct lnet_ping_buffer, pb_info)
>  
>  /* router checker data, per router */
>  struct lnet_rc_data {
> @@ -503,6 +512,9 @@ struct lnet_peer {
>  	/* list of peer nets */
>  	struct list_head	lp_peer_nets;
>  
> +	/* list of messages pending discovery */
> +	struct list_head	lp_dc_pendq;
> +
>  	/* primary NID of the peer */
>  	lnet_nid_t		lp_primary_nid;
>  
> @@ -524,15 +536,36 @@ struct lnet_peer {
>  	/* buffer for data pushed by peer */
>  	struct lnet_ping_buffer	*lp_data;
>  
> +	/* MD handle for ping in progress */
> +	struct lnet_handle_md	lp_ping_mdh;
> +
> +	/* MD handle for push in progress */
> +	struct lnet_handle_md	lp_push_mdh;
> +
>  	/* number of NIDs for sizing push data */
>  	int			lp_data_nnis;
>  
>  	/* NI config sequence number of peer */
>  	__u32			lp_peer_seqno;
>  
> -	/* Local NI config sequence number peer knows */
> +	/* Local NI config sequence number acked by peer */
>  	__u32			lp_node_seqno;
>  
> +	/* Local NI config sequence number sent to peer */
> +	__u32			lp_node_seqno_sent;
> +
> +	/* Ping error encountered during discovery. */
> +	int			lp_ping_error;
> +
> +	/* Push error encountered during discovery. */
> +	int			lp_push_error;
> +
> +	/* Error encountered during discovery. */
> +	int			lp_dc_error;
> +
> +	/* time it was put on the ln_dc_working queue */
> +	time64_t		lp_last_queued;
> +
>  	/* link on discovery-related lists */
>  	struct list_head	lp_dc_list;
>  
> @@ -691,6 +724,8 @@ struct lnet_remotenet {
>  #define LNET_CREDIT_OK		0
>  /** lnet message is waiting for credit */
>  #define LNET_CREDIT_WAIT	1
> +/** lnet message is waiting for discovery */
> +#define LNET_DC_WAIT		2
>  
>  struct lnet_rtrbufpool {
>  	struct list_head	rbp_bufs;	/* my free buffer pool */
> @@ -943,6 +978,8 @@ struct lnet {
>  	struct list_head		ln_dc_request;
>  	/* discovery working list */
>  	struct list_head		ln_dc_working;
> +	/* discovery expired list */
> +	struct list_head		ln_dc_expired;
>  	/* discovery thread wait queue */
>  	wait_queue_head_t		ln_dc_waitq;
>  	/* discovery startup/shutdown state */
> diff --git a/drivers/staging/lustre/lnet/lnet/api-ni.c b/drivers/staging/lustre/lnet/lnet/api-ni.c
> index e6bc54e9de71..955d1711eda4 100644
> --- a/drivers/staging/lustre/lnet/lnet/api-ni.c
> +++ b/drivers/staging/lustre/lnet/lnet/api-ni.c
> @@ -41,7 +41,14 @@
>  
>  #define D_LNI D_CONSOLE
>  
> -struct lnet the_lnet;		/* THE state of the network */
> +/*
> + * initialize ln_api_mutex statically, since it needs to be used in
> + * discovery_set callback. That module parameter callback can be called
> + * before module init completes. The mutex needs to be ready for use then.
> + */
> +struct lnet the_lnet = {
> +	.ln_api_mutex = __MUTEX_INITIALIZER(the_lnet.ln_api_mutex),
> +};		/* THE state of the network */
>  EXPORT_SYMBOL(the_lnet);
>  
>  static char *ip2nets = "";
> @@ -101,7 +108,9 @@ static int
>  discovery_set(const char *val, const struct kernel_param *kp)
>  {
>  	int rc;
> +	unsigned int *discovery = (unsigned int *)kp->arg;
>  	unsigned long value;
> +	struct lnet_ping_buffer *pbuf;
>  
>  	rc = kstrtoul(val, 0, &value);
>  	if (rc) {
> @@ -109,7 +118,38 @@ discovery_set(const char *val, const struct kernel_param *kp)
>  		return rc;
>  	}
>  
> -	*(unsigned int *)kp->arg = !!value;
> +	value = !!value;
> +
> +	/*
> +	 * Lock the api_mutex to ensure that the stored value and the
> +	 * ping target feature bits updated below stay consistent.
> +	 */
> +	mutex_lock(&the_lnet.ln_api_mutex);
> +
> +	if (value == *discovery) {
> +		mutex_unlock(&the_lnet.ln_api_mutex);
> +		return 0;
> +	}
> +
> +	*discovery = value;
> +
> +	if (the_lnet.ln_state == LNET_STATE_SHUTDOWN) {
> +		mutex_unlock(&the_lnet.ln_api_mutex);
> +		return 0;
> +	}
> +
> +	/* tell peers that discovery setting has changed */
> +	lnet_net_lock(LNET_LOCK_EX);
> +	pbuf = the_lnet.ln_ping_target;
> +	if (value)
> +		pbuf->pb_info.pi_features &= ~LNET_PING_FEAT_DISCOVERY;
> +	else
> +		pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY;
> +	lnet_net_unlock(LNET_LOCK_EX);
> +
> +	lnet_push_update_to_peers(1);
> +
> +	mutex_unlock(&the_lnet.ln_api_mutex);
>  
>  	return 0;
>  }
> @@ -171,7 +211,6 @@ lnet_init_locks(void)
>  	init_waitqueue_head(&the_lnet.ln_eq_waitq);
>  	init_waitqueue_head(&the_lnet.ln_rc_waitq);
>  	mutex_init(&the_lnet.ln_lnd_mutex);
> -	mutex_init(&the_lnet.ln_api_mutex);
>  }
>  
>  static int
> @@ -654,6 +693,10 @@ lnet_prepare(lnet_pid_t requested_pid)
>  	INIT_LIST_HEAD(&the_lnet.ln_routers);
>  	INIT_LIST_HEAD(&the_lnet.ln_drop_rules);
>  	INIT_LIST_HEAD(&the_lnet.ln_delay_rules);
> +	INIT_LIST_HEAD(&the_lnet.ln_dc_request);
> +	INIT_LIST_HEAD(&the_lnet.ln_dc_working);
> +	INIT_LIST_HEAD(&the_lnet.ln_dc_expired);
> +	init_waitqueue_head(&the_lnet.ln_dc_waitq);
>  
>  	rc = lnet_create_remote_nets_table();
>  	if (rc)
> @@ -998,7 +1041,8 @@ lnet_ping_target_create(int nnis)
>  	pbuf->pb_info.pi_nnis = nnis;
>  	pbuf->pb_info.pi_pid = the_lnet.ln_pid;
>  	pbuf->pb_info.pi_magic = LNET_PROTO_PING_MAGIC;
> -	pbuf->pb_info.pi_features = LNET_PING_FEAT_NI_STATUS;
> +	pbuf->pb_info.pi_features =
> +		LNET_PING_FEAT_NI_STATUS | LNET_PING_FEAT_MULTI_RAIL;
>  
>  	return pbuf;
>  }
> @@ -1231,6 +1275,8 @@ lnet_ping_target_update(struct lnet_ping_buffer *pbuf,
>  
>  	if (!the_lnet.ln_routing)
>  		pbuf->pb_info.pi_features |= LNET_PING_FEAT_RTE_DISABLED;
> +	if (!lnet_peer_discovery_disabled)
> +		pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY;
>  
>  	/* Ensure only known feature bits have been set. */
>  	LASSERT(pbuf->pb_info.pi_features & LNET_PING_FEAT_BITS);
> @@ -1252,6 +1298,8 @@ lnet_ping_target_update(struct lnet_ping_buffer *pbuf,
>  		lnet_ping_md_unlink(old_pbuf, &old_ping_md);
>  		lnet_ping_buffer_decref(old_pbuf);
>  	}
> +
> +	lnet_push_update_to_peers(0);
>  }
>  
>  static void
> @@ -1353,6 +1401,7 @@ static void lnet_push_target_event_handler(struct lnet_event *ev)
>  	if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
>  		lnet_swap_pinginfo(pbuf);
>  
> +	lnet_peer_push_event(ev);
>  	if (ev->unlinked)
>  		lnet_ping_buffer_decref(pbuf);
>  }
> @@ -1910,8 +1959,6 @@ int lnet_lib_init(void)
>  
>  	lnet_assert_wire_constants();
>  
> -	memset(&the_lnet, 0, sizeof(the_lnet));
> -
>  	/* refer to global cfs_cpt_tab for now */
>  	the_lnet.ln_cpt_table	= cfs_cpt_tab;
>  	the_lnet.ln_cpt_number	= cfs_cpt_number(cfs_cpt_tab);
> diff --git a/drivers/staging/lustre/lnet/lnet/lib-move.c b/drivers/staging/lustre/lnet/lnet/lib-move.c
> index 4773180cc7b3..2ff329bf91ba 100644
> --- a/drivers/staging/lustre/lnet/lnet/lib-move.c
> +++ b/drivers/staging/lustre/lnet/lnet/lib-move.c
> @@ -444,6 +444,8 @@ lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target,
>  
>  	memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
>  	msg->msg_hdr.type	   = cpu_to_le32(type);
> +	/* dest_nid will be overwritten by lnet_select_pathway() */
> +	msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
>  	msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
>  	/* src_nid will be set later */
>  	msg->msg_hdr.src_pid	= cpu_to_le32(the_lnet.ln_pid);
> @@ -1292,7 +1294,7 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
>  	 */
>  	peer = lpni->lpni_peer_net->lpn_peer;
>  	if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
> -		rc = lnet_discover_peer_locked(lpni, cpt);
> +		rc = lnet_discover_peer_locked(lpni, cpt, false);
>  		if (rc) {
>  			lnet_peer_ni_decref_locked(lpni);
>  			lnet_net_unlock(cpt);
> @@ -1300,6 +1302,18 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
>  		}
>  		/* The peer may have changed. */
>  		peer = lpni->lpni_peer_net->lpn_peer;
> +		/* queue message and return */
> +		msg->msg_src_nid_param = src_nid;
> +		msg->msg_rtr_nid_param = rtr_nid;
> +		msg->msg_sending = 0;
> +		list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
> +		lnet_peer_ni_decref_locked(lpni);
> +		lnet_net_unlock(cpt);
> +
> +		CDEBUG(D_NET, "%s pending discovery\n",
> +		       libcfs_nid2str(peer->lp_primary_nid));
> +
> +		return LNET_DC_WAIT;
>  	}
>  	lnet_peer_ni_decref_locked(lpni);
>  
> @@ -1840,7 +1854,7 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
>  	if (rc == LNET_CREDIT_OK)
>  		lnet_ni_send(msg->msg_txni, msg);
>  
> -	/* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */
> +	/* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
>  	return 0;
>  }
>  
> diff --git a/drivers/staging/lustre/lnet/lnet/peer.c b/drivers/staging/lustre/lnet/lnet/peer.c
> index b78f99c354de..1ef4a44e752e 100644
> --- a/drivers/staging/lustre/lnet/lnet/peer.c
> +++ b/drivers/staging/lustre/lnet/lnet/peer.c
> @@ -38,6 +38,11 @@
>  #include <linux/lnet/lib-lnet.h>
>  #include <uapi/linux/lnet/lnet-dlc.h>
>  
> +/* Value indicating that discovery needs to re-check a peer immediately. */
> +#define LNET_REDISCOVER_PEER	(1)
> +
> +static int lnet_peer_queue_for_discovery(struct lnet_peer *lp);
> +
>  static void
>  lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
>  {
> @@ -202,6 +207,7 @@ lnet_peer_alloc(lnet_nid_t nid)
>  	INIT_LIST_HEAD(&lp->lp_peer_list);
>  	INIT_LIST_HEAD(&lp->lp_peer_nets);
>  	INIT_LIST_HEAD(&lp->lp_dc_list);
> +	INIT_LIST_HEAD(&lp->lp_dc_pendq);
>  	init_waitqueue_head(&lp->lp_dc_waitq);
>  	spin_lock_init(&lp->lp_lock);
>  	lp->lp_primary_nid = nid;
> @@ -220,6 +226,10 @@ lnet_destroy_peer_locked(struct lnet_peer *lp)
>  	LASSERT(atomic_read(&lp->lp_refcount) == 0);
>  	LASSERT(list_empty(&lp->lp_peer_nets));
>  	LASSERT(list_empty(&lp->lp_peer_list));
> +	LASSERT(list_empty(&lp->lp_dc_list));
> +
> +	if (lp->lp_data)
> +		lnet_ping_buffer_decref(lp->lp_data);
>  
>  	kfree(lp);
>  }
> @@ -260,10 +270,19 @@ lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
>  	/*
>  	 * If there are no more peer nets, make the peer unfindable
>  	 * via the peer_tables.
> +	 *
> +	 * Otherwise, if the peer is DISCOVERED, tell discovery to
> +	 * take another look at it. This is a no-op if discovery for
> +	 * this peer did the detaching.
>  	 */
>  	if (list_empty(&lp->lp_peer_nets)) {
>  		list_del_init(&lp->lp_peer_list);
>  		ptable->pt_peers--;
> +	} else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) {
> +		/* Discovery isn't running, nothing to do here. */
> +	} else if (lp->lp_state & LNET_PEER_DISCOVERED) {
> +		lnet_peer_queue_for_discovery(lp);
> +		wake_up(&the_lnet.ln_dc_waitq);
>  	}
>  	CDEBUG(D_NET, "peer %s NID %s\n",
>  	       libcfs_nid2str(lp->lp_primary_nid),
> @@ -599,6 +618,25 @@ lnet_find_peer_ni_locked(lnet_nid_t nid)
>  	return lpni;
>  }
>  
> +struct lnet_peer *
> +lnet_find_peer(lnet_nid_t nid)
> +{
> +	struct lnet_peer_ni *lpni;
> +	struct lnet_peer *lp = NULL;
> +	int cpt;
> +
> +	cpt = lnet_net_lock_current();
> +	lpni = lnet_find_peer_ni_locked(nid);
> +	if (lpni) {
> +		lp = lpni->lpni_peer_net->lpn_peer;
> +		lnet_peer_addref_locked(lp);
> +		lnet_peer_ni_decref_locked(lpni);
> +	}
> +	lnet_net_unlock(cpt);
> +
> +	return lp;
> +}
> +
>  struct lnet_peer_ni *
>  lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn,
>  			    struct lnet_peer **lp)
> @@ -696,6 +734,37 @@ lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
>  	return lpni;
>  }
>  
> +/*
> + * Start pushes to peers that need to be updated for a configuration
> + * change on this node.
> + */
> +void
> +lnet_push_update_to_peers(int force)
> +{
> +	struct lnet_peer_table *ptable;
> +	struct lnet_peer *lp;
> +	int lncpt;
> +	int cpt;
> +
> +	lnet_net_lock(LNET_LOCK_EX);
> +	lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
> +	for (cpt = 0; cpt < lncpt; cpt++) {
> +		ptable = the_lnet.ln_peer_tables[cpt];
> +		list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
> +			if (force) {
> +				spin_lock(&lp->lp_lock);
> +				if (lp->lp_state & LNET_PEER_MULTI_RAIL)
> +					lp->lp_state |= LNET_PEER_FORCE_PUSH;
> +				spin_unlock(&lp->lp_lock);
> +			}
> +			if (lnet_peer_needs_push(lp))
> +				lnet_peer_queue_for_discovery(lp);
> +		}
> +	}
> +	lnet_net_unlock(LNET_LOCK_EX);
> +	wake_up(&the_lnet.ln_dc_waitq);
> +}
> +
>  /*
>   * Test whether a ni is a preferred ni for this peer_ni, e.g, whether
>   * this is a preferred point-to-point path. Call with lnet_net_lock in
> @@ -941,6 +1010,7 @@ lnet_peer_primary_nid_locked(lnet_nid_t nid)
>  lnet_nid_t
>  LNetPrimaryNID(lnet_nid_t nid)
>  {
> +	struct lnet_peer *lp;
>  	struct lnet_peer_ni *lpni;
>  	lnet_nid_t primary_nid = nid;
>  	int rc = 0;
> @@ -952,7 +1022,15 @@ LNetPrimaryNID(lnet_nid_t nid)
>  		rc = PTR_ERR(lpni);
>  		goto out_unlock;
>  	}
> -	primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
> +	lp = lpni->lpni_peer_net->lpn_peer;
> +	while (!lnet_peer_is_uptodate(lp)) {
> +		rc = lnet_discover_peer_locked(lpni, cpt, true);
> +		if (rc)
> +			goto out_decref;
> +		lp = lpni->lpni_peer_net->lpn_peer;
> +	}
> +	primary_nid = lp->lp_primary_nid;
> +out_decref:
>  	lnet_peer_ni_decref_locked(lpni);
>  out_unlock:
>  	lnet_net_unlock(cpt);
> @@ -1229,6 +1307,30 @@ lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned int flags)
>  	return rc;
>  }
>  
> +/*
> + * Update the primary NID of a peer, if possible.
> + *
> + * Call with the lnet_api_mutex held.
> + */
> +static int
> +lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid,
> +			  unsigned int flags)
> +{
> +	lnet_nid_t old = lp->lp_primary_nid;
> +	int rc = 0;
> +
> +	if (lp->lp_primary_nid == nid)
> +		goto out;
> +	rc = lnet_peer_add_nid(lp, nid, flags);
> +	if (rc)
> +		goto out;
> +	lp->lp_primary_nid = nid;
> +out:
> +	CDEBUG(D_NET, "peer %s NID %s: %d\n",
> +	       libcfs_nid2str(old), libcfs_nid2str(nid), rc);
> +	return rc;
> +}
> +
>  /*
>   * lpni creation initiated due to traffic either sending or receiving.
>   */
> @@ -1548,11 +1650,15 @@ lnet_peer_is_uptodate(struct lnet_peer *lp)
>  			    LNET_PEER_FORCE_PING |
>  			    LNET_PEER_FORCE_PUSH)) {
>  		rc = false;
> +	} else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
> +		rc = true;
>  	} else if (lp->lp_state & LNET_PEER_REDISCOVER) {
>  		if (lnet_peer_discovery_disabled)
>  			rc = true;
>  		else
>  			rc = false;
> +	} else if (lnet_peer_needs_push(lp)) {
> +		rc = false;
>  	} else if (lp->lp_state & LNET_PEER_DISCOVERED) {
>  		if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
>  			rc = true;
> @@ -1588,6 +1694,9 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
>  		rc = -EALREADY;
>  	}
>  
> +	CDEBUG(D_NET, "Queue peer %s: %d\n",
> +	       libcfs_nid2str(lp->lp_primary_nid), rc);
> +
>  	return rc;
>  }
>  
> @@ -1597,9 +1706,252 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
>   */
>  static void lnet_peer_discovery_complete(struct lnet_peer *lp)
>  {
> +	struct lnet_msg *msg = NULL;
> +	int rc = 0;
> +	struct list_head pending_msgs;
> +
> +	INIT_LIST_HEAD(&pending_msgs);
> +
> +	CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n",
> +	       libcfs_nid2str(lp->lp_primary_nid));
> +
>  	list_del_init(&lp->lp_dc_list);
> +	list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
>  	wake_up_all(&lp->lp_dc_waitq);
> +
> +	lnet_net_unlock(LNET_LOCK_EX);
> +
> +	/* iterate through all pending messages and send them again */
> +	list_for_each_entry(msg, &pending_msgs, msg_list) {
> +		if (lp->lp_dc_error) {
> +			lnet_finalize(msg, lp->lp_dc_error);
> +			continue;
> +		}
> +
> +		CDEBUG(D_NET, "sending pending message %s to target %s\n",
> +		       lnet_msgtyp2str(msg->msg_type),
> +		       libcfs_id2str(msg->msg_target));
> +		rc = lnet_send(msg->msg_src_nid_param, msg,
> +			       msg->msg_rtr_nid_param);
> +		if (rc < 0) {
> +			CNETERR("Error sending %s to %s: %d\n",
> +				lnet_msgtyp2str(msg->msg_type),
> +				libcfs_id2str(msg->msg_target), rc);
> +			lnet_finalize(msg, rc);
> +		}
> +	}
> +	lnet_net_lock(LNET_LOCK_EX);
> +	lnet_peer_decref_locked(lp);
> +}
> +
> +/*
> + * Handle inbound push.
> + * Like any event handler, called with lnet_res_lock/CPT held.
> + */
> +void lnet_peer_push_event(struct lnet_event *ev)
> +{
> +	struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
> +	struct lnet_peer *lp;
> +
> +	/* lnet_find_peer() adds a refcount */
> +	lp = lnet_find_peer(ev->source.nid);
> +	if (!lp) {
> +		CERROR("Push Put from unknown %s (source %s)\n",
> +		       libcfs_nid2str(ev->initiator.nid),
> +		       libcfs_nid2str(ev->source.nid));
> +		return;
> +	}
> +
> +	/* Ensure peer state remains consistent while we modify it. */
> +	spin_lock(&lp->lp_lock);
> +
> +	/*
> +	 * If some kind of error happened, the contents of the message
> +	 * cannot be used. Clear the NIDS_UPTODATE and set the
> +	 * FORCE_PING flag to trigger a ping.
> +	 */
> +	if (ev->status) {
> +		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
> +		lp->lp_state |= LNET_PEER_FORCE_PING;
> +		CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n",
> +		       ev->status,
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       libcfs_nid2str(ev->source.nid));
> +		goto out;
> +	}
> +
> +	/*
> +	 * A push with invalid or corrupted info. Clear the UPTODATE
> +	 * flag to trigger a ping.
> +	 */
> +	if (lnet_ping_info_validate(&pbuf->pb_info)) {
> +		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
> +		lp->lp_state |= LNET_PEER_FORCE_PING;
> +		CDEBUG(D_NET, "Corrupted Push from %s\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +		goto out;
> +	}
> +
> +	/*
> +	 * Make sure we'll allocate the correct size ping buffer when
> +	 * pinging the peer.
> +	 */
> +	if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
> +		lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
> +
> +	/*
> +	 * A non-Multi-Rail peer is not supposed to be capable of
> +	 * sending a push.
> +	 */
> +	if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) {
> +		CERROR("Push from non-Multi-Rail peer %s dropped\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +		goto out;
> +	}
> +
> +	/*
> +	 * Check the MULTIRAIL flag. Complain if the peer was DLC
> +	 * configured without it.
> +	 */
> +	if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
> +		if (lp->lp_state & LNET_PEER_CONFIGURED) {
> +			CERROR("Push says %s is Multi-Rail, DLC says not\n",
> +			       libcfs_nid2str(lp->lp_primary_nid));
> +		} else {
> +			lp->lp_state |= LNET_PEER_MULTI_RAIL;
> +			lnet_peer_clr_non_mr_pref_nids(lp);
> +		}
> +	}
> +
> +	/*
> +	 * The peer may have discovery disabled at its end. Set
> +	 * NO_DISCOVERY as appropriate.
> +	 */
> +	if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
> +		CDEBUG(D_NET, "Peer %s has discovery disabled\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +		lp->lp_state |= LNET_PEER_NO_DISCOVERY;
> +	} else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
> +		CDEBUG(D_NET, "Peer %s has discovery enabled\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +		lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
> +	}
> +
> +	/*
> +	 * Check for truncation of the Put message. Clear the
> +	 * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping,
> +	 * and tell discovery to allocate a bigger buffer.
> +	 */
> +	if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
> +		if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
> +			the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
> +		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
> +		lp->lp_state |= LNET_PEER_FORCE_PING;
> +		CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       pbuf->pb_info.pi_nnis);
> +		goto out;
> +	}
> +
> +	/*
> +	 * Check whether the Put data is stale. Stale data can just be
> +	 * dropped.
> +	 */
> +	if (pbuf->pb_info.pi_nnis > 1 &&
> +	    lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
> +	    LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
> +		CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       LNET_PING_BUFFER_SEQNO(pbuf),
> +		       lp->lp_peer_seqno);
> +		goto out;
> +	}
> +
> +	/*
> +	 * Check whether the Put data is new, in which case we clear
> +	 * the UPTODATE flag and prepare to process it.
> +	 *
> +	 * If the Put data is current and the peer is UPTODATE, then
> +	 * we assume everything is all right and drop the data as
> +	 * stale.
> +	 */
> +	if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
> +		lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
> +		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
> +	} else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
> +		CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       LNET_PING_BUFFER_SEQNO(pbuf),
> +		       lp->lp_peer_seqno);
> +		goto out;
> +	}
> +
> +	/*
> +	 * If there is data present that hasn't been processed yet,
> +	 * we'll replace it if the Put contained newer data and it
> +	 * fits. We're racing with a Ping or earlier Push in this
> +	 * case.
> +	 */
> +	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
> +		if (LNET_PING_BUFFER_SEQNO(pbuf) >
> +			LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
> +		    pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
> +			memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
> +			       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
> +			CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
> +			       libcfs_nid2str(lp->lp_primary_nid),
> +			       LNET_PING_BUFFER_SEQNO(pbuf),
> +			       LNET_PING_BUFFER_SEQNO(lp->lp_data));
> +		}
> +		goto out;
> +	}
> +
> +	/*
> +	 * Allocate a buffer to copy the data. On a failure we drop
> +	 * the Push and set FORCE_PING to force the discovery
> +	 * thread to fix the problem by pinging the peer.
> +	 */
> +	lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
> +	if (!lp->lp_data) {
> +		lp->lp_state |= LNET_PEER_FORCE_PING;
> +		CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       LNET_PING_BUFFER_SEQNO(pbuf));
> +		goto out;
> +	}
> +
> +	/* Success */
> +	memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
> +	       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
> +	lp->lp_state |= LNET_PEER_DATA_PRESENT;
> +	CDEBUG(D_NET, "Received Push %s %u\n",
> +	       libcfs_nid2str(lp->lp_primary_nid),
> +	       LNET_PING_BUFFER_SEQNO(pbuf));
> +
> +out:
> +	/*
> +	 * Queue the peer for discovery, and wake the discovery thread
> +	 * if the peer was already queued, because its status changed.
> +	 */
> +	spin_unlock(&lp->lp_lock);
> +	lnet_net_lock(LNET_LOCK_EX);
> +	if (lnet_peer_queue_for_discovery(lp))
> +		wake_up(&the_lnet.ln_dc_waitq);
> +	/* Drop refcount from lookup */
>  	lnet_peer_decref_locked(lp);
> +	lnet_net_unlock(LNET_LOCK_EX);
> +}
> +
> +/*
> + * Clear the discovery error state, unless we're already discovering
> + * this peer, in which case the error is current.
> + */
> +static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
> +{
> +	spin_lock(&lp->lp_lock);
> +	if (!(lp->lp_state & LNET_PEER_DISCOVERING))
> +		lp->lp_dc_error = 0;
> +	spin_unlock(&lp->lp_lock);
>  }
>  
>  /*
> @@ -1608,7 +1960,7 @@ static void lnet_peer_discovery_complete(struct lnet_peer *lp)
>   * because discovery could tear down an lnet_peer.
>   */
>  int
> -lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
> +lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
>  {
>  	DEFINE_WAIT(wait);
>  	struct lnet_peer *lp;
> @@ -1617,25 +1969,40 @@ lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
>  again:
>  	lnet_net_unlock(cpt);
>  	lnet_net_lock(LNET_LOCK_EX);
> +	lp = lpni->lpni_peer_net->lpn_peer;
> +	lnet_peer_clear_discovery_error(lp);
>  
> -	/* We're willing to be interrupted. */
> +	/*
> +	 * We're willing to be interrupted. The lpni can become a
> +	 * zombie if we race with DLC, so we must check for that.
> +	 */
>  	for (;;) {
> -		lp = lpni->lpni_peer_net->lpn_peer;
>  		prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
>  		if (signal_pending(current))
>  			break;
>  		if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
>  			break;
> +		if (lp->lp_dc_error)
> +			break;
>  		if (lnet_peer_is_uptodate(lp))
>  			break;
>  		lnet_peer_queue_for_discovery(lp);
>  		lnet_peer_addref_locked(lp);
> +		/*
> +		 * If the caller requested a non-blocking operation,
> +		 * return immediately. Once discovery is complete, the
> +		 * peer ref will be decremented and any pending messages
> +		 * that were held for discovery will be transmitted.
> +		 */
> +		if (!block)
> +			break;
>  		lnet_net_unlock(LNET_LOCK_EX);
>  		schedule();
>  		finish_wait(&lp->lp_dc_waitq, &wait);
>  		lnet_net_lock(LNET_LOCK_EX);
>  		lnet_peer_decref_locked(lp);
> -		/* Do not use lp beyond this point. */
> +		/* Peer may have changed */
> +		lp = lpni->lpni_peer_net->lpn_peer;
>  	}
>  	finish_wait(&lp->lp_dc_waitq, &wait);
>  
> @@ -1646,71 +2013,969 @@ lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
>  		rc = -EINTR;
>  	else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
>  		rc = -ESHUTDOWN;
> +	else if (lp->lp_dc_error)
> +		rc = lp->lp_dc_error;
> +	else if (!block)
> +		CDEBUG(D_NET, "non-blocking discovery\n");
>  	else if (!lnet_peer_is_uptodate(lp))
>  		goto again;
>  
> +	CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
> +	       (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
> +	       libcfs_nid2str(lpni->lpni_nid), rc,
> +	       (!block) ? "pending discovery" : "discovery complete");
> +
>  	return rc;
>  }
>  
> -/*
> - * Event handler for the discovery EQ.
> - *
> - * Called with lnet_res_lock(cpt) held. The cpt is the
> - * lnet_cpt_of_cookie() of the md handle cookie.
> - */
> -static void lnet_discovery_event_handler(struct lnet_event *event)
> +/* Handle an incoming ack for a push. */
> +static void
> +lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
>  {
> -	wake_up(&the_lnet.ln_dc_waitq);
> +	struct lnet_ping_buffer *pbuf;
> +
> +	pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
> +	spin_lock(&lp->lp_lock);
> +	lp->lp_state &= ~LNET_PEER_PUSH_SENT;
> +	lp->lp_push_error = ev->status;
> +	if (ev->status)
> +		lp->lp_state |= LNET_PEER_PUSH_FAILED;
> +	else
> +		lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
> +	spin_unlock(&lp->lp_lock);
> +
> +	CDEBUG(D_NET, "peer %s ev->status %d\n",
> +	       libcfs_nid2str(lp->lp_primary_nid), ev->status);
>  }
>  
> -/*
> - * Wait for work to be queued or some other change that must be
> - * attended to. Returns non-zero if the discovery thread should shut
> - * down.
> - */
> -static int lnet_peer_discovery_wait_for_work(void)
> +/* Handle a Reply message. This is the reply to a Ping message. */
> +static void
> +lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
>  {
> -	int cpt;
> -	int rc = 0;
> +	struct lnet_ping_buffer *pbuf;
> +	int rc;
>  
> -	DEFINE_WAIT(wait);
> +	spin_lock(&lp->lp_lock);
>  
> -	cpt = lnet_net_lock_current();
> -	for (;;) {
> -		prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
> -				TASK_IDLE);
> -		if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
> -			break;
> -		if (lnet_push_target_resize_needed())
> -			break;
> -		if (!list_empty(&the_lnet.ln_dc_request))
> -			break;
> -		lnet_net_unlock(cpt);
> -		schedule();
> -		finish_wait(&the_lnet.ln_dc_waitq, &wait);
> -		cpt = lnet_net_lock_current();
> +	/*
> +	 * If some kind of error happened, the contents of the message
> +	 * cannot be used. Set PING_FAILED to trigger a retry.
> +	 */
> +	if (ev->status) {
> +		lp->lp_state |= LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = ev->status;
> +		CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n",
> +		       ev->status,
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       libcfs_nid2str(ev->source.nid));
> +		goto out;
>  	}
> -	finish_wait(&the_lnet.ln_dc_waitq, &wait);
> -
> -	if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
> -		rc = -ESHUTDOWN;
>  
> -	lnet_net_unlock(cpt);
> +	pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
> +	if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
> +		lnet_swap_pinginfo(pbuf);
>  
> -	CDEBUG(D_NET, "woken: %d\n", rc);
> +	/*
> +	 * A reply with invalid or corrupted info. Set PING_FAILED to
> +	 * trigger a retry.
> +	 */
> +	rc = lnet_ping_info_validate(&pbuf->pb_info);
> +	if (rc) {
> +		lp->lp_state |= LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = 0;
> +		CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n",
> +		       libcfs_nid2str(lp->lp_primary_nid), rc);
> +		goto out;
> +	}
>  
> -	return rc;
> -}
> +	/*
> +	 * Update the MULTI_RAIL flag based on the reply. If the peer
> +	 * was configured with DLC then the setting should match what
> +	 * DLC put in.
> +	 */
> +	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
> +		if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
> +			/* Everything's fine */
> +		} else if (lp->lp_state & LNET_PEER_CONFIGURED) {
> +			CWARN("Reply says %s is Multi-Rail, DLC says not\n",
> +			      libcfs_nid2str(lp->lp_primary_nid));
> +		} else {
> +			lp->lp_state |= LNET_PEER_MULTI_RAIL;
> +			lnet_peer_clr_non_mr_pref_nids(lp);
> +		}
> +	} else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
> +		if (lp->lp_state & LNET_PEER_CONFIGURED) {
> +			CWARN("DLC says %s is Multi-Rail, Reply says not\n",
> +			      libcfs_nid2str(lp->lp_primary_nid));
> +		} else {
> +			CERROR("Multi-Rail state vanished from %s\n",
> +			       libcfs_nid2str(lp->lp_primary_nid));
> +			lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
> +		}
> +	}
>  
> -/* The discovery thread. */
> -static int lnet_peer_discovery(void *arg)
> -{
> -	struct lnet_peer *lp;
> +	/*
> +	 * Make sure we'll allocate the correct size ping buffer when
> +	 * pinging the peer.
> +	 */
> +	if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis)
> +		lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
>  
> -	CDEBUG(D_NET, "started\n");
> +	/*
> +	 * The peer may have discovery disabled at its end. Set
> +	 * NO_DISCOVERY as appropriate.
> +	 */
> +	if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
> +		CDEBUG(D_NET, "Peer %s has discovery disabled\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +		lp->lp_state |= LNET_PEER_NO_DISCOVERY;
> +	} else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
> +		CDEBUG(D_NET, "Peer %s has discovery enabled\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +		lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
> +	}
>  
> -	for (;;) {
> -		if (lnet_peer_discovery_wait_for_work())
> +	/*
> +	 * Check for truncation of the Reply. Clear PING_SENT and set
> +	 * PING_FAILED to trigger a retry.
> +	 */
> +	if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) {
> +		if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis)
> +			the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis;
> +		lp->lp_state |= LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = 0;
> +		CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       pbuf->pb_info.pi_nnis);
> +		goto out;
> +	}
> +
> +	/*
> +	 * Check the sequence numbers in the reply. These are only
> +	 * available if the reply came from a Multi-Rail peer.
> +	 */
> +	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
> +	    pbuf->pb_info.pi_nnis > 1 &&
> +	    lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
> +		if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
> +			CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
> +			       libcfs_nid2str(lp->lp_primary_nid),
> +			       LNET_PING_BUFFER_SEQNO(pbuf),
> +			       lp->lp_peer_seqno);
> +			goto out;
> +		}
> +
> +		if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
> +			lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
> +	}
> +
> +	/* We're happy with the state of the data in the buffer. */
> +	CDEBUG(D_NET, "peer %s data present %u\n",
> +	       libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
> +	if (lp->lp_state & LNET_PEER_DATA_PRESENT)
> +		lnet_ping_buffer_decref(lp->lp_data);
> +	else
> +		lp->lp_state |= LNET_PEER_DATA_PRESENT;
> +	lnet_ping_buffer_addref(pbuf);
> +	lp->lp_data = pbuf;
> +out:
> +	lp->lp_state &= ~LNET_PEER_PING_SENT;
> +	spin_unlock(&lp->lp_lock);
> +}
> +
> +/*
> + * Send event handling. Only matters for error cases, where we clean
> + * up state on the peer and peer_ni that would otherwise be updated in
> + * the REPLY event handler for a successful Ping, and the ACK event
> + * handler for a successful Push.
> + */
> +static int
> +lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev)
> +{
> +	int rc = 0;
> +
> +	if (!ev->status)
> +		goto out;
> +
> +	spin_lock(&lp->lp_lock);
> +	if (ev->msg_type == LNET_MSG_GET) {
> +		lp->lp_state &= ~LNET_PEER_PING_SENT;
> +		lp->lp_state |= LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = ev->status;
> +	} else { /* ev->msg_type == LNET_MSG_PUT */
> +		lp->lp_state &= ~LNET_PEER_PUSH_SENT;
> +		lp->lp_state |= LNET_PEER_PUSH_FAILED;
> +		lp->lp_push_error = ev->status;
> +	}
> +	spin_unlock(&lp->lp_lock);
> +	rc = LNET_REDISCOVER_PEER;
> +out:
> +	CDEBUG(D_NET, "%s Send to %s: %d\n",
> +	       (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"),
> +	       libcfs_nid2str(ev->target.nid), rc);
> +	return rc;
> +}
> +
> +/*
> + * Unlink event handling. This event is only seen if a call to
> + * LNetMDUnlink() caused the MD to be unlinked. If this call was
> + * made after the MD was set up in LNetGet() or LNetPut() then we
> + * assume the Ping or Push timed out.
> + */
> +static void
> +lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
> +{
> +	spin_lock(&lp->lp_lock);
> +	/* We've passed through LNetGet() */
> +	if (lp->lp_state & LNET_PEER_PING_SENT) {
> +		lp->lp_state &= ~LNET_PEER_PING_SENT;
> +		lp->lp_state |= LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = -ETIMEDOUT;
> +		CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +	}
> +	/* We've passed through LNetPut() */
> +	if (lp->lp_state & LNET_PEER_PUSH_SENT) {
> +		lp->lp_state &= ~LNET_PEER_PUSH_SENT;
> +		lp->lp_state |= LNET_PEER_PUSH_FAILED;
> +		lp->lp_push_error = -ETIMEDOUT;
> +		CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +	}
> +	spin_unlock(&lp->lp_lock);
> +}
> +
> +/*
> + * Event handler for the discovery EQ.
> + *
> + * Called with lnet_res_lock(cpt) held. The cpt is the
> + * lnet_cpt_of_cookie() of the md handle cookie.
> + */
> +static void lnet_discovery_event_handler(struct lnet_event *event)
> +{
> +	struct lnet_peer *lp = event->md.user_ptr;
> +	struct lnet_ping_buffer *pbuf;
> +	int rc;
> +
> +	/* discovery needs to take another look */
> +	rc = LNET_REDISCOVER_PEER;
> +
> +	CDEBUG(D_NET, "Received event: %d\n", event->type);
> +
> +	switch (event->type) {
> +	case LNET_EVENT_ACK:
> +		lnet_discovery_event_ack(lp, event);
> +		break;
> +	case LNET_EVENT_REPLY:
> +		lnet_discovery_event_reply(lp, event);
> +		break;
> +	case LNET_EVENT_SEND:
> +		/* Only send failure triggers a retry. */
> +		rc = lnet_discovery_event_send(lp, event);
> +		break;
> +	case LNET_EVENT_UNLINK:
> +		/* LNetMDUnlink() was called */
> +		lnet_discovery_event_unlink(lp, event);
> +		break;
> +	default:
> +		/* Invalid events. */
> +		LBUG();
> +	}
> +	lnet_net_lock(LNET_LOCK_EX);
> +	if (event->unlinked) {
> +		pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
> +		lnet_ping_buffer_decref(pbuf);
> +		lnet_peer_decref_locked(lp);
> +	}
> +	if (rc == LNET_REDISCOVER_PEER) {
> +		list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
> +		wake_up(&the_lnet.ln_dc_waitq);
> +	}
> +	lnet_net_unlock(LNET_LOCK_EX);
> +}
> +
> +/*
> + * Build a peer from incoming data.
> + *
> + * The NIDs in the incoming data are supposed to be structured as follows:
> + *  - loopback
> + *  - primary NID
> + *  - other NIDs in same net
> + *  - NIDs in second net
> + *  - NIDs in third net
> + *  - ...
> + * This is due to the way the list of NIDs in the data is created.
> + *
> + * Note that this function will mark the peer uptodate unless an
> + * ENOMEM is encountered. All other errors are due to a conflict
> + * between the DLC configuration and what discovery sees. We treat DLC
> + * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
> + * peer from becoming stuck in discovery.
> + */
> +static int lnet_peer_merge_data(struct lnet_peer *lp,
> +				struct lnet_ping_buffer *pbuf)
> +{
> +	struct lnet_peer_ni *lpni;
> +	lnet_nid_t *curnis = NULL;
> +	lnet_nid_t *addnis = NULL;
> +	lnet_nid_t *delnis = NULL;
> +	unsigned int flags;
> +	int ncurnis;
> +	int naddnis;
> +	int ndelnis;
> +	int nnis = 0;
> +	int i;
> +	int j;
> +	int rc;
> +
> +	flags = LNET_PEER_DISCOVERED;
> +	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
> +		flags |= LNET_PEER_MULTI_RAIL;
> +
> +	nnis = max_t(int, lp->lp_nnis, pbuf->pb_info.pi_nnis);
> +	curnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS);
> +	addnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS);
> +	delnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS);
> +	if (!curnis || !addnis || !delnis) {
> +		rc = -ENOMEM;
> +		goto out;
> +	}
> +	ncurnis = 0;
> +	naddnis = 0;
> +	ndelnis = 0;
> +
> +	/* Construct the list of NIDs present in peer. */
> +	lpni = NULL;
> +	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
> +		curnis[ncurnis++] = lpni->lpni_nid;
> +
> +	/*
> +	 * Check for NIDs in pbuf not present in curnis[].
> +	 * The loop starts at 1 to skip the loopback NID.
> +	 */
> +	for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
> +		for (j = 0; j < ncurnis; j++)
> +			if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
> +				break;
> +		if (j == ncurnis)
> +			addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
> +	}
> +	/*
> +	 * Check for NIDs in curnis[] not present in pbuf.
> +	 * The nested loop starts at 1 to skip the loopback NID.
> +	 *
> +	 * But never add the loopback NID to delnis[]: if it is
> +	 * present in curnis[] then this peer is for this node.
> +	 */
> +	for (i = 0; i < ncurnis; i++) {
> +		if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
> +			continue;
> +		for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
> +			if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
> +				break;
> +		if (j == pbuf->pb_info.pi_nnis)
> +			delnis[ndelnis++] = curnis[i];
> +	}
> +
> +	for (i = 0; i < naddnis; i++) {
> +		rc = lnet_peer_add_nid(lp, addnis[i], flags);
> +		if (rc) {
> +			CERROR("Error adding NID %s to peer %s: %d\n",
> +			       libcfs_nid2str(addnis[i]),
> +			       libcfs_nid2str(lp->lp_primary_nid), rc);
> +			if (rc == -ENOMEM)
> +				goto out;
> +		}
> +	}
> +	for (i = 0; i < ndelnis; i++) {
> +		rc = lnet_peer_del_nid(lp, delnis[i], flags);
> +		if (rc) {
> +			CERROR("Error deleting NID %s from peer %s: %d\n",
> +			       libcfs_nid2str(delnis[i]),
> +			       libcfs_nid2str(lp->lp_primary_nid), rc);
> +			if (rc == -ENOMEM)
> +				goto out;
> +		}
> +	}
> +	/*
> +	 * Errors other than -ENOMEM are due to peers having been
> +	 * configured with DLC. Ignore these because DLC overrides
> +	 * Discovery.
> +	 */
> +	rc = 0;
> +out:
> +	kfree(curnis);
> +	kfree(addnis);
> +	kfree(delnis);
> +	lnet_ping_buffer_decref(pbuf);
> +	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
> +
> +	if (rc) {
> +		spin_lock(&lp->lp_lock);
> +		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
> +		lp->lp_state |= LNET_PEER_FORCE_PING;
> +		spin_unlock(&lp->lp_lock);
> +	}
> +	return rc;
> +}
> +
> +/*
> + * The data in pbuf says lp is its primary peer, but the data was
> + * received by a different peer. Try to update lp with the data.
> + */
> +static int
> +lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
> +{
> +	struct lnet_handle_md mdh;
> +
> +	/* Queue lp for discovery, and force it on the request queue. */
> +	lnet_net_lock(LNET_LOCK_EX);
> +	if (lnet_peer_queue_for_discovery(lp))
> +		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
> +	lnet_net_unlock(LNET_LOCK_EX);
> +
> +	LNetInvalidateMDHandle(&mdh);
> +
> +	/*
> +	 * Decide whether we can move the peer to the DATA_PRESENT state.
> +	 *
> +	 * We replace stale data for a multi-rail peer, repair PING_FAILED
> +	 * status, and preempt FORCE_PING.
> +	 *
> +	 * If after that we have DATA_PRESENT, we merge it into this peer.
> +	 */
> +	spin_lock(&lp->lp_lock);
> +	if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
> +		if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
> +			lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
> +		} else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
> +			lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
> +			lnet_ping_buffer_decref(pbuf);
> +			pbuf = lp->lp_data;
> +			lp->lp_data = NULL;
> +		}
> +	}
> +	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
> +		lnet_ping_buffer_decref(lp->lp_data);
> +		lp->lp_data = NULL;
> +		lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
> +	}
> +	if (lp->lp_state & LNET_PEER_PING_FAILED) {
> +		mdh = lp->lp_ping_mdh;
> +		LNetInvalidateMDHandle(&lp->lp_ping_mdh);
> +		lp->lp_state &= ~LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = 0;
> +	}
> +	if (lp->lp_state & LNET_PEER_FORCE_PING)
> +		lp->lp_state &= ~LNET_PEER_FORCE_PING;
> +	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
> +	spin_unlock(&lp->lp_lock);
> +
> +	if (!LNetMDHandleIsInvalid(mdh))
> +		LNetMDUnlink(mdh);
> +
> +	if (pbuf)
> +		return lnet_peer_merge_data(lp, pbuf);
> +
> +	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +	return 0;
> +}
> +
> +/*
> + * Update a peer using the data received.
> + */
> +static int lnet_peer_data_present(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	struct lnet_ping_buffer *pbuf;
> +	struct lnet_peer_ni *lpni;
> +	lnet_nid_t nid = LNET_NID_ANY;
> +	unsigned int flags;
> +	int rc = 0;
> +
> +	pbuf = lp->lp_data;
> +	lp->lp_data = NULL;
> +	lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
> +	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
> +	spin_unlock(&lp->lp_lock);
> +
> +	/*
> +	 * Modifications of peer structures are done while holding the
> +	 * ln_api_mutex. A global lock is required because we may be
> +	 * modifying multiple peer structures, and a mutex greatly
> +	 * simplifies memory management.
> +	 *
> +	 * The actual changes to the data structures must also protect
> +	 * against concurrent lookups, for which the lnet_net_lock in
> +	 * LNET_LOCK_EX mode is used.
> +	 */
> +	mutex_lock(&the_lnet.ln_api_mutex);
> +	if (the_lnet.ln_state == LNET_STATE_SHUTDOWN) {
> +		rc = -ESHUTDOWN;
> +		goto out;
> +	}
> +
> +	/*
> +	 * If this peer is not on the peer list then it is being torn
> +	 * down, and our reference count may be all that is keeping it
> +	 * alive. Don't do any work on it.
> +	 */
> +	if (list_empty(&lp->lp_peer_list))
> +		goto out;
> +
> +	flags = LNET_PEER_DISCOVERED;
> +	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
> +		flags |= LNET_PEER_MULTI_RAIL;
> +
> +	/*
> +	 * Check whether the primary NID in the message matches the
> +	 * primary NID of the peer. If it does, update the peer; if
> +	 * it does not, check whether there is already a peer with
> +	 * that primary NID. If no such peer exists, try to update
> +	 * the primary NID of the current peer (allowed if it was
> +	 * created due to message traffic) and complete the update.
> +	 * If the peer did exist, hand off the data to it.
> +	 *
> +	 * The peer for the loopback interface is a special case: this
> +	 * is the peer for the local node, and we want to set its
> +	 * primary NID to the correct value here.
> +	 */
> +	if (pbuf->pb_info.pi_nnis > 1)
> +		nid = pbuf->pb_info.pi_ni[1].ns_nid;
> +	if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
> +		rc = lnet_peer_set_primary_nid(lp, nid, flags);
> +		if (!rc)
> +			rc = lnet_peer_merge_data(lp, pbuf);
> +	} else if (lp->lp_primary_nid == nid) {
> +		rc = lnet_peer_merge_data(lp, pbuf);
> +	} else {
> +		lpni = lnet_find_peer_ni_locked(nid);
> +		if (!lpni) {
> +			rc = lnet_peer_set_primary_nid(lp, nid, flags);
> +			if (rc) {
> +				CERROR("Primary NID error %s versus %s: %d\n",
> +				       libcfs_nid2str(lp->lp_primary_nid),
> +				       libcfs_nid2str(nid), rc);
> +			} else {
> +				rc = lnet_peer_merge_data(lp, pbuf);
> +			}
> +		} else {
> +			rc = lnet_peer_set_primary_data(
> +				lpni->lpni_peer_net->lpn_peer, pbuf);
> +			lnet_peer_ni_decref_locked(lpni);
> +		}
> +	}
> +out:
> +	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
> +	mutex_unlock(&the_lnet.ln_api_mutex);
> +
> +	spin_lock(&lp->lp_lock);
> +	/* Tell discovery to re-check the peer immediately. */
> +	if (!rc)
> +		rc = LNET_REDISCOVER_PEER;
> +	return rc;
> +}
> +
> +/*
> + * A ping failed. Clear the PING_FAILED state and set the
> + * FORCE_PING state, to ensure a retry even if discovery is
> + * disabled. This avoids being left with incorrect state.
> + */
> +static int lnet_peer_ping_failed(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	struct lnet_handle_md mdh;
> +	int rc;
> +
> +	mdh = lp->lp_ping_mdh;
> +	LNetInvalidateMDHandle(&lp->lp_ping_mdh);
> +	lp->lp_state &= ~LNET_PEER_PING_FAILED;
> +	lp->lp_state |= LNET_PEER_FORCE_PING;
> +	rc = lp->lp_ping_error;
> +	lp->lp_ping_error = 0;
> +	spin_unlock(&lp->lp_lock);
> +
> +	if (!LNetMDHandleIsInvalid(mdh))
> +		LNetMDUnlink(mdh);
> +
> +	CDEBUG(D_NET, "peer %s:%d\n",
> +	       libcfs_nid2str(lp->lp_primary_nid), rc);
> +
> +	spin_lock(&lp->lp_lock);
> +	return rc ? rc : LNET_REDISCOVER_PEER;
> +}
> +
> +/*
> + * Select NID to send a Ping or Push to.
> + */
> +static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
> +{
> +	struct lnet_peer_ni *lpni;
> +
> +	/* Look for a direct-connected NID for this peer. */
> +	lpni = NULL;
> +	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
> +		if (!lnet_is_peer_ni_healthy_locked(lpni))
> +			continue;
> +		if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
> +			continue;
> +		break;
> +	}
> +	if (lpni)
> +		return lpni->lpni_nid;
> +
> +	/* Look for a routed-connected NID for this peer. */
> +	lpni = NULL;
> +	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
> +		if (!lnet_is_peer_ni_healthy_locked(lpni))
> +			continue;
> +		if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
> +			continue;
> +		break;
> +	}
> +	if (lpni)
> +		return lpni->lpni_nid;
> +
> +	return LNET_NID_ANY;
> +}
> +
> +/* Active side of ping. */
> +static int lnet_peer_send_ping(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	struct lnet_md md = { NULL };
> +	struct lnet_process_id id;
> +	struct lnet_ping_buffer *pbuf;
> +	int nnis;
> +	int rc;
> +	int cpt;
> +
> +	lp->lp_state |= LNET_PEER_PING_SENT;
> +	lp->lp_state &= ~LNET_PEER_FORCE_PING;
> +	spin_unlock(&lp->lp_lock);
> +
> +	nnis = max_t(int, lp->lp_data_nnis, LNET_INTERFACES_MIN);
> +	pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
> +	if (!pbuf) {
> +		rc = -ENOMEM;
> +		goto fail_error;
> +	}
> +
> +	/* initialize md content */
> +	md.start     = &pbuf->pb_info;
> +	md.length    = LNET_PING_INFO_SIZE(nnis);
> +	md.threshold = 2; /* GET/REPLY */
> +	md.max_size  = 0;
> +	md.options   = LNET_MD_TRUNCATE;
> +	md.user_ptr  = lp;
> +	md.eq_handle = the_lnet.ln_dc_eqh;
> +
> +	rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh);
> +	if (rc != 0) {
> +		lnet_ping_buffer_decref(pbuf);
> +		CERROR("Can't bind MD: %d\n", rc);
> +		goto fail_error;
> +	}
> +	cpt = lnet_net_lock_current();
> +	/* Refcount for MD. */
> +	lnet_peer_addref_locked(lp);
> +	id.pid = LNET_PID_LUSTRE;
> +	id.nid = lnet_peer_select_nid(lp);
> +	lnet_net_unlock(cpt);
> +
> +	if (id.nid == LNET_NID_ANY) {
> +		rc = -EHOSTUNREACH;
> +		goto fail_unlink_md;
> +	}
> +
> +	rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id,
> +		     LNET_RESERVED_PORTAL,
> +		     LNET_PROTO_PING_MATCHBITS, 0);
> +
> +	if (rc)
> +		goto fail_unlink_md;
> +
> +	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +
> +	spin_lock(&lp->lp_lock);
> +	return 0;
> +
> +fail_unlink_md:
> +	LNetMDUnlink(lp->lp_ping_mdh);
> +	LNetInvalidateMDHandle(&lp->lp_ping_mdh);
> +fail_error:
> +	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
> +	/*
> +	 * The errors that get us here are considered hard errors and
> +	 * cause Discovery to terminate. So we clear PING_SENT, but do
> +	 * not set either PING_FAILED or FORCE_PING. In fact we need
> +	 * to clear PING_FAILED, because the unlink event handler will
> +	 * have set it if we called LNetMDUnlink() above.
> +	 */
> +	spin_lock(&lp->lp_lock);
> +	lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
> +	return rc;
> +}
> +
> +/*
> + * This function exists because you cannot call LNetMDUnlink() from an
> + * event handler.
> + */
> +static int lnet_peer_push_failed(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	struct lnet_handle_md mdh;
> +	int rc;
> +
> +	mdh = lp->lp_push_mdh;
> +	LNetInvalidateMDHandle(&lp->lp_push_mdh);
> +	lp->lp_state &= ~LNET_PEER_PUSH_FAILED;
> +	rc = lp->lp_push_error;
> +	lp->lp_push_error = 0;
> +	spin_unlock(&lp->lp_lock);
> +
> +	if (!LNetMDHandleIsInvalid(mdh))
> +		LNetMDUnlink(mdh);
> +
> +	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +	spin_lock(&lp->lp_lock);
> +	return rc ? rc : LNET_REDISCOVER_PEER;
> +}
> +
> +/* Active side of push. */
> +static int lnet_peer_send_push(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	struct lnet_ping_buffer *pbuf;
> +	struct lnet_process_id id;
> +	struct lnet_md md;
> +	int cpt;
> +	int rc;
> +
> +	/* Don't push to a non-multi-rail peer. */
> +	if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) {
> +		lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
> +		return 0;
> +	}
> +
> +	lp->lp_state |= LNET_PEER_PUSH_SENT;
> +	lp->lp_state &= ~LNET_PEER_FORCE_PUSH;
> +	spin_unlock(&lp->lp_lock);
> +
> +	cpt = lnet_net_lock_current();
> +	pbuf = the_lnet.ln_ping_target;
> +	lnet_ping_buffer_addref(pbuf);
> +	lnet_net_unlock(cpt);
> +
> +	/* Push source MD */
> +	md.start     = &pbuf->pb_info;
> +	md.length    = LNET_PING_INFO_SIZE(pbuf->pb_nnis);
> +	md.threshold = 2; /* Put/Ack */
> +	md.max_size  = 0;
> +	md.options   = 0;
> +	md.eq_handle = the_lnet.ln_dc_eqh;
> +	md.user_ptr  = lp;
> +
> +	rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh);
> +	if (rc) {
> +		lnet_ping_buffer_decref(pbuf);
> +		CERROR("Can't bind push source MD: %d\n", rc);
> +		goto fail_error;
> +	}
> +	cpt = lnet_net_lock_current();
> +	/* Refcount for MD. */
> +	lnet_peer_addref_locked(lp);
> +	id.pid = LNET_PID_LUSTRE;
> +	id.nid = lnet_peer_select_nid(lp);
> +	lnet_net_unlock(cpt);
> +
> +	if (id.nid == LNET_NID_ANY) {
> +		rc = -EHOSTUNREACH;
> +		goto fail_unlink;
> +	}
> +
> +	rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh,
> +		     LNET_ACK_REQ, id, LNET_RESERVED_PORTAL,
> +		     LNET_PROTO_PING_MATCHBITS, 0, 0);
> +
> +	if (rc)
> +		goto fail_unlink;
> +
> +	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +
> +	spin_lock(&lp->lp_lock);
> +	return 0;
> +
> +fail_unlink:
> +	LNetMDUnlink(lp->lp_push_mdh);
> +	LNetInvalidateMDHandle(&lp->lp_push_mdh);
> +fail_error:
> +	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
> +	/*
> +	 * The errors that get us here are considered hard errors and
> +	 * cause Discovery to terminate. So we clear PUSH_SENT, but do
> +	 * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED,
> +	 * because the unlink event handler will have set it if we
> +	 * called LNetMDUnlink() above.
> +	 */
> +	spin_lock(&lp->lp_lock);
> +	lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED);
> +	return rc;
> +}
> +
> +/*
> + * An unrecoverable error was encountered during discovery.
> + * Set error status in peer and abort discovery.
> + */
> +static void lnet_peer_discovery_error(struct lnet_peer *lp, int error)
> +{
> +	CDEBUG(D_NET, "Discovery error %s: %d\n",
> +	       libcfs_nid2str(lp->lp_primary_nid), error);
> +
> +	spin_lock(&lp->lp_lock);
> +	lp->lp_dc_error = error;
> +	lp->lp_state &= ~LNET_PEER_DISCOVERING;
> +	lp->lp_state |= LNET_PEER_REDISCOVER;
> +	spin_unlock(&lp->lp_lock);
> +}
> +
> +/*
> + * Mark the peer as discovered.
> + */
> +static int lnet_peer_discovered(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	lp->lp_state |= LNET_PEER_DISCOVERED;
> +	lp->lp_state &= ~(LNET_PEER_DISCOVERING |
> +			  LNET_PEER_REDISCOVER);
> +
> +	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +
> +	return 0;
> +}
> +
> +/*
> + * Mark the peer as to be rediscovered.
> + */
> +static int lnet_peer_rediscover(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	lp->lp_state |= LNET_PEER_REDISCOVER;
> +	lp->lp_state &= ~LNET_PEER_DISCOVERING;
> +
> +	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +
> +	return 0;
> +}
> +
> +/*
> + * Returns the first peer on the ln_dc_working queue if its timeout
> + * has expired. Takes the current time as an argument so as not to
> + * re-check the clock obsessively. The oldest discovery request will
> + * be at the head of the queue.
> + */
> +static struct lnet_peer *lnet_peer_dc_timed_out(time64_t now)
> +{
> +	struct lnet_peer *lp;
> +
> +	if (list_empty(&the_lnet.ln_dc_working))
> +		return NULL;
> +	lp = list_first_entry(&the_lnet.ln_dc_working,
> +			      struct lnet_peer, lp_dc_list);
> +	if (now < lp->lp_last_queued + DISCOVERY_TIMEOUT)
> +		return NULL;
> +	return lp;
> +}
> +
> +/*
> + * Discovering this peer is taking too long. Cancel any Ping or Push
> + * that discovery is waiting on by unlinking the relevant MDs. The
> + * lnet_discovery_event_handler() will proceed from here and complete
> + * the cleanup.
> + */
> +static void lnet_peer_discovery_timeout(struct lnet_peer *lp)
> +{
> +	struct lnet_handle_md ping_mdh;
> +	struct lnet_handle_md push_mdh;
> +
> +	LNetInvalidateMDHandle(&ping_mdh);
> +	LNetInvalidateMDHandle(&push_mdh);
> +
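> +	/*
> +	 * Take the MD handles and invalidate them under the peer lock
> +	 * so the event handler sees a consistent state; the unlinks
> +	 * themselves are issued after the lock is dropped.
> +	 */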
> +	spin_lock(&lp->lp_lock);
> +	if (lp->lp_state & LNET_PEER_PING_SENT) {
> +		ping_mdh = lp->lp_ping_mdh;
> +		LNetInvalidateMDHandle(&lp->lp_ping_mdh);
> +	}
> +	if (lp->lp_state & LNET_PEER_PUSH_SENT) {
> +		push_mdh = lp->lp_push_mdh;
> +		LNetInvalidateMDHandle(&lp->lp_push_mdh);
> +	}
> +	spin_unlock(&lp->lp_lock);
> +
> +	if (!LNetMDHandleIsInvalid(ping_mdh))
> +		LNetMDUnlink(ping_mdh);
> +	if (!LNetMDHandleIsInvalid(push_mdh))
> +		LNetMDUnlink(push_mdh);
> +}
> +
> +/*
> + * Wait for work to be queued or some other change that must be
> + * attended to. Returns non-zero if the discovery thread should shut
> + * down.
> + */
> +static int lnet_peer_discovery_wait_for_work(void)
> +{
> +	int cpt;
> +	int rc = 0;
> +
> +	DEFINE_WAIT(wait);
> +
> +	cpt = lnet_net_lock_current();
> +	for (;;) {
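> +		/*
> +		 * Recheck, with the net lock held, all conditions that
> +		 * require attention: shutdown, a push target resize,
> +		 * newly queued discovery requests, or a peer that has
> +		 * timed out on the working queue.
> +		 */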
> +		prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
> +				TASK_IDLE);
> +		if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
> +			break;
> +		if (lnet_push_target_resize_needed())
> +			break;
> +		if (!list_empty(&the_lnet.ln_dc_request))
> +			break;
> +		if (lnet_peer_dc_timed_out(ktime_get_real_seconds()))
> +			break;
> +		lnet_net_unlock(cpt);
> +
> +		/*
> +		 * Wake up at most once per second to check for peers
> +		 * that have been stuck on the working queue for longer
> +		 * than the discovery timeout.
> +		 */
> +		schedule_timeout(HZ);
> +		finish_wait(&the_lnet.ln_dc_waitq, &wait);
> +		cpt = lnet_net_lock_current();
> +	}
> +	finish_wait(&the_lnet.ln_dc_waitq, &wait);
> +
> +	if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
> +		rc = -ESHUTDOWN;
> +
> +	lnet_net_unlock(cpt);
> +
> +	CDEBUG(D_NET, "woken: %d\n", rc);
> +
> +	return rc;
> +}
> +
> +/* The discovery thread. */
> +static int lnet_peer_discovery(void *arg)
> +{
> +	struct lnet_peer *lp;
> +	time64_t now;
> +	int rc;
> +
> +	CDEBUG(D_NET, "started\n");
> +
> +	for (;;) {
> +		if (lnet_peer_discovery_wait_for_work())
>  			break;
>  
>  		if (lnet_push_target_resize_needed())
> @@ -1719,33 +2984,97 @@ static int lnet_peer_discovery(void *arg)
>  		lnet_net_lock(LNET_LOCK_EX);
>  		if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
>  			break;
> +
> +		/*
> +		 * Process all incoming discovery work requests.  When
> +		 * discovery must wait on a peer to change state, it
> +		 * is added to the tail of the ln_dc_working queue. A
> +		 * timestamp keeps track of when the peer was added,
> +		 * so we can time out discovery requests that take too
> +		 * long.
> +		 */
>  		while (!list_empty(&the_lnet.ln_dc_request)) {
>  			lp = list_first_entry(&the_lnet.ln_dc_request,
>  					      struct lnet_peer, lp_dc_list);
>  			list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
> +			/*
> +			 * Record when the peer was put on the dc_working
> +			 * queue: it must not remain there forever if the
> +			 * GET message (for Ping) never gets a REPLY or
> +			 * the PUT message (for Push) never gets an Ack.
> +			 *
> +			 * TODO: LNet Health will deal with this scenario
> +			 * in a generic way.
> +			 */
> +			lp->lp_last_queued = ktime_get_real_seconds();
>  			lnet_net_unlock(LNET_LOCK_EX);
>  
> -			/* Just tag and release for now. */
> +			/*
> +			 * Select an action depending on the state of
> +			 * the peer and whether discovery is disabled.
> +			 * The discovery-disabled check comes after the
> +			 * cases that process received data, clean up
> +			 * after failures, and force a Ping or Push.
> +			 */
>  			spin_lock(&lp->lp_lock);
> -			if (lnet_peer_discovery_disabled) {
> -				lp->lp_state |= LNET_PEER_REDISCOVER;
> -				lp->lp_state &= ~(LNET_PEER_DISCOVERED |
> -						  LNET_PEER_NIDS_UPTODATE |
> -						  LNET_PEER_DISCOVERING);
> -			} else {
> -				lp->lp_state |= (LNET_PEER_DISCOVERED |
> -						 LNET_PEER_NIDS_UPTODATE);
> -				lp->lp_state &= ~(LNET_PEER_REDISCOVER |
> -						  LNET_PEER_DISCOVERING);
> -			}
> +			CDEBUG(D_NET, "peer %s state %#x\n",
> +			       libcfs_nid2str(lp->lp_primary_nid),
> +			       lp->lp_state);
> +			if (lp->lp_state & LNET_PEER_DATA_PRESENT)
> +				rc = lnet_peer_data_present(lp);
> +			else if (lp->lp_state & LNET_PEER_PING_FAILED)
> +				rc = lnet_peer_ping_failed(lp);
> +			else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
> +				rc = lnet_peer_push_failed(lp);
> +			else if (lp->lp_state & LNET_PEER_FORCE_PING)
> +				rc = lnet_peer_send_ping(lp);
> +			else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
> +				rc = lnet_peer_send_push(lp);
> +			else if (lnet_peer_discovery_disabled)
> +				rc = lnet_peer_rediscover(lp);
> +			else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
> +				rc = lnet_peer_send_ping(lp);
> +			else if (lnet_peer_needs_push(lp))
> +				rc = lnet_peer_send_push(lp);
> +			else
> +				rc = lnet_peer_discovered(lp);
> +			CDEBUG(D_NET, "peer %s state %#x rc %d\n",
> +			       libcfs_nid2str(lp->lp_primary_nid),
> +			       lp->lp_state, rc);
>  			spin_unlock(&lp->lp_lock);
>  
>  			lnet_net_lock(LNET_LOCK_EX);
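> +			/*
> +			 * LNET_REDISCOVER_PEER means the peer must go
> +			 * back on the request queue and be examined
> +			 * again; any other non-zero rc is treated as a
> +			 * hard discovery error.
> +			 */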
> +			if (rc == LNET_REDISCOVER_PEER) {
> +				list_move(&lp->lp_dc_list,
> +					  &the_lnet.ln_dc_request);
> +			} else if (rc) {
> +				lnet_peer_discovery_error(lp, rc);
> +			}
>  			if (!(lp->lp_state & LNET_PEER_DISCOVERING))
>  				lnet_peer_discovery_complete(lp);
>  			if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
>  				break;
>  		}
> +
> +		/*
> +		 * Now that the ln_dc_request queue has been emptied,
> +		 * check the ln_dc_working queue for peers that are
> +		 * taking too long. Move all that are found to the
> +		 * ln_dc_expired queue and time out any pending
> +		 * Ping or Push. We have to drop the lnet_net_lock
> +		 * in the loop because lnet_peer_discovery_timeout()
> +		 * calls LNetMDUnlink().
> +		 */
> +		now = ktime_get_real_seconds();
> +		while ((lp = lnet_peer_dc_timed_out(now)) != NULL) {
> +			list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
> +			lnet_net_unlock(LNET_LOCK_EX);
> +			lnet_peer_discovery_timeout(lp);
> +			lnet_net_lock(LNET_LOCK_EX);
> +		}
> +
>  		lnet_net_unlock(LNET_LOCK_EX);
>  	}
>  
> @@ -1759,23 +3088,28 @@ static int lnet_peer_discovery(void *arg)
>  	LNetEQFree(the_lnet.ln_dc_eqh);
>  	LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
>  
> +	/* Queue cleanup 1: stop all pending pings and pushes. */
>  	lnet_net_lock(LNET_LOCK_EX);
> -	list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
> -		spin_lock(&lp->lp_lock);
> -		lp->lp_state |= LNET_PEER_REDISCOVER;
> -		lp->lp_state &= ~(LNET_PEER_DISCOVERED |
> -				  LNET_PEER_DISCOVERING |
> -				  LNET_PEER_NIDS_UPTODATE);
> -		spin_unlock(&lp->lp_lock);
> -		lnet_peer_discovery_complete(lp);
> +	while (!list_empty(&the_lnet.ln_dc_working)) {
> +		lp = list_first_entry(&the_lnet.ln_dc_working,
> +				      struct lnet_peer, lp_dc_list);
> +		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
> +		lnet_net_unlock(LNET_LOCK_EX);
> +		lnet_peer_discovery_timeout(lp);
> +		lnet_net_lock(LNET_LOCK_EX);
>  	}
> -	list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
> -		spin_lock(&lp->lp_lock);
> -		lp->lp_state |= LNET_PEER_REDISCOVER;
> -		lp->lp_state &= ~(LNET_PEER_DISCOVERED |
> -				  LNET_PEER_DISCOVERING |
> -				  LNET_PEER_NIDS_UPTODATE);
> -		spin_unlock(&lp->lp_lock);
> +	lnet_net_unlock(LNET_LOCK_EX);
> +
> +	/*
> +	 * Queue cleanup 2: wait for the expired queue to clear. The
> +	 * queue drains as lnet_discovery_event_handler() completes the
> +	 * cleanup of the cancelled Pings and Pushes.
> +	 */
> +	while (!list_empty(&the_lnet.ln_dc_expired))
> +		schedule_timeout_uninterruptible(HZ);
> +
> +	/* Queue cleanup 3: clear the request queue. */
> +	lnet_net_lock(LNET_LOCK_EX);
> +	while (!list_empty(&the_lnet.ln_dc_request)) {
> +		lp = list_first_entry(&the_lnet.ln_dc_request,
> +				      struct lnet_peer, lp_dc_list);
> +		lnet_peer_discovery_error(lp, -ESHUTDOWN);
>  		lnet_peer_discovery_complete(lp);
>  	}
>  	lnet_net_unlock(LNET_LOCK_EX);
> @@ -1797,10 +3131,6 @@ int lnet_peer_discovery_start(void)
>  	if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
>  		return -EALREADY;
>  
> -	INIT_LIST_HEAD(&the_lnet.ln_dc_request);
> -	INIT_LIST_HEAD(&the_lnet.ln_dc_working);
> -	init_waitqueue_head(&the_lnet.ln_dc_waitq);
> -
>  	rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
>  	if (rc != 0) {
>  		CERROR("Can't allocate discovery EQ: %d\n", rc);
> @@ -1819,6 +3149,8 @@ int lnet_peer_discovery_start(void)
>  		the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
>  	}
>  
> +	CDEBUG(D_NET, "discovery start: %d\n", rc);
> +
>  	return rc;
>  }
>  
> @@ -1837,6 +3169,9 @@ void lnet_peer_discovery_stop(void)
>  
>  	LASSERT(list_empty(&the_lnet.ln_dc_request));
>  	LASSERT(list_empty(&the_lnet.ln_dc_working));
> +	LASSERT(list_empty(&the_lnet.ln_dc_expired));
> +
> +	CDEBUG(D_NET, "discovery stopped\n");
>  }
>  
>  /* Debugging */