Date: Thu, 4 Nov 2010 02:03:27 +0000 (UTC) From: David Xu <davidxu@FreeBSD.org> To: src-committers@freebsd.org, svn-src-user@freebsd.org Subject: svn commit: r214767 - in user/davidxu/libthr/sys: kern sys Message-ID: <201011040203.oA423RI5040582@svn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: davidxu Date: Thu Nov 4 02:03:26 2010 New Revision: 214767 URL: http://svn.freebsd.org/changeset/base/214767 Log: To avoid thundering herd problem on pthread_cond_broadcast, implement wait queue migration. Modified: user/davidxu/libthr/sys/kern/kern_umtx.c user/davidxu/libthr/sys/sys/umtx.h Modified: user/davidxu/libthr/sys/kern/kern_umtx.c ============================================================================== --- user/davidxu/libthr/sys/kern/kern_umtx.c Wed Nov 3 23:29:52 2010 (r214766) +++ user/davidxu/libthr/sys/kern/kern_umtx.c Thu Nov 4 02:03:26 2010 (r214767) @@ -91,6 +91,7 @@ struct umtx_key { uintptr_t b; } both; } info; + struct umtxq_chain * volatile chain; }; /* Priority inheritance mutex info. */ @@ -150,6 +151,8 @@ struct umtx_q { /* The queue we on */ struct umtxq_queue *uq_cur_queue; + + int uq_repair_mutex; }; TAILQ_HEAD(umtxq_head, umtx_q); @@ -160,6 +163,10 @@ struct umtxq_queue { struct umtx_key key; LIST_ENTRY(umtxq_queue) link; int length; + + int binding; + struct umutex *bind_mutex; + struct umtx_key bind_mkey; }; LIST_HEAD(umtxq_list, umtxq_queue); @@ -177,7 +184,7 @@ struct umtxq_chain { LIST_HEAD(, umtxq_queue) uc_spare_queue; /* Busy flag */ - char uc_busy; + volatile char uc_busy; /* Chain lock waiters */ int uc_waiters; @@ -220,10 +227,13 @@ static uma_zone_t umtx_pi_zone; static struct umtxq_chain umtxq_chains[2][UMTX_CHAINS]; static MALLOC_DEFINE(M_UMTX, "umtx", "UMTX queue memory"); static int umtx_pi_allocated; +static int umtx_cv_migrated; SYSCTL_NODE(_debug, OID_AUTO, umtx, CTLFLAG_RW, 0, "umtx debug"); SYSCTL_INT(_debug_umtx, OID_AUTO, umtx_pi_allocated, CTLFLAG_RD, &umtx_pi_allocated, 0, "Allocated umtx_pi"); +SYSCTL_INT(_debug_umtx, OID_AUTO, umtx_cv_migrated, CTLFLAG_RD, + &umtx_cv_migrated, 0, "Thread migrated"); static void umtxq_sysinit(void *); static void umtxq_hash(struct umtx_key *key); @@ -232,7 +242,9 @@ static void umtxq_lock(struct umtx_key * static void umtxq_unlock(struct umtx_key *key); static 
void umtxq_busy(struct umtx_key *key); static void umtxq_unbusy(struct umtx_key *key); -static void umtxq_insert_queue(struct umtx_q *uq, int q); +static void umtxq_insert_queue(struct umtx_q *, int); +static int umtxq_insert_queue2(struct umtx_q *, int, struct umutex *, + const struct umtx_key *); static void umtxq_remove_queue(struct umtx_q *uq, int q); static int umtxq_sleep(struct umtx_q *uq, const char *wmesg, int timo); static int umtxq_count(struct umtx_key *key); @@ -315,14 +327,30 @@ umtx_key_match(const struct umtx_key *k1 k1->info.both.b == k2->info.both.b); } +static inline void +umtx_key_copy(struct umtx_key *k1, const struct umtx_key *k2) +{ + k1->hash = k2->hash; + k1->type = k2->type; + k1->shared = k2->shared; + k1->info.both = k2->info.both; + k1->chain = k2->chain; +} + static inline struct umtxq_chain * -umtxq_getchain(struct umtx_key *key) +umtxq_calcchain(struct umtx_key *key) { if (key->type <= TYPE_SEM) return (&umtxq_chains[1][key->hash]); return (&umtxq_chains[0][key->hash]); } +static inline struct umtxq_chain * +umtxq_getchain(struct umtx_key *key) +{ + return (key->chain); +} + /* * Lock a chain. 
*/ @@ -331,8 +359,14 @@ umtxq_lock(struct umtx_key *key) { struct umtxq_chain *uc; - uc = umtxq_getchain(key); - mtx_lock(&uc->uc_lock); + for (;;) { + uc = key->chain; + mtx_lock(&uc->uc_lock); + if (key->chain != uc) + mtx_unlock(&uc->uc_lock); + else + break; + } } /* @@ -341,10 +375,7 @@ umtxq_lock(struct umtx_key *key) static inline void umtxq_unlock(struct umtx_key *key) { - struct umtxq_chain *uc; - - uc = umtxq_getchain(key); - mtx_unlock(&uc->uc_lock); + mtx_unlock(&key->chain->uc_lock); } /* @@ -364,8 +395,10 @@ umtxq_busy(struct umtx_key *key) int count = BUSY_SPINS; if (count > 0) { umtxq_unlock(key); - while (uc->uc_busy && --count > 0) + while (uc->uc_busy && --count > 0) { cpu_spinwait(); + uc = key->chain; + } umtxq_lock(key); } } @@ -374,6 +407,9 @@ umtxq_busy(struct umtx_key *key) uc->uc_waiters++; msleep(uc, &uc->uc_lock, 0, "umtxqb", 0); uc->uc_waiters--; + mtx_unlock(&uc->uc_lock); + umtxq_lock(key); + uc = umtxq_getchain(key); } } uc->uc_busy = 1; @@ -414,19 +450,46 @@ umtxq_queue_lookup(struct umtx_key *key, static inline void umtxq_insert_queue(struct umtx_q *uq, int q) { + int error; + + error = umtxq_insert_queue2(uq, q, NULL, NULL); + MPASS(error == 0); +} + +static inline int +umtxq_insert_queue2(struct umtx_q *uq, int q, struct umutex *m, + const struct umtx_key *mkey) +{ struct umtxq_queue *uh; struct umtxq_chain *uc; uc = umtxq_getchain(&uq->uq_key); UMTXQ_LOCKED_ASSERT(uc); - KASSERT((uq->uq_flags & UQF_UMTXQ) == 0, ("umtx_q is already on queue")); + KASSERT((uq->uq_flags & UQF_UMTXQ) == 0, + ("umtx_q is already on queue")); uh = umtxq_queue_lookup(&uq->uq_key, q); if (uh != NULL) { + if (uh->binding) { + if (mkey == NULL || + !umtx_key_match(&uh->bind_mkey, mkey)) + return (EEXIST); + } else { + if (mkey != NULL) + return (EEXIST); + } LIST_INSERT_HEAD(&uc->uc_spare_queue, uq->uq_spare_queue, link); } else { uh = uq->uq_spare_queue; uh->key = uq->uq_key; LIST_INSERT_HEAD(&uc->uc_queue[q], uh, link); + uh->bind_mutex = m; + 
uh->length = 0; + if (mkey != NULL) { + uh->binding = 1; + uh->bind_mkey = *mkey; + } else { + uh->binding = 0; + } } uq->uq_spare_queue = NULL; @@ -434,7 +497,7 @@ umtxq_insert_queue(struct umtx_q *uq, in uh->length++; uq->uq_flags |= UQF_UMTXQ; uq->uq_cur_queue = uh; - return; + return (0); } static inline void @@ -458,6 +521,8 @@ umtxq_remove_queue(struct umtx_q *uq, in uh = LIST_FIRST(&uc->uc_spare_queue); KASSERT(uh != NULL, ("uc_spare_queue is empty")); LIST_REMOVE(uh, link); + uh->bind_mutex = NULL; + uh->binding = 0; } uq->uq_spare_queue = uh; uq->uq_cur_queue = NULL; @@ -558,9 +623,10 @@ umtxq_sleep(struct umtx_q *uq, const cha UMTXQ_LOCKED_ASSERT(uc); if (!(uq->uq_flags & UQF_UMTXQ)) return (0); - error = msleep(uq, &uc->uc_lock, PCATCH, wmesg, timo); + error = msleep(uq, &uc->uc_lock, PCATCH|PDROP, wmesg, timo); if (error == EWOULDBLOCK) error = ETIMEDOUT; + umtxq_lock(&uq->uq_key); return (error); } @@ -578,6 +644,7 @@ umtx_key_get(void *addr, int type, int s boolean_t wired; key->type = type; + key->chain = NULL; if (share == THREAD_SHARE) { key->shared = 0; key->info.private.vs = td->td_proc->p_vmspace; @@ -607,6 +674,7 @@ umtx_key_get(void *addr, int type, int s } umtxq_hash(key); + key->chain = umtxq_calcchain(key); return (0); } @@ -1209,7 +1277,11 @@ _do_lock_normal(struct thread *td, struc umtxq_unbusy(&uq->uq_key); if (old == owner) error = umtxq_sleep(uq, "umtxn", timo); - umtxq_remove(uq); + if ((uq->uq_flags & UQF_UMTXQ) != 0) { + umtxq_busy(&uq->uq_key); + umtxq_remove(uq); + umtxq_unbusy(&uq->uq_key); + } umtxq_unlock(&uq->uq_key); umtx_key_release(&uq->uq_key); } @@ -1618,12 +1690,14 @@ umtxq_sleep_pi(struct umtx_q *uq, struct mtx_unlock_spin(&umtx_lock); umtxq_unbusy(&uq->uq_key); - if (uq->uq_flags & UQF_UMTXQ) { + if ((uq->uq_flags & UQF_UMTXQ) != 0) { error = msleep(uq, &uc->uc_lock, PCATCH, wmesg, timo); if (error == EWOULDBLOCK) error = ETIMEDOUT; - if (uq->uq_flags & UQF_UMTXQ) { + if ((uq->uq_flags & UQF_UMTXQ) != 0) { + 
umtxq_busy(&uq->uq_key); umtxq_remove(uq); + umtxq_unbusy(&uq->uq_key); } } mtx_lock_spin(&umtx_lock); @@ -2342,30 +2416,104 @@ do_unlock_umutex(struct thread *td, stru } static int +set_contested_bit(struct umtx_key *mkey, struct umutex *m, + struct umtxq_queue *uhm, int repair) +{ + int do_wake; + int qlen = uhm->length; + uint32_t owner; + + do_wake = 0; + /* + * Set contested bit for mutex when necessary, so that userland + * mutex unlocker will wake up a waiter thread. + */ + owner = fuword32(__DEVOLATILE(uint32_t *, &m->m_owner)); + for (;;) { + if (owner == UMUTEX_UNOWNED) { + if (!repair && qlen == 1) { + do_wake = 1; + break; + } + if ((owner = casuword32(&m->m_owner, UMUTEX_UNOWNED, + UMUTEX_CONTESTED)) == UMUTEX_UNOWNED) { + do_wake = 1; + break; + } + } + if (owner == UMUTEX_CONTESTED) { + do_wake = 1; + break; + } + if ((owner & UMUTEX_CONTESTED) == 0) { + uint32_t old; + old = casuword32(&m->m_owner, owner, + owner|UMUTEX_CONTESTED); + if (old == owner) + break; + owner = old; + } else { + break; + } + } + return (do_wake); +} + +static int do_cv_wait(struct thread *td, struct ucond *cv, struct umutex *m, struct timespec *timeout, u_long wflags) { struct umtx_q *uq; + struct umtx_key mkey, *mkeyp; + struct umutex *bind_mutex; struct timeval tv; struct timespec cts, ets, tts; - uint32_t flags; + struct umtxq_chain *old_chain; + uint32_t flags, mflags; int error; uq = td->td_umtxq; flags = fuword32(&cv->c_flags); + mflags = fuword32(&m->m_flags); error = umtx_key_get(cv, TYPE_CV, GET_SHARE(flags), &uq->uq_key); if (error != 0) return (error); + if ((wflags & CVWAIT_BIND_MUTEX) != 0) { + if ((mflags & UMUTEX_PRIO_INHERIT) != 0) + return (EINVAL); + error = umtx_key_get(m, TYPE_NORMAL_UMUTEX, + GET_SHARE(mflags), &mkey); + if (error != 0) { + umtx_key_release(&uq->uq_key); + return (error); + } + if (mkey.shared == 0) + bind_mutex = m; + else + bind_mutex = NULL; + mkeyp = &mkey; + } else { + bind_mutex = NULL; + mkeyp = NULL; + } + + old_chain = 
uq->uq_key.chain; umtxq_lock(&uq->uq_key); umtxq_busy(&uq->uq_key); - umtxq_insert(uq); + error = umtxq_insert_queue2(uq, UMTX_SHARED_QUEUE, bind_mutex, mkeyp); + if (error != 0) { + umtxq_unbusy(&uq->uq_key); + umtxq_unlock(&uq->uq_key); + return (error); + } umtxq_unlock(&uq->uq_key); /* - * The magic thing is we should set c_has_waiters to 1 before - * releasing user mutex. + * Set c_has_waiters to 1 before releasing user mutex, also + * don't modify cache line when unnecessary. */ - suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 1); + if (fuword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters)) == 0) + suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 1); umtxq_lock(&uq->uq_key); umtxq_unbusy(&uq->uq_key); @@ -2375,11 +2523,7 @@ do_cv_wait(struct thread *td, struct uco umtxq_lock(&uq->uq_key); if (error == 0) { - if ((wflags & UMTX_CHECK_UNPARKING) && - (td->td_pflags & TDP_WAKEUP)) { - td->td_pflags &= ~TDP_WAKEUP; - error = EINTR; - } else if (timeout == NULL) { + if (timeout == NULL) { error = umtxq_sleep(uq, "ucond", 0); } else { getnanouptime(&ets); @@ -2400,17 +2544,63 @@ do_cv_wait(struct thread *td, struct uco } } } - if ((uq->uq_flags & UQF_UMTXQ) == 0) error = 0; else { - umtxq_remove(uq); + /* + * This must be timeout or interrupted by signal or + * spurious wakeup. + */ + umtxq_busy(&uq->uq_key); + if ((uq->uq_flags & UQF_UMTXQ) != 0) { + int oldlen = uq->uq_cur_queue->length; + umtxq_remove(uq); + if (oldlen == 1 && old_chain == uq->uq_key.chain) { + umtxq_unlock(&uq->uq_key); + suword32( + __DEVOLATILE(uint32_t *, + &cv->c_has_waiters), 0); + umtxq_lock(&uq->uq_key); + } + } + umtxq_unbusy(&uq->uq_key); if (error == ERESTART) error = EINTR; } - umtxq_unlock(&uq->uq_key); + + /* We were moved to mutex queue. */ + if (mkeyp != NULL && + old_chain != uq->uq_key.chain) { + /* + * cv_broadcast can not access the mutex if we are pshared, + * but it still migrate threads to mutex queue, + * we should repair contested bit here. 
+ */ + if ((mflags & USYNC_PROCESS_SHARED) != 0 && uq->uq_repair_mutex) { + uint32_t owner = fuword32( + __DEVOLATILE(void *, &m->m_owner)); + if ((owner & UMUTEX_CONTESTED) == 0) { + struct umtxq_queue *uhm; + umtxq_lock(mkeyp); + umtxq_busy(mkeyp); + uhm = umtxq_queue_lookup(mkeyp, + UMTX_SHARED_QUEUE); + if (uhm != NULL) + set_contested_bit(mkeyp, m, uhm, 1); + umtxq_unbusy(mkeyp); + umtxq_unlock(mkeyp); + } + } + + error = 0; + } umtx_key_release(&uq->uq_key); + if (mkeyp != NULL) + umtx_key_release(mkeyp); + uq->uq_spare_queue->bind_mutex = NULL; + uq->uq_spare_queue->binding = 0; + uq->uq_repair_mutex = 0; return (error); } @@ -2427,6 +2617,7 @@ do_cv_signal(struct thread *td, struct u flags = fuword32(&cv->c_flags); if ((error = umtx_key_get(cv, TYPE_CV, GET_SHARE(flags), &key)) != 0) return (error); + umtxq_lock(&key); umtxq_busy(&key); cnt = umtxq_count(&key); @@ -2446,6 +2637,8 @@ do_cv_signal(struct thread *td, struct u static int do_cv_broadcast(struct thread *td, struct ucond *cv) { + struct umtxq_queue *uh, *uhm, *uh_temp; + struct umtxq_chain *uc, *ucm; struct umtx_key key; int error; uint32_t flags; @@ -2456,17 +2649,142 @@ do_cv_broadcast(struct thread *td, struc umtxq_lock(&key); umtxq_busy(&key); - umtxq_signal(&key, INT_MAX); - umtxq_unlock(&key); + uh = umtxq_queue_lookup(&key, UMTX_SHARED_QUEUE); + if (uh != NULL && uh->binding) { + /* + * To avoid thundering herd problem, if there are waiters, + * try to move them to mutex queue. + */ + struct umutex *bind_mutex = uh->bind_mutex; + struct umtx_key mkey; + struct umtx_q *uq; + int do_wake; + int len, oldlen; + + len = uh->length; + mkey = uh->bind_mkey; + uc = umtxq_getchain(&key); + ucm = umtxq_getchain(&mkey); + LIST_REMOVE(uh, link); - error = suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 0); + /* + * Before busying mutex sleep-queue, we must unlock cv's + * sleep-queue mutex, because the mutex is unsleepable. 
+ */ + umtxq_unlock(&key); - umtxq_lock(&key); - umtxq_unbusy(&key); - umtxq_unlock(&key); + umtxq_lock(&mkey); + umtxq_busy(&mkey); + umtxq_unlock(&mkey); + umtxq_lock(&key); + umtxq_lock(&mkey); + uhm = umtxq_queue_lookup(&mkey, UMTX_SHARED_QUEUE); - umtx_key_release(&key); - return (error); + /* Change waiter's key (include chain address). */ + TAILQ_FOREACH(uq, &uh->head, uq_link) { + umtx_key_copy(&uq->uq_key, &mkey); + if (uhm != NULL) + uq->uq_cur_queue = uhm; + } + if (uhm == NULL) { + /* + * Mutex has no waiters, just move the queue head to + * new chain. + */ + oldlen = 0; + uh->key = mkey; + uh->bind_mutex = NULL; + uh->binding = 0; + LIST_INSERT_HEAD(&ucm->uc_queue[UMTX_SHARED_QUEUE], + uh, link); + uhm = uh; + } else { + /* + * Otherwise, move cv waiters. + */ + oldlen = uhm->length; + TAILQ_CONCAT(&uhm->head, &uh->head, uq_link); + uhm->length += uh->length; + uh->length = 0; + uh->bind_mutex = NULL; + uh->binding = 0; + LIST_INSERT_HEAD(&ucm->uc_spare_queue, uh, link); + } + + /* + * At this point, cv's queue no longer needs to be accessed, + * NULL it. + */ + uh = NULL; + + /* + * One queue head has already been moved, we need to + * move (n - 1) free queue head to new chain. + */ + while (--len > 0) { + uh_temp = LIST_FIRST(&uc->uc_spare_queue); + LIST_REMOVE(uh_temp, link); + LIST_INSERT_HEAD(&ucm->uc_spare_queue, uh_temp, link); + } + + umtxq_unlock(&mkey); + umtxq_unlock(&key); + + /* Now, the cv does not have any waiter. */ + suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 0); + + umtxq_lock(&key); + umtxq_unbusy(&key); + umtxq_unlock(&key); + umtx_key_release(&key); + + /* + * Wake one thread when necessary. if before the queue + * migration, there is thread on mutex queue, we don't + * need to wake up a thread, because the mutex contention + * bit should have already been set by other mutex locking + * code. + * For pshared mutex, because different process has different + * address even for same process-shared mutex! 
+ * we don't know where the mutex is in our address space. + * In this situation, we let a thread resumed from cv_wait + * to repair the mutex contention bit. + * XXX Fixme! we should make the repairing thread runs as + * soon as possible, boost its priority. + */ + if (oldlen == 0) { + if (!mkey.shared) { + do_wake = set_contested_bit(&mkey, bind_mutex, + uhm, 0); + } else { + do_wake = 1; + } + } else { + do_wake = 0; + } + + umtxq_lock(&mkey); + if (do_wake) { + uq = TAILQ_FIRST(&uhm->head); + if (uq != NULL) { + if (mkey.shared) + uq->uq_repair_mutex = 1; + umtxq_signal_thread(uq); + } + } + umtxq_unbusy(&mkey); + umtxq_unlock(&mkey); + return (0); + } else { + umtxq_signal(&key, INT_MAX); + umtxq_unlock(&key); + suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 0); + umtxq_lock(&key); + umtxq_unbusy(&key); + umtxq_unlock(&key); + umtx_key_release(&key); + } + return (0); } static int @@ -2839,7 +3157,9 @@ do_sem_wait(struct thread *td, struct _u umtxq_insert(uq); umtxq_unlock(&uq->uq_key); - suword32(__DEVOLATILE(uint32_t *, &sem->_has_waiters), 1); + /* Don't modify cacheline when unnecessary. */ + if (fuword32(__DEVOLATILE(uint32_t *, &sem->_has_waiters)) == 0) + suword32(__DEVOLATILE(uint32_t *, &sem->_has_waiters), 1); count = fuword32(__DEVOLATILE(uint32_t *, &sem->_count)); if (count != 0) { Modified: user/davidxu/libthr/sys/sys/umtx.h ============================================================================== --- user/davidxu/libthr/sys/sys/umtx.h Wed Nov 3 23:29:52 2010 (r214766) +++ user/davidxu/libthr/sys/sys/umtx.h Thu Nov 4 02:03:26 2010 (r214767) @@ -82,7 +82,9 @@ #define UMTX_OP_MAX 21 /* flags for UMTX_OP_CV_WAIT */ -#define UMTX_CHECK_UNPARKING 0x01 +#define CVWAIT_CHECK_UNPARKING 0x01 +#define CVWAIT_BIND_MUTEX 0x02 +#define UMTX_CHECK_UNPARKING CVWAIT_CHECK_UNPARKING #ifndef _KERNEL
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201011040203.oA423RI5040582>