Date:      Sat, 7 May 2016 00:33:35 +0000 (UTC)
From:      John Baldwin <jhb@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org
Subject:   svn commit: r299210 - in head/sys/dev/cxgbe: . tom
Message-ID:  <201605070033.u470XZCs075568@repo.freebsd.org>

Author: jhb
Date: Sat May  7 00:33:35 2016
New Revision: 299210
URL: https://svnweb.freebsd.org/changeset/base/299210

Log:
  Use DDP to implement zerocopy TCP receive with aio_read().
  
  Chelsio's TCP offload engine supports direct DMA of received TCP payload
  into wired user buffers.  This feature is known as Direct-Data Placement.
  However, to scale well the adapter needs to prepare buffers for DDP
  before data arrives.  aio_read() is more amenable to this requirement than
  read() as applications often call read() only after data is available in
  the socket buffer.
  
  When DDP is enabled, TOE sockets use the recently added pru_aio_queue
  protocol hook to claim aio_read(2) requests instead of letting them use
  the default AIO socket logic.  The DDP feature supports scheduling DMA
  to two buffers at a time so that the second buffer is ready for use
  after the first buffer is filled.  The aio/DDP code optimizes the case
  of an application ping-ponging between two buffers (similar to the
  zero-copy bpf(4) code) by keeping the two most recently used AIO buffers
  wired.  If a buffer is reused, the aio/DDP code is able to reuse the
  vm_page_t array as well as page pod mappings (a kind of MMU mapping the
  Chelsio NIC uses to describe user buffers).  The generation of the
  vmspace of the calling process is used in conjunction with the user
  buffer's address and length to determine if a user buffer matches a
  previously used buffer.  If an application queues a buffer for AIO that
  does not match a previously used buffer then the least recently used
  buffer is unwired before the new buffer is wired.  This ensures that no
  more than two user buffers per socket are ever wired.
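
  As a rough sketch of that reuse check (illustrative only: the ps->start
  and ps->vm_timestamp fields below are assumed names, not necessarily the
  ones used in t4_ddp.c):

	/*
	 * Illustrative only: a cached, still-wired pageset can be reused
	 * for a newly queued aio_read() buffer only if it was built
	 * against the same vmspace generation and describes the identical
	 * user address range.
	 */
	static bool
	pageset_matches(const struct pageset *ps, struct vmspace *vm,
	    vm_offset_t start, size_t len)
	{

		if (ps->vm != vm || ps->vm_timestamp != vm->vm_map.timestamp)
			return (false);		/* address space changed */
		return (ps->start == start && ps->len == len);
	}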
  
  Note that this feature is best suited to applications that send a steady
  stream of data rather than short bursts of traffic.
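
  For illustration, a minimal userland sketch of that ping-pong pattern
  follows.  It uses only portable POSIX AIO calls (aio_read(2),
  aio_suspend(2), aio_return(2)) and nothing specific to cxgbe; reads
  queued on a socket are expected to complete in the order queued, so
  alternating the two buffers preserves the byte stream.  Error handling
  and draining of the last outstanding request at EOF are trimmed for
  brevity.

	#include <sys/types.h>
	#include <aio.h>
	#include <errno.h>
	#include <string.h>
	#include <unistd.h>

	#define	BUFSZ	(256 * 1024)

	static void
	consume(const char *buf, ssize_t n)
	{

		/* Application-specific processing of n received bytes. */
		(void)buf;
		(void)n;
	}

	/* Receive a TCP stream into two buffers reused in strict alternation. */
	int
	receive_stream(int sock)
	{
		static char buf[2][BUFSZ];
		struct aiocb cb[2], *list[1];
		ssize_t n;
		int i;

		memset(cb, 0, sizeof(cb));
		for (i = 0; i < 2; i++) {
			cb[i].aio_fildes = sock;
			cb[i].aio_buf = buf[i];
			cb[i].aio_nbytes = BUFSZ;
			if (aio_read(&cb[i]) != 0)
				return (-1);
		}
		for (i = 0;; i ^= 1) {
			/* Wait for the oldest outstanding read. */
			list[0] = &cb[i];
			while (aio_error(&cb[i]) == EINPROGRESS)
				aio_suspend(
				    (const struct aiocb * const *)list, 1,
				    NULL);
			n = aio_return(&cb[i]);
			if (n <= 0)
				return (n == 0 ? 0 : -1);
			consume(buf[i], n);
			/* Requeue the same buffer so it can be reused. */
			if (aio_read(&cb[i]) != 0)
				return (-1);
		}
	}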
  
  Discussed with:	np
  Relnotes:	yes
  Sponsored by:	Chelsio Communications

Modified:
  head/sys/dev/cxgbe/offload.h
  head/sys/dev/cxgbe/t4_main.c
  head/sys/dev/cxgbe/tom/t4_cpl_io.c
  head/sys/dev/cxgbe/tom/t4_ddp.c
  head/sys/dev/cxgbe/tom/t4_tom.c
  head/sys/dev/cxgbe/tom/t4_tom.h

Modified: head/sys/dev/cxgbe/offload.h
==============================================================================
--- head/sys/dev/cxgbe/offload.h	Sat May  7 00:07:03 2016	(r299209)
+++ head/sys/dev/cxgbe/offload.h	Sat May  7 00:33:35 2016	(r299210)
@@ -145,8 +145,6 @@ struct uld_info {
 struct tom_tunables {
 	int sndbuf;
 	int ddp;
-	int indsz;
-	int ddp_thres;
 	int rx_coalesce;
 	int tx_align;
 };

Modified: head/sys/dev/cxgbe/t4_main.c
==============================================================================
--- head/sys/dev/cxgbe/t4_main.c	Sat May  7 00:07:03 2016	(r299209)
+++ head/sys/dev/cxgbe/t4_main.c	Sat May  7 00:33:35 2016	(r299210)
@@ -4901,15 +4901,6 @@ t4_sysctls(struct adapter *sc)
 		SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp", CTLFLAG_RW,
 		    &sc->tt.ddp, 0, "DDP allowed");
 
-		sc->tt.indsz = G_INDICATESIZE(t4_read_reg(sc, A_TP_PARA_REG5));
-		SYSCTL_ADD_INT(ctx, children, OID_AUTO, "indsz", CTLFLAG_RW,
-		    &sc->tt.indsz, 0, "DDP max indicate size allowed");
-
-		sc->tt.ddp_thres =
-		    G_RXCOALESCESIZE(t4_read_reg(sc, A_TP_PARA_REG2));
-		SYSCTL_ADD_INT(ctx, children, OID_AUTO, "ddp_thres", CTLFLAG_RW,
-		    &sc->tt.ddp_thres, 0, "DDP threshold");
-
 		sc->tt.rx_coalesce = 1;
 		SYSCTL_ADD_INT(ctx, children, OID_AUTO, "rx_coalesce",
 		    CTLFLAG_RW, &sc->tt.rx_coalesce, 0, "receive coalescing");

Modified: head/sys/dev/cxgbe/tom/t4_cpl_io.c
==============================================================================
--- head/sys/dev/cxgbe/tom/t4_cpl_io.c	Sat May  7 00:07:03 2016	(r299209)
+++ head/sys/dev/cxgbe/tom/t4_cpl_io.c	Sat May  7 00:33:35 2016	(r299210)
@@ -343,7 +343,7 @@ send_rx_credits(struct adapter *sc, stru
 }
 
 void
-t4_rcvd(struct toedev *tod, struct tcpcb *tp)
+t4_rcvd_locked(struct toedev *tod, struct tcpcb *tp)
 {
 	struct adapter *sc = tod->tod_softc;
 	struct inpcb *inp = tp->t_inpcb;
@@ -354,7 +354,7 @@ t4_rcvd(struct toedev *tod, struct tcpcb
 
 	INP_WLOCK_ASSERT(inp);
 
-	SOCKBUF_LOCK(sb);
+	SOCKBUF_LOCK_ASSERT(sb);
 	KASSERT(toep->sb_cc >= sbused(sb),
 	    ("%s: sb %p has more data (%d) than last time (%d).",
 	    __func__, sb, sbused(sb), toep->sb_cc));
@@ -372,6 +372,17 @@ t4_rcvd(struct toedev *tod, struct tcpcb
 		tp->rcv_wnd += credits;
 		tp->rcv_adv += credits;
 	}
+}
+
+void
+t4_rcvd(struct toedev *tod, struct tcpcb *tp)
+{
+	struct inpcb *inp = tp->t_inpcb;
+	struct socket *so = inp->inp_socket;
+	struct sockbuf *sb = &so->so_rcv;
+
+	SOCKBUF_LOCK(sb);
+	t4_rcvd_locked(tod, tp);
 	SOCKBUF_UNLOCK(sb);
 }
 
@@ -1042,7 +1053,6 @@ do_peer_close(struct sge_iq *iq, const s
 	struct inpcb *inp = toep->inp;
 	struct tcpcb *tp = NULL;
 	struct socket *so;
-	struct sockbuf *sb;
 #ifdef INVARIANTS
 	unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl)));
 #endif
@@ -1088,12 +1098,14 @@ do_peer_close(struct sge_iq *iq, const s
 	tp->rcv_nxt++;	/* FIN */
 
 	so = inp->inp_socket;
-	sb = &so->so_rcv;
-	SOCKBUF_LOCK(sb);
-	if (__predict_false(toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE))) {
-		handle_ddp_close(toep, tp, sb, cpl->rcv_nxt);
+	if (toep->ulp_mode == ULP_MODE_TCPDDP) {
+		DDP_LOCK(toep);
+		if (__predict_false(toep->ddp_flags &
+		    (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)))
+			handle_ddp_close(toep, tp, cpl->rcv_nxt);
+		DDP_UNLOCK(toep);
 	}
-	socantrcvmore_locked(so);	/* unlocks the sockbuf */
+	socantrcvmore(so);
 
 	if (toep->ulp_mode != ULP_MODE_RDMA) {
 		KASSERT(tp->rcv_nxt == be32toh(cpl->rcv_nxt),
@@ -1409,6 +1421,8 @@ do_rx_data(struct sge_iq *iq, const stru
 	tp->rcv_wnd -= len;
 	tp->t_rcvtime = ticks;
 
+	if (toep->ulp_mode == ULP_MODE_TCPDDP)
+		DDP_LOCK(toep);
 	so = inp_inpcbtosocket(inp);
 	sb = &so->so_rcv;
 	SOCKBUF_LOCK(sb);
@@ -1418,6 +1432,8 @@ do_rx_data(struct sge_iq *iq, const stru
 		    __func__, tid, len);
 		m_freem(m);
 		SOCKBUF_UNLOCK(sb);
+		if (toep->ulp_mode == ULP_MODE_TCPDDP)
+			DDP_UNLOCK(toep);
 		INP_WUNLOCK(inp);
 
 		INP_INFO_RLOCK(&V_tcbinfo);
@@ -1446,6 +1462,10 @@ do_rx_data(struct sge_iq *iq, const stru
 			toep->rx_credits += newsize - hiwat;
 	}
 
+	if (toep->ddp_waiting_count != 0 || toep->ddp_active_count != 0)
+		CTR3(KTR_CXGBE, "%s: tid %u, non-ddp rx (%d bytes)", __func__,
+		    tid, len);
+
 	if (toep->ulp_mode == ULP_MODE_TCPDDP) {
 		int changed = !(toep->ddp_flags & DDP_ON) ^ cpl->ddp_off;
 
@@ -1458,47 +1478,22 @@ do_rx_data(struct sge_iq *iq, const stru
 				    __func__));
 
 				/* Fell out of DDP mode */
-				toep->ddp_flags &= ~(DDP_ON | DDP_BUF0_ACTIVE |
-				    DDP_BUF1_ACTIVE);
+				toep->ddp_flags &= ~DDP_ON;
+				CTR1(KTR_CXGBE, "%s: fell out of DDP mode",
+				    __func__);
 
-				if (ddp_placed)
-					insert_ddp_data(toep, ddp_placed);
+				insert_ddp_data(toep, ddp_placed);
 			}
 		}
 
-		if ((toep->ddp_flags & DDP_OK) == 0 &&
-		    time_uptime >= toep->ddp_disabled + DDP_RETRY_WAIT) {
-			toep->ddp_score = DDP_LOW_SCORE;
-			toep->ddp_flags |= DDP_OK;
-			CTR3(KTR_CXGBE, "%s: tid %u DDP_OK @ %u",
-			    __func__, tid, time_uptime);
-		}
-
 		if (toep->ddp_flags & DDP_ON) {
-
 			/*
-			 * CPL_RX_DATA with DDP on can only be an indicate.  Ask
-			 * soreceive to post a buffer or disable DDP.  The
-			 * payload that arrived in this indicate is appended to
-			 * the socket buffer as usual.
+			 * CPL_RX_DATA with DDP on can only be an indicate.
+			 * Start posting queued AIO requests via DDP.  The
+			 * payload that arrived in this indicate is appended
+			 * to the socket buffer as usual.
 			 */
-
-#if 0
-			CTR5(KTR_CXGBE,
-			    "%s: tid %u (0x%x) DDP indicate (seq 0x%x, len %d)",
-			    __func__, tid, toep->flags, be32toh(cpl->seq), len);
-#endif
-			sb->sb_flags |= SB_DDP_INDICATE;
-		} else if ((toep->ddp_flags & (DDP_OK|DDP_SC_REQ)) == DDP_OK &&
-		    tp->rcv_wnd > DDP_RSVD_WIN && len >= sc->tt.ddp_thres) {
-
-			/*
-			 * DDP allowed but isn't on (and a request to switch it
-			 * on isn't pending either), and conditions are ripe for
-			 * it to work.  Switch it on.
-			 */
-
-			enable_ddp(sc, toep);
+			handle_ddp_indicate(toep);
 		}
 	}
 
@@ -1516,8 +1511,16 @@ do_rx_data(struct sge_iq *iq, const stru
 		tp->rcv_wnd += credits;
 		tp->rcv_adv += credits;
 	}
+
+	if (toep->ddp_waiting_count > 0 && sbavail(sb) != 0) {
+		CTR2(KTR_CXGBE, "%s: tid %u queueing AIO task", __func__,
+		    tid);
+		ddp_queue_toep(toep);
+	}
 	sorwakeup_locked(so);
 	SOCKBUF_UNLOCK_ASSERT(sb);
+	if (toep->ulp_mode == ULP_MODE_TCPDDP)
+		DDP_UNLOCK(toep);
 
 	INP_WUNLOCK(inp);
 	CURVNET_RESTORE();
@@ -1680,6 +1683,7 @@ do_set_tcb_rpl(struct sge_iq *iq, const 
 	struct adapter *sc = iq->adapter;
 	const struct cpl_set_tcb_rpl *cpl = (const void *)(rss + 1);
 	unsigned int tid = GET_TID(cpl);
+	struct toepcb *toep;
 #ifdef INVARIANTS
 	unsigned int opcode = G_CPL_OPCODE(be32toh(OPCODE_TID(cpl)));
 #endif
@@ -1691,6 +1695,12 @@ do_set_tcb_rpl(struct sge_iq *iq, const 
 	if (is_ftid(sc, tid))
 		return (t4_filter_rpl(iq, rss, m)); /* TCB is a filter */
 
+	toep = lookup_tid(sc, tid);
+	if (toep->ulp_mode == ULP_MODE_TCPDDP) {
+		handle_ddp_tcb_rpl(toep, cpl);
+		return (0);
+	}
+
 	/*
 	 * TOM and/or other ULPs don't request replies for CPL_SET_TCB or
 	 * CPL_SET_TCB_FIELD requests.  This can easily change and when it does
@@ -1732,6 +1742,31 @@ t4_set_tcb_field(struct adapter *sc, str
 }
 
 void
+t4_set_tcb_field_rpl(struct adapter *sc, struct toepcb *toep, int ctrl,
+    uint16_t word, uint64_t mask, uint64_t val, uint8_t cookie)
+{
+	struct wrqe *wr;
+	struct cpl_set_tcb_field *req;
+
+	KASSERT((cookie & ~M_COOKIE) == 0, ("%s: invalid cookie %#x", __func__,
+	    cookie));
+	wr = alloc_wrqe(sizeof(*req), ctrl ? toep->ctrlq : toep->ofld_txq);
+	if (wr == NULL) {
+		/* XXX */
+		panic("%s: allocation failure.", __func__);
+	}
+	req = wrtod(wr);
+
+	INIT_TP_WR_MIT_CPL(req, CPL_SET_TCB_FIELD, toep->tid);
+	req->reply_ctrl = htobe16(V_QUEUENO(toep->ofld_rxq->iq.abs_id));
+	req->word_cookie = htobe16(V_WORD(word) | V_COOKIE(cookie));
+	req->mask = htobe64(mask);
+	req->val = htobe64(val);
+
+	t4_wrq_tx(sc, wr);
+}
+
+void
 t4_init_cpl_io_handlers(struct adapter *sc)
 {
 

Modified: head/sys/dev/cxgbe/tom/t4_ddp.c
==============================================================================
--- head/sys/dev/cxgbe/tom/t4_ddp.c	Sat May  7 00:07:03 2016	(r299209)
+++ head/sys/dev/cxgbe/tom/t4_ddp.c	Sat May  7 00:33:35 2016	(r299210)
@@ -31,7 +31,8 @@ __FBSDID("$FreeBSD$");
 #include "opt_inet.h"
 
 #include <sys/param.h>
-#include <sys/types.h>
+#include <sys/aio.h>
+#include <sys/file.h>
 #include <sys/systm.h>
 #include <sys/kernel.h>
 #include <sys/ktr.h>
@@ -41,6 +42,7 @@ __FBSDID("$FreeBSD$");
 #include <sys/domain.h>
 #include <sys/socket.h>
 #include <sys/socketvar.h>
+#include <sys/taskqueue.h>
 #include <sys/uio.h>
 #include <netinet/in.h>
 #include <netinet/in_pcb.h>
@@ -72,7 +74,10 @@ VNET_DECLARE(int, tcp_autorcvbuf_inc);
 VNET_DECLARE(int, tcp_autorcvbuf_max);
 #define V_tcp_autorcvbuf_max VNET(tcp_autorcvbuf_max)
 
-static struct mbuf *get_ddp_mbuf(int len);
+static void aio_ddp_requeue_task(void *context, int pending);
+static void ddp_complete_all(struct toepcb *toep, int error);
+static void t4_aio_cancel_active(struct kaiocb *job);
+static void t4_aio_cancel_queued(struct kaiocb *job);
 
 #define PPOD_SZ(n)	((n) * sizeof(struct pagepod))
 #define PPOD_SIZE	(PPOD_SZ(1))
@@ -80,6 +85,10 @@ static struct mbuf *get_ddp_mbuf(int len
 /* XXX: must match A_ULP_RX_TDDP_PSZ */
 static int t4_ddp_pgsz[] = {4096, 4096 << 2, 4096 << 4, 4096 << 6};
 
+static TAILQ_HEAD(, pageset) ddp_orphan_pagesets;
+static struct mtx ddp_orphan_pagesets_lock;
+static struct task ddp_orphan_task;
+
 #define MAX_DDP_BUFFER_SIZE		(M_TCB_RX_DDP_BUF0_LEN)
 static int
 alloc_ppods(struct tom_data *td, int n, u_int *ppod_addr)
@@ -112,33 +121,199 @@ pages_to_nppods(int npages, int ddp_pgsz
 	return (howmany(nsegs, PPOD_PAGES));
 }
 
+/*
+ * A page set holds information about a buffer used for DDP.  The page
+ * set holds resources such as the VM pages backing the buffer (either
+ * held or wired) and the page pods associated with the buffer.
+ * Recently used page sets are cached to allow for efficient reuse of
+ * buffers (avoiding the need to re-fault in pages, hold them, etc.).
+ * Note that cached page sets keep the backing pages wired.  The
+ * number of wired pages is capped by only allowing for two wired
+ * pagesets per connection.  This is not a perfect cap, but is a
+ * trade-off for performance.
+ *
+ * If an application ping-pongs two buffers for a connection via
+ * aio_read(2) then those buffers should remain wired and expensive VM
+ * fault lookups should be avoided after each buffer has been used
+ * once.  If an application uses more than two buffers then this will
+ * fall back to doing expensive VM fault lookups for each operation.
+ */
+static void
+free_pageset(struct tom_data *td, struct pageset *ps)
+{
+	vm_page_t p;
+	int i;
+
+	if (ps->nppods > 0)
+		free_ppods(td, ps->ppod_addr, ps->nppods);
+
+	if (ps->flags & PS_WIRED) {
+		for (i = 0; i < ps->npages; i++) {
+			p = ps->pages[i];
+			vm_page_lock(p);
+			vm_page_unwire(p, PQ_INACTIVE);
+			vm_page_unlock(p);
+		}
+	} else
+		vm_page_unhold_pages(ps->pages, ps->npages);
+	mtx_lock(&ddp_orphan_pagesets_lock);
+	TAILQ_INSERT_TAIL(&ddp_orphan_pagesets, ps, link);
+	taskqueue_enqueue(taskqueue_thread, &ddp_orphan_task);
+	mtx_unlock(&ddp_orphan_pagesets_lock);
+}
+
+static void
+ddp_free_orphan_pagesets(void *context, int pending)
+{
+	struct pageset *ps;
+
+	mtx_lock(&ddp_orphan_pagesets_lock);
+	while (!TAILQ_EMPTY(&ddp_orphan_pagesets)) {
+		ps = TAILQ_FIRST(&ddp_orphan_pagesets);
+		TAILQ_REMOVE(&ddp_orphan_pagesets, ps, link);
+		mtx_unlock(&ddp_orphan_pagesets_lock);
+		if (ps->vm)
+			vmspace_free(ps->vm);
+		free(ps, M_CXGBE);
+		mtx_lock(&ddp_orphan_pagesets_lock);
+	}
+	mtx_unlock(&ddp_orphan_pagesets_lock);
+}
+
+static void
+recycle_pageset(struct toepcb *toep, struct pageset *ps)
+{
+
+	DDP_ASSERT_LOCKED(toep);
+	if (!(toep->ddp_flags & DDP_DEAD) && ps->flags & PS_WIRED) {
+		KASSERT(toep->ddp_cached_count + toep->ddp_active_count <
+		    nitems(toep->db), ("too many wired pagesets"));
+		TAILQ_INSERT_HEAD(&toep->ddp_cached_pagesets, ps, link);
+		toep->ddp_cached_count++;
+	} else
+		free_pageset(toep->td, ps);
+}
+
+static void
+ddp_complete_one(struct kaiocb *job, int error)
+{
+	long copied;
+
+	/*
+	 * If this job had copied data out of the socket buffer before
+	 * it was cancelled, report it as a short read rather than an
+	 * error.
+	 */
+	copied = job->uaiocb._aiocb_private.status;
+	if (copied != 0 || error == 0)
+		aio_complete(job, copied, 0);
+	else
+		aio_complete(job, -1, error);
+}
+
 static void
 free_ddp_buffer(struct tom_data *td, struct ddp_buffer *db)
 {
 
-	if (db == NULL)
-		return;
+	if (db->job) {
+		/*
+		 * XXX: If we are un-offloading the socket then we
+		 * should requeue these on the socket somehow.  If we
+		 * got a FIN from the remote end, then this completes
+		 * any remaining requests with an EOF read.
+		 */
+		if (!aio_clear_cancel_function(db->job))
+			ddp_complete_one(db->job, 0);
+	}
+
+	if (db->ps)
+		free_pageset(td, db->ps);
+}
+
+void
+ddp_init_toep(struct toepcb *toep)
+{
 
-	if (db->pages)
-		free(db->pages, M_CXGBE);
+	TAILQ_INIT(&toep->ddp_aiojobq);
+	TASK_INIT(&toep->ddp_requeue_task, 0, aio_ddp_requeue_task, toep);
+	toep->ddp_active_id = -1;
+	mtx_init(&toep->ddp_lock, "t4 ddp", NULL, MTX_DEF);
+}
 
-	if (db->nppods > 0)
-		free_ppods(td, db->ppod_addr, db->nppods);
+void
+ddp_uninit_toep(struct toepcb *toep)
+{
 
-	free(db, M_CXGBE);
+	mtx_destroy(&toep->ddp_lock);
 }
 
 void
 release_ddp_resources(struct toepcb *toep)
 {
+	struct pageset *ps;
 	int i;
 
+	DDP_LOCK(toep);
+	toep->flags |= DDP_DEAD;
 	for (i = 0; i < nitems(toep->db); i++) {
-		if (toep->db[i] != NULL) {
-			free_ddp_buffer(toep->td, toep->db[i]);
-			toep->db[i] = NULL;
-		}
+		free_ddp_buffer(toep->td, &toep->db[i]);
+	}
+	while ((ps = TAILQ_FIRST(&toep->ddp_cached_pagesets)) != NULL) {
+		TAILQ_REMOVE(&toep->ddp_cached_pagesets, ps, link);
+		free_pageset(toep->td, ps);
+	}
+	ddp_complete_all(toep, 0);
+	DDP_UNLOCK(toep);
+}
+
+#ifdef INVARIANTS
+void
+ddp_assert_empty(struct toepcb *toep)
+{
+	int i;
+
+	MPASS(!(toep->ddp_flags & DDP_TASK_ACTIVE));
+	for (i = 0; i < nitems(toep->db); i++) {
+		MPASS(toep->db[i].job == NULL);
+		MPASS(toep->db[i].ps == NULL);
 	}
+	MPASS(TAILQ_EMPTY(&toep->ddp_cached_pagesets));
+	MPASS(TAILQ_EMPTY(&toep->ddp_aiojobq));
+}
+#endif
+
+static void
+complete_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db,
+    unsigned int db_idx)
+{
+	unsigned int db_flag;
+
+	toep->ddp_active_count--;
+	if (toep->ddp_active_id == db_idx) {
+		if (toep->ddp_active_count == 0) {
+			KASSERT(toep->db[db_idx ^ 1].job == NULL,
+			    ("%s: active_count mismatch", __func__));
+			toep->ddp_active_id = -1;
+		} else
+			toep->ddp_active_id ^= 1;
+		CTR2(KTR_CXGBE, "%s: ddp_active_id = %d", __func__,
+		    toep->ddp_active_id);
+	} else {
+		KASSERT(toep->ddp_active_count != 0 &&
+		    toep->ddp_active_id != -1,
+		    ("%s: active count mismatch", __func__));
+	}
+
+	db->cancel_pending = 0;
+	db->job = NULL;
+	recycle_pageset(toep, db->ps);
+	db->ps = NULL;
+
+	db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
+	KASSERT(toep->ddp_flags & db_flag,
+	    ("%s: DDP buffer not active. toep %p, ddp_flags 0x%x",
+	    __func__, toep, toep->ddp_flags));
+	toep->ddp_flags &= ~db_flag;
 }
 
 /* XXX: handle_ddp_data code duplication */
@@ -147,28 +322,59 @@ insert_ddp_data(struct toepcb *toep, uin
 {
 	struct inpcb *inp = toep->inp;
 	struct tcpcb *tp = intotcpcb(inp);
-	struct sockbuf *sb = &inp->inp_socket->so_rcv;
-	struct mbuf *m;
+	struct ddp_buffer *db;
+	struct kaiocb *job;
+	size_t placed;
+	long copied;
+	unsigned int db_flag, db_idx;
 
 	INP_WLOCK_ASSERT(inp);
-	SOCKBUF_LOCK_ASSERT(sb);
+	DDP_ASSERT_LOCKED(toep);
 
-	m = get_ddp_mbuf(n);
 	tp->rcv_nxt += n;
 #ifndef USE_DDP_RX_FLOW_CONTROL
 	KASSERT(tp->rcv_wnd >= n, ("%s: negative window size", __func__));
 	tp->rcv_wnd -= n;
 #endif
-
-	KASSERT(toep->sb_cc >= sbused(sb),
-	    ("%s: sb %p has more data (%d) than last time (%d).",
-	    __func__, sb, sbused(sb), toep->sb_cc));
-	toep->rx_credits += toep->sb_cc - sbused(sb);
-#ifdef USE_DDP_RX_FLOW_CONTROL
-	toep->rx_credits -= n;	/* adjust for F_RX_FC_DDP */
+#ifndef USE_DDP_RX_FLOW_CONTROL
+	toep->rx_credits += n;
 #endif
-	sbappendstream_locked(sb, m, 0);
-	toep->sb_cc = sbused(sb);
+	CTR2(KTR_CXGBE, "%s: placed %u bytes before falling out of DDP",
+	    __func__, n);
+	while (toep->ddp_active_count > 0) {
+		MPASS(toep->ddp_active_id != -1);
+		db_idx = toep->ddp_active_id;
+		db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
+		MPASS((toep->ddp_flags & db_flag) != 0);
+		db = &toep->db[db_idx];
+		job = db->job;
+		copied = job->uaiocb._aiocb_private.status;
+		placed = n;
+		if (placed > job->uaiocb.aio_nbytes - copied)
+			placed = job->uaiocb.aio_nbytes - copied;
+		if (!aio_clear_cancel_function(job)) {
+			/*
+			 * Update the copied length for when
+			 * t4_aio_cancel_active() completes this
+			 * request.
+			 */
+			job->uaiocb._aiocb_private.status += placed;
+		} else if (copied + placed != 0) {
+			CTR4(KTR_CXGBE,
+			    "%s: completing %p (copied %ld, placed %lu)",
+			    __func__, job, copied, placed);
+			/* XXX: This always completes if there is some data. */
+			aio_complete(job, copied + placed, 0);
+		} else if (aio_set_cancel_function(job, t4_aio_cancel_queued)) {
+			TAILQ_INSERT_HEAD(&toep->ddp_aiojobq, job, list);
+			toep->ddp_waiting_count++;
+		} else
+			aio_cancel(job);
+		n -= placed;
+		complete_ddp_buffer(toep, db, db_idx);
+	}
+
+	MPASS(n == 0);
 }
 
 /* SET_TCB_FIELD sent as a ULP command looks like this */
@@ -236,42 +442,10 @@ mk_rx_data_ack_ulp(struct ulp_txpkt *ulp
 	return (ulpsc);
 }
 
-static inline uint64_t
-select_ddp_flags(struct socket *so, int flags, int db_idx)
-{
-	uint64_t ddp_flags = V_TF_DDP_INDICATE_OUT(0);
-	int waitall = flags & MSG_WAITALL;
-	int nb = so->so_state & SS_NBIO || flags & (MSG_DONTWAIT | MSG_NBIO);
-
-	KASSERT(db_idx == 0 || db_idx == 1,
-	    ("%s: bad DDP buffer index %d", __func__, db_idx));
-
-	if (db_idx == 0) {
-		ddp_flags |= V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_ACTIVE_BUF(0);
-		if (waitall)
-			ddp_flags |= V_TF_DDP_PUSH_DISABLE_0(1);
-		else if (nb)
-			ddp_flags |= V_TF_DDP_BUF0_FLUSH(1);
-		else
-			ddp_flags |= V_TF_DDP_BUF0_FLUSH(0);
-	} else {
-		ddp_flags |= V_TF_DDP_BUF1_VALID(1) | V_TF_DDP_ACTIVE_BUF(1);
-		if (waitall)
-			ddp_flags |= V_TF_DDP_PUSH_DISABLE_1(1);
-		else if (nb)
-			ddp_flags |= V_TF_DDP_BUF1_FLUSH(1);
-		else
-			ddp_flags |= V_TF_DDP_BUF1_FLUSH(0);
-	}
-
-	return (ddp_flags);
-}
-
 static struct wrqe *
 mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx,
-    int offset, uint64_t ddp_flags)
+    struct pageset *ps, int offset, uint64_t ddp_flags, uint64_t ddp_flags_mask)
 {
-	struct ddp_buffer *db = toep->db[db_idx];
 	struct wrqe *wr;
 	struct work_request_hdr *wrh;
 	struct ulp_txpkt *ulpmc;
@@ -302,7 +476,7 @@ mk_update_tcb_for_ddp(struct adapter *sc
 	ulpmc = mk_set_tcb_field_ulp(ulpmc, toep,
 	    W_TCB_RX_DDP_BUF0_TAG + db_idx,
 	    V_TCB_RX_DDP_BUF0_TAG(M_TCB_RX_DDP_BUF0_TAG),
-	    V_TCB_RX_DDP_BUF0_TAG(db->tag));
+	    V_TCB_RX_DDP_BUF0_TAG(ps->tag));
 
 	/* Update the current offset in the DDP buffer and its total length */
 	if (db_idx == 0)
@@ -311,21 +485,18 @@ mk_update_tcb_for_ddp(struct adapter *sc
 		    V_TCB_RX_DDP_BUF0_OFFSET(M_TCB_RX_DDP_BUF0_OFFSET) |
 		    V_TCB_RX_DDP_BUF0_LEN(M_TCB_RX_DDP_BUF0_LEN),
 		    V_TCB_RX_DDP_BUF0_OFFSET(offset) |
-		    V_TCB_RX_DDP_BUF0_LEN(db->len));
+		    V_TCB_RX_DDP_BUF0_LEN(ps->len));
 	else
 		ulpmc = mk_set_tcb_field_ulp(ulpmc, toep,
 		    W_TCB_RX_DDP_BUF1_OFFSET,
 		    V_TCB_RX_DDP_BUF1_OFFSET(M_TCB_RX_DDP_BUF1_OFFSET) |
 		    V_TCB_RX_DDP_BUF1_LEN((u64)M_TCB_RX_DDP_BUF1_LEN << 32),
 		    V_TCB_RX_DDP_BUF1_OFFSET(offset) |
-		    V_TCB_RX_DDP_BUF1_LEN((u64)db->len << 32));
+		    V_TCB_RX_DDP_BUF1_LEN((u64)ps->len << 32));
 
 	/* Update DDP flags */
 	ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_FLAGS,
-	    V_TF_DDP_BUF0_FLUSH(1) | V_TF_DDP_BUF1_FLUSH(1) |
-	    V_TF_DDP_PUSH_DISABLE_0(1) | V_TF_DDP_PUSH_DISABLE_1(1) |
-	    V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_BUF1_VALID(1) |
-	    V_TF_DDP_ACTIVE_BUF(1) | V_TF_DDP_INDICATE_OUT(1), ddp_flags);
+	    ddp_flags_mask, ddp_flags);
 
 	/* Gratuitous RX_DATA_ACK with RX_MODULATE set to speed up delivery. */
 	ulpmc = mk_rx_data_ack_ulp(ulpmc, toep);
@@ -333,30 +504,20 @@ mk_update_tcb_for_ddp(struct adapter *sc
 	return (wr);
 }
 
-static void
-discourage_ddp(struct toepcb *toep)
-{
-
-	if (toep->ddp_score && --toep->ddp_score == 0) {
-		toep->ddp_flags &= ~DDP_OK;
-		toep->ddp_disabled = time_uptime;
-		CTR3(KTR_CXGBE, "%s: tid %u !DDP_OK @ %u",
-		    __func__, toep->tid, time_uptime);
-	}
-}
-
 static int
 handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len)
 {
 	uint32_t report = be32toh(ddp_report);
-	unsigned int db_flag;
+	unsigned int db_idx;
 	struct inpcb *inp = toep->inp;
+	struct ddp_buffer *db;
 	struct tcpcb *tp;
 	struct socket *so;
 	struct sockbuf *sb;
-	struct mbuf *m;
+	struct kaiocb *job;
+	long copied;
 
-	db_flag = report & F_DDP_BUF_IDX ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
+	db_idx = report & F_DDP_BUF_IDX ? 1 : 0;
 
 	if (__predict_false(!(report & F_DDP_INV)))
 		CXGBE_UNIMPLEMENTED("DDP buffer still valid");
@@ -364,19 +525,24 @@ handle_ddp_data(struct toepcb *toep, __b
 	INP_WLOCK(inp);
 	so = inp_inpcbtosocket(inp);
 	sb = &so->so_rcv;
-	if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT))) {
+	DDP_LOCK(toep);
 
+	KASSERT(toep->ddp_active_id == db_idx,
+	    ("completed DDP buffer (%d) != active_id (%d) for tid %d", db_idx,
+	    toep->ddp_active_id, toep->tid));
+	db = &toep->db[db_idx];
+	job = db->job;
+
+	if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT))) {
 		/*
-		 * XXX: think a bit more.
-		 * tcpcb probably gone, but socket should still be around
-		 * because we always wait for DDP completion in soreceive no
-		 * matter what.  Just wake it up and let it clean up.
+		 * This can happen due to an administrative tcpdrop(8).
+		 * Just fail the request with ECONNRESET.
 		 */
-
 		CTR5(KTR_CXGBE, "%s: tid %u, seq 0x%x, len %d, inp_flags 0x%x",
 		    __func__, toep->tid, be32toh(rcv_nxt), len, inp->inp_flags);
-		SOCKBUF_LOCK(sb);
-		goto wakeup;
+		if (aio_clear_cancel_function(job))
+			ddp_complete_one(job, ECONNRESET);
+		goto completed;
 	}
 
 	tp = intotcpcb(inp);
@@ -386,7 +552,7 @@ handle_ddp_data(struct toepcb *toep, __b
 	 * sequence number of the next byte to receive.  The length of
 	 * the data received for this message must be computed by
 	 * comparing the new and old values of rcv_nxt.
-	 * 
+	 *
 	 * For RX_DATA_DDP, len might be non-zero, but it is only the
 	 * length of the most recent DMA.  It does not include the
 	 * total length of the data received since the previous update
@@ -400,15 +566,14 @@ handle_ddp_data(struct toepcb *toep, __b
 	KASSERT(tp->rcv_wnd >= len, ("%s: negative window size", __func__));
 	tp->rcv_wnd -= len;
 #endif
-	m = get_ddp_mbuf(len);
-
-	SOCKBUF_LOCK(sb);
-	if (report & F_DDP_BUF_COMPLETE)
-		toep->ddp_score = DDP_HIGH_SCORE;
-	else
-		discourage_ddp(toep);
+#ifdef VERBOSE_TRACES
+	CTR4(KTR_CXGBE, "%s: DDP[%d] placed %d bytes (%#x)", __func__, db_idx,
+	    len, report);
+#endif
 
 	/* receive buffer autosize */
+	CURVNET_SET(so->so_vnet);
+	SOCKBUF_LOCK(sb);
 	if (sb->sb_flags & SB_AUTOSIZE &&
 	    V_tcp_do_autorcvbuf &&
 	    sb->sb_hiwat < V_tcp_autorcvbuf_max &&
@@ -422,57 +587,185 @@ handle_ddp_data(struct toepcb *toep, __b
 		else
 			toep->rx_credits += newsize - hiwat;
 	}
+	SOCKBUF_UNLOCK(sb);
+	CURVNET_RESTORE();
 
-	KASSERT(toep->sb_cc >= sbused(sb),
-	    ("%s: sb %p has more data (%d) than last time (%d).",
-	    __func__, sb, sbused(sb), toep->sb_cc));
-	toep->rx_credits += toep->sb_cc - sbused(sb);
-#ifdef USE_DDP_RX_FLOW_CONTROL
-	toep->rx_credits -= len;	/* adjust for F_RX_FC_DDP */
+#ifndef USE_DDP_RX_FLOW_CONTROL
+	toep->rx_credits += len;
 #endif
-	sbappendstream_locked(sb, m, 0);
-	toep->sb_cc = sbused(sb);
-wakeup:
-	KASSERT(toep->ddp_flags & db_flag,
-	    ("%s: DDP buffer not active. toep %p, ddp_flags 0x%x, report 0x%x",
-	    __func__, toep, toep->ddp_flags, report));
-	toep->ddp_flags &= ~db_flag;
-	sorwakeup_locked(so);
-	SOCKBUF_UNLOCK_ASSERT(sb);
 
+	if (db->cancel_pending) {
+		/*
+		 * Update the job's length but defer completion to the
+		 * TCB_RPL callback.
+		 */
+		job->uaiocb._aiocb_private.status += len;
+		goto out;
+	} else if (!aio_clear_cancel_function(job)) {
+		/*
+		 * Update the copied length for when
+		 * t4_aio_cancel_active() completes this request.
+		 */
+		job->uaiocb._aiocb_private.status += len;
+	} else {
+		copied = job->uaiocb._aiocb_private.status;
+#ifdef VERBOSE_TRACES
+		CTR4(KTR_CXGBE, "%s: completing %p (copied %ld, placed %d)",
+		    __func__, job, copied, len);
+#endif
+		aio_complete(job, copied + len, 0);
+		t4_rcvd(&toep->td->tod, tp);
+	}
+
+completed:
+	complete_ddp_buffer(toep, db, db_idx);
+	if (toep->ddp_waiting_count > 0)
+		ddp_queue_toep(toep);
+out:
+	DDP_UNLOCK(toep);
 	INP_WUNLOCK(inp);
+
 	return (0);
 }
 
 void
-handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, struct sockbuf *sb,
-    __be32 rcv_nxt)
+handle_ddp_indicate(struct toepcb *toep)
 {
-	struct mbuf *m;
-	int len;
 
-	SOCKBUF_LOCK_ASSERT(sb);
+	DDP_ASSERT_LOCKED(toep);
+	MPASS(toep->ddp_active_count == 0);
+	MPASS((toep->ddp_flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0);
+	if (toep->ddp_waiting_count == 0) {
+		/*
+		 * The pending requests that triggered the request for an
+		 * an indicate were cancelled.  Those cancels should have
+		 * already disabled DDP.  Just ignore this as the data is
+		 * going into the socket buffer anyway.
+		 */
+		return;
+	}
+	CTR3(KTR_CXGBE, "%s: tid %d indicated (%d waiting)", __func__,
+	    toep->tid, toep->ddp_waiting_count);
+	ddp_queue_toep(toep);
+}
+
+enum {
+	DDP_BUF0_INVALIDATED = 0x2,
+	DDP_BUF1_INVALIDATED
+};
+
+void
+handle_ddp_tcb_rpl(struct toepcb *toep, const struct cpl_set_tcb_rpl *cpl)
+{
+	unsigned int db_idx;
+	struct inpcb *inp = toep->inp;
+	struct ddp_buffer *db;
+	struct kaiocb *job;
+	long copied;
+
+	if (cpl->status != CPL_ERR_NONE)
+		panic("XXX: tcp_rpl failed: %d", cpl->status);
+
+	switch (cpl->cookie) {
+	case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(DDP_BUF0_INVALIDATED):
+	case V_WORD(W_TCB_RX_DDP_FLAGS) | V_COOKIE(DDP_BUF1_INVALIDATED):
+		/*
+		 * XXX: This duplicates a lot of code with handle_ddp_data().
+		 */
+		db_idx = G_COOKIE(cpl->cookie) - DDP_BUF0_INVALIDATED;
+		INP_WLOCK(inp);
+		DDP_LOCK(toep);
+		db = &toep->db[db_idx];
+
+		/*
+		 * handle_ddp_data() should leave the job around until
+		 * this callback runs once a cancel is pending.
+		 */
+		MPASS(db != NULL);
+		MPASS(db->job != NULL);
+		MPASS(db->cancel_pending);
+
+		/*
+		 * XXX: It's not clear what happens if there is data
+		 * placed when the buffer is invalidated.  I suspect we
+		 * need to read the TCB to see how much data was placed.
+		 *
+		 * For now this just pretends like nothing was placed.
+		 *
+		 * XXX: Note that if we did check the PCB we would need to
+		 * also take care of updating the tp, etc.
+		 */
+		job = db->job;
+		copied = job->uaiocb._aiocb_private.status;
+		if (copied == 0) {
+			CTR2(KTR_CXGBE, "%s: cancelling %p", __func__, job);
+			aio_cancel(job);
+		} else {
+			CTR3(KTR_CXGBE, "%s: completing %p (copied %ld)",
+			    __func__, job, copied);
+			aio_complete(job, copied, 0);
+			t4_rcvd(&toep->td->tod, intotcpcb(inp));
+		}
+
+		complete_ddp_buffer(toep, db, db_idx);
+		if (toep->ddp_waiting_count > 0)
+			ddp_queue_toep(toep);
+		DDP_UNLOCK(toep);
+		INP_WUNLOCK(inp);
+		break;
+	default:
+		panic("XXX: unknown tcb_rpl offset %#x, cookie %#x",
+		    G_WORD(cpl->cookie), G_COOKIE(cpl->cookie));
+	}
+}
+
+void
+handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt)
+{
+	struct ddp_buffer *db;
+	struct kaiocb *job;
+	long copied;
+	unsigned int db_flag, db_idx;
+	int len, placed;
+
 	INP_WLOCK_ASSERT(toep->inp);
+	DDP_ASSERT_LOCKED(toep);
 	len = be32toh(rcv_nxt) - tp->rcv_nxt;
 
-	/* Signal handle_ddp() to break out of its sleep loop. */
-	toep->ddp_flags &= ~(DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE);
-	if (len == 0)
-		return;
-
 	tp->rcv_nxt += len;
-	KASSERT(toep->sb_cc >= sbused(sb),
-	    ("%s: sb %p has more data (%d) than last time (%d).",
-	    __func__, sb, sbused(sb), toep->sb_cc));
-	toep->rx_credits += toep->sb_cc - sbused(sb);
-#ifdef USE_DDP_RX_FLOW_CONTROL
-	toep->rx_credits -= len;	/* adjust for F_RX_FC_DDP */
+#ifndef USE_DDP_RX_FLOW_CONTROL
+	toep->rx_credits += len;
 #endif
 
-	m = get_ddp_mbuf(len);
+	while (toep->ddp_active_count > 0) {
+		MPASS(toep->ddp_active_id != -1);
+		db_idx = toep->ddp_active_id;
+		db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
+		MPASS((toep->ddp_flags & db_flag) != 0);
+		db = &toep->db[db_idx];
+		job = db->job;
+		copied = job->uaiocb._aiocb_private.status;
+		placed = len;
+		if (placed > job->uaiocb.aio_nbytes - copied)
+			placed = job->uaiocb.aio_nbytes - copied;
+		if (!aio_clear_cancel_function(job)) {
+			/*
+			 * Update the copied length for when
+			 * t4_aio_cancel_active() completes this
+			 * request.
+			 */
+			job->uaiocb._aiocb_private.status += placed;
+		} else {
+			CTR4(KTR_CXGBE, "%s: tid %d completed buf %d len %d",
+			    __func__, toep->tid, db_idx, placed);
+			aio_complete(job, copied + placed, 0);
+		}
+		len -= placed;
+		complete_ddp_buffer(toep, db, db_idx);
+	}
 
-	sbappendstream_locked(sb, m, 0);
-	toep->sb_cc = sbused(sb);
+	MPASS(len == 0);
+	ddp_complete_all(toep, 0);
 }
 
 #define DDP_ERR (F_DDP_PPOD_MISMATCH | F_DDP_LLIMIT_ERR | F_DDP_ULIMIT_ERR |\
@@ -529,7 +822,7 @@ do_rx_ddp_complete(struct sge_iq *iq, co
 	return (0);
 }
 
-void
+static void
 enable_ddp(struct adapter *sc, struct toepcb *toep)
 {
 
@@ -540,6 +833,7 @@ enable_ddp(struct adapter *sc, struct to
 	CTR3(KTR_CXGBE, "%s: tid %u (time %u)",
 	    __func__, toep->tid, time_uptime);
 
+	DDP_ASSERT_LOCKED(toep);
 	toep->ddp_flags |= DDP_SC_REQ;
 	t4_set_tcb_field(sc, toep, 1, W_TCB_RX_DDP_FLAGS,
 	    V_TF_DDP_OFF(1) | V_TF_DDP_INDICATE_OUT(1) |
@@ -550,81 +844,6 @@ enable_ddp(struct adapter *sc, struct to
 	    V_TF_RCV_COALESCE_ENABLE(1), 0);
 }
 
-static inline void
-disable_ddp(struct adapter *sc, struct toepcb *toep)
-{

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***


