Skip site navigation (1)Skip section navigation (2)
Date:      Sun, 6 Apr 2008 00:28:54 GMT
From:      Aaron Meihm <alm@FreeBSD.org>
To:        Perforce Change Reviews <perforce@freebsd.org>
Subject:   PERFORCE change 139439 for review
Message-ID:  <200804060028.m360Ssfq078410@repoman.freebsd.org>

next in thread | raw e-mail | index | archive | help
http://perforce.freebsd.org/chv.cgi?CH=139439

Change 139439 by alm@alm_praetorian on 2008/04/06 00:27:56

	Additional code working towards new threaded producer/consumer model.

Affected files ...

.. //depot/projects/trustedbsd/netauditd/grammar.y#3 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.h#13 edit
.. //depot/projects/trustedbsd/netauditd/reader.c#2 edit
.. //depot/projects/trustedbsd/netauditd/reader.h#2 edit
.. //depot/projects/trustedbsd/netauditd/writer.c#2 edit
.. //depot/projects/trustedbsd/netauditd/writer.h#2 edit

Differences ...

==== //depot/projects/trustedbsd/netauditd/grammar.y#3 (text+ko) ====

@@ -131,6 +131,7 @@
 		new->ac_name = $2;
 		new->ac_path = $3;
 		new->ac_init_func = writer_init_trail;
+		new->ac_write_func = writer_write_trail;
 		writer_q_init(new);
 		$$ = new;
 	}

==== //depot/projects/trustedbsd/netauditd/netauditd.h#13 (text+ko) ====

@@ -38,12 +38,11 @@
 struct audit_record {
 	void			*ar_buf;
 	u_int32_t		ar_record_len;
-	int			ar_refcount;
 };
 
 struct au_queue_ent {
 	TAILQ_ENTRY(au_queue_ent)	aq_glue;
-	struct audit_record		*aq_record;
+	struct audit_record		aq_record;
 	u_int32_t			aq_remain;
 };
 
@@ -51,12 +50,12 @@
 
 struct au_qpair {
 	au_q_t				qp_a, qp_b;
-	int				qp_ready;
-	au_q_t				*qp_read, *qp_write;
 	pthread_mutex_t			qp_lock;
-	pthread_cond_t			qp_cond;
-	u_int32_t			qp_read_size;
-	time_t				qp_time;
+	au_q_t				*qp_store;
+	au_q_t				*qp_hold;
+	au_q_t				*qp_free;
+	u_int32_t			qp_store_len;
+	int				qp_store_n;
 };
 
 struct au_cmpnt {
@@ -76,6 +75,7 @@
 
 	int			(*ac_init_func)(struct au_cmpnt *);
 	int			(*ac_read_func)(struct au_cmpnt *);
+	int			(*ac_write_func)(struct au_cmpnt *);
 };
 
 struct au_src_buffer {

==== //depot/projects/trustedbsd/netauditd/reader.c#2 (text+ko) ====

@@ -26,6 +26,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/queue.h>
+#include <sys/endian.h>
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -43,22 +44,27 @@
 #include "reader.h"
 #include "writer.h"
 
-#define ROTATE(x)		if (x.qp_read == &x.qp_a) { \
-					x.qp_read = &x.qp_b; \
-					x.qp_write = &x.qp_a; \
-				} \
-				else { \
-					x.qp_read = &x.qp_a; \
-					x.qp_write = &x.qp_b; \
-				} \
-				x.qp_read_size = 0; \
-				x.qp_time = time(NULL);
-
 #define	SRC_BUFFER_INIT(x)	x = malloc(sizeof(struct au_src_buffer)); \
 				assert (x != NULL); \
 				bzero(x, sizeof(struct au_src_buffer));
 
-static int	srcs_online;	/* All source components online */
+#define WRITER_SIGNAL(x)	(void) pthread_mutex_lock(&ready_lock); \
+				records_waiting += x; \
+				x = 0; \
+				(void) pthread_cond_signal(&ready_cond); \
+				(void) pthread_mutex_unlock(&ready_lock)
+
+#define ROTATE			assert(ac->ac_q.qp_free != NULL); \
+				ac->ac_q.qp_hold = ac->ac_q.qp_store; \
+				ac->ac_q.qp_store = ac->ac_q.qp_free; \
+				ac->ac_q.qp_free = NULL; \
+				ac->ac_q.qp_store_len = 0
+
+pthread_mutex_t	ready_lock;
+pthread_cond_t	ready_cond;
+u_int32_t	records_waiting;
+
+static fd_set	rfds;
 
 int
 reader_accept_client(struct au_cmpnt *ac)
@@ -82,6 +88,7 @@
 	bcopy(&addr, &new->as_addr, sizeof(new->as_addr));
 	new->as_addrlen = addrlen;
 	TAILQ_INSERT_TAIL(&ac->ac_sbuffers, new, as_glue);
+	reader_build_rfds(&rfds);
 	return (0);
 }
 
@@ -89,25 +96,30 @@
 reader_build_rfds(fd_set *rfds)
 {
 	struct au_cmpnt *ac;
+	struct au_src_buffer *s;
 
 	FD_ZERO(rfds);
 	TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
-		if (ac->ac_flags & FLAG_ONLINE)
-			FD_SET(ac->ac_fd, rfds);
+		FD_SET(ac->ac_fd, rfds);
+		if (ac->ac_type == COMPONENT_NET) {
+			TAILQ_FOREACH(s, &ac->ac_sbuffers, as_glue)
+				FD_SET(s->as_fd, rfds);
+		}
 	}
 }
 
 void
 reader_handler(fd_set *rfds)
 {
-	struct au_cmpnt *ac, *tmp;
+	struct au_src_buffer *as, *tmp;
+	struct au_cmpnt *ac;
 	fd_set lrfds;
 	struct timeval tv;
 	int ret, ret2;
 
 	lrfds = *rfds;
 	bzero(&tv, sizeof(struct timeval));
-	tv.tv_sec = 1;
+	tv.tv_sec = 5;
 	ret = select(FD_SETSIZE, &lrfds, NULL, NULL, &tv);
 	if (ret == -1) {
 		if (errno == EINTR)
@@ -115,15 +127,22 @@
 		else
 			exit(2);
 	}
-	else if (ret == 0)
+	else if (ret == 0) {
+		reader_timeout();
 		return;
-	TAILQ_FOREACH_SAFE(ac, &ac_list_src, ac_glue, tmp) {
+	}
+	TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
 		if (FD_ISSET(ac->ac_fd, &lrfds)) {
 			ret2 = ac->ac_read_func(ac);
-			if (ret2 == -1) {
-				(void) close(ac->ac_fd);
-				ac->ac_flags &= FLAG_ONLINE;
-			}
+			if (ret2 == -1)
+				exit(2);
+		}
+		if (ac->ac_type == COMPONENT_NET) {
+			TAILQ_FOREACH_SAFE(as, &ac->ac_sbuffers, as_glue,
+			    tmp)
+				if (FD_ISSET(as->as_fd, &lrfds)) {
+					ret2 = reader_read_socket(as);
+				}
 		}
 	}
 }
@@ -131,30 +150,13 @@
 void
 reader_init()
 {
-	time_t t;
 	struct au_cmpnt *ac;
 
-	srcs_online = 1;
-	TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
-		if (ac->ac_flags & FLAG_ONLINE)
-			continue;
-		t = time(NULL);
-		if (ac->ac_failed != 0)
-			if ((ac->ac_failed + READER_RETRY) > t) {
-				srcs_online = 0;
-				continue;
-			}
-		dprintf("reader_init: %s", ac->ac_name);
-		if (ac->ac_init_func(ac) == -1) {
-			srcs_online = 0;
-			ac->ac_failed = time(NULL);
-		}
-		else {
-			dprintf("reader_init: %s online", ac->ac_name);
-			ac->ac_failed = 0;
-			ac->ac_flags |= FLAG_ONLINE;
-		}
-	}
+	(void) pthread_mutex_init(&ready_lock, NULL);
+	(void) pthread_cond_init(&ready_cond, NULL);
+	TAILQ_FOREACH(ac, &ac_list_src, ac_glue)
+		if (ac->ac_init_func(ac) == -1)
+			exit(2);
 }
 
 int
@@ -205,50 +207,51 @@
 
 	for (i = 0; i < ac->ac_ndsts; i++)
 		reader_q_record_cmpnt(ar, ac->ac_dsts[i]);
+	/* Once we have copied the record to all this components consumers
+	 * we can discard it. */
+	free(ar->ar_buf);
+	free(ar);
 }
 
 void
 reader_q_record_cmpnt(struct audit_record *ar, struct au_cmpnt *ac)
 {
 	struct au_queue_ent *new;
-	int rotate = 0;
-	time_t t;
 
-	t = time(NULL);
-	if (ac->ac_q.qp_time == 0)
-		ac->ac_q.qp_time = t;
-	else if ((ac->ac_q.qp_time + WRITER_ROTATE_TIMEOUT) <= t) {
-		if (!TAILQ_EMPTY(ac->ac_q.qp_read))
-			rotate = 1;
-	}
-	if (ac->ac_q.qp_read_size >= WRITER_LOW_WATER)
-		rotate = 1;
-	if (rotate) {
-		if (pthread_mutex_lock(&ac->ac_q.qp_lock) != 0)
-			exit(2);
-		/* If the writer is still processing the other buffer, the
-		 * record is dropped. */
-		if (!ac->ac_q.qp_ready) {
-			if (pthread_mutex_unlock(&ac->ac_q.qp_lock) != 0)
-				exit(2);
-			return;
-		}
-		dprintf("reader_q_record_cmpnt: %s: rotate", ac->ac_name);
-		ROTATE(ac->ac_q);
-		if (pthread_cond_signal(&ac->ac_q.qp_cond) != 0)
-			exit(2);
-		if (pthread_mutex_unlock(&ac->ac_q.qp_lock) != 0)
-			exit(2);
-	}
-	dprintf("reader_q_record: %p for %s", ar, ac->ac_name);
 	new = malloc(sizeof(struct au_queue_ent));
 	assert(new != NULL);
 	bzero(new, sizeof(struct au_queue_ent));
-	new->aq_record = ar;
+	new->aq_record.ar_buf = malloc(ar->ar_record_len);
+	assert(new->aq_record.ar_buf != NULL);
+	bcopy(ar->ar_buf, new->aq_record.ar_buf, ar->ar_record_len);
+	new->aq_record.ar_record_len = ar->ar_record_len;
 	new->aq_remain = ar->ar_record_len;
-	ar->ar_refcount++;
-	TAILQ_INSERT_TAIL(ac->ac_q.qp_read, new, aq_glue);
-	ac->ac_q.qp_read_size += ar->ar_record_len;
+	(void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+	if (ac->ac_q.qp_store_len < WRITER_MAX) {
+		dprintf("queueing record for %s (%d bytes in queue)",
+		    ac->ac_name, ac->ac_q.qp_store_len);
+		(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+		TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
+		ac->ac_q.qp_store_len += ar->ar_record_len;
+		ac->ac_q.qp_store_n++;
+		return;
+	}
+	if (ac->ac_q.qp_hold != NULL) {
+		/* This consumer is still processing it's queue, so the record
+		 * is dropped. */
+		(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+		dprintf("dropping record for %s", ac->ac_name);
+		free(new->aq_record.ar_buf);
+		free(new);
+		return;
+	}
+	dprintf("rotating queues for %s", ac->ac_name);
+	ROTATE;
+	WRITER_SIGNAL(ac->ac_q.qp_store_n);
+	(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+	TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
+	ac->ac_q.qp_store_len += ar->ar_record_len;
+	ac->ac_q.qp_store_n++;
 }
 
 int
@@ -279,16 +282,96 @@
 	return (0);
 }
 
+int
+reader_read_socket(struct au_src_buffer *asb)
+{
+	u_char *bufptr, *recbufptr;
+	int ret, left;
+	u_int32_t hdr_remain, val, need;
+	u_char as_buf[2048];
+
+	ret = read(asb->as_fd, as_buf, sizeof(as_buf));
+	if (ret == -1) {
+		if (errno != EINTR)
+			return (-1);
+		else
+			return (0);
+	}
+	else if (ret == 0)
+		return (-1);
+	left = ret;
+	bufptr = as_buf;
+	while (left > 0) {
+		if (asb->as_record == NULL) {
+			hdr_remain = sizeof(asb->as_header) -
+			    asb->as_nread;
+			if (left >= hdr_remain) {
+				(void) memcpy(asb->as_header + asb->as_nread,
+				    bufptr, hdr_remain);
+				asb->as_nread += hdr_remain;
+				left -= hdr_remain;
+				bufptr += hdr_remain;
+				(void) memcpy(&val, asb->as_header + 1,
+				    sizeof(val));
+				asb->as_record =
+				    malloc(sizeof(struct audit_record));
+				assert(asb->as_record != NULL);
+				asb->as_record->ar_record_len = be32toh(val);
+				asb->as_record->ar_buf = \
+				    malloc(asb->as_record->ar_record_len);
+				assert(asb->as_record->ar_buf != NULL);
+				(void) memcpy(asb->as_record->ar_buf,
+				    asb->as_header, sizeof(asb->as_header));
+				continue;
+			}
+			else {
+				(void) memcpy(asb->as_header + asb->as_nread,
+				    bufptr, left);
+				asb->as_nread += left;
+				return (0);
+			}
+		}
+		need = asb->as_record->ar_record_len - asb->as_nread;
+		recbufptr = asb->as_record->ar_buf + asb->as_nread;
+		if (left < need) {
+			(void) memcpy(recbufptr, bufptr, left);
+			asb->as_nread += left;
+			return (0);
+		}
+		else {
+			(void) memcpy(recbufptr, bufptr, need);
+			left -= need;
+			bufptr += need;
+			reader_q_record(asb->as_record, asb->as_parent);
+			asb->as_record = NULL;
+			asb->as_nread = 0;
+		}
+	}
+	return (0);
+}
+
 void
 reader_start()
 {
-	fd_set rfds;
+	reader_init();
+	reader_build_rfds(&rfds);
+	for (;;)
+		reader_handler(&rfds);
+}
+
+void
+reader_timeout()
+{
+	struct au_cmpnt *ac;
 
-	for (;;) {
-		if (!srcs_online) {
-			reader_init();
-			reader_build_rfds(&rfds);
+	TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) {
+		(void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+		if ((ac->ac_q.qp_hold != NULL) || (ac->ac_q.qp_store_n == 0)) {
+			(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+			continue;
 		}
-		reader_handler(&rfds);
+		ROTATE;
+		WRITER_SIGNAL(ac->ac_q.qp_store_n);
+		(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
 	}
 }

==== //depot/projects/trustedbsd/netauditd/reader.h#2 (text+ko) ====

@@ -24,7 +24,9 @@
  * SUCH DAMAGE.
  */
 
-#define		READER_RETRY	30
+extern		pthread_mutex_t	ready_lock;
+extern		pthread_cond_t	ready_cond;
+u_int32_t			records_waiting;
 
 int		reader_accept_client(struct au_cmpnt *);
 void		reader_build_rfds(fd_set *);
@@ -35,4 +37,6 @@
 void		reader_q_record(struct audit_record *, struct au_cmpnt *);
 void		reader_q_record_cmpnt(struct audit_record *, struct au_cmpnt *);
 int		reader_read_pipe(struct au_cmpnt *);
+int		reader_read_socket(struct au_src_buffer *);
 void		reader_start(void);
+void		reader_timeout(void);

==== //depot/projects/trustedbsd/netauditd/writer.c#2 (text+ko) ====

@@ -61,13 +61,14 @@
 void
 writer_handler(fd_set *wfds)
 {
+	struct au_cmpnt *ac;
 	fd_set lwfds;
 	struct timeval *tv = NULL;
-	int ret;
+	int ret, have_records = 1;
 
 	lwfds = *wfds;
 	if (!dsts_online) {
-		dprintf("writer_handler: applying select timeout");
+		dprintf("writer applying select timeout");
 		tv = malloc(sizeof(struct timeval));
 		bzero(tv, sizeof(struct timeval));
 		tv->tv_sec = 1;
@@ -79,6 +80,34 @@
 		else
 			exit(2);
 	}
+	(void) pthread_mutex_lock(&ready_lock);
+	if (records_waiting == 0) {
+		have_records = 0;
+		if (dsts_online) {
+			while (records_waiting == 0) {
+				dprintf("writer waiting for records");
+				(void) pthread_cond_wait(&ready_cond,
+				    &ready_lock);
+			}
+			have_records = 1;
+			(void) pthread_mutex_unlock(&ready_lock);
+		}
+	}
+	(void) pthread_mutex_unlock(&ready_lock);
+	if (!have_records)
+		return;
+	TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) {
+		if (!FD_ISSET(ac->ac_fd, &lwfds))
+			continue;
+		(void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+		if (ac->ac_q.qp_hold != NULL) {
+			(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+			ret = ac->ac_write_func(ac);
+			if (ret == 1)	/* Queue has been drained */
+				writer_q_drained(ac);
+		}
+		(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+	}
 }
 
 void
@@ -153,16 +182,25 @@
 }
 
 void
+writer_q_drained(struct au_cmpnt *ac)
+{
+	(void) pthread_mutex_lock(&ac->ac_q.qp_lock);
+	assert(ac->ac_q.qp_hold != NULL);
+	ac->ac_q.qp_free = ac->ac_q.qp_hold;
+	ac->ac_q.qp_hold = NULL;
+	(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+}
+
+void
 writer_q_init(struct au_cmpnt *ac)
 {
 	TAILQ_INIT(&ac->ac_q.qp_a);
 	TAILQ_INIT(&ac->ac_q.qp_b);
-	ac->ac_q.qp_read = &ac->ac_q.qp_a;
-	ac->ac_q.qp_write = &ac->ac_q.qp_b;
+	ac->ac_q.qp_store = &ac->ac_q.qp_a;
+	ac->ac_q.qp_free = &ac->ac_q.qp_b;
+	ac->ac_q.qp_hold = NULL;
 	if (pthread_mutex_init(&ac->ac_q.qp_lock, NULL) != 0)
 		exit(2);
-	if (pthread_cond_init(&ac->ac_q.qp_cond, NULL) != 0)
-		exit(2);
 }
 
 void *
@@ -179,3 +217,42 @@
 	}
 	return (NULL);
 }
+
+int
+writer_write_trail(struct au_cmpnt *ac)
+{
+	struct au_queue_ent *aq, *tmp;
+	struct audit_record *ar;
+	u_int32_t offset;
+	int ret;
+
+	TAILQ_FOREACH_SAFE(aq, ac->ac_q.qp_hold, aq_glue, tmp) {
+		ar = &aq->aq_record;
+		offset = ar->ar_record_len - aq->aq_remain;
+		dprintf("write offset %d", offset);
+		ret = write(ac->ac_fd, ar->ar_buf + offset, aq->aq_remain);
+		if (ret == -1) {
+			if ((errno == EINTR) || (errno == EAGAIN))
+				return (0);
+			else
+				return (-1);
+		}
+		else if (ret == aq->aq_remain) {
+			dprintf("wrote %d bytes to %s (completed)", ret,
+			    ac->ac_name);
+			(void) pthread_mutex_lock(&ready_lock);
+			records_waiting--;
+			dprintf("%d records waiting", records_waiting);
+			(void) pthread_mutex_unlock(&ready_lock);
+			TAILQ_REMOVE(ac->ac_q.qp_hold, aq, aq_glue);
+			free(aq->aq_record.ar_buf);
+			free(aq);
+		}
+		else {
+			dprintf("partial write");
+			aq->aq_remain -= ret;
+			return (0);
+		}
+	}
+	return (1);
+}

==== //depot/projects/trustedbsd/netauditd/writer.h#2 (text+ko) ====

@@ -24,7 +24,7 @@
  * SUCH DAMAGE.
  */
 
-#define		WRITER_LOW_WATER	1024000
+#define		WRITER_MAX		32768
 #define		WRITER_ROTATE_TIMEOUT	5
 #define		WRITER_RETRY		60
 
@@ -33,5 +33,7 @@
 void		writer_init(void);
 int		writer_init_net(struct au_cmpnt *);
 int		writer_init_trail(struct au_cmpnt *);
+void		writer_q_drained(struct au_cmpnt *);
 void		writer_q_init(struct au_cmpnt *);
 void		*writer_start(void *);
+int		writer_write_trail(struct au_cmpnt *);



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200804060028.m360Ssfq078410>