Skip site navigation (1) Skip section navigation (2)
Date:      Tue, 20 Oct 2015 19:20:42 +0000 (UTC)
From:      "Conrad E. Meyer" <cem@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org
Subject:   svn commit: r289651 - head/sys/dev/ntb/if_ntb
Message-ID:  <201510201920.t9KJKgaV037894@repo.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: cem
Date: Tue Oct 20 19:20:42 2015
New Revision: 289651
URL: https://svnweb.freebsd.org/changeset/base/289651

Log:
  NTB: MFV da2e5ae5: Fix ntb_transport out-of-order RX update
  
  It was possible for a synchronous update of the RX index in the error
  case to get ahead of the asynchronous RX index update in the normal
  case.  Change the RX processing to preserve an RX completion order.
  
  There were two error cases.  First, if a buffer is not present to
  receive data, there would be no queue entry to preserve the RX
  completion order.  Instead of dropping the RX frame, leave the RX frame
  in the ring.  Schedule RX processing when RX entries are enqueued, in
  case there are RX frames waiting in the ring to be received.
  
  Second, if a buffer is too small to receive data, drop the frame in the
  ring, mark the RX entry as done, and indicate the error in the RX entry
  length.  Check for a negative length in the receive callback in
  ntb_netdev, and count occurrences as rx_length_errors.
  
  Authored by:	Allen Hubbe
  Obtained from:	Linux (Dual BSD/GPL driver)
  Sponsored by:	EMC / Isilon Storage Division

Modified:
  head/sys/dev/ntb/if_ntb/if_ntb.c

Modified: head/sys/dev/ntb/if_ntb/if_ntb.c
==============================================================================
--- head/sys/dev/ntb/if_ntb/if_ntb.c	Tue Oct 20 19:20:33 2015	(r289650)
+++ head/sys/dev/ntb/if_ntb/if_ntb.c	Tue Oct 20 19:20:42 2015	(r289651)
@@ -149,10 +149,11 @@ struct ntb_transport_qp {
 
 	void (*rx_handler)(struct ntb_transport_qp *qp, void *qp_data,
 	    void *data, int len);
+	struct ntb_queue_list	rx_post_q;
 	struct ntb_queue_list	rx_pend_q;
 	struct ntb_queue_list	rx_free_q;
-	struct mtx		ntb_rx_pend_q_lock;
-	struct mtx		ntb_rx_free_q_lock;
+	/* ntb_rx_q_lock: synchronize access to rx_XXXX_q */
+	struct mtx		ntb_rx_q_lock;
 	struct task		rx_completion_task;
 	struct task		rxc_db_work;
 	void			*rx_buff;
@@ -285,10 +286,11 @@ static void ntb_memcpy_tx(struct ntb_tra
     struct ntb_queue_entry *entry, void *offset);
 static void ntb_qp_full(void *arg);
 static void ntb_transport_rxc_db(void *arg, int pending);
-static void ntb_rx_pendq_full(void *arg);
 static int ntb_process_rxc(struct ntb_transport_qp *qp);
-static void ntb_rx_copy_task(struct ntb_transport_qp *qp,
+static void ntb_memcpy_rx(struct ntb_transport_qp *qp,
     struct ntb_queue_entry *entry, void *offset);
+static inline void ntb_rx_copy_callback(struct ntb_transport_qp *qp,
+    void *data);
 static void ntb_complete_rxc(void *arg, int pending);
 static void ntb_transport_doorbell_callback(void *data, uint32_t vector);
 static void ntb_transport_event_callback(void *data);
@@ -308,6 +310,8 @@ static void ntb_list_add(struct mtx *loc
     struct ntb_queue_list *list);
 static struct ntb_queue_entry *ntb_list_rm(struct mtx *lock,
     struct ntb_queue_list *list);
+static struct ntb_queue_entry *ntb_list_mv(struct mtx *lock,
+    struct ntb_queue_list *from, struct ntb_queue_list *to);
 static void create_random_local_eui48(u_char *eaddr);
 static unsigned int ntb_transport_max_size(struct ntb_transport_qp *qp);
 
@@ -681,12 +685,12 @@ ntb_transport_init_queue(struct ntb_tran
 	callout_init(&qp->queue_full, 1);
 	callout_init(&qp->rx_full, 1);
 
-	mtx_init(&qp->ntb_rx_pend_q_lock, "ntb rx pend q", NULL, MTX_SPIN);
-	mtx_init(&qp->ntb_rx_free_q_lock, "ntb rx free q", NULL, MTX_SPIN);
+	mtx_init(&qp->ntb_rx_q_lock, "ntb rx q", NULL, MTX_SPIN);
 	mtx_init(&qp->ntb_tx_free_q_lock, "ntb tx free q", NULL, MTX_SPIN);
 	TASK_INIT(&qp->rx_completion_task, 0, ntb_complete_rxc, qp);
 	TASK_INIT(&qp->rxc_db_work, 0, ntb_transport_rxc_db, qp);
 
+	STAILQ_INIT(&qp->rx_post_q);
 	STAILQ_INIT(&qp->rx_pend_q);
 	STAILQ_INIT(&qp->rx_free_q);
 	STAILQ_INIT(&qp->tx_free_q);
@@ -711,10 +715,13 @@ ntb_transport_free_queue(struct ntb_tran
 	qp->tx_handler = NULL;
 	qp->event_handler = NULL;
 
-	while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q)))
 		free(entry, M_NTB_IF);
 
-	while ((entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q)))
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_pend_q)))
+		free(entry, M_NTB_IF);
+
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_post_q)))
 		free(entry, M_NTB_IF);
 
 	while ((entry = ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q)))
@@ -769,7 +776,7 @@ ntb_transport_create_queue(void *data, s
 		entry->cb_data = nt->ifp;
 		entry->buf = NULL;
 		entry->len = transport_mtu;
-		ntb_list_add(&qp->ntb_rx_pend_q_lock, entry, &qp->rx_pend_q);
+		ntb_list_add(&qp->ntb_rx_q_lock, entry, &qp->rx_pend_q);
 	}
 
 	for (i = 0; i < NTB_QP_DEF_NUM_ENTRIES; i++) {
@@ -951,14 +958,6 @@ ntb_qp_full(void *arg)
 
 /* Transport Rx */
 static void
-ntb_rx_pendq_full(void *arg)
-{
-
-	CTR0(KTR_NTB, "RX: ntb_rx_pendq_full callout");
-	ntb_transport_rxc_db(arg, 0);
-}
-
-static void
 ntb_transport_rxc_db(void *arg, int pending __unused)
 {
 	struct ntb_transport_qp *qp = arg;
@@ -1030,7 +1029,7 @@ ntb_process_rxc(struct ntb_transport_qp 
 		return (EIO);
 	}
 
-	entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
+	entry = ntb_list_mv(&qp->ntb_rx_q_lock, &qp->rx_pend_q, &qp->rx_post_q);
 	if (entry == NULL) {
 		qp->rx_err_no_buf++;
 		CTR0(KTR_NTB, "RX: No entries in rx_pend_q");
@@ -1050,7 +1049,6 @@ ntb_process_rxc(struct ntb_transport_qp 
 		entry->len = -EIO;
 		entry->flags |= IF_NTB_DESC_DONE_FLAG;
 
-		ntb_list_add(&qp->ntb_rx_free_q_lock, entry, &qp->rx_free_q);
 		taskqueue_enqueue(taskqueue_swi, &qp->rx_completion_task);
 	} else {
 		qp->rx_bytes += hdr->len;
@@ -1060,7 +1058,7 @@ ntb_process_rxc(struct ntb_transport_qp 
 
 		entry->len = hdr->len;
 
-		ntb_rx_copy_task(qp, entry, offset);
+		ntb_memcpy_rx(qp, entry, offset);
 	}
 
 	qp->rx_index++;
@@ -1069,7 +1067,7 @@ ntb_process_rxc(struct ntb_transport_qp 
 }
 
 static void
-ntb_rx_copy_task(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry,
+ntb_memcpy_rx(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry,
     void *offset)
 {
 	struct ifnet *ifp = entry->cb_data;
@@ -1084,15 +1082,18 @@ ntb_rx_copy_task(struct ntb_transport_qp
 
 	/* Ensure that the data is globally visible before clearing the flag */
 	wmb();
-	entry->x_hdr->flags = 0;
-	/* TODO: replace with bus_space_write */
-	qp->rx_info->entry = qp->rx_index;
 
-	CTR2(KTR_NTB,
-	    "RX: copied entry %p to mbuf %p. Adding entry to rx_free_q", entry,
-	    m);
-	ntb_list_add(&qp->ntb_rx_free_q_lock, entry, &qp->rx_free_q);
+	CTR2(KTR_NTB, "RX: copied entry %p to mbuf %p.", entry, m);
+	ntb_rx_copy_callback(qp, entry);
+}
 
+static inline void
+ntb_rx_copy_callback(struct ntb_transport_qp *qp, void *data)
+{
+	struct ntb_queue_entry *entry;
+
+	entry = data;
+	entry->flags |= IF_NTB_DESC_DONE_FLAG;
 	taskqueue_enqueue(taskqueue_swi, &qp->rx_completion_task);
 }
 
@@ -1100,30 +1101,39 @@ static void
 ntb_complete_rxc(void *arg, int pending)
 {
 	struct ntb_transport_qp *qp = arg;
-	struct mbuf *m;
 	struct ntb_queue_entry *entry;
+	struct mbuf *m;
+	unsigned len;
 
 	CTR0(KTR_NTB, "RX: rx_completion_task");
 
-	while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q))) {
+	mtx_lock_spin(&qp->ntb_rx_q_lock);
+
+	while (!STAILQ_EMPTY(&qp->rx_post_q)) {
+		entry = STAILQ_FIRST(&qp->rx_post_q);
+		if ((entry->flags & IF_NTB_DESC_DONE_FLAG) == 0)
+			break;
+
+		entry->x_hdr->flags = 0;
+		/* XXX bus_space_write */
+		qp->rx_info->entry = entry->index;
+
+		len = entry->len;
 		m = entry->buf;
-		CTR2(KTR_NTB, "RX: completing entry %p, mbuf %p", entry, m);
-		if (qp->rx_handler && qp->client_ready)
-			qp->rx_handler(qp, qp->cb_data, m, entry->len);
 
-		entry->buf = NULL;
-		entry->len = qp->transport->bufsize;
+		STAILQ_REMOVE_HEAD(&qp->rx_post_q, entry);
+		STAILQ_INSERT_TAIL(&qp->rx_free_q, entry, entry);
 
-		CTR1(KTR_NTB,"RX: entry %p removed from rx_free_q "
-		    "and added to rx_pend_q", entry);
-		ntb_list_add(&qp->ntb_rx_pend_q_lock, entry, &qp->rx_pend_q);
-		if (qp->rx_err_no_buf > qp->last_rx_no_buf) {
-			qp->last_rx_no_buf = qp->rx_err_no_buf;
-			CTR0(KTR_NTB, "RX: could spawn rx task");
-			callout_reset(&qp->rx_full, hz / 1000, ntb_rx_pendq_full,
-			    qp);
-		}
+		mtx_unlock_spin(&qp->ntb_rx_q_lock);
+
+		CTR2(KTR_NTB, "RX: completing entry %p, mbuf %p", entry, m);
+		if (qp->rx_handler != NULL && qp->client_ready)
+			qp->rx_handler(qp, qp->cb_data, m, len);
+
+		mtx_lock_spin(&qp->ntb_rx_q_lock);
 	}
+
+	mtx_unlock_spin(&qp->ntb_rx_q_lock);
 }
 
 static void
@@ -1573,6 +1583,26 @@ out:
 	return (entry);
 }
 
+static struct ntb_queue_entry *
+ntb_list_mv(struct mtx *lock, struct ntb_queue_list *from,
+    struct ntb_queue_list *to)
+{
+	struct ntb_queue_entry *entry;
+
+	mtx_lock_spin(lock);
+	if (STAILQ_EMPTY(from)) {
+		entry = NULL;
+		goto out;
+	}
+	entry = STAILQ_FIRST(from);
+	STAILQ_REMOVE_HEAD(from, entry);
+	STAILQ_INSERT_TAIL(to, entry, entry);
+
+out:
+	mtx_unlock_spin(lock);
+	return (entry);
+}
+
 /* Helper functions */
 /* TODO: This too should really be part of the kernel */
 #define EUI48_MULTICAST			1 << 0



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201510201920.t9KJKgaV037894>