{-# OPTIONS_GHC -fglasgow-exts #-}

-- | Synchronous message receiving.
module System.Miniplex.Source (
    Source,
    attach,
    attachWait,
    read,
    getMsgs,
    detach,
    withSource,
    withSourceWait
) where

import System.Miniplex.Sekrit

import Prelude hiding (read, catch)

import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import Control.Monad.Reader ()

import Data.Typeable

import Network.Socket

import System.IO.Lock
import System.IO.Unsafe
import System.Posix.Files
import System.Posix.IO

newtype Source = Source {
    sock :: Socket
} deriving (Typeable)

-- | @'attach' tag@ returns a message source connected to the sink created by
-- a call to @'System.Miniplex.Sink.create' tag@. If no such sink exists, an
-- exception is thrown.
attach :: String -> IO Source
attach what = do
    (n, _, _) <- pathFromTag "System.Miniplex.Source.attach" what
    bracketOnError (socket AF_UNIX Stream 0) sClose $ \s -> do
        closeOnExec s
        connect s (SockAddrUnix n)
        shutdown s ShutdownSend
        return $ Source s

-- | Similar to 'attach', but if the specified sink doesn't exist, 'attachWait'
-- blocks until it becomes available.
attachWait :: String -> IO Source
attachWait what = do
    (com, lck, ret) <- pathFromTag "System.Miniplex.Source.attachWait" what
    bracketOnError (socket AF_UNIX Stream 0) sClose $ \s -> do
        closeOnExec s
        block . fix $ \retry -> do
            ld <- do
                bracket (openFd lck ReadOnly (Just mode644) defaultFileFlags) closeFd $ \lf -> do
                setLockAll lf LockRead
            x <- tryJust ioErrors (unblock $ connect s (SockAddrUnix com)) `catch` \e -> do
                unLock ld
                throwIO e
            case x of
                Right () -> unLock ld
                Left _ -> do
                    wf <- flip finally (unLock ld) $ unblock $ do
                        handleJust eexists (const $ return ()) $ createNamedPipe ret mode644
                        openFd ret ReadOnly Nothing defaultFileFlags{ nonBlock = True }
                    unblock (threadWaitRead wf) `finally` closeFd wf
                    retry
        shutdown s ShutdownSend
        return $ Source s

-- | Synchronously reads a message from a source (i.e. it blocks if there is
-- currently no message available).
read :: Source -> IO String
read so = do
    n <- liftM intFromBytes $ reallyRecv s 4
    reallyRecv s n
    where
    s = sock so

-- | Returns a lazy list of all messages arriving at a source (like
-- @'System.IO.hGetContents'@).
getMsgs :: Source -> IO [String]
getMsgs so = unsafeInterleaveIO . handle (\_ -> detach so >> return []) $
    liftM2 (liftM2 (:)) read getMsgs so

-- | Disconnects from a message sink. The detached source becomes invalid
-- and must not be used again.
detach :: Source -> IO ()
detach so = do
    sClose (sock so)

-- | Helper function to simplify resource handling. @'withSource' tag body@
-- creates a source, calls @body@, then disconnects the source, even if
-- @body@ throws an exception.
withSource :: String -> (Source -> IO a) -> IO a
withSource tag f = block $ do
    so <- attach tag
    unblock (f so) `finally` detach so

-- | Similar to 'withSource', but calls 'attachWait' instead of 'attach'.
withSourceWait :: String -> (Source -> IO a) -> IO a
withSourceWait tag f = block $ do
    so <- unblock $ attachWait tag
    unblock (f so) `finally` detach so