module System.Miniplex.Sink (
Sink,
create,
write,
destroy,
withSink
) where
import System.Miniplex.Sekrit
import Prelude hiding (catch)
import Data.Typeable
import Data.List
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import System.Directory
import qualified Network as N
import Network.Socket
-- | Contents of a per-connection mailbox.
data Rend a
  = EOF
    -- ^ No further messages will arrive; the connection's send loop stops.
  | Empty
    -- ^ Nothing pending; the connection's send loop waits.
  | Some a
    -- ^ One pending message, waiting to be sent.
-- | Handle for a running sink: a listening socket together with the state
-- shared between the accept thread and the per-connection threads.
data Sink = Sink
  { name :: !String
    -- ^ Path the listening Unix socket is bound to; removed by 'destroy'.
  , sock :: !Socket
    -- ^ The listening socket.
  , ctrl :: !(TVar [TVar (Rend String)])
    -- ^ Mailboxes of the connections that are currently registered.
  , acct :: !ThreadId
    -- ^ Thread running the accept loop.
  , zmsg :: !(TVar (Maybe String))
    -- ^ Most recently written message, 'Nothing' until the first 'write';
    -- it is sent to each new connection.
  }
  deriving (Typeable)
-- | Run an action repeatedly for as long as it yields 'True'.
--
-- The action is always run at least once; the first 'False' ends the loop.
loop :: (Monad m) => m Bool -> m ()
loop step = go
  where
    go = do
      again <- step
      when again go
-- | Send one message on a socket, preceded by a header derived from the
-- message's length.
--
-- The length is counted in 'Char's ('length' of the 'String').
-- NOTE(review): 'bytesFromInt' and 'reallySend' are not defined in this
-- module (presumably "System.Miniplex.Sekrit"); confirm the header format
-- and whether characters outside the byte range are handled there.
xsend :: Socket -> String -> IO ()
xsend s str = reallySend s frame
  where
    header = bytesFromInt (length str)
    frame = header ++ str
-- | Run an action and close the socket afterwards, whether the action
-- returns normally or throws.
finallyClose :: Socket -> IO a -> IO a
finallyClose s action = finally action (sClose s)
-- | Fork a thread that runs the action and closes the socket when the
-- action ends.
--
-- IO errors raised by the action are discarded, so they end the thread
-- quietly.  The close itself happens outside that handler.
forkClose :: Socket -> IO () -> IO ThreadId
forkClose s x = forkIO (finallyClose s guarded)
  where
    -- The action with its IO errors swallowed.
    guarded = handleJust ioErrors (\_ -> return ()) x
-- | Accept loop for a sink's listening socket.
--
-- Each accepted connection gets its own thread (started with 'forkClose',
-- so the connection socket is closed when that thread ends) and its own
-- mailbox 'TVar'.  The thread first replays the most recent message, if
-- any, and then forwards every message placed in its mailbox until it
-- reads 'EOF'.
--
-- Arguments: the listening socket, the shared list of registered
-- mailboxes, and the variable holding the most recent message.
barghest
  :: Socket
  -> TVar [TVar (Rend String)]
  -> TVar (Maybe String)
  -> IO ()
barghest s cp zp = loop conn
  where
    -- Accept one connection and hand it to a new thread.  This runs under
    -- 'block' so that no asynchronous exception can arrive between
    -- 'accept' returning and 'forkClose' taking charge of the socket; the
    -- child thread re-enables them with 'unblock'.
    conn = block $ do
      (t, _) <- accept s
      forkClose t $ unblock $ do
        v <- atomically (newTVar Empty)
        spawn t v
      return True

    -- Serve a single connection whose mailbox is @v@.
    spawn t v = do
      -- Nothing is ever read from the peer.
      shutdown t ShutdownReceive
      block $ do
        -- Register the mailbox and snapshot the latest message in ONE
        -- transaction.  Reading the snapshot first and registering in a
        -- later transaction would let a concurrent 'write' slip between
        -- the two, and this connection would then never receive it.
        z <- atomically $ do
          vs <- readTVar cp
          writeTVar cp (v : vs)
          readTVar zp
        -- Replay the snapshot, then forward mailbox contents.  Whatever
        -- happens, the mailbox is removed from the shared list afterwards.
        -- NOTE(review): a connection that registers after 'destroy' has
        -- emptied the list is never handed 'EOF'; confirm whether that
        -- window matters.
        unblock (maybe (return ()) (xsend t) z >> loop body) `finally`
          atomically (readTVar cp >>= writeTVar cp . delete v)
      where
        -- One step of the send loop: wait until the mailbox is non-empty,
        -- then either stop ('EOF') or take the message and send it.  The
        -- transaction returns the IO action to run, which 'join' executes
        -- after the transaction has committed.
        body = join . atomically $ do
          x <- readTVar v
          case x of
            EOF -> return (return False)
            Empty -> retry
            Some str -> do
              writeTVar v Empty
              return $ xsend t str >> return True
-- | Create a sink for the given tag and start accepting connections on it.
--
-- The tag is turned into a path with 'pathFromTag', a Unix socket is put
-- into listening state on that path, and the accept loop is started in its
-- own thread.  That thread closes the listening socket when it ends.
-- NOTE(review): 'pathFromTag' and 'closeOnExec' are not defined in this
-- module; their exact behaviour should be checked at their definitions.
create :: String -> IO Sink
create what = do
  path <- pathFromTag "System.Miniplex.Sink.create" what
  listener <- N.listenOn (N.UnixSocket path)
  closeOnExec listener
  -- No connections yet, and no message has been written.
  (clients, latest) <- atomically $ do
    vs <- newTVar []
    z <- newTVar Nothing
    return (vs, z)
  acceptor <- forkClose listener (barghest listener clients latest)
  return Sink
    { name = path
    , sock = listener
    , ctrl = clients
    , acct = acceptor
    , zmsg = latest
    }
-- | Publish a message to a sink.
--
-- In a single transaction the message becomes the sink's most recent
-- message and is placed in the mailbox of every registered connection,
-- replacing whatever each mailbox held.
write :: Sink -> String -> IO ()
write si str = atomically $ do
  writeTVar (zmsg si) (Just str)
  boxes <- readTVar (ctrl si)
  forM_ boxes $ \box -> writeTVar box (Some str)
-- | Shut a sink down.
--
-- Removes the socket's path from the filesystem, kills the accept thread
-- and puts 'EOF' in the mailbox of every registered connection, leaving
-- the list of registered mailboxes empty.
--
-- The thread and the connections are shut down even when removing the
-- path fails (for instance because it is already gone); the exception
-- from 'removeFile' is still propagated to the caller afterwards.
destroy :: Sink -> IO ()
destroy si = removeFile (name si) `finally` shutDown
  where
    c = ctrl si
    -- Stop accepting connections and tell every registered connection
    -- that no more messages will come.
    shutDown = do
      killThread (acct si)
      atomically $ do
        vs <- readTVar c
        mapM_ (flip writeTVar EOF) vs
        writeTVar c []
-- | Create a sink for the given tag, run the supplied action with it and
-- destroy the sink afterwards, whether the action returns or throws.
--
-- Asynchronous exceptions are blocked while the sink is created and
-- destroyed and enabled while the action runs.
withSink :: String -> (Sink -> IO a) -> IO a
withSink t f = bracket (create t) destroy f