-- | Per-connection state for a socket served by a pair of threads.
--
-- A 'Connection' is allocated with 'new' and its reader and writer threads
-- are started with 'forkThreads'. The writer thread sends the byte strings
-- found in 'connoutq' on 'connsocket'. When either thread exits, the other
-- one is killed, the socket is closed and the death action given to 'new'
-- is run.
module Network.MiniHTTP.Connection
( Connection
, new
, forkThreads
, close
, connoutq
, connsocket
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Maybe (fromJust)
import qualified Data.ByteString as BS
import qualified Data.Sequence as Seq
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
-- | State shared between the reader thread, the writer thread and the
-- code that owns a connection.
data Connection = Connection
    { connsocket       :: Socket
      -- ^ Socket the writer thread sends on; closed during teardown.
    , connoutq         :: TVar (Seq.Seq BS.ByteString)
      -- ^ Outgoing byte strings; the writer thread removes them from the
      -- right-hand end of the sequence.
    , connreaderthread :: TVar (Maybe ThreadId)
      -- ^ Reader thread id; 'Nothing' until 'forkThreads' has run.
    , connwriterthread :: TVar (Maybe ThreadId)
      -- ^ Writer thread id; 'Nothing' until 'forkThreads' has run.
    , conndeath        :: IO ()
      -- ^ Action run by the thread wrapper when the connection is torn down.
    , conndead         :: TVar Bool
      -- ^ 'True' before the threads are released and again after teardown;
      -- 'False' while the connection is running.
    }
-- | Allocate the state for a connection over the given socket.
--
-- The second argument is the action to run when the connection is torn
-- down. The connection starts with an empty output queue, no thread ids
-- recorded and the dead flag set to 'True'; 'forkThreads' clears the flag
-- once both threads exist.
new :: Socket
    -> IO ()
    -> STM Connection
new socket deathaction = do
    deadFlag  <- newTVar True
    outQueue  <- newTVar Seq.empty
    readerVar <- newTVar Nothing
    writerVar <- newTVar Nothing
    return $ Connection
        { connsocket       = socket
        , connoutq         = outQueue
        , connreaderthread = readerVar
        , connwriterthread = writerVar
        , conndeath        = deathaction
        , conndead         = deadFlag
        }
-- | Start the reader and writer threads of a connection.
--
-- The second argument is the body of the reader thread; the writer thread
-- runs 'seqToSocket' on the connection's output queue and socket. Both
-- threads first block in 'waitForReadySignal', so neither body starts
-- before the single transaction at the end has recorded both thread ids
-- and cleared the dead flag.
forkThreads :: Connection
            -> IO ()
            -> IO ()
forkThreads conn readeraction = do
    -- Each thread is told which TVar holds its peer's id, so that it can
    -- kill the peer when it exits.
    readerTid <- spawn connwriterthread readeraction
    writerTid <- spawn connreaderthread
                       (seqToSocket (connoutq conn) (connsocket conn))
    atomically $ do
        writeTVar (connreaderthread conn) (Just readerTid)
        writeTVar (connwriterthread conn) (Just writerTid)
        writeTVar (conndead conn) False
  where
    -- Fork a thread that waits for the ready signal and then runs the
    -- given body inside the teardown wrapper.
    spawn peer body =
        forkIO (waitForReadySignal conn (connectionThreadWrapper conn peer body))
-- | Block until the connection's dead flag is 'False', then run the action
-- and return its result.
waitForReadySignal :: Connection -> IO a -> IO a
waitForReadySignal conn action = do
    -- 'retry' suspends the transaction until the flag is written again.
    atomically $ readTVar (conndead conn) >>= \stillDead -> when stillDead retry
    action
-- | Run a connection thread's body and tear the connection down when the
-- body exits, whether it returns normally or by exception.
--
-- The second argument selects the TVar holding the id of the /other/
-- thread of the connection. Only the first thread to reach the cleanup
-- performs the teardown: it sets the dead flag, kills the other thread,
-- closes the socket and runs the connection's death action. A thread that
-- finds the flag already set does nothing.
connectionThreadWrapper :: Connection -> (Connection -> TVar (Maybe ThreadId)) -> IO a -> IO a
connectionThreadWrapper conn otherthread action = action `finally` teardown
  where
    teardown = do
        -- Test-and-set the dead flag in one transaction so that exactly one
        -- of the two threads goes on to perform the teardown.
        alreadyDead <- atomically $ do
            dead <- readTVar (conndead conn)
            unless dead $ writeTVar (conndead conn) True
            return dead
        unless alreadyDead $ do
            peer <- atomically (readTVar (otherthread conn))
            -- A missing thread id must not abort the cleanup: skip the kill
            -- but still close the socket and run the death action.
            maybe (return ()) killThread peer
            sClose (connsocket conn)
            conndeath conn
-- | Close the connection's socket.
--
-- Only the socket is closed here; this function does not itself touch the
-- connection's threads or its dead flag.
close :: Connection -> IO ()
close conn = sClose (connsocket conn)
-- | Writer loop: repeatedly remove one byte string from the queue and write
-- it to the socket. Never returns normally.
--
-- Blocks (via 'retry') while the queue is empty.
--
-- NOTE(review): elements are removed from the right-hand end of the
-- sequence, so output is first-in-first-out only if producers add to the
-- left-hand end -- confirm against the code that fills the queue.
seqToSocket :: TVar (Seq.Seq BS.ByteString)
            -> Socket
            -> IO ()
seqToSocket q sock = forever $ do
    next <- atomically $ do
        pending <- readTVar q
        case Seq.viewr pending of
            Seq.EmptyR -> retry
            remaining Seq.:> chunk -> do
                writeTVar q remaining
                return chunk
    writea sock next
-- | Write an entire byte string to the socket.
--
-- A single send may transmit only part of its input, so the work is
-- delegated to 'sendAll', which keeps sending until every byte has been
-- written. An empty byte string is a no-op and issues no send at all.
writea :: Socket -> BS.ByteString -> IO ()
writea sock bytes
    | BS.null bytes = return ()
    | otherwise     = sendAll sock bytes