{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
    FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}

module Database.Redis.PubSub (
    publish,

    -- ** Subscribing to channels
    -- $pubsubexpl

    -- *** Single-thread Pub/Sub
    pubSub,
    Message(..),
    PubSub(),
    subscribe, unsubscribe, psubscribe, punsubscribe,
    -- *** Continuous Pub/Sub message controller
    pubSubForever,
    RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
    PubSubController, newPubSubController, currentChannels, currentPChannels,
    addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
    UnregisterCallbacksAction
) where

#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid hiding (<>)
#endif
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM)
import Control.Concurrent.STM
import Control.Exception (throwIO)
import Control.Monad
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.List (foldl')
import Data.Maybe (isJust)
import Data.Pool
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.Core as Core
import qualified Database.Redis.Connection as Connection
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types

-- |While in PubSub mode, we keep track of the number of current subscriptions
--  (as reported by Redis replies) and the number of messages we expect to
--  receive after a SUBSCRIBE or PSUBSCRIBE command. We can safely leave the
--  PubSub mode when both these numbers are zero.
data PubSubState = PubSubState
    { subCnt  :: Int -- ^ Subscription count taken from the latest unsubscribe reply.
    , pending :: Int -- ^ Subscribe confirmations that have not arrived yet.
    }

-- |Apply a function to the 'pending' counter of the current 'PubSubState',
--  leaving 'subCnt' untouched.
modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending f = modify $ \s -> s{ pending = f (pending s) }

-- |Overwrite the 'subCnt' counter of the current 'PubSubState', leaving
--  'pending' untouched.
putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt n = modify $ \s -> s{ subCnt = n }

-- Uninhabited tag types. They are used only as the type arguments of 'Cmd',
-- to tell the four kinds of subscription change apart at the type level.
-- Tag: the command adds subscriptions.
data Subscribe
-- Tag: the command removes subscriptions.
data Unsubscribe
-- Tag: the command refers to channels.
data Channel
-- Tag: the command refers to patterns.
data Pattern

-- |Encapsulates subscription changes. Use 'subscribe', 'unsubscribe',
--  'psubscribe', 'punsubscribe' or 'mempty' to construct a value. Combine
--  values by using the 'Monoid' interface, i.e. 'mappend' and 'mconcat'.
data PubSub = PubSub
    { subs    :: Cmd Subscribe Channel   -- ^ Channels to subscribe to.
    , unsubs  :: Cmd Unsubscribe Channel -- ^ Channels to unsubscribe from.
    , psubs   :: Cmd Subscribe Pattern   -- ^ Patterns to subscribe to.
    , punsubs :: Cmd Unsubscribe Pattern -- ^ Patterns to unsubscribe from.
    } deriving (Eq)

-- |Two sets of changes are combined field by field, each field using the
--  'Monoid' instance of its own 'Cmd' type.
instance Semigroup PubSub where
    (<>) p1 p2 = PubSub { subs    = subs p1 `mappend` subs p2
                        , unsubs  = unsubs p1 `mappend` unsubs p2
                        , psubs   = psubs p1 `mappend` psubs p2
                        , punsubs = punsubs p1 `mappend` punsubs p2
                        }

-- |The empty value requests no change in any of the four fields.
instance Monoid PubSub where
    mempty  = PubSub mempty mempty mempty mempty
    -- Kept explicit so the instance also works where 'mappend' is not
    -- defaulted to '<>'.
    mappend = (<>)

-- |One kind of subscription change. Both type parameters are phantom: they
--  do not occur in any constructor and only select the matching 'Command',
--  'Semigroup' and 'Monoid' instances.
--
--  Note that 'changes' is a partial selector; it is undefined on 'DoNothing'.
data Cmd a b
    = DoNothing                       -- ^ Nothing is to be sent.
    | Cmd { changes :: [ByteString] } -- ^ Arguments to send after the command name.
    deriving (Eq)

-- |Subscribe commands combine by concatenating their argument lists;
--  'DoNothing' is neutral on either side.
instance Semigroup (Cmd Subscribe a) where
  (<>) DoNothing x = x
  (<>) x DoNothing = x
  (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys)

-- |'DoNothing' is the identity for subscribe commands.
instance Monoid (Cmd Subscribe a) where
  mempty = DoNothing
  mappend = (<>)

-- |Unsubscribe commands combine by concatenating their argument lists, with
--  two special cases: 'DoNothing' is neutral, and an empty argument list
--  absorbs whatever it is combined with.
instance Semigroup (Cmd Unsubscribe a) where
  (<>) DoNothing x = x
  (<>) x DoNothing = x
  -- empty subscription list => unsubscribe all channels and patterns
  (<>) (Cmd []) _ = Cmd []
  (<>) _ (Cmd []) = Cmd []
  (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys)

-- |'DoNothing' is the identity for unsubscribe commands. @Cmd []@ is /not/
--  the identity: it means "unsubscribe from everything".
instance Monoid (Cmd Unsubscribe a) where
  mempty = DoNothing
  mappend = (<>)

-- |Subscription commands that 'sendCmd' and 'rawSendCmd' know how to send.
class Command a where
    -- |The command name; it is sent as the first element of the request.
    redisCmd      :: a -> ByteString
    -- |How sending this command changes the count of outstanding
    --  confirmations ('sendCmd' passes the result to 'modifyPending').
    updatePending :: a -> Int -> Int

-- |Send a subscription change from within the 'Core.Redis' monad and update
--  the 'pending' counter accordingly. 'DoNothing' sends nothing and leaves
--  the state unchanged.
sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd DoNothing = return ()
sendCmd cmd       = do
    -- The request is the command name followed by its arguments.
    lift $ Core.send (redisCmd cmd : changes cmd)
    modifyPending (updatePending cmd)

-- |Number of arguments carried by a command; 'DoNothing' counts as zero.
cmdCount :: Cmd a b -> Int
cmdCount DoNothing = 0
cmdCount (Cmd c) = length c

-- |Total number of channel and pattern names mentioned across all four
--  fields of a 'PubSub'. A field that is 'DoNothing' or carries an empty
--  list contributes zero.
totalPendingChanges :: PubSub -> Int
totalPendingChanges (PubSub{..}) =
  cmdCount subs + cmdCount unsubs + cmdCount psubs + cmdCount punsubs

-- |Render a subscription change and send it directly on a pipelining
--  connection. Unlike 'sendCmd' this keeps no count of outstanding
--  confirmations. 'DoNothing' sends nothing.
rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd _ DoNothing = return ()
rawSendCmd conn cmd    = PP.send conn $ renderRequest $ redisCmd cmd : changes cmd

-- |Counter update used by the subscribe-style commands: add one for every
--  argument of the command. 'DoNothing' leaves the counter unchanged.
plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt DoNothing = id
plusChangeCnt (Cmd cs)  = (+ length cs)

-- |Channel subscriptions are sent as @SUBSCRIBE@; every argument adds one
--  outstanding confirmation.
instance Command (Cmd Subscribe Channel) where
    redisCmd      = const "SUBSCRIBE"
    updatePending = plusChangeCnt

-- |Pattern subscriptions are sent as @PSUBSCRIBE@; every argument adds one
--  outstanding confirmation.
instance Command (Cmd Subscribe Pattern) where
    redisCmd      = const "PSUBSCRIBE"
    updatePending = plusChangeCnt

-- |Channel unsubscriptions are sent as @UNSUBSCRIBE@ and do not change the
--  number of outstanding confirmations.
instance Command (Cmd Unsubscribe Channel) where
    redisCmd      = const "UNSUBSCRIBE"
    updatePending = const id

-- |Pattern unsubscriptions are sent as @PUNSUBSCRIBE@ and do not change the
--  number of outstanding confirmations.
instance Command (Cmd Unsubscribe Pattern) where
    redisCmd      = const "PUNSUBSCRIBE"
    updatePending = const id


-- |A message handed to the callback of 'pubSub'.
--
--  'msgChannel' and 'msgMessage' are available on both constructors.
--  'msgPattern' exists only on 'PMessage' and is undefined on 'Message'.
data Message = Message  { msgChannel, msgMessage :: ByteString}
             | PMessage { msgPattern, msgChannel, msgMessage :: ByteString}
    deriving (Show)

-- Classification of a reply read while in Pub/Sub mode, as consumed by the
-- receive loop of 'pubSub': a subscribe confirmation, an unsubscribe
-- confirmation carrying a count (stored as the new subscription count), or
-- a published message.
data PubSubReply = Subscribed | Unsubscribed Int | Msg Message


------------------------------------------------------------------------------
-- Public Interface
--

-- |Post a message to a channel (<http://redis.io/commands/publish>).
--
--  Sends the request @PUBLISH channel message@ and returns the decoded
--  integer reply.
publish
    :: (Core.RedisCtx m f)
    => ByteString -- ^ channel
    -> ByteString -- ^ message
    -> m (f Integer)
publish channel message =
    Core.sendRequest ["PUBLISH", channel, message]

-- |Listen for messages published to the given channels
--  (<http://redis.io/commands/subscribe>).
--
--  An empty list yields 'mempty', i.e. no command is sent at all.
subscribe
    :: [ByteString] -- ^ channel
    -> PubSub
subscribe [] = mempty
subscribe cs = mempty{ subs = Cmd cs }

-- |Stop listening for messages posted to the given channels
--  (<http://redis.io/commands/unsubscribe>).
--
--  Unlike 'subscribe', an empty list is /not/ turned into 'mempty': the
--  command is still sent, without arguments, which unsubscribes from all
--  channels.
unsubscribe
    :: [ByteString] -- ^ channel
    -> PubSub
unsubscribe cs = mempty{ unsubs = Cmd cs }

-- |Listen for messages published to channels matching the given patterns
--  (<http://redis.io/commands/psubscribe>).
--
--  An empty list yields 'mempty', i.e. no command is sent at all.
psubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
psubscribe [] = mempty
psubscribe ps = mempty{ psubs = Cmd ps }

-- |Stop listening for messages posted to channels matching the given patterns
--  (<http://redis.io/commands/punsubscribe>).
--
--  Unlike 'psubscribe', an empty list is /not/ turned into 'mempty': the
--  command is still sent, without arguments, which unsubscribes from all
--  patterns.
punsubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
punsubscribe ps = mempty{ punsubs = Cmd ps }

-- |Listens to published messages on subscribed channels and channels matching
--  the subscribed patterns. For documentation on the semantics of Redis
--  Pub\/Sub see <http://redis.io/topics/pubsub>.
--
--  The given callback function is called for each received message.
--  Subscription changes are triggered by the returned 'PubSub'. To keep
--  subscriptions unchanged, the callback can return 'mempty'.
--
--  Example: Subscribe to the \"news\" channel indefinitely.
--
--  @
--  pubSub (subscribe [\"news\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return mempty
--  @
--
--  Example: Receive a single message from the \"chat\" channel.
--
--  @
--  pubSub (subscribe [\"chat\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return $ unsubscribe [\"chat\"]
--  @
--
-- It should be noted that Redis Pub\/Sub is asynchronous by nature, so
-- returning `unsubscribe` does not mean that the callback won't receive
-- any further messages. To guarantee that you won't process messages
-- after unsubscription, and won't unsubscribe from the same channel more
-- than once, you need to use an `IORef` or something similar.
--
pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback function.
    -> Core.Redis ()
pubSub initial callback
    -- Nothing to subscribe to: return without sending anything.
    | initial == mempty = return ()
    | otherwise         = evalStateT (send initial) (PubSubState 0 0)
  where
    -- Send the four kinds of change in a fixed order, then go back to
    -- waiting for replies.
    send :: PubSub -> StateT PubSubState Core.Redis ()
    send PubSub{..} = do
        sendCmd subs
        sendCmd unsubs
        sendCmd psubs
        sendCmd punsubs
        recv

    -- Read one reply and dispatch on its kind. The loop ends only after an
    -- unsubscribe confirmation that leaves no subscriptions and no
    -- outstanding subscribe confirmations.
    recv :: StateT PubSubState Core.Redis ()
    recv = do
        reply <- lift Core.recv
        case decodeMsg reply of
            -- Run the callback and apply the changes it asks for.
            Msg msg        -> liftIO (callback msg) >>= send
            -- One fewer subscribe confirmation outstanding.
            Subscribed     -> modifyPending (subtract 1) >> recv
            Unsubscribed n -> do
                putSubCnt n
                PubSubState{..} <- get
                unless (subCnt == 0 && pending == 0) recv

-- | A Redis channel name.  This is a plain synonym for 'ByteString'.
type RedisChannel = ByteString

-- | A Redis pattern channel name.  This is a plain synonym for 'ByteString'.
type RedisPChannel = ByteString

-- | A handler for a message from a subscribed channel.
-- The callback is passed the message content.
--
-- Messages are processed synchronously in the receiving thread, so if the callback
-- takes a long time it will block other callbacks and other messages from being
-- received.  If you need to move long-running work to a different thread, we suggest
-- you use a 'TBQueue' with a reasonable bound, so that if messages arrive faster
-- than you can process them, you do eventually block.
--
-- If the callback throws an exception, the exception will be thrown from 'pubSubForever',
-- which will cause the entire Redis connection for all subscriptions to be closed.
-- As long as you call 'pubSubForever' in a loop you will reconnect to your subscribed
-- channels, but you should probably add an exception handler to each callback to
-- prevent this.
type MessageCallback = ByteString -> IO ()

-- | A handler for a message from a psubscribed channel.
-- The callback is passed the channel the message was sent on, followed by the
-- message content.
--
-- As with 'MessageCallback', callbacks are executed synchronously and any exceptions
-- are rethrown from 'pubSubForever'.
type PMessageCallback = RedisChannel -> ByteString -> IO ()

-- | An action that, when executed, will unregister the callbacks.  It is returned from
-- 'addChannels' or 'addChannelsAndWait', and typically you would use it in 'bracket' to
-- guarantee that you unsubscribe from channels.  For example, if you are using websockets
-- to distribute messages to clients, you could use something such as:
--
-- > websocketConn <- Network.WebSockets.acceptRequest pending
-- > let mycallback msg = Network.WebSockets.sendTextData websocketConn msg
-- > bracket (addChannelsAndWait ctrl [("hello", mycallback)] []) id $ const $ do
-- >   {- loop here calling Network.WebSockets.receiveData -}
type UnregisterCallbacksAction = IO ()

-- | Identifier stored next to every callback held by a 'PubSubController'.
-- The derived 'Num' instance allows integer literals such as @0@ to be used
-- directly as handles.
newtype UnregisterHandle = UnregisterHandle Integer
  deriving (Eq, Show, Num)

-- | A controller that stores a set of channels, pattern channels, and callbacks.
-- It allows you to manage Pub/Sub subscriptions and pattern subscriptions and alter them at
-- any time throughout the life of your program.
-- You should typically create the controller at the start of your program and then store it
-- through the life of your program, using 'addChannels' and 'removeChannels' to update the
-- current subscriptions.
data PubSubController = PubSubController
  { callbacks :: TVar (HM.HashMap RedisChannel [(UnregisterHandle, MessageCallback)])
    -- ^ Callbacks per channel, each tagged with the handle it was registered under.
  , pcallbacks :: TVar (HM.HashMap RedisPChannel [(UnregisterHandle, PMessageCallback)])
    -- ^ Callbacks per pattern, each tagged with the handle it was registered under.
  , sendChanges :: TBQueue PubSub
    -- ^ Bounded queue of subscription changes.
    -- NOTE(review): presumably drained by 'pubSubForever' - confirm.
  , pendingCnt :: TVar Int
    -- ^ Starts at zero.
    -- NOTE(review): presumably the number of unconfirmed changes - confirm.
  , lastUsedCallbackId :: TVar UnregisterHandle
    -- ^ Starts at zero, the handle given to the initial callbacks.
  }

-- | Create a new 'PubSubController'.  Note that this does not subscribe to any channels, it just
-- creates the controller.  The subscriptions will happen once 'pubSubForever' is called.
--
-- Every initial callback is registered under handle @0@.  The initial lists are turned into
-- maps with 'HM.fromList', so if the same channel (or pattern) is listed more than once only
-- its last callback is kept.
newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)] -- ^ the initial subscriptions
                                 -> [(RedisPChannel, PMessageCallback)] -- ^ the initial pattern subscriptions
                                 -> m PubSubController
newPubSubController x y = liftIO $ do
    cbs <- newTVarIO (HM.map (\z -> [(0,z)]) $ HM.fromList x)
    pcbs <- newTVarIO (HM.map (\z -> [(0,z)]) $ HM.fromList y)
    -- At most 10 subscription changes can be queued at a time.
    c <- newTBQueueIO 10
    pendingVar <- newTVarIO 0
    lastId <- newTVarIO 0
    return $ PubSubController cbs pcbs c pendingVar lastId

-- | List the channels that currently have callbacks registered in the
-- 'PubSubController'.
--
-- WARNING: the result may differ from what the Redis server considers subscribed,
-- because there is a delay between adding or removing a channel in the
-- 'PubSubController' and Redis receiving and processing the corresponding
-- subscription change request.
#if __GLASGOW_HASKELL__ < 710
currentChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisChannel]
#else
currentChannels :: MonadIO m => PubSubController -> m [RedisChannel]
#endif
currentChannels ctrl = liftIO $ do
    -- Snapshot the channel callback map; its keys are the channel names.
    registered <- atomically (readTVar (callbacks ctrl))
    return (HM.keys registered)

-- | List the pattern channels that currently have callbacks registered in the
-- 'PubSubController'.
--
-- WARNING: the result may differ from what the Redis server considers subscribed,
-- because there is a delay between adding or removing a channel in the
-- 'PubSubController' and Redis receiving and processing the corresponding
-- subscription change request.
#if __GLASGOW_HASKELL__ < 710
currentPChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisPChannel]
#else
currentPChannels :: MonadIO m => PubSubController -> m [RedisPChannel]
#endif
currentPChannels ctrl = liftIO $ do
    -- Snapshot the pattern callback map; its keys are the pattern names.
    registered <- atomically (readTVar (pcallbacks ctrl))
    return (HM.keys registered)

-- | Add channels into the 'PubSubController', and if there is an active 'pubSubForever', send the subscribe
-- and psubscribe commands to Redis.  The 'addChannels' function is thread-safe.  This function
-- does not wait for Redis to acknowledge that the channels have actually been subscribed; use
-- 'addChannelsAndWait' for that.
--
-- You can subscribe to the same channel or pattern channel multiple times; the 'PubSubController' keeps
-- a list of callbacks and executes each callback in response to a message.  This also holds when the
-- same channel appears more than once within a single call: every supplied callback is registered.
--
-- The return value is an action 'UnregisterCallbacksAction' which will unregister the callbacks,
-- which should typically be used with 'bracket'.
addChannels :: MonadIO m => PubSubController
                         -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                         -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to pattern subscribe to
                         -> m UnregisterCallbacksAction
addChannels _ [] [] = return $ return ()
addChannels ctrl newChans newPChans = liftIO $ do
    ident <- atomically $ do
      -- Allocate a fresh handle identifying every callback registered by this call.
      -- The new value is forced before it is stored so no thunk chain builds up
      -- inside the TVar.
      ident <- (+1) <$> readTVar (lastUsedCallbackId ctrl)
      writeTVar (lastUsedCallbackId ctrl) $! ident
      cm <- readTVar $ callbacks ctrl
      pm <- readTVar $ pcallbacks ctrl
      -- Only names without any registered callback yet are included in the
      -- subscription change request.
      let newChans' = [ n | (n, _) <- newChans, not $ HM.member n cm ]
          newPChans' = [ n | (n, _) <- newPChans, not $ HM.member n pm ]
          ps = subscribe newChans' `mappend` psubscribe newPChans'
          -- 'HM.fromListWith' (rather than 'HM.fromList') keeps every callback when
          -- a name is listed more than once; @flip (++)@ preserves argument order.
          addedCbs = HM.fromListWith (flip (++)) [ (n, [(ident, cb)]) | (n, cb) <- newChans ]
          addedPCbs = HM.fromListWith (flip (++)) [ (n, [(ident, cb)]) | (n, cb) <- newPChans ]
      writeTBQueue (sendChanges ctrl) ps
      -- Existing callbacks stay in front; the new ones are appended.
      writeTVar (callbacks ctrl) (HM.unionWith (++) cm addedCbs)
      writeTVar (pcallbacks ctrl) (HM.unionWith (++) pm addedPCbs)
      modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)
      return ident
    -- The unregister action removes exactly the callbacks tagged with this handle.
    return $ unsubChannels ctrl (map fst newChans) (map fst newPChans) ident

-- | Like 'addChannels', but additionally block until Redis has acknowledged the subscription changes.
--
-- The wait is on the total count of pending subscription change requests, not on the specific
-- channels passed here.  So if, for example, several threads call 'addChannelsAndWait' simultaneously,
-- each of them returns only once all pending changes have been acknowledged by Redis (we just track
-- the total number of pending change requests and wait until that count reaches zero).
--
-- This also waits correctly if the network connection dies during the subscription change.  Say the
-- connection dies right after we send a subscription change to Redis.  'pubSubForever' will throw
-- 'ConnectionLost' and 'addChannelsAndWait' will continue to wait.  Once you call 'pubSubForever' again
-- with the same 'PubSubController', it will open a new connection, send subscription commands
-- for all channels in the 'PubSubController' (which include the ones we are waiting for),
-- and wait for the responses from Redis.  Only once Redis has confirmed the subscription
-- to all channels in the 'PubSubController' will 'addChannelsAndWait' unblock and return.
addChannelsAndWait :: MonadIO m => PubSubController
                                -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                                -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to psubscribe to
                                -> m UnregisterCallbacksAction
addChannelsAndWait _ [] [] = return (return ())
addChannelsAndWait ctrl newChans newPChans =
    addChannels ctrl newChans newPChans >>= \unregister -> do
        liftIO . atomically $ do
            -- Retry the transaction until no change requests are outstanding.
            outstanding <- readTVar (pendingCnt ctrl)
            unless (outstanding <= 0) retry
        return unregister

-- | Remove channels from the 'PubSubController', and if there is an active 'pubSubForever', send the
-- unsubscribe commands to Redis.  Note that as soon as this function returns, no more callbacks will be
-- executed even if more messages arrive during the period when we request to unsubscribe from the channel
-- and Redis actually processes the unsubscribe request.  This function is thread-safe.
--
-- All callbacks registered for the given channels and pattern channels are dropped.  Names that have no
-- registered callbacks are ignored.
--
-- If you remove all channels, the connection in 'pubSubForever' to redis will stay open and waiting for
-- any new channels from a call to 'addChannels'.  If you really want to close the connection,
-- use 'Control.Concurrent.killThread' or 'Control.Concurrent.Async.cancel' to kill the thread running
-- 'pubSubForever'.
removeChannels :: MonadIO m => PubSubController
                            -> [RedisChannel]
                            -> [RedisPChannel]
                            -> m ()
removeChannels _ [] [] = return ()
removeChannels ctrl remChans remPChans = liftIO $ atomically $ do
    cm <- readTVar $ callbacks ctrl
    pm <- readTVar $ pcallbacks ctrl
    -- Restrict the request to names that are actually registered.
    let remChans' = filter (`HM.member` cm) remChans
        remPChans' = filter (`HM.member` pm) remPChans
        -- An empty list is never handed to 'unsubscribe' / 'punsubscribe'.
        -- NOTE(review): presumably because an argument-less UNSUBSCRIBE would drop
        -- every subscription on the connection -- confirm against 'unsubscribe'.
        ps =        (if null remChans' then mempty else unsubscribe remChans')
          `mappend` (if null remPChans' then mempty else punsubscribe remPChans')
    writeTBQueue (sendChanges ctrl) ps
    writeTVar (callbacks ctrl) (foldl' (flip HM.delete) cm remChans')
    writeTVar (pcallbacks ctrl) (foldl' (flip HM.delete) pm remPChans')
    -- Strict update so the pending counter never accumulates unevaluated additions.
    modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)

-- | Internal function to unsubscribe only from those channels matching the given handle.
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
unsubChannels :: PubSubController
-> [ByteString] -> [ByteString] -> UnregisterHandle -> IO ()
unsubChannels PubSubController
ctrl [ByteString]
chans [ByteString]
pchans UnregisterHandle
h = IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
   (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
 -> STM
      (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl
    HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
 -> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl

    -- only worry about channels that exist
    let remChans :: [ByteString]
remChans = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm) [ByteString]
chans
        remPChans :: [ByteString]
remPChans = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm) [ByteString]
pchans

    -- helper functions to filter out handlers that match
    let filterHandle :: Maybe [(UnregisterHandle,a)] -> Maybe [(UnregisterHandle,a)]
        filterHandle :: Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
filterHandle Maybe [(UnregisterHandle, a)]
Nothing = Maybe [(UnregisterHandle, a)]
forall a. Maybe a
Nothing
        filterHandle (Just [(UnregisterHandle, a)]
lst) = case ((UnregisterHandle, a) -> Bool)
-> [(UnregisterHandle, a)] -> [(UnregisterHandle, a)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(UnregisterHandle, a)
x -> (UnregisterHandle, a) -> UnregisterHandle
forall a b. (a, b) -> a
fst (UnregisterHandle, a)
x UnregisterHandle -> UnregisterHandle -> Bool
forall a. Eq a => a -> a -> Bool
/= UnregisterHandle
h) [(UnregisterHandle, a)]
lst of
                                    [] -> Maybe [(UnregisterHandle, a)]
forall a. Maybe a
Nothing
                                    [(UnregisterHandle, a)]
xs -> [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
forall a. a -> Maybe a
Just [(UnregisterHandle, a)]
xs
    let removeHandles :: HM.HashMap ByteString [(UnregisterHandle,a)]
                      -> ByteString
                      -> HM.HashMap ByteString [(UnregisterHandle,a)]
        removeHandles :: HashMap ByteString [(UnregisterHandle, a)]
-> ByteString -> HashMap ByteString [(UnregisterHandle, a)]
removeHandles HashMap ByteString [(UnregisterHandle, a)]
m ByteString
k = case Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
forall a.
Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
filterHandle (ByteString
-> HashMap ByteString [(UnregisterHandle, a)]
-> Maybe [(UnregisterHandle, a)]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup ByteString
k HashMap ByteString [(UnregisterHandle, a)]
m) of -- recent versions of unordered-containers have alter
            Maybe [(UnregisterHandle, a)]
Nothing -> ByteString
-> HashMap ByteString [(UnregisterHandle, a)]
-> HashMap ByteString [(UnregisterHandle, a)]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete ByteString
k HashMap ByteString [(UnregisterHandle, a)]
m
            Just [(UnregisterHandle, a)]
v -> ByteString
-> [(UnregisterHandle, a)]
-> HashMap ByteString [(UnregisterHandle, a)]
-> HashMap ByteString [(UnregisterHandle, a)]
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert ByteString
k [(UnregisterHandle, a)]
v HashMap ByteString [(UnregisterHandle, a)]
m

    -- maps after taking out channels matching the handle
    let cm' :: HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm' = (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
 -> ByteString
 -> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall a.
HashMap ByteString [(UnregisterHandle, a)]
-> ByteString -> HashMap ByteString [(UnregisterHandle, a)]
removeHandles HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm [ByteString]
remChans
        pm' :: HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm' = (HashMap ByteString [(UnregisterHandle, PMessageCallback)]
 -> ByteString
 -> HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall a.
HashMap ByteString [(UnregisterHandle, a)]
-> ByteString -> HashMap ByteString [(UnregisterHandle, a)]
removeHandles HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm [ByteString]
remPChans

    -- the channels to unsubscribe are those that no longer exist in cm' and pm'
    let remChans' :: [ByteString]
remChans' = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm') [ByteString]
remChans
        remPChans' :: [ByteString]
remPChans' = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm') [ByteString]
remPChans
        ps :: PubSub
ps =        (if [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ByteString]
remChans' then PubSub
forall a. Monoid a => a
mempty else [ByteString] -> PubSub
unsubscribe [ByteString]
remChans')
          PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` (if [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ByteString]
remPChans' then PubSub
forall a. Monoid a => a
mempty else [ByteString] -> PubSub
punsubscribe [ByteString]
remPChans')

    -- do the unsubscribe
    TBQueue PubSub -> PubSub -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) PubSub
ps
    TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar
     (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl) HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm'
    TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl) HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm'
    TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ PubSub -> Int
totalPendingChanges PubSub
ps)
    () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Call 'removeChannels' and then wait for all pending subscription change requests to be acknowledged
-- by Redis.  This uses the same waiting logic as 'addChannelsAndWait'.  Since 'removeChannels' immediately
-- notifies the 'PubSubController' to start discarding messages, you likely don't need this function and
-- can just use 'removeChannels'.
--
-- When both lists are empty this returns immediately without touching the controller.
removeChannelsAndWait :: MonadIO m => PubSubController
                                   -> [RedisChannel]
                                   -> [RedisPChannel]
                                   -> m ()
removeChannelsAndWait _ [] [] = return ()
removeChannelsAndWait ctrl remChans remPChans = do
  removeChannels ctrl remChans remPChans
  -- Block (via STM 'retry') until the controller's pending-change counter
  -- is no longer positive.
  liftIO $ atomically $ do
    r <- readTVar (pendingCnt ctrl)
    when (r > 0) retry

-- | Internal thread which listens for messages and executes callbacks.
-- This is the only thread which ever receives data from the underlying
-- connection.
--
-- Each reply is decoded with 'decodeMsg' and handled as follows:
--
-- * a channel message runs every callback registered for that channel;
--
-- * a pattern message runs every callback registered for that pattern,
--   passing the concrete channel and the payload;
--
-- * a subscribe or unsubscribe acknowledgement decrements the controller's
--   pending-change counter.
--
-- Callbacks run synchronously inside this loop, so an exception thrown by a
-- callback terminates the loop.
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread ctrl rawConn = forever $ do
    msg <- PP.recv rawConn
    case decodeMsg msg of
        Msg (Message channel msgCt) -> do
          cm <- readTVarIO (callbacks ctrl)
          case HM.lookup channel cm of
            -- no handler registered for this channel: drop the message
            Nothing -> return ()
            Just cbs -> mapM_ (\(_, cb) -> cb msgCt) cbs
        Msg (PMessage pat channel msgCt) -> do
          pm <- readTVarIO (pcallbacks ctrl)
          case HM.lookup pat pm of
            -- no handler registered for this pattern: drop the message
            Nothing -> return ()
            Just cbs -> mapM_ (\(_, cb) -> cb channel msgCt) cbs
        Subscribed -> ackPending
        Unsubscribed _ -> ackPending
  where
    -- One acknowledgement from Redis settles one pending change.  The strict
    -- modify keeps the counter evaluated instead of accumulating a chain of
    -- unevaluated decrements while nobody is reading it.
    ackPending :: IO ()
    ackPending = atomically $ modifyTVar' (pendingCnt ctrl) (subtract 1)

-- | Internal thread which sends subscription change requests.
-- This is the only thread which ever sends data on the underlying
-- connection.
--
-- The loop blocks until a 'PubSub' change set is available on the
-- controller's queue, sends its four command groups (subscribe, unsubscribe,
-- psubscribe, punsubscribe, in that order) and then flushes the connection.
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread ctrl rawConn = forever $ do
    PubSub{..} <- atomically $ readTBQueue (sendChanges ctrl)
    rawSendCmd rawConn subs
    rawSendCmd rawConn unsubs
    rawSendCmd rawConn psubs
    rawSendCmd rawConn punsubs
    -- normally, the socket is flushed during 'recv', but
    -- 'recv' could currently be blocking on a message.
    PP.flush rawConn

-- | Open a connection to the Redis server, register to all channels in the 'PubSubController',
-- and process messages and subscription change requests forever.  The only way this will ever
-- exit is if there is an exception from the network code or an unhandled exception
-- in a 'MessageCallback' or 'PMessageCallback'. For example, if the network connection to Redis
-- dies, 'pubSubForever' will throw a 'ConnectionLost'.  When such an exception is
-- thrown, you can call 'pubSubForever' again with the same 'PubSubController', which will open a
-- new connection and resubscribe to all the channels which are tracked in the 'PubSubController'.
--
-- The general pattern is therefore during program startup create a 'PubSubController' and fork
-- a thread which calls 'pubSubForever' in a loop (using an exponential backoff algorithm
-- such as the <https://hackage.haskell.org/package/retry retry> package to not hammer the Redis
-- server if it does die).  For example,
--
-- @
-- myhandler :: ByteString -> IO ()
-- myhandler msg = putStrLn $ unpack $ decodeUtf8 msg
--
-- onInitialComplete :: IO ()
-- onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"
--
-- main :: IO ()
-- main = do
--   conn <- connect defaultConnectInfo
--   pubSubCtrl <- newPubSubController [("mychannel", myhandler)] []
--   forkIO $ forever $
--       pubSubForever conn pubSubCtrl onInitialComplete
--         \`catch\` (\\(e :: SomeException) -> do
--           putStrLn $ "Got error: " ++ show e
--           threadDelay $ 50*1000) -- TODO: use exponential backoff
--
--   {- elsewhere in your program, use pubSubCtrl to change subscriptions -}
-- @
--
-- At most one active 'pubSubForever' can be running against a single 'PubSubController' at any time.  If
-- two active calls to 'pubSubForever' share a single 'PubSubController' there will be deadlocks.  If
-- you do want to process messages using multiple connections to Redis, you can create more than one
-- 'PubSubController'.  For example, create one PubSubController for each 'Control.Concurrent.getNumCapabilities'
-- and then create a Haskell thread bound to each capability each calling 'pubSubForever' in a loop.
-- This will create one network connection per controller/capability and allow you to
-- register separate channels and callbacks for each controller, spreading the load across the capabilities.
--
-- Only non-clustered connections are supported; calling this with a clustered connection
-- fails with an 'error'.
pubSubForever :: Connection.Connection -- ^ The connection pool
              -> PubSubController -- ^ The controller which keeps track of all subscriptions and handlers
              -> IO () -- ^ This action is executed once Redis acknowledges that all the subscriptions in
                       -- the controller are now subscribed.  You can use this after an exception (such as
                       -- 'ConnectionLost') to signal that all subscriptions are now reactivated.
              -> IO ()
pubSubForever (Connection.NonClusteredConnection pool) ctrl onInitialLoad = withResource pool $ \rawConn -> do
    -- get initial subscriptions and write them into the queue.
    atomically $ do
      -- Empty the change queue first: everything currently registered is
      -- (re)subscribed in one request below, built from the callback maps.
      let drainQueue = tryReadTBQueue (sendChanges ctrl) >>=
                         \x -> when (isJust x) drainQueue
      drainQueue
      cm <- readTVar $ callbacks ctrl
      pm <- readTVar $ pcallbacks ctrl
      let ps = subscribe (HM.keys cm) `mappend` psubscribe (HM.keys pm)
      writeTBQueue (sendChanges ctrl) ps
      -- the pending counter is reset (not incremented) for the new connection
      writeTVar (pendingCnt ctrl) (totalPendingChanges ps)

    withAsync (listenThread ctrl rawConn) $ \listenT ->
      withAsync (sendThread ctrl rawConn) $ \sendT -> do

        -- wait for initial subscription count to go to zero or for threads to fail
        mret <- atomically $
            (Left <$> waitEitherCatchSTM listenT sendT)
          `orElse`
            (Right <$> (readTVar (pendingCnt ctrl) >>=
                           \x -> when (x > 0) retry))
        case mret of
          Right () -> onInitialLoad
          _ -> return () -- if there is an error, waitEitherCatch below will also see it

        -- wait for threads to end with error
        merr <- waitEitherCatch listenT sendT
        case merr of
          Right (Left err) -> throwIO err
          Left (Left err) -> throwIO err
          _ -> return ()  -- should never happen, since threads exit only with an error
pubSubForever (Connection.ClusteredConnection _ _) _ _ =
    -- Fail with an explanatory message rather than a bare 'undefined'.
    error "Hedis: pubSubForever is not supported on clustered connections"


------------------------------------------------------------------------------
-- Helpers
--
-- | Decode a raw reply received in Pub/Sub mode.
--
-- A valid reply is a multi-bulk reply with at least three elements whose
-- first element names the kind: @message@, @pmessage@, @subscribe@,
-- @psubscribe@, @unsubscribe@ or @punsubscribe@.  A @pmessage@ additionally
-- needs a fourth element carrying the payload.
--
-- Any other reply, or one whose elements fail to decode, is reported through
-- 'errMsg', which calls 'error'.
decodeMsg :: Reply -> PubSubReply
decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) = either (const $ errMsg r) id $ do
    kind <- decode r0
    case kind :: ByteString of
        "message"      -> Msg <$> decodeMessage
        "pmessage"     -> Msg <$> decodePMessage
        "subscribe"    -> return Subscribed
        "psubscribe"   -> return Subscribed
        "unsubscribe"  -> Unsubscribed <$> decodeCnt
        "punsubscribe" -> Unsubscribed <$> decodeCnt
        _              -> Left r
  where
    decodeMessage :: Either Reply Message
    decodeMessage  = Message <$> decode r1 <*> decode r2

    -- A pmessage carries pattern, channel and payload, i.e. one element more
    -- than the three matched above.  Match on the remainder instead of using
    -- 'head' so a truncated reply is reported like any other malformed reply.
    decodePMessage :: Either Reply Message
    decodePMessage = case rs of
        (r3:_) -> PMessage <$> decode r1 <*> decode r2 <*> decode r3
        []     -> Left r

    decodeCnt :: Either Reply Int
    decodeCnt      = fromInteger <$> decode r2

decodeMsg r = errMsg r

-- | Abort with an 'error' describing a reply that is not a valid Pub/Sub
-- message.  The offending reply is included in the message via 'show'.
errMsg :: Reply -> a
errMsg r = error $ "Hedis: expected pub/sub-message but got: " ++ show r


-- $pubsubexpl
-- There are two Pub/Sub implementations.  First, there is a single-threaded implementation 'pubSub'
-- which is simpler to use but has the restriction that subscription changes can only be made in
-- response to a message.  Secondly, there is a more complicated Pub/Sub controller 'pubSubForever'
-- that uses concurrency to support changing subscriptions at any time but requires more setup.
-- You should only use one or the other.  In addition, no types or utility functions (that are part
-- of the public API) are shared, so functions or types in one of the following sections cannot
-- be used for the other.  In particular, be aware that they use different utility functions to subscribe to
-- and unsubscribe from channels.