{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
    FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}

module Database.Redis.PubSub (
    publish,

    -- ** Subscribing to channels
    -- $pubsubexpl

    -- *** Single-thread Pub/Sub
    pubSub,
    Message(..),
    PubSub(),
    subscribe, unsubscribe, psubscribe, punsubscribe,
    -- *** Continuous Pub/Sub message controller
    pubSubForever,
    RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
    PubSubController, newPubSubController, currentChannels, currentPChannels,
    addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
    UnregisterCallbacksAction
) where

#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid hiding (<>)
#endif
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM)
import Control.Concurrent.STM
import Control.Exception (throwIO)
import Control.Monad
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.List (foldl')
import Data.Maybe (isJust)
import Data.Pool
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.Core as Core
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types

-- |While in PubSub mode, we keep track of the number of current subscriptions
--  (as reported by Redis replies) and the number of messages we expect to
--  receive after a SUBSCRIBE or PSUBSCRIBE command. We can safely leave the
--  PubSub mode when both these numbers are zero.
data PubSubState = PubSubState { subCnt, pending :: Int }

modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending f = modify $ \s -> s{ pending = f (pending s) }

putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt n = modify $ \s -> s{ subCnt = n }

data Subscribe
data Unsubscribe
data Channel
data Pattern

-- |Encapsulates subscription changes. Use 'subscribe', 'unsubscribe',
--  'psubscribe', 'punsubscribe' or 'mempty' to construct a value. Combine
--  values by using the 'Monoid' interface, i.e. 'mappend' and 'mconcat'.
data PubSub = PubSub
    { subs    :: Cmd Subscribe Channel
    , unsubs  :: Cmd Unsubscribe Channel
    , psubs   :: Cmd Subscribe Pattern
    , punsubs :: Cmd Unsubscribe Pattern
    } deriving (Eq)

instance Semigroup PubSub where
    (<>) p1 p2 = PubSub { subs    = subs p1 `mappend` subs p2
                           , unsubs  = unsubs p1 `mappend` unsubs p2
                           , psubs   = psubs p1 `mappend` psubs p2
                           , punsubs = punsubs p1 `mappend` punsubs p2
                           }

instance Monoid PubSub where
    mempty        = PubSub mempty mempty mempty mempty
    mappend = (<>)

data Cmd a b = DoNothing | Cmd { changes :: [ByteString] } deriving (Eq)

instance Semigroup (Cmd Subscribe a) where
  (<>) DoNothing x = x
  (<>) x DoNothing = x
  (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys)

instance Monoid (Cmd Subscribe a) where
  mempty = DoNothing
  mappend = (<>)

instance Semigroup (Cmd Unsubscribe a) where
  (<>) DoNothing x = x
  (<>) x DoNothing = x
  -- empty subscription list => unsubscribe all channels and patterns
  (<>) (Cmd []) _ = Cmd []
  (<>) _ (Cmd []) = Cmd []
  (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys)

instance Monoid (Cmd Unsubscribe a) where
  mempty = DoNothing
  mappend = (<>)

class Command a where
    redisCmd      :: a -> ByteString
    updatePending :: a -> Int -> Int

sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd DoNothing = return ()
sendCmd cmd       = do
    lift $ Core.send (redisCmd cmd : changes cmd)
    modifyPending (updatePending cmd)

cmdCount :: Cmd a b -> Int
cmdCount DoNothing = 0
cmdCount (Cmd c) = length c

totalPendingChanges :: PubSub -> Int
totalPendingChanges (PubSub{..}) =
  cmdCount subs + cmdCount unsubs + cmdCount psubs + cmdCount punsubs

rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd _ DoNothing = return ()
rawSendCmd conn cmd    = PP.send conn $ renderRequest $ redisCmd cmd : changes cmd

plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt DoNothing = id
plusChangeCnt (Cmd cs)  = (+ length cs)

instance Command (Cmd Subscribe Channel) where
    redisCmd      = const "SUBSCRIBE"
    updatePending = plusChangeCnt

instance Command (Cmd Subscribe Pattern) where
    redisCmd      = const "PSUBSCRIBE"
    updatePending = plusChangeCnt

instance Command (Cmd Unsubscribe Channel) where
    redisCmd      = const "UNSUBSCRIBE"
    updatePending = const id

instance Command (Cmd Unsubscribe Pattern) where
    redisCmd      = const "PUNSUBSCRIBE"
    updatePending = const id


data Message = Message  { msgChannel, msgMessage :: ByteString}
             | PMessage { msgPattern, msgChannel, msgMessage :: ByteString}
    deriving (Show)

data PubSubReply = Subscribed | Unsubscribed Int | Msg Message


------------------------------------------------------------------------------
-- Public Interface
--

-- |Post a message to a channel (<http://redis.io/commands/publish>).
publish
    :: (Core.RedisCtx m f)
    => ByteString -- ^ channel
    -> ByteString -- ^ message
    -> m (f Integer)
publish channel message =
    Core.sendRequest ["PUBLISH", channel, message]

-- |Listen for messages published to the given channels
--  (<http://redis.io/commands/subscribe>).
subscribe
    :: [ByteString] -- ^ channel
    -> PubSub
subscribe []       = mempty
subscribe cs = mempty{ subs = Cmd cs }

-- |Stop listening for messages posted to the given channels
--  (<http://redis.io/commands/unsubscribe>).
unsubscribe
    :: [ByteString] -- ^ channel
    -> PubSub
unsubscribe cs = mempty{ unsubs = Cmd cs }

-- |Listen for messages published to channels matching the given patterns 
--  (<http://redis.io/commands/psubscribe>).
psubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
psubscribe []       = mempty
psubscribe ps = mempty{ psubs = Cmd ps }

-- |Stop listening for messages posted to channels matching the given patterns 
--  (<http://redis.io/commands/punsubscribe>).
punsubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
punsubscribe ps = mempty{ punsubs = Cmd ps }

-- |Listens to published messages on subscribed channels and channels matching
--  the subscribed patterns. For documentation on the semantics of Redis
--  Pub\/Sub see <http://redis.io/topics/pubsub>.
--  
--  The given callback function is called for each received message. 
--  Subscription changes are triggered by the returned 'PubSub'. To keep
--  subscriptions unchanged, the callback can return 'mempty'.
--  
--  Example: Subscribe to the \"news\" channel indefinitely.
--
--  @
--  pubSub (subscribe [\"news\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return mempty
--  @
--
--  Example: Receive a single message from the \"chat\" channel.
--
--  @
--  pubSub (subscribe [\"chat\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return $ unsubscribe [\"chat\"]
--  @
--
-- It should be noted that Redis Pub\/Sub by its nature is asynchronous
-- so returning `unsubscribe` does not mean that callback won't be able
-- to receive any further messages. And to guarantee that you won't
-- won't process messages after unsubscription and won't unsubscribe
-- from the same channel more than once you need to use `IORef` or
-- something similar
--
pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback function.
    -> Core.Redis ()
pubSub initial callback
    | initial == mempty = return ()
    | otherwise         = evalStateT (send initial) (PubSubState 0 0)
  where
    send :: PubSub -> StateT PubSubState Core.Redis ()
    send PubSub{..} = do
        sendCmd subs
        sendCmd unsubs
        sendCmd psubs
        sendCmd punsubs
        recv

    recv :: StateT PubSubState Core.Redis ()
    recv = do
        reply <- lift Core.recv
        case decodeMsg reply of
            Msg msg        -> liftIO (callback msg) >>= send
            Subscribed     -> modifyPending (subtract 1) >> recv
            Unsubscribed n -> do
                putSubCnt n
                PubSubState{..} <- get
                unless (subCnt == 0 && pending == 0) recv

-- | A Redis channel name
type RedisChannel = ByteString

-- | A Redis pattern channel name
type RedisPChannel = ByteString

-- | A handler for a message from a subscribed channel.
-- The callback is passed the message content.
--
-- Messages are processed synchronously in the receiving thread, so if the callback
-- takes a long time it will block other callbacks and other messages from being
-- received.  If you need to move long-running work to a different thread, we suggest
-- you use 'TBQueue' with a reasonable bound, so that if messages are arriving faster
-- than you can process them, you do eventually block.
--
-- If the callback throws an exception, the exception will be thrown from 'pubSubForever'
-- which will cause the entire Redis connection for all subscriptions to be closed.
-- As long as you call 'pubSubForever' in a loop you will reconnect to your subscribed
-- channels, but you should probably add an exception handler to each callback to
-- prevent this.
type MessageCallback = ByteString -> IO ()

-- | A handler for a message from a psubscribed channel.
-- The callback is passed the channel the message was sent on plus the message content.
--
-- Similar to 'MessageCallback', callbacks are executed synchronously and any exceptions
-- are rethrown from 'pubSubForever'.
type PMessageCallback = RedisChannel -> ByteString -> IO ()

-- | An action that when executed will unregister the callbacks.  It is returned from 'addChannels'
-- or 'addChannelsAndWait' and typically you would use it in 'bracket' to guarantee that you
-- unsubscribe from channels.  For example, if you are using websockets to distribute messages to
-- clients, you could use something such as:
--
-- > websocketConn <- Network.WebSockets.acceptRequest pending
-- > let mycallback msg = Network.WebSockets.sendTextData websocketConn msg
-- > bracket (addChannelsAndWait ctrl [("hello", mycallback)] []) id $ const $ do
-- >   {- loop here calling Network.WebSockets.receiveData -}
type UnregisterCallbacksAction = IO ()

newtype UnregisterHandle = UnregisterHandle Integer
  deriving (Eq, Show, Num)

-- | A controller that stores a set of channels, pattern channels, and callbacks.
-- It allows you to manage Pub/Sub subscriptions and pattern subscriptions and alter them at
-- any time throughout the life of your program.
-- You should typically create the controller at the start of your program and then store it
-- through the life of your program, using 'addChannels' and 'removeChannels' to update the
-- current subscriptions.
data PubSubController = PubSubController
  { callbacks :: TVar (HM.HashMap RedisChannel [(UnregisterHandle, MessageCallback)])
  , pcallbacks :: TVar (HM.HashMap RedisPChannel [(UnregisterHandle, PMessageCallback)])
  , sendChanges :: TBQueue PubSub
  , pendingCnt :: TVar Int
  , lastUsedCallbackId :: TVar UnregisterHandle
  }

-- | Create a new 'PubSubController'.  Note that this does not subscribe to any channels, it just
-- creates the controller.  The subscriptions will happen once 'pubSubForever' is called.
newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)] -- ^ the initial subscriptions
                                 -> [(RedisPChannel, PMessageCallback)] -- ^ the initial pattern subscriptions
                                 -> m PubSubController
newPubSubController x y = liftIO $ do
    cbs <- newTVarIO (HM.map (\z -> [(0,z)]) $ HM.fromList x)
    pcbs <- newTVarIO (HM.map (\z -> [(0,z)]) $ HM.fromList y)
    c <- newTBQueueIO 10
    pending <- newTVarIO 0
    lastId <- newTVarIO 0
    return $ PubSubController cbs pcbs c pending lastId

-- | Get the list of current channels in the 'PubSubController'.  WARNING! This might not
-- exactly reflect the subscribed channels in the Redis server, because there is a delay
-- between adding or removing a channel in the 'PubSubController' and when Redis receives
-- and processes the subscription change request.
#if __GLASGOW_HASKELL__ < 710
currentChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisChannel]
#else
currentChannels :: MonadIO m => PubSubController -> m [RedisChannel]
#endif
currentChannels ctrl = HM.keys <$> (liftIO $ atomically $ readTVar $ callbacks ctrl)

-- | Get the list of current pattern channels in the 'PubSubController'.  WARNING! This might not
-- exactly reflect the subscribed channels in the Redis server, because there is a delay
-- between adding or removing a channel in the 'PubSubController' and when Redis receives
-- and processes the subscription change request.
#if __GLASGOW_HASKELL__ < 710
currentPChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisPChannel]
#else
currentPChannels :: MonadIO m => PubSubController -> m [RedisPChannel]
#endif
currentPChannels ctrl = HM.keys <$> (liftIO $ atomically $ readTVar $ pcallbacks ctrl)

-- | Add channels into the 'PubSubController', and if there is an active 'pubSubForever', send the subscribe
-- and psubscribe commands to Redis.  The 'addChannels' function is thread-safe.  This function
-- does not wait for Redis to acknowledge that the channels have actually been subscribed; use
-- 'addChannelsAndWait' for that.
--
-- You can subscribe to the same channel or pattern channel multiple times; the 'PubSubController' keeps
-- a list of callbacks and executes each callback in response to a message.
--
-- The return value is an action 'UnregisterCallbacksAction' which will unregister the callbacks,
-- which should typically used with 'bracket'.
addChannels :: MonadIO m => PubSubController
                         -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                         -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to pattern subscribe to
                         -> m UnregisterCallbacksAction
addChannels _ [] [] = return $ return ()
addChannels ctrl newChans newPChans = liftIO $ do
    ident <- atomically $ do
      modifyTVar (lastUsedCallbackId ctrl) (+1)
      ident <- readTVar $ lastUsedCallbackId ctrl
      cm <- readTVar $ callbacks ctrl
      pm <- readTVar $ pcallbacks ctrl
      let newChans' = [ n | (n,_) <- newChans, not $ HM.member n cm]
          newPChans' = [ n | (n, _) <- newPChans, not $ HM.member n pm]
          ps = subscribe newChans' `mappend` psubscribe newPChans'
      writeTBQueue (sendChanges ctrl) ps
      writeTVar (callbacks ctrl) (HM.unionWith (++) cm (fmap (\z -> [(ident,z)]) $ HM.fromList newChans))
      writeTVar (pcallbacks ctrl) (HM.unionWith (++) pm (fmap (\z -> [(ident,z)]) $ HM.fromList newPChans))
      modifyTVar (pendingCnt ctrl) (+ totalPendingChanges ps)
      return ident
    return $ unsubChannels ctrl (map fst newChans) (map fst newPChans) ident

-- | Call 'addChannels' and then wait for Redis to acknowledge that the channels are actually subscribed.
--
-- Note that this function waits for all pending subscription change requests, so if you for example call
-- 'addChannelsAndWait' from multiple threads simultaneously, they all will wait for all pending
-- subscription changes to be acknowledged by Redis (this is due to the fact that we just track the total
-- number of pending change requests sent to Redis and just wait until that count reaches zero).
--
-- This also correctly waits if the network connection dies during the subscription change.  Say that the
-- network connection dies right after we send a subscription change to Redis.  'pubSubForever' will throw
-- 'ConnectionLost' and 'addChannelsAndWait' will continue to wait.  Once you recall 'pubSubForever'
-- with the same 'PubSubController', 'pubSubForever' will open a new connection, send subscription commands
-- for all channels in the 'PubSubController' (which include the ones we are waiting for),
-- and wait for the responses from Redis.  Only once we receive the response from Redis that it has subscribed
-- to all channels in 'PubSubController' will 'addChannelsAndWait' unblock and return.
addChannelsAndWait :: MonadIO m => PubSubController
                                -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                                -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to psubscribe to
                                -> m UnregisterCallbacksAction
addChannelsAndWait _ [] [] = return $ return ()
addChannelsAndWait ctrl newChans newPChans = do
  unreg <- addChannels ctrl newChans newPChans
  liftIO $ atomically $ do
    r <- readTVar (pendingCnt ctrl)
    when (r > 0) retry
  return unreg

-- | Remove channels from the 'PubSubController', and if there is an active 'pubSubForever', send the
-- unsubscribe commands to Redis.  Note that as soon as this function returns, no more callbacks will be
-- executed even if more messages arrive during the period when we request to unsubscribe from the channel
-- and Redis actually processes the unsubscribe request.  This function is thread-safe.
--
-- If you remove all channels, the connection in 'pubSubForever' to redis will stay open and waiting for
-- any new channels from a call to 'addChannels'.  If you really want to close the connection,
-- use 'Control.Concurrent.killThread' or 'Control.Concurrent.Async.cancel' to kill the thread running
-- 'pubSubForever'.
removeChannels :: MonadIO m => PubSubController
                            -> [RedisChannel]
                            -> [RedisPChannel]
                            -> m ()
removeChannels _ [] [] = return ()
removeChannels ctrl remChans remPChans = liftIO $ atomically $ do
    cm <- readTVar $ callbacks ctrl
    pm <- readTVar $ pcallbacks ctrl
    let remChans' = filter (\n -> HM.member n cm) remChans
        remPChans' = filter (\n -> HM.member n pm) remPChans
        ps =        (if null remChans' then mempty else unsubscribe remChans')
          `mappend` (if null remPChans' then mempty else punsubscribe remPChans')
    writeTBQueue (sendChanges ctrl) ps
    writeTVar (callbacks ctrl) (foldl' (flip HM.delete) cm remChans')
    writeTVar (pcallbacks ctrl) (foldl' (flip HM.delete) pm remPChans')
    modifyTVar (pendingCnt ctrl) (+ totalPendingChanges ps)

-- | Internal function to unsubscribe only from those channels matching the given handle.
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
unsubChannels ctrl chans pchans h = liftIO $ atomically $ do
    cm <- readTVar $ callbacks ctrl
    pm <- readTVar $ pcallbacks ctrl

    -- only worry about channels that exist
    let remChans = filter (\n -> HM.member n cm) chans
        remPChans = filter (\n -> HM.member n pm) pchans

    -- helper functions to filter out handlers that match
    let filterHandle :: Maybe [(UnregisterHandle,a)] -> Maybe [(UnregisterHandle,a)]
        filterHandle Nothing = Nothing
        filterHandle (Just lst) = case filter (\x -> fst x /= h) lst of
                                    [] -> Nothing
                                    xs -> Just xs
    let removeHandles :: HM.HashMap ByteString [(UnregisterHandle,a)]
                      -> ByteString
                      -> HM.HashMap ByteString [(UnregisterHandle,a)]
        removeHandles m k = case filterHandle (HM.lookup k m) of -- recent versions of unordered-containers have alter
            Nothing -> HM.delete k m
            Just v -> HM.insert k v m

    -- maps after taking out channels matching the handle
    let cm' = foldl' removeHandles cm remChans
        pm' = foldl' removeHandles pm remPChans

    -- the channels to unsubscribe are those that no longer exist in cm' and pm'
    let remChans' = filter (\n -> not $ HM.member n cm') remChans
        remPChans' = filter (\n -> not $ HM.member n pm') remPChans
        ps =        (if null remChans' then mempty else unsubscribe remChans')
          `mappend` (if null remPChans' then mempty else punsubscribe remPChans')

    -- do the unsubscribe
    writeTBQueue (sendChanges ctrl) ps
    writeTVar (callbacks ctrl) cm'
    writeTVar (pcallbacks ctrl) pm'
    modifyTVar (pendingCnt ctrl) (+ totalPendingChanges ps)
    return ()

-- | Call 'removeChannels' and then wait for all pending subscription change requests to be acknowledged
-- by Redis.  This uses the same waiting logic as 'addChannelsAndWait'.  Since 'removeChannels' immediately
-- notifies the 'PubSubController' to start discarding messages, you likely don't need this function and
-- can just use 'removeChannels'.
removeChannelsAndWait :: MonadIO m => PubSubController
                                   -> [RedisChannel]
                                   -> [RedisPChannel]
                                   -> m ()
removeChannelsAndWait _ [] [] = return ()
removeChannelsAndWait ctrl remChans remPChans = do
  removeChannels ctrl remChans remPChans
  liftIO $ atomically $ do
    r <- readTVar (pendingCnt ctrl)
    when (r > 0) retry

-- | Internal thread which listens for messages and executes callbacks.
-- This is the only thread which ever receives data from the underlying
-- connection.
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread ctrl rawConn = forever $ do
    msg <- PP.recv rawConn
    case decodeMsg msg of
        Msg (Message channel msgCt) -> do
          cm <- atomically $ readTVar (callbacks ctrl)
          case HM.lookup channel cm of
            Nothing -> return ()
            Just c -> mapM_ (\(_,x) -> x msgCt) c
        Msg (PMessage pattern channel msgCt) -> do
          pm <- atomically $ readTVar (pcallbacks ctrl)
          case HM.lookup pattern pm of
            Nothing -> return ()
            Just c -> mapM_ (\(_,x) -> x channel msgCt) c
        Subscribed -> atomically $
          modifyTVar (pendingCnt ctrl) (\x -> x - 1)
        Unsubscribed _ -> atomically $
          modifyTVar (pendingCnt ctrl) (\x -> x - 1)

-- | Internal thread which sends subscription change requests.
-- This is the only thread which ever sends data on the underlying
-- connection.
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread ctrl rawConn = forever $ do
    PubSub{..} <- atomically $ readTBQueue (sendChanges ctrl)
    rawSendCmd rawConn subs
    rawSendCmd rawConn unsubs
    rawSendCmd rawConn psubs
    rawSendCmd rawConn punsubs
    -- normally, the socket is flushed during 'recv', but
    -- 'recv' could currently be blocking on a message.
    PP.flush rawConn

-- | Open a connection to the Redis server, register to all channels in the 'PubSubController',
-- and process messages and subscription change requests forever.  The only way this will ever
-- exit is if there is an exception from the network code or an unhandled exception
-- in a 'MessageCallback' or 'PMessageCallback'. For example, if the network connection to Redis
-- dies, 'pubSubForever' will throw a 'ConnectionLost'.  When such an exception is
-- thrown, you can recall 'pubSubForever' with the same 'PubSubController' which will open a
-- new connection and resubscribe to all the channels which are tracked in the 'PubSubController'.
--
-- The general pattern is therefore during program startup create a 'PubSubController' and fork
-- a thread which calls 'pubSubForever' in a loop (using an exponential backoff algorithm
-- such as the <https://hackage.haskell.org/package/retry retry> package to not hammer the Redis
-- server if it does die).  For example,
--
-- @
-- myhandler :: ByteString -> IO ()
-- myhandler msg = putStrLn $ unpack $ decodeUtf8 msg
--
-- onInitialComplete :: IO ()
-- onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"
--
-- main :: IO ()
-- main = do
--   conn <- connect defaultConnectInfo
--   pubSubCtrl <- newPubSubController [("mychannel", myhandler)] []
--   forkIO $ forever $
--       pubSubForever conn pubSubCtrl onInitialComplete
--         \`catch\` (\\(e :: SomeException) -> do
--           putStrLn $ "Got error: " ++ show e
--           threadDelay $ 50*1000) -- TODO: use exponential backoff
--
--   {- elsewhere in your program, use pubSubCtrl to change subscriptions -}
-- @
--
-- At most one active 'pubSubForever' can be running against a single 'PubSubController' at any time.  If
-- two active calls to 'pubSubForever' share a single 'PubSubController' there will be deadlocks.  If
-- you do want to process messages using multiple connections to Redis, you can create more than one
-- 'PubSubController'.  For example, create one PubSubController for each 'Control.Concurrent.getNumCapabilities'
-- and then create a Haskell thread bound to each capability each calling 'pubSubForever' in a loop.
-- This will create one network connection per controller/capability and allow you to
-- register separate channels and callbacks for each controller, spreading the load across the capabilities.
pubSubForever :: Core.Connection -- ^ The connection pool
              -> PubSubController -- ^ The controller which keeps track of all subscriptions and handlers
              -> IO () -- ^ This action is executed once Redis acknowledges that all the subscriptions in
                       -- the controller are now subscribed.  You can use this after an exception (such as
                       -- 'ConnectionLost') to signal that all subscriptions are now reactivated.
              -> IO ()
pubSubForever (Core.Conn pool) ctrl onInitialLoad = withResource pool $ \rawConn -> do
    -- get initial subscriptions and write them into the queue.
    atomically $ do
      let loop = tryReadTBQueue (sendChanges ctrl) >>=
                   \x -> if isJust x then loop else return ()
      loop
      cm <- readTVar $ callbacks ctrl
      pm <- readTVar $ pcallbacks ctrl
      let ps = subscribe (HM.keys cm) `mappend` psubscribe (HM.keys pm)
      writeTBQueue (sendChanges ctrl) ps
      writeTVar (pendingCnt ctrl) (totalPendingChanges ps)

    withAsync (listenThread ctrl rawConn) $ \listenT ->
      withAsync (sendThread ctrl rawConn) $ \sendT -> do

        -- wait for initial subscription count to go to zero or for threads to fail
        mret <- atomically $
            (Left <$> (waitEitherCatchSTM listenT sendT))
          `orElse`
            (Right <$> (readTVar (pendingCnt ctrl) >>=
                           \x -> if x > 0 then retry else return ()))
        case mret of
          Right () -> onInitialLoad
          _ -> return () -- if there is an error, waitEitherCatch below will also see it

        -- wait for threads to end with error
        merr <- waitEitherCatch listenT sendT
        case merr of
          (Right (Left err)) -> throwIO err
          (Left (Left err)) -> throwIO err
          _ -> return ()  -- should never happen, since threads exit only with an error


------------------------------------------------------------------------------
-- Helpers
--
decodeMsg :: Reply -> PubSubReply
decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) = either (errMsg r) id $ do
    kind <- decode r0
    case kind :: ByteString of
        "message"      -> Msg <$> decodeMessage
        "pmessage"     -> Msg <$> decodePMessage
        "subscribe"    -> return Subscribed
        "psubscribe"   -> return Subscribed
        "unsubscribe"  -> Unsubscribed <$> decodeCnt
        "punsubscribe" -> Unsubscribed <$> decodeCnt
        _              -> errMsg r
  where
    decodeMessage  = Message  <$> decode r1 <*> decode r2
    decodePMessage = PMessage <$> decode r1 <*> decode r2 <*> decode (head rs)
    decodeCnt      = fromInteger <$> decode r2

decodeMsg r = errMsg r

errMsg :: Reply -> a
errMsg r = error $ "Hedis: expected pub/sub-message but got: " ++ show r


-- $pubsubexpl
-- There are two Pub/Sub implementations.  First, there is a single-threaded implementation 'pubSub'
-- which is simpler to use but has the restriction that subscription changes can only be made in
-- response to a message.  Secondly, there is a more complicated Pub/Sub controller 'pubSubForever'
-- that uses concurrency to support changing subscriptions at any time but requires more setup.
-- You should only use one or the other.  In addition, no types or utility functions (that are part
-- of the public API) are shared, so functions or types in one of the following sections cannot
-- be used for the other.  In particular, be aware that they use different utility functions to subscribe
-- and unsubscribe to channels.