Skip site navigation (1) Skip section navigation (2)
Date:      Wed, 17 Mar 2010 03:02:48 +0000 (UTC)
From:      Lawrence Stewart <lstewart@FreeBSD.org>
To:        src-committers@freebsd.org, svn-src-user@freebsd.org
Subject:   svn commit: r205237 - in user/lstewart/alq_varlen_head/sys: kern sys
Message-ID:  <201003170302.o2H32mpw075673@svn.freebsd.org>

next in thread | raw e-mail | index | archive | help
Author: lstewart
Date: Wed Mar 17 03:02:48 2010
New Revision: 205237
URL: http://svn.freebsd.org/changeset/base/205237

Log:
  - Rework the way thread ordering is enforced so that it actually behaves as
    expected (issue discovered during detailed testing). Ordering is now an
    off-by-default option that can be enabled at ALQ creation time using the
    ALQ_ORDERED flag.
  
  - Add an alq_open_flags() KPI call to allow the new ALQ_ORDERED flag to be
    specified. alq_open() is now implemented as a wrapper around alq_open_flags().
  
  - Rename alq_postn() to alq_post_flags() to keep the naming consistent.
  
  - Keep some useful debugging printf() calls, commented out, for reference
    (they will be removed in a later diff).
  
  - Remove some no longer relevant assertions.
  
  - Introduce the AQ_VARLEN flag, used internally to indicate that the ALQ is
    capable of handling variable length messages.
  
  - Protect alq_getn()/alq_post() from zero-length writes so that the "use less
    than you asked for" feature works in contexts where no data may be generated.
  
  Sponsored by:	FreeBSD Foundation

Modified:
  user/lstewart/alq_varlen_head/sys/kern/kern_alq.c
  user/lstewart/alq_varlen_head/sys/sys/alq.h

Modified: user/lstewart/alq_varlen_head/sys/kern/kern_alq.c
==============================================================================
--- user/lstewart/alq_varlen_head/sys/kern/kern_alq.c	Wed Mar 17 02:48:14 2010	(r205236)
+++ user/lstewart/alq_varlen_head/sys/kern/kern_alq.c	Wed Mar 17 03:02:48 2010	(r205237)
@@ -54,15 +54,16 @@ __FBSDID("$FreeBSD$");
 
 /* Async. Logging Queue */
 struct alq {
+	char	*aq_entbuf;		/* Buffer for stored entries */
 	int	aq_entmax;		/* Max entries */
 	int	aq_entlen;		/* Entry length */
 	int	aq_freebytes;		/* Bytes available in buffer */
 	int	aq_buflen;		/* Total length of our buffer */
-	char	*aq_entbuf;		/* Buffer for stored entries */
 	int	aq_writehead;		/* Location for next write */
 	int	aq_writetail;		/* Flush starts at this location */
 	int	aq_wrapearly;		/* # bytes left blank at end of buf */
 	int	aq_flags;		/* Queue flags */
+	int	aq_waiters;		/* Num threads waiting for resources */
 	struct	ale	aq_getpost;	/* ALE for use by get/post */
 	struct mtx	aq_mtx;		/* Queue lock */
 	struct vnode	*aq_vp;		/* Open vnode handle */
@@ -75,6 +76,8 @@ struct alq {
 #define	AQ_ACTIVE	0x0002		/* on the active list */
 #define	AQ_FLUSHING	0x0004		/* doing IO */
 #define	AQ_SHUTDOWN	0x0008		/* Queue no longer valid */
+#define	AQ_ORDERED	0x0010		/* Queue enforces ordered writes */
+#define	AQ_VARLEN	0x0020		/* Queue is variable length capable */
 
 #define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
 #define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)
@@ -200,7 +203,7 @@ ald_daemon(void)
 		needwakeup = alq_doio(alq);
 		ALQ_UNLOCK(alq);
 		if (needwakeup)
-			wakeup_one(alq);
+			wakeup(alq);
 		ALD_LOCK();
 	}
 
@@ -334,6 +337,9 @@ alq_doio(struct alq *alq)
 		totlen = aiov[0].iov_len + aiov[1].iov_len;
 	}
 
+	/*printf("Flushing %d bytes to disk, aq_freebytes==%d\n", totlen,
+	    alq->aq_freebytes);*/
+
 	alq->aq_flags |= AQ_FLUSHING;
 	ALQ_UNLOCK(alq);
 
@@ -366,6 +372,9 @@ alq_doio(struct alq *alq)
 	    alq->aq_buflen;
 	alq->aq_freebytes += totlen + wrapearly;
 
+	/*printf("Flushed %d bytes to disk, aq_freebytes==%d, AQ_WANTED==%d\n",
+	    totlen, alq->aq_freebytes, alq->aq_flags & AQ_WANTED);*/
+
 	/*
 	 * If we just flushed part of the buffer which wrapped, reset the
 	 * wrapearly indicator.
@@ -374,8 +383,8 @@ alq_doio(struct alq *alq)
 		alq->aq_wrapearly = 0;
 
 	/*
-	 * If we just flushed the buffer completely,
-	 * reset indexes to 0 to minimise buffer wraps.
+	 * If we just flushed the buffer completely, reset indexes to 0 to
+	 * minimise buffer wraps.
 	 * This is also required to ensure alq_getn() can't wedge itself.
 	 */
 	if (!HAS_PENDING_DATA(alq))
@@ -407,14 +416,15 @@ SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, 
 /*
  * Create the queue data structure, allocate the buffer, and open the file.
  */
+
 int
-alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
-    int size, int count)
+alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+    int size, int count, int flags)
 {
 	struct thread *td;
 	struct nameidata nd;
 	struct alq *alq;
-	int flags;
+	int oflags;
 	int error;
 	int vfslocked;
 
@@ -425,9 +435,9 @@ alq_open(struct alq **alqp, const char *
 	td = curthread;
 
 	NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
-	flags = FWRITE | O_NOFOLLOW | O_CREAT;
+	oflags = FWRITE | O_NOFOLLOW | O_CREAT;
 
-	error = vn_open_cred(&nd, &flags, cmode, 0, cred, NULL);
+	error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
 	if (error)
 		return (error);
 
@@ -453,11 +463,14 @@ alq_open(struct alq **alqp, const char *
 		alq->aq_buflen = size;
 		alq->aq_entmax = 0;
 		alq->aq_entlen = 0;
+		alq->aq_flags |= AQ_VARLEN;
 	}
 
 	alq->aq_freebytes = alq->aq_buflen;
 	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
 	alq->aq_writehead = alq->aq_writetail = 0;
+	if (flags & ALQ_ORDERED)
+		alq->aq_flags |= AQ_ORDERED;
 
 	if ((error = ald_add(alq)) != 0) {
 		alq_destroy(alq);
@@ -503,27 +516,47 @@ alq_writen(struct alq *alq, void *data, 
 	}
 
 	/*
+	 * If we want ordered writes and there are threads already waiting for
+	 * resources to become available, spin until we're woken.
+	 */
+	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
+		/*printf("tid %d order sleep, wants %d bytes (%d avail)\n",
+		    curthread->td_tid, len, alq->aq_freebytes);*/
+		alq->aq_waiters++;
+		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwriten", 0);
+		alq->aq_waiters--;
+		/*printf("tid %d order woken, wants %d bytes (%d avail)\n",
+		    curthread->td_tid, len, alq->aq_freebytes);*/
+	}
+
+	/*
 	 * ALQ_WAITOK or alq->aq_freebytes > len, either spin until
-	 * we have enough free bytes (former) or skip (latter). However in the
-	 * latter case, we can't skip if other threads are already
-	 * waiting (AQ_WANTED is set), otherwise records can get out of order.
+	 * we have enough free bytes (former) or skip (latter).
 	 */
-	while ((alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN))
-	    || alq->aq_flags & AQ_WANTED) {
+	while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
+		/*printf("tid %d sleep, wants %d bytes (%d avail)\n",
+		    curthread->td_tid, len, alq->aq_freebytes);*/
 		alq->aq_flags |= AQ_WANTED;
+		alq->aq_waiters++;
 		msleep_spin(alq, &alq->aq_mtx, "alqwriten", 0);
-		KASSERT(!(alq->aq_flags & AQ_WANTED),
-		    ("AQ_WANTED should have been unset!"));
+		alq->aq_waiters--;
+		/*printf("tid %d woken, wants %d bytes (%d avail)\n",
+		    curthread->td_tid, len, alq->aq_freebytes);*/
 	}
 
+	/*printf("tid %d got %d bytes (%d avail, %d waiters)\n",
+	    curthread->td_tid, len, alq->aq_freebytes, alq->aq_waiters);*/
+
 	/*
-	 * We need to serialise wakeups to ensure records remain in order.
-	 * Therefore, wakeup the next thread in the queue waiting for
-	 * ALQ resources to be available.
-	 * (Technically this is only required if we actually entered the above
-	 * while loop.)
+	 * If there are waiters, wakeup the next thread in the queue waiting for
+	 * ALQ resources.
 	 */
-	wakeup_one(alq);
+	if (alq->aq_waiters > 0) {
+		if (alq->aq_flags & AQ_ORDERED)
+			wakeup_one(&alq->aq_waiters);
+		else
+			wakeup(alq);
+	}
 
 	/* Bail if we're shutting down. */
 	if (alq->aq_flags & AQ_SHUTDOWN) {
@@ -569,6 +602,8 @@ alq_writen(struct alq *alq, void *data, 
 		activate = 1;
 	}
 
+	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue emtpy!", __func__));
+
 	ALQ_UNLOCK(alq);
 
 	if (activate) {
@@ -584,7 +619,7 @@ int
 alq_write(struct alq *alq, void *data, int flags)
 {
 	/* Should only be called in fixed length message (legacy) mode. */
-	KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
+	KASSERT((!(alq->aq_flags & AQ_VARLEN)),
 	    ("%s: fixed length write on variable length queue", __func__));
 	return (alq_writen(alq, data, alq->aq_entlen, flags));
 }
@@ -651,19 +686,31 @@ alq_getn(struct alq *alq, int len, int f
 	}
 
 	/*
+	 * If we want ordered writes and there are threads already waiting for
+	 * resources to become available, spin until we're woken.
+	 */
+	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
+		/*printf("tid %d order sleep, wants %d bytes (%d avail)\n",
+		    curthread->td_tid, len, alq->aq_freebytes);*/
+		alq->aq_waiters++;
+		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgetn", 0);
+		alq->aq_waiters--;
+		/*printf("tid %d order woken, wants %d bytes (%d avail)\n",
+		    curthread->td_tid, len, alq->aq_freebytes);*/
+	}
+
+	/*
 	 * ALQ_WAITOK or contigbytes >= len,
 	 * either spin until we have enough free contiguous bytes (former)
 	 * or skip (latter). However, in the latter case, we can't skip if
 	 * other threads are already waiting (AQ_WANTED is set), otherwise
 	 * records can get out of order.
 	 */
-	while ((contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN))
-	    || alq->aq_flags & AQ_WANTED) {
+	while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
 		alq->aq_flags |= AQ_WANTED;
+		alq->aq_waiters++;
 		msleep_spin(alq, &alq->aq_mtx, "alqgetn", 0);
-
-		KASSERT(!(alq->aq_flags & AQ_WANTED),
-		    ("AQ_WANTED should have been unset!"));
+		alq->aq_waiters--;
 
 		if (alq->aq_writehead <= alq->aq_writetail)
 			contigbytes = alq->aq_freebytes;
@@ -672,13 +719,15 @@ alq_getn(struct alq *alq, int len, int f
 	}
 
 	/*
-	 * We need to serialise wakeups to ensure records remain in order.
-	 * Therefore, wakeup the next thread in the queue waiting for
-	 * ALQ resources to be available.
-	 * (Technically this is only required if we actually entered the above
-	 * while loop.)
+	 * If there are waiters, wakeup the next thread in the queue waiting for
+	 * ALQ resources.
 	 */
-	wakeup_one(alq);
+	if (alq->aq_waiters > 0) {
+		if (alq->aq_flags & AQ_ORDERED)
+			wakeup_one(&alq->aq_waiters);
+		else
+			wakeup(alq);
+	}
 
 	/* Bail if we're shutting down. */
 	if (alq->aq_flags & AQ_SHUTDOWN) {
@@ -700,32 +749,39 @@ struct ale *
 alq_get(struct alq *alq, int flags)
 {
 	/* Should only be called in fixed length message (legacy) mode. */
-	KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
+	KASSERT((!(alq->aq_flags & AQ_VARLEN)),
 	    ("%s: fixed length get on variable length queue", __func__));
 	return (alq_getn(alq, alq->aq_entlen, flags));
 }
 
 void
-alq_postn(struct alq *alq, struct ale *ale, int flags)
+alq_post_flags(struct alq *alq, struct ale *ale, int flags)
 {
 	int activate;
 
 	activate = 0;
 
-	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
-		alq->aq_flags |= AQ_ACTIVE;
-		activate = 1;
-	}
+	if (ale->ae_bytesused > 0) {
+		if (!(alq->aq_flags & AQ_ACTIVE) &&
+		    !(flags & ALQ_NOACTIVATE)) {
+			alq->aq_flags |= AQ_ACTIVE;
+			activate = 1;
+		}
 
-	alq->aq_writehead += ale->ae_bytesused;
-	alq->aq_freebytes -= ale->ae_bytesused;
+		alq->aq_writehead += ale->ae_bytesused;
+		alq->aq_freebytes -= ale->ae_bytesused;
 
-	/* Wrap aq_writehead if we've filled to the end of the buffer. */
-	if (alq->aq_writehead == alq->aq_buflen)
-		alq->aq_writehead = 0;
+		/* Wrap aq_writehead if we filled to the end of the buffer. */
+		if (alq->aq_writehead == alq->aq_buflen)
+			alq->aq_writehead = 0;
+
+		KASSERT((alq->aq_writehead >= 0 &&
+		    alq->aq_writehead < alq->aq_buflen),
+		    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
+		    __func__));
 
-	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
-	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
+		KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue emtpy!", __func__));
+	}
 
 	ALQ_UNLOCK(alq);
 
@@ -761,7 +817,7 @@ alq_flush(struct alq *alq)
 	ALQ_UNLOCK(alq);
 
 	if (needwakeup)
-		wakeup_one(alq);
+		wakeup(alq);
 }
 
 /*

Modified: user/lstewart/alq_varlen_head/sys/sys/alq.h
==============================================================================
--- user/lstewart/alq_varlen_head/sys/sys/alq.h	Wed Mar 17 02:48:14 2010	(r205236)
+++ user/lstewart/alq_varlen_head/sys/sys/alq.h	Wed Mar 17 03:02:48 2010	(r205237)
@@ -56,6 +56,7 @@ struct ale {
 #define	ALQ_NOWAIT	0x0001
 #define	ALQ_WAITOK	0x0002
 #define	ALQ_NOACTIVATE	0x0004
+#define	ALQ_ORDERED	0x0010
 
 /* Suggested mode for file creation. */
 #define	ALQ_DEFAULT_CMODE	0600
@@ -77,8 +78,15 @@ struct ale {
  *	error from open or 0 on success
  */
 struct ucred;
-int alq_open(struct alq **, const char *file, struct ucred *cred, int cmode,
-	    int size, int count);
+int alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+	    int size, int count, int flags);
+
+static __inline int
+alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+	    int size, int count)
+{
+	return alq_open_flags(alqp, file, cred, cmode, size, count, 0);
+}
 
 /*
  * alq_writen:  Write data into the queue
@@ -133,12 +141,12 @@ struct ale *alq_get(struct alq *alq, int
  *	ale	An asynch logging entry returned by alq_get.
  *	flags	ALQ_NOACTIVATE
  */
-void alq_postn(struct alq *alq, struct ale *ale, int flags);
+void alq_post_flags(struct alq *alq, struct ale *ale, int flags);
 
 static __inline void
 alq_post(struct alq *alq, struct ale *ale)
 {
-	alq_postn(alq, ale, 0);
+	alq_post_flags(alq, ale, 0);
 }
 
 #endif	/* _SYS_ALQ_H_ */



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201003170302.o2H32mpw075673>