module System.Miniplex.Sink (
Sink,
create,
write,
destroy,
withSink
) where
import System.Miniplex.Sekrit
import Prelude hiding (catch)
import Data.Typeable
import Data.List
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import System.Directory
import System.IO.Lock
import System.Posix.IO
import qualified Network as N
import Network.Socket
-- | Contents of a one-element mailbox shared between a writer and a
-- per-client sender thread.
data Rend a
    = EOF     -- ^ No more messages will arrive; the sender should stop.
    | Empty   -- ^ Nothing pending; the sender waits for a change.
    | Some a  -- ^ A pending message that has not been sent yet.
-- | Handle to a running sink, as returned by 'create'.
data Sink = Sink
    { name :: !String
      -- ^ Path of the listening socket; 'destroy' removes this file.
    , sock :: !Socket
      -- ^ The listening socket itself.
    , ctrl :: !(TVar [TVar (Rend String)])
      -- ^ One mailbox per currently registered client.
    , acct :: !ThreadId
      -- ^ Thread that accepts incoming connections.
    , zmsg :: !(TVar (Maybe String))
      -- ^ Most recently written message, if any; sent to clients when they
      -- connect.
    } deriving (Typeable)
-- | Run an action over and over for as long as it yields 'True'; stop as
-- soon as it yields 'False'.
loop :: (Monad m) => m Bool -> m ()
loop step = again
  where
    again = do
        continue <- step
        when continue again
-- | Send one message over the socket, preceded by a header built by
-- 'bytesFromInt' from the message's length (as counted by 'length').
xsend :: Socket -> String -> IO ()
xsend s str = reallySend s framed
  where
    framed = bytesFromInt (length str) ++ str
-- | Run an action and close the given socket afterwards, whether the action
-- returns normally or throws.
finallyClose :: Socket -> IO a -> IO a
finallyClose s act = finally act (sClose s)
-- | Fork a thread that runs the action and then closes the socket.
--
-- Exceptions selected by 'ioErrors' end the action quietly instead of
-- propagating out of the thread; the socket is closed in every case.
forkClose :: Socket -> IO () -> IO ThreadId
forkClose s x = forkIO (finallyClose s quiet)
  where
    quiet = handleJust ioErrors (\_ -> return ()) x
-- | Acceptor loop: accept connections on the listening socket forever and
-- fork one sender thread per client.
--
-- Arguments: the listening socket, the shared list of per-client mailboxes,
-- and the variable holding the most recently written message.
--
-- Each sender thread first transmits the most recent message (if any) and
-- then forwards everything that shows up in its own mailbox until it sees
-- 'EOF'.  The mailbox is removed from the shared list when the thread ends.
barghest :: Socket -> TVar [TVar (Rend String)] -> TVar (Maybe String) -> IO ()
barghest s cp zp = loop conn
  where
    -- Asynchronous exceptions are blocked from accept until the handler
    -- thread has been forked; the handler unblocks them for itself.
    conn = block $ do
        (t, _) <- accept s
        _ <- forkClose t $ unblock $ do
            v <- atomically (newTVar Empty)
            spawn t v
        return True

    spawn t v = do
        -- Nothing is ever read from a client connection.
        shutdown t ShutdownReceive
        block $ do
            -- Register the mailbox and snapshot the latest message in ONE
            -- transaction.  Doing these as separate steps (snapshot, send,
            -- then register) leaves a window in which a concurrent 'write'
            -- is neither in the snapshot nor delivered to the mailbox, so
            -- the new client would silently miss that message.
            z <- atomically $ do
                vs <- readTVar cp
                writeTVar cp (v : vs)
                readTVar zp
            unblock (maybe (return ()) (xsend t) z >> loop body) `finally`
                atomically (readTVar cp >>= writeTVar cp . delete v)
      where
        -- Wait for the mailbox to change, then either stop (EOF) or take
        -- the pending message and send it outside the transaction.
        body = join . atomically $ do
            x <- readTVar v
            case x of
                EOF -> return (return False)
                Empty -> retry
                Some str -> do
                    writeTVar v Empty
                    return $ xsend t str >> return True
-- | Create a sink for the given tag and start accepting clients on it.
--
-- 'pathFromTag' yields three paths: the socket path to listen on, a lock
-- file, and a third path that is handed to 'brush'.  The lock file is opened
-- and write-locked while the sink is being set up.  If anything fails after
-- the socket has been created, the socket is closed again.
create :: String -> IO Sink
create what = do
    (com, lck, ret) <- pathFromTag "System.Miniplex.Sink.create" what
    bracketOnError (N.listenOn (N.UnixSocket com)) sClose $ \s -> do
        closeOnExec s
        underLock lck $ do
            brush ret
            vs <- atomically (newTVar [])
            z <- atomically (newTVar Nothing)
            -- From here on the acceptor thread owns the socket and closes
            -- it when it terminates.
            t <- forkClose s (barghest s vs z)
            return Sink
                { name = com
                , sock = s
                , ctrl = vs
                , acct = t
                , zmsg = z
                }
  where
    -- Run an action while holding a write lock on the given file; the lock
    -- is released and the descriptor closed afterwards in every case.
    underLock path act =
        bracket (openFd path WriteOnly (Just mode644) defaultFileFlags) closeFd $ \lf ->
            bracket (setLockAll lf LockWrite) unLock (const act)
-- | Poke and then delete the given file.
--
-- The file is opened write-only and non-blocking; if that succeeds a single
-- NUL byte is written to it and it is closed again.  An open failure matched
-- by 'enoents' is ignored.  Whatever happens, the file is removed afterwards,
-- and exceptions from the removal that 'ioErrors' selects are ignored.
brush :: String -> IO ()
brush file = poke `finally` discard
  where
    flags = defaultFileFlags{ nonBlock = True }
    poke = do
        opened <- tryJust enoents (openFd file WriteOnly Nothing flags)
        either (const (return ())) ping opened
    -- Write the single byte, always closing the descriptor afterwards.
    ping fd = (fdWrite fd "\0" >> return ()) `finally` closeFd fd
    discard = handleJust ioErrors (\_ -> return ()) (removeFile file)
-- | Publish a message: remember it as the sink's most recent message and put
-- it into the mailbox of every registered client, all in one transaction.
-- A message still pending in a mailbox is overwritten.
write :: Sink -> String -> IO ()
write si str = atomically $ do
    writeTVar (zmsg si) (Just str)
    slots <- readTVar (ctrl si)
    forM_ slots $ \slot -> writeTVar slot (Some str)
-- | Shut a sink down: remove its socket file, kill the acceptor thread, and
-- tell every registered client thread to stop by posting 'EOF' to its
-- mailbox.
--
-- The thread and client teardown runs even if removing the socket file
-- throws (for example because the file has already been deleted), so a
-- failed removal no longer leaves the acceptor thread and client connections
-- behind.  The exception from 'removeFile' is still propagated to the caller
-- afterwards.
destroy :: Sink -> IO ()
destroy si = removeFile (name si) `finally` teardown
  where
    c = ctrl si
    teardown = do
        killThread (acct si)
        atomically $ do
            vs <- readTVar c
            mapM_ (flip writeTVar EOF) vs
            writeTVar c []
-- | Create a sink for the given tag, run the action with it, and destroy the
-- sink afterwards whether the action returns or throws.  Asynchronous
-- exceptions are blocked while the sink is created and destroyed, and
-- unblocked while the action runs.
withSink :: String -> (Sink -> IO a) -> IO a
withSink t f = block (create t >>= run)
  where
    run si = unblock (f si) `finally` destroy si