Skip site navigation (1)Skip section navigation (2)
Date:      Wed, 29 May 2013 20:17:07 GMT
From:      Olivier Cochard-Labbe <olivier@cochard.me>
To:        freebsd-gnats-submit@FreeBSD.org
Subject:   bin/179085: pthread patch for netblast
Message-ID:  <201305292017.r4TKH7G0003793@oldred.FreeBSD.org>
Resent-Message-ID: <201305292020.r4TKK0Od023280@freefall.freebsd.org>

next in thread | raw e-mail | index | archive | help

>Number:         179085
>Category:       bin
>Synopsis:       pthread patch for netblast
>Confidential:   no
>Severity:       non-critical
>Priority:       low
>Responsible:    freebsd-bugs
>State:          open
>Quarter:        
>Keywords:       
>Date-Required:
>Class:          sw-bug
>Submitter-Id:   current-users
>Arrival-Date:   Wed May 29 20:20:00 UTC 2013
>Closed-Date:
>Last-Modified:
>Originator:     Olivier Cochard-Labbe
>Release:        current
>Organization:
BSD Router Project
>Environment:
FreeBSD laptop.bsdrp.net 10.0-CURRENT FreeBSD 10.0-CURRENT #2 r250925: Thu May 23 21:49:14 CEST 2013     root@laptop.bsdrp.net:/usr/obj/usr/local/BSDRP/BSDRPcur/FreeBSD/src/sys/LAPTOP  amd64
>Description:
Here is a patch that add multithread support to tools/tools/netrate/netblast (I've used the netreceive code).
WARNING: It need full review because it's my first try with pthread.
>How-To-Repeat:

>Fix:


Patch attached with submission follows:

Index: tools/tools/netrate/netblast/Makefile
===================================================================
--- tools/tools/netrate/netblast/Makefile	(revision 250926)
+++ tools/tools/netrate/netblast/Makefile	(working copy)
@@ -4,5 +4,8 @@
 
 PROG=	netblast
 NO_MAN=
+LDFLAGS += -lpthread
 
+WARNS?= 3
+
 .include <bsd.prog.mk>
Index: tools/tools/netrate/netblast/netblast.c
===================================================================
--- tools/tools/netrate/netblast/netblast.c	(revision 250926)
+++ tools/tools/netrate/netblast/netblast.c	(working copy)
@@ -40,15 +40,24 @@
 #include <string.h>
 #include <unistd.h>			/* close */
 
+#include <pthread.h>
+#include <fcntl.h>
+#include <time.h>   /* clock_getres() */
+
+static int round_to(int n, int l)
+{
+    return ((n + l - 1)/l)*l;
+}
+
 static void
 usage(void)
 {
 
-	fprintf(stderr, "netblast [ip] [port] [payloadsize] [duration]\n");
+	fprintf(stderr, "netblast [ip] [port] [payloadsize] [duration] [nthreads]\n");
 	exit(-1);
 }
 
-static int	global_stop_flag;
+static int	global_stop_flag=0;
 
 static void
 signal_handler(int signum __unused)
@@ -57,48 +66,28 @@
 	global_stop_flag = 1;
 }
 
+
 /*
- * Loop that blasts packets: begin by recording time information, resetting
- * stats.  Set the interval timer for when we want to wake up.  Then go.
- * SIGALRM will set a flag indicating it's time to stop.  Note that there's
- * some overhead to the signal and timer setup, so the smaller the duration,
- * the higher the relative overhead.
+ * Each socket uses multiple threads so the receiver is
+ * more efficient. A collector thread runs the stats.
  */
-static int
-blast_loop(int s, long duration, u_char *packet, u_int packet_len)
+struct td_desc {
+    pthread_t td_id;
+    uint64_t counter; /* tx counter */
+	uint64_t send_errors; /* tx send errors */
+    uint64_t send_calls;    /* tx send calls */
+	int s;
+	u_char *packet;
+	u_int packet_len;
+};
+
+static void *
+blast(void *data)
 {
-	struct timespec starttime, tmptime;
-	struct itimerval it;
-	u_int32_t counter;
-	int send_errors, send_calls;
-
-	if (signal(SIGALRM, signal_handler) == SIG_ERR) {
-		perror("signal");
-		return (-1);
-	}
-
-	if (clock_getres(CLOCK_REALTIME, &tmptime) == -1) {
-		perror("clock_getres");
-		return (-1);
-	}
-
-	if (clock_gettime(CLOCK_REALTIME, &starttime) == -1) {
-		perror("clock_gettime");
-		return (-1);
-	}
-
-	it.it_interval.tv_sec = 0;
-	it.it_interval.tv_usec = 0;
-	it.it_value.tv_sec = duration;
-	it.it_value.tv_usec = 0;
-
-	if (setitimer(ITIMER_REAL, &it, NULL) < 0) {
-		perror("setitimer");
-		return (-1);
-	}
-
-	send_errors = send_calls = 0;
-	counter = 0;
+    struct td_desc *t = data;
+	t->counter=0;
+	t->send_errors=0;
+	t->send_calls=0;
 	while (global_stop_flag == 0) {
 		/*
 		 * We maintain and, if there's room, send a counter.  Note
@@ -110,32 +99,74 @@
 		 * operation, causing the current sequence number also to be
 		 * skipped.
 		 */
-		if (packet_len >= 4) {
-			be32enc(packet, counter);
-			counter++;
+		if (t->packet_len >= 4) {
+			be32enc(t->packet, t->counter);
+			t->counter++;
 		}
-		if (send(s, packet, packet_len, 0) < 0)
-			send_errors++;
-		send_calls++;
+		if (send(t->s, t->packet, t->packet_len, 0) < 0)
+			t->send_errors++;
+		t->send_calls++;
 	}
+    return NULL;
+}
 
+static struct td_desc **
+make_threads(int s, u_char *packet, u_int packet_len, int nthreads)
+{
+    int i;
+    int lb = round_to(nthreads * sizeof (struct td_desc *), 64);
+    int td_len = round_to(sizeof(struct td_desc), 64); // cache align
+    char *m = calloc(1, lb + td_len * nthreads);
+    struct td_desc **tp;
+
+    /* pointers plus the structs */
+    if (m == NULL) {
+        perror("no room for pointers!");
+        exit(1);
+    }
+    tp = (struct td_desc **)m;
+    m += lb;    /* skip the pointers */
+    for (i = 0; i < nthreads; i++, m += td_len) {
+        tp[i] = (struct td_desc *)m;
+        tp[i]->s = s;
+        tp[i]->packet = packet;
+        tp[i]->packet_len = packet_len;
+        if (pthread_create(&tp[i]->td_id, NULL, blast, tp[i])) {
+            perror("unable to create thread");
+            exit(1);
+        }
+    }
+    return tp;
+}
+
+
+static void
+main_thread(struct td_desc **tp, long duration, struct timespec starttime, struct timespec tmptime, int nthreads)
+{
+	uint64_t send_errors=0, send_calls=0;
+	int i;
 	if (clock_gettime(CLOCK_REALTIME, &tmptime) == -1) {
 		perror("clock_gettime");
-		return (-1);
 	}
 
+	for (i = 0; i < nthreads; i++) {
+		/* Wait for thread end */
+		pthread_join( tp[i]->td_id, NULL);
+		send_calls+=tp[i]->send_calls;
+		send_errors+=tp[i]->send_errors;
+    }
+
 	printf("\n");
 	printf("start:             %zd.%09lu\n", starttime.tv_sec,
 	    starttime.tv_nsec);
 	printf("finish:            %zd.%09lu\n", tmptime.tv_sec,
 	    tmptime.tv_nsec);
-	printf("send calls:        %d\n", send_calls);
-	printf("send errors:       %d\n", send_errors);
-	printf("approx send rate:  %ld\n", (send_calls - send_errors) /
+	printf("send calls:        %lu\n", send_calls);
+	printf("send errors:       %lu\n", send_errors);
+	printf("approx send rate:  %lu\n", (send_calls - send_errors) /
 	    duration);
-	printf("approx error rate: %d\n", (send_errors / send_calls));
+	printf("approx error rate: %lu\n", (send_errors / send_calls));
 
-	return (0);
 }
 
 int
@@ -143,11 +174,15 @@
 {
 	long payloadsize, duration;
 	struct addrinfo hints, *res, *res0;
-	char *dummy, *packet;
-	int port, s, error;
+	char *dummy;
+	u_char *packet;
+	int port, s, error, nthreads = 1;
+	struct td_desc **tp;	
 	const char *cause = NULL;
+	struct timespec starttime, tmptime;
+	struct itimerval it;
 
-	if (argc != 5)
+	if (argc < 5)
 		usage();
 
 	memset(&hints, 0, sizeof(hints));
@@ -177,6 +212,11 @@
 		/*NOTREACHED*/
 	}
 
+	if (argc > 5)
+        nthreads = strtoul(argv[5], &dummy, 10);
+    if (nthreads < 1 || nthreads > 64)
+        usage();
+
 	packet = malloc(payloadsize);
 	if (packet == NULL) {
 		perror("malloc");
@@ -216,6 +256,43 @@
 
 	freeaddrinfo(res0);
 
-	return (blast_loop(s, duration, packet, payloadsize));
+	printf("netblast %d threads sending on UDP port %d\n",
+    nthreads, (u_short)port);
 
+	/*
+	 * Begin by recording time information stats.
+	 * Set the interval timer for when we want to wake up.
+	 * SIGALRM will set a flag indicating it's time to stop.  Note that there's
+	 * some overhead to the signal and timer setup, so the smaller the duration,
+	 * the higher the relative overhead.
+	 */
+
+	if (signal(SIGALRM, signal_handler) == SIG_ERR) {
+		perror("signal");
+		return (-1);
+	}
+
+	if (clock_getres(CLOCK_REALTIME, &tmptime) == -1) {
+		perror("clock_getres");
+		return (-1);
+	}
+
+	if (clock_gettime(CLOCK_REALTIME, &starttime) == -1) {
+		perror("clock_gettime");
+		return (-1);
+	}
+
+	it.it_interval.tv_sec = 0;
+	it.it_interval.tv_usec = 0;
+	it.it_value.tv_sec = duration;
+	it.it_value.tv_usec = 0;
+
+	if (setitimer(ITIMER_REAL, &it, NULL) < 0) {
+		perror("setitimer");
+		return (-1);
+	}
+
+    tp = make_threads(s, packet, payloadsize, nthreads);
+    main_thread(tp, duration, starttime,  tmptime, nthreads);
+
 }


>Release-Note:
>Audit-Trail:
>Unformatted:



Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201305292017.r4TKH7G0003793>