Date: Sun, 6 Apr 2008 00:28:54 GMT
From: Aaron Meihm <alm@FreeBSD.org>
To: Perforce Change Reviews <perforce@freebsd.org>
Subject: PERFORCE change 139439 for review
Message-ID: <200804060028.m360Ssfq078410@repoman.freebsd.org>
next in thread | raw e-mail | index | archive | help
http://perforce.freebsd.org/chv.cgi?CH=139439 Change 139439 by alm@alm_praetorian on 2008/04/06 00:27:56 Additional code working towards new threaded producer/consumer model. Affected files ... .. //depot/projects/trustedbsd/netauditd/grammar.y#3 edit .. //depot/projects/trustedbsd/netauditd/netauditd.h#13 edit .. //depot/projects/trustedbsd/netauditd/reader.c#2 edit .. //depot/projects/trustedbsd/netauditd/reader.h#2 edit .. //depot/projects/trustedbsd/netauditd/writer.c#2 edit .. //depot/projects/trustedbsd/netauditd/writer.h#2 edit Differences ... ==== //depot/projects/trustedbsd/netauditd/grammar.y#3 (text+ko) ==== @@ -131,6 +131,7 @@ new->ac_name = $2; new->ac_path = $3; new->ac_init_func = writer_init_trail; + new->ac_write_func = writer_write_trail; writer_q_init(new); $$ = new; } ==== //depot/projects/trustedbsd/netauditd/netauditd.h#13 (text+ko) ==== @@ -38,12 +38,11 @@ struct audit_record { void *ar_buf; u_int32_t ar_record_len; - int ar_refcount; }; struct au_queue_ent { TAILQ_ENTRY(au_queue_ent) aq_glue; - struct audit_record *aq_record; + struct audit_record aq_record; u_int32_t aq_remain; }; @@ -51,12 +50,12 @@ struct au_qpair { au_q_t qp_a, qp_b; - int qp_ready; - au_q_t *qp_read, *qp_write; pthread_mutex_t qp_lock; - pthread_cond_t qp_cond; - u_int32_t qp_read_size; - time_t qp_time; + au_q_t *qp_store; + au_q_t *qp_hold; + au_q_t *qp_free; + u_int32_t qp_store_len; + int qp_store_n; }; struct au_cmpnt { @@ -76,6 +75,7 @@ int (*ac_init_func)(struct au_cmpnt *); int (*ac_read_func)(struct au_cmpnt *); + int (*ac_write_func)(struct au_cmpnt *); }; struct au_src_buffer { ==== //depot/projects/trustedbsd/netauditd/reader.c#2 (text+ko) ==== @@ -26,6 +26,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/queue.h> +#include <sys/endian.h> #include <stdio.h> #include <stdlib.h> @@ -43,22 +44,27 @@ #include "reader.h" #include "writer.h" -#define ROTATE(x) if (x.qp_read == &x.qp_a) { \ - x.qp_read = &x.qp_b; \ - x.qp_write = &x.qp_a; \ - } \ - 
else { \ - x.qp_read = &x.qp_a; \ - x.qp_write = &x.qp_b; \ - } \ - x.qp_read_size = 0; \ - x.qp_time = time(NULL); - #define SRC_BUFFER_INIT(x) x = malloc(sizeof(struct au_src_buffer)); \ assert (x != NULL); \ bzero(x, sizeof(struct au_src_buffer)); -static int srcs_online; /* All source components online */ +#define WRITER_SIGNAL(x) (void) pthread_mutex_lock(&ready_lock); \ + records_waiting += x; \ + x = 0; \ + (void) pthread_cond_signal(&ready_cond); \ + (void) pthread_mutex_unlock(&ready_lock) + +#define ROTATE assert(ac->ac_q.qp_free != NULL); \ + ac->ac_q.qp_hold = ac->ac_q.qp_store; \ + ac->ac_q.qp_store = ac->ac_q.qp_free; \ + ac->ac_q.qp_free = NULL; \ + ac->ac_q.qp_store_len = 0 + +pthread_mutex_t ready_lock; +pthread_cond_t ready_cond; +u_int32_t records_waiting; + +static fd_set rfds; int reader_accept_client(struct au_cmpnt *ac) @@ -82,6 +88,7 @@ bcopy(&addr, &new->as_addr, sizeof(new->as_addr)); new->as_addrlen = addrlen; TAILQ_INSERT_TAIL(&ac->ac_sbuffers, new, as_glue); + reader_build_rfds(&rfds); return (0); } @@ -89,25 +96,30 @@ reader_build_rfds(fd_set *rfds) { struct au_cmpnt *ac; + struct au_src_buffer *s; FD_ZERO(rfds); TAILQ_FOREACH(ac, &ac_list_src, ac_glue) { - if (ac->ac_flags & FLAG_ONLINE) - FD_SET(ac->ac_fd, rfds); + FD_SET(ac->ac_fd, rfds); + if (ac->ac_type == COMPONENT_NET) { + TAILQ_FOREACH(s, &ac->ac_sbuffers, as_glue) + FD_SET(s->as_fd, rfds); + } } } void reader_handler(fd_set *rfds) { - struct au_cmpnt *ac, *tmp; + struct au_src_buffer *as, *tmp; + struct au_cmpnt *ac; fd_set lrfds; struct timeval tv; int ret, ret2; lrfds = *rfds; bzero(&tv, sizeof(struct timeval)); - tv.tv_sec = 1; + tv.tv_sec = 5; ret = select(FD_SETSIZE, &lrfds, NULL, NULL, &tv); if (ret == -1) { if (errno == EINTR) @@ -115,15 +127,22 @@ else exit(2); } - else if (ret == 0) + else if (ret == 0) { + reader_timeout(); return; - TAILQ_FOREACH_SAFE(ac, &ac_list_src, ac_glue, tmp) { + } + TAILQ_FOREACH(ac, &ac_list_src, ac_glue) { if (FD_ISSET(ac->ac_fd, &lrfds)) 
{ ret2 = ac->ac_read_func(ac); - if (ret2 == -1) { - (void) close(ac->ac_fd); - ac->ac_flags &= FLAG_ONLINE; - } + if (ret2 == -1) + exit(2); + } + if (ac->ac_type == COMPONENT_NET) { + TAILQ_FOREACH_SAFE(as, &ac->ac_sbuffers, as_glue, + tmp) + if (FD_ISSET(as->as_fd, &lrfds)) { + ret2 = reader_read_socket(as); + } } } } @@ -131,30 +150,13 @@ void reader_init() { - time_t t; struct au_cmpnt *ac; - srcs_online = 1; - TAILQ_FOREACH(ac, &ac_list_src, ac_glue) { - if (ac->ac_flags & FLAG_ONLINE) - continue; - t = time(NULL); - if (ac->ac_failed != 0) - if ((ac->ac_failed + READER_RETRY) > t) { - srcs_online = 0; - continue; - } - dprintf("reader_init: %s", ac->ac_name); - if (ac->ac_init_func(ac) == -1) { - srcs_online = 0; - ac->ac_failed = time(NULL); - } - else { - dprintf("reader_init: %s online", ac->ac_name); - ac->ac_failed = 0; - ac->ac_flags |= FLAG_ONLINE; - } - } + (void) pthread_mutex_init(&ready_lock, NULL); + (void) pthread_cond_init(&ready_cond, NULL); + TAILQ_FOREACH(ac, &ac_list_src, ac_glue) + if (ac->ac_init_func(ac) == -1) + exit(2); } int @@ -205,50 +207,51 @@ for (i = 0; i < ac->ac_ndsts; i++) reader_q_record_cmpnt(ar, ac->ac_dsts[i]); + /* Once we have copied the record to all this components consumers + * we can discard it. */ + free(ar->ar_buf); + free(ar); } void reader_q_record_cmpnt(struct audit_record *ar, struct au_cmpnt *ac) { struct au_queue_ent *new; - int rotate = 0; - time_t t; - t = time(NULL); - if (ac->ac_q.qp_time == 0) - ac->ac_q.qp_time = t; - else if ((ac->ac_q.qp_time + WRITER_ROTATE_TIMEOUT) <= t) { - if (!TAILQ_EMPTY(ac->ac_q.qp_read)) - rotate = 1; - } - if (ac->ac_q.qp_read_size >= WRITER_LOW_WATER) - rotate = 1; - if (rotate) { - if (pthread_mutex_lock(&ac->ac_q.qp_lock) != 0) - exit(2); - /* If the writer is still processing the other buffer, the - * record is dropped. 
*/ - if (!ac->ac_q.qp_ready) { - if (pthread_mutex_unlock(&ac->ac_q.qp_lock) != 0) - exit(2); - return; - } - dprintf("reader_q_record_cmpnt: %s: rotate", ac->ac_name); - ROTATE(ac->ac_q); - if (pthread_cond_signal(&ac->ac_q.qp_cond) != 0) - exit(2); - if (pthread_mutex_unlock(&ac->ac_q.qp_lock) != 0) - exit(2); - } - dprintf("reader_q_record: %p for %s", ar, ac->ac_name); new = malloc(sizeof(struct au_queue_ent)); assert(new != NULL); bzero(new, sizeof(struct au_queue_ent)); - new->aq_record = ar; + new->aq_record.ar_buf = malloc(ar->ar_record_len); + assert(new->aq_record.ar_buf != NULL); + bcopy(ar->ar_buf, new->aq_record.ar_buf, ar->ar_record_len); + new->aq_record.ar_record_len = ar->ar_record_len; new->aq_remain = ar->ar_record_len; - ar->ar_refcount++; - TAILQ_INSERT_TAIL(ac->ac_q.qp_read, new, aq_glue); - ac->ac_q.qp_read_size += ar->ar_record_len; + (void) pthread_mutex_lock(&ac->ac_q.qp_lock); + if (ac->ac_q.qp_store_len < WRITER_MAX) { + dprintf("queueing record for %s (%d bytes in queue)", + ac->ac_name, ac->ac_q.qp_store_len); + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); + TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue); + ac->ac_q.qp_store_len += ar->ar_record_len; + ac->ac_q.qp_store_n++; + return; + } + if (ac->ac_q.qp_hold != NULL) { + /* This consumer is still processing it's queue, so the record + * is dropped. 
*/ + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); + dprintf("dropping record for %s", ac->ac_name); + free(new->aq_record.ar_buf); + free(new); + return; + } + dprintf("rotating queues for %s", ac->ac_name); + ROTATE; + WRITER_SIGNAL(ac->ac_q.qp_store_n); + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); + TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue); + ac->ac_q.qp_store_len += ar->ar_record_len; + ac->ac_q.qp_store_n++; } int @@ -279,16 +282,96 @@ return (0); } +int +reader_read_socket(struct au_src_buffer *asb) +{ + u_char *bufptr, *recbufptr; + int ret, left; + u_int32_t hdr_remain, val, need; + u_char as_buf[2048]; + + ret = read(asb->as_fd, as_buf, sizeof(as_buf)); + if (ret == -1) { + if (errno != EINTR) + return (-1); + else + return (0); + } + else if (ret == 0) + return (-1); + left = ret; + bufptr = as_buf; + while (left > 0) { + if (asb->as_record == NULL) { + hdr_remain = sizeof(asb->as_header) - + asb->as_nread; + if (left >= hdr_remain) { + (void) memcpy(asb->as_header + asb->as_nread, + bufptr, hdr_remain); + asb->as_nread += hdr_remain; + left -= hdr_remain; + bufptr += hdr_remain; + (void) memcpy(&val, asb->as_header + 1, + sizeof(val)); + asb->as_record = + malloc(sizeof(struct audit_record)); + assert(asb->as_record != NULL); + asb->as_record->ar_record_len = be32toh(val); + asb->as_record->ar_buf = \ + malloc(asb->as_record->ar_record_len); + assert(asb->as_record->ar_buf != NULL); + (void) memcpy(asb->as_record->ar_buf, + asb->as_header, sizeof(asb->as_header)); + continue; + } + else { + (void) memcpy(asb->as_header + asb->as_nread, + bufptr, left); + asb->as_nread += left; + return (0); + } + } + need = asb->as_record->ar_record_len - asb->as_nread; + recbufptr = asb->as_record->ar_buf + asb->as_nread; + if (left < need) { + (void) memcpy(recbufptr, bufptr, left); + asb->as_nread += left; + return (0); + } + else { + (void) memcpy(recbufptr, bufptr, need); + left -= need; + bufptr += need; + reader_q_record(asb->as_record, 
asb->as_parent); + asb->as_record = NULL; + asb->as_nread = 0; + } + } + return (0); +} + void reader_start() { - fd_set rfds; + reader_init(); + reader_build_rfds(&rfds); + for (;;) + reader_handler(&rfds); +} + +void +reader_timeout() +{ + struct au_cmpnt *ac; - for (;;) { - if (!srcs_online) { - reader_init(); - reader_build_rfds(&rfds); + TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) { + (void) pthread_mutex_lock(&ac->ac_q.qp_lock); + if ((ac->ac_q.qp_hold != NULL) || (ac->ac_q.qp_store_n == 0)) { + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); + continue; } - reader_handler(&rfds); + ROTATE; + WRITER_SIGNAL(ac->ac_q.qp_store_n); + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); } } ==== //depot/projects/trustedbsd/netauditd/reader.h#2 (text+ko) ==== @@ -24,7 +24,9 @@ * SUCH DAMAGE. */ -#define READER_RETRY 30 +extern pthread_mutex_t ready_lock; +extern pthread_cond_t ready_cond; +u_int32_t records_waiting; int reader_accept_client(struct au_cmpnt *); void reader_build_rfds(fd_set *); @@ -35,4 +37,6 @@ void reader_q_record(struct audit_record *, struct au_cmpnt *); void reader_q_record_cmpnt(struct audit_record *, struct au_cmpnt *); int reader_read_pipe(struct au_cmpnt *); +int reader_read_socket(struct au_src_buffer *); void reader_start(void); +void reader_timeout(void); ==== //depot/projects/trustedbsd/netauditd/writer.c#2 (text+ko) ==== @@ -61,13 +61,14 @@ void writer_handler(fd_set *wfds) { + struct au_cmpnt *ac; fd_set lwfds; struct timeval *tv = NULL; - int ret; + int ret, have_records = 1; lwfds = *wfds; if (!dsts_online) { - dprintf("writer_handler: applying select timeout"); + dprintf("writer applying select timeout"); tv = malloc(sizeof(struct timeval)); bzero(tv, sizeof(struct timeval)); tv->tv_sec = 1; @@ -79,6 +80,34 @@ else exit(2); } + (void) pthread_mutex_lock(&ready_lock); + if (records_waiting == 0) { + have_records = 0; + if (dsts_online) { + while (records_waiting == 0) { + dprintf("writer waiting for records"); + (void) 
pthread_cond_wait(&ready_cond, + &ready_lock); + } + have_records = 1; + (void) pthread_mutex_unlock(&ready_lock); + } + } + (void) pthread_mutex_unlock(&ready_lock); + if (!have_records) + return; + TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) { + if (!FD_ISSET(ac->ac_fd, &lwfds)) + continue; + (void) pthread_mutex_lock(&ac->ac_q.qp_lock); + if (ac->ac_q.qp_hold != NULL) { + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); + ret = ac->ac_write_func(ac); + if (ret == 1) /* Queue has been drained */ + writer_q_drained(ac); + } + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); + } } void @@ -153,16 +182,25 @@ } void +writer_q_drained(struct au_cmpnt *ac) +{ + (void) pthread_mutex_lock(&ac->ac_q.qp_lock); + assert(ac->ac_q.qp_hold != NULL); + ac->ac_q.qp_free = ac->ac_q.qp_hold; + ac->ac_q.qp_hold = NULL; + (void) pthread_mutex_unlock(&ac->ac_q.qp_lock); +} + +void writer_q_init(struct au_cmpnt *ac) { TAILQ_INIT(&ac->ac_q.qp_a); TAILQ_INIT(&ac->ac_q.qp_b); - ac->ac_q.qp_read = &ac->ac_q.qp_a; - ac->ac_q.qp_write = &ac->ac_q.qp_b; + ac->ac_q.qp_store = &ac->ac_q.qp_a; + ac->ac_q.qp_free = &ac->ac_q.qp_b; + ac->ac_q.qp_hold = NULL; if (pthread_mutex_init(&ac->ac_q.qp_lock, NULL) != 0) exit(2); - if (pthread_cond_init(&ac->ac_q.qp_cond, NULL) != 0) - exit(2); } void * @@ -179,3 +217,42 @@ } return (NULL); } + +int +writer_write_trail(struct au_cmpnt *ac) +{ + struct au_queue_ent *aq, *tmp; + struct audit_record *ar; + u_int32_t offset; + int ret; + + TAILQ_FOREACH_SAFE(aq, ac->ac_q.qp_hold, aq_glue, tmp) { + ar = &aq->aq_record; + offset = ar->ar_record_len - aq->aq_remain; + dprintf("write offset %d", offset); + ret = write(ac->ac_fd, ar->ar_buf + offset, aq->aq_remain); + if (ret == -1) { + if ((errno == EINTR) || (errno == EAGAIN)) + return (0); + else + return (-1); + } + else if (ret == aq->aq_remain) { + dprintf("wrote %d bytes to %s (completed)", ret, + ac->ac_name); + (void) pthread_mutex_lock(&ready_lock); + records_waiting--; + dprintf("%d records waiting", 
records_waiting); + (void) pthread_mutex_unlock(&ready_lock); + TAILQ_REMOVE(ac->ac_q.qp_hold, aq, aq_glue); + free(aq->aq_record.ar_buf); + free(aq); + } + else { + dprintf("partial write"); + aq->aq_remain -= ret; + return (0); + } + } + return (1); +} ==== //depot/projects/trustedbsd/netauditd/writer.h#2 (text+ko) ==== @@ -24,7 +24,7 @@ * SUCH DAMAGE. */ -#define WRITER_LOW_WATER 1024000 +#define WRITER_MAX 32768 #define WRITER_ROTATE_TIMEOUT 5 #define WRITER_RETRY 60 @@ -33,5 +33,7 @@ void writer_init(void); int writer_init_net(struct au_cmpnt *); int writer_init_trail(struct au_cmpnt *); +void writer_q_drained(struct au_cmpnt *); void writer_q_init(struct au_cmpnt *); void *writer_start(void *); +int writer_write_trail(struct au_cmpnt *);
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200804060028.m360Ssfq078410>