Skip site navigation (1)Skip section navigation (2)
Date:      Mon, 5 Oct 2009 14:49:16 +0000 (UTC)
From:      Robert Watson <rwatson@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org
Subject:   svn commit: r197775 - head/sys/kern
Message-ID:  <200910051449.n95EnGof015525@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: rwatson
Date: Mon Oct  5 14:49:16 2009
New Revision: 197775
URL: http://svn.freebsd.org/changeset/base/197775

Log:
  First cut at implementing SOCK_SEQPACKET support for UNIX (local) domain
  sockets.  This allows for reliable bi-directional datagram communication
  over UNIX domain sockets, in contrast to SOCK_DGRAM (M:N, unreliable) or
  SOCK_STREAM (bi-directional bytestream).  Largely, this reuses existing
  UNIX domain socket code.  This allows applications requiring record-
  oriented semantics to obtain them reliably via local IPC.
  
  Some implementation notes (also present in XXX comments):
  
  - Currently we lack an sbappend variant able to do datagrams and control
    data without doing addresses, so we mark SOCK_SEQPACKET as PR_ADDR.
    Adding a new variant will solve this problem.
  
  - UNIX domain sockets on FreeBSD provide back-pressure/flow control
    notification for stream sockets by manipulating the send socket
    buffer's size during pru_send and pru_rcvd.  This trick works less well
    for SOCK_SEQPACKET as sosend_generic() uses sb_hiwat not just to
    manage blocking, but also to determine maximum datagram size.  Fixing
    this requires rethinking how back-pressure is done for SOCK_SEQPACKET;
    in the meantime, it's possible to get EMSGSIZE when buffers fill,
    instead of blocking.
  
  Discussed with:	benl
  Reviewed by:	bz, rpaulo
  MFC after:	3 months
  Sponsored by:	Google

Modified:
  head/sys/kern/uipc_usrreq.c

Modified: head/sys/kern/uipc_usrreq.c
==============================================================================
--- head/sys/kern/uipc_usrreq.c	Mon Oct  5 14:46:56 2009	(r197774)
+++ head/sys/kern/uipc_usrreq.c	Mon Oct  5 14:49:16 2009	(r197775)
@@ -50,7 +50,8 @@
  * garbage collector to find and tear down cycles of disconnected sockets.
  *
  * TODO:
- *	SEQPACKET, RDM
+ *	RDM
+ *	distinguish datagram size limits from flow control limits in SEQPACKET
  *	rethink name space problems
  *	need a proper out-of-band
  */
@@ -112,6 +113,7 @@ static ino_t		unp_ino;	/* Prototype for 
 static int		unp_rights;	/* (g) File descriptors in flight. */
 static struct unp_head	unp_shead;	/* (l) List of stream sockets. */
 static struct unp_head	unp_dhead;	/* (l) List of datagram sockets. */
+static struct unp_head	unp_sphead;	/* (l) List of seqpacket sockets. */
 
 static const struct sockaddr	sun_noname = { sizeof(sun_noname), AF_LOCAL };
 
@@ -139,10 +141,14 @@ static u_long	unpst_sendspace = PIPSIZ;
 static u_long	unpst_recvspace = PIPSIZ;
 static u_long	unpdg_sendspace = 2*1024;	/* really max datagram size */
 static u_long	unpdg_recvspace = 4*1024;
+static u_long	unpsp_sendspace = PIPSIZ;	/* really max datagram size */
+static u_long	unpsp_recvspace = PIPSIZ;
 
 SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW, 0, "Local domain");
 SYSCTL_NODE(_net_local, SOCK_STREAM, stream, CTLFLAG_RW, 0, "SOCK_STREAM");
 SYSCTL_NODE(_net_local, SOCK_DGRAM, dgram, CTLFLAG_RW, 0, "SOCK_DGRAM");
+SYSCTL_NODE(_net_local, SOCK_SEQPACKET, seqpacket, CTLFLAG_RW, 0,
+    "SOCK_SEQPACKET");
 
 SYSCTL_ULONG(_net_local_stream, OID_AUTO, sendspace, CTLFLAG_RW,
 	   &unpst_sendspace, 0, "Default stream send space.");
@@ -152,6 +158,10 @@ SYSCTL_ULONG(_net_local_dgram, OID_AUTO,
 	   &unpdg_sendspace, 0, "Default datagram send space.");
 SYSCTL_ULONG(_net_local_dgram, OID_AUTO, recvspace, CTLFLAG_RW,
 	   &unpdg_recvspace, 0, "Default datagram receive space.");
+SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, maxseqpacket, CTLFLAG_RW,
+	   &unpsp_sendspace, 0, "Default seqpacket send space.");
+SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, recvspace, CTLFLAG_RW,
+	   &unpsp_recvspace, 0, "Default seqpacket receive space.");
 SYSCTL_INT(_net_local, OID_AUTO, inflight, CTLFLAG_RD, &unp_rights, 0, 
     "File descriptors in flight.");
 
@@ -257,6 +267,7 @@ static struct mbuf	*unp_addsockcred(stru
  */
 static struct domain localdomain;
 static struct pr_usrreqs uipc_usrreqs_dgram, uipc_usrreqs_stream;
+static struct pr_usrreqs uipc_usrreqs_seqpacket;
 static struct protosw localsw[] = {
 {
 	.pr_type =		SOCK_STREAM,
@@ -271,6 +282,19 @@ static struct protosw localsw[] = {
 	.pr_flags =		PR_ATOMIC|PR_ADDR|PR_RIGHTS,
 	.pr_usrreqs =		&uipc_usrreqs_dgram
 },
+{
+	.pr_type =		SOCK_SEQPACKET,
+	.pr_domain =		&localdomain,
+
+	/*
+	 * XXXRW: For now, PR_ADDR because soreceive will bump into them
+	 * due to our use of sbappendaddr.  A new sbappend variant is needed
+	 * that supports both atomic record writes and control data.
+	 */
+	.pr_flags =		PR_ADDR|PR_ATOMIC|PR_CONNREQUIRED|PR_WANTRCVD|
+				    PR_RIGHTS,
+	.pr_usrreqs =		&uipc_usrreqs_seqpacket,
+},
 };
 
 static struct domain localdomain = {
@@ -353,6 +377,11 @@ uipc_attach(struct socket *so, int proto
 			recvspace = unpdg_recvspace;
 			break;
 
+		case SOCK_SEQPACKET:
+			sendspace = unpsp_sendspace;
+			recvspace = unpsp_recvspace;
+			break;
+
 		default:
 			panic("uipc_attach");
 		}
@@ -372,8 +401,22 @@ uipc_attach(struct socket *so, int proto
 	UNP_LIST_LOCK();
 	unp->unp_gencnt = ++unp_gencnt;
 	unp_count++;
-	LIST_INSERT_HEAD(so->so_type == SOCK_DGRAM ? &unp_dhead : &unp_shead,
-	    unp, unp_link);
+	switch (so->so_type) {
+	case SOCK_STREAM:
+		LIST_INSERT_HEAD(&unp_shead, unp, unp_link);
+		break;
+
+	case SOCK_DGRAM:
+		LIST_INSERT_HEAD(&unp_dhead, unp, unp_link);
+		break;
+
+	case SOCK_SEQPACKET:
+		LIST_INSERT_HEAD(&unp_sphead, unp, unp_link);
+		break;
+
+	default:
+		panic("uipc_attach");
+	}
 	UNP_LIST_UNLOCK();
 
 	return (0);
@@ -705,11 +748,8 @@ uipc_rcvd(struct socket *so, int flags)
 	unp = sotounpcb(so);
 	KASSERT(unp != NULL, ("uipc_rcvd: unp == NULL"));
 
-	if (so->so_type == SOCK_DGRAM)
-		panic("uipc_rcvd DGRAM?");
-
-	if (so->so_type != SOCK_STREAM)
-		panic("uipc_rcvd unknown socktype");
+	if (so->so_type != SOCK_STREAM && so->so_type != SOCK_SEQPACKET)
+		panic("uipc_rcvd socktype %d", so->so_type);
 
 	/*
 	 * Adjust backpressure on sender and wakeup any waiting to write.
@@ -824,6 +864,7 @@ uipc_send(struct socket *so, int flags, 
 		break;
 	}
 
+	case SOCK_SEQPACKET:
 	case SOCK_STREAM:
 		if ((so->so_state & SS_ISCONNECTED) == 0) {
 			if (nam != NULL) {
@@ -875,11 +916,33 @@ uipc_send(struct socket *so, int flags, 
 		 * Send to paired receive port, and then reduce send buffer
 		 * hiwater marks to maintain backpressure.  Wake up readers.
 		 */
-		if (control != NULL) {
-			if (sbappendcontrol_locked(&so2->so_rcv, m, control))
+		switch (so->so_type) {
+		case SOCK_STREAM:
+			if (control != NULL) {
+				if (sbappendcontrol_locked(&so2->so_rcv, m,
+				    control))
+					control = NULL;
+			} else
+				sbappend_locked(&so2->so_rcv, m);
+			break;
+
+		case SOCK_SEQPACKET: {
+			const struct sockaddr *from;
+
+			from = &sun_noname;
+			if (sbappendaddr_locked(&so2->so_rcv, from, m,
+			    control))
 				control = NULL;
-		} else
-			sbappend_locked(&so2->so_rcv, m);
+			break;
+			}
+		}
+
+		/*
+		 * XXXRW: While fine for SOCK_STREAM, this conflates maximum
+		 * datagram size and back-pressure for SOCK_SEQPACKET, which
+		 * can lead to undesired return of EMSGSIZE on send instead
+		 * of more desirable blocking.
+		 */
 		mbcnt_delta = so2->so_rcv.sb_mbcnt - unp2->unp_mbcnt;
 		unp2->unp_mbcnt = so2->so_rcv.sb_mbcnt;
 		sbcc = so2->so_rcv.sb_cc;
@@ -939,7 +1002,8 @@ uipc_sense(struct socket *so, struct sta
 	UNP_LINK_RLOCK();
 	UNP_PCB_LOCK(unp);
 	unp2 = unp->unp_conn;
-	if (so->so_type == SOCK_STREAM && unp2 != NULL) {
+	if ((so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET) &&
+	    unp2 != NULL) {
 		so2 = unp2->unp_socket;
 		sb->st_blksize += so2->so_rcv.sb_cc;
 	}
@@ -1009,6 +1073,26 @@ static struct pr_usrreqs uipc_usrreqs_dg
 	.pru_close =		uipc_close,
 };
 
+static struct pr_usrreqs uipc_usrreqs_seqpacket = {
+	.pru_abort =		uipc_abort,
+	.pru_accept =		uipc_accept,
+	.pru_attach =		uipc_attach,
+	.pru_bind =		uipc_bind,
+	.pru_connect =		uipc_connect,
+	.pru_connect2 =		uipc_connect2,
+	.pru_detach =		uipc_detach,
+	.pru_disconnect =	uipc_disconnect,
+	.pru_listen =		uipc_listen,
+	.pru_peeraddr =		uipc_peeraddr,
+	.pru_rcvd =		uipc_rcvd,
+	.pru_send =		uipc_send,
+	.pru_sense =		uipc_sense,
+	.pru_shutdown =		uipc_shutdown,
+	.pru_sockaddr =		uipc_sockaddr,
+	.pru_soreceive =	soreceive_generic,	/* XXX: or...? */
+	.pru_close =		uipc_close,
+};
+
 static struct pr_usrreqs uipc_usrreqs_stream = {
 	.pru_abort = 		uipc_abort,
 	.pru_accept =		uipc_accept,
@@ -1306,6 +1390,7 @@ unp_connect2(struct socket *so, struct s
 		break;
 
 	case SOCK_STREAM:
+	case SOCK_SEQPACKET:
 		unp2->unp_conn = unp;
 		if (req == PRU_CONNECT &&
 		    ((unp->unp_flags | unp2->unp_flags) & UNP_CONNWAIT))
@@ -1343,6 +1428,7 @@ unp_disconnect(struct unpcb *unp, struct
 		break;
 
 	case SOCK_STREAM:
+	case SOCK_SEQPACKET:
 		soisdisconnected(unp->unp_socket);
 		unp2->unp_conn = NULL;
 		soisdisconnected(unp2->unp_socket);
@@ -1368,7 +1454,22 @@ unp_pcblist(SYSCTL_HANDLER_ARGS)
 	struct unp_head *head;
 	struct xunpcb *xu;
 
-	head = ((intptr_t)arg1 == SOCK_DGRAM ? &unp_dhead : &unp_shead);
+	switch ((intptr_t)arg1) {
+	case SOCK_STREAM:
+		head = &unp_shead;
+		break;
+
+	case SOCK_DGRAM:
+		head = &unp_dhead;
+		break;
+
+	case SOCK_SEQPACKET:
+		head = &unp_sphead;
+		break;
+
+	default:
+		panic("unp_pcblist: arg1 %d", (intptr_t)arg1);
+	}
 
 	/*
 	 * The process of preparing the PCB list is too time-consuming and
@@ -1481,6 +1582,9 @@ SYSCTL_PROC(_net_local_dgram, OID_AUTO, 
 SYSCTL_PROC(_net_local_stream, OID_AUTO, pcblist, CTLFLAG_RD,
 	    (caddr_t)(long)SOCK_STREAM, 0, unp_pcblist, "S,xunpcb",
 	    "List of active local stream sockets");
+SYSCTL_PROC(_net_local_seqpacket, OID_AUTO, pcblist, CTLFLAG_RD,
+	    (caddr_t)(long)SOCK_SEQPACKET, 0, unp_pcblist, "S,xunpcb",
+	    "List of active local seqpacket sockets");
 
 static void
 unp_shutdown(struct unpcb *unp)
@@ -1492,7 +1596,8 @@ unp_shutdown(struct unpcb *unp)
 	UNP_PCB_LOCK_ASSERT(unp);
 
 	unp2 = unp->unp_conn;
-	if (unp->unp_socket->so_type == SOCK_STREAM && unp2 != NULL) {
+	if ((unp->unp_socket->so_type == SOCK_STREAM ||
+	    (unp->unp_socket->so_type == SOCK_SEQPACKET)) && unp2 != NULL) {
 		so = unp2->unp_socket;
 		if (so != NULL)
 			socantrcvmore(so);
@@ -1658,6 +1763,7 @@ unp_init(void)
 	    NULL, EVENTHANDLER_PRI_ANY);
 	LIST_INIT(&unp_dhead);
 	LIST_INIT(&unp_shead);
+	LIST_INIT(&unp_sphead);
 	TASK_INIT(&unp_gc_task, 0, unp_gc, NULL);
 	UNP_LINK_LOCK_INIT();
 	UNP_LIST_LOCK_INIT();
@@ -1974,7 +2080,8 @@ SYSCTL_INT(_net_local, OID_AUTO, taskcou
 static void
 unp_gc(__unused void *arg, int pending)
 {
-	struct unp_head *heads[] = { &unp_dhead, &unp_shead, NULL };
+	struct unp_head *heads[] = { &unp_dhead, &unp_shead, &unp_sphead,
+				    NULL };
 	struct unp_head **head;
 	struct file **unref;
 	struct unpcb *unp;



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200910051449.n95EnGof015525>