{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}
module Database.Redis.PubSub (
publish,
pubSub,
Message(..),
PubSub(),
subscribe, unsubscribe, psubscribe, punsubscribe,
pubSubForever,
RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
PubSubController, newPubSubController, currentChannels, currentPChannels,
addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
UnregisterCallbacksAction
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid hiding (<>)
#endif
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM)
import Control.Concurrent.STM
import Control.Exception (throwIO)
import Control.Monad
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.List (foldl')
import Data.Maybe (isJust)
import Data.Pool
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.Core as Core
import qualified Database.Redis.Connection as Connection
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types
-- | Bookkeeping state threaded through the 'pubSub' receive loop.
data PubSubState = PubSubState
    { subCnt  :: Int
      -- ^ Overwritten by 'putSubCnt' with the count carried by each
      --   @Unsubscribed@ reply.
    , pending :: Int
      -- ^ Adjusted by 'sendCmd' (via 'updatePending') when a command is sent
      --   and decremented by one for every @Subscribed@ reply.
    }
-- | Apply a function to the 'pending' counter of the current 'PubSubState',
--   leaving 'subCnt' untouched.
modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending f = modify $ \s -> s { pending = f (pending s) }
-- | Overwrite the 'subCnt' field of the current 'PubSubState' with the given
--   value, leaving 'pending' untouched.
putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt n = modify $ \s -> s { subCnt = n }
-- Uninhabited tag types used only as type-level indices of 'Cmd': the first
-- index selects subscribe vs. unsubscribe, the second channel vs. pattern.

-- | Tag for subscribing commands (first index of 'Cmd').
data Subscribe
-- | Tag for unsubscribing commands (first index of 'Cmd').
data Unsubscribe
-- | Tag for commands addressing plain channels (second index of 'Cmd').
data Channel
-- | Tag for commands addressing patterns (second index of 'Cmd').
data Pattern
-- | A bundle of subscription changes: one command slot for each of
--   SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE and PUNSUBSCRIBE.
--
--   Values are built with 'subscribe', 'unsubscribe', 'psubscribe' and
--   'punsubscribe' and combined through the 'Monoid' instance.
data PubSub = PubSub
    { subs    :: Cmd Subscribe Channel
    , unsubs  :: Cmd Unsubscribe Channel
    , psubs   :: Cmd Subscribe Pattern
    , punsubs :: Cmd Unsubscribe Pattern
    } deriving (Eq)
-- | Combines two change sets slot by slot, delegating to the 'Monoid'
--   instance of the corresponding 'Cmd' type for each field.
instance Semigroup PubSub where
    (<>) p1 p2 = PubSub
        { subs    = subs p1    `mappend` subs p2
        , unsubs  = unsubs p1  `mappend` unsubs p2
        , psubs   = psubs p1   `mappend` psubs p2
        , punsubs = punsubs p1 `mappend` punsubs p2
        }
-- | The empty change set has every slot set to the 'mempty' of its 'Cmd'
--   type; 'mappend' is the 'Semigroup' operation.
instance Monoid PubSub where
    mempty  = PubSub mempty mempty mempty mempty
    mappend = (<>)
-- | A single (un)subscription command. The type parameters are phantom
--   indices: @a@ is 'Subscribe' or 'Unsubscribe', @b@ is 'Channel' or
--   'Pattern'.
--
--   'DoNothing' means no command is sent at all (see 'sendCmd'), whereas
--   @'Cmd' xs@ sends the command with @xs@ as its arguments.
--   Note that 'changes' is a partial field: it is undefined on 'DoNothing'.
data Cmd a b
    = DoNothing
    | Cmd { changes :: [ByteString] }
    deriving (Eq)
-- | Subscribing commands combine by concatenating their argument lists;
--   'DoNothing' is the identity on either side.
instance Semigroup (Cmd Subscribe a) where
    (<>) DoNothing x        = x
    (<>) x DoNothing        = x
    (<>) (Cmd xs) (Cmd ys)  = Cmd (xs ++ ys)
-- | 'DoNothing' is the neutral element; 'mappend' is the 'Semigroup'
--   operation.
instance Monoid (Cmd Subscribe a) where
    mempty  = DoNothing
    mappend = (<>)
-- | Unsubscribing commands combine like subscribing ones, except that an
--   empty argument list is absorbing: if either side is @'Cmd' []@ the
--   result is @'Cmd' []@. (Per the Redis documentation, an unsubscribe
--   command without arguments unsubscribes from everything, so adding
--   specific names to it would narrow its meaning.)
--
--   The 'DoNothing' equations come first, so 'DoNothing' remains the
--   identity even against @'Cmd' []@.
instance Semigroup (Cmd Unsubscribe a) where
    (<>) DoNothing x        = x
    (<>) x DoNothing        = x
    (<>) (Cmd []) _         = Cmd []
    (<>) _ (Cmd [])         = Cmd []
    (<>) (Cmd xs) (Cmd ys)  = Cmd (xs ++ ys)
-- | 'DoNothing' is the neutral element; 'mappend' is the 'Semigroup'
--   operation.
instance Monoid (Cmd Unsubscribe a) where
    mempty  = DoNothing
    mappend = (<>)
-- | Things that can be rendered as a Redis pub/sub command.
class Command a where
    -- | The command name; 'sendCmd' and 'rawSendCmd' place it in front of
    --   the command's arguments when building the request.
    redisCmd      :: a -> ByteString
    -- | How sending this command changes the count of pending replies;
    --   'sendCmd' applies the resulting function through 'modifyPending'.
    updatePending :: a -> Int -> Int
-- | Send a command from inside the 'pubSub' state loop.
--
--   'DoNothing' sends nothing and leaves the state unchanged. Any other
--   command is sent as its 'redisCmd' name followed by its 'changes', after
--   which the 'pending' counter is adjusted with 'updatePending'.
sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd DoNothing = return ()
sendCmd cmd       = do
    lift $ Core.send (redisCmd cmd : changes cmd)
    modifyPending (updatePending cmd)
-- | Number of arguments carried by a command; 'DoNothing' counts as zero.
cmdCount :: Cmd a b -> Int
cmdCount DoNothing = 0
cmdCount (Cmd c)   = length c
-- | Sum of 'cmdCount' over all four slots of a 'PubSub' change set.
--   'addChannels' and 'removeChannels' add this amount to the controller's
--   'pendingCnt'.
totalPendingChanges :: PubSub -> Int
totalPendingChanges (PubSub{..}) =
    cmdCount subs + cmdCount unsubs + cmdCount psubs + cmdCount punsubs
-- | Render a command with 'renderRequest' and write it to the given
--   pipelining connection with 'PP.send'. 'DoNothing' writes nothing.
--
--   Unlike 'sendCmd', this performs no pending-count bookkeeping.
rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd _ DoNothing = return ()
rawSendCmd conn cmd    =
    PP.send conn $ renderRequest $ redisCmd cmd : changes cmd
-- | Build a function that adds the number of arguments of a command to a
--   counter; 'DoNothing' yields the identity function.
plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt DoNothing = id
plusChangeCnt (Cmd cs)  = (+ length cs)
-- | @SUBSCRIBE@: each argument adds one to the pending count.
instance Command (Cmd Subscribe Channel) where
    redisCmd      = const "SUBSCRIBE"
    updatePending = plusChangeCnt
-- | @PSUBSCRIBE@: each argument adds one to the pending count.
instance Command (Cmd Subscribe Pattern) where
    redisCmd      = const "PSUBSCRIBE"
    updatePending = plusChangeCnt
-- | @UNSUBSCRIBE@: sending it leaves the pending count unchanged.
instance Command (Cmd Unsubscribe Channel) where
    redisCmd      = const "UNSUBSCRIBE"
    updatePending = const id
-- | @PUNSUBSCRIBE@: sending it leaves the pending count unchanged.
instance Command (Cmd Unsubscribe Pattern) where
    redisCmd      = const "PUNSUBSCRIBE"
    updatePending = const id
-- | A message handed to the 'pubSub' callback.
--
--   'PMessage' carries an additional 'msgPattern' field; 'msgPattern' is
--   therefore a partial field and must not be applied to a 'Message'.
--   NOTE(review): presumably 'Message' is produced for channel subscriptions
--   and 'PMessage' for pattern subscriptions -- the decoding code is not
--   visible here, confirm against @decodeMsg@.
data Message
    = Message  { msgChannel, msgMessage :: ByteString }
    | PMessage { msgPattern, msgChannel, msgMessage :: ByteString }
    deriving (Show)
-- | Decoded server reply as consumed by the 'pubSub' receive loop:
--   'Subscribed' decrements the pending count, @'Unsubscribed' n@ stores @n@
--   as the new subscription count, and @'Msg' m@ hands @m@ to the callback.
data PubSubReply = Subscribed | Unsubscribed Int | Msg Message
-- | Send a @PUBLISH@ request for the given channel and message.
--
--   The reply is decoded as an 'Integer'; per the Redis documentation for
--   @PUBLISH@ this is the number of clients that received the message.
publish
    :: (Core.RedisCtx m f)
    => ByteString -- ^ channel
    -> ByteString -- ^ message
    -> m (f Integer)
publish channel message =
    Core.sendRequest ["PUBLISH", channel, message]
-- | Change set that subscribes to the given channels.
--
--   An empty list yields 'mempty', i.e. no command is sent at all.
subscribe
    :: [ByteString] -- ^ channels
    -> PubSub
subscribe [] = mempty
subscribe cs = mempty { subs = Cmd cs }
-- | Change set that unsubscribes from the given channels.
--
--   Unlike 'subscribe', an empty list is /not/ mapped to 'mempty': it yields
--   @'Cmd' []@, so an @UNSUBSCRIBE@ without arguments is sent (which, per
--   the Redis documentation, unsubscribes from all channels).
unsubscribe
    :: [ByteString] -- ^ channels
    -> PubSub
unsubscribe cs = mempty { unsubs = Cmd cs }
-- | Change set that subscribes to the given patterns.
--
--   An empty list yields 'mempty', i.e. no command is sent at all.
psubscribe
    :: [ByteString] -- ^ patterns
    -> PubSub
psubscribe [] = mempty
psubscribe ps = mempty { psubs = Cmd ps }
-- | Change set that unsubscribes from the given patterns.
--
--   Unlike 'psubscribe', an empty list is /not/ mapped to 'mempty': it
--   yields @'Cmd' []@, so a @PUNSUBSCRIBE@ without arguments is sent (which,
--   per the Redis documentation, unsubscribes from all patterns).
punsubscribe
    :: [ByteString] -- ^ patterns
    -> PubSub
punsubscribe ps = mempty { punsubs = Cmd ps }
-- | Apply the initial subscription changes and then process replies until
--   there is nothing left to wait for.
--
--   If the initial change set equals 'mempty', the function returns
--   immediately without sending anything. Otherwise the loop works as
--   follows:
--
--   * a published message is passed to the callback, and the change set the
--     callback returns is sent before receiving continues;
--
--   * a subscription confirmation decrements the pending count;
--
--   * an unsubscription confirmation stores the count it carries, and the
--     loop terminates once both that count and the pending count are zero.
pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback; returns further changes to apply.
    -> Core.Redis ()
pubSub initial callback
    | initial == mempty = return ()
    | otherwise         = evalStateT (send initial) (PubSubState 0 0)
  where
    -- Send all four command slots in a fixed order, then go back to
    -- receiving.
    send :: PubSub -> StateT PubSubState Core.Redis ()
    send PubSub{..} = do
        sendCmd subs
        sendCmd unsubs
        sendCmd psubs
        sendCmd punsubs
        recv

    -- Receive and dispatch one reply.
    recv :: StateT PubSubState Core.Redis ()
    recv = do
        reply <- lift Core.recv
        case decodeMsg reply of
            Msg msg        -> liftIO (callback msg) >>= send
            Subscribed     -> modifyPending (subtract 1) >> recv
            Unsubscribed n -> do
                putSubCnt n
                PubSubState{..} <- get
                -- Stop only when nothing is subscribed and no subscription
                -- confirmations are outstanding.
                unless (subCnt == 0 && pending == 0) recv
-- | Name of a channel; used as the key type of the controller's channel
--   callback map.
type RedisChannel = ByteString
-- | A channel pattern; used as the key type of the controller's pattern
--   callback map.
type RedisPChannel = ByteString
-- | Callback registered for a channel.
--   NOTE(review): the argument is presumably the message payload -- the
--   invocation site is not visible here, confirm.
type MessageCallback = ByteString -> IO ()
-- | Callback registered for a pattern. Takes a 'RedisChannel' first.
--   NOTE(review): the second argument is presumably the message payload --
--   the invocation site is not visible here, confirm.
type PMessageCallback = RedisChannel -> ByteString -> IO ()
-- | The action returned by 'addChannels' and 'addChannelsAndWait'; running
--   it unregisters the callbacks that were added by that call.
type UnregisterCallbacksAction = IO ()
-- | Identifier attached to every registered callback so that the callbacks
--   added by one 'addChannels' call can later be told apart from the rest.
--   The 'Num' instance (derived from 'Integer') allows numeric literals and
--   incrementing with @(+ 1)@.
newtype UnregisterHandle = UnregisterHandle Integer
    deriving (Eq, Show, Num)
-- | Shared state used to manage channel and pattern callbacks and to queue
--   subscription changes. Create one with 'newPubSubController'.
data PubSubController = PubSubController
    { callbacks          :: TVar (HM.HashMap RedisChannel [(UnregisterHandle, MessageCallback)])
      -- ^ Callbacks per channel, each tagged with the handle it was
      --   registered under.
    , pcallbacks         :: TVar (HM.HashMap RedisPChannel [(UnregisterHandle, PMessageCallback)])
      -- ^ Callbacks per pattern, each tagged with the handle it was
      --   registered under.
    , sendChanges        :: TBQueue PubSub
      -- ^ Bounded queue of change sets written by 'addChannels' and
      --   'removeChannels'.
    , pendingCnt         :: TVar Int
      -- ^ Count of queued changes not yet confirmed; 'addChannelsAndWait'
      --   blocks until it is no longer positive.
    , lastUsedCallbackId :: TVar UnregisterHandle
      -- ^ Most recently allocated handle; incremented by 'addChannels'.
    }
-- | Create a new 'PubSubController'.
--
--   The given callbacks are stored under handle @0@. If a channel (or
--   pattern) occurs more than once in a list, only the last callback for it
--   is kept, because the lists are converted with 'HM.fromList'. The change
--   queue is bounded at 10 entries, and both the pending count and the last
--   used handle start at zero.
--
--   This function only allocates state; it does not send any command.
newPubSubController
    :: MonadIO m
    => [(RedisChannel, MessageCallback)]   -- ^ Initial channel callbacks.
    -> [(RedisPChannel, PMessageCallback)] -- ^ Initial pattern callbacks.
    -> m PubSubController
newPubSubController x y = liftIO $ do
    cbs        <- newTVarIO (HM.map (\z -> [(0, z)]) $ HM.fromList x)
    pcbs       <- newTVarIO (HM.map (\z -> [(0, z)]) $ HM.fromList y)
    c          <- newTBQueueIO 10
    pendingVar <- newTVarIO 0
    lastId     <- newTVarIO 0
    return $ PubSubController cbs pcbs c pendingVar lastId
-- | The channels that currently have at least one entry in the controller's
--   channel callback map.
#if __GLASGOW_HASKELL__ < 710
currentChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisChannel]
#else
currentChannels :: MonadIO m => PubSubController -> m [RedisChannel]
#endif
currentChannels ctrl = HM.keys <$> liftIO (readTVarIO (callbacks ctrl))
-- | The patterns that currently have at least one entry in the controller's
--   pattern callback map.
#if __GLASGOW_HASKELL__ < 710
currentPChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisPChannel]
#else
currentPChannels :: MonadIO m => PubSubController -> m [RedisPChannel]
#endif
currentPChannels ctrl = HM.keys <$> liftIO (readTVarIO (pcallbacks ctrl))
-- | Register callbacks for channels and patterns and queue the matching
--   subscription changes.
--
--   In a single STM transaction this
--
--   * allocates a fresh 'UnregisterHandle' by incrementing
--     'lastUsedCallbackId';
--
--   * queues a 'subscribe' / 'psubscribe' change set on 'sendChanges',
--     covering only those names that are not yet keys of the callback maps
--     (the transaction retries while the bounded queue is full);
--
--   * appends the new callbacks, tagged with the fresh handle, to any
--     callbacks already registered for the same name;
--
--   * adds the size of the queued change set to 'pendingCnt'.
--
--   The function does not wait for the changes to be confirmed; see
--   'addChannelsAndWait' for that. The returned action calls 'unsubChannels'
--   with the names passed here and the freshly allocated handle.
--
--   When both lists are empty nothing is done and the returned action is a
--   no-op.
addChannels
    :: MonadIO m
    => PubSubController
    -> [(RedisChannel, MessageCallback)]
    -> [(RedisPChannel, PMessageCallback)]
    -> m UnregisterCallbacksAction
addChannels _ [] [] = return $ return ()
addChannels ctrl newChans newPChans = liftIO $ do
    ident <- atomically $ do
        -- Strict update: keep the counter evaluated instead of piling up
        -- (+ 1) thunks inside the TVar.
        modifyTVar' (lastUsedCallbackId ctrl) (+ 1)
        newId <- readTVar $ lastUsedCallbackId ctrl
        cm <- readTVar $ callbacks ctrl
        pm <- readTVar $ pcallbacks ctrl
        -- Only names without any registered callback need a new
        -- subscription.
        let newChans'  = [ n | (n, _) <- newChans,  not $ HM.member n cm ]
            newPChans' = [ n | (n, _) <- newPChans, not $ HM.member n pm ]
            ps         = subscribe newChans' `mappend` psubscribe newPChans'
        writeTBQueue (sendChanges ctrl) ps
        writeTVar (callbacks ctrl)
            (HM.unionWith (++) cm (fmap (\z -> [(newId, z)]) $ HM.fromList newChans))
        writeTVar (pcallbacks ctrl)
            (HM.unionWith (++) pm (fmap (\z -> [(newId, z)]) $ HM.fromList newPChans))
        modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)
        return newId
    return $ unsubChannels ctrl (map fst newChans) (map fst newPChans) ident
-- | Like 'addChannels', but afterwards blocks until the controller's
--   'pendingCnt' is no longer positive.
--
--   Note that the wait is on the controller-wide counter, so it also covers
--   changes queued by other callers. When both lists are empty nothing is
--   done, no waiting takes place, and the returned action is a no-op.
addChannelsAndWait
    :: MonadIO m
    => PubSubController
    -> [(RedisChannel, MessageCallback)]
    -> [(RedisPChannel, PMessageCallback)]
    -> m UnregisterCallbacksAction
addChannelsAndWait _ [] [] = return $ return ()
addChannelsAndWait ctrl newChans newPChans = do
    unreg <- addChannels ctrl newChans newPChans
    liftIO $ atomically $ do
        r <- readTVar (pendingCnt ctrl)
        -- Re-run the transaction whenever the counter changes, until it
        -- has dropped to zero.
        when (r > 0) retry
    return unreg
-- | Remove channels and patterns from the controller, dropping /all/
--   callbacks registered for them regardless of handle.
--
--   In a single STM transaction this queues an 'unsubscribe' /
--   'punsubscribe' change set on 'sendChanges' for those names that are
--   currently keys of the callback maps, deletes these keys from the maps,
--   and adds the size of the change set to 'pendingCnt'. The function does
--   not wait for the changes to be confirmed.
--
--   When both argument lists are empty nothing is done.
removeChannels
    :: MonadIO m
    => PubSubController
    -> [RedisChannel]
    -> [RedisPChannel]
    -> m ()
removeChannels _ [] [] = return ()
removeChannels ctrl remChans remPChans = liftIO $ atomically $ do
    cm <- readTVar $ callbacks ctrl
    pm <- readTVar $ pcallbacks ctrl
    let remChans'  = filter (`HM.member` cm) remChans
        remPChans' = filter (`HM.member` pm) remPChans
        -- The emptiness checks matter: 'unsubscribe' / 'punsubscribe' on an
        -- empty list would produce a command without arguments rather than
        -- no command at all.
        ps =        (if null remChans'  then mempty else unsubscribe remChans')
          `mappend` (if null remPChans' then mempty else punsubscribe remPChans')
    writeTBQueue (sendChanges ctrl) ps
    writeTVar (callbacks ctrl)  (foldl' (flip HM.delete) cm remChans')
    writeTVar (pcallbacks ctrl) (foldl' (flip HM.delete) pm remPChans')
    -- Strict update so the counter does not accumulate thunks.
    modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
-- | Remove every callback registered under the handle @h@ from the given
-- channels and pattern channels, and queue the unsubscribe commands for any
-- channel that is left without callbacks.
--
-- The whole update runs in a single STM transaction: the callback maps are
-- rewritten, the resulting 'PubSub' change is written to the @sendChanges@
-- queue, and @pendingCnt@ is increased by 'totalPendingChanges' of that change.
unsubChannels :: PubSubController
              -> [ByteString] -> [ByteString] -> UnregisterHandle -> IO ()
unsubChannels ctrl chans pchans h = atomically $ do
    cm <- readTVar $ callbacks ctrl
    pm <- readTVar $ pcallbacks ctrl

    -- Only consider channels that currently have an entry in the maps.
    let remChans  = filter (\n -> HM.member n cm) chans
        remPChans = filter (\n -> HM.member n pm) pchans

    -- Drop the entries belonging to handle 'h'; an emptied list becomes
    -- 'Nothing' so that the key can be deleted from the map.
    let filterHandle :: Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
        filterHandle Nothing    = Nothing
        filterHandle (Just lst) =
            case filter (\x -> fst x /= h) lst of
                [] -> Nothing
                xs -> Just xs

    let removeHandles :: HM.HashMap ByteString [(UnregisterHandle, a)]
                      -> ByteString
                      -> HM.HashMap ByteString [(UnregisterHandle, a)]
        removeHandles m k =
            case filterHandle (HM.lookup k m) of
                Nothing -> HM.delete k m
                Just v  -> HM.insert k v m

    -- Apply the removal to both maps.
    let cm' = foldl' removeHandles cm remChans
        pm' = foldl' removeHandles pm remPChans

    -- Channels whose key disappeared have no callbacks left and must be
    -- unsubscribed on the server side.
    let remChans'  = filter (\n -> not $ HM.member n cm') remChans
        remPChans' = filter (\n -> not $ HM.member n pm') remPChans
        ps = (if null remChans' then mempty else unsubscribe remChans')
             `mappend`
             (if null remPChans' then mempty else punsubscribe remPChans')

    writeTBQueue (sendChanges ctrl) ps
    writeTVar (callbacks ctrl) cm'
    writeTVar (pcallbacks ctrl) pm'
    -- Strict update: the counter may go unread for a long time, and the lazy
    -- 'modifyTVar' would accumulate a chain of unevaluated additions.
    modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)
-- | Like 'removeChannels', but afterwards block until the controller's
-- @pendingCnt@ is no longer positive.  When both lists are empty nothing is
-- done and the call returns immediately.
removeChannelsAndWait :: MonadIO m => PubSubController
                      -> [RedisChannel]
                      -> [RedisPChannel]
                      -> m ()
removeChannelsAndWait ctrl remChans remPChans
    | null remChans && null remPChans = return ()
    | otherwise = do
        removeChannels ctrl remChans remPChans
        liftIO (atomically waitForPending)
  where
    -- Retries the transaction for as long as the pending counter is positive.
    waitForPending :: STM ()
    waitForPending = do
        outstanding <- readTVar (pendingCnt ctrl)
        check (outstanding <= 0)
-- | Receive loop: repeatedly reads a reply from the connection, decodes it
-- with 'decodeMsg' and acts on it.
--
-- * Messages are passed to every callback stored for the channel in
--   @callbacks@; pattern messages are passed to every callback stored for the
--   pattern in @pcallbacks@.  Replies without a matching entry are ignored.
-- * Each subscribe or unsubscribe acknowledgement decrements @pendingCnt@ by
--   one.
--
-- The loop never returns normally ('forever').
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread ctrl rawConn = forever $ do
    msg <- PP.recv rawConn
    case decodeMsg msg of
        Msg (Message channel msgCt) -> do
            cm <- atomically $ readTVar (callbacks ctrl)
            case HM.lookup channel cm of
                Nothing -> return ()
                Just c  -> mapM_ (\(_, x) -> x msgCt) c
        Msg (PMessage pat channel msgCt) -> do
            pm <- atomically $ readTVar (pcallbacks ctrl)
            case HM.lookup pat pm of
                Nothing -> return ()
                Just c  -> mapM_ (\(_, x) -> x channel msgCt) c
        Subscribed     -> acknowledge
        Unsubscribed _ -> acknowledge
  where
    -- Strict decrement: this runs inside an endless loop and the counter may
    -- not be read by anyone, so the lazy 'modifyTVar' would pile up thunks.
    acknowledge :: IO ()
    acknowledge = atomically $ modifyTVar' (pendingCnt ctrl) (\x -> x - 1)
-- | Send loop: takes the next 'PubSub' change from the controller's
-- @sendChanges@ queue (blocking until one is available), writes its four
-- commands to the connection in the order subscribe, unsubscribe, psubscribe,
-- punsubscribe, and then flushes the connection.  Runs forever.
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread ctrl rawConn = forever step
  where
    step :: IO ()
    step = do
        change <- atomically (readTBQueue (sendChanges ctrl))
        rawSendCmd rawConn (subs change)
        rawSendCmd rawConn (unsubs change)
        rawSendCmd rawConn (psubs change)
        rawSendCmd rawConn (punsubs change)
        -- Explicit flush after the batch of commands has been written.
        PP.flush rawConn
-- | Take a connection from the pool and run the listen and send threads on it
-- until one of them terminates.
--
-- Before the threads are started, the @sendChanges@ queue is emptied and
-- replaced by a single change that subscribes to every channel and pattern
-- currently present in the controller's callback maps; @pendingCnt@ is reset
-- to the number of pending changes of that request.
--
-- The third argument is run once @pendingCnt@ has dropped to zero, provided
-- neither thread finished first.  If a thread ends with an exception, that
-- exception is rethrown to the caller.
--
-- Only non-clustered connections are implemented; calling this with a
-- clustered connection raises an 'error'.
pubSubForever :: Connection.Connection
              -> PubSubController
              -> IO ()
              -> IO ()
pubSubForever (Connection.NonClusteredConnection pool) ctrl onInitialLoad =
    withResource pool $ \rawConn -> do
        -- Discard queued changes and enqueue the full set of subscriptions.
        atomically $ do
            let loop = tryReadTBQueue (sendChanges ctrl) >>=
                         \x -> if isJust x then loop else return ()
            loop
            cm <- readTVar $ callbacks ctrl
            pm <- readTVar $ pcallbacks ctrl
            let ps = subscribe (HM.keys cm) `mappend` psubscribe (HM.keys pm)
            writeTBQueue (sendChanges ctrl) ps
            writeTVar (pendingCnt ctrl) (totalPendingChanges ps)

        withAsync (listenThread ctrl rawConn) $ \listenT ->
          withAsync (sendThread ctrl rawConn) $ \sendT -> do

            -- Wait until either a thread has finished or the pending counter
            -- has reached zero, whichever happens first.
            mret <- atomically $
                (Left <$> waitEitherCatchSTM listenT sendT)
              `orElse`
                (Right <$> (readTVar (pendingCnt ctrl) >>=
                              \x -> if x > 0 then retry else return ()))
            case mret of
                Right () -> onInitialLoad
                -- A finished thread is picked up again by the wait below.
                _        -> return ()

            -- Block until one of the threads ends and propagate its exception.
            merr <- waitEitherCatch listenT sendT
            case merr of
                Right (Left err) -> throwIO err
                Left (Left err)  -> throwIO err
                _                -> return ()
pubSubForever (Connection.ClusteredConnection _ _) _ _ =
    error "Hedis: pubSubForever is not implemented for clustered connections"
-- | Decode a raw reply into a 'PubSubReply'.
--
-- The reply must be a multi-bulk reply with at least three elements whose
-- first element is one of @message@, @pmessage@, @subscribe@, @psubscribe@,
-- @unsubscribe@ or @punsubscribe@.  A @pmessage@ additionally needs a fourth
-- element.  Any other shape, or an element that fails to decode, ends in
-- 'errMsg' for the whole reply.
decodeMsg :: Reply -> PubSubReply
decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) = either (errMsg r) id $ do
    kind <- decode r0
    case kind :: ByteString of
        "message"      -> Msg <$> decodeMessage
        "pmessage"     -> Msg <$> decodePMessage
        "subscribe"    -> return Subscribed
        "psubscribe"   -> return Subscribed
        "unsubscribe"  -> Unsubscribed <$> decodeCnt
        "punsubscribe" -> Unsubscribed <$> decodeCnt
        _              -> errMsg r
  where
    decodeMessage :: Either Reply Message
    decodeMessage = Message <$> decode r1 <*> decode r2

    -- Match on the remaining elements instead of calling the partial 'head',
    -- so a truncated pmessage is reported through 'errMsg' like every other
    -- malformed reply rather than as an empty-list exception.
    decodePMessage :: Either Reply Message
    decodePMessage = case rs of
        (r3:_) -> PMessage <$> decode r1 <*> decode r2 <*> decode r3
        []     -> Left r

    decodeCnt :: Either Reply Int
    decodeCnt = fromInteger <$> decode r2
decodeMsg r = errMsg r
-- | Abort with an 'error' whose message contains the 'show'n reply that was
-- not a valid pub/sub message.
errMsg :: Reply -> a
errMsg = error . (prefix ++) . show
  where
    prefix :: String
    prefix = "Hedis: expected pub/sub-message but got: "