-- | Core of the Redis client: the pooled 'Connection', the 'Redis' monad and
-- the low-level primitives ('send', 'recv', 'sendRequest') used to talk to
-- the server, plus the 'auth' command that 'connect' itself relies on.
module Database.Redis.Core (
Connection(..), connect,
ConnectInfo(..), defaultConnectInfo,
Redis(),runRedis,
send, recv, sendRequest,
HostName, PortID(..),
ConnectionLostException(..),
auth
) where
import Prelude hiding (catch)
import Control.Applicative
import Control.Monad.Reader
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import qualified Data.Attoparsec as P
import qualified Data.ByteString as B
import Data.Pool
import Data.Time
import Data.Typeable
import Network
import System.IO
import System.IO.Unsafe
import Database.Redis.Reply
import Database.Redis.Request
import Database.Redis.Types
-- | A computation that talks to a Redis server.
--
-- Internally this is a reader over 'RedisEnv' layered on 'IO'; the
-- constructor is what 'runRedisInternal' unwraps to execute the action.
newtype Redis a = Redis (ReaderT RedisEnv IO a)
    deriving (Functor, Applicative, Monad, MonadIO)
-- | Run a 'Redis' action on a connection taken from the pool.
--
-- A pooled environment is borrowed with 'withResource', its 'MVar' is held
-- for the whole duration of the action, and the action is then executed
-- against the environment stored inside.
runRedis :: Connection -> Redis a -> IO a
runRedis (Conn pool) redis =
    withResource pool $ \envVar ->
        withMVar envVar (`runRedisInternal` redis)
-- | Execute a 'Redis' action directly against an already acquired
-- 'RedisEnv', without going through the pool or taking any lock.
runRedisInternal :: RedisEnv -> Redis a -> IO a
runRedisInternal env action =
    case action of
        Redis reader -> runReaderT reader env
-- | Per-connection state shared by 'send', 'recv' and the background
-- evaluator thread started in 'newEnv'.
data RedisEnv = Env
    { envHandle   :: Handle
      -- ^ Handle that 'send' writes requests to and 'hGetReplies' reads from.
    , envReplies  :: TVar [Reply]
      -- ^ Remaining replies; 'recv' pops the head of this list.
    , envThunkCnt :: TVar Integer
      -- ^ Counter incremented by 'recv' and decremented by 'forceThunks'.
    , envEvalTId  :: ThreadId
      -- ^ Thread running 'forceThunks'; killed when the connection is
      -- destroyed.
    }
-- | Build the environment for a freshly opened handle.
--
-- The reply list comes from 'hGetReplies', is stored in a 'TVar' for 'recv',
-- and is also handed to a forked thread running 'forceThunks'. The thunk
-- counter starts at zero.
newEnv :: Handle -> IO RedisEnv
newEnv h = do
    replies    <- fmap lazify (hGetReplies h)
    repliesVar <- newTVarIO replies
    thunkCnt   <- newTVarIO 0
    evalTId    <- forkIO (forceThunks thunkCnt replies)
    return Env
        { envHandle   = h
        , envReplies  = repliesVar
        , envThunkCnt = thunkCnt
        , envEvalTId  = evalTId
        }
  where
    -- Rebuilds the list so that every cons cell exists without inspecting
    -- the underlying list: 'head' and 'tail' are only applied when the
    -- element or the rest is actually demanded. The partial functions are
    -- deliberate here.
    -- NOTE(review): presumably this keeps the pattern match in 'recv' from
    -- triggering the interleaved read inside STM; confirm.
    lazify xs = head xs : lazify (tail xs)
-- | Evaluate replies in order, one for every reply handed out by 'recv'.
--
-- For each element of the list this waits until the counter is positive,
-- decrements it, and then forces the reply to weak head normal form before
-- moving on to the next one. Returns when the list is exhausted.
forceThunks :: TVar Integer -> [Reply] -> IO ()
forceThunks thunkCnt = go
  where
    go []     = return ()
    go (r:rs) = do
        atomically $ do
            cnt <- readTVar thunkCnt
            -- 'guard' in STM retries the transaction, so this blocks until
            -- 'recv' has bumped the counter.
            guard (cnt > 0)
            -- Consume one pending thunk; this is the counterpart of the
            -- increment performed in 'recv'.
            writeTVar thunkCnt (cnt - 1)
        r `seq` go rs
-- | Take the next reply off the connection's reply list.
--
-- The transaction retries while 1000 or more handed-out replies are still
-- counted as pending, then bumps the counter and pops the head of the list.
-- The reply is returned as-is; nothing in here forces it.
recv :: Redis Reply
recv = Redis $ do
    env <- ask
    liftIO (atomically (nextReply env))
  where
    nextReply Env{ envThunkCnt = cntVar, envReplies = repliesVar } = do
        pending <- readTVar cntVar
        guard (pending < 1000)
        writeTVar cntVar (pending + 1)
        reply:rest <- readTVar repliesVar
        writeTVar repliesVar rest
        return reply
-- | Render a request with 'renderRequest' and write the resulting bytes to
-- the connection handle. No flush happens here; 'hGetReplies' flushes the
-- handle before it reads.
send :: [B.ByteString] -> Redis ()
send req = Redis $
    asks envHandle >>= \h -> liftIO (B.hPut h (renderRequest req))
-- | Send a request, then take the next reply and 'decode' it into the
-- requested result type.
sendRequest :: (RedisResult a) => [B.ByteString] -> Redis (Either Reply a)
sendRequest req = do
    send req
    fmap decode recv
-- | A pool of Redis connections. Each pooled resource is a 'RedisEnv' kept in
-- an 'MVar'; 'connect' creates the pool and 'runRedis' borrows from it.
newtype Connection = Conn (Pool (MVar RedisEnv))
-- | Thrown by 'hGetReplies' when parsing a reply fails or reading from the
-- handle raises an 'IOError'.
data ConnectionLostException = ConnectionLost
    deriving (Typeable, Show)

instance Exception ConnectionLostException
-- | Settings used by 'connect' to open connections and size the pool.
data ConnectInfo = ConnInfo
    { connectHost           :: HostName
      -- ^ Host passed to 'connectTo'.
    , connectPort           :: PortID
      -- ^ Port passed to 'connectTo'.
    , connectAuth           :: Maybe B.ByteString
      -- ^ When present, 'auth' is run with this password on every newly
      -- created connection.
    , connectMaxConnections :: Int
      -- ^ Handed to 'createPool' as the maximum number of resources.
    , connectMaxIdleTime    :: NominalDiffTime
      -- ^ Handed to 'createPool' as the idle time of a resource.
    }
-- | Default settings: @localhost@ on port 6379, no password, at most 50
-- connections and a maximum idle time of 30.
defaultConnectInfo :: ConnectInfo
defaultConnectInfo = ConnInfo
    { connectHost           = "localhost"
    , connectPort           = PortNumber 6379
    , connectAuth           = Nothing
    , connectMaxConnections = 50
    , connectMaxIdleTime    = 30
    }
-- | Create a connection pool from the given settings.
--
-- The pool uses a single stripe. Connections are opened by @create@ and torn
-- down by @destroy@.
connect :: ConnectInfo -> IO Connection
connect ConnInfo{..} =
    fmap Conn (createPool create destroy 1 connectMaxIdleTime connectMaxConnections)
  where
    -- Open the socket, switch it to binary mode, set up the environment and,
    -- if a password is configured, send AUTH. The AUTH result is discarded.
    create = do
        h <- connectTo connectHost connectPort
        hSetBinaryMode h True
        env <- newEnv h
        case connectAuth of
            Nothing   -> return ()
            Just pass -> runRedisInternal env (auth pass) >> return ()
        newMVar env

    -- Close the handle if it is still open, then stop the evaluator thread.
    destroy envVar = withMVar envVar $ \env -> do
        let h = envHandle env
        stillOpen <- hIsOpen h
        when stillOpen (hClose h)
        killThread (envEvalTId env)
-- | Produce the list of replies read from the handle.
--
-- Every step is wrapped in 'unsafeInterleaveIO', so parsing (and therefore
-- reading from the handle) only happens once the corresponding part of the
-- list is demanded. A parse failure or an 'IOError' while reading is
-- reported by throwing 'ConnectionLost'.
hGetReplies :: Handle -> IO [Reply]
hGetReplies h = parseFrom B.empty
  where
    -- Parse one reply starting from the leftover input of the previous
    -- parse, then continue with whatever input that parse left unconsumed.
    parseFrom leftover = unsafeInterleaveIO $
        P.parseWith readMore reply leftover >>= \outcome -> case outcome of
            P.Done leftover' r -> fmap (r :) (parseFrom leftover')
            P.Partial _        -> error "Hedis: parseWith returned Partial"
            P.Fail _ _ _       -> connClosed

    -- Flush the handle before reading from it.
    readMore =
        hFlush h >> (B.hGetSome h chunkSize `catchIOError` const connClosed)

    chunkSize = 4 * 1024

    connClosed = throwIO ConnectionLost
-- | 'catch' with the handler's exception type fixed to 'IOError'.
catchIOError :: IO a -> (IOError -> IO a) -> IO a
catchIOError action handler = action `catch` handler
-- | Send the @AUTH@ command with the given password and decode the reply as
-- a 'Status'.
auth
    :: B.ByteString -- ^ Password
    -> Redis (Either Reply Status)
auth password = sendRequest request
  where
    request = ["AUTH", password]