diff --git a/io.h b/io.h
index 4736a32..8482282 100644
--- a/io.h
+++ b/io.h
@@ -186,8 +186,8 @@ unsigned int io_debugstring(int64 s,char* buf,unsigned int bufsize);
 enum { SLOTS=128 };
 typedef struct iomux {
   int ctx;
-  int working;		/* used to synchronize who is filling the queue */
-  unsigned int h,l;	/* high, low */
+  volatile int working;		/* used to synchronize who is filling the queue */
+  volatile unsigned int h,l;	/* high, low */
   struct {
     int fd, events;
   } q[SLOTS];
diff --git a/io/iom_wait.c b/io/iom_wait.c
index ff59990..5510ada 100644
--- a/io/iom_wait.c
+++ b/io/iom_wait.c
@@ -27,16 +27,22 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
   if (c->working==-2) return -2;	/* iomux was aborted */
   for (;;) {
     // CAS-loop get the first element from the queue
-    unsigned int f=c->l;
+    unsigned int f=c->l;	// c is a ring buffer, c->l is low, c->h is high
+    // f is here to prevent double fetches from the volatile low water mark
     if (f == c->h) break;	/* no elements in queue */
 
+    // We want to grab the first element but other threads might be
+    // racing us. So first grab the event from the low water mark in
+    // the ring buffer, then increment the low water mark atomically,
+    // and if that worked we know we grabbed the right event.
-    int n=(f+1)%SLOTS;
+    int n=(f+1)%SLOTS;	// next value for c->l
 
-    *s=c->q[f].fd;
+    *s=c->q[f].fd;	// grab event we think is next in line
     *revents=c->q[f].events;
 
+    // now atomically increment low water mark
     if (__sync_bool_compare_and_swap(&c->l,f,n)) {
-      /* we got one */
+      /* Nobody snatched the event from us. Report success */
       return 1;
     }
     /* collided with another thread, try again */
@@ -45,15 +51,17 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
 
     /* The queue was empty. If someone else is already calling
      * epoll_wait/kevent, then use the semaphore */
     if (__sync_bool_compare_and_swap(&c->working,0,1)) {
-      /* race avoidance: another thread could have come in between and
-       * refilled the job queue */
+      /* If we get here, we got the lock and no other thread is
+       * attempting to fill the queue at the same time. However,
+       * another thread could have slipped in and refilled the job
+       * queue already, so check if that happened. */
       if (c->h != c->l) {
         /* set working back to 0 unless someone set it to -2 in the mean time (iom_abort) */
         if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2;
         continue;	// this is why we have an outer for loop, so we don't need goto here
       }
 
-      /* we have the job to fill the struct. */
+      /* We have the job to fill the struct. */
 #ifdef HAVE_EPOLL
       struct epoll_event ee[SLOTS];
@@ -88,9 +96,12 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
         * still need to signal the semaphore below */
      } else {
        /* The CAS loop on c->working above ensures we are the only one writing to c->h */
-       c->q[c->h].fd=ee[i].data.fd;
-       c->q[c->h].events=e;
-       c->h = (c->h + 1) % SLOTS;
+       size_t hcapture = __atomic_load_n(&c->h, __ATOMIC_ACQUIRE);	// c->h is volatile; take a local copy to avoid a double fetch
+       c->q[hcapture].fd=ee[i].data.fd;
+       c->q[hcapture].events=e;
+//     c->h = (hcapture + 1) % SLOTS;
+       // release store: make the writes to c->q[] visible before the new c->h (matters on weakly ordered CPUs like ARM)
+       __atomic_store_n(&c->h, (hcapture + 1) % SLOTS, __ATOMIC_RELEASE);
      }
    }
 #elif defined(HAVE_KQUEUE)
@@ -122,9 +133,12 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) {
         * still need to signal the semaphore below */
      } else {
        /* The CAS loop on c->working above ensures we are the only one writing to c->h */
-       c->q[c->h].fd=kev[i].ident;
-       c->q[c->h].events=e;
-       c->h = (c->h + 1) % SLOTS;
+       size_t hcapture = __atomic_load_n(&c->h, __ATOMIC_ACQUIRE);	// c->h is volatile; take a local copy to avoid a double fetch
+       c->q[hcapture].fd=kev[i].ident;
+       c->q[hcapture].events=e;
+//     c->h = (c->h + 1) % SLOTS;
+       // release store: make the writes to c->q[] visible before the new c->h (matters on weakly ordered CPUs like ARM)
+       __atomic_store_n(&c->h, (hcapture + 1) % SLOTS, __ATOMIC_RELEASE);
      }
    }
 #else
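Side note on the memory-ordering pattern the patch relies on: the thread that wins the c->working CAS is the only writer of c->h, so it can fill the queue slot first and then publish the new high water mark with a release store, while readers pop slots by CAS-ing c->l forward. Below is a minimal, self-contained sketch of that single-producer/multi-consumer pattern, assuming the same GCC/Clang __atomic builtins the patch uses; the names ring, ring_push and ring_pop are illustrative only and are not part of io.h or iom_wait.c. On the consumer side the sketch pairs the producer's release store with an acquire load of the high water mark.

#include <stdbool.h>

enum { RING_SLOTS = 128 };

struct ring {
  volatile unsigned int h, l;               /* high and low water marks */
  struct { int fd, events; } q[RING_SLOTS];
};

/* Producer side: only the thread that won the working-CAS calls this. */
bool ring_push(struct ring* r, int fd, int events) {
  unsigned int h = __atomic_load_n(&r->h, __ATOMIC_RELAXED);        /* single writer */
  unsigned int n = (h + 1) % RING_SLOTS;
  if (n == __atomic_load_n(&r->l, __ATOMIC_ACQUIRE)) return false;  /* ring full */
  r->q[h].fd = fd;                          /* fill the slot first ... */
  r->q[h].events = events;
  /* ... then publish it: the release store orders the q[] writes before the new h */
  __atomic_store_n(&r->h, n, __ATOMIC_RELEASE);
  return true;
}

/* Consumer side: mirrors the CAS loop at the top of iom_wait. */
bool ring_pop(struct ring* r, int* fd, int* events) {
  for (;;) {
    unsigned int f = __atomic_load_n(&r->l, __ATOMIC_RELAXED);
    /* acquire pairs with the producer's release store to h */
    if (f == __atomic_load_n(&r->h, __ATOMIC_ACQUIRE)) return false;  /* empty */
    *fd = r->q[f].fd;                       /* read the slot we think is next */
    *events = r->q[f].events;
    /* atomically advance the low water mark; if another consumer beat us,
     * the CAS fails and we discard what we read and retry */
    if (__atomic_compare_exchange_n(&r->l, &f, (f + 1) % RING_SLOTS, 0,
                                    __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
      return true;
  }
}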