Skip to content

Commit 0e3b0fc

Browse files
authored
Thread scheduler for light weight concurrency.
1 parent 336119d commit 0e3b0fc

28 files changed

+1014
-305
lines changed

cont.c

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,12 +241,17 @@ struct rb_fiber_struct {
241241
*/
242242
unsigned int transferred : 1;
243243

244+
/* Whether the fiber is blocking, i.e. NOT allowed to implicitly yield to the scheduler. */
245+
unsigned int blocking : 1;
246+
244247
struct coroutine_context context;
245248
struct fiber_pool_stack stack;
246249
};
247250

248251
static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0};
249252

253+
static ID fiber_initialize_keywords[2] = {0};
254+
250255
/*
251256
* FreeBSD requires that the first (i.e. addr) argument of mmap(2) is not NULL
252257
* if MAP_STACK is passed.
@@ -1733,7 +1738,7 @@ fiber_alloc(VALUE klass)
17331738
}
17341739

17351740
static rb_fiber_t*
1736-
fiber_t_alloc(VALUE fiber_value)
1741+
fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
17371742
{
17381743
rb_fiber_t *fiber;
17391744
rb_thread_t *th = GET_THREAD();
@@ -1746,6 +1751,7 @@ fiber_t_alloc(VALUE fiber_value)
17461751
fiber = ZALLOC(rb_fiber_t);
17471752
fiber->cont.self = fiber_value;
17481753
fiber->cont.type = FIBER_CONTEXT;
1754+
fiber->blocking = blocking;
17491755
cont_init(&fiber->cont, th);
17501756

17511757
fiber->cont.saved_ec.fiber_ptr = fiber;
@@ -1763,9 +1769,9 @@ fiber_t_alloc(VALUE fiber_value)
17631769
}
17641770

17651771
static VALUE
1766-
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool)
1772+
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking)
17671773
{
1768-
rb_fiber_t *fiber = fiber_t_alloc(self);
1774+
rb_fiber_t *fiber = fiber_t_alloc(self, blocking);
17691775

17701776
fiber->first_proc = proc;
17711777
fiber->stack.base = NULL;
@@ -1793,17 +1799,66 @@ fiber_prepare_stack(rb_fiber_t *fiber)
17931799
sec->local_storage_recursive_hash_for_trace = Qnil;
17941800
}
17951801

1802+
static struct fiber_pool *
1803+
rb_fiber_pool_default(VALUE pool)
1804+
{
1805+
return &shared_fiber_pool;
1806+
}
1807+
1808+
/* :nodoc: */
1809+
static VALUE
1810+
rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
1811+
{
1812+
VALUE pool = Qnil;
1813+
VALUE blocking = Qtrue;
1814+
1815+
if (kw_splat != RB_NO_KEYWORDS) {
1816+
VALUE options = Qnil;
1817+
VALUE arguments[2] = {Qundef};
1818+
1819+
argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
1820+
rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments);
1821+
1822+
blocking = arguments[0];
1823+
pool = arguments[1];
1824+
}
1825+
1826+
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
1827+
}
1828+
17961829
/* :nodoc: */
17971830
static VALUE
17981831
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
17991832
{
/*
 * Body of Fiber#initialize (registered in Init_Cont with arity -1).
 * In this diff the "-" line is the previous body, which always passed the
 * shared fiber pool; the "+" line forwards the arguments to
 * rb_fiber_initialize_kw() together with the result of
 * rb_keyword_given_p().
 */
1800-
return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool);
1833+
return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
18011834
}
18021835

18031836
/*
 * Create a fiber from a C callback: allocates an rb_cFiber instance and
 * initializes it with a proc built from func and obj.
 * In this diff the "-" line is the previous body; the "+" line obtains the
 * pool via rb_fiber_pool_default(Qnil) and passes 1 as the blocking flag.
 */
VALUE
18041837
rb_fiber_new(rb_block_call_func_t func, VALUE obj)
18051838
{
1806-
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool);
1839+
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1);
1840+
}
1841+
1842+
static VALUE
1843+
rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat)
1844+
{
1845+
rb_thread_t * th = GET_THREAD();
1846+
VALUE scheduler = th->scheduler;
1847+
VALUE fiber = Qnil;
1848+
1849+
if (scheduler != Qnil) {
1850+
fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat);
1851+
} else {
1852+
rb_raise(rb_eRuntimeError, "No scheduler is available!");
1853+
}
1854+
1855+
return fiber;
1856+
}
1857+
1858+
static VALUE
1859+
rb_f_fiber(int argc, VALUE *argv, VALUE obj)
1860+
{
1861+
return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
18071862
}
18081863

18091864
static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt);
@@ -1820,6 +1875,10 @@ rb_fiber_start(void)
18201875
VM_ASSERT(th->ec == ruby_current_execution_context_ptr);
18211876
VM_ASSERT(FIBER_RESUMED_P(fiber));
18221877

1878+
if (fiber->blocking) {
1879+
th->blocking += 1;
1880+
}
1881+
18231882
EC_PUSH_TAG(th->ec);
18241883
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
18251884
rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont;
@@ -1892,6 +1951,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th)
18921951
fiber->cont.type = FIBER_CONTEXT;
18931952
fiber->cont.saved_ec.fiber_ptr = fiber;
18941953
fiber->cont.saved_ec.thread_ptr = th;
1954+
fiber->blocking = 1;
18951955
fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
18961956
th->ec = &fiber->cont.saved_ec;
18971957
}
@@ -2044,11 +2104,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
20442104
}
20452105
}
20462106

2107+
VM_ASSERT(FIBER_RUNNABLE_P(fiber));
2108+
20472109
if (is_resume) {
20482110
fiber->prev = fiber_current();
20492111
}
20502112

2051-
VM_ASSERT(FIBER_RUNNABLE_P(fiber));
2113+
if (fiber_current()->blocking) {
2114+
th->blocking -= 1;
2115+
}
20522116

20532117
cont->argc = argc;
20542118
cont->kw_splat = kw_splat;
@@ -2060,6 +2124,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
20602124
fiber_stack_release(fiber);
20612125
}
20622126

2127+
if (fiber_current()->blocking) {
2128+
th->blocking += 1;
2129+
}
2130+
20632131
RUBY_VM_CHECK_INTS(th->ec);
20642132

20652133
EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil);
@@ -2073,6 +2141,12 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv)
20732141
return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS);
20742142
}
20752143

2144+
VALUE
2145+
rb_fiber_blocking_p(VALUE fiber)
2146+
{
2147+
return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
2148+
}
2149+
20762150
void
20772151
rb_fiber_close(rb_fiber_t *fiber)
20782152
{
@@ -2442,6 +2516,9 @@ Init_Cont(void)
24422516

24432517
fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size);
24442518

2519+
fiber_initialize_keywords[0] = rb_intern_const("blocking");
2520+
fiber_initialize_keywords[1] = rb_intern_const("pool");
2521+
24452522
char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS");
24462523
if (fiber_shared_fiber_pool_free_stacks) {
24472524
shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks);
@@ -2452,11 +2529,14 @@ Init_Cont(void)
24522529
rb_eFiberError = rb_define_class("FiberError", rb_eStandardError);
24532530
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
24542531
rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1);
2532+
rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0);
24552533
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
24562534
rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1);
24572535
rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0);
24582536
rb_define_alias(rb_cFiber, "inspect", "to_s");
24592537

2538+
rb_define_global_function("Fiber", rb_f_fiber, -1);
2539+
24602540
#ifdef RB_EXPERIMENTAL_FIBER_POOL
24612541
rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
24622542
rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);

doc/fiber.rdoc

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
= Fiber
2+
3+
Fiber is a flow-control primitive which enables cooperative scheduling. This is
4+
in contrast to threads which can be preemptively scheduled at any time. While
5+
having a similar memory profile, the cost of context switching fibers can be
6+
significantly less than threads as it does not involve a system call.
7+
8+
== Design
9+
10+
=== Scheduler
11+
12+
The per-thread fiber scheduler interface is used to intercept blocking
13+
operations. A typical implementation would be a wrapper for a gem like
14+
EventMachine or Async. This design provides separation of concerns between the
15+
event loop implementation and application code. It also allows for layered
16+
schedulers which can perform instrumentation.
17+
18+
class Scheduler
19+
# Wait for the given file descriptor to become readable.
20+
def wait_readable(io)
21+
end
22+
23+
# Wait for the given file descriptor to become writable.
24+
def wait_writable(io)
25+
end
26+
27+
# Wait for the given file descriptor to match the specified events within
28+
# the specified timeout.
29+
# @param events [Integer] a bit mask of +IO::WAIT_READABLE+,
30+
# +IO::WAIT_WRITABLE+ and +IO::WAIT_PRIORITY+.
31+
# @param timeout [#to_f] the amount of time to wait for the event.
32+
def wait_any(io, events, timeout)
33+
end
34+
35+
# Sleep the current task for the specified duration, or forever if not
36+
# specified.
37+
# @param duration [#to_f] the amount of time to sleep.
38+
def wait_sleep(duration = nil)
39+
end
40+
41+
# The Ruby virtual machine is going to enter a system level blocking
42+
# operation.
43+
def enter_blocking_region
44+
end
45+
46+
# The Ruby virtual machine has completed the system level blocking
47+
# operation.
48+
def exit_blocking_region
49+
end
50+
51+
# Intercept the creation of a non-blocking fiber.
52+
def fiber(&block)
53+
Fiber.new(blocking: false, &block)
54+
end
55+
56+
# Invoked when the thread exits.
57+
def run
58+
# Implement event loop here.
59+
end
60+
end
61+
62+
On CRuby, the following extra methods need to be implemented to handle the
63+
public C interface:
64+
65+
class Scheduler
66+
# Wrapper for rb_wait_readable(int) C function.
67+
def wait_readable_fd(fd)
68+
wait_readable(::IO.for_fd(fd, autoclose: false))
69+
end
70+
71+
# Wrapper for rb_wait_writable(int) C function.
72+
def wait_writable_fd(fd)
73+
wait_writable(::IO.for_fd(fd, autoclose: false))
74+
end
75+
76+
# Wrapper for rb_wait_for_single_fd(int) C function.
77+
def wait_for_single_fd(fd, events, duration)
78+
wait_any(::IO.for_fd(fd, autoclose: false), events, duration)
79+
end
80+
end
81+
82+
=== Non-blocking Fibers
83+
84+
By default fibers are blocking. Non-blocking fibers may invoke specific
85+
scheduler hooks when a blocking operation occurs, and these hooks may introduce
86+
context switching points.
87+
88+
Fiber.new(blocking: false) do
89+
puts Fiber.current.blocking? # false
90+
91+
# May invoke `Thread.scheduler&.wait_readable`.
92+
io.read(...)
93+
94+
# May invoke `Thread.scheduler&.wait_writable`.
95+
io.write(...)
96+
97+
# Will invoke `Thread.scheduler&.wait_sleep`.
98+
sleep(n)
99+
end.resume
100+
101+
We also introduce a new method which simplifies the creation of these
102+
non-blocking fibers:
103+
104+
Fiber do
105+
puts Fiber.current.blocking? # false
106+
end
107+
108+
The purpose of this method is to allow the scheduler to internally decide the
109+
policy for when to start the fiber, and whether to use symmetric or asymmetric
110+
fibers.
111+
112+
=== Mutex
113+
114+
Locking a mutex causes the +Thread#scheduler+ to not be used while the mutex
115+
is held by that thread. On +Mutex#lock+, fiber switching via the scheduler
116+
is disabled and operations become blocking for all fibers of the same +Thread+.
117+
On +Mutex#unlock+, the scheduler is enabled again.
118+
119+
mutex = Mutex.new
120+
121+
puts Thread.current.blocking? # 1 (true)
122+
123+
Fiber.new(blocking: false) do
124+
puts Thread.current.blocking? # false
125+
mutex.synchronize do
126+
puts Thread.current.blocking? # 1 (true)
127+
end
128+
129+
puts Thread.current.blocking? # false
130+
end.resume
131+
132+
=== Non-blocking I/O
133+
134+
By default, I/O is non-blocking. Not all operating systems support non-blocking
135+
I/O. Windows is a notable example where socket I/O can be non-blocking but pipe
136+
I/O is blocking. Provided that there *is* a scheduler and the current thread *is
137+
non-blocking*, the operation will invoke the scheduler.

ext/socket/ancdata.c

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include <time.h>
44

5-
int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
65
static VALUE sym_wait_readable, sym_wait_writable;
76

87
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1429,10 +1428,7 @@ make_io_for_unix_rights(VALUE ctl, struct cmsghdr *cmh, char *msg_end)
14291428
if (fstat(fd, &stbuf) == -1)
14301429
rb_raise(rb_eSocket, "invalid fd in SCM_RIGHTS");
14311430
rb_update_max_fd(fd);
1432-
if (rsock_cmsg_cloexec_state < 0)
1433-
rsock_cmsg_cloexec_state = rsock_detect_cloexec(fd);
1434-
if (rsock_cmsg_cloexec_state == 0 || fd <= 2)
1435-
rb_maygvl_fd_fix_cloexec(fd);
1431+
rb_maygvl_fd_fix_cloexec(fd);
14361432
if (S_ISSOCK(stbuf.st_mode))
14371433
io = rsock_init_sock(rb_obj_alloc(rb_cSocket), fd);
14381434
else

0 commit comments

Comments
 (0)