-- | Pipelining of length-prefixed messages over a single handle.
--
-- Outgoing messages are written with an encoded length in front of them
-- (see 'send' and 'call'); a background listener thread reads length-prefixed
-- responses and hands them to callers in the order the calls were issued.
module Control.Pipeline (
Pipeline, newPipeline, send, call,
Size,
Length(..),
Resource(..),
Flush(..),
Stream(..), getN
) where
import Prelude hiding (length)
import Control.Applicative ((<$>))
import Control.Monad (forever)
import Control.Exception (assert, onException)
import System.IO.Error (try, mkIOError, eofErrorType)
import System.IO (Handle, hFlush, hClose, hIsClosed)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.Monoid (Monoid(..))
import Control.Concurrent (ThreadId, forkIO, killThread)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Concurrent.MVar
import Control.Concurrent.Chan
-- | A length as reported by 'length'; also the value carried by the
-- length prefix that is encoded in front of every message.
type Size = Int
-- | Containers whose length can be measured as a 'Size'.
-- The method shadows the Prelude's @length@, which this module hides.
class Length list where
    length :: list -> Size
    -- ^ Length of the given container.
-- | Strict byte strings report their length via 'S.length'.
instance Length S.ByteString where
    length bytes = S.length bytes
-- | Lazy byte strings report their length via 'L.length', converted to a
-- 'Size' with 'fromEnum'.
instance Length L.ByteString where
    length bytes = fromEnum (L.length bytes)
-- | A resource of type @r@ that can be closed, and queried for closedness,
-- inside the monad @m@.
class Resource m r where
    close :: r -> m ()
    -- ^ Close the resource.
    isClosed :: r -> m Bool
    -- ^ Report whether the resource is closed.
-- | File handles are closed with 'hClose' and queried with 'hIsClosed'.
instance Resource IO Handle where
    close h = hClose h
    isClosed h = hIsClosed h
-- | Handles that can be flushed. 'writeAll' calls 'flush' once after it has
-- written all of its messages.
class Flush handle where
    flush :: handle -> IO ()
    -- ^ Flush the handle.
-- | File handles are flushed with 'hFlush'.
instance Flush Handle where
    flush h = hFlush h
-- | A handle that @bytes@ can be written to and read from.
--
-- 'getN' treats a 'get' result shorter than requested as a partial read and
-- an empty result as end of input.
class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where
    put :: handle -> bytes -> IO ()
    -- ^ Write the bytes to the handle.
    get :: handle -> Int -> IO bytes
    -- ^ Read from the handle, given the number of bytes requested.
-- | Read exactly @n@ bytes from the handle.
--
-- 'get' may return fewer bytes than requested, so it is called repeatedly
-- for the bytes still missing and the pieces are joined with 'mappend'.
--
-- @n@ must be non-negative (checked with 'assert'). Raises an 'IOError' of
-- type 'eofErrorType' if 'get' returns nothing while bytes are still
-- outstanding.
getN :: (Stream h b) => h -> Int -> IO b
getN h n = assert (n >= 0) $ do
    bytes <- get h n
    let x = length bytes
    if x >= n
        then return bytes
        else if x == 0
            then ioError (mkIOError eofErrorType "Control.Pipeline" Nothing Nothing)
            -- Partial read: keep what arrived and fetch the remaining n - x bytes.
            else mappend bytes <$> getN h (n - x)
-- | Strict byte strings are written with 'S.hPut' and read with 'S.hGet'.
instance Stream Handle S.ByteString where
    put h bytes = S.hPut h bytes
    get h n = S.hGet h n
-- | Lazy byte strings are written with 'L.hPut' and read with 'L.hGet'.
instance Stream Handle L.ByteString where
    put h bytes = L.hPut h bytes
    get h n = L.hGet h n
-- | A connection over @handle@ carrying length-prefixed messages of type
-- @bytes@, with a background thread that reads the responses.
data Pipeline handle bytes = Pipeline {
    -- | Encode a message length as the prefix written before each message.
    -- The listener takes the length of @encodeSize 0@ as the prefix width.
    encodeSize :: Size -> bytes,
    -- | Decode a received prefix back into the length of the message body.
    decodeSize :: bytes -> Size,
    -- | The underlying handle; 'send' and 'call' write under 'withMVar'.
    vHandle :: MVar handle,
    -- | One empty 'MVar' per outstanding 'call', in call order; the listener
    -- fills each with the next response or the read error.
    responseQueue :: Chan (MVar (Either IOError bytes)),
    -- | The thread running 'listen' for this pipeline.
    listenThread :: ThreadId
    }
-- | Create a pipeline on the given handle and fork its listener thread.
--
-- A finalizer is attached to the handle's 'MVar' that kills the listener
-- thread and closes the handle.
newPipeline :: (Stream h b, Resource IO h) =>
    (Size -> b)
    -- ^ Encode a message length as the prefix bytes.
    -> (b -> Size)
    -- ^ Decode the prefix bytes back into a message length.
    -> h
    -- ^ Handle to read from and write to.
    -> IO (Pipeline h b)
newPipeline encodeSize decodeSize handle = do
    vHandle <- newMVar handle
    responseQueue <- newChan
    -- Recursive block: the record needs the listener's ThreadId, and the
    -- listener needs the record.
    rec
        -- The wildcard picks up encodeSize, decodeSize, vHandle,
        -- responseQueue and listenThread from the enclosing scope.
        let pipe = Pipeline{..}
        listenThread <- forkIO (listen pipe)
    addMVarFinalizer vHandle $ do
        killThread listenThread
        close handle
    return pipe
-- | Closing a pipeline kills its listener thread and then closes the
-- underlying handle. A pipeline is reported closed once its listener
-- thread has finished or died; a running or blocked listener counts as open.
instance (Resource IO h) => Resource IO (Pipeline h b) where
    close Pipeline{..} = do
        killThread listenThread
        readMVar vHandle >>= close
    isClosed Pipeline{listenThread} = stopped <$> threadStatus listenThread
      where
        stopped ThreadRunning = False
        stopped ThreadFinished = True
        stopped (ThreadBlocked _) = False
        stopped ThreadDied = True
-- | Listener loop run in the pipeline's background thread.
--
-- Repeatedly reads one length-prefixed message from the handle and puts it
-- (or the 'IOError' raised while reading) into the next 'MVar' taken from
-- the response queue. After a read error the handle is closed and the same
-- error is rethrown, which ends the loop.
listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO ()
listen Pipeline{..} = do
    -- Every prefix has the same width as the encoding of 0.
    let n = length (encodeSize 0)
    h <- readMVar vHandle
    forever $ do
        e <- try $ do
            len <- decodeSize <$> getN h n
            -- Report a negative length as an IOError here so it is delivered
            -- to the waiting caller; otherwise it would trip getN's assert,
            -- which 'try' does not catch.
            if len < 0
                then ioError (userError ("Control.Pipeline: negative message length " ++ show len))
                else getN h len
        var <- readChan responseQueue
        putMVar var e
        case e of
            -- Rethrow the original error rather than a stringified copy.
            Left err -> close h >> ioError err
            Right _ -> return ()
-- | Write the messages to the pipeline without registering for a response.
-- The handle's 'MVar' is held for the duration of the write.
send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO ()
send Pipeline{..} messages =
    withMVar vHandle $ \h -> writeAll listenThread encodeSize messages h
-- | Write the messages and register for a single response.
--
-- Returns an action that waits for the response when run and rethrows the
-- listener's 'IOError' if reading it failed. Writing and queueing both
-- happen while the handle's 'MVar' is held.
call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b)
call Pipeline{..} messages =
    withMVar vHandle $ \h -> do
        writeAll listenThread encodeSize messages h
        promise <- newEmptyMVar
        writeChan responseQueue promise
        return $ readMVar promise >>= either ioError return
-- | Write every message to the handle, each preceded by its encoded
-- length, then flush. If writing or flushing raises an exception, the
-- listener thread is killed and the handle closed before the exception
-- propagates.
writeAll :: (Stream h b, Monoid b, Length b, Resource IO h) => ThreadId -> (Size -> b) -> [b] -> h -> IO ()
writeAll listenThread encodeSize messages h =
    (mapM_ write messages >> flush h) `onException` (killThread listenThread >> close h)
  where
    -- Prefix and body go out in a single 'put'.
    write bytes = put h (encodeSize (length bytes) `mappend` bytes)