Skip site navigation (1)Skip section navigation (2)
Date:      Wed, 2 Sep 2009 02:40:48 +0000 (UTC)
From:      Kip Macy <kmacy@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-user@freebsd.org
Subject:   svn commit: r196747 - in user/kmacy/releng_7_2_fcs_1/sys: kern sys
Message-ID:  <200909020240.n822emNL078914@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: kmacy
Date: Wed Sep  2 02:40:48 2009
New Revision: 196747
URL: http://svn.freebsd.org/changeset/base/196747

Log:
  initial iteration of background sendfile completion

Modified:
  user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_sockbuf.c
  user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_socket.c
  user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_syscalls.c
  user/kmacy/releng_7_2_fcs_1/sys/sys/file.h
  user/kmacy/releng_7_2_fcs_1/sys/sys/sockbuf.h
  user/kmacy/releng_7_2_fcs_1/sys/sys/socketvar.h
  user/kmacy/releng_7_2_fcs_1/sys/sys/sockstate.h

Modified: user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_sockbuf.c
==============================================================================
--- user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_sockbuf.c	Wed Sep  2 02:12:07 2009	(r196746)
+++ user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_sockbuf.c	Wed Sep  2 02:40:48 2009	(r196747)
@@ -177,7 +177,10 @@ sowakeup(struct socket *so, struct sockb
 {
 
 	SOCKBUF_LOCK_ASSERT(sb);
-
+	if (sb->sb_flags & SB_SENDING) {
+	        SOCKBUF_UNLOCK(sb);
+		return;
+	}
 	selwakeuppri(&sb->sb_sel, PSOCK);
 	if (!SEL_WAITING(&sb->sb_sel))
 		sb->sb_flags &= ~SB_SEL;
@@ -879,6 +882,8 @@ sbdrop_internal(struct sockbuf *sb, int 
 	}
 }
 
+/*
+ * Defined in uipc_socket.c: scans the background sendfile queue.
+ * sbdrop_locked() below calls it when the send buffer is marked
+ * SB_SENDING.
+ */
+extern void sosendingwakeup(void *unused __unused);
+
 /*
  * Drop data from (the front of) a sockbuf.
  */
@@ -889,6 +894,8 @@ sbdrop_locked(struct sockbuf *sb, int le
 	SOCKBUF_LOCK_ASSERT(sb);
 
 	sbdrop_internal(sb, len);
+	if (sb->sb_flags & SB_SENDING)
+		sosendingwakeup(NULL);
 }
 
 void

Modified: user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_socket.c
==============================================================================
--- user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_socket.c	Wed Sep  2 02:12:07 2009	(r196746)
+++ user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_socket.c	Wed Sep  2 02:40:48 2009	(r196747)
@@ -125,9 +125,13 @@ __FBSDID("$FreeBSD$");
 #include <sys/resourcevar.h>
 #include <net/route.h>
 #include <sys/signalvar.h>
+#include <sys/smp.h>
 #include <sys/stat.h>
 #include <sys/sx.h>
+#include <sys/syscallsubr.h>
 #include <sys/sysctl.h>
+#include <sys/sysproto.h>
+#include <sys/taskqueue.h>
 #include <sys/uio.h>
 #include <sys/jail.h>
 
@@ -3102,6 +3106,325 @@ soisdisconnected(struct socket *so)
 	wakeup(&so->so_timeo);
 }
 
+/*
+ * A deferred non-blocking sendfile(2) request.  soissending() fills one
+ * in when kern_sendfile() is about to return EAGAIN; the background
+ * machinery later replays the saved arguments through kern_sendfile().
+ */
+struct socketref {
+	struct proc *sr_proc;		/* process that issued the sendfile */
+	struct file *sr_sock_fp;	/* referenced file for socket uap->s */
+	struct file *sr_fp;		/* referenced file for source uap->fd */
+	struct sendfile_args sr_uap;	/* copy of the syscall arguments */
+	struct uio sr_hdr_uio;		/* shallow copy of the header uio */
+	struct uio sr_trl_uio;		/* shallow copy of the trailer uio */
+	int sr_compat;			/* compat flag for kern_sendfile() */
+	int sr_magic;			/* 0xCAFEBABE once filled in (debug) */
+	TAILQ_ENTRY(socketref) entry;
+
+};
+TAILQ_HEAD(srq, socketref);
+
+/*
+ * One batch of socketrefs handed from sosendingwakeup() to the sendfile
+ * taskqueue.  The task's context argument is this object itself, and the
+ * task handler frees it when the batch is done.
+ */
+struct socketref_object {
+	struct srq sro_srh;	/* socketrefs to process in this batch */
+	struct task sro_task;	/* runs sendfile_task_func(this object) */
+};
+
+/*
+ * Global state for background sendfile completion, set up by
+ * init_bgsend().  sendfile_bg_queue holds pending socketrefs and is
+ * modified under sendfile_bg_lock.  sendfile_callout periodically runs
+ * sosendingwakeup().  sendfile_tq runs the batches that replay
+ * kern_sendfile().
+ */
+struct srq *sendfile_bg_queue;
+struct mtx sendfile_bg_lock;
+struct callout *sendfile_callout;
+struct taskqueue *sendfile_tq;
+/* Exported from uipc_syscalls.c (no longer static) for soissending(). */
+extern int getsock(struct filedesc *fdp, int fd,
+    struct file **fpp, u_int *fflagp);
+
+MALLOC_DEFINE(M_SOCKREF, "sockref", "socket reference memory");
+
+/*
+ * Queue a non-blocking sendfile(2) request for background completion.
+ *
+ * Called from kern_sendfile() with so->so_snd locked, just before it
+ * returns EAGAIN on a non-blocking socket whose send buffer is full.
+ *
+ * On success, a socketref is put on sendfile_bg_queue and SB_SENDING is
+ * set on so_snd, which makes sowakeup() skip wakeups until the background
+ * send finishes.  The socketref holds references to the socket file
+ * (uap->s) and the source file (uap->fd).
+ *
+ * On any failure nothing is queued, no references are kept and
+ * SB_SENDING is left untouched.  The caller's EAGAIN then behaves like a
+ * plain non-blocking sendfile.
+ *
+ * NOTE(review): getsock() and fget() acquire the filedesc lock (a
+ * sleepable lock) while the sockbuf mutex is held.  WITNESS is likely to
+ * complain.  TODO: take these references before locking so_snd.
+ */
+void
+soissending(struct socket *so, struct thread *td,
+    struct sendfile_args *uap, struct uio *hdr_uio,
+    struct uio *trl_uio, int compat)
+{
+	struct socketref *ref;
+	struct socket *refso;
+
+	SOCKBUF_LOCK_ASSERT(&so->so_snd);
+	/*
+	 * M_ZERO: when hdr_uio/trl_uio is NULL the saved copy must not be
+	 * uninitialized heap memory, because the background task passes
+	 * &sr_hdr_uio/&sr_trl_uio to kern_sendfile() unconditionally.
+	 */
+	ref = malloc(sizeof(*ref), M_SOCKREF, M_NOWAIT | M_ZERO);
+	if (ref == NULL)
+		return;
+	ref->sr_proc = td->td_proc;
+
+	if (getsock(td->td_proc->p_fd, uap->s, &ref->sr_sock_fp,
+	    NULL) != 0)
+		goto fail;
+	if (ref->sr_sock_fp->f_type != DTYPE_SOCKET) {
+		printf("socket descriptor s=%d is not socket\n", uap->s);
+		goto fail_sock;
+	}
+	refso = ref->sr_sock_fp->f_data;
+	if (refso != so) {
+		printf("socket mismatch between refso: %p so: %p\n",
+		    refso, so);
+		goto fail_sock;
+	}
+	if (fget(td, uap->fd, &ref->sr_fp) != 0)
+		goto fail_sock;
+
+	bcopy(uap, &ref->sr_uap, sizeof(*uap));
+	ref->sr_uap.sbytes = NULL;
+
+	/*
+	 * XXX
+	 * Only the struct uio is copied; the iovecs it points to still
+	 * belong to the caller.  We have to malloc memory for the uio data.
+	 */
+	if (hdr_uio != NULL)
+		bcopy(hdr_uio, &ref->sr_hdr_uio, sizeof(*hdr_uio));
+	if (trl_uio != NULL)
+		bcopy(trl_uio, &ref->sr_trl_uio, sizeof(*trl_uio));
+	ref->sr_compat = compat;
+	ref->sr_magic = 0xCAFEBABE;
+
+	/*
+	 * Claim the sockbuf only once the request is certain to be queued.
+	 * Otherwise a failed enqueue would leave sowakeup() muted forever.
+	 */
+	so->so_snd.sb_flags |= SB_SENDING;
+	CTR3(KTR_SPARE2, "enqueueing socket %p sock_fp %p s %d", so, ref->sr_sock_fp, uap->s);
+	mtx_lock(&sendfile_bg_lock);
+	TAILQ_INSERT_HEAD(sendfile_bg_queue, ref, entry);
+	mtx_unlock(&sendfile_bg_lock);
+	return;
+
+fail_sock:
+	/* Drop the reference getsock() took. */
+	fdrop(ref->sr_sock_fp, td);
+fail:
+	free(ref, M_SOCKREF);
+}
+
+/*
+ * Release a socketref: drop the file references it holds on the socket
+ * and on the source file, then free the socketref itself.
+ */
+static void
+socketref_free(struct socketref *ref)
+{
+	struct thread *self;
+
+	self = curthread;
+	fdrop(ref->sr_sock_fp, self);
+	fdrop(ref->sr_fp, self);
+	free(ref, M_SOCKREF);
+}
+
+/*
+ * Taskqueue handler: replay one batch of deferred sendfile requests.
+ *
+ * context is the socketref_object built by sosendingwakeup().  This
+ * handler consumes the object and every socketref on it.
+ *
+ * For each request, SB_SENDING is cleared first.  If the socket is still
+ * connected, can still send and is writeable, kern_sendfile() is re-run
+ * on behalf of the requesting process.  Any result other than EAGAIN
+ * ends background sending on the socket.
+ *
+ * Whenever SB_SENDING is left cleared, writers waiting on so_snd are
+ * woken, because sowakeup() skipped their wakeups while the flag was
+ * set.  sowwakeup_locked() also releases the sockbuf lock.
+ */
+static void
+sendfile_task_func(void *context, int pending __unused)
+{
+	struct socketref_object *sro;
+	struct socketref *sr;
+	struct socket *so;
+	struct sockbuf *sb;
+	struct proc *savedp;
+	struct thread *td;
+	struct file *sock_fp, *fp;
+	off_t sbytes;
+	int error;
+
+	sro = context;
+	td = curthread;
+	savedp = td->td_proc;
+
+	CTR0(KTR_SPARE2, "task_func running");
+	while ((sr = TAILQ_FIRST(&sro->sro_srh)) != NULL) {
+		TAILQ_REMOVE(&sro->sro_srh, sr, entry);
+		if (sr->sr_magic != 0xCAFEBABE) {
+			/* Corrupt entry: do not trust its pointers. */
+			printf("bad magic! 0x%x\n", sr->sr_magic);
+			continue;
+		}
+		sock_fp = sr->sr_sock_fp;
+		fp = sr->sr_fp;
+
+		CTR2(KTR_SPARE2, "processing sr %p sock_fp %p", sr, sock_fp);
+		if (sock_fp->f_type != DTYPE_SOCKET)
+			goto done;
+
+		so = sock_fp->f_data;
+		CTR1(KTR_SPARE2, "task processing socket %p", so);
+		sb = &so->so_snd;
+
+		SOCKBUF_LOCK(sb);
+		sb->sb_flags &= ~SB_SENDING;
+		if ((so->so_state & SS_ISCONNECTED) == 0 ||
+		    (sb->sb_state & SBS_CANTSENDMORE)) {
+			CTR1(KTR_SPARE2, "task expired socket %p", so);
+			sowwakeup_locked(so);
+		} else if (sowriteable(so)) {
+			sb->sb_flags |= SB_SENDING;
+			SOCKBUF_UNLOCK(sb);
+			/*
+			 * NOTE(review): kern_sendfile() reports the byte
+			 * count with copyout(), which is unlikely to accept
+			 * this kernel-stack address, so sbytes may stay 0.
+			 * TODO: return the count in kernel space.
+			 */
+			sbytes = 0;
+			sr->sr_uap.sbytes = &sbytes;
+			CTR1(KTR_SPARE2, "task sending on socket %p", so);
+			/*
+			 * kern_sendfile() looks up uap->s through
+			 * td->td_proc->p_fd, so borrow the requester's proc
+			 * for the duration of the call.
+			 * NOTE(review): sr_proc is not held and may have
+			 * exited by now — TODO confirm lifetime.
+			 */
+			td->td_proc = sr->sr_proc;
+			error = kern_sendfile(td, &sr->sr_uap,
+			    &sr->sr_hdr_uio, &sr->sr_trl_uio,
+			    sr->sr_compat);
+			td->td_proc = savedp;
+			/*
+			 * NOTE(review): f_sfbytes is an off_t; on ILP32
+			 * atomic_add_long() only covers 32 bits.
+			 */
+			atomic_add_long(&fp->f_sfbytes, sbytes);
+			if (error != EAGAIN) {
+				SOCKBUF_LOCK(sb);
+				sb->sb_flags &= ~SB_SENDING;
+				sowwakeup_locked(so);
+			}
+		} else {
+			/*
+			 * No longer writeable.  SB_SENDING is now clear, so
+			 * release the lock through a wakeup and let waiters
+			 * re-poll.
+			 */
+			sowwakeup_locked(so);
+		}
+	done:
+		socketref_free(sr);
+	}
+	free(sro, M_DEVBUF);
+}
+
+/*
+ * Lock/unlock a sockbuf only when lockflag is non-zero, i.e. when the
+ * caller does not already own that sockbuf's lock.
+ */
+#define SOCKBUF_LOCK_COND(sb, lockflag) do {	\
+		if ((lockflag))			\
+			SOCKBUF_LOCK((sb));	\
+} while (0)
+
+#define SOCKBUF_UNLOCK_COND(sb, lockflag) do {	\
+		if ((lockflag))			\
+			SOCKBUF_UNLOCK((sb));	\
+} while (0)
+
+
+/*
+ * Scan the pending background-sendfile queue and hand work to sendfile_tq.
+ *
+ * Runs from sendfile_callout, which it re-arms every MAX(hz/10, 1) ticks.
+ * It also runs from sbdrop_locked(), whose caller holds that socket's
+ * send-buffer mutex.  Because a mutex may be held, nothing here calls
+ * fdrop(), which can end in a close that sleeps.
+ *
+ * Two kinds of request are moved to a task batch; sendfile_task_func()
+ * re-checks and releases them:
+ *   - requests whose socket has become writeable;
+ *   - requests that can no longer complete (not a socket, disconnected,
+ *     SBS_CANTSENDMORE).
+ *
+ * Everything else stays queued for the next pass, including sockets
+ * whose sockbuf lock is contended.
+ */
+void
+sosendingwakeup(void *unused __unused)
+{
+	struct socketref *sr, *srtmp;
+	struct srq *srh_global, srh_tmp;
+	struct socketref_object *sro;
+	struct socket *so;
+	struct sockbuf *sb;
+	struct file *fp;
+	int writeable, sblockneeded;
+
+	srh_global = sendfile_bg_queue;
+	/* Unlocked peek: an entry added concurrently is seen next tick. */
+	if (TAILQ_EMPTY(srh_global))
+		goto done;
+
+	/* Allocate before dequeueing so a failure cannot strand entries. */
+	if ((sro = malloc(sizeof(struct socketref_object),
+	    M_DEVBUF, M_NOWAIT)) == NULL)
+		goto done;
+	TAILQ_INIT(&sro->sro_srh);
+	TASK_INIT(&sro->sro_task, 0, sendfile_task_func, sro);
+
+	TAILQ_INIT(&srh_tmp);
+	mtx_lock(&sendfile_bg_lock);
+	TAILQ_CONCAT(&srh_tmp, srh_global, entry);
+	mtx_unlock(&sendfile_bg_lock);
+	CTR0(KTR_SPARE2, "processing pcpu list");
+
+	TAILQ_FOREACH_SAFE(sr, &srh_tmp, entry, srtmp) {
+		if (sr->sr_magic != 0xCAFEBABE)
+			printf("bad magic! 0x%x in %s\n",
+			    sr->sr_magic, __FUNCTION__);
+		fp = sr->sr_sock_fp;
+		CTR2(KTR_SPARE2, "processing s %d sock_fp %p", sr->sr_uap.s, fp);
+
+		if (fp->f_type != DTYPE_SOCKET) {
+			CTR1(KTR_SPARE2, "not socket - type %d", fp->f_type);
+			goto totask;
+		}
+		so = fp->f_data;
+		if ((so->so_state & SS_ISCONNECTED) == 0) {
+			CTR1(KTR_SPARE2, "not connected %p", so);
+			goto totask;
+		}
+		CTR1(KTR_SPARE2, "processing socket %p", so);
+		sb = &so->so_snd;
+		/*
+		 * The caller may already own this sockbuf (sbdrop_locked()).
+		 * Otherwise only try-lock it.  Blocking while holding another
+		 * socket's sockbuf lock could deadlock against a concurrent
+		 * pass that takes the two locks in the opposite order.
+		 */
+		sblockneeded = !SOCKBUF_OWNED(sb);
+		if (sblockneeded && !mtx_trylock(SOCKBUF_MTX(sb)))
+			continue;
+		sb->sb_flags &= ~SB_SENDING;
+		if (sb->sb_state & SBS_CANTSENDMORE) {
+			SOCKBUF_UNLOCK_COND(sb, sblockneeded);
+			CTR1(KTR_SPARE2, "cantsendmore %p", so);
+			goto totask;
+		}
+		writeable = sowriteable(so);
+		sb->sb_flags |= SB_SENDING;
+		SOCKBUF_UNLOCK_COND(sb, sblockneeded);
+		if (!writeable)
+			continue;
+		CTR2(KTR_SPARE2, "enqueue socket to task %p sr %p", so, sr);
+	totask:
+		TAILQ_REMOVE(&srh_tmp, sr, entry);
+		TAILQ_INSERT_HEAD(&sro->sro_srh, sr, entry);
+	}
+
+	/* Put back whatever is not ready yet. */
+	if (!TAILQ_EMPTY(&srh_tmp)) {
+		mtx_lock(&sendfile_bg_lock);
+		TAILQ_CONCAT(srh_global, &srh_tmp, entry);
+		mtx_unlock(&sendfile_bg_lock);
+	}
+
+	if (!TAILQ_EMPTY(&sro->sro_srh))
+		taskqueue_enqueue(sendfile_tq, &sro->sro_task);
+	else
+		free(sro, M_DEVBUF);
+done:
+	if (!callout_pending(sendfile_callout))
+		callout_reset(sendfile_callout, MAX(hz/10, 1),
+		    sosendingwakeup, NULL);
+}
+
+/*
+ * Boot-time setup for background sendfile completion, in this order:
+ *   1. the pending-request queue and its lock;
+ *   2. a single-threaded sendfile taskqueue;
+ *   3. the periodic sosendingwakeup() callout.
+ */
+static void
+init_bgsend(void *unused __unused)
+{
+	struct srq *srh;
+
+	/*
+	 * The queue and lock come first, since soissending() and
+	 * sosendingwakeup() use them.  Allocations use M_WAITOK: this runs
+	 * once at boot, and nothing below could cope with a NULL result.
+	 */
+	mtx_init(&sendfile_bg_lock, "sendfile bg", NULL, MTX_DEF);
+	srh = sendfile_bg_queue = malloc(sizeof(struct srq),
+	    M_DEVBUF, M_WAITOK);
+	TAILQ_INIT(srh);
+
+	sendfile_tq = taskqueue_create("sendfile background taskq", M_WAITOK,
+	    taskqueue_thread_enqueue, &sendfile_tq);
+	taskqueue_start_threads(&sendfile_tq, 1, PI_NET,
+	    "sendfile background taskq");
+
+	printf("init_bgsend mp_maxid: %u all_cpus 0x%x\n",
+	    mp_maxid, all_cpus);
+
+	sendfile_callout = malloc(sizeof(struct callout),
+	    M_DEVBUF, M_WAITOK);
+	callout_init(sendfile_callout, TRUE);
+	callout_reset(sendfile_callout, MAX(hz/10, 1),
+	    sosendingwakeup, NULL);
+
+	printf("init_bgsend done\n");
+}
+
+SYSINIT(init_bgsend, SI_SUB_SMP, SI_ORDER_ANY, init_bgsend, NULL);
+
 /*
  * Make a copy of a sockaddr in a malloced buffer of type M_SONAME.
  */

Modified: user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_syscalls.c
==============================================================================
--- user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_syscalls.c	Wed Sep  2 02:12:07 2009	(r196746)
+++ user/kmacy/releng_7_2_fcs_1/sys/kern/uipc_syscalls.c	Wed Sep  2 02:40:48 2009	(r196747)
@@ -114,7 +114,7 @@ SYSCTL_INT(_kern_ipc, OID_AUTO, nsfbufsu
  * associated with the additional reference count.  If requested, return the
  * open file flags.
  */
-static int
+int
 getsock(struct filedesc *fdp, int fd, struct file **fpp, u_int *fflagp)
 {
 	struct file *fp;
@@ -1778,7 +1778,7 @@ int
 kern_sendfile(struct thread *td, struct sendfile_args *uap,
     struct uio *hdr_uio, struct uio *trl_uio, int compat)
 {
-	struct file *sock_fp;
+	struct file *sock_fp, *fp = NULL;
 	struct vnode *vp;
 	struct vm_object *obj = NULL;
 	struct socket *so = NULL;
@@ -1795,10 +1795,22 @@ kern_sendfile(struct thread *td, struct 
 	 * File offset must be positive.  If it goes beyond EOF
 	 * we send only the header/trailer and no payload data.
 	 */
-	if ((error = fgetvp_read(td, uap->fd, &vp)) != 0)
+	if ((error = fget_read(td, uap->fd, &fp)) != 0)
 		goto out;
+	else {
+		if (fp->f_vnode == NULL) {
+			fdrop(fp, td);
+			error = EINVAL;
+			goto out;
+		} else {
+			vp = fp->f_vnode;
+			vref(vp);
+		}
+	}
+	
+	
 	vfslocked = VFS_LOCK_GIANT(vp->v_mount);
-	vn_lock(vp, LK_SHARED | LK_RETRY, td);
+	vn_lock(vp, LK_SHARED | LK_RETRY, curthread);
 	if (vp->v_type == VREG) {
 		obj = vp->v_object;
 		if (obj != NULL) {
@@ -1818,7 +1830,7 @@ kern_sendfile(struct thread *td, struct 
 			}
 		}
 	}
-	VOP_UNLOCK(vp, 0, td);
+	VOP_UNLOCK(vp, 0, curthread);
 	VFS_UNLOCK_GIANT(vfslocked);
 	if (obj == NULL) {
 		error = EINVAL;
@@ -1834,7 +1846,7 @@ kern_sendfile(struct thread *td, struct 
 	 * Remember if it a blocking or non-blocking socket.
 	 */
 	if ((error = getsock(td->td_proc->p_fd, uap->s, &sock_fp,
-	    NULL)) != 0)
+		    NULL)) != 0)
 		goto out;
 	so = sock_fp->f_data;
 	if (so->so_type != SOCK_STREAM) {
@@ -1845,6 +1857,19 @@ kern_sendfile(struct thread *td, struct 
 		error = ENOTCONN;
 		goto out;
 	}
+
+	SOCKBUF_LOCK(&so->so_snd);
+	if (((so->so_snd.sb_flags & SB_SENDING) == 0) && fp->f_sfbytes != 0) {
+		SOCKBUF_UNLOCK(&so->so_snd);
+		if (uap->sbytes != NULL) {
+			copyout(&sbytes, uap->sbytes, sizeof(off_t));
+		}
+		fp->f_sfbytes = 0;
+		error = 0;
+		goto out;
+	}
+	SOCKBUF_UNLOCK(&so->so_snd);
+	
 	/*
 	 * Do not wait on memory allocations but return ENOMEM for
 	 * caller to retry later.
@@ -1946,6 +1971,7 @@ retry_space:
 		    (space <= 0 ||
 		     space < so->so_snd.sb_lowat)) {
 			if (so->so_state & SS_NBIO) {
+				soissending(so, td, uap, hdr_uio, trl_uio, compat);
 				SOCKBUF_UNLOCK(&so->so_snd);
 				error = EAGAIN;
 				goto done;
@@ -2053,7 +2079,7 @@ retry_space:
 				 */
 				bsize = vp->v_mount->mnt_stat.f_iosize;
 				vfslocked = VFS_LOCK_GIANT(vp->v_mount);
-				vn_lock(vp, LK_SHARED | LK_RETRY, td);
+				vn_lock(vp, LK_SHARED | LK_RETRY, curthread);
 
 				/*
 				 * XXXMAC: Because we don't have fp->f_cred
@@ -2065,7 +2091,7 @@ retry_space:
 				    trunc_page(off), UIO_NOCOPY, IO_NODELOCKED |
 				    IO_VMIO | ((MAXBSIZE / bsize) << IO_SEQSHIFT),
 				    td->td_ucred, NOCRED, &resid, td);
-				VOP_UNLOCK(vp, 0, td);
+				VOP_UNLOCK(vp, 0, curthread);
 				VFS_UNLOCK_GIANT(vfslocked);
 				VM_OBJECT_LOCK(obj);
 				vm_page_io_finish(pg);
@@ -2214,6 +2240,8 @@ out:
 	}
 	if (so)
 		fdrop(sock_fp, td);
+	if (fp)
+		fdrop(fp, td);
 	if (m)
 		m_freem(m);
 

Modified: user/kmacy/releng_7_2_fcs_1/sys/sys/file.h
==============================================================================
--- user/kmacy/releng_7_2_fcs_1/sys/sys/file.h	Wed Sep  2 02:12:07 2009	(r196746)
+++ user/kmacy/releng_7_2_fcs_1/sys/sys/file.h	Wed Sep  2 02:40:48 2009	(r196747)
@@ -125,6 +125,7 @@ struct file {
 	 *  DFLAG_SEEKABLE specific fields
 	 */
 	off_t		f_offset;
+	off_t		f_sfbytes;
 	/*
 	 * Mandatory Access control information.
 	 */

Modified: user/kmacy/releng_7_2_fcs_1/sys/sys/sockbuf.h
==============================================================================
--- user/kmacy/releng_7_2_fcs_1/sys/sys/sockbuf.h	Wed Sep  2 02:12:07 2009	(r196746)
+++ user/kmacy/releng_7_2_fcs_1/sys/sys/sockbuf.h	Wed Sep  2 02:40:48 2009	(r196747)
@@ -52,6 +52,7 @@
 #define	SB_NOCOALESCE	0x200		/* don't coalesce new data into existing mbufs */
 #define	SB_IN_TOE	0x400		/* socket buffer is in the middle of an operation */
 #define	SB_AUTOSIZE	0x800		/* automatically size socket buffer */
+#define	SB_SENDING	0x1000		/* socket is owned by sendfile thread */
 
 #define	SBS_CANTSENDMORE	0x0010	/* can't send more data to peer */
 #define	SBS_CANTRCVMORE		0x0020	/* can't receive more data from peer */

Modified: user/kmacy/releng_7_2_fcs_1/sys/sys/socketvar.h
==============================================================================
--- user/kmacy/releng_7_2_fcs_1/sys/sys/socketvar.h	Wed Sep  2 02:12:07 2009	(r196746)
+++ user/kmacy/releng_7_2_fcs_1/sys/sys/socketvar.h	Wed Sep  2 02:40:48 2009	(r196747)
@@ -201,7 +201,8 @@ struct xsocket {
 /* can we write something to so? */
 #define	sowriteable(so) \
     ((sbspace(&(so)->so_snd) >= (so)->so_snd.sb_lowat && \
-	(((so)->so_state&SS_ISCONNECTED) || \
+	!((so)->so_snd.sb_flags & SB_SENDING) &&	 \
+	(((so)->so_state&SS_ISCONNECTED) ||		     \
 	  ((so)->so_proto->pr_flags&PR_CONNREQUIRED)==0)) || \
      ((so)->so_snd.sb_state & SBS_CANTSENDMORE) || \
      (so)->so_error)

Modified: user/kmacy/releng_7_2_fcs_1/sys/sys/sockstate.h
==============================================================================
--- user/kmacy/releng_7_2_fcs_1/sys/sys/sockstate.h	Wed Sep  2 02:12:07 2009	(r196746)
+++ user/kmacy/releng_7_2_fcs_1/sys/sys/sockstate.h	Wed Sep  2 02:40:48 2009	(r196747)
@@ -71,11 +71,16 @@
 #define	SBS_RCVATMARK		0x0040	/* at mark on input */
 
 struct socket;
+struct sendfile_args;
+struct uio;
 
 void	soisconnected(struct socket *so);
 void	soisconnecting(struct socket *so);
 void	soisdisconnected(struct socket *so);
 void	soisdisconnecting(struct socket *so);
+void	soissending(struct socket *so,
+    struct thread *td, struct sendfile_args *uap,
+    struct uio *hdr_uio, struct uio *trl_uio, int compat);
 void	socantrcvmore(struct socket *so);
 void	socantrcvmore_locked(struct socket *so);
 void	socantsendmore(struct socket *so);



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200909020240.n822emNL078914>