(* Asynchronous channels *) #load "libqueue" module AChan : sig type 'a achan val new : all 'a. unit -> 'a achan val recv : all 'a. 'a achan -> 'a val send : all 'a. 'a achan -> 'a -> unit val tryRecv : all 'a. 'a achan -> 'a option val trySend : all 'a. 'a achan -> 'a -> bool val size : all 'a. 'a achan -> int end = struct module Q = Queue module M = MVar type 'a mvar = 'a M.mvar type 'a queue = 'a Q.queue type 'a repr = Readers of 'a M.mvar queue | Writers of 'a queue type 'a achan = 'a repr M.mvar let new () = M.new (Writers Q.empty) let recv (mv : 'a achan) = let wait (readers : 'a mvar queue) = let reader = M.newEmpty () in (Readers (Q.enqueue reader readers), fun () -> M.take reader) in M.modify mv (fun repr -> match repr with | Readers readers -> wait readers | Writers writers -> match Q.dequeueA writers with | None -> wait Q.empty | Some (x, xs) -> (Writers xs, fun () -> x)) () let send (mv : 'a achan) (x : 'a) = M.modify_ mv (fun repr -> match repr with | Writers writers -> Writers (Q.enqueue x writers) | Readers readers -> match Q.dequeueA readers with | None -> Writers (Q.enqueue x Q.empty) | Some (reader, readers') -> M.put reader x; Readers readers') let tryRecv (mv : 'a achan) = M.modify mv (fun repr -> match repr with | Readers readers -> (repr, None) | Writers writers -> match Q.dequeueA writers with | None -> (repr, None) | Some (x, xs) -> (Writers xs, Some x)) (* Send always succeeds, but trySend succeeds only if there's a reader ready to receive the send. *) let trySend (mv : 'a achan) (x : 'a) = M.modify mv (fun repr -> match repr with | Writers writers -> (repr, false) | Readers readers -> match Q.dequeueA readers with | None -> (repr, false) | Some (reader, readers') -> M.put reader x; (Readers readers', true)) let size (mv : 'a achan) = match M.read mv with | Writers writers -> Q.size writers | Readers readers -> ~(Q.size readers) end