{-# OPTIONS_GHC -fglasgow-exts #-}

-- | Asynchronous message broadcasting.
module System.Miniplex.Sink (
-- * Caveat
--
-- | This module performs writes on sockets that may be closed at any time,
-- triggering @SIGPIPE@ signals. It is therefore recommended that you call
-- @'System.Posix.Signals.installHandler' 'System.Posix.Signals.openEndedPipe'
-- 'System.Posix.Signals.Ignore' 'Nothing'@ at the beginning of your
-- program.
    Sink,
    create,
    write,
    destroy,
    withSink
) where

import System.Miniplex.Sekrit
import Prelude hiding (catch)
import Data.Typeable
import Data.List
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import System.Directory
import System.IO.Lock
import System.Posix.IO
import qualified Network as N
import Network.Socket

data Rend a = EOF | Empty | Some a

data Sink = Sink {
    name :: !String,
    sock :: !Socket,
    ctrl :: !(TVar [TVar (Rend String)]),
    acct :: !ThreadId,
    zmsg :: !(TVar (Maybe String))
} deriving (Typeable)

loop :: (Monad m) => m Bool -> m ()
loop m = fix $ \self -> m >>= flip when self

xsend :: Socket -> String -> IO ()
xsend s str =
    {- bracket
        (installHandler openEndedPipe Ignore Nothing)
        (\h -> installHandler openEndedPipe h Nothing)
        (\_ -> -} reallySend s (bytesFromInt (length str) ++ str) {- ) -}

finallyClose :: Socket -> IO a -> IO a
finallyClose s x = x `finally` sClose s

forkClose :: Socket -> IO () -> IO ThreadId
forkClose s x =
    forkIO $ finallyClose s $ handleJust ioErrors (const $ return ()) x

barghest :: Socket -> TVar [TVar (Rend String)] -> TVar (Maybe String) -> IO ()
barghest s cp zp = loop conn
    where
    conn = block $ do
        (t, _) <- accept s
        forkClose t $ unblock $ do
            v <- atomically (newTVar Empty)
            spawn t v
        return True
    spawn t v = do
        shutdown t ShutdownReceive
        z <- atomically $ readTVar zp
        maybe  (return ()) (xsend t) z
        block $ do
            atomically $ do
                vs <- readTVar cp
                writeTVar cp (v : vs)
            unblock (loop body) `finally`
                atomically (readTVar cp >>= writeTVar cp . delete v)
        where
        body = join . atomically $ do
            x <- readTVar v
            case x of
                EOF -> return (return False)
                Empty -> retry
                Some str -> do
                    writeTVar v Empty
                    return $ xsend t str >> return True

-- | @'create' tag@ creates a message sink. @tag@ is used to uniquely
-- identify this sink. The set of allowed characters for @tag@ includes
-- letters, digits, @_@, @-@ and @+@.
--
-- (Implementation detail: This function actually creates a named socket
-- in your @~\/.miniplex\/@.)
create :: String -> IO Sink
create what = do
    (com, lck, ret) <- pathFromTag "System.Miniplex.Sink.create" what
    bracketOnError (N.listenOn (N.UnixSocket com)) sClose $ \s -> do
    closeOnExec s
    bracket
        (openFd lck WriteOnly (Just mode644) defaultFileFlags)
        (\fd -> closeFd fd)
        $ \lf -> do
    bracket (setLockAll lf LockWrite) unLock $ \_ -> do
    brush ret
    vs <- atomically $ newTVar []
    z <- atomically $ newTVar Nothing
    t <- forkClose s (barghest s vs z)
    return $ Sink
        { name = com
        , sock = s
        , ctrl = vs
        , acct = t
        , zmsg = z
        }

brush :: String -> IO ()
brush file = do
    flip finally (del file) $ do
    r <- tryJust enoents $ openFd file WriteOnly Nothing defaultFileFlags{ nonBlock = True }
    case r of
        Left _ -> return ()
        Right fd -> do
            flip finally (closeFd fd) $ do
            fdWrite fd "\0"
            return ()
    where
    del x = handleJust ioErrors (const $ return ()) $ removeFile x

-- | @'write' si msg@ asynchronously writes @msg@ to @si@, where it will be
-- received by all currently connected readers.
write :: Sink -> String -> IO ()
write si str =
    atomically $ do
        writeTVar (zmsg si) (Just str)
        vs <- readTVar (ctrl si)
        mapM_ (flip writeTVar msg) vs
    where
    msg = Some str

-- | Deallocates a sink. The destroyed sink must not be used again.
--
-- (Implementation detail: This function closes and removes the socket from
-- the file system. If you forget to call it, you'll leave a stale entry in
-- @~\/.miniplex\/@, which will cause calls to 'create' with the same tag to
-- fail.)
destroy :: Sink -> IO ()
destroy si = do
    removeFile (name si)
    killThread (acct si)
    atomically $ do
        vs <- readTVar c
        mapM_ (flip writeTVar EOF) vs
        writeTVar c []
    where
    c = ctrl si

-- | Helper function to simplify resource handling. @'withSink' tag body@
-- creates a sink, calls @body@, then destroys the sink, even if @body@
-- throws an exception.
withSink :: String -> (Sink -> IO a) -> IO a
withSink t f = block $ do
    si <- create t
    unblock (f si) `finally` destroy si