{-# LINE 1 "hs-src/SecondTransfer/Http2/Session.cpphs" #-}
# 1 "hs-src/SecondTransfer/Http2/Session.cpphs"
# 1 "<command-line>"
# 12 "<command-line>"
# 1 "/usr/include/stdc-predef.h" 1 3 4

# 17 "/usr/include/stdc-predef.h" 3 4










































# 12 "<command-line>" 2
# 1 "./dist/build/autogen/cabal_macros.h" 1






































































































































































































# 12 "<command-line>" 2
# 1 "hs-src/SecondTransfer/Http2/Session.cpphs"
-- Session: links frames to streams, and helps in ordering the header frames
-- so that they don't get mixed with header frames from other streams when 
-- resources are being served concurrently.
{-# LANGUAGE FlexibleContexts, Rank2Types, TemplateHaskell, OverloadedStrings #-}
{-# OPTIONS_HADDOCK hide #-}
module SecondTransfer.Http2.Session(
    http2Session
    ,getFrameFromSession
    ,sendFrameToSession
    ,sendCommandToSession
    ,defaultSessionsConfig
    ,sessionId
    ,reportErrorCallback
    ,sessionsCallbacks
    ,nextSessionId
    ,makeSessionsContext
    ,sessionsConfig
    ,sessionExceptionHandler

    ,CoherentSession
    ,SessionInput(..)
    ,SessionInputCommand(..)
    ,SessionOutput(..)
    ,SessionOutputCommand(..)
    ,SessionsContext(..)
    ,SessionCoordinates(..)
    ,SessionComponent(..)
    ,SessionsCallbacks(..)
    ,SessionsConfig(..)
    ,ErrorCallback

    -- Internal stuff
    ,OutputFrame
    ,InputFrame
    ) where


# 1 "macros/Logging.cpphs" 1










# 38 "hs-src/SecondTransfer/Http2/Session.cpphs" 2

-- System grade utilities
import           Control.Concurrent                     (ThreadId, forkIO)
import           Control.Concurrent.Chan
import           Control.Exception                      (SomeException, throwTo)
import qualified Control.Exception                      as E
import           Control.Monad                          (forever)
import           Control.Monad.IO.Class                 (liftIO)
import           Control.Monad.Trans.Reader

import           Control.Concurrent.MVar
import qualified Data.ByteString                        as B
import qualified Data.ByteString.Builder                as Bu
import qualified Data.ByteString.Lazy                   as Bl
import           Data.Conduit
import qualified Data.HashTable.IO                      as H
import qualified Data.IntSet                            as NS
import           Data.Monoid                            as Mo

import           Control.Lens
 
-- No framing layer here... let's use Kazu's Yamamoto library
import qualified Network.HPACK                          as HP
import qualified Network.HTTP2                          as NH2

-- Logging utilities
import           System.Log.Logger

-- Imports from other parts of the program
import           SecondTransfer.MainLoop.CoherentWorker
import           SecondTransfer.MainLoop.Tokens
import           SecondTransfer.Utils                   (unfoldChannelAndSource)
import           SecondTransfer.Exception

-- Unfortunately the frame encoding API of Network.HTTP2 is a bit difficult to 
-- use :-( 
type OutputFrame = (NH2.EncodeInfo, NH2.FramePayload)
type InputFrame  = NH2.Frame


useChunkLength :: Int 
useChunkLength = 16384


-- Singleton instance used for concurrency
data HeadersSent = HeadersSent 

-- All streams put their data bits here. A "Nothing" value signals
-- end of data. 
type DataOutputToConveyor = (GlobalStreamId, Maybe B.ByteString)


-- Whatever a worker thread is going to need comes here.... 
-- this is to make refactoring easier, but not strictly needed. 
data WorkerThreadEnvironment = WorkerThreadEnvironment {
    -- What's the header stream id?
    _streamId :: GlobalStreamId

    -- A full block of headers can come here... the mvar in the middle should
    -- be populate to signal end of headers transmission. A thread will be suspended
    -- waiting for that
    , _headersOutput :: Chan (GlobalStreamId, MVar HeadersSent, Headers)

    -- And regular contents can come this way and thus be properly mixed
    -- with everything else.... for now... 
    ,_dataOutput :: Chan DataOutputToConveyor

    ,_streamsCancelled_WTE :: MVar NS.IntSet

    }

makeLenses ''WorkerThreadEnvironment


-- Basically a couple of channels ... 
type Session = (SessionInput, SessionOutput)


-- From outside, one can only write to this one ... the newtype is to enforce 
--    this.
newtype SessionInput = SessionInput ( Chan (Either SessionInputCommand InputFrame) )
sendFrameToSession :: SessionInput  -> InputFrame -> IO ()
sendFrameToSession (SessionInput chan) frame = writeChan chan $ Right frame

sendCommandToSession :: SessionInput  -> SessionInputCommand -> IO ()
sendCommandToSession (SessionInput chan) command = writeChan chan $ Left command

-- From outside, one can only read from this one 
newtype SessionOutput = SessionOutput ( Chan (Either SessionOutputCommand OutputFrame) )
getFrameFromSession :: SessionOutput -> IO (Either SessionOutputCommand OutputFrame) 
getFrameFromSession (SessionOutput chan) = readChan chan


type HashTable k v = H.CuckooHashTable k v


-- Blaze builder could be more proper here... 
type Stream2HeaderBlockFragment = HashTable GlobalStreamId Bu.Builder


type WorkerMonad = ReaderT WorkerThreadEnvironment IO 


-- Have to figure out which are these...but I would expect to have things
-- like unexpected aborts here in this type.
data SessionInputCommand = 
    CancelSession_SIC
  deriving Show 


-- temporary
data  SessionOutputCommand = 
    CancelSession_SOC
  deriving Show


-- | Information used to identify a particular session. 
newtype SessionCoordinates = SessionCoordinates  Int
    deriving Show

instance Eq SessionCoordinates where 
    (SessionCoordinates a) == (SessionCoordinates b) =  a == b

-- | Get/set a numeric Id from a `SessionCoordinates`. For example, to 
--   get the session id with this, import `Control.Lens.(^.)` and then do 
--
-- @
--      session_id = session_coordinates ^. sessionId
-- @
-- 
sessionId :: Functor f => (Int -> f Int) -> SessionCoordinates -> f SessionCoordinates
sessionId f (SessionCoordinates session_id) = 
    fmap (\ s' -> (SessionCoordinates s')) (f session_id)



-- | Components at an individual session. Used to report
--   where in the session an error was produced. This interface is likely 
--   to change in the future, as we add more metadata to exceptions
data SessionComponent = 
    SessionInputThread_SessionComponent 
    |SessionHeadersOutputThread_SessionComponent
    |SessionDataOutputThread_SessionComponent
    |Framer_SessionComponent
    deriving Show


-- | Used by this session engine to report an error at some component, in a particular
--   session. 
type ErrorCallback = (SessionComponent, SessionCoordinates, SomeException) -> IO ()

-- | Callbacks that you can provide your sessions to notify you 
--   of interesting things happening in the server. 
data SessionsCallbacks = SessionsCallbacks {
    -- Callback used to report errors during this session
    _reportErrorCallback :: Maybe ErrorCallback
}

makeLenses ''SessionsCallbacks


-- | Configuration information you can provide to the session maker.
data SessionsConfig = SessionsConfig {
    -- | Session callbacks
    _sessionsCallbacks :: SessionsCallbacks
}

-- makeLenses ''SessionsConfig

-- | Lens to access sessionsCallbacks in the `SessionsConfig` object.
sessionsCallbacks :: Lens' SessionsConfig SessionsCallbacks
sessionsCallbacks  f (
    SessionsConfig {
        _sessionsCallbacks= s 
    }) = fmap (\ s' -> SessionsConfig {_sessionsCallbacks = s'}) (f s)


-- | Contains information that applies to all 
--   sessions created in the program. Use the lenses 
--   interface to access members of this struct. 
-- 
data SessionsContext = SessionsContext {
     _sessionsConfig  :: SessionsConfig
    ,_nextSessionId   :: MVar Int
    }


makeLenses ''SessionsContext

-- Here is how we make a session 
type SessionMaker = SessionsContext -> IO Session


-- Here is how we make a session wrapping a CoherentWorker
type CoherentSession = CoherentWorker -> SessionMaker 


-- | Creates a default sessions context. Modify as needed using 
--   the lenses interfaces
defaultSessionsConfig :: SessionsConfig
defaultSessionsConfig = SessionsConfig {
    _sessionsCallbacks = SessionsCallbacks {
            _reportErrorCallback = Nothing
        }
    }


-- Adds runtime data to a context, and let it work.... 
makeSessionsContext :: SessionsConfig -> IO SessionsContext
makeSessionsContext sessions_config = do 
    next_session_id_mvar <- newMVar 1 
    return $ SessionsContext {
        _sessionsConfig = sessions_config,
        _nextSessionId = next_session_id_mvar
        }

data PostInputMechanism = PostInputMechanism (Chan (Maybe B.ByteString), InputDataStream)


-- NH2.Frame != Frame
data SessionData = SessionData {
    -- ATTENTION: Ignore the warning coming from here for now
    _sessionsContext             :: SessionsContext 

    ,_sessionInput               :: Chan (Either SessionInputCommand InputFrame)

    -- We need to lock this channel occassionally so that we can order multiple 
    -- header frames properly.... 
    ,_sessionOutput              :: MVar (Chan (Either SessionOutputCommand OutputFrame))

    -- Use to encode 
    ,_toEncodeHeaders            :: MVar HP.DynamicTable
    -- And used to decode
    ,_toDecodeHeaders            :: MVar HP.DynamicTable

    -- Used for decoding the headers
    ,_stream2HeaderBlockFragment :: Stream2HeaderBlockFragment

    -- Used for worker threads... this is actually a pre-filled template
    -- I make copies of it in different contexts, and as needed. 
    ,_forWorkerThread            :: WorkerThreadEnvironment

    ,_coherentWorker             :: CoherentWorker

    -- Some streams may be cancelled 
    ,_streamsCancelled           :: MVar NS.IntSet

    -- Data input mechanism corresponding to some threads
    ,_stream2PostInputMechanism  :: HashTable Int PostInputMechanism 

    -- Worker thread register. This is a dictionary from stream id to 
    -- the ThreadId of the thread with the worker thread. I use this to 
    -- raise asynchronous exceptions in the worker thread if the stream 
    -- is cancelled by the client. This way we get early finalization. 
    ,_stream2WorkerThread        :: HashTable Int ThreadId

    ,_sessionIdAtSession         :: Int
    }


makeLenses ''SessionData


--                                v- {headers table size comes here!!}
http2Session :: CoherentWorker -> Int -> SessionsContext -> IO Session
http2Session coherent_worker session_id sessions_context =   do 
    session_input             <- newChan
    session_output            <- newChan
    session_output_mvar       <- newMVar session_output


    -- For incremental construction of headers...
    stream_request_headers    <- H.new :: IO Stream2HeaderBlockFragment

    -- Warning: we should find a way of coping with different table sizes.
    decode_headers_table      <- HP.newDynamicTableForDecoding 4096
    decode_headers_table_mvar <- newMVar decode_headers_table

    encode_headers_table      <- HP.newDynamicTableForEncoding 4096
    encode_headers_table_mvar <- newMVar encode_headers_table

    -- These ones need independent threads taking care of sending stuff
    -- their way... 
    headers_output            <- newChan :: IO (Chan (GlobalStreamId, MVar HeadersSent, Headers))
    data_output               <- newChan :: IO (Chan DataOutputToConveyor)

    stream2postinputmechanism <- H.new 
    stream2workerthread       <- H.new

    -- What about stream cancellation?
    cancelled_streams_mvar    <- newMVar $ NS.empty :: IO (MVar NS.IntSet)

    let for_worker_thread = WorkerThreadEnvironment {
        _streamId = error "NotInitialized"  
        ,_headersOutput = headers_output
        ,_dataOutput = data_output
        ,_streamsCancelled_WTE = cancelled_streams_mvar
        }

    let session_data  = SessionData {
        _sessionsContext             = sessions_context
        ,_sessionInput                = session_input 
        ,_sessionOutput              = session_output_mvar
        ,_toDecodeHeaders            = decode_headers_table_mvar
        ,_toEncodeHeaders            = encode_headers_table_mvar
        ,_stream2HeaderBlockFragment = stream_request_headers
        ,_forWorkerThread            = for_worker_thread
        ,_coherentWorker             = coherent_worker
        ,_streamsCancelled           = cancelled_streams_mvar
        ,_stream2PostInputMechanism  = stream2postinputmechanism
        ,_stream2WorkerThread        = stream2workerthread
        ,_sessionIdAtSession         = session_id
        }

    let 
        exc_handler :: SessionComponent -> HTTP2SessionException -> IO () 
        exc_handler component e = sessionExceptionHandler component session_id sessions_context e
        exc_guard :: SessionComponent -> IO () -> IO ()
        exc_guard component action = E.catch action $ exc_handler component

    -- Create an input thread that decodes frames...
    forkIO $ exc_guard SessionInputThread_SessionComponent 
           $ runReaderT sessionInputThread session_data
 
    -- Create a thread that captures headers and sends them down the tube 
    forkIO $ exc_guard SessionHeadersOutputThread_SessionComponent 
           $ runReaderT (headersOutputThread headers_output session_output_mvar) session_data

    -- Create a thread that captures data and sends it down the tube
    forkIO $ exc_guard SessionDataOutputThread_SessionComponent 
           $ dataOutputThread data_output session_output_mvar

    -- The two previous threads fill the session_output argument below (they write to it)
    -- the session machinery in the other end is in charge of sending that data through the 
    -- socket.

    return ( (SessionInput session_input),
             (SessionOutput session_output) )


-- TODO: Some ill clients can break this thread with exceptions. Make these paths a bit
--- more robust.
sessionInputThread :: ReaderT SessionData IO ()
sessionInputThread  = do 
    (return ())

    -- This is an introductory and declarative block... all of this is tail-executed
    -- every time that  a packet needs to be processed. It may be a good idea to abstract
    -- these values in a closure... 
    session_input             <- view sessionInput 
    
    decode_headers_table_mvar <- view toDecodeHeaders 
    stream_request_headers    <- view stream2HeaderBlockFragment
    cancelled_streams_mvar    <- view streamsCancelled
    coherent_worker           <- view coherentWorker

    for_worker_thread_uns     <- view forWorkerThread
    stream2workerthread       <- view stream2WorkerThread

    input                     <- liftIO $ readChan session_input

    (return ())

    case input of 

        Left CancelSession_SIC -> do 
            -- Good place to tear down worker threads... Let the rest of the finalization
            -- to the framer
            liftIO $ do 
                H.mapM_
                    (\ (_, thread_id) -> do
                        throwTo thread_id StreamCancelledException
                        infoM "HTTP2.Session" $ "Stream successfully interrupted"
                    )
                    stream2workerthread

            -- We do not continue here, but instead let it finish
            return ()

        Right frame | Just (stream_id, bytes) <- frameIsHeaderOfStream frame -> do 
            -- Just append the frames to streamRequestHeaders
            appendHeaderFragmentBlock stream_id bytes

            if frameEndsHeaders frame then 
              do
                -- Let's decode the headers
                let for_worker_thread     = set streamId stream_id for_worker_thread_uns 
                headers_bytes             <- getHeaderBytes stream_id
                dyn_table                 <- liftIO $ takeMVar decode_headers_table_mvar
                (new_table, header_list ) <- liftIO $ {-# SCC "decodeHeader" #-} HP.decodeHeader dyn_table headers_bytes
                -- Good moment to remove the headers from the table.... we don't want a space
                -- leak here 
                liftIO $ do 
                    H.delete stream_request_headers stream_id
                    putMVar decode_headers_table_mvar new_table

                -- TODO: Validate headers, abort session if the headers are invalid.
                -- Otherwise other invariants will break!!
                -- THIS IS PROBABLY THE BEST PLACE FOR DOING IT.

                -- If the headers end the request.... 
                post_data_source <- if not (frameEndsStream frame)
                  then do 
                    mechanism <- createMechanismForStream stream_id 
                    let source = postDataSourceFromMechanism mechanism
                    return $ Just source
                  else do 
                    return Nothing

                -- I'm clear to start the worker, in its own thread
                liftIO $ do 
                    thread_id <- forkIO $ runReaderT 
                        (workerThread (header_list, post_data_source) coherent_worker)
                        for_worker_thread 
                    H.insert stream2workerthread stream_id thread_id

                return ()
            else 
                -- Frame doesn't end the headers... it was added before... so
                -- probably do nothing 
                return ()
                
            continue 

        Right frame@(NH2.Frame _ (NH2.RSTStreamFrame error_code_id)) -> do
            let stream_id = streamIdFromFrame frame
            liftIO $ do 
                (return ())
                cancelled_streams <- takeMVar cancelled_streams_mvar
                (return ())
                putMVar cancelled_streams_mvar $ NS.insert  stream_id cancelled_streams
                maybe_thread_id <- H.lookup stream2workerthread stream_id
                case maybe_thread_id  of 
                    Nothing -> 
                        -- This is actually more like an internal error
                        error "InterruptingUnexistentStream"

                    Just thread_id -> do
                        throwTo thread_id StreamCancelledException
                        (return ())

            continue 

        Right frame@(NH2.Frame (NH2.FrameHeader _ _ nh2_stream_id) (NH2.DataFrame somebytes)) -> do 
            -- So I got data to process
            -- TODO: Handle end of stream
            let stream_id = NH2.fromStreamIdentifier nh2_stream_id
            -- TODO: Handle the cases where the stream_id doesn't match an already existent
            -- stream. In such cases it is justified to reset the connection with a  protocol_error.

            streamWorkerSendData stream_id somebytes
            -- After that data has been received and forwarded downstream, we can issue a windows update
            --
            -- TODO: We can use wider frames to avoid congestion...
            -- .... and we can also be more compositional with these short bursts of data....
            --
            -- TODO: Consider that the best place to output these frames can be somewhere else...
            --
            -- TODO: Use a special, with-quota queue here to do flow control. Don't send meaningless
            --       WindowUpdateFrame's
            sendOutFrame
                (NH2.EncodeInfo
                    NH2.defaultFlags
                    nh2_stream_id
                    Nothing
                )
                (NH2.WindowUpdateFrame
                    (fromIntegral (B.length somebytes))
                )
            sendOutFrame
                (NH2.EncodeInfo
                    NH2.defaultFlags
                    (NH2.toStreamIdentifier 0)
                    Nothing
                )
                (NH2.WindowUpdateFrame
                    (fromIntegral (B.length somebytes))
                )                

            if frameEndsStream frame  
              then do 
                -- Good place to close the source ... 
                closePostDataSource stream_id 
              else 
                return ()

            continue 

        Right (NH2.Frame (NH2.FrameHeader _ flags _) (NH2.PingFrame _)) | NH2.testAck flags-> do 
            -- Deal with pings: this is an Ack, so do nothing
            continue 

        Right (NH2.Frame (NH2.FrameHeader _ _ _) (NH2.PingFrame somebytes))  -> do 
            -- Deal with pings: NOT an Ack, so answer
            (return ())
            sendOutFrame
                (NH2.EncodeInfo
                    (NH2.setAck NH2.defaultFlags)
                    (NH2.toStreamIdentifier 0)
                    Nothing 
                )
                (NH2.PingFrame somebytes)

            continue 

        Right (NH2.Frame frame_header (NH2.SettingsFrame _)) | isSettingsAck frame_header -> do 
            -- Frame was received by the peer, do nothing here...
            continue 


        Right (NH2.Frame _ (NH2.SettingsFrame settings_list))  -> do 
            (return ())
            -- Just acknowledge the frame.... for now 
            sendOutFrame 
                (NH2.EncodeInfo
                    (NH2.setAck NH2.defaultFlags)
                    (NH2.toStreamIdentifier 0)
                    Nothing )
                (NH2.SettingsFrame [])

            continue 


        Right somethingelse -> do 
            -- An undhandled case here....
            (return ())
            liftIO $ errorM "HTTP2.Session" $  "..  " ++ (show somethingelse)

            continue 

  where 

    continue = sessionInputThread

    sendOutFrame :: NH2.EncodeInfo -> NH2.FramePayload -> ReaderT SessionData IO ()
    sendOutFrame encode_info payload = do 
        session_output_mvar <- view sessionOutput 

        session_output <- liftIO $ takeMVar session_output_mvar
        liftIO $ writeChan session_output $ Right (encode_info, payload)
        liftIO $ putMVar session_output_mvar session_output


frameEndsStream :: InputFrame -> Bool 
frameEndsStream (NH2.Frame (NH2.FrameHeader _ flags _) _)  = NH2.testEndStream flags


createMechanismForStream :: GlobalStreamId -> ReaderT SessionData IO PostInputMechanism
createMechanismForStream stream_id = do 
    (chan, source) <- liftIO $ unfoldChannelAndSource
    stream2postinputmechanism <- view stream2PostInputMechanism
    let pim = PostInputMechanism (chan, source)
    liftIO $ H.insert stream2postinputmechanism stream_id pim 
    return pim


-- TODO: Can be optimized by factoring out the mechanism lookup
-- TODO IMPORTANT: This is a good place to drop the postinputmechanism
-- for a stream, so that unprocessed data can be garbage-collected.
closePostDataSource :: GlobalStreamId -> ReaderT SessionData IO ()
closePostDataSource stream_id = do 
    stream2postinputmechanism <- view stream2PostInputMechanism

    pim_maybe <- liftIO $ H.lookup stream2postinputmechanism stream_id 

    case pim_maybe of 

        Just (PostInputMechanism (chan, _))  -> 
            liftIO $ writeChan chan Nothing

        Nothing -> 
            -- TODO: This is a protocol error, handle it properly
            error "Internal error/closePostDataSource"


streamWorkerSendData :: Int -> B.ByteString -> ReaderT SessionData IO ()
streamWorkerSendData stream_id bytes = do 
    s2pim <- view stream2PostInputMechanism
    pim_maybe <- liftIO $ H.lookup s2pim stream_id 

    case pim_maybe of 

        Just pim  -> 
            sendBytesToPim pim bytes

        Nothing -> 
            -- This is an internal error, the mechanism should be 
            -- created when the headers end (and if the headers 
            -- do not finish the stream)
            error "Internal error"


sendBytesToPim :: PostInputMechanism -> B.ByteString -> ReaderT SessionData IO ()
sendBytesToPim (PostInputMechanism (chan, _)) bytes = 
    liftIO $ writeChan chan (Just bytes)


postDataSourceFromMechanism :: PostInputMechanism -> InputDataStream
postDataSourceFromMechanism (PostInputMechanism (_, source)) = source


isSettingsAck :: NH2.FrameHeader -> Bool 
isSettingsAck (NH2.FrameHeader _ flags _) = 
    NH2.testAck flags


isStreamCancelled :: GlobalStreamId  -> WorkerMonad Bool 
isStreamCancelled stream_id = do 
    cancelled_streams_mvar <- view streamsCancelled_WTE
    cancelled_streams <- liftIO $ readMVar cancelled_streams_mvar
    return $ NS.member stream_id cancelled_streams



workerThread :: Request -> CoherentWorker -> WorkerMonad ()
workerThread req coherent_worker =
  do
    headers_output <- view headersOutput
    stream_id      <- view streamId

    (headers, _, data_and_conclussion) <- liftIO $ coherent_worker req

    -- Now I send the headers, if that's possible at all
    headers_sent <- liftIO $ newEmptyMVar
    liftIO $ writeChan headers_output (stream_id, headers_sent, headers)

    -- At this moment I should ask if the stream hasn't been cancelled by the browser before
    -- commiting to the work of sending addtitional data
    is_stream_cancelled <- isStreamCancelled stream_id
    if not is_stream_cancelled

      then do
        -- I have a beautiful source that I can de-construct...
        -- TODO: Optionally pulling data out from a Conduit ....
        -- liftIO ( data_and_conclussion $$ (_sendDataOfStream stream_id) )
        -- 
        -- This threadlet should block here waiting for the headers to finish going
        (maybe_footers, _) <- runConduit $
            (transPipe liftIO data_and_conclussion) 
            `fuseBothMaybe` 
            (sendDataOfStream stream_id headers_sent)
        -- BIG TODO: Send the footers ... likely stream conclusion semantics 
        -- will need to be changed. 
        return ()
      else 

        return ()

--                                                       v-- comp. monad.
sendDataOfStream :: GlobalStreamId -> MVar HeadersSent -> Sink B.ByteString (ReaderT WorkerThreadEnvironment IO) ()
sendDataOfStream stream_id headers_sent = do
    data_output <- view dataOutput
    -- Wait for all headers sent
    liftIO $ takeMVar headers_sent
    consumer data_output
  where 
    consumer data_output = do 
        maybe_bytes <- await 
        case maybe_bytes of 
            Nothing -> 
                liftIO $ writeChan data_output (stream_id, Nothing)
            Just bytes -> do
                liftIO $ writeChan data_output (stream_id, Just bytes)
                consumer data_output


appendHeaderFragmentBlock :: GlobalStreamId -> B.ByteString -> ReaderT SessionData IO ()
appendHeaderFragmentBlock global_stream_id bytes = do 
    ht <- view stream2HeaderBlockFragment 
    maybe_old_block <- liftIO $ H.lookup ht global_stream_id
    new_block <- case maybe_old_block of 

        Nothing -> do
            (return ())
            return $ Bu.byteString bytes

        Just something -> 
            return $ something `mappend` (Bu.byteString bytes) 

    liftIO $ H.insert ht global_stream_id new_block


getHeaderBytes :: GlobalStreamId -> ReaderT SessionData IO B.ByteString
getHeaderBytes global_stream_id = do 
    ht <- view stream2HeaderBlockFragment 
    Just bytes <- liftIO $ H.lookup ht global_stream_id
    return $ Bl.toStrict $ Bu.toLazyByteString bytes


frameIsHeaderOfStream :: InputFrame -> Maybe (GlobalStreamId, B.ByteString)
frameIsHeaderOfStream (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.HeadersFrame _ block_fragment   ) )
    = Just (NH2.fromStreamIdentifier stream_id, block_fragment)
frameIsHeaderOfStream (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.ContinuationFrame block_fragment) )
    = Just (NH2.fromStreamIdentifier stream_id, block_fragment)
frameIsHeaderOfStream _                                       
    = Nothing 


frameEndsHeaders  :: InputFrame -> Bool 
frameEndsHeaders (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndHeader flags


streamIdFromFrame :: InputFrame -> GlobalStreamId
streamIdFromFrame (NH2.Frame (NH2.FrameHeader _ _ stream_id) _) = NH2.fromStreamIdentifier stream_id


-- TODO: Have different size for the headers..... just now going with a default size of 16 k...
-- TODO: Find a way to kill this thread....
headersOutputThread :: Chan (GlobalStreamId, MVar HeadersSent, Headers)
                       -> MVar (Chan (Either SessionOutputCommand OutputFrame)) 
                       -> ReaderT SessionData IO ()
headersOutputThread input_chan session_output_mvar = forever $ do 
    (stream_id, headers_ready_mvar, headers) <- liftIO $ readChan input_chan
    -- liftIO $ putStrLn $ "Output headers: " ++ (show headers)

    -- First encode the headers using the table
    encode_dyn_table_mvar <- view toEncodeHeaders

    encode_dyn_table <- liftIO $ takeMVar encode_dyn_table_mvar
    (new_dyn_table, data_to_send ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy encode_dyn_table headers
    liftIO $ putMVar encode_dyn_table_mvar new_dyn_table

    -- Now split the bytestring in chunks of the needed size.... 
    bs_chunks <- return $! bytestringChunk useChunkLength data_to_send

    -- And send the chunks through while locking the output place....
    liftIO $ E.bracket
        (takeMVar session_output_mvar)
        (putMVar session_output_mvar )
        (\ session_output -> do
            writeIndividualHeaderFrames session_output stream_id bs_chunks True
            -- And say that the headers for this thread are out 
            (return ())
            putMVar headers_ready_mvar HeadersSent
            ) 
  where 
    writeIndividualHeaderFrames :: 
        Chan (Either SessionOutputCommand OutputFrame)
        -> GlobalStreamId 
        -> [B.ByteString] 
        -> Bool 
        -> IO ()
    writeIndividualHeaderFrames session_output stream_id (last_fragment:[]) is_first = 
        writeChan session_output $ Right ( NH2.EncodeInfo {
            NH2.encodeFlags     = NH2.setEndHeader NH2.defaultFlags 
            ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id 
            ,NH2.encodePadding  = Nothing }, 
            (if is_first then NH2.HeadersFrame Nothing last_fragment else  NH2.ContinuationFrame last_fragment)
            )
    writeIndividualHeaderFrames session_output stream_id  (fragment:xs) is_first = do 
        writeChan session_output $ Right ( NH2.EncodeInfo {
            NH2.encodeFlags     = NH2.defaultFlags 
            ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id 
            ,NH2.encodePadding  = Nothing }, 
            (if is_first then NH2.HeadersFrame Nothing fragment else  NH2.ContinuationFrame fragment)
            )
        writeIndividualHeaderFrames session_output stream_id xs False


bytestringChunk :: Int -> B.ByteString -> [B.ByteString]
bytestringChunk len s | (B.length s) < len = [ s ]
bytestringChunk len s = h:(bytestringChunk len xs)
  where 
    (h, xs) = B.splitAt len s 


-- TODO: find a clean way to finish this thread (maybe with negative stream ids?)
-- TODO: This function does non-optimal chunking for the case where responses are
--       actually streamed.... in those cases we need to keep state for frames in 
--       some other format.... 
-- TODO: Right now, we are transmitting an empty last frame with the end-of-stream
--       flag set. I'm afraid that the only
--       way to avoid that is by holding a frame or by augmenting the end-user interface
--       so that the user can signal which one is the last frame. The first approach
--       restricts responsiviness, the second one clutters things.
dataOutputThread :: Chan DataOutputToConveyor
                    -> MVar (Chan (Either SessionOutputCommand OutputFrame)) 
                    -> IO ()
dataOutputThread input_chan session_output_mvar = forever $ do 
    (stream_id, maybe_contents) <- readChan input_chan
    case maybe_contents of 
        Nothing -> do
            liftIO $ do
                (return ())
                withLockedSessionOutput
                    (\ session_output ->    writeChan session_output $ Right ( NH2.EncodeInfo {
                             NH2.encodeFlags     = NH2.setEndStream NH2.defaultFlags
                            ,NH2.encodeStreamId  = NH2.toStreamIdentifier stream_id 
                            ,NH2.encodePadding   = Nothing }, 
                            NH2.DataFrame ""
                            )
                        )

        Just contents -> do 
            -- And now just simply output it...
            let bs_chunks = bytestringChunk useChunkLength $! contents
            -- And send the chunks through while locking the output place....
            writeContinuations bs_chunks stream_id
            
  where 

    withLockedSessionOutput = E.bracket 
        (takeMVar session_output_mvar) 
        (putMVar session_output_mvar) -- <-- There is an implicit argument there!!

    writeContinuations :: [B.ByteString] -> GlobalStreamId  -> IO ()
    writeContinuations fragments stream_id  = mapM_ (\ fragment -> 
        withLockedSessionOutput (\ session_output -> writeChan session_output $ Right ( NH2.EncodeInfo {
            NH2.encodeFlags     = NH2.defaultFlags 
            ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id 
            ,NH2.encodePadding  = Nothing }, 
            NH2.DataFrame fragment ) )
        ) fragments



sessionExceptionHandler :: E.Exception e => SessionComponent -> Int -> SessionsContext -> e -> IO ()
sessionExceptionHandler session_component session_id sessions_context e = do 
    let
        getit = ( sessionsConfig . sessionsCallbacks . reportErrorCallback ) 
        maybe_error_callback = sessions_context ^. getit 
        error_tuple = (
            session_component,
            SessionCoordinates session_id, 
            E.toException e
            )
    case maybe_error_callback of 
        Nothing -> 
            errorM "HTTP2.Session" (show (e))

        Just callback -> 
            callback error_tuple