module Database.Redis.Connection (
HostName,PortID(PortNumber,UnixSocket),
ConnectInfo(..),defaultConnectInfo,
Connection(), connect,
ConnectionLostException(..)
) where
import Prelude hiding (catch)
import Control.Applicative
import Control.Monad.Reader
import Control.Concurrent
import Control.Exception (Exception, bracketOnError, throwIO)
import qualified Data.Attoparsec as P
import qualified Data.ByteString as B
import Data.IORef
import Data.Pool
import Data.Time
import Data.Typeable
import Network (HostName, PortID(..), connectTo)
import System.IO (Handle, hClose, hIsOpen, hSetBinaryMode, hFlush)
import System.IO.Error
import System.IO.Unsafe (unsafeInterleaveIO)
import Database.Redis.Core
import Database.Redis.Commands (auth)
import Database.Redis.Reply
-- | Exception raised by the reply reader in this module when reading from
--   the server handle fails with an 'IOError' or yields no more data.
data ConnectionLostException
    = ConnectionLost
    deriving (Show, Typeable)

instance Exception ConnectionLostException
-- | Settings consumed by 'connect' when it builds the connection pool.
data ConnectInfo = ConnInfo
    { connectHost           :: HostName
      -- ^ Host name handed to 'connectTo' for every new connection.
    , connectPort           :: PortID
      -- ^ Port (or Unix socket) handed to 'connectTo'.
    , connectAuth           :: Maybe B.ByteString
      -- ^ When 'Just', the 'auth' command is run with this value on each
      --   freshly opened connection; when 'Nothing', no 'auth' is sent.
    , connectMaxConnections :: Int
      -- ^ Passed to 'createPool' as the resource limit of its single
      --   stripe.
    , connectMaxIdleTime    :: NominalDiffTime
      -- ^ Passed to 'createPool' as the idle time of pooled connections.
    }
-- | Default connection settings: @localhost@ on port 6379, no password,
--   a limit of 50 connections and an idle time of 30 (a 'NominalDiffTime'
--   literal, i.e. seconds).
--
--   Override individual fields with record update syntax, e.g.
--   @defaultConnectInfo { connectHost = \"example.org\" }@.
defaultConnectInfo :: ConnectInfo
defaultConnectInfo = ConnInfo
    { connectHost           = "localhost"
    , connectPort           = PortNumber 6379
    , connectAuth           = Nothing
    , connectMaxConnections = 50
    , connectMaxIdleTime    = 30
    }
-- | Build a 'Connection' backed by a pool of server connections.
--
--   The pool is created with a single stripe, using 'connectMaxIdleTime'
--   and 'connectMaxConnections' from the given 'ConnectInfo'. Individual
--   connections are opened by the pool through @create@ and closed through
--   @destroy@.
--
--   If 'connectAuth' is set, the 'auth' command is issued on every newly
--   opened connection; its reply is discarded.
connect :: ConnectInfo -> IO Connection
connect ConnInfo{..} = Conn <$>
    createPool create destroy 1 connectMaxIdleTime connectMaxConnections
  where
    -- Open a socket and initialise it. 'bracketOnError' guarantees that the
    -- handle is closed again if initialisation (e.g. the AUTH round trip)
    -- throws, instead of leaving the socket open until it is finalised.
    create = bracketOnError (connectTo connectHost connectPort) hClose setup

    setup h = do
        hSetBinaryMode h True
        -- 'hGetReplies' is lazy: nothing is read from the handle until the
        -- first reply is demanded.
        rs <- hGetReplies h >>= newIORef
        let conn = (h, rs)
        case connectAuth of
            Nothing   -> return ()
            Just pass -> runRedisInternal conn (auth pass) >> return ()
        newMVar conn

    -- Close the handle unless something else has already closed it.
    destroy conn = withMVar conn $ \(h, _) -> do
        open <- hIsOpen h
        when open (hClose h)
-- | Produce the lazy list of all replies that can be read from the handle.
--
--   Each list cell is wrapped in 'unsafeInterleaveIO', so data is only read
--   from the handle when the corresponding reply is demanded. Bytes left
--   over after one reply are used as the start of the next.
--
--   Forcing a list cell throws 'ConnectionLost' when reading from the handle
--   fails with an 'IOError' or returns no data. This holds both when
--   starting a new reply and when fetching the remainder of a partially
--   received one. Input that the 'reply' parser rejects results in a call
--   to 'error'.
hGetReplies :: Handle -> IO [Reply]
hGetReplies h = lazyRead (Right B.empty)
  where
    -- 'Left' carries the continuation of a partially parsed reply,
    -- 'Right' carries unconsumed input left over from the previous reply.
    lazyRead rest = unsafeInterleaveIO $ do
        parseResult <- either continueParse readAndParse rest
        case parseResult of
            P.Fail _ _ _   -> error "Hedis: reply parse failed"
            P.Partial cont -> lazyRead (Left cont)
            P.Done rest' r -> do
                rs <- lazyRead (Right rest')
                return (r:rs)

    -- Feed more input to a suspended parse. Going through 'readMore' means
    -- a closed connection is reported as 'ConnectionLost' rather than being
    -- fed to the parser as end-of-input.
    continueParse cont = cont <$> readMore

    -- Start parsing a new reply, reading from the handle only when there
    -- is no leftover input.
    readAndParse rest
        | B.null rest = P.parse reply <$> readMore
        | otherwise   = return (P.parse reply rest)

    -- Flush the handle's write buffer, then read the next chunk (at most
    -- 'maxRead' bytes).
    readMore = do
        hFlush h
        s <- B.hGetSome h maxRead `catchIOError` const errConnClosed
        when (B.null s) errConnClosed
        return s

    maxRead       = 4*1024
    errConnClosed = throwIO ConnectionLost