module Network.Starling.Connection
( Connection
, open
, close
, synchRequest
, synchRequestMulti
, ignorantRequest
) where
import Network.Starling.Core
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception (handle)
import Data.IORef
import System.IO
import qualified Data.Binary.Builder as B
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BS
data Connection = Conn
{ con_lock :: MVar ()
, con_h :: Handle
, con_q :: Chan QItem
, con_opaque :: IORef Opaque
}
data QItem
= QDone
| QHandleResp Opaque (Response -> IO ())
| QHandleMulti Opaque ([Response] -> IO ())
open :: Handle -> IO Connection
open h
= do
lock <- newMVar ()
queue <- newChan
opaque <- newIORef 0
forkIO $ flip handle (readLoop h queue) $ \StarlingReadError ->
return ()
return $ Conn lock h queue opaque
readLoop :: Handle -> Chan QItem -> IO ()
readLoop h q
= do
response <- getResponse h
tryNextQItem h q response $ readLoop h q
tryNextQItem :: Handle -> Chan QItem -> Response -> IO () -> IO ()
tryNextQItem h q response k
= do
qItem <- readChan q
case compareOpaque response qItem of
KeepQ -> unGetChan q qItem >> k
KeepR -> tryNextQItem h q response k
Match -> processResponse h q response qItem k
Done -> return ()
compareOpaque :: Response -> QItem -> CompareResult
compareOpaque response qItem
= let qIdentm = qOpaque qItem
rIdent = rsOpaque response
in case qIdentm of
Nothing -> Done
Just qIdent ->
case compare qIdent rIdent of
EQ -> Match
LT -> KeepQ
GT -> KeepQ
qOpaque QDone = Nothing
qOpaque (QHandleResp ident _) = Just ident
qOpaque (QHandleMulti ident _) = Just ident
data CompareResult
= KeepQ
| KeepR
| Match
| Done
processResponse :: Handle -> Chan QItem
-> Response -> QItem -> IO () -> IO ()
processResponse h q response qItem k
= case qItem of
QHandleResp _ f -> f response >> k
QHandleMulti ident f -> do
(resps, left) <- takeResponses ident h
f (response : resps)
tryNextQItem h q left k
takeResponses :: Opaque -> Handle -> IO ([Response], Response)
takeResponses ident h
= go []
where go xs = do
response <- getResponse h
if rsOpaque response == ident
then go (response:xs)
else return (reverse xs, response)
withConLock :: Connection -> IO a -> IO a
withConLock Conn{..} k
= withMVar_ con_lock k
withMVar_ :: MVar a -> IO b -> IO b
withMVar_ mvar f = withMVar mvar $ const f
ignorantRequest :: Connection -> Request -> IO ()
ignorantRequest conn req
= withConLock conn $
putRequest conn req >>= \_ -> return ()
synchRequest :: Connection -> Request -> IO Response
synchRequest conn req
= do
result <- newEmptyMVar
withConLock conn $ do
opaque <- putRequest conn req
enqueue conn $ QHandleResp opaque $ putMVar result
readMVar result
synchRequestMulti :: Connection -> Request -> IO [Response]
synchRequestMulti conn req
= do
result <- newEmptyMVar
withConLock conn $ do
opaque <- putRequest conn req
enqueue conn $ QHandleMulti opaque $ putMVar result
_ <- putRequest conn noop
return ()
readMVar result
close :: Connection -> IO ()
close conn
=
withConLock conn $ do
enqueue conn QDone
_ <- putRequest conn quit
return ()
putRequest :: Connection -> Request -> IO Opaque
putRequest conn@Conn{..} req
= do
opaque <- nextOpaque conn
let chunk = B.toLazyByteString $ serialize $ addOpaque opaque req
BS.hPut con_h chunk
return opaque
nextOpaque :: Connection -> IO Opaque
nextOpaque Conn{..} = do
current <- readIORef con_opaque
let next = current + 1
writeIORef con_opaque next
return $ next `seq` current
enqueue :: Connection -> QItem -> IO ()
enqueue Conn{..} item
= writeChan con_q item