{-# OPTIONS_GHC -fglasgow-exts #-}

-- | Asynchronous message broadcasting.
module System.Miniplex.Sink (
-- * Caveat
--
-- | This module performs writes on sockets that may be closed at any time,
-- triggering @SIGPIPE@ signals. It is therefore recommended that you call
-- @'System.Posix.Signals.installHandler' 'System.Posix.Signals.openEndedPipe'
-- 'System.Posix.Signals.Ignore' 'Nothing'@ at the beginning of your
-- program.
    Sink,
    create,
    write,
    destroy,
    withSink
) where

import System.Miniplex.Sekrit
import Prelude hiding (catch)
import Data.Typeable
import Data.List
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import System.Directory
import qualified Network as N
import Network.Socket

data Rend a = EOF | Empty | Some a

data Sink = Sink {
    name :: !String,
    sock :: !Socket,
    ctrl :: !(TVar [TVar (Rend String)]),
    acct :: !ThreadId,
    zmsg :: !(TVar (Maybe String))
} deriving (Typeable)

loop :: (Monad m) => m Bool -> m ()
loop m = fix $ \self -> m >>= flip when self

xsend :: Socket -> String -> IO ()
xsend s str =
    {- bracket
        (installHandler openEndedPipe Ignore Nothing)
        (\h -> installHandler openEndedPipe h Nothing)
        (\_ -> -} reallySend s (bytesFromInt (length str) ++ str) {- ) -}

finallyClose :: Socket -> IO a -> IO a
finallyClose s x = x `finally` sClose s

forkClose :: Socket -> IO () -> IO ThreadId
forkClose s x =
    forkIO $ finallyClose s $ handleJust ioErrors (const $ return ()) x

barghest :: Socket -> TVar [TVar (Rend String)] -> TVar (Maybe String) -> IO ()
barghest s cp zp = loop conn
    where
    conn = block $ do
        (t, _) <- accept s
        forkClose t $ unblock $ do
            v <- atomically (newTVar Empty)
            spawn t v
        return True
    spawn t v = do
        shutdown t ShutdownReceive
        z <- atomically $ readTVar zp
        maybe  (return ()) (xsend t) z
        block $ do
            atomically $ do
                vs <- readTVar cp
                writeTVar cp (v : vs)
            unblock (loop body) `finally`
                atomically (readTVar cp >>= writeTVar cp . delete v)
        where
        body = join . atomically $ do
            x <- readTVar v
            case x of
                EOF -> return (return False)
                Empty -> retry
                Some str -> do
                    writeTVar v Empty
                    return $ xsend t str >> return True

-- | @'create' tag@ creates a message sink. @tag@ is used to uniquely
-- identify this sink. The set of allowed characters for @tag@ includes
-- letters, digits, @_@, @-@ and @+@.
--
-- (Implementation detail: This function actually creates a named socket
-- in your @~\/.miniplex\/@.)
create :: String -> IO Sink
create what = do
    n <- pathFromTag "System.Miniplex.Sink.create" what
    s <- N.listenOn (N.UnixSocket n)
    closeOnExec s
    vs <- atomically $ newTVar []
    z <- atomically $ newTVar Nothing
    t <- forkClose s (barghest s vs z)
    return $ Sink
        { name = n
        , sock = s
        , ctrl = vs
        , acct = t
        , zmsg = z
        }

-- | @'write' si msg@ asynchronously writes @msg@ to @si@, where it will be
-- received by all currently connected readers.
write :: Sink -> String -> IO ()
write si str =
    atomically $ do
        writeTVar (zmsg si) (Just str)
        vs <- readTVar (ctrl si)
        mapM_ (flip writeTVar msg) vs
    where
    msg = Some str

-- | Deallocates a sink. The destroyed sink must not be used again.
--
-- (Implementation detail: This function closes and removes the socket from
-- the file system. If you forget to call it, you'll leave a stale entry in
-- @~\/.miniplex\/@, which will cause calls to 'create' with the same tag to
-- fail.)
destroy :: Sink -> IO ()
destroy si = do
    removeFile (name si)
    killThread (acct si)
    atomically $ do
        vs <- readTVar c
        mapM_ (flip writeTVar EOF) vs
        writeTVar c []
    where
    c = ctrl si

-- | Helper function to simplify resource handling. @'withSink' tag body@
-- creates a sink, calls @body@, then destroys the sink, even if @body@
-- throws an exception.
withSink :: String -> (Sink -> IO a) -> IO a
withSink t f = block $ do
    si <- create t
    unblock (f si) `finally` destroy si