(* Example: reader-writer locks with capabilities *) module type RW_LOCK = sig type ('α,'β) array type read type write type 'β@'γ : A val new : int → 'α → ∃'β. ('α, 'β) array (* build is more convenient than new, but it would take more space * in the paper. *) val build : int → (int → 'α) → ∃'β. ('α, 'β) array val acquireR : ('α,'β) array → 'β@read val acquireW : ('α,'β) array → 'β@write val release : ('α,'β) array → 'β@'γ → unit × unit val get : ('α,'β) array → int → 'β@'γ → 'α × 'β@'γ val set : ('α,'β) array → int → 'α → 'β@write → unit × 'β@write (* We added (unit × _) to result types of release and get to support * the imperative variable notation. *) end #load "libqueue" #load "libarray" module A = Array module RWLock : RW_LOCK = struct (* We keep a queue of waiting readers and writers blocked on mvars. * We maintain the invariant that if read-only capabilities are * outstanding, the either the queue is empty or the head of the * queue is either a writer. We don't allow readers to jump ahead in * line, because that could starve writers. *) type queue = (unit MVar.mvar + unit MVar.mvar) Queue.queue (* The lock state is synchronized by an mvar. We keep the queue and * an integer, which tells us what capabilites are outstanding: * - -1 for read-write * - 0 for nothing outstanding * - N ≥ 0 for N readers *) type lock = (queue × int) MVar.mvar type ('α,'β) array = 'α A.array × lock type read type write type 'β@'γ = unit let new (size: int) (init: 'α) : ∃'β. ('α, 'β) array = (A.new size init, MVar.new ((Queue.empty : queue), 0)) let build (size: int) (builder: int → 'α) : ∃'β. ('α, 'β) array = (A.build size builder, MVar.new ((Queue.empty : queue), 0)) (* To see what's happening, uncomment the rest of show. *) let show (who: string) ((q, count): queue × int) = () (* ; putStr ("[" ^ who ^ "] count: " ^ string_of_int count ^ " "); let rec loop (q: queue) : unit = match Queue.dequeueA q with | None → putStr "\n" | Some (Left _, q) → putStr "R"; loop q | Some (Right _, q) → putStr "W"; loop q in loop q; *) let showL (who: string) (lock: lock) = let (q, count) = MVar.take lock in show who (q, count); MVar.put lock (q, count) (* After the queue has changed, wake restores our queue invariant * described above. *) let wake (lock: lock) = showL "wake" lock; let rec wakeReaders (q: queue) (count: int) : unit = show "wakeReaders" (q, count); match Queue.dequeueA q with | Some (Left reader, q) → MVar.put reader (); wakeReaders q (count + 1) | _ → MVar.put lock (q, count); show "endWR" (q, count) in match MVar.take lock with | (q, -1) → MVar.put lock (q, -1) | (q, 0) → (match Queue.dequeueA q with | None → MVar.put lock (q, 0) | Some (Right writer, q) → MVar.put writer (); MVar.put lock (q, -1) | _ → wakeReaders q 0) | (q, count) → wakeReaders q count (* acquireR creates an mvar for the requesting reader to wait on and * adds it to the tail of the queue. It calls wake to process the * queue and then waits in the mvar. *) let acquireR ((rep, lock) : ('α,'t) array) = let (q, count) = MVar.take lock in show "acquireR" (q, count); let wait = MVar.newEmpty () in MVar.put lock (Queue.enqueue (Left wait) q, count); wake lock; MVar.take wait (* Same idea as acquireR -- could probably refactor. *) let acquireW ((rep, lock) : ('α,'β) array) = let (q, count) = MVar.take lock in show "acquireW" (q, count); let wait = MVar.newEmpty () in MVar.put lock (Queue.enqueue (Right wait) q, count); wake lock; MVar.take wait (* We don't need separate releaseR and releaseW because the count has * enough information to figure out what kind of release is happening. * We update the counter and then call wake to process the queue. *) let release ((rep, lock) : ('α,'β) array) _ = let (q, count) = MVar.take lock in show "release" (q, count); let count' = if count > 1 then count - 1 else 0 in MVar.put lock (q, count'); (wake lock, ()) let get ((rep, _) : ('α,'β) array) (ix: int) () = (A.get rep ix, ()) let set ((rep, _) : ('α,'β) array) (ix: int) (v: 'α) () = (A.set rep ix v, ()) end (* Try * RWLockTest.go n * to create n random readers and writers that all attempt to * acquire the lock. Once acquired, they perform an array operation, * sleep a bit, and then check that the array hasn't changed while * they looked away. * * Currently we create writers with probably 1/8 so * that we can see a lot of reader overlap, though other values may be * interesting as well. *) module RWLockTest = struct open RWLock let makeCounter () = let counter = MVar.new 0 in fun () → let count = MVar.take counter in MVar.put counter (count + 1); count let delay () = Thread.delay 250000 let reader (me: int) (a: (int,'β) array) = Future.new (fun () → putStrLn ("reader " ^ string_of_int me ^ ": waiting"); let !cap = acquireR a in putStrLn ("reader " ^ string_of_int me ^ ": acquired"); let n = get a 0 cap in delay (); let m = get a 0 cap in putStrLn ("reader " ^ string_of_int me ^ ": releasing"); release a cap; if n != m then failwith "reader: meh" else ()) let writer (me: int) (a: (int,'β) array) = Future.new (fun () → putStrLn ("writer " ^ string_of_int me ^ ": waiting"); let !cap = acquireW a in putStrLn ("writer " ^ string_of_int me ^ ": acquired"); set a 0 me cap; delay (); let me' = get a 0 cap in putStrLn ("writer " ^ string_of_int me ^ ": releasing"); release a cap; if me != me' then failwith "writer: meh" else ()) let go (iters: int) = let next = makeCounter () in let a = build 10 (fun x → x) in let rec start (n: int) = if n < 1 then [] else (if random_int () % 8 == 0 then writer (next ()) a else reader (next ()) a) ∷ start (n - 1) in let rec stop fs = match fs with | [] → () | f ∷ fs → Future.sync f; stop fs in stop (start iters) end