/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to * deal in the Software without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or * sell copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS * IN THE SOFTWARE. */ /* This file contains both the uv__async internal infrastructure and the * user-facing uv_async_t functions. */ #include "uv.h" #include "internal.h" #include "atomic-ops.h" #include #include /* snprintf() */ #include #include #include #include #include /* sched_yield() */ #ifdef __linux__ #include #endif static void uv__async_send(uv_loop_t* loop); static int uv__async_start(uv_loop_t* loop); int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { int err; err = uv__async_start(loop); if (err) return err; uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); handle->async_cb = async_cb; handle->pending = 0; QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); uv__handle_start(handle); return 0; } int uv_async_send(uv_async_t* handle) { /* Do a cheap read first. */ if (ACCESS_ONCE(int, handle->pending) != 0) return 0; /* Tell the other thread we're busy with the handle. */ if (cmpxchgi(&handle->pending, 0, 1) != 0) return 0; /* Wake up the other thread's event loop. */ uv__async_send(handle->loop); /* Tell the other thread we're done. */ if (cmpxchgi(&handle->pending, 1, 2) != 1) abort(); return 0; } /* Only call this from the event loop thread. */ static int uv__async_spin(uv_async_t* handle) { int i; int rc; for (;;) { /* 997 is not completely chosen at random. It's a prime number, acyclical * by nature, and should therefore hopefully dampen sympathetic resonance. */ for (i = 0; i < 997; i++) { /* rc=0 -- handle is not pending. * rc=1 -- handle is pending, other thread is still working with it. * rc=2 -- handle is pending, other thread is done. */ rc = cmpxchgi(&handle->pending, 2, 0); if (rc != 1) return rc; /* Other thread is busy with this handle, spin until it's done. */ cpu_relax(); } /* Yield the CPU. We may have preempted the other thread while it's * inside the critical section and if it's running on the same CPU * as us, we'll just burn CPU cycles until the end of our time slice. */ sched_yield(); } } void uv__async_close(uv_async_t* handle) { uv__async_spin(handle); QUEUE_REMOVE(&handle->queue); uv__handle_stop(handle); } static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { char buf[1024]; ssize_t r; QUEUE queue; QUEUE* q; uv_async_t* h; assert(w == &loop->async_io_watcher); for (;;) { r = read(w->fd, buf, sizeof(buf)); if (r == sizeof(buf)) continue; if (r != -1) break; if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; abort(); } QUEUE_MOVE(&loop->async_handles, &queue); while (!QUEUE_EMPTY(&queue)) { q = QUEUE_HEAD(&queue); h = QUEUE_DATA(q, uv_async_t, queue); QUEUE_REMOVE(q); QUEUE_INSERT_TAIL(&loop->async_handles, q); if (0 == uv__async_spin(h)) continue; /* Not pending. */ if (h->async_cb == NULL) continue; h->async_cb(h); } } static void uv__async_send(uv_loop_t* loop) { const void* buf; ssize_t len; int fd; int r; buf = ""; len = 1; fd = loop->async_wfd; #if defined(__linux__) if (fd == -1) { static const uint64_t val = 1; buf = &val; len = sizeof(val); fd = loop->async_io_watcher.fd; /* eventfd */ } #endif do r = write(fd, buf, len); while (r == -1 && errno == EINTR); if (r == len) return; if (r == -1) if (errno == EAGAIN || errno == EWOULDBLOCK) return; abort(); } static int uv__async_start(uv_loop_t* loop) { int pipefd[2]; int err; if (loop->async_io_watcher.fd != -1) return 0; #ifdef __linux__ err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (err < 0) return UV__ERR(errno); pipefd[0] = err; pipefd[1] = -1; #else err = uv__make_pipe(pipefd, UV__F_NONBLOCK); if (err < 0) return err; #endif uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]); uv__io_start(loop, &loop->async_io_watcher, POLLIN); loop->async_wfd = pipefd[1]; return 0; } int uv__async_fork(uv_loop_t* loop) { if (loop->async_io_watcher.fd == -1) /* never started */ return 0; uv__async_stop(loop); return uv__async_start(loop); } void uv__async_stop(uv_loop_t* loop) { if (loop->async_io_watcher.fd == -1) return; if (loop->async_wfd != -1) { if (loop->async_wfd != loop->async_io_watcher.fd) uv__close(loop->async_wfd); loop->async_wfd = -1; } uv__io_stop(loop, &loop->async_io_watcher, POLLIN); uv__close(loop->async_io_watcher.fd); loop->async_io_watcher.fd = -1; }