Skip site navigation (1)Skip section navigation (2)
Date:      Sun, 18 May 2008 20:13:08 GMT
From:      Aaron Meihm <alm@FreeBSD.org>
To:        Perforce Change Reviews <perforce@freebsd.org>
Subject:   PERFORCE change 141820 for review
Message-ID:  <200805182013.m4IKD8Gg058015@repoman.freebsd.org>

next in thread | raw e-mail | index | archive | help
http://perforce.freebsd.org/chv.cgi?CH=141820

Change 141820 by alm@alm_praetorian on 2008/05/18 20:12:18

	Remove some old cruft, support for all transport methods
	is now implemented (pipe, net, trail).

Affected files ...

.. //depot/projects/trustedbsd/netauditd/conf.c#9 edit
.. //depot/projects/trustedbsd/netauditd/conf.h#2 edit
.. //depot/projects/trustedbsd/netauditd/grammar.y#4 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.c#18 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.conf#6 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.h#15 edit
.. //depot/projects/trustedbsd/netauditd/reader.c#5 edit
.. //depot/projects/trustedbsd/netauditd/reader.h#3 edit
.. //depot/projects/trustedbsd/netauditd/token.l#2 edit
.. //depot/projects/trustedbsd/netauditd/writer.c#6 edit
.. //depot/projects/trustedbsd/netauditd/writer.h#3 edit

Differences ...

==== //depot/projects/trustedbsd/netauditd/conf.c#9 (text+ko) ====

@@ -23,6 +23,7 @@
  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/queue.h>
@@ -59,11 +60,11 @@
 }
 
 struct au_cmpnt *
-conf_get_src(char *name)
+conf_get_reader(char *name)
 {
 	struct au_cmpnt *ret;
 
-	TAILQ_FOREACH(ret, &ac_list_src, ac_glue) {
+	TAILQ_FOREACH(ret, &ac_list_readers, ac_glue) {
 		if (strcmp(ret->ac_name, name) == 0)
 			return (ret);
 	}
@@ -71,11 +72,11 @@
 }
 
 struct au_cmpnt *
-conf_get_dst(char *name)
+conf_get_writer(char *name)
 {
 	struct au_cmpnt *ret;
 
-	TAILQ_FOREACH(ret, &ac_list_dst, ac_glue) {
+	TAILQ_FOREACH(ret, &ac_list_writers, ac_glue) {
 		if (strcmp(ret->ac_name, name) == 0)
 			return (ret);
 	}
@@ -87,13 +88,15 @@
 {
 	if ((src == NULL) || (dst == NULL))
 		conf_error("A component specified does not exist");
-	src->ac_ndsts++;
-	if (src->ac_dsts == NULL)
-		src->ac_dsts = malloc(sizeof(struct au_cmpnt *));
+	src->ac_nwriters++;
+	if (src->ac_writers == NULL)
+		src->ac_writers = malloc(sizeof(struct au_cmpnt *));
 	else
-		src->ac_dsts = realloc(src->ac_dsts,
-		    sizeof(struct au_cmpnt *) * src->ac_ndsts);
-	src->ac_dsts[src->ac_ndsts - 1] = dst;
+		src->ac_writers = realloc(src->ac_writers,
+		    sizeof(struct au_cmpnt *) * src->ac_nwriters);
+	if (src->ac_writers == NULL)
+		exit(2);
+	src->ac_writers[src->ac_nwriters - 1] = dst;
 }
 
 void

==== //depot/projects/trustedbsd/netauditd/conf.h#2 (text+ko) ====

@@ -27,8 +27,8 @@
 extern int	lineno;
 
 void		conf_error(char *, ...);
-struct au_cmpnt	*conf_get_src(char *);
-struct au_cmpnt	*conf_get_dst(char *);
+struct au_cmpnt	*conf_get_reader(char *);
+struct au_cmpnt	*conf_get_writer(char *);
 void		conf_link(struct au_cmpnt *, struct au_cmpnt *);
 void		conf_load(char *);
 void		yyerror(const char *);

==== //depot/projects/trustedbsd/netauditd/grammar.y#4 (text+ko) ====

@@ -24,6 +24,7 @@
  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/queue.h>
@@ -32,6 +33,7 @@
 #include <string.h>
 #include <unistd.h>
 #include <netdb.h>
+#include <pthread.h>
 #include <assert.h>
 
 #include "conf.h"
@@ -39,10 +41,9 @@
 #include "reader.h"
 #include "writer.h"
 
-#define AU_CMPNT_INIT(x)	x = malloc(sizeof(struct au_cmpnt)); \
-				assert (x != NULL); \
-				bzero(x, sizeof(struct au_cmpnt)); \
-				TAILQ_INIT(&x->ac_sbuffers);
+#define AU_CMPNT_INIT(x)	x = calloc(1, sizeof(struct au_cmpnt)); \
+				if (x == NULL) \
+					exit(2);
 
 static int	ainfo_passive;
 
@@ -55,10 +56,10 @@
 	struct au_cmpnt		*ac;
 }
 
-%token		SRC DST MAP NEWLINE STRING PIPE_TOKEN NET_TOKEN TRAIL_TOKEN
+%token		READ WRITE MAP NEWLINE STRING PIPE_TOKEN NET_TOKEN TRAIL_TOKEN
 %type <str>	STRING
-%type <ac>	src_component_spec dst_component_spec src_component_name
-%type <ac>	dst_component_name
+%type <ac>	read_component_spec write_component_spec read_component_name
+%type <ac>	write_component_name
 %type <ai>	ainfo;
 
 %%
@@ -69,17 +70,17 @@
 	;
 
 cmd	:
-	SRC src_component_spec
+	READ read_component_spec
 	{
-		TAILQ_INSERT_TAIL(&ac_list_src, $2, ac_glue);
+		TAILQ_INSERT_TAIL(&ac_list_readers, $2, ac_glue);
 	}
 	|
-	DST dst_component_spec
+	WRITE write_component_spec
 	{
-		TAILQ_INSERT_TAIL(&ac_list_dst, $2, ac_glue);
+		TAILQ_INSERT_TAIL(&ac_list_writers, $2, ac_glue);
 	}
 	|
-	MAP src_component_name dst_component_name
+	MAP read_component_name write_component_name
 	{
 		conf_link($2, $3);
 	}
@@ -90,54 +91,65 @@
 	nl NEWLINE
 	;
 
-src_component_name	:
+read_component_name	:
 	STRING
 	{
 		struct au_cmpnt *ret;
-		ret = conf_get_src($1);
+		ret = conf_get_reader($1);
 		free($1);
 		$$ = ret;
 	}
 	;
 
-dst_component_name	:
+write_component_name	:
 	STRING
 	{
 		struct au_cmpnt *ret;
-		ret = conf_get_dst($1);
+		ret = conf_get_writer($1);
 		free($1);
 		$$ = ret;
 	}
 	;
 
-dst_component_spec	:
+write_component_spec	:
 	NET_TOKEN { ainfo_passive = 0; } STRING ainfo
 	{
 		struct au_cmpnt *new;
+		int ret;
 		AU_CMPNT_INIT(new);
 		new->ac_type = COMPONENT_NET;
 		new->ac_name = $3;
 		new->ac_ainfo = $4;
 		new->ac_init_func = writer_init_net;
-		writer_q_init(new);
+		new->ac_write_func = writer_write_fd;
+		ret = pthread_mutex_init(&new->ac_writer_lock, NULL);
+		assert(ret == 0);
+		ret = pthread_cond_init(&new->ac_writer_cond, NULL);
+		assert(ret == 0);
+		TAILQ_INIT(&new->ac_writer_q);
 		$$ = new;
 	}
 	|
 	TRAIL_TOKEN STRING STRING
 	{
 		struct au_cmpnt *new;
+		int ret;
 		AU_CMPNT_INIT(new);
 		new->ac_type = COMPONENT_TRAIL;
 		new->ac_name = $2;
 		new->ac_path = $3;
 		new->ac_init_func = writer_init_trail;
-		new->ac_write_func = writer_write_trail;
-		writer_q_init(new);
+		new->ac_write_func = writer_write_fd;
+		ret = pthread_mutex_init(&new->ac_writer_lock, NULL);
+		assert(ret == 0);
+		ret = pthread_cond_init(&new->ac_writer_cond, NULL);
+		assert(ret == 0);
+		TAILQ_INIT(&new->ac_writer_q);
 		$$ = new;
 	}
 	;
 
-src_component_spec	:
+read_component_spec	:
 	PIPE_TOKEN STRING STRING
 	{
 		struct au_cmpnt *new;

==== //depot/projects/trustedbsd/netauditd/netauditd.c#18 (text+ko) ====

@@ -23,6 +23,7 @@
  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/queue.h>
@@ -32,10 +33,9 @@
 #include <string.h>
 #include <stdarg.h>
 #include <unistd.h>
-#include <netdb.h>
-#include <fcntl.h>
 #include <pthread.h>
 #include <signal.h>
+#include <assert.h>
 
 #include "conf.h"
 #include "netauditd.h"
@@ -43,10 +43,9 @@
 #include "writer.h"
 
 static int		debug_flag;
-pthread_mutex_t		debug_mutex;
-ac_head_t		ac_list_src;
-ac_head_t		ac_list_dst;
-
+static pthread_mutex_t	debug_mutex;
+ac_head_t		ac_list_readers;
+ac_head_t		ac_list_writers;
 extern char		*conf_path;
 
 void
@@ -54,27 +53,28 @@
 {
 	char buf[1024];
 	va_list ap;
+	int r;
 
 	if (!debug_flag)
 		return;
-	if (pthread_mutex_lock(&debug_mutex) != 0)
-		exit(2);
+	r = pthread_mutex_lock(&debug_mutex);
+	assert(r == 0);
 	va_start(ap, fmt);
 	(void) vsnprintf(buf, sizeof(buf), fmt, ap);
 	va_end(ap);
 	(void) printf("debug: %s\n", buf);
-	if (pthread_mutex_unlock(&debug_mutex) != 0)
-		exit(2);
+	r = pthread_mutex_unlock(&debug_mutex);
+	assert(r == 0);
 }
 
 int
 main(int argc, char *argv[])
 {
-	pthread_t writer_thread;
 	char ch;
+	int r;
 
-	if (pthread_mutex_init(&debug_mutex, NULL) != 0)
-		exit(2);
+	r = pthread_mutex_init(&debug_mutex, NULL);
+	assert(r == 0);
 	conf_path = DEFAULT_CONF_PATH;
 	while ((ch = getopt(argc, argv, "df:h")) != -1) {
 		switch (ch) {
@@ -91,29 +91,13 @@
 		}
 	}
 	(void) signal(SIGPIPE, SIG_IGN);
-	TAILQ_INIT(&ac_list_src);
-	TAILQ_INIT(&ac_list_dst);
+	TAILQ_INIT(&ac_list_readers);
+	TAILQ_INIT(&ac_list_writers);
 	conf_load(conf_path);
-	if (pthread_create(&writer_thread, NULL, writer_start, NULL) != 0)
-		exit(2);
 	reader_start();
 	return (0);
 }
 
-int
-nonblock(int fd)
-{
-	int flags;
-
-	flags = fcntl(fd, F_GETFL);
-	if (flags == -1)
-		return (-1);
-	flags |= O_NONBLOCK;
-	if (fcntl(fd, F_SETFL, flags) == -1)
-		return (-1);
-	return (0);
-}
-
 void
 usage()
 {

==== //depot/projects/trustedbsd/netauditd/netauditd.conf#6 (text+ko) ====

@@ -1,7 +1,7 @@
-src: p source_pipe /dev/auditpipe
-src: n source_net 0.0.0.0 6655
-dst: n dst_net 127.0.0.1 6655
-dst: t dst_trail /tmp/trail
+read: p source_pipe /dev/auditpipe
+read: n source_net 0.0.0.0 6655
+write: n dst_net 127.0.0.1 6655
+write: t dst_trail /tmp/trail
 
 map: source_pipe dst_net
 map: source_net dst_trail

==== //depot/projects/trustedbsd/netauditd/netauditd.h#15 (text+ko) ====

@@ -24,6 +24,7 @@
  * SUCH DAMAGE.
  */
 
+#define		AU_BUFFER_SIZE		8192
 #define		DEFAULT_CONF_PATH	"/usr/local/etc/netauditd.conf"
 
 enum {
@@ -32,68 +33,58 @@
 	COMPONENT_TRAIL
 };
 
-#define		FLAG_ONLINE	1		/* Component is online */
-#define		FLAG_CONNECTING	(1 << 1)	/* Component connecting */
-
 struct audit_record {
 	void			*ar_buf;
 	int			ar_count;
 	u_int32_t		ar_record_len;
 };
 
-struct au_queue_ent {
-	TAILQ_ENTRY(au_queue_ent)	aq_glue;
-	struct audit_record		aq_record;
-	u_int32_t			aq_remain;
+struct au_buffer {
+	u_char			ab_buf[AU_BUFFER_SIZE];
+	size_t			ab_size;
+	TAILQ_ENTRY(au_buffer)	ab_glue;
 };
 
-typedef TAILQ_HEAD(, au_queue_ent)	au_q_t;
+typedef TAILQ_HEAD(, au_buffer)	ab_q_t;
 
-struct au_qpair {
-	au_q_t				qp_a, qp_b;
-	pthread_mutex_t			qp_lock;
-	au_q_t				*qp_store;
-	au_q_t				*qp_hold;
-	au_q_t				*qp_free;
-	u_int32_t			qp_store_len;
-	int				qp_store_n;
-};
-
 struct au_cmpnt {
 	struct addrinfo		*ac_ainfo;
-	struct au_cmpnt		**ac_dsts;
-	time_t			ac_failed;
+	struct au_cmpnt		**ac_writers;
+	int			ac_nwriters;
+	int			ac_connected;
 	int			ac_fd;
-	int			ac_flags;
-	TAILQ_ENTRY(au_cmpnt)	ac_glue;
 	char			*ac_name;
-	int			ac_ndsts;
 	char			*ac_path;
-	struct au_qpair		ac_q;
 	int			ac_type;
+	struct au_buffer	ac_buffer;
 
-	TAILQ_HEAD(, au_src_buffer)	ac_sbuffers;
+	pthread_mutex_t		ac_writer_lock;
+	pthread_cond_t		ac_writer_cond;
+	int			ac_writer_nbufs;
+	u_int32_t		ac_writer_index;
+	ab_q_t			ac_writer_q;
 
 	int			(*ac_init_func)(struct au_cmpnt *);
 	int			(*ac_read_func)(struct au_cmpnt *);
 	int			(*ac_write_func)(struct au_cmpnt *);
+
+	TAILQ_ENTRY(au_cmpnt)	ac_glue;
 };
 
-struct au_src_buffer {
-	struct sockaddr			as_addr;
-	socklen_t			as_addrlen;
-	int				as_fd;
-	TAILQ_ENTRY(au_src_buffer)	as_glue;
-	u_char				as_header[5];
-	struct au_cmpnt			*as_parent;
-	u_int32_t			as_nread;
-	struct audit_record		*as_record;
+struct au_client {
+	struct sockaddr			auc_addr;
+	socklen_t			auc_addrlen;
+	int				auc_fd;
+	u_char				auc_header[5];
+	struct au_cmpnt			*auc_parent;
+	u_int32_t			auc_nread;
+	struct audit_record		*auc_record;
+	struct au_buffer		auc_buffer;
 };
 
 typedef TAILQ_HEAD(, au_cmpnt)	ac_head_t;
-extern ac_head_t		ac_list_src;
-extern ac_head_t		ac_list_dst;
+extern ac_head_t		ac_list_readers;
+extern ac_head_t		ac_list_writers;
 
 void		dprintf(char *, ...);
-int		nonblock(int);
 void		usage(void);

==== //depot/projects/trustedbsd/netauditd/reader.c#5 (text+ko) ====

@@ -23,6 +23,7 @@
  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/queue.h>
@@ -44,112 +45,166 @@
 #include "reader.h"
 #include "writer.h"
 
-#define	SRC_BUFFER_INIT(x) do {						\
-	x = calloc(1, sizeof(struct au_src_buffer));			\
-	assert(x != NULL);						\
-} while (0)
-
-#define	WRITER_SIGNAL(x) do {						\
-	(void) pthread_mutex_lock(&ready_lock);				\
-	records_waiting += x;						\
-	x = 0;								\
-	(void) pthread_cond_signal(&ready_cond);			\
-	(void) pthread_mutex_unlock(&ready_lock);			\
-} while (0)
-
-#define	ROTATE() do {							\
-	assert(ac->ac_q.qp_free != NULL);				\
-	ac->ac_q.qp_hold = ac->ac_q.qp_store;				\
-	ac->ac_q.qp_store = ac->ac_q.qp_free;				\
-	ac->ac_q.qp_free = NULL;					\
-	ac->ac_q.qp_store_len = 0;					\
-} while (0)
-
-pthread_mutex_t	ready_lock;
-pthread_cond_t	ready_cond;
-u_int32_t	records_waiting;
-
-static fd_set	rfds;
-
 int
 reader_accept_client(struct au_cmpnt *ac)
 {
-	struct au_src_buffer *new;
+	struct au_client *new;
 	struct sockaddr addr;
 	socklen_t addrlen;
-	int fd;
+	int fd, ret;
+	pthread_t p;
 
 	fd = accept(ac->ac_fd, &addr, &addrlen);
 	if (fd == -1) {
-		if ((errno != EINTR) && (errno != EWOULDBLOCK) &&
-		    (errno != ECONNABORTED))
+		if ((errno == EINTR) || (errno == ECONNABORTED))
+			return (0);
+		else
 			return (-1);
-		else
-			return (0);
 	}
-	SRC_BUFFER_INIT(new);
-	new->as_parent = ac;
-	new->as_fd = fd;
-	bcopy(&addr, &new->as_addr, sizeof(new->as_addr));
-	new->as_addrlen = addrlen;
-	TAILQ_INSERT_TAIL(&ac->ac_sbuffers, new, as_glue);
-	reader_build_rfds(&rfds);
+	new = calloc(1, sizeof(struct au_client));
+	if (new == NULL)
+		exit(2);
+	new->auc_parent = ac;
+	new->auc_fd = fd;
+	bcopy(&addr, &new->auc_addr, sizeof(new->auc_addr));
+	new->auc_addrlen = addrlen;
+	ret = pthread_create(&p, NULL, reader_client_handler, new);
+	assert(ret == 0);
+	(void) pthread_detach(p);
+	dprintf("reader_accept_client(%s): new client %d", ac->ac_name,
+	    new->auc_fd);
 	return (0);
 }
 
 void
-reader_build_rfds(fd_set *rfds)
+reader_push(struct au_cmpnt *ac, struct au_buffer *ab)
 {
-	struct au_cmpnt *ac;
-	struct au_src_buffer *s;
+	struct au_buffer *clone;
+	struct au_cmpnt *dst;
+	int ret, i;
 
-	FD_ZERO(rfds);
-	TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
-		FD_SET(ac->ac_fd, rfds);
-		if (ac->ac_type == COMPONENT_NET) {
-			TAILQ_FOREACH(s, &ac->ac_sbuffers, as_glue)
-				FD_SET(s->as_fd, rfds);
-		}
+	if (ab->ab_size <= 0)
+		return;
+	for (i = 0; i < ac->ac_nwriters; i++) {
+		dst = ac->ac_writers[i];
+		clone = malloc(sizeof(struct au_buffer));
+		bcopy(ab, clone, sizeof(struct au_buffer));
+		ret = pthread_mutex_lock(&dst->ac_writer_lock);
+		assert(ret == 0);
+		TAILQ_INSERT_TAIL(&dst->ac_writer_q, clone, ab_glue);
+		dst->ac_writer_nbufs++;
+		dprintf("reader_push(%s): %s (%d)", ac->ac_name,
+		    dst->ac_name, dst->ac_writer_nbufs);
+		clone = NULL;
+		ret = pthread_cond_signal(&dst->ac_writer_cond);
+		assert(ret == 0);
+		ret = pthread_mutex_unlock(&dst->ac_writer_lock);
+		assert(ret == 0);
 	}
 }
 
 void
-reader_handler(fd_set *rfds)
+reader_buffer_record(struct au_cmpnt *ac, struct au_buffer *ab,
+    struct audit_record *ar)
 {
-	struct au_src_buffer *as, *tmp;
-	struct au_cmpnt *ac;
-	fd_set lrfds;
-	struct timeval tv;
-	int ret, ret2;
+	u_int32_t rlen = ar->ar_record_len;
+	u_char *ptr;
 
-	lrfds = *rfds;
-	bzero(&tv, sizeof(struct timeval));
-	tv.tv_sec = 5;
-	ret = select(FD_SETSIZE, &lrfds, NULL, NULL, &tv);
-	if (ret == -1) {
-		if (errno == EINTR)
-			return;
-		else
-			exit(2);
+	if ((AU_BUFFER_SIZE - ab->ab_size) < rlen) {
+		reader_push(ac, ab);
+		ab->ab_size = 0;
 	}
-	else if (ret == 0) {
-		reader_timeout();
+	/*
+	 * In the case where the record is too big for the audit
+	 * buffer, the record is dropped.
+	 */
+	if ((ab->ab_size == 0) && (rlen > AU_BUFFER_SIZE)) {
+		dprintf("reader_buffer_record(%s): dropping record, too"
+		    " large (%u)", ac->ac_name, rlen);
+		free(ar->ar_buf);
 		return;
 	}
-	TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
-		if (FD_ISSET(ac->ac_fd, &lrfds)) {
-			ret2 = ac->ac_read_func(ac);
-			if (ret2 == -1)
+	dprintf("reader_buffer_record(%p): appending %u bytes (%u)",
+	    ab, rlen, ab->ab_size);
+	ptr = ab->ab_buf + ab->ab_size;
+	bcopy(ar->ar_buf, ptr, rlen);
+	ab->ab_size += rlen;
+	free(ar->ar_buf);
+	free(ar);
+}
+
+void *
+reader_client_handler(void *arg)
+{
+	struct timeval tv;
+	struct au_client *ac;
+	fd_set rfds;
+	int ret;
+
+	ac = (struct au_client *)arg;
+	for (;;) {
+		FD_ZERO(&rfds);
+		FD_SET(ac->auc_fd, &rfds);
+		tv.tv_sec = READER_TIMEOUT;
+		tv.tv_usec = 0;
+		ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
+		if (ret == -1) {
+			if (errno == EINTR)
+				continue;
+			else
+				break;
+		} else if (ret == 0) {
+			dprintf("reader_client_handler(%s): client %d timeout",
+			    ac->auc_parent->ac_name, ac->auc_fd);
+			reader_push(ac->auc_parent, &ac->auc_buffer);
+			ac->auc_buffer.ab_size = 0;
+			continue;
+		}
+		if (FD_ISSET(ac->auc_fd, &rfds)) {
+			ret = reader_read_socket(ac);
+			if (ret == -1)
+				break;
+		}
+	}
+	dprintf("reader_client_handler(%s): client %d terminating",
+	    ac->auc_parent->ac_name, ac->auc_fd);
+	(void) close(ac->auc_fd);
+	return (NULL);		/* Thread termination */
+}
+
+void *
+reader_handler(void *arg)
+{
+	struct timeval tv;
+	struct au_cmpnt *ac;
+	fd_set rfds;
+	int ret;
+
+	ac = (struct au_cmpnt *)arg;
+	for (;;) {
+		FD_ZERO(&rfds);
+		FD_SET(ac->ac_fd, &rfds);
+		tv.tv_sec = READER_TIMEOUT;
+		tv.tv_usec = 0;
+		ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
+		if (ret == -1) {
+			if (errno == EINTR)
+				continue;
+			else
 				exit(2);
+		} else if (ret == 0) {
+			if (ac->ac_type == COMPONENT_PIPE) {
+				dprintf("reader_handler(%s): timeout",
+				    ac->ac_name);
+				reader_push(ac, &ac->ac_buffer);
+				ac->ac_buffer.ab_size = 0;
+			}
+			continue;
 		}
-		if (ac->ac_type == COMPONENT_NET) {
-			TAILQ_FOREACH_SAFE(as, &ac->ac_sbuffers, as_glue,
-			    tmp)
-				if (FD_ISSET(as->as_fd, &lrfds)) {
-					ret2 = reader_read_socket(as);
-				}
-		}
+		if (ac->ac_read_func(ac) == -1)
+			exit(2);
 	}
+	return (NULL);
 }
 
 void
@@ -157,13 +212,14 @@
 {
 	struct au_cmpnt *ac;
 
-	(void) pthread_mutex_init(&ready_lock, NULL);
-	(void) pthread_cond_init(&ready_cond, NULL);
-	TAILQ_FOREACH(ac, &ac_list_src, ac_glue)
+	TAILQ_FOREACH(ac, &ac_list_readers, ac_glue)
 		if (ac->ac_init_func(ac) == -1)
 			exit(2);
 }
 
+/*
+ * XXX Add error reporting to reader_init_*.
+ */
 int
 reader_init_net(struct au_cmpnt *ac)
 {
@@ -182,10 +238,6 @@
 		return (-1);
 	}
 	ac->ac_fd = fd;
-	if (nonblock(fd) == -1) {
-		(void) close(fd);
-		return (-1);
-	}
 	return (0);
 }
 
@@ -198,75 +250,9 @@
 	if (fd == -1)
 		return (-1);
 	ac->ac_fd = fd;
-	if (nonblock(fd) == -1) {
-		(void) close(fd);
-		return (-1);
-	}
 	return (0);
 }
 
-void
-reader_q_record(struct audit_record *ar, struct au_cmpnt *ac)
-{
-	int i;
-
-	for (i = 0; i < ac->ac_ndsts; i++)
-		reader_q_record_cmpnt(ar, ac->ac_dsts[i]);
-	free(ar);
-}
-
-void
-reader_q_record_cmpnt(struct audit_record *ar, struct au_cmpnt *ac)
-{
-	struct au_queue_ent *new;
-
-	new = malloc(sizeof(struct au_queue_ent));
-	assert(new != NULL);
-	bzero(new, sizeof(struct au_queue_ent));
-	/*
-	 * In most cases we will have a 1:1 relationship between source
-	 * and destination components.  We avoid an extra copy by reference
-	 * counting usage of this audit record.  This may be built on to
-	 * avoid copying altogether.
-	 */
-	if (ar->ar_count == 0)
-		new->aq_record.ar_buf = ar->ar_buf;
-	else {
-		new->aq_record.ar_buf = malloc(ar->ar_record_len);
-		assert(new->aq_record.ar_buf != NULL);
-		bcopy(ar->ar_buf, new->aq_record.ar_buf, ar->ar_record_len);
-	}
-	ar->ar_count++;
-	new->aq_record.ar_record_len = ar->ar_record_len;
-	new->aq_remain = ar->ar_record_len;
-	(void) pthread_mutex_lock(&ac->ac_q.qp_lock);
-	if (ac->ac_q.qp_store_len < WRITER_MAX) {
-		dprintf("queueing record for %s (%d bytes in queue)",
-		    ac->ac_name, ac->ac_q.qp_store_len);
-		(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
-		TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
-		ac->ac_q.qp_store_len += ar->ar_record_len;
-		ac->ac_q.qp_store_n++;
-		return;
-	}
-	if (ac->ac_q.qp_hold != NULL) {
-		/* This consumer is still processing it's queue, so the record
-		 * is dropped. */
-		(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
-		dprintf("dropping record for %s", ac->ac_name);
-		free(new->aq_record.ar_buf);
-		free(new);
-		return;
-	}
-	dprintf("rotating queues for %s", ac->ac_name);
-	ROTATE();
-	WRITER_SIGNAL(ac->ac_q.qp_store_n);
-	(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
-	TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
-	ac->ac_q.qp_store_len += ar->ar_record_len;
-	ac->ac_q.qp_store_n++;
-}
-
 int
 reader_read_pipe(struct au_cmpnt *ac)
 {
@@ -276,88 +262,88 @@
 
 	ret = read(ac->ac_fd, buf, sizeof(buf));
 	if (ret == -1) {
-		if ((errno == EINTR) || (errno == EAGAIN))
+		if (errno == EINTR)
 			return (0);
 		else
 			return (-1);
-	}
-	else if (ret == 0)
+	} else if (ret == 0)
 		return (-1);
-	ar = malloc(sizeof(struct audit_record));
-	assert(ar != NULL);
-	bzero(ar, sizeof(struct audit_record));
+	ar = calloc(1, sizeof(struct audit_record));
+	if (ar == NULL)
+		exit(2);
 	ar->ar_buf = malloc(ret);
-	assert(ar->ar_buf != NULL);
+	if (ar->ar_buf == NULL)
+		exit(2);
 	bcopy(buf, ar->ar_buf, ret);
 	ar->ar_record_len = ret;
-	dprintf("reader_read_pipe: read record %d bytes", ret);
-	reader_q_record(ar, ac);
+	dprintf("reader_read_pipe(%s): %d bytes", ac->ac_name, ret);
+	reader_buffer_record(ac, &ac->ac_buffer, ar);
 	return (0);
 }
 
 int
-reader_read_socket(struct au_src_buffer *asb)
+reader_read_socket(struct au_client *asb)
 {
 	u_char *bufptr, *recbufptr;
 	int ret, left;
 	u_int32_t hdr_remain, val, need;
 	u_char as_buf[2048];
 
-	ret = read(asb->as_fd, as_buf, sizeof(as_buf));
+	ret = read(asb->auc_fd, as_buf, sizeof(as_buf));
 	if (ret == -1) {
 		if (errno != EINTR)
 			return (-1);
 		else
 			return (0);
-	}
-	else if (ret == 0)
+	} else if (ret == 0)
 		return (-1);
 	left = ret;
 	bufptr = as_buf;
 	while (left > 0) {
-		if (asb->as_record == NULL) {
-			hdr_remain = sizeof(asb->as_header) -
-			    asb->as_nread;
+		if (asb->auc_record == NULL) {
+			hdr_remain = sizeof(asb->auc_header) -
+			    asb->auc_nread;
 			if (left >= hdr_remain) {
-				(void) memcpy(asb->as_header + asb->as_nread,
+				(void) memcpy(asb->auc_header + asb->auc_nread,
 				    bufptr, hdr_remain);
-				asb->as_nread += hdr_remain;
+				asb->auc_nread += hdr_remain;
 				left -= hdr_remain;
 				bufptr += hdr_remain;
-				(void) memcpy(&val, asb->as_header + 1,
+				(void) memcpy(&val, asb->auc_header + 1,
 				    sizeof(val));
-				asb->as_record =
+				asb->auc_record =
 				    malloc(sizeof(struct audit_record));
-				assert(asb->as_record != NULL);
-				asb->as_record->ar_record_len = be32toh(val);
-				asb->as_record->ar_buf = \
-				    malloc(asb->as_record->ar_record_len);
-				assert(asb->as_record->ar_buf != NULL);
-				(void) memcpy(asb->as_record->ar_buf,
-				    asb->as_header, sizeof(asb->as_header));
+				if (asb->auc_record == NULL)
+					exit(2);
+				asb->auc_record->ar_record_len = be32toh(val);
+				asb->auc_record->ar_buf = \
+				    malloc(asb->auc_record->ar_record_len);
+				if (asb->auc_record->ar_buf == NULL)
+					exit(2);
+				(void) memcpy(asb->auc_record->ar_buf,
+				    asb->auc_header, sizeof(asb->auc_header));
 				continue;
-			}
-			else {
-				(void) memcpy(asb->as_header + asb->as_nread,
+			} else {
+				(void) memcpy(asb->auc_header + asb->auc_nread,
 				    bufptr, left);
-				asb->as_nread += left;
+				asb->auc_nread += left;
 				return (0);
 			}
 		}
-		need = asb->as_record->ar_record_len - asb->as_nread;
-		recbufptr = asb->as_record->ar_buf + asb->as_nread;
+		need = asb->auc_record->ar_record_len - asb->auc_nread;
+		recbufptr = asb->auc_record->ar_buf + asb->auc_nread;
 		if (left < need) {
 			(void) memcpy(recbufptr, bufptr, left);
-			asb->as_nread += left;
+			asb->auc_nread += left;
 			return (0);
-		}
-		else {
+		} else {
 			(void) memcpy(recbufptr, bufptr, need);
 			left -= need;
 			bufptr += need;
-			reader_q_record(asb->as_record, asb->as_parent);
-			asb->as_record = NULL;
-			asb->as_nread = 0;
+			reader_buffer_record(asb->auc_parent,
+			    &asb->auc_buffer, asb->auc_record);
+			asb->auc_record = NULL;
+			asb->auc_nread = 0;
 		}
 	}
 	return (0);
@@ -366,25 +352,20 @@
 void
 reader_start()
 {
-	reader_init();
-	reader_build_rfds(&rfds);
-	for (;;)
-		reader_handler(&rfds);
-}
-
-void
-reader_timeout()
-{
 	struct au_cmpnt *ac;
+	pthread_t p;
+	int ret;
 
-	TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) {
-		(void) pthread_mutex_lock(&ac->ac_q.qp_lock);
-		if ((ac->ac_q.qp_hold != NULL) || (ac->ac_q.qp_store_n == 0)) {
-			(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
-			continue;
-		}
-		ROTATE();
-		WRITER_SIGNAL(ac->ac_q.qp_store_n);
-		(void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+	reader_init();
+	TAILQ_FOREACH(ac, &ac_list_readers, ac_glue) {
+		ret = pthread_create(&p, NULL, reader_handler, ac);
+		assert(ret == 0);
 	}
+	writer_start();
+	/*
+	 * Reader threads will always exist, so we suspend execution of
+	 * the main thread using the last reader thread which was created.
+	 */
+	(void) pthread_join(p, NULL);
+	exit(0);
 }

==== //depot/projects/trustedbsd/netauditd/reader.h#3 (text+ko) ====

@@ -24,19 +24,17 @@
  * SUCH DAMAGE.

>>> TRUNCATED FOR MAIL (1000 lines) <<<



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200805182013.m4IKD8Gg058015>