(* Example: reader-writer locks with capabilities *) module type RW_LOCK = sig type ('a,'b) array type read type write type 'b@'c : A val new : int -> 'a -> ex 'b. ('a, 'b) array (* build is more convenient than new, but it would take more space * in the paper. *) val build : int -> (int -> 'a) -> ex 'b. ('a, 'b) array val acquireR : ('a,'b) array -> 'b@read val acquireW : ('a,'b) array -> 'b@write val release : ('a,'b) array -> 'b@'c -> unit * unit val get : ('a,'b) array -> int -> 'b@'c -> 'a * 'b@'c val set : ('a,'b) array -> int -> 'a -> 'b@write -> unit * 'b@write (* We added (unit * _) to result types of release and get to support * the imperative variable notation. *) end #load "libqueue" #load "libarray" module A = Array module RWLock : RW_LOCK = struct (* We keep a queue of waiting readers and writers blocked on mvars. * We maintain the invariant that if read-only capabilities are * outstanding, the either the queue is empty or the head of the * queue is either a writer. We don't allow readers to jump ahead in * line, because that could starve writers. *) type queue = (unit MVar.mvar + unit MVar.mvar) Queue.queue (* The lock state is synchronized by an mvar. We keep the queue and * an integer, which tells us what capabilites are outstanding: * - -1 for read-write * - 0 for nothing outstanding * - N >= 0 for N readers *) type lock = (queue * int) MVar.mvar type ('a,'t) array = 'a A.array * lock type read type write type 't@'m = unit let new (size: int) (init: 'a) = (A.new size init, MVar.new ((Queue.empty : queue), 0)) let build (size: int) (builder: int -> 'a) = (A.build size builder, MVar.new ((Queue.empty : queue), 0)) (* To see what's happening, uncomment the rest of show. *) let show (who: string) ((q, count): queue * int) = () (* ; putStr ("[" ^ who ^ "] count: " ^ string_of_int count ^ " "); let rec loop (q: queue) : unit = match Queue.dequeueA q with | None -> putStr "\n" | Some (Left _, q) -> putStr "R"; loop q | Some (Right _, q) -> putStr "W"; loop q in loop q; *) let showL (who: string) (lock: lock) = let (q, count) = MVar.take lock in show who (q, count); MVar.put lock (q, count) (* After the queue has changed, wake restores our queue invariant * described above. *) let wake (lock: lock) = showL "wake" lock; let rec wakeReaders (q: queue) (count: int) : unit = show "wakeReaders" (q, count); match Queue.dequeueA q with | Some (Left reader, q) -> MVar.put reader (); wakeReaders q (count + 1) | _ -> MVar.put lock (q, count); show "endWR" (q, count) in match MVar.take lock with | (q, -1) -> MVar.put lock (q, -1) | (q, 0) -> (match Queue.dequeueA q with | None -> MVar.put lock (q, 0) | Some (Right writer, q) -> MVar.put writer (); MVar.put lock (q, -1) | _ -> wakeReaders q 0) | (q, count) -> wakeReaders q count (* acquireR creates an mvar for the requesting reader to wait on and * adds it to the tail of the queue. It calls wake to process the * queue and then waits in the mvar. *) let acquireR ((rep, lock) : ('a,'t) array) = let (q, count) = MVar.take lock in show "acquireR" (q, count); let wait = MVar.newEmpty[unit] () in MVar.put lock (Queue.enqueue (Left wait) q, count); wake lock; MVar.take wait (* Same idea as acquireR -- could probably refactor. *) let acquireW ((rep, lock) : ('a,'t) array) = let (q, count) = MVar.take lock in show "acquireW" (q, count); let wait = MVar.newEmpty[unit] () in MVar.put lock (Queue.enqueue (Right wait) q, count); wake lock; MVar.take wait (* We don't need separate releaseR and releaseW because the count has * enough information to figure out what kind of release is happening. * We update the counter and then call wake to process the queue. *) let release ((rep, lock) : ('a,'t) array) _ = let (q, count) = MVar.take lock in show "release" (q, count); let count' = if count > 1 then count - 1 else 0 in MVar.put lock (q, count'); (wake lock, ()) let get ((rep, _) : ('a,'t) array) (ix: int) () = (A.get rep ix, ()) let set ((rep, _) : ('a,'t) array) (ix: int) (v: 'a) () = (A.set rep ix v, ()) end (* Try * RWLockTest.go n * to create n random readers and writers that all attempt to * acquire the lock. Once acquired, they perform an array operation, * sleep a bit, and then check that the array hasn't changed while * they looked away. * * Currently we create writers with probably 1/8 so * that we can see a lot of reader overlap, though other values may be * interesting as well. *) module RWLockTest = struct open RWLock let makeCounter () = let counter = MVar.new 0 in fun () -> let count = MVar.take counter in MVar.put counter (count + 1); count let delay () = Thread.delay 250000 let reader (me: int) (a: (int,'t) array) = Future.new (fun () -> putStrLn ("reader " ^ string_of_int me ^ ": waiting"); let !cap = acquireR a in putStrLn ("reader " ^ string_of_int me ^ ": acquired"); let n = get a 0 cap in delay (); let m = get a 0 cap in putStrLn ("reader " ^ string_of_int me ^ ": releasing"); release a cap; if n != m then failwith "reader: meh" else ()) let writer (me: int) (a: (int,'t) array) = Future.new (fun () -> putStrLn ("writer " ^ string_of_int me ^ ": waiting"); let !cap = acquireW a in putStrLn ("writer " ^ string_of_int me ^ ": acquired"); set a 0 me cap; delay (); let me' = get a 0 cap in putStrLn ("writer " ^ string_of_int me ^ ": releasing"); release a cap; if me != me' then failwith "writer: meh" else ()) let go (iters: int) = let next = makeCounter () in let ('t,a) = build 10 (fun x:int -> x) in let rec start (n: int) : U Future.future list = if n < 1 then Nil[any] else Cons (if random_int () % 8 == 0 then writer (next ()) a else reader (next ()) a, start (n - 1)) in let rec stop (fs: U Future.future list) : unit = match fs with | Nil -> () | Cons(f, fs) -> Future.sync f; stop fs in stop (start iters) end