module Network.HTTP2.Client (
runHttp2Client
, withHttp2Stream
, headers
, sendData
, Http2Client(..)
, PushPromiseHandler
, StreamDefinition(..)
, StreamStarter
, TooMuchConcurrency(..)
, StreamThread
, Http2Stream(..)
, IncomingFlowControl(..)
, OutgoingFlowControl(..)
, linkAsyncs
, RemoteSentGoAwayFrame(..)
, GoAwayHandler
, defaultGoAwayHandler
, FallBackFrameHandler
, ignoreFallbackHandler
, FlagSetter
, Http2ClientAsyncs(..)
, _gtfo
, module Network.HTTP2.Client.FrameConnection
, module Network.Socket
, module Network.TLS
) where
import Control.Concurrent.Async (Async, race, withAsync, link)
import Control.Exception (bracket, throwIO, SomeException, catch)
import Control.Concurrent.MVar (newMVar, takeMVar, putMVar)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Chan (Chan, newChan, dupChan, readChan, writeChan)
import Control.Monad (forever, when, forM_)
import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)
import Data.Maybe (fromMaybe)
import GHC.Exception (Exception)
import Network.HPACK as HPACK
import Network.HTTP2 as HTTP2
import Network.Socket (HostName, PortNumber)
import Network.TLS (ClientParams)
import Network.HTTP2.Client.FrameConnection
data IncomingFlowControl = IncomingFlowControl {
_addCredit :: WindowSize -> IO ()
, _consumeCredit :: WindowSize -> IO Int
, _updateWindow :: IO Bool
}
data OutgoingFlowControl = OutgoingFlowControl {
_receiveCredit :: WindowSize -> IO ()
, _withdrawCredit :: WindowSize -> IO WindowSize
}
data StreamDefinition a = StreamDefinition {
_initStream :: IO StreamThread
, _handleStream :: IncomingFlowControl -> OutgoingFlowControl -> IO a
}
type StreamStarter a =
(Http2Stream -> StreamDefinition a) -> IO (Either TooMuchConcurrency a)
newtype TooMuchConcurrency = TooMuchConcurrency { _getStreamRoomNeeded :: Int }
deriving Show
data Http2Client = Http2Client {
_ping :: ByteString -> IO (IO (FrameHeader, FramePayload))
, _settings :: SettingsList -> IO (IO (FrameHeader, FramePayload))
, _goaway :: ErrorCodeId -> ByteString -> IO ()
, _startStream :: forall a. StreamStarter a
, _incomingFlowControl :: IncomingFlowControl
, _outgoingFlowControl :: OutgoingFlowControl
, _paylodSplitter :: IO PayloadSplitter
, _asyncs :: !Http2ClientAsyncs
, _close :: IO ()
}
data Http2ClientAsyncs = Http2ClientAsyncs {
_waitSettingsAsync :: Async (FrameHeader, FramePayload)
, _creditFlowsAsync :: Async ()
, _HPACKAsync :: Async ()
, _controlFramesAsync :: Async ()
, _incomingFramesAsync :: Async ()
}
linkAsyncs :: Http2Client -> IO ()
linkAsyncs client =
let Http2ClientAsyncs{..} = _asyncs client in do
link _waitSettingsAsync
link _creditFlowsAsync
link _HPACKAsync
link _controlFramesAsync
link _incomingFramesAsync
data ConnectionSettings = ConnectionSettings {
_clientSettings :: !Settings
, _serverSettings :: !Settings
}
defaultConnectionSettings :: ConnectionSettings
defaultConnectionSettings =
ConnectionSettings defaultSettings defaultSettings
_gtfo :: Http2Client -> ErrorCodeId -> ByteString -> IO ()
_gtfo = _goaway
data StreamThread = CST
data Http2Stream = Http2Stream {
_headers :: HPACK.HeaderList
-> (FrameFlags -> FrameFlags)
-> IO StreamThread
, _prio :: Priority -> IO ()
, _rst :: ErrorCodeId -> IO ()
, _waitHeaders :: IO (FrameHeader, StreamId, Either ErrorCode HeaderList)
, _waitData :: IO (FrameHeader, Either ErrorCode ByteString)
, _sendDataChunk :: (FrameFlags -> FrameFlags) -> ByteString -> IO ()
, _waitPushPromise :: PushPromiseHandler -> IO ()
}
type PushPromiseHandler =
StreamId -> Http2Stream -> HeaderList -> IncomingFlowControl -> OutgoingFlowControl -> IO ()
data HpackEncoderContext = HpackEncoderContext {
_encodeHeaders :: HeaderList -> IO HeaderBlockFragment
, _applySettings :: Int -> IO ()
}
withHttp2Stream :: Http2Client -> StreamStarter a
withHttp2Stream = _startStream
type FlagSetter = FrameFlags -> FrameFlags
headers :: Http2Stream -> HeaderList -> FlagSetter -> IO StreamThread
headers = _headers
runHttp2Client
:: Http2FrameConnection
-> Int
-> Int
-> SettingsList
-> GoAwayHandler
-> FallBackFrameHandler
-> (Http2Client -> IO a)
-> IO a
runHttp2Client conn encoderBufSize decoderBufSize initSettings goAwayHandler fallbackHandler mainHandler = do
let _close = closeConnection conn
hpackEncoder <- do
let strategy = (HPACK.defaultEncodeStrategy { HPACK.useHuffman = True })
dt <- HPACK.newDynamicTableForEncoding HPACK.defaultDynamicTableSize
let _encodeHeaders = HPACK.encodeHeader strategy encoderBufSize dt
let _applySettings n = HPACK.setLimitForEncoding n dt
return HpackEncoderContext{..}
clientStreamIdMutex <- newMVar 0
let withClientStreamId h = bracket (takeMVar clientStreamIdMutex)
(putMVar clientStreamIdMutex . succ)
(\k -> h (2 * k + 1))
let controlStream = makeFrameClientStream conn 0
let ackPing = sendPingFrame controlStream HTTP2.setAck
let ackSettings = sendSettingsFrame controlStream HTTP2.setAck []
maxReceivedStreamId <- newIORef 0
serverFrames <- newChan
let incomingLoop = incomingFramesLoop conn serverFrames maxReceivedStreamId
withAsync incomingLoop $ \aIncoming -> do
settings <- newIORef defaultConnectionSettings
controlFrames <- dupChan serverFrames
let controlLoop = incomingControlFramesLoop controlFrames settings hpackEncoder ackPing ackSettings goAwayHandler fallbackHandler
withAsync controlLoop $ \aControl -> do
serverStreamFrames <- dupChan serverFrames
serverHeaders <- newChan
hpackDecoder <- do
dt <- newDynamicTableForDecoding HPACK.defaultDynamicTableSize decoderBufSize
return dt
creditFrames <- dupChan serverFrames
_outgoingFlowControl <- newOutgoingFlowControl settings 0 creditFrames
_incomingFlowControl <- newIncomingFlowControl settings controlStream
serverPushPromises <- newChan
let hpackLoop = incomingHPACKFramesLoop serverStreamFrames serverHeaders serverPushPromises hpackDecoder
withAsync hpackLoop $ \aHPACK -> do
dataFrames <- dupChan serverFrames
let creditLoop = creditDataFramesLoop _incomingFlowControl dataFrames
withAsync creditLoop $ \aCredit -> do
conccurentStreams <- newIORef 0
let _startStream getWork = do
maxConcurrency <- fromMaybe 100 . maxConcurrentStreams . _serverSettings
<$> readIORef settings
roomNeeded <- atomicModifyIORef' conccurentStreams
(\n -> if n < maxConcurrency then (n + 1, 0) else (n, 1 + n maxConcurrency))
if roomNeeded > 0
then
return $ Left $ TooMuchConcurrency roomNeeded
else do
cont <- withClientStreamId $ \sid -> do
initializeStream conn
settings
serverFrames
serverHeaders
serverPushPromises
hpackEncoder
sid
getWork
v <- cont
atomicModifyIORef' conccurentStreams (\n -> (n 1, ()))
return $ Right v
let _ping dat = do
pingFrames <- dupChan serverFrames
sendPingFrame controlStream id dat
return $ waitFrame (isPingReply dat) pingFrames
let _settings settslist = do
pingFrames <- dupChan serverFrames
sendSettingsFrame controlStream id settslist
return $ do
ret <- waitFrame isSettingsReply pingFrames
atomicModifyIORef' settings
(\(ConnectionSettings cli srv) ->
(ConnectionSettings (HTTP2.updateSettings cli settslist) srv, ()))
return ret
let _goaway err errStr = do
sId <- readIORef maxReceivedStreamId
sendGTFOFrame controlStream sId err errStr
let _paylodSplitter = settingsPayloadSplitter <$> readIORef settings
settsIO <- _settings initSettings
withAsync settsIO $ \aSettings -> do
let _asyncs = Http2ClientAsyncs aSettings aCredit aHPACK aControl aIncoming
mainHandler $ Http2Client{..}
initializeStream
:: Exception e
=> Http2FrameConnection
-> IORef ConnectionSettings
-> FramesChan e
-> HeadersChan
-> PushPromisesChan e
-> HpackEncoderContext
-> StreamId
-> (Http2Stream -> StreamDefinition a)
-> IO (IO a)
initializeStream conn settings serverFrames serverHeaders serverPushPromises hpackEncoder sid getWork = do
let frameStream = makeFrameClientStream conn sid
frames <- dupChan serverFrames
credits <- dupChan serverFrames
headersFrames <- dupChan serverHeaders
pushPromises <- dupChan serverPushPromises
incomingStreamFlowControl <- newIncomingFlowControl settings frameStream
outgoingStreamFlowControl <- newOutgoingFlowControl settings sid credits
let _headers headersList flags = do
splitter <- settingsPayloadSplitter <$> readIORef settings
sendHeaders frameStream hpackEncoder headersList splitter flags
let _waitHeaders = waitHeadersWithStreamId sid headersFrames
let _waitData = do
(fh, fp) <- waitFrameWithTypeIdForStreamId sid [FrameRSTStream, FrameData] frames
case fp of
DataFrame dat -> do
_ <- _consumeCredit incomingStreamFlowControl (HTTP2.payloadLength fh)
_addCredit incomingStreamFlowControl (HTTP2.payloadLength fh)
return (fh, Right dat)
RSTStreamFrame err -> do
return (fh, Left $ HTTP2.fromErrorCodeId err)
_ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"
let _sendDataChunk = sendDataFrame frameStream
let _rst = sendResetFrame frameStream
let _prio = sendPriorityFrame frameStream
let _waitPushPromise ppHandler = do
(_,ppFrames,ppHeaders,ppSid,ppReadHeaders) <- waitPushPromiseWithParentStreamId sid pushPromises
let mkStreamActions stream = StreamDefinition (return CST) (ppHandler sid stream ppReadHeaders)
ppCont <- initializeStream conn
settings
ppFrames
ppHeaders
serverPushPromises
hpackEncoder
ppSid
mkStreamActions
ppCont
let streamActions = getWork $ Http2Stream{..}
_ <- _initStream streamActions
return $ _handleStream streamActions incomingStreamFlowControl outgoingStreamFlowControl
incomingFramesLoop
:: Http2FrameConnection
-> Chan (FrameHeader, Either HTTP2Error FramePayload)
-> IORef StreamId
-> IO ()
incomingFramesLoop conn frames maxReceivedStreamId = delayException $ forever $ do
frame@(fh, _) <- next conn
atomicModifyIORef' maxReceivedStreamId (\n -> (max n (streamId fh), ()))
writeChan frames frame
delayException :: IO a -> IO a
delayException act = act `catch` slowdown
where
slowdown :: SomeException -> IO a
slowdown e = threadDelay 50000 >> throwIO e
incomingControlFramesLoop
:: Exception e
=> Chan (FrameHeader, Either e FramePayload)
-> IORef ConnectionSettings
-> HpackEncoderContext
-> (ByteString -> IO ())
-> IO ()
-> GoAwayHandler
-> FallBackFrameHandler
-> IO ()
incomingControlFramesLoop frames settings hpackEncoder ackPing ackSettings goAwayHandler fallbackHandler = forever $ do
controlFrame@(fh, payload) <- waitFrameWithStreamId 0 frames
case payload of
(SettingsFrame settsList)
| not . testAck . flags $ fh -> do
atomicModifyIORef' settings
(\(ConnectionSettings cli srv) ->
(ConnectionSettings cli (HTTP2.updateSettings srv settsList), ()))
maybe (return ())
(_applySettings hpackEncoder)
(lookup SettingsHeaderTableSize settsList)
ackSettings
| otherwise -> do
ignore "TODO: settings ack should be taken into account only after reception, we should return a waitSettingsAck in the _settings function"
(PingFrame pingMsg)
| not . testAck . flags $ fh ->
ackPing pingMsg
| otherwise -> do
ignore "PingFrame replies waited for in the requestor thread"
(WindowUpdateFrame _ ) ->
ignore "connection-wide WindowUpdateFrame waited for in OutgoingFlowControl threads"
(GoAwayFrame lastSid errCode reason) ->
goAwayHandler $ RemoteSentGoAwayFrame lastSid errCode reason
_ ->
fallbackHandler controlFrame
where
ignore :: String -> IO ()
ignore _ = return ()
type FallBackFrameHandler = (FrameHeader, FramePayload) -> IO ()
ignoreFallbackHandler :: FallBackFrameHandler
ignoreFallbackHandler = const $ pure ()
type GoAwayHandler = RemoteSentGoAwayFrame -> IO ()
defaultGoAwayHandler :: GoAwayHandler
defaultGoAwayHandler = throwIO
data RemoteSentGoAwayFrame = RemoteSentGoAwayFrame !StreamId !ErrorCodeId !ByteString
deriving Show
instance Exception RemoteSentGoAwayFrame
creditDataFramesLoop
:: Exception e
=> IncomingFlowControl
-> Chan (FrameHeader, Either e FramePayload)
-> IO ()
creditDataFramesLoop flowControl frames = forever $ do
(fh,_) <- waitFrameWithTypeId [FrameData] frames
_ <- _consumeCredit flowControl (HTTP2.payloadLength fh)
_addCredit flowControl (HTTP2.payloadLength fh)
data HPACKLoopDecision =
ForwardHeader !StreamId
| OpenPushPromise !StreamId !StreamId
type FramesChan e = Chan (FrameHeader, Either e FramePayload)
type HeadersChanContent = (FrameHeader, StreamId, Either ErrorCode HeaderList)
type HeadersChan = Chan HeadersChanContent
type PushPromisesChanContent e = (StreamId, FramesChan e, HeadersChan, StreamId, HeaderList)
type PushPromisesChan e = Chan (PushPromisesChanContent e)
incomingHPACKFramesLoop
:: Exception e
=> FramesChan e
-> HeadersChan
-> PushPromisesChan e
-> DynamicTable
-> IO ()
incomingHPACKFramesLoop frames hdrs pushPromises hpackDecoder = forever $ do
(fh, fp) <- waitFrameWithTypeId [ FrameRSTStream
, FramePushPromise
, FrameHeaders
]
frames
let sid = HTTP2.streamId fh
(decision, pattern) <- case fp of
PushPromiseFrame ppSid hbf -> do
return (OpenPushPromise sid ppSid, Right hbf)
HeadersFrame _ hbf ->
return (ForwardHeader sid, Right hbf)
RSTStreamFrame err ->
return (ForwardHeader sid, Left err)
_ -> error "wrong TypeId"
let go curFh (Right buffer) =
if not $ HTTP2.testEndHeader (HTTP2.flags curFh)
then do
(lastFh, lastFp) <- waitFrameWithTypeId [ FrameRSTStream
, FrameContinuation
]
frames
case lastFp of
ContinuationFrame chbf ->
go lastFh (Right (ByteString.append buffer chbf))
RSTStreamFrame err ->
go lastFh (Left err)
_ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"
else do
newHdrs <- decodeHeader hpackDecoder buffer
case decision of
ForwardHeader sId ->
writeChan hdrs (curFh, sId, Right newHdrs)
OpenPushPromise parentSid newSid -> do
ppChan <- dupChan frames
ppHeaders <- dupChan hdrs
writeChan pushPromises (parentSid, ppChan, ppHeaders, newSid, newHdrs)
go curFh (Left err) =
writeChan hdrs (curFh, sid, (Left $ HTTP2.fromErrorCodeId err))
go fh pattern
newIncomingFlowControl
:: IORef ConnectionSettings
-> Http2FrameClientStream
-> IO IncomingFlowControl
newIncomingFlowControl settings stream = do
let getBase = if _getStreamId stream == 0
then return HTTP2.defaultInitialWindowSize
else initialWindowSize . _clientSettings <$> readIORef settings
creditAdded <- newIORef 0
creditConsumed <- newIORef 0
let _addCredit n = atomicModifyIORef' creditAdded (\c -> (c + n, ()))
let _consumeCredit n = do
conso <- atomicModifyIORef' creditConsumed (\c -> (c + n, c + n))
base <- getBase
extra <- readIORef creditAdded
return $ base + extra conso
let _updateWindow = do
base <- initialWindowSize . _clientSettings <$> readIORef settings
added <- readIORef creditAdded
consumed <- readIORef creditConsumed
let transferred = min added (HTTP2.maxWindowSize base + consumed)
let shouldUpdate = transferred > 0
_addCredit (negate transferred)
_ <- _consumeCredit (negate transferred)
when shouldUpdate (sendWindowUpdateFrame stream transferred)
return shouldUpdate
return $ IncomingFlowControl _addCredit _consumeCredit _updateWindow
newOutgoingFlowControl ::
Exception e
=> IORef ConnectionSettings
-> StreamId
-> Chan (FrameHeader, Either e FramePayload)
-> IO OutgoingFlowControl
newOutgoingFlowControl settings sid frames = do
credit <- newIORef 0
let getBase = if sid == 0
then return HTTP2.defaultInitialWindowSize
else initialWindowSize . _serverSettings <$> readIORef settings
let receive n = atomicModifyIORef' credit (\c -> (c + n, ()))
let withdraw 0 = return 0
withdraw n = do
base <- getBase
got <- atomicModifyIORef' credit (\c ->
if base + c >= n
then (c n, n)
else (0 base, base + c))
if got > 0
then return got
else do
amount <- race (waitSettingsChange base) waitSomeCredit
receive (either (const 0) id amount)
withdraw n
return $ OutgoingFlowControl receive withdraw
where
waitSettingsChange prev = do
new <- initialWindowSize . _serverSettings <$> readIORef settings
if new == prev then threadDelay 1000000 >> waitSettingsChange prev else return ()
waitSomeCredit = do
(_, fp) <- waitFrameWithTypeIdForStreamId sid [FrameWindowUpdate] frames
case fp of
WindowUpdateFrame amt -> return amt
_ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"
sendHeaders
:: Http2FrameClientStream
-> HpackEncoderContext
-> HeaderList
-> PayloadSplitter
-> (FrameFlags -> FrameFlags)
-> IO StreamThread
sendHeaders s enc hdrs blockSplitter flagmod = do
headerBlockFragments <- blockSplitter <$> _encodeHeaders enc hdrs
let framers = (HeadersFrame Nothing) : repeat ContinuationFrame
let frames = zipWith ($) framers headerBlockFragments
let modifiersReversed = (HTTP2.setEndHeader . flagmod) : repeat id
let arrangedFrames = reverse $ zip modifiersReversed (reverse frames)
sendBackToBack s arrangedFrames
return CST
type PayloadSplitter = ByteString -> [ByteString]
settingsPayloadSplitter :: ConnectionSettings -> PayloadSplitter
settingsPayloadSplitter (ConnectionSettings _ srv) =
fixedSizeChunks (maxFrameSize srv)
fixedSizeChunks :: Int -> ByteString -> [ByteString]
fixedSizeChunks 0 _ = error "cannot chunk by zero-length blocks"
fixedSizeChunks _ "" = []
fixedSizeChunks len bstr =
let
(chunk, rest) = ByteString.splitAt len bstr
in
chunk : fixedSizeChunks len rest
sendData :: Http2Client -> Http2Stream -> FlagSetter -> ByteString -> IO ()
sendData conn stream flagmod dat = do
splitter <- _paylodSplitter conn
let chunks = splitter dat
let pairs = reverse $ zip (flagmod : repeat id) (reverse chunks)
when (null chunks) $ _sendDataChunk stream flagmod ""
forM_ pairs $ \(flags, chunk) -> _sendDataChunk stream flags chunk
sendDataFrame
:: Http2FrameClientStream
-> (FrameFlags -> FrameFlags) -> ByteString -> IO ()
sendDataFrame s flagmod dat = do
sendOne s flagmod (DataFrame dat)
sendResetFrame :: Http2FrameClientStream -> ErrorCodeId -> IO ()
sendResetFrame s err = do
sendOne s id (RSTStreamFrame err)
sendGTFOFrame
:: Http2FrameClientStream
-> StreamId -> ErrorCodeId -> ByteString -> IO ()
sendGTFOFrame s lastStreamId err errStr = do
sendOne s id (GoAwayFrame lastStreamId err errStr)
rfcError :: String -> a
rfcError msg = error (msg ++ "draft-ietf-httpbis-http2-17")
sendPingFrame
:: Http2FrameClientStream
-> (FrameFlags -> FrameFlags)
-> ByteString
-> IO ()
sendPingFrame s flags dat
| _getStreamId s /= 0 =
rfcError "PING frames are not associated with any individual stream."
| ByteString.length dat /= 8 =
rfcError "PING frames MUST contain 8 octets"
| otherwise = sendOne s flags (PingFrame dat)
sendWindowUpdateFrame
:: Http2FrameClientStream -> WindowSize -> IO ()
sendWindowUpdateFrame s amount = do
let payload = WindowUpdateFrame amount
sendOne s id payload
return ()
sendSettingsFrame
:: Http2FrameClientStream
-> (FrameFlags -> FrameFlags) -> SettingsList -> IO ()
sendSettingsFrame s flags setts
| _getStreamId s /= 0 =
rfcError "The stream identifier for a SETTINGS frame MUST be zero (0x0)."
| otherwise = do
let payload = SettingsFrame setts
sendOne s flags payload
return ()
sendPriorityFrame :: Http2FrameClientStream -> Priority -> IO ()
sendPriorityFrame s p = do
let payload = PriorityFrame p
sendOne s id payload
return ()
waitFrameWithStreamId
:: Exception e
=> StreamId
-> FramesChan e
-> IO (FrameHeader, FramePayload)
waitFrameWithStreamId sid = waitFrame (\h _ -> streamId h == sid)
waitFrameWithTypeId
:: (Exception e)
=> [FrameTypeId]
-> FramesChan e
-> IO (FrameHeader, FramePayload)
waitFrameWithTypeId tids = waitFrame (\_ p -> HTTP2.framePayloadToFrameTypeId p `elem` tids)
waitFrameWithTypeIdForStreamId
:: (Exception e)
=> StreamId
-> [FrameTypeId]
-> FramesChan e
-> IO (FrameHeader, FramePayload)
waitFrameWithTypeIdForStreamId sid tids =
waitFrame (\h p -> streamId h == sid && HTTP2.framePayloadToFrameTypeId p `elem` tids)
waitFrame
:: Exception e
=> (FrameHeader -> FramePayload -> Bool)
-> FramesChan e
-> IO (FrameHeader, FramePayload)
waitFrame test chan =
loop
where
loop = do
(fHead, fPayload) <- readChan chan
dat <- either throwIO pure fPayload
if test fHead dat
then return (fHead, dat)
else loop
isPingReply :: ByteString -> FrameHeader -> FramePayload -> Bool
isPingReply datSent _ (PingFrame datRcv) = datSent == datRcv
isPingReply _ _ _ = False
isSettingsReply :: FrameHeader -> FramePayload -> Bool
isSettingsReply fh (SettingsFrame _) = HTTP2.testAck (flags fh)
isSettingsReply _ _ = False
waitHeadersWithStreamId
:: StreamId
-> HeadersChan
-> IO HeadersChanContent
waitHeadersWithStreamId sid =
waitHeaders (\_ s _ -> s == sid)
waitHeaders
:: (FrameHeader -> StreamId -> Either ErrorCode HeaderList -> Bool)
-> HeadersChan
-> IO HeadersChanContent
waitHeaders test chan =
loop
where
loop = do
tuple@(fH, sId, hdrs) <- readChan chan
if test fH sId hdrs
then return tuple
else loop
waitPushPromiseWithParentStreamId
:: StreamId
-> PushPromisesChan e
-> IO (PushPromisesChanContent e)
waitPushPromiseWithParentStreamId sid chan =
loop
where
loop = do
tuple@(parentSid,_,_,_,_) <- readChan chan
if parentSid == sid
then return tuple
else loop