module Data.Conduit.Redis (redisSource, redisSink) where
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.ByteString.Char8 as BS
import Network
import Control.Concurrent.MVar
import Database.Redis hiding (String, decode)
import Control.Monad (void,replicateM, forever)
import Control.Monad.IO.Class (liftIO, MonadIO)
import Data.Either (rights)
import Data.Maybe (catMaybes,fromMaybe,isNothing)
import Control.Exception
import Control.Concurrent hiding (yield)
import Debug.Trace
-- | Extract the popped value from the result of a blocking pop.
--
-- A successful reply carries a pair whose second component is the payload;
-- the first component is discarded. A successful reply with no pair
-- yields 'Nothing'. Any error reply is dumped through 'trace' (a debugging
-- aid from "Debug.Trace") and also yields 'Nothing'.
mp :: Either Reply (Maybe (BS.ByteString, BS.ByteString)) -> Maybe BS.ByteString
mp reply = case reply of
  Right (Just (_, payload)) -> Just payload
  Right Nothing             -> Nothing
  -- Error reply: show the whole value, exactly as received, then give up.
  Left _                    -> trace (show reply) Nothing
-- | Push one value onto a Redis list with LPUSH, retrying until it succeeds.
--
-- Arguments, in order: the list key, the shared connection holder, the
-- connection settings used when reconnecting, a logging callback, and the
-- value to push.
--
-- * If the push raises a synchronous exception, a fresh connection is
--   created, stored in the 'MVar', and the push is retried after 0.5 s.
-- * If the server answers with an unexpected error reply, the reply is
--   logged and the push is retried after 0.5 s on the same connection.
-- * Asynchronous exceptions ('AsyncException', e.g. 'ThreadKilled') are
--   re-thrown so the retry loop can be cancelled from outside.
safePush :: BS.ByteString -> MVar Connection -> ConnectInfo -> (BS.ByteString -> IO ()) -> BS.ByteString -> IO ()
safePush list mconn cinfo logfn input = try mypush >>= either recover return
  where
    -- Runs outside of any exception handler (hence 'try' rather than
    -- 'catch'), so retries are not executed with async exceptions masked.
    recover :: SomeException -> IO ()
    recover e = case fromException e of
      Just asyncEx -> throwIO (asyncEx :: AsyncException)
      Nothing      -> do
        -- Connect first and swap afterwards: if 'connect' itself throws,
        -- the MVar still holds a connection instead of being left empty.
        connect cinfo >>= void . swapMVar mconn
        logfn "reconnecting to redis server ..."
        threadDelay 500000
        safePush list mconn cinfo logfn input

    mypush = do
      conn <- readMVar mconn
      x <- runRedis conn (lpush list [input])
      case x of
        Left (SingleLine "OK") -> return ()
        Right _ -> return ()
        err -> do
          logfn ("retrying ... " `BS.append` BS.pack (show err))
          threadDelay 500000
          safePush list mconn cinfo logfn input
-- | Pop up to @n@ elements from the tail of list @l@.
--
-- The first element is fetched with a blocking BRPOP using timeout @to@.
-- If that yields nothing, the result is the empty list. Otherwise up to
-- @n - 1@ further elements are fetched with non-blocking RPOPs; error
-- replies and empty results among those are dropped.
popN :: BS.ByteString -> Int -> Integer -> Redis [BS.ByteString]
popN l n to = do
  first <- fmap mp (brpop [l] to)
  case first of
    Nothing -> return []
    Just x  -> do
      -- The blocking pop already produced one element, so only n - 1 remain.
      rest <- fmap rights $ replicateM (n - 1) (rpop l)
      return (x : catMaybes rest)
-- | A source that repeatedly pops batches of values from a Redis list.
--
-- The connection is opened when the source starts and a QUIT command is
-- sent when the source is finalised. Every batch returned by 'popN' is
-- yielded downstream, including the empty batch produced when the
-- blocking pop returns nothing. The source never terminates on its own.
redisSource :: (MonadResource m)
            => HostName       -- ^ Redis host
            -> Int            -- ^ Redis port
            -> BS.ByteString  -- ^ key of the list to pop from
            -> Int            -- ^ maximum number of elements per batch
            -> Integer        -- ^ timeout handed to the blocking pop
            -> Source m [BS.ByteString]
redisSource h p list nb to = bracketP (connect cinfo) release pump
  where
    cinfo = defaultConnectInfo
      { connectHost = h
      , connectPort = PortNumber (fromIntegral p)
      }

    -- Finaliser: politely close the session.
    release conn = runRedis conn (void quit)

    -- Fetch one batch, hand it downstream, repeat forever.
    pump conn = forever $ do
      batch <- liftIO (runRedis conn (popN list nb to))
      yield batch
-- | A sink that pushes every incoming value onto a Redis list.
--
-- Each value is sent with 'safePush', which retries and reconnects on
-- failure. When no logging callback is supplied, messages are written
-- with 'BS.putStrLn'.
--
-- When the sink is finalised the current connection is closed with a QUIT
-- command. Closing is best effort: a failure is logged, not raised.
redisSink :: (MonadResource m)
          => HostName                         -- ^ Redis host
          -> Int                              -- ^ Redis port
          -> BS.ByteString                    -- ^ key of the list to push to
          -> Maybe (BS.ByteString -> IO ())   -- ^ optional logging callback
          -> Sink BS.ByteString m ()
redisSink h p list logcmd = bracketP acquire release pushAll
  where
    cinfo = defaultConnectInfo
      { connectHost = h
      , connectPort = PortNumber (fromIntegral p)
      }
    logfunc = fromMaybe BS.putStrLn logcmd

    acquire = connect cinfo >>= newMVar

    -- 'tryTakeMVar' never blocks, so the finaliser cannot hang even if the
    -- holder happens to be empty at this point.
    release mconn = tryTakeMVar mconn >>= maybe (return ()) closeConn

    closeConn conn = runRedis conn (void quit) `catch` closeFailed

    -- The connection may already be broken; report it and carry on.
    closeFailed :: SomeException -> IO ()
    closeFailed e =
      logfunc ("error closing redis connection: " `BS.append` BS.pack (show e))

    pushAll mconn = CL.mapM_ (liftIO . safePush list mconn cinfo logfunc)