From owner-p4-projects@FreeBSD.ORG Sun May 18 20:13:09 2008 Return-Path: Delivered-To: p4-projects@freebsd.org Received: by hub.freebsd.org (Postfix, from userid 32767) id F2A001065670; Sun, 18 May 2008 20:13:08 +0000 (UTC) Delivered-To: perforce@freebsd.org Received: from mx1.freebsd.org (mx1.freebsd.org [IPv6:2001:4f8:fff6::34]) by hub.freebsd.org (Postfix) with ESMTP id B2E07106566B for ; Sun, 18 May 2008 20:13:08 +0000 (UTC) (envelope-from alm@freebsd.org) Received: from repoman.freebsd.org (repoman.freebsd.org [IPv6:2001:4f8:fff6::29]) by mx1.freebsd.org (Postfix) with ESMTP id A29318FC26 for ; Sun, 18 May 2008 20:13:08 +0000 (UTC) (envelope-from alm@freebsd.org) Received: from repoman.freebsd.org (localhost [127.0.0.1]) by repoman.freebsd.org (8.14.1/8.14.1) with ESMTP id m4IKD8sf058017 for ; Sun, 18 May 2008 20:13:08 GMT (envelope-from alm@freebsd.org) Received: (from perforce@localhost) by repoman.freebsd.org (8.14.1/8.14.1/Submit) id m4IKD8Gg058015 for perforce@freebsd.org; Sun, 18 May 2008 20:13:08 GMT (envelope-from alm@freebsd.org) Date: Sun, 18 May 2008 20:13:08 GMT Message-Id: <200805182013.m4IKD8Gg058015@repoman.freebsd.org> X-Authentication-Warning: repoman.freebsd.org: perforce set sender to alm@freebsd.org using -f From: Aaron Meihm To: Perforce Change Reviews Cc: Subject: PERFORCE change 141820 for review X-BeenThere: p4-projects@freebsd.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: p4 projects tree changes List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Sun, 18 May 2008 20:13:09 -0000 http://perforce.freebsd.org/chv.cgi?CH=141820 Change 141820 by alm@alm_praetorian on 2008/05/18 20:12:18 Remove some old cruft, support for all transport methods is now implemented (pipe, net, trail). Affected files ... .. //depot/projects/trustedbsd/netauditd/conf.c#9 edit .. //depot/projects/trustedbsd/netauditd/conf.h#2 edit .. //depot/projects/trustedbsd/netauditd/grammar.y#4 edit .. //depot/projects/trustedbsd/netauditd/netauditd.c#18 edit .. //depot/projects/trustedbsd/netauditd/netauditd.conf#6 edit .. //depot/projects/trustedbsd/netauditd/netauditd.h#15 edit .. //depot/projects/trustedbsd/netauditd/reader.c#5 edit .. //depot/projects/trustedbsd/netauditd/reader.h#3 edit .. //depot/projects/trustedbsd/netauditd/token.l#2 edit .. //depot/projects/trustedbsd/netauditd/writer.c#6 edit .. //depot/projects/trustedbsd/netauditd/writer.h#3 edit Differences ... ==== //depot/projects/trustedbsd/netauditd/conf.c#9 (text+ko) ==== @@ -23,6 +23,7 @@ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ + #include #include #include @@ -59,11 +60,11 @@ } struct au_cmpnt * -conf_get_src(char *name) +conf_get_reader(char *name) { struct au_cmpnt *ret; - TAILQ_FOREACH(ret, &ac_list_src, ac_glue) { + TAILQ_FOREACH(ret, &ac_list_readers, ac_glue) { if (strcmp(ret->ac_name, name) == 0) return (ret); } @@ -71,11 +72,11 @@ } struct au_cmpnt * -conf_get_dst(char *name) +conf_get_writer(char *name) { struct au_cmpnt *ret; - TAILQ_FOREACH(ret, &ac_list_dst, ac_glue) { + TAILQ_FOREACH(ret, &ac_list_writers, ac_glue) { if (strcmp(ret->ac_name, name) == 0) return (ret); } @@ -87,13 +88,15 @@ { if ((src == NULL) || (dst == NULL)) conf_error("A component specified does not exist"); - src->ac_ndsts++; - if (src->ac_dsts == NULL) - src->ac_dsts = malloc(sizeof(struct au_cmpnt *)); + src->ac_nwriters++; + if (src->ac_writers == NULL) + src->ac_writers = malloc(sizeof(struct au_cmpnt *)); else - src->ac_dsts = realloc(src->ac_dsts, - sizeof(struct au_cmpnt *) * src->ac_ndsts); - src->ac_dsts[src->ac_ndsts - 1] = dst; + src->ac_writers = realloc(src->ac_writers, + sizeof(struct au_cmpnt *) * src->ac_nwriters); + if (src->ac_writers == NULL) + exit(2); + src->ac_writers[src->ac_nwriters - 1] = dst; } void ==== //depot/projects/trustedbsd/netauditd/conf.h#2 (text+ko) ==== @@ -27,8 +27,8 @@ extern int lineno; void conf_error(char *, ...); -struct au_cmpnt *conf_get_src(char *); -struct au_cmpnt *conf_get_dst(char *); +struct au_cmpnt *conf_get_reader(char *); +struct au_cmpnt *conf_get_writer(char *); void conf_link(struct au_cmpnt *, struct au_cmpnt *); void conf_load(char *); void yyerror(const char *); ==== //depot/projects/trustedbsd/netauditd/grammar.y#4 (text+ko) ==== @@ -24,6 +24,7 @@ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ + #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include #include "conf.h" @@ -39,10 +41,9 @@ #include "reader.h" #include "writer.h" -#define AU_CMPNT_INIT(x) x = malloc(sizeof(struct au_cmpnt)); \ - assert (x != NULL); \ - bzero(x, sizeof(struct au_cmpnt)); \ - TAILQ_INIT(&x->ac_sbuffers); +#define AU_CMPNT_INIT(x) x = calloc(1, sizeof(struct au_cmpnt)); \ + if (x == NULL) \ + exit(2); static int ainfo_passive; @@ -55,10 +56,10 @@ struct au_cmpnt *ac; } -%token SRC DST MAP NEWLINE STRING PIPE_TOKEN NET_TOKEN TRAIL_TOKEN +%token READ WRITE MAP NEWLINE STRING PIPE_TOKEN NET_TOKEN TRAIL_TOKEN %type STRING -%type src_component_spec dst_component_spec src_component_name -%type dst_component_name +%type read_component_spec write_component_spec read_component_name +%type write_component_name %type ainfo; %% @@ -69,17 +70,17 @@ ; cmd : - SRC src_component_spec + READ read_component_spec { - TAILQ_INSERT_TAIL(&ac_list_src, $2, ac_glue); + TAILQ_INSERT_TAIL(&ac_list_readers, $2, ac_glue); } | - DST dst_component_spec + WRITE write_component_spec { - TAILQ_INSERT_TAIL(&ac_list_dst, $2, ac_glue); + TAILQ_INSERT_TAIL(&ac_list_writers, $2, ac_glue); } | - MAP src_component_name dst_component_name + MAP read_component_name write_component_name { conf_link($2, $3); } @@ -90,54 +91,65 @@ nl NEWLINE ; -src_component_name : +read_component_name : STRING { struct au_cmpnt *ret; - ret = conf_get_src($1); + ret = conf_get_reader($1); free($1); $$ = ret; } ; -dst_component_name : +write_component_name : STRING { struct au_cmpnt *ret; - ret = conf_get_dst($1); + ret = conf_get_writer($1); free($1); $$ = ret; } ; -dst_component_spec : +write_component_spec : NET_TOKEN { ainfo_passive = 0; } STRING ainfo { struct au_cmpnt *new; + int ret; AU_CMPNT_INIT(new); new->ac_type = COMPONENT_NET; new->ac_name = $3; new->ac_ainfo = $4; new->ac_init_func = writer_init_net; - writer_q_init(new); + new->ac_write_func = writer_write_fd; + ret = pthread_mutex_init(&new->ac_writer_lock, NULL); + assert(ret == 0); + ret = pthread_cond_init(&new->ac_writer_cond, NULL); + assert(ret == 0); + TAILQ_INIT(&new->ac_writer_q); $$ = new; } | TRAIL_TOKEN STRING STRING { struct au_cmpnt *new; + int ret; AU_CMPNT_INIT(new); new->ac_type = COMPONENT_TRAIL; new->ac_name = $2; new->ac_path = $3; new->ac_init_func = writer_init_trail; - new->ac_write_func = writer_write_trail; - writer_q_init(new); + new->ac_write_func = writer_write_fd; + ret = pthread_mutex_init(&new->ac_writer_lock, NULL); + assert(ret == 0); + ret = pthread_cond_init(&new->ac_writer_cond, NULL); + assert(ret == 0); + TAILQ_INIT(&new->ac_writer_q); $$ = new; } ; -src_component_spec : +read_component_spec : PIPE_TOKEN STRING STRING { struct au_cmpnt *new; ==== //depot/projects/trustedbsd/netauditd/netauditd.c#18 (text+ko) ==== @@ -23,6 +23,7 @@ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ + #include #include #include @@ -32,10 +33,9 @@ #include #include #include -#include -#include #include #include +#include #include "conf.h" #include "netauditd.h" @@ -43,10 +43,9 @@ #include "writer.h" static int debug_flag; -pthread_mutex_t debug_mutex; -ac_head_t ac_list_src; -ac_head_t ac_list_dst; - +static pthread_mutex_t debug_mutex; +ac_head_t ac_list_readers; +ac_head_t ac_list_writers; extern char *conf_path; void @@ -54,27 +53,28 @@ { char buf[1024]; va_list ap; + int r; if (!debug_flag) return; - if (pthread_mutex_lock(&debug_mutex) != 0) - exit(2); + r = pthread_mutex_lock(&debug_mutex); + assert(r == 0); va_start(ap, fmt); (void) vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); (void) printf("debug: %s\n", buf); - if (pthread_mutex_unlock(&debug_mutex) != 0) - exit(2); + r = pthread_mutex_unlock(&debug_mutex); + assert(r == 0); } int main(int argc, char *argv[]) { - pthread_t writer_thread; char ch; + int r; - if (pthread_mutex_init(&debug_mutex, NULL) != 0) - exit(2); + r = pthread_mutex_init(&debug_mutex, NULL); + assert(r == 0); conf_path = DEFAULT_CONF_PATH; while ((ch = getopt(argc, argv, "df:h")) != -1) { switch (ch) { @@ -91,29 +91,13 @@ } } (void) signal(SIGPIPE, SIG_IGN); - TAILQ_INIT(&ac_list_src); - TAILQ_INIT(&ac_list_dst); + TAILQ_INIT(&ac_list_readers); + TAILQ_INIT(&ac_list_writers); conf_load(conf_path); - if (pthread_create(&writer_thread, NULL, writer_start, NULL) != 0) - exit(2); reader_start(); return (0); } -int -nonblock(int fd) -{ - int flags; - - flags = fcntl(fd, F_GETFL); - if (flags == -1) - return (-1); - flags |= O_NONBLOCK; - if (fcntl(fd, F_SETFL, flags) == -1) - return (-1); - return (0); -} - void usage() { ==== //depot/projects/trustedbsd/netauditd/netauditd.conf#6 (text+ko) ==== @@ -1,7 +1,7 @@ -src: p source_pipe /dev/auditpipe -src: n source_net 0.0.0.0 6655 -dst: n dst_net 127.0.0.1 6655 -dst: t dst_trail /tmp/trail +read: p source_pipe /dev/auditpipe +read: n source_net 0.0.0.0 6655 +write: n dst_net 127.0.0.1 6655 +write: t dst_trail /tmp/trail map: source_pipe dst_net map: source_net dst_trail ==== //depot/projects/trustedbsd/netauditd/netauditd.h#15 (text+ko) ==== @@ -24,6 +24,7 @@ * SUCH DAMAGE. */ +#define AU_BUFFER_SIZE 8192 #define DEFAULT_CONF_PATH "/usr/local/etc/netauditd.conf" enum { @@ -32,68 +33,58 @@ COMPONENT_TRAIL }; -#define FLAG_ONLINE 1 /* Component is online */ -#define FLAG_CONNECTING (1 << 1) /* Component connecting */ - struct audit_record { void *ar_buf; int ar_count; u_int32_t ar_record_len; }; -struct au_queue_ent { - TAILQ_ENTRY(au_queue_ent) aq_glue; - struct audit_record aq_record; - u_int32_t aq_remain; +struct au_buffer { + u_char ab_buf[AU_BUFFER_SIZE]; + size_t ab_size; + TAILQ_ENTRY(au_buffer) ab_glue; }; -typedef TAILQ_HEAD(, au_queue_ent) au_q_t; +typedef TAILQ_HEAD(, au_buffer) ab_q_t; -struct au_qpair { - au_q_t qp_a, qp_b; - pthread_mutex_t qp_lock; - au_q_t *qp_store; - au_q_t *qp_hold; - au_q_t *qp_free; - u_int32_t qp_store_len; - int qp_store_n; -}; - struct au_cmpnt { struct addrinfo *ac_ainfo; - struct au_cmpnt **ac_dsts; - time_t ac_failed; + struct au_cmpnt **ac_writers; + int ac_nwriters; + int ac_connected; int ac_fd; - int ac_flags; - TAILQ_ENTRY(au_cmpnt) ac_glue; char *ac_name; - int ac_ndsts; char *ac_path; - struct au_qpair ac_q; int ac_type; + struct au_buffer ac_buffer; - TAILQ_HEAD(, au_src_buffer) ac_sbuffers; + pthread_mutex_t ac_writer_lock; + pthread_cond_t ac_writer_cond; + int ac_writer_nbufs; + u_int32_t ac_writer_index; + ab_q_t ac_writer_q; int (*ac_init_func)(struct au_cmpnt *); int (*ac_read_func)(struct au_cmpnt *); int (*ac_write_func)(struct au_cmpnt *); + + TAILQ_ENTRY(au_cmpnt) ac_glue; }; -struct au_src_buffer { - struct sockaddr as_addr; - socklen_t as_addrlen; - int as_fd; - TAILQ_ENTRY(au_src_buffer) as_glue; - u_char as_header[5]; - struct au_cmpnt *as_parent; - u_int32_t as_nread; - struct audit_record *as_record; +struct au_client { + struct sockaddr auc_addr; + socklen_t auc_addrlen; + int auc_fd; + u_char auc_header[5]; + struct au_cmpnt *auc_parent; + u_int32_t auc_nread; + struct audit_record *auc_record; + struct au_buffer auc_buffer; }; typedef TAILQ_HEAD(, au_cmpnt) ac_head_t; -extern ac_head_t ac_list_src; -extern ac_head_t ac_list_dst; +extern ac_head_t ac_list_readers; +extern ac_head_t ac_list_writers; void dprintf(char *, ...); -int nonblock(int); void usage(void); ==== //depot/projects/trustedbsd/netauditd/reader.c#5 (text+ko) ==== @@ -23,6 +23,7 @@ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ + #include #include #include @@ -44,112 +45,166 @@ #include "reader.h" #include "writer.h" -#define SRC_BUFFER_INIT(x) do { \ - x = calloc(1, sizeof(struct au_src_buffer)); \ - assert(x != NULL); \ -} while (0) - -#define WRITER_SIGNAL(x) do { \ - (void) pthread_mutex_lock(&ready_lock); \ - records_waiting += x; \ - x = 0; \ - (void) pthread_cond_signal(&ready_cond); \ - (void) pthread_mutex_unlock(&ready_lock); \ -} while (0) - -#define ROTATE() do { \ - assert(ac->ac_q.qp_free != NULL); \ - ac->ac_q.qp_hold = ac->ac_q.qp_store; \ - ac->ac_q.qp_store = ac->ac_q.qp_free; \ - ac->ac_q.qp_free = NULL; \ - ac->ac_q.qp_store_len = 0; \ -} while (0) - -pthread_mutex_t ready_lock; -pthread_cond_t ready_cond; -u_int32_t records_waiting; - -static fd_set rfds; - int reader_accept_client(struct au_cmpnt *ac) { - struct au_src_buffer *new; + struct au_client *new; struct sockaddr addr; socklen_t addrlen; - int fd; + int fd, ret; + pthread_t p; fd = accept(ac->ac_fd, &addr, &addrlen); if (fd == -1) { - if ((errno != EINTR) && (errno != EWOULDBLOCK) && - (errno != ECONNABORTED)) + if ((errno == EINTR) || (errno == ECONNABORTED)) + return (0); + else return (-1); - else - return (0); } - SRC_BUFFER_INIT(new); - new->as_parent = ac; - new->as_fd = fd; - bcopy(&addr, &new->as_addr, sizeof(new->as_addr)); - new->as_addrlen = addrlen; - TAILQ_INSERT_TAIL(&ac->ac_sbuffers, new, as_glue); - reader_build_rfds(&rfds); + new = calloc(1, sizeof(struct au_client)); + if (new == NULL) + exit(2); + new->auc_parent = ac; + new->auc_fd = fd; + bcopy(&addr, &new->auc_addr, sizeof(new->auc_addr)); + new->auc_addrlen = addrlen; + ret = pthread_create(&p, NULL, reader_client_handler, new); + assert(ret == 0); + (void) pthread_detach(p); + dprintf("reader_accept_client(%s): new client %d", ac->ac_name, + new->auc_fd); return (0); } void -reader_build_rfds(fd_set *rfds) +reader_push(struct au_cmpnt *ac, struct au_buffer *ab) { - struct au_cmpnt *ac; - struct au_src_buffer *s; + struct au_buffer *clone; + struct au_cmpnt *dst; + int ret, i; - FD_ZERO(rfds); - TAILQ_FOREACH(ac, &ac_list_src, ac_glue) { - FD_SET(ac->ac_fd, rfds); - if (ac->ac_type == COMPONENT_NET) { - TAILQ_FOREACH(s, &ac->ac_sbuffers, as_glue) - FD_SET(s->as_fd, rfds); - } + if (ab->ab_size <= 0) + return; + for (i = 0; i < ac->ac_nwriters; i++) { + dst = ac->ac_writers[i]; + clone = malloc(sizeof(struct au_buffer)); + bcopy(ab, clone, sizeof(struct au_buffer)); + ret = pthread_mutex_lock(&dst->ac_writer_lock); + assert(ret == 0); + TAILQ_INSERT_TAIL(&dst->ac_writer_q, clone, ab_glue); + dst->ac_writer_nbufs++; + dprintf("reader_push(%s): %s (%d)", ac->ac_name, + dst->ac_name, dst->ac_writer_nbufs); + clone = NULL; + ret = pthread_cond_signal(&dst->ac_writer_cond); + assert(ret == 0); + ret = pthread_mutex_unlock(&dst->ac_writer_lock); + assert(ret == 0); } } void -reader_handler(fd_set *rfds) +reader_buffer_record(struct au_cmpnt *ac, struct au_buffer *ab, + struct audit_record *ar) { - struct au_src_buffer *as, *tmp; - struct au_cmpnt *ac; - fd_set lrfds; - struct timeval tv; - int ret, ret2; + u_int32_t rlen = ar->ar_record_len; + u_char *ptr; - lrfds = *rfds; - bzero(&tv, sizeof(struct timeval)); - tv.tv_sec = 5; - ret = select(FD_SETSIZE, &lrfds, NULL, NULL, &tv); - if (ret == -1) { - if (errno == EINTR) - return; - else - exit(2); + if ((AU_BUFFER_SIZE - ab->ab_size) < rlen) { + reader_push(ac, ab); + ab->ab_size = 0; } - else if (ret == 0) { - reader_timeout(); + /* + * In the case where the record is too big for the audit + * buffer, the record is dropped. + */ + if ((ab->ab_size == 0) && (rlen > AU_BUFFER_SIZE)) { + dprintf("reader_buffer_record(%s): dropping record, too" + " large (%u)", ac->ac_name, rlen); + free(ar->ar_buf); return; } - TAILQ_FOREACH(ac, &ac_list_src, ac_glue) { - if (FD_ISSET(ac->ac_fd, &lrfds)) { - ret2 = ac->ac_read_func(ac); - if (ret2 == -1) + dprintf("reader_buffer_record(%p): appending %u bytes (%u)", + ab, rlen, ab->ab_size); + ptr = ab->ab_buf + ab->ab_size; + bcopy(ar->ar_buf, ptr, rlen); + ab->ab_size += rlen; + free(ar->ar_buf); + free(ar); +} + +void * +reader_client_handler(void *arg) +{ + struct timeval tv; + struct au_client *ac; + fd_set rfds; + int ret; + + ac = (struct au_client *)arg; + for (;;) { + FD_ZERO(&rfds); + FD_SET(ac->auc_fd, &rfds); + tv.tv_sec = READER_TIMEOUT; + tv.tv_usec = 0; + ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv); + if (ret == -1) { + if (errno == EINTR) + continue; + else + break; + } else if (ret == 0) { + dprintf("reader_client_handler(%s): client %d timeout", + ac->auc_parent->ac_name, ac->auc_fd); + reader_push(ac->auc_parent, &ac->auc_buffer); + ac->auc_buffer.ab_size = 0; + continue; + } + if (FD_ISSET(ac->auc_fd, &rfds)) { + ret = reader_read_socket(ac); + if (ret == -1) + break; + } + } + dprintf("reader_client_handler(%s): client %d terminating", + ac->auc_parent->ac_name, ac->auc_fd); + (void) close(ac->auc_fd); + return (NULL); /* Thread termination */ +} + +void * +reader_handler(void *arg) +{ + struct timeval tv; + struct au_cmpnt *ac; + fd_set rfds; + int ret; + + ac = (struct au_cmpnt *)arg; + for (;;) { + FD_ZERO(&rfds); + FD_SET(ac->ac_fd, &rfds); + tv.tv_sec = READER_TIMEOUT; + tv.tv_usec = 0; + ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv); + if (ret == -1) { + if (errno == EINTR) + continue; + else exit(2); + } else if (ret == 0) { + if (ac->ac_type == COMPONENT_PIPE) { + dprintf("reader_handler(%s): timeout", + ac->ac_name); + reader_push(ac, &ac->ac_buffer); + ac->ac_buffer.ab_size = 0; + } + continue; } - if (ac->ac_type == COMPONENT_NET) { - TAILQ_FOREACH_SAFE(as, &ac->ac_sbuffers, as_glue, - tmp) - if (FD_ISSET(as->as_fd, &lrfds)) { - ret2 = reader_read_socket(as); - } - } + if (ac->ac_read_func(ac) == -1) + exit(2); } + return (NULL); } void @@ -157,13 +212,14 @@ { struct au_cmpnt *ac; - (void) pthread_mutex_init(&ready_lock, NULL); - (void) pthread_cond_init(&ready_cond, NULL); - TAILQ_FOREACH(ac, &ac_list_src, ac_glue) + TAILQ_FOREACH(ac, &ac_list_readers, ac_glue) if (ac->ac_init_func(ac) == -1) exit(2); } +/* + * XXX Add error reporting to reader_init_*. + */ int reader_init_net(struct au_cmpnt *ac) { @@ -182,10 +238,6 @@ return (-1); } ac->ac_fd = fd; - if (nonblock(fd) == -1) { - (void) close(fd); - return (-1); - } return (0); } @@ -198,75 +250,9 @@ if (fd == -1) return (-1); ac->ac_fd = fd; - if (nonblock(fd) == -1) { - (void) close(fd); - return (-1); - } return (0); } -void -reader_q_record(struct audit_record *ar, struct au_cmpnt *ac) -{ - int i; - - for (i = 0; i < ac->ac_ndsts; i++) - reader_q_record_cmpnt(ar, ac->ac_dsts[i]); - free(ar); -} - -void -reader_q_record_cmpnt(struct audit_record *ar, struct au_cmpnt *ac) -{ - struct au_queue_ent *new; - - new = malloc(sizeof(struct au_queue_ent)); - assert(new != NULL); - bzero(new, sizeof(struct au_queue_ent)); - /* - * In most cases we will have a 1:1 relationship between source - * and destination components. We avoid an extra copy by reference - * counting usage of this audit record. This may be built on to - * avoid copying altogether. - */ - if (ar->ar_count == 0) - new->aq_record.ar_buf = ar->ar_buf; - else { - new->aq_record.ar_buf = malloc(ar->ar_record_len); - assert(new->aq_record.ar_buf != NULL); - bcopy(ar->ar_buf, new->aq_record.ar_buf, ar->ar_record_len); - } - ar->ar_count++; - new->aq_record.ar_record_len = ar->ar_record_len; - new->aq_remain = ar->ar_record_len; - (void) pthread_mutex_lock(&ac->ac_q.qp_lock); - if (ac->ac_q.qp_store_len < WRITER_MAX) { - dprintf("queueing record for %s (%d bytes in queue)", - ac->ac_name, ac->ac_q.qp_store_len); - (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); - TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue); - ac->ac_q.qp_store_len += ar->ar_record_len; - ac->ac_q.qp_store_n++; - return; - } - if (ac->ac_q.qp_hold != NULL) { - /* This consumer is still processing it's queue, so the record - * is dropped. */ - (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); - dprintf("dropping record for %s", ac->ac_name); - free(new->aq_record.ar_buf); - free(new); - return; - } - dprintf("rotating queues for %s", ac->ac_name); - ROTATE(); - WRITER_SIGNAL(ac->ac_q.qp_store_n); - (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); - TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue); - ac->ac_q.qp_store_len += ar->ar_record_len; - ac->ac_q.qp_store_n++; -} - int reader_read_pipe(struct au_cmpnt *ac) { @@ -276,88 +262,88 @@ ret = read(ac->ac_fd, buf, sizeof(buf)); if (ret == -1) { - if ((errno == EINTR) || (errno == EAGAIN)) + if (errno == EINTR) return (0); else return (-1); - } - else if (ret == 0) + } else if (ret == 0) return (-1); - ar = malloc(sizeof(struct audit_record)); - assert(ar != NULL); - bzero(ar, sizeof(struct audit_record)); + ar = calloc(1, sizeof(struct audit_record)); + if (ar == NULL) + exit(2); ar->ar_buf = malloc(ret); - assert(ar->ar_buf != NULL); + if (ar->ar_buf == NULL) + exit(2); bcopy(buf, ar->ar_buf, ret); ar->ar_record_len = ret; - dprintf("reader_read_pipe: read record %d bytes", ret); - reader_q_record(ar, ac); + dprintf("reader_read_pipe(%s): %d bytes", ac->ac_name, ret); + reader_buffer_record(ac, &ac->ac_buffer, ar); return (0); } int -reader_read_socket(struct au_src_buffer *asb) +reader_read_socket(struct au_client *asb) { u_char *bufptr, *recbufptr; int ret, left; u_int32_t hdr_remain, val, need; u_char as_buf[2048]; - ret = read(asb->as_fd, as_buf, sizeof(as_buf)); + ret = read(asb->auc_fd, as_buf, sizeof(as_buf)); if (ret == -1) { if (errno != EINTR) return (-1); else return (0); - } - else if (ret == 0) + } else if (ret == 0) return (-1); left = ret; bufptr = as_buf; while (left > 0) { - if (asb->as_record == NULL) { - hdr_remain = sizeof(asb->as_header) - - asb->as_nread; + if (asb->auc_record == NULL) { + hdr_remain = sizeof(asb->auc_header) - + asb->auc_nread; if (left >= hdr_remain) { - (void) memcpy(asb->as_header + asb->as_nread, + (void) memcpy(asb->auc_header + asb->auc_nread, bufptr, hdr_remain); - asb->as_nread += hdr_remain; + asb->auc_nread += hdr_remain; left -= hdr_remain; bufptr += hdr_remain; - (void) memcpy(&val, asb->as_header + 1, + (void) memcpy(&val, asb->auc_header + 1, sizeof(val)); - asb->as_record = + asb->auc_record = malloc(sizeof(struct audit_record)); - assert(asb->as_record != NULL); - asb->as_record->ar_record_len = be32toh(val); - asb->as_record->ar_buf = \ - malloc(asb->as_record->ar_record_len); - assert(asb->as_record->ar_buf != NULL); - (void) memcpy(asb->as_record->ar_buf, - asb->as_header, sizeof(asb->as_header)); + if (asb->auc_record == NULL) + exit(2); + asb->auc_record->ar_record_len = be32toh(val); + asb->auc_record->ar_buf = \ + malloc(asb->auc_record->ar_record_len); + if (asb->auc_record->ar_buf == NULL) + exit(2); + (void) memcpy(asb->auc_record->ar_buf, + asb->auc_header, sizeof(asb->auc_header)); continue; - } - else { - (void) memcpy(asb->as_header + asb->as_nread, + } else { + (void) memcpy(asb->auc_header + asb->auc_nread, bufptr, left); - asb->as_nread += left; + asb->auc_nread += left; return (0); } } - need = asb->as_record->ar_record_len - asb->as_nread; - recbufptr = asb->as_record->ar_buf + asb->as_nread; + need = asb->auc_record->ar_record_len - asb->auc_nread; + recbufptr = asb->auc_record->ar_buf + asb->auc_nread; if (left < need) { (void) memcpy(recbufptr, bufptr, left); - asb->as_nread += left; + asb->auc_nread += left; return (0); - } - else { + } else { (void) memcpy(recbufptr, bufptr, need); left -= need; bufptr += need; - reader_q_record(asb->as_record, asb->as_parent); - asb->as_record = NULL; - asb->as_nread = 0; + reader_buffer_record(asb->auc_parent, + &asb->auc_buffer, asb->auc_record); + asb->auc_record = NULL; + asb->auc_nread = 0; } } return (0); @@ -366,25 +352,20 @@ void reader_start() { - reader_init(); - reader_build_rfds(&rfds); - for (;;) - reader_handler(&rfds); -} - -void -reader_timeout() -{ struct au_cmpnt *ac; + pthread_t p; + int ret; - TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) { - (void) pthread_mutex_lock(&ac->ac_q.qp_lock); - if ((ac->ac_q.qp_hold != NULL) || (ac->ac_q.qp_store_n == 0)) { - (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); - continue; - } - ROTATE(); - WRITER_SIGNAL(ac->ac_q.qp_store_n); - (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); + reader_init(); + TAILQ_FOREACH(ac, &ac_list_readers, ac_glue) { + ret = pthread_create(&p, NULL, reader_handler, ac); + assert(ret == 0); } + writer_start(); + /* + * Reader threads will always exist, so we suspend execution of + * the main thread using the last reader thread which was created. + */ + (void) pthread_join(p, NULL); + exit(0); } ==== //depot/projects/trustedbsd/netauditd/reader.h#3 (text+ko) ==== @@ -24,19 +24,17 @@ * SUCH DAMAGE. >>> TRUNCATED FOR MAIL (1000 lines) <<<