Skip site navigation (1)Skip section navigation (2)
Date:      Thu, 17 May 2001 01:31:06 +0200
From:      Tor.Egge@fast.no
To:        dillon@earth.backplane.com
Cc:        arch@FreeBSD.ORG
Subject:   Re: on load control / process swapping
Message-ID:  <200105162331.BAA04708@midten.fast.no>
In-Reply-To: Your message of "Wed, 16 May 2001 15:22:51 -0700 (PDT)"
References:  <200105162222.f4GMMpC81247@earth.backplane.com>

next in thread | previous in thread | raw e-mail | index | archive | help
----Next_Part(Thu_May_17_01:30:16_2001)--
Content-Type: Text/Plain; charset=us-ascii
Content-Transfer-Encoding: 7bit

>     I'd have to see your test code.  Doing a direct-read into a user buffer
>     has no cache impact at all (DMA does not go through the cpu cache).
>     If you are doing seek/read()s but not actually looking at the data that
>     is returned, your test results are going to be seriously skewed.

The test code does not look at the data.  I sent a copy of it to you
at January 7th 2000 (along with a previous version of the O_DIRECT
patch).

I agree that the 95% reduction in CPU usage is seriously skewed.  The
performance improvement for most real applications will be very small
or even negative.  For some specialized applications it is a
significant performance improvement, giving nearly the same
performance as when bypassing the kernel file system and using the raw
device directly.

- Tor Egge


----Next_Part(Thu_May_17_01:30:16_2001)--
Content-Type: Text/Plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
Content-Description: "Makefile"

all:	aiotest_lt_raw aiotest_lt aiotest_ut

clean:
	rm -f aiotest_lt_raw aiotest_lt aiotest_ut

aiotest_lt_raw: aiotest.c
	cc -static -D_THREAD_SAFE -D_PTHREADS -DLINUXTHREADS -DRAWREAD -O2 -I/usr/local/include/pthread/linuxthreads -o aiotest_lt_raw aiotest.c -L/usr/local/lib -llthread -llgcc_r

aiotest_lt: aiotest.c
	cc -D_THREAD_SAFE -D_PTHREADS -DLINUXTHREADS -I/usr/local/include/pthread/linuxthreads -O2 -o aiotest_lt aiotest.c -L/usr/local/lib -llthread -llgcc_r

aiotest_ut: aiotest.c
	cc -static -pthread -D_THREAD_SAFE -D_PTHREADS -O2 -o aiotest_ut aiotest.c

----Next_Part(Thu_May_17_01:30:16_2001)--
Content-Type: Text/Plain; charset=us-ascii
Content-Transfer-Encoding: 7bit
Content-Description: "aiotest.c"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/resource.h>
#include <signal.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <stdarg.h>
#include <sys/wait.h>
#include <sys/utsname.h>
#include <rpc/types.h>
#include <pthread.h>
#ifndef __linux__
#include <sys/filio.h>
#endif

#ifndef LINUXTHREADS
#include <sys/aio.h>

struct myaio {
  struct aiocb cb;
  struct {
    int busy;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
  } cond;
  struct myaio *next;
  struct myaio *prev;
  ssize_t retval;
  size_t  reterrno;
  time_t started;
  int errwritten;
};

static struct myaio *activeaios;
static struct myaio *freeaios;
static int freecnt;

static pthread_mutex_t aiomutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t aiocond = PTHREAD_COND_INITIALIZER;
static int aiostartcnt;
static int aioendcnt;
static volatile sig_atomic_t gotusr1;
struct timeval maxlat;
static struct timeval gotusr1time;
static pthread_once_t aiothread_once = PTHREAD_ONCE_INIT;
static pthread_t aiothread;
static int aiothread_running;
#endif

int xreadlen; /* bytes */
int xreadoff; /* skip KB at eof */

static void runaiothread(void);

#ifndef __linux__
#ifndef O_DIRECT
ssize_t
rawread(int fd, void *buf, size_t nbytes, off_t offset)
{
  struct rawread rr;
  ssize_t ret;

  rr.udata = buf;
  rr.len = nbytes;
  rr.offset = offset;

  ret = ioctl(fd, FIORAWREAD, &rr);
  if (ret < 0 && errno == ENOTTY)
	ret = pread(fd, buf, nbytes, offset);
  return ret;
}
#endif
#endif


#ifndef LINUXTHREADS
ssize_t aio_pread(const int fd,
	     void *buf,
	     const size_t buflen,
	     const off_t off)
{
  struct myaio *aio;
  int ret;
  size_t retval;
  int reterrno;
  
  pthread_mutex_lock(&aiomutex);
  if (freeaios != NULL) {
    assert(freecnt > 0);
    freecnt--;
    aio = freeaios;
    freeaios = aio->next;
    aio->next = NULL;
    aio->prev = NULL;
  } else {
    assert(freecnt == 0);

    pthread_once(&aiothread_once, runaiothread);

    while (aiothread_running == 0)
      pthread_cond_wait(&aiocond, &aiomutex);
    pthread_mutex_unlock(&aiomutex);
    
    aio = (struct myaio *) malloc(sizeof(struct myaio));
    memset(aio, 0, sizeof(struct myaio));
    pthread_mutex_init(&aio->cond.mutex, NULL);
    pthread_cond_init(&aio->cond.cond, NULL);
    aio->next = NULL;
    aio->prev = NULL;
    
    pthread_mutex_lock(&aiomutex);
  }
  
  assert(aio->cond.busy == 0);
  aio->cond.busy = 1;
  
  aio->cb.aio_fildes = fd;
  aio->cb.aio_offset = off;
  aio->cb.aio_buf = buf;
  aio->cb.aio_nbytes = buflen;
  aio->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
  aio->cb.aio_sigevent.sigev_signo = SIGUSR1;
  aio->cb.aio_sigevent.sigev_value.sigval_ptr = &aio->cb;
  aio->cb.aio_lio_opcode = 0;
  aio->cb.aio_reqprio = 0;
  aio->retval = 0;
  
  aio->started = time(0);
  aio->errwritten = 0;
  
  aio->prev = NULL;
  aio->next = activeaios;
  if (activeaios != NULL)
    activeaios->prev = aio;
  activeaios = aio;
  
  aiostartcnt++;
  ret = aio_read(&aio->cb);
  pthread_mutex_unlock(&aiomutex);
  
  assert(ret == 0);
  
  pthread_mutex_lock(&aio->cond.mutex);
  while (aio->cond.busy != 0) {
    pthread_cond_wait(&aio->cond.cond, &aio->cond.mutex);
  }
  pthread_mutex_unlock(&aio->cond.mutex);
  retval = aio->retval;
  reterrno = aio->reterrno;
#if 0
  assert((size_t) aio->retval == buflen);
#endif
  
  pthread_mutex_lock(&aiomutex);
  
  assert(aio->next == NULL);
  assert(aio->prev == NULL);
  assert(aio != activeaios);
  assert(aio != freeaios);
  
  aio->next = freeaios;
  aio->prev = NULL;
  freeaios = aio;
  freecnt++;
  pthread_mutex_unlock(&aiomutex);
  errno = reterrno;
  return retval;
}

static void 
usr1handler(int sig)
{
  (void) sig;
  if (gotusr1 == 0)
	gettimeofday(&gotusr1time, NULL);
  gotusr1 = 1;
}

void
processusr1(void)
{
  struct myaio *aio, *naio;
  int reterrno;
  int now;
  int qpos;

  pthread_mutex_lock(&aiomutex);

  now = time(0);
  qpos = 0;
  for (aio = activeaios; aio != NULL; aio =naio, qpos++) {
    naio = aio->next;
    reterrno = aio_error(&aio->cb);
    if (now - aio->started > 15 && 
	(reterrno  != EINPROGRESS || aio->errwritten == 0)) {
      printf("ERROR: aio used more than %d seconds: cb=%p, buflen=%u"
	     ", qpos=%d %s, aiocnt=%d,%d\n",
	     (int) (now - aio->started - 1),
	     (void *) &aio->cb, aio->cb.aio_nbytes, qpos,
	     aio->next == NULL ? "" : "(more elements)", 
	     aiostartcnt, aioendcnt);
      aio->errwritten = 1;
    }
    if (reterrno == EINPROGRESS)
      continue;
    else if (reterrno < 0) {
      assert(errno == EINVAL);
      assert(now - aio->started < 10);
    } else {
      aioendcnt++;
      assert(aio->prev != NULL || aio == activeaios);
      aio->retval = aio_return(&aio->cb);
      aio->reterrno = reterrno;
      if (aio->next != NULL)
	aio->next->prev = aio->prev;
      if (aio->prev != NULL)
	aio->prev->next = aio->next;
      if (aio == activeaios)
	activeaios = aio->next;
      aio->prev = NULL;
      aio->next = NULL;
      pthread_mutex_lock(&aio->cond.mutex);
      aio->cond.busy = 0;
      pthread_cond_signal(&aio->cond.cond);
      pthread_mutex_unlock(&aio->cond.mutex);
    }
  }
  pthread_mutex_unlock(&aiomutex);
}

void *aiothreadmeat(void *dummy)
{
  sigset_t sigs_to_block;
  struct sigaction act;
  struct timeval now, lat;
  struct sched_param schedparam;
  int policy;

  if (pthread_getschedparam(pthread_self(), &policy, &schedparam) == 0) {
    printf("Initial Aiothread priority was %d\n", schedparam.sched_priority);
    schedparam.sched_priority += 4;
    if (pthread_setschedparam(pthread_self(), policy, &schedparam) == 0) {
      if (pthread_getschedparam(pthread_self(), &policy, &schedparam) == 0)
        printf("Bumped priority of Aiothread to %d\n", 
	schedparam.sched_priority);
      else
        printf("Failed rereading Aiothread priority\n");
    } else
      printf("Failed Bumping Aiothread priority\n");
  } else
    printf("Failed reading initial Aiothread priority\n");

  act.sa_handler=usr1handler;
  sigemptyset(&act.sa_mask);
  act.sa_flags=0;
  sigaction(SIGUSR1,&act,NULL);
  
  sigemptyset(&sigs_to_block);
  sigaddset(&sigs_to_block, SIGUSR1);
  pthread_sigmask(SIG_UNBLOCK, &sigs_to_block, NULL);

  pthread_mutex_lock(&aiomutex);
  aiothread_running = 1;
  pthread_cond_broadcast(&aiocond);
  pthread_mutex_unlock(&aiomutex);
 
  gettimeofday(&gotusr1time, NULL); 
  gotusr1 = 1;
  while (1) {
    if (gotusr1 != 0) {
      gettimeofday(&now, NULL);
      if (now.tv_usec >= gotusr1time.tv_usec) {
	lat.tv_usec = now.tv_usec - gotusr1time.tv_usec;
	lat.tv_sec = now.tv_sec - gotusr1time.tv_sec; 
      } else {
	lat.tv_usec = now.tv_usec + 1000000 - gotusr1time.tv_usec;
	lat.tv_sec = now.tv_sec - 1 - gotusr1time.tv_sec; 
      }
      if (lat.tv_sec > maxlat.tv_sec ||
	 (lat.tv_sec == maxlat.tv_sec &&
	  lat.tv_usec >= maxlat.tv_usec))
	 maxlat = lat;
      gotusr1 = 0;
      processusr1();
    }
    sleep(1);
  }
  abort();
}

static void runaiothread(void)
{
  pthread_create(&aiothread, NULL, aiothreadmeat, NULL);
}
#endif



/* 10000 MB test file */

#define FILESIZE 10000

static off_t filesize;

int writefile(void)
{
  char *buf;
  size_t buflen;
  int fd;
  int count;
  ssize_t wgot;
  struct stat stbuf;
  
  buflen = 1024 * 1024;
  
  buf = (char *) malloc(buflen);
  assert(buf != NULL);
  
  filesize = (off_t) FILESIZE * (off_t) buflen;

  fd = open("largefile", O_RDWR | O_CREAT, 0666);
  assert(fd >= 0);
#if 1
  fstat(fd, &stbuf);
  if (stbuf.st_size < filesize) {
    for (count = 0; count < FILESIZE; count++) {
      wgot = write(fd, buf, buflen);
      assert(wgot == buflen);
    }
  }
#endif
#ifdef RAWREAD
#ifdef O_DIRECT
  {
    int flags;
  flags = fcntl(fd, F_GETFL, 0);
  flags |= O_DIRECT;
  fcntl(fd, F_SETFL, flags);
  }
#endif
#endif
  return fd;
}

static pthread_mutex_t cntmutex = PTHREAD_MUTEX_INITIALIZER;
static int startreadcnt;
static int donereadcnt;
static off_t donereadbytes;

void *readthread(void *data)
{
  int fd;
  size_t buflen;
  char *buf;
  ssize_t rgot;
  off_t loc;
  sigset_t sigs_to_block;

  fd = (int) data;
  buflen = xreadlen;
  buf = (char *) malloc(buflen);
  assert(buf != NULL);

  sigemptyset(&sigs_to_block);
  sigaddset(&sigs_to_block, SIGUSR1);
  pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);

  sleep(1);

  while (1) {
    loc = (off_t) (random() % (FILESIZE * 2048 - xreadoff)) * (off_t) 512;
#if 0
    loc &= ~ 32767LL;
#endif
    pthread_mutex_lock(&cntmutex);
    startreadcnt++;
    pthread_mutex_unlock(&cntmutex);
#ifdef LINUXTHREADS
#if defined(RAWREAD) && !defined(O_DIRECT)
    rgot = rawread(fd, buf, buflen, loc);
#else
    rgot = pread(fd, buf, buflen, loc);
#endif
#else
    rgot = aio_pread(fd, buf, buflen, loc);
#endif
    if (rgot != buflen) {
	    printf("rgot=%d, buflen=%d, loc=%qd, startreadcnt=%d,%d\n",
		   rgot, buflen, loc, startreadcnt, donereadcnt);
    }
    assert(rgot == buflen);
    pthread_mutex_lock(&cntmutex);
    donereadcnt++;
    donereadbytes += buflen;
    pthread_mutex_unlock(&cntmutex);
  }

  return NULL;
}
int main(int argc, char **argv)
{
  int fd;
  int cnt;
  pthread_t curthread;
  int startcntcopy, donecntcopy;
  sigset_t sigs_to_block;
  struct timeval stime;
  struct timeval now;
  struct timeval report;
  struct timeval delta;
  double fdelta;
  double rate;
  double mbrate;
  struct timeval tvsel;
  int nthreads;

  xreadlen = 1024;
  if (argc >= 2) {
    xreadlen = atoi(argv[1]);
    if (xreadlen < 0 || xreadlen > 2097152)
	xreadlen = 1024;
    xreadlen = (xreadlen + 511) & ~511;
  }
  xreadoff = (xreadlen / 512) - 1;

  nthreads = 250;
  if (argc >= 3) {
    nthreads = atoi(argv[2]);
    if (nthreads < 1 || nthreads > 1000)
	nthreads = 1;
  }

  fd = writefile();

#if 1
  sigemptyset(&sigs_to_block);
  sigaddset(&sigs_to_block, SIGUSR1);
  pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
#endif

  srandom(time(NULL));

  gettimeofday(&stime, NULL);
  report = stime;
  report.tv_sec++;
  for (cnt = 0; cnt < nthreads; cnt++) {
    pthread_create(&curthread, NULL, readthread, (void *) fd);
  }
  
  while (1) {
#if 0
    sleep(1); /* XXX: Does not work */
#else
    gettimeofday(&now, NULL);
    if (now.tv_sec < report.tv_sec ||
	(now.tv_sec == report.tv_sec &&
	 now.tv_usec < report.tv_usec)) {
      if (report.tv_usec >= now.tv_usec) {
	tvsel.tv_sec = report.tv_sec - now.tv_sec;
	tvsel.tv_usec = report.tv_usec - now.tv_usec;
      } else {
	tvsel.tv_sec = report.tv_sec -now.tv_sec - 1;
	tvsel.tv_usec = report.tv_usec + 1000000 - now.tv_usec;
      }
      select(1, NULL, NULL, NULL, &tvsel);
      continue;
    }
    report.tv_sec++;
#endif
    
    gettimeofday(&now, NULL);
    if (now.tv_usec >= stime.tv_usec) {
      delta.tv_sec = now.tv_sec - stime.tv_sec;
      delta.tv_usec = now.tv_usec - stime.tv_usec;
    } else {
      delta.tv_sec = now.tv_sec - stime.tv_sec - 1;
      delta.tv_usec = now.tv_usec + 1000000 - stime.tv_usec;
    }
    fdelta = delta.tv_sec + ((double) delta.tv_usec) / 1000000.0;
    pthread_mutex_lock(&cntmutex);
    startcntcopy = startreadcnt;
    donecntcopy = donereadcnt;
    pthread_mutex_unlock(&cntmutex);
    rate = (double) donecntcopy / (double) fdelta;
    mbrate = (double) donereadbytes / ((double) (fdelta) * 1048576.0);
    printf("%d(+%d) read operations time=%6.3f, rate=%6.3f tps/s, %6.3f MB/s\n",
	   donecntcopy, startcntcopy - donecntcopy,
	   fdelta, rate, mbrate);
#ifndef LINUXTHREADS
    printf("lat=%d.%06d\n", maxlat.tv_sec, maxlat.tv_usec);
#endif
    fflush(stdout);

  }
}

----Next_Part(Thu_May_17_01:30:16_2001)----

To Unsubscribe: send mail to majordomo@FreeBSD.org
with "unsubscribe freebsd-arch" in the body of the message




Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200105162331.BAA04708>