{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.HTTP2.H2.Manager (
Manager,
start,
stopAfter,
forkManaged,
forkManagedUnmask,
timeoutKillThread,
timeoutClose,
KilledByHttp2ThreadManager (..),
waitCounter0,
) where
import Data.Foldable
import Data.Map (Map)
import qualified Data.Map.Strict as Map
import qualified System.TimeManager as T
import UnliftIO.Concurrent
import UnliftIO.Exception
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Imports
-- | Messages sent to the manager thread through its command queue.
data Command
    = -- | Stop the manager.  The 'MVar' is filled by the manager thread once
      --   the timeouts of all managed threads have been cancelled; the
      --   optional exception is forwarded to every managed thread.
      Stop (MVar ()) (Maybe SomeException)
    | -- | Start tracking a thread (initially without a timeout).
      Add ThreadId
    | -- | Associate a timeout handle with a tracked thread.
      RegisterTimeout ThreadId T.Handle
    | -- | Stop tracking a thread, cancelling its timeout if it has one.
      Delete ThreadId
-- | Handle to a running thread manager: the command queue read by the manager
--   thread, the counter adjusted by 'incCounter' / 'decCounter', and the
--   timeout manager used to register timeouts.
data Manager = Manager (TQueue Command) (TVar Int) T.Manager
-- | Timeout state recorded for a managed thread.
data TimeoutHandle
    = -- | The thread has registered this timeout handle.
      ThreadWithTimeout T.Handle
    | -- | The thread has no timeout registered.
      ThreadWithoutTimeout
-- | Cancel the timeout recorded for a managed thread; a no-op when the thread
--   has none.
cancelTimeout :: TimeoutHandle -> IO ()
cancelTimeout (ThreadWithTimeout h) = T.cancel h
cancelTimeout ThreadWithoutTimeout = return ()
-- | The threads currently tracked by the manager, each mapped to its timeout
--   state.
type ManagedThreads = Map ThreadId TimeoutHandle
-- | Start a thread manager.
--
-- Creates an empty command queue and a counter initialised to 0, then forks
-- the manager thread, which processes 'Command's from the queue one at a time
-- until it receives 'Stop'.  The returned 'Manager' bundles the queue, the
-- counter and the given timeout manager.
start :: T.Manager -> IO Manager
start timmgr = do
    q <- newTQueueIO
    cnt <- newTVarIO 0
    void $ forkIO $ do
        labelMe "H2 thread manager"
        go q Map.empty
    return $ Manager q cnt timmgr
  where
    -- Command loop of the manager thread.  Every command except 'Stop'
    -- produces an updated thread map and loops; 'Stop' hands the current map
    -- to 'kill' and ends the loop, and with it the manager thread.
    go :: TQueue Command -> ManagedThreads -> IO ()
    go q threadMap0 = do
        x <- atomically $ readTQueue q
        case x of
            Stop signalTimeoutsDisabled err ->
                kill signalTimeoutsDisabled threadMap0 err
            Add newtid ->
                go q $ add newtid threadMap0
            RegisterTimeout tid h ->
                go q $ registerTimeout tid h threadMap0
            Delete oldtid ->
                del oldtid threadMap0 >>= go q
-- | Run an action, then stop the manager and run a cleanup handler.
--
-- The action runs unmasked inside 'trySyncOrAsync', so its outcome (a result
-- or any exception) is captured rather than escaping.  Afterwards, still
-- masked, a 'Stop' command carrying the captured exception (if any) is sent to
-- the manager thread.  Once the manager thread has signalled that all
-- timeouts are cancelled, @cleanup@ is called with the same @Maybe
-- SomeException@; finally the captured exception is rethrown or the result
-- returned.
stopAfter :: Manager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
stopAfter (Manager q _ _) action cleanup =
    mask $ \unmask -> do
        ma <- trySyncOrAsync $ unmask action
        signalTimeoutsDisabled <- newEmptyMVar
        atomically $
            writeTQueue q $
                Stop signalTimeoutsDisabled (either Just (const Nothing) ma)
        -- Wait for the manager thread ('kill') to report that the timeouts
        -- of all managed threads have been cancelled.
        takeMVar signalTimeoutsDisabled
        case ma of
            Left err -> cleanup (Just err) >> throwIO err
            Right a -> cleanup Nothing >> return a
-- | Fork a thread that is tracked by the manager.
--
-- A convenience wrapper around 'forkManagedUnmask' that runs the whole action
-- with the supplied unmasking function applied.  The 'String' is the label
-- given to the new thread.
forkManaged :: Manager -> String -> IO () -> IO ()
forkManaged mgr label io =
    forkManagedUnmask mgr label $ \unmask -> unmask io
-- | Like 'forkManaged', but the action starts with asynchronous exceptions
--   masked and receives a function with which it can unmask.
--
-- The forked thread labels itself, sends its thread id to the manager,
-- increments the manager's counter, runs the action, and then removes its id
-- and decrements the counter again.  Exceptions raised by the action are
-- caught and discarded so that the bookkeeping after it still runs.
forkManagedUnmask
    :: Manager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask mgr label io =
    void $ mask_ $ forkIOWithUnmask $ \unmask ->
        E.handleSyncOrAsync handler $ do
            labelMe label
            addMyId mgr
            incCounter mgr
            -- Swallow whatever the action throws instead of rethrowing it.
            io unmask `catchSyncOrAsync` \(_e :: SomeException) -> return ()
            deleteMyId mgr
            decCounter mgr
  where
    -- Last-resort handler: discards any exception raised by the bookkeeping
    -- steps themselves.
    handler :: SomeException -> IO ()
    handler (E.SomeException _) = return ()
-- | Ask the manager to start tracking the calling thread by enqueuing an
--   'Add' command with its thread id.
addMyId :: Manager -> IO ()
addMyId (Manager q _ _) = do
    tid <- myThreadId
    atomically $ writeTQueue q $ Add tid
-- | Ask the manager to stop tracking the calling thread by enqueuing a
--   'Delete' command with its thread id.
deleteMyId :: Manager -> IO ()
deleteMyId (Manager q _ _) = do
    tid <- myThreadId
    atomically $ writeTQueue q $ Delete tid
-- | Record a thread in the map with no timeout.  An existing entry for the
--   same thread id is replaced.
add :: ThreadId -> ManagedThreads -> ManagedThreads
add tid = Map.insert tid ThreadWithoutTimeout
-- | Record a timeout handle for a thread, replacing whatever entry the map
--   already holds for that thread id.
registerTimeout :: ThreadId -> T.Handle -> ManagedThreads -> ManagedThreads
registerTimeout tid = Map.insert tid . ThreadWithTimeout
-- | Remove a thread from the map, first cancelling its timeout if the thread
--   is present.  Returns the map unchanged when the thread is not tracked.
del :: ThreadId -> ManagedThreads -> IO ManagedThreads
del tid threadMap = do
    forM_ (Map.lookup tid threadMap) cancelTimeout
    return $ Map.delete tid threadMap
-- | Shut down every managed thread.
--
-- The steps happen in this order:
--
-- 1. cancel the timeout of every tracked thread;
-- 2. fill the 'MVar' to tell the caller of 'stopAfter' that the timeouts are
--    gone;
-- 3. throw 'KilledByHttp2ThreadManager', carrying the given exception, to
--    every tracked thread.
kill :: MVar () -> ManagedThreads -> Maybe SomeException -> IO ()
kill signalTimeoutsDisabled threadMap err = do
    forM_ (Map.elems threadMap) cancelTimeout
    putMVar signalTimeoutsDisabled ()
    forM_ (Map.keys threadMap) $ \tid ->
        E.throwTo tid $ KilledByHttp2ThreadManager err
-- | Run an action with a timeout handle registered for the calling thread.
--
-- The handle is obtained from 'T.registerKillThread' and reported to the
-- manager through a 'RegisterTimeout' command; 'E.bracket' guarantees that it
-- is cancelled with 'T.cancel' when the action finishes, normally or by
-- exception.
--
-- NOTE(review): presumably the timeout manager kills the registering thread
-- when the handle expires — confirm against "System.TimeManager".
timeoutKillThread :: Manager -> (T.Handle -> IO a) -> IO a
timeoutKillThread (Manager q _ tmgr) action =
    E.bracket register T.cancel action
  where
    -- Acquire the handle and tell the manager thread about it.
    register :: IO T.Handle
    register = do
        h <- T.registerKillThread tmgr (return ())
        tid <- myThreadId
        atomically $ writeTQueue q (RegisterTimeout tid h)
        return h
-- | Register @closer@ with the timeout manager and return an action that
--   calls 'T.tickle' on the resulting handle.
--
-- NOTE(review): presumably @closer@ runs when the handle times out and
-- tickling postpones that — confirm against "System.TimeManager".
timeoutClose :: Manager -> IO () -> IO (IO ())
timeoutClose (Manager _ _ tmgr) closer = do
    th <- T.register tmgr closer
    return $ T.tickle th
-- | Exception thrown to managed threads when the manager is stopped.  It
--   carries the exception, if any, that accompanied the 'Stop' command.
data KilledByHttp2ThreadManager = KilledByHttp2ThreadManager (Maybe SomeException)
    deriving (Show)

-- | Converted to and from 'SomeException' through the asynchronous-exception
--   helpers, so it is classified as an asynchronous exception.
instance Exception KilledByHttp2ThreadManager where
    toException = asyncExceptionToException
    fromException = asyncExceptionFromException
-- | Atomically add one to the manager's counter.
incCounter :: Manager -> IO ()
incCounter (Manager _ cnt _) = atomically $ modifyTVar' cnt (+ 1)
-- | Atomically subtract one from the manager's counter.
decCounter :: Manager -> IO ()
decCounter (Manager _ cnt _) = atomically $ modifyTVar' cnt (subtract 1)
-- | Block until the manager's counter is below 1.
--
-- The counter is read inside an STM transaction guarded by 'checkSTM', so the
-- transaction is retried until the condition holds.
waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _ cnt _) = atomically $ do
    n <- readTVar cnt
    checkSTM (n < 1)