module Database.Redis.PubSub (
publish,
pubSub,
Message(..),
PubSub(),
subscribe, unsubscribe,
psubscribe, punsubscribe,
) where
import Control.Applicative
import Control.Monad.Writer
import Data.ByteString.Char8 (ByteString)
import Data.Maybe
import Database.Redis.Internal (Redis)
import qualified Database.Redis.Internal as Internal
import Database.Redis.Reply
import Database.Redis.Types
-- | A batch of Pub/Sub commands waiting to be sent. Each inner list is one
-- command: the command name followed by its arguments.
newtype PubSub = PubSub [[ByteString]]
-- | Actions combine by concatenating their command lists, left operand
-- first. The identity is the action that holds no commands.
instance Monoid PubSub where
    mempty = PubSub []
    PubSub xs `mappend` PubSub ys = PubSub (xs ++ ys)
-- | A message delivered to the callback of 'pubSub'.
--
-- 'Message' is built from replies of kind @"message"@, 'PMessage' from
-- replies of kind @"pmessage"@, which additionally carry a pattern.
data Message
    = Message
        { msgChannel :: ByteString
        , msgMessage :: ByteString
        }
    | PMessage
        { msgPattern :: ByteString
        , msgChannel :: ByteString
        , msgMessage :: ByteString
        }
    deriving (Show)
-- | Issue a @PUBLISH@ request carrying the given channel and message, and
-- return whatever 'Internal.sendRequest' yields for it.
publish
    :: ByteString                    -- ^ Channel argument of the request.
    -> ByteString                    -- ^ Message argument of the request.
    -> Redis (Either Reply Integer)
publish channel message = Internal.sendRequest request
  where
    request = ["PUBLISH", channel, message]
-- | Build an action holding one @SUBSCRIBE@ command whose arguments are the
-- given channels.
subscribe
    :: [ByteString]  -- ^ Channels passed as the command's arguments.
    -> PubSub
subscribe channels = pubSubAction "SUBSCRIBE" channels
-- | Build an action holding one @UNSUBSCRIBE@ command whose arguments are
-- the given channels.
unsubscribe
    :: [ByteString]  -- ^ Channels passed as the command's arguments.
    -> PubSub
unsubscribe channels = pubSubAction "UNSUBSCRIBE" channels
-- | Build an action holding one @PSUBSCRIBE@ command whose arguments are
-- the given patterns.
psubscribe
    :: [ByteString]  -- ^ Patterns passed as the command's arguments.
    -> PubSub
psubscribe patterns = pubSubAction "PSUBSCRIBE" patterns
-- | Build an action holding one @PUNSUBSCRIBE@ command whose arguments are
-- the given patterns.
punsubscribe
    :: [ByteString]  -- ^ Patterns passed as the command's arguments.
    -> PubSub
punsubscribe patterns = pubSubAction "PUNSUBSCRIBE" patterns
-- | Run a Pub/Sub loop on the current connection.
--
-- The commands of the initial action are sent first. From then on every
-- reply is read and classified with 'decodeMsg': a 'Message' is handed to
-- the callback, and the action the callback returns is sent before the next
-- reply is read. The loop returns when a non-message reply reports a count
-- of zero while the pending counter is also zero.
--
-- NOTE(review): the exit test reads @pending@ before the current
-- confirmation is subtracted, and every command is counted as exactly one
-- outstanding reply no matter how many channels it names. Confirm both
-- against the reply sequence the server actually produces.
pubSub
    :: PubSub                  -- ^ Commands to send before reading replies.
    -> (Message -> IO PubSub)  -- ^ Invoked for every decoded message; its
                               --   result is sent before the next read.
    -> Redis ()
pubSub p callback = send p 0
  where
    -- Send the queued commands and register one pending reply per command.
    send (PubSub cmds) pending = do
        mapM_ Internal.send cmds
        recv (pending + length cmds)

    -- Read one reply and dispatch on its kind.
    recv pending = do
        reply <- Internal.recv
        case decodeMsg reply of
            Left cnt
                | cnt == 0 && pending == 0 -> return ()
                -- A non-message reply settles one pending command.
                | otherwise -> send mempty (pending - 1)
            Right msg -> do
                act <- liftIO $ callback msg
                send act pending
-- | Wrap a command name and its arguments as an action holding exactly that
-- one command.
pubSubAction :: ByteString -> [ByteString] -> PubSub
pubSubAction command arguments = PubSub [command : arguments]
-- | Classify a reply read by 'pubSub'.
--
-- A multi-bulk reply with at least three elements is dispatched on its
-- decoded first element:
--
-- * @"message"@ yields 'Right' a 'Message' built from elements two and three;
--
-- * @"pmessage"@ yields 'Right' a 'PMessage' built from elements two, three
--   and four;
--
-- * anything else yields 'Left' the third element decoded as an 'Integer',
--   which 'pubSub' compares against zero.
--
-- Calls 'error' when the reply has any other shape, when an element fails to
-- decode, or when a @"pmessage"@ reply has no fourth element.
decodeMsg :: Reply -> Either Integer Message
decodeMsg (MultiBulk (Just (r0:r1:r2:rs))) = either (error "decodeMsg") id $ do
    kind <- decode r0
    case kind :: ByteString of
        "message"  -> Right <$> decodeMessage
        "pmessage" -> Right <$> decodePMessage
        _          -> Left <$> decode r2
  where
    decodeMessage = Message <$> decode r1 <*> decode r2

    -- Match on the tail instead of calling 'head', so a short reply fails
    -- with a message naming this function.
    decodePMessage = case rs of
        r3 : _ -> PMessage <$> decode r1 <*> decode r2 <*> decode r3
        []     -> error "decodeMsg: pmessage reply has no fourth element"
decodeMsg r = error $ "not a message: " ++ show r