-- The framer has two functions: to convert bytes to Frames and the other way around,
-- and two keep track of flow-control quotas.
{-# LANGUAGE OverloadedStrings, StandaloneDeriving, FlexibleInstances,
             DeriveDataTypeable, TemplateHaskell #-}
{-# OPTIONS_HADDOCK hide #-}
module SecondTransfer.Http2.Framer (
    BadPrefaceException,

    wrapSession,
    http2FrameLength,

    -- Not needed anywhere, but supress the warning about unneeded symbol
    closeAction
    ) where



import           Control.Concurrent                     hiding (yield)
import qualified Control.Concurrent                     as CC(yield)
import qualified Control.Concurrent.MSem                as MS
import           Control.Concurrent.STM.TMVar
import qualified Control.Concurrent.STM                 as STM
import           Control.Exception
import qualified Control.Exception                      as E
import           Control.Lens                           (view, (^.) )
import qualified Control.Lens                           as L
import           Control.Monad                          (unless, when, replicateM_)
import           Control.Monad.IO.Class                 (liftIO)
import qualified Control.Monad.Catch                    as C
import           Control.Monad.Trans.Class              (lift)
import           Control.DeepSeq                        (($!!))
import           Control.Monad.Trans.Reader
import           Data.Binary                            (decode)
import qualified Data.ByteString                        as B
import           Data.ByteString.Char8                  (pack)
import qualified Data.ByteString.Lazy                   as LB
import qualified Data.ByteString.Builder                as Bu
import           Data.Conduit
import           Data.Foldable                          (find)
import qualified Data.PQueue.Min                        as PQ
import           Data.Maybe                             (fromMaybe)

import qualified Network.HTTP2                          as NH2
-- Logging utilities
import           System.Log.Logger

import qualified Data.HashTable.IO                      as H
import           System.Clock                           (Clock(..),getTime)

import           SecondTransfer.Sessions.Internal       (
                                                         sessionExceptionHandler,
                                                         nextSessionId,
                                                         sessionsConfig,
                                                         SessionsContext)
import           SecondTransfer.Sessions.Config
import           SecondTransfer.Http2.Session
import           SecondTransfer.MainLoop.CoherentWorker (AwareWorker, fragmentDeliveryCallback_Ef, priorityEffect_Ef)
import qualified SecondTransfer.MainLoop.Framer         as F
import           SecondTransfer.MainLoop.PushPullType
import           SecondTransfer.Utils                   (Word24, word24ToInt)
import           SecondTransfer.Exception
import           SecondTransfer.MainLoop.Logging        (logWithExclusivity, logit)

import           Debug.Trace                            (trace)

#include "Logging.cpphs"


http2PrefixLength :: Int
http2PrefixLength = B.length NH2.connectionPreface

-- Let's do flow control here here ....

type HashTable k v = H.CuckooHashTable k v


type GlobalStreamId = Int


data FlowControlCommand =
     AddBytes_FCM Int
    |Finish_FCM

-- A hashtable from stream id to channel of availabiliy increases
type Stream2AvailSpace = HashTable GlobalStreamId (MVar FlowControlCommand)


data CanOutput = CanOutput


data NoHeadersInChannel = NoHeadersInChannel


-- Maximum number of packets in the priority queue waiting for
-- delivery. More than this, and I will simply block...
maxPacketsInQueue :: Int
maxPacketsInQueue = 32


-----In this implementation, this  v- and this  v- member should not change during the lieftime
----- of the stream.
-----------------------------------v--priority, v-- stream_id ----|------v-ordinal
newtype PrioPacket = PrioPacket ( (Int ,        Int,                     Int     ),  LB.ByteString)
                     deriving Show

instance Eq PrioPacket where
   (==) (PrioPacket (a,_)) (PrioPacket (b,_)) = a == b

instance Ord PrioPacket where
    compare (PrioPacket (a,_)) (PrioPacket (b,_)) = compare a b


-- Simple thread to prioritize frames in the session
data PrioritySendState = PrioritySendState {
     -- We need a semaphore so that this doesn't get stagnated waiting on writes
     _semToSend :: MS.MSem Int
     ,_prioQ :: TMVar (PQ.MinQueue PrioPacket)
     }


data FramerSessionData = FramerSessionData {
      _stream2flow           :: MVar Stream2AvailSpace
    -- Two members below: the place where you put data which is going to be flow-controlled,
    -- and the place with the ordinal
    , _stream2outputBytes    :: MVar ( HashTable GlobalStreamId (MVar LB.ByteString, MVar Int) )
    , _defaultStreamWindow   :: MVar Int

    -- Wait variable to output bytes to the channel
    , _canOutput             :: MVar CanOutput
    -- Flag that says if the session has been unwound... if such,
    -- threads are adviced to exit as early as possible
    , _outputIsForbidden     :: MVar Bool
    , _noHeadersInChannel    :: MVar NoHeadersInChannel
    , _pushAction            :: PushAction
    , _closeAction           :: CloseAction

    -- Global id of the session, used for e.g. error reporting.
    , _sessionIdAtFramer     :: Int

    -- Sessions context, used for thing like e.g. error reporting
    , _sessionsContext       :: SessionsContext

    -- For GoAway frames
    , _lastStream            :: MVar Int

    -- For sending data orderly
    , _prioritySendState     :: PrioritySendState
    }

L.makeLenses ''FramerSessionData


type FramerSession = ReaderT FramerSessionData IO


wrapSession :: AwareWorker -> SessionsContext -> Attendant
wrapSession aware_worker sessions_context attendant_callbacks = do

    let
        session_id_mvar = view nextSessionId sessions_context
        push_action = attendant_callbacks ^. pushAction_AtC
        pull_action = attendant_callbacks ^. pullAction_AtC
        close_action = attendant_callbacks ^. closeAction_AtC
        best_effort_pull_action = attendant_callbacks ^. bestEffortPullAction_AtC


    new_session_id <- modifyMVarMasked
        session_id_mvar $
        \ session_id -> return (session_id+1, session_id)

    (session_input, session_output) <- http2Session
                                        aware_worker
                                        new_session_id
                                        sessions_context

    -- TODO : Add type annotations....
    s2f                       <- H.new
    stream2flow_mvar          <- newMVar s2f
    s2o                       <- H.new
    stream2output_bytes_mvar  <- newMVar s2o
    default_stream_size_mvar  <- newMVar 65536
    can_output                <- newMVar CanOutput
    no_headers_in_channel     <- newMVar NoHeadersInChannel
    last_stream_id            <- newMVar 0
    output_is_forbidden       <- newMVar False

    prio_mvar                 <- STM.atomically $ newTMVar PQ.empty
    sem_to_send               <- MS.new maxPacketsInQueue



    -- We need some shared state
    let framer_session_data = FramerSessionData {
        _stream2flow          = stream2flow_mvar
        ,_stream2outputBytes  = stream2output_bytes_mvar
        ,_defaultStreamWindow = default_stream_size_mvar
        ,_canOutput           = can_output
        ,_noHeadersInChannel  = no_headers_in_channel
        ,_pushAction          = push_action
        ,_closeAction         = close_action
        ,_sessionIdAtFramer   = new_session_id
        ,_sessionsContext     = sessions_context
        ,_lastStream          = last_stream_id
        ,_outputIsForbidden   = output_is_forbidden
        ,_prioritySendState   = PrioritySendState {
                                    _semToSend = sem_to_send,
                                    _prioQ = prio_mvar
                                }
        }


    let
        -- TODO: Dodgy exception handling here...
        close_on_error session_id session_context comp =
            E.finally (
                E.catch
                    (
                        E.catch
                            comp
                            (exc_handler session_id session_context)
                    )
                    (io_exc_handler session_id session_context)
                )
                close_action

        exc_handler :: Int -> SessionsContext -> FramerException -> IO ()
        exc_handler x y e = do
            modifyMVar_ output_is_forbidden (\ _ -> return True)
            INSTRUMENTATION( errorM "HTTP2.Framer" "Exception went up" )
            sessionExceptionHandler Framer_HTTP2SessionComponent x y e

        io_exc_handler :: Int -> SessionsContext -> IOProblem -> IO ()
        io_exc_handler _x _y _e =
            modifyMVar_ output_is_forbidden (\ _ -> return True)
            -- !!! These exceptions are way too common for we to care....
            -- errorM "HTTP2.Framer" "Exception went up"
            -- sessionExceptionHandler Framer_HTTP2SessionComponent x y e


    forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT (inputGatherer pull_action session_input ) framer_session_data
    forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT (outputGatherer session_output ) framer_session_data
    -- Actual data is reordered before being sent
    forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT sendReordering framer_session_data

    return ()


http2FrameLength :: F.LengthCallback
http2FrameLength bs
    | B.length bs >= 3     = let
        word24 = decode input_as_lbs :: Word24
        input_as_lbs = LB.fromStrict bs
      in
        Just $ word24ToInt word24 + 9 -- Nine bytes that the frame header always uses
    | otherwise = Nothing


addCapacity ::
        GlobalStreamId ->
        Int           ->
        FramerSession Bool
addCapacity 0         delta_cap =
    -- TODO: Implement session flow control
    return True
addCapacity stream_id delta_cap =
    do
        table_mvar <- view stream2flow
        val <- liftIO $ withMVar table_mvar $ \ table ->
            H.lookup table stream_id
        last_stream_mvar <- view lastStream
        last_stream <- liftIO . readMVar $ last_stream_mvar
        case val of
            Nothing | stream_id > last_stream ->
                      return False
                    | otherwise -> -- If the stream was seen already, interpret this as a
                                  -- rogue WINDOW_UPDATE and do nothing
                      return True

            Just command_chan -> do
                liftIO $ putMVar command_chan $ AddBytes_FCM delta_cap
                return True


finishFlowControlForStream :: GlobalStreamId -> FramerSession ()
finishFlowControlForStream stream_id =
    do
        table_mvar <- view stream2flow
        liftIO . withMVar table_mvar $ \ table -> do
            val <- H.lookup table stream_id
            case val of
                -- Weird
                Nothing -> return ()

                Just command_chan -> do
                    liftIO $
                        H.delete table stream_id
                    return ()
        table2_mvar <- view stream2outputBytes
        liftIO . withMVar table2_mvar $ \ table -> do
          val <- H.lookup table stream_id
          case val of
              Nothing -> return ()

              Just _ ->
                 liftIO $ H.delete table stream_id


readNextFrame :: Monad m =>
    (Int -> m B.ByteString)                      -- ^ Generator action
    -> Source m (Maybe NH2.Frame)                -- ^ Packet and leftovers, if we could get them
readNextFrame pull_action  = do
    -- First get 9 bytes with the frame header
    frame_header_bs <- lift $ pull_action 9
    -- decode it
    let
        (frame_type_id, frame_header) = NH2.decodeFrameHeader frame_header_bs
        NH2.FrameHeader payload_length _ _ =  frame_header
    -- Get as many bytes as the payload length identifies
    payload_bs <- lift $ pull_action payload_length
    -- Return the entire frame, or raise an exception...
    let
        either_frame = NH2.decodeFramePayload frame_type_id frame_header payload_bs
    case either_frame of
        Right frame_payload -> do
            yield . Just $ NH2.Frame frame_header frame_payload
            readNextFrame pull_action
        Left  _     ->
            yield   Nothing


-- This works by pulling bytes from the input side of the pipeline and converting them to frames.
-- The frames are then put in the SessionInput. In the other end of the SessionInput they can be
-- interpreted according to their HTTP/2 meaning.
--
-- This function also does part of the flow control: it registers WindowUpdate frames and triggers
-- quota updates on the streams.
inputGatherer :: PullAction -> SessionInput -> FramerSession ()
inputGatherer pull_action session_input = do
    -- We can start by reading off the prefix....
    prefix <- liftIO $ pull_action http2PrefixLength
    if prefix /= NH2.connectionPreface
      then do
        sendGoAwayFrame NH2.ProtocolError
        liftIO $
            -- We just use the GoAway frame, although this is awfully early
            -- and probably wrong
            throwIO BadPrefaceException
      else
        INSTRUMENTATION(  debugM "HTTP2.Framer" "Prologue validated" )
    let
        source::Source FramerSession (Maybe NH2.Frame)
        source = transPipe liftIO $ readNextFrame pull_action
    source $$ consume True
  where

    sendToSession :: Bool -> InputFrame -> IO ()
    sendToSession starting frame =
      -- print(NH2.streamId $ NH2.frameHeader frame)
      if starting
        then
          sendFirstFrameToSession session_input frame
        else
          sendMiddleFrameToSession session_input frame

    abortSession :: Sink a FramerSession ()
    abortSession =
      lift $ do
        sendGoAwayFrame NH2.ProtocolError
        -- Inform the session that it can tear down itself
        liftIO $ sendCommandToSession session_input CancelSession_SIC
        -- Any resources remaining here can be disposed
        releaseFramer

    consume_continue = consume False

    consume :: Bool -> Sink (Maybe NH2.Frame) FramerSession ()
    consume starting = do
        maybe_maybe_frame <- await

        case maybe_maybe_frame of

            Just Nothing      ->
                abortSession

            Just (Just right_frame) -> do
                case right_frame of

                    (NH2.Frame (NH2.FrameHeader _ _ stream_id) (NH2.WindowUpdateFrame credit) ) -> do
                        -- Bookkeep the increase on bytes on that stream
                        -- liftIO $ putStrLn $ "Extra capacity for stream " ++ (show stream_id)
                        succeeded <- lift $ addCapacity stream_id (fromIntegral credit)
                        unless succeeded abortSession


                    frame@(NH2.Frame _ (NH2.SettingsFrame settings_list) ) -> do
                        -- Increase all the stuff....
                        case find (\(i,_) -> i == NH2.SettingsInitialWindowSize) settings_list of

                            Just (_, new_default_stream_size) -> do
                                old_default_stream_size_mvar <- view defaultStreamWindow
                                old_default_stream_size <- liftIO $ takeMVar old_default_stream_size_mvar
                                let general_delta = new_default_stream_size - old_default_stream_size
                                stream_to_flow <- view stream2flow
                                -- Add capacity to everybody's windows
                                liftIO . withMVar stream_to_flow $ \ stream_to_flow' ->
                                    H.mapM_ (\ (k,v) ->
                                                 when (k /=0 ) $ putMVar v (AddBytes_FCM $! general_delta)
                                            )
                                            stream_to_flow'


                                -- And set a new value
                                liftIO $ putMVar old_default_stream_size_mvar $! new_default_stream_size


                            Nothing ->
                                -- This is a silenced internal error
                                return ()

                        -- And send the frame down to the session, so that session specific settings
                        -- can be applied.
                        liftIO $ sendToSession starting $! frame


                    a_frame@(NH2.Frame (NH2.FrameHeader _ _ stream_id) _ )   -> do
                        -- Update the keep of last stream
                        -- lift . startStreamOutputQueueIfNotExists (NH2.fromStreamIdentifier stream_id) priority
                        lift . updateLastStream $ stream_id

                        -- Send frame to the session
                        liftIO $ sendToSession starting a_frame

                -- tail recursion: go again...
                consume_continue

            Nothing    ->
                -- We may as well exit this thread
               return ()


-- All the output frames come this way first
outputGatherer :: SessionOutput -> FramerSession ()
outputGatherer session_output = do
    frame_sent_report_callback <- view $
       sessionsContext            .
       sessionsConfig             .
       sessionsCallbacks          .
       dataDeliveryCallback_SC

    session_id <- view sessionIdAtFramer

    let

       dataForFrame p1 p2 =
           LB.fromStrict $ NH2.encodeFrame p1 p2

       cont = loopPart session_id frame_sent_report_callback

       loopPart :: Int -> Maybe DataFrameDeliveryCallback -> FramerSession ()
       loopPart session_id frame_sent_report_callback = do
           command_or_frame  <- liftIO $ getFrameFromSession session_output
           case command_or_frame of

               Left CancelSession_SOC -> do
                   -- The session wants to cancel things
                   INSTRUMENTATION(  debugM "HTTP2.Framer" "CancelSession_SOC processed")
                   releaseFramer

               Left (FinishStream_SOC stream_id ) ->
                   -- Session knows that we are done with the given stream, and that we can release
                   -- the flow control structures
                   -- NOTICE: Unfortunately, this doesn't work, so ignore the message for now
                   -- finishFlowControlForStream stream_id.
                   cont

               Right ( p1@(NH2.EncodeInfo _ stream_idii _), p2@(NH2.DataFrame _), ef ) -> do
                   -- This frame is flow-controlled... I may be unable to send this frame in
                   -- some circumstances...
                   let
                       stream_id = stream_idii
                       priority = fromMaybe stream_id $ ef ^. priorityEffect_Ef

                   startStreamOutputQueueIfNotExists stream_id $ priority

                   stream2output_mvar <- view stream2outputBytes
                   lookup_result <- liftIO $ withMVar stream2output_mvar $ \ s2o -> H.lookup s2o stream_id
                   (stream_bytes_chan, frame_ordinal_mvar) <- case lookup_result of
                       Nothing ->
                           error "It is the end of the world at Framer.hs"
                       Just x -> return x

                   -- All the dance below is to avoid a system call if there is no need
                   case (frame_sent_report_callback, ef ^. fragmentDeliveryCallback_Ef ) of
                       (Just c1, Just c2) -> liftIO $ do
                           -- Here we invoke the client's callback.
                           when_delivered <- getTime Monotonic
                           ordinal <- modifyMVar frame_ordinal_mvar $ \ o -> return (o+1, o)
                           c1 session_id stream_id ordinal when_delivered
                           c2 ordinal when_delivered
                       (Nothing, Just c2) -> liftIO $ do
                           when_delivered <- getTime Monotonic
                           ordinal <- modifyMVar frame_ordinal_mvar $ \ o -> return (o+1, o)
                           c2 ordinal when_delivered
                       (Just c1, Nothing) -> liftIO $ do
                           when_delivered <- getTime Monotonic
                           ordinal <- modifyMVar frame_ordinal_mvar $ \ o -> return (o+1, o)
                           c1 session_id stream_id ordinal when_delivered
                       (Nothing, Nothing) -> return ()

                   liftIO $ putMVar stream_bytes_chan $! dataForFrame p1 p2
                   cont


               Right (p1, p2@(NH2.PushPromiseFrame _ _), _effect ) -> do
                   handleHeadersOfStream p1 p2
                   cont

               Right (p1, p2@(NH2.HeadersFrame _ _), _effect ) -> do
                   handleHeadersOfStream p1 p2
                   cont

               Right (p1, p2@(NH2.ContinuationFrame _), _effect ) -> do
                   handleHeadersOfStream p1 p2
                   cont

               Right (p1, p2, _effect) -> do
                   -- Most other frames go right away... as long as no headers are in process...
                   no_headers <- view noHeadersInChannel
                   liftIO $ takeMVar no_headers
                   pushFrame p1 p2
                   liftIO $ putMVar no_headers NoHeadersInChannel
                   cont

    -- We start by sending a settings frame
    pushFrame
        (NH2.EncodeInfo NH2.defaultFlags 0 Nothing)
        (NH2.SettingsFrame [])

    -- And then we continue...
    loopPart session_id frame_sent_report_callback


updateLastStream :: GlobalStreamId  -> FramerSession ()
updateLastStream stream_id = do
    last_stream_id_mvar <- view lastStream
    liftIO $ modifyMVar_ last_stream_id_mvar (\ x -> return $ max x stream_id)

startStreamOutputQueueIfNotExists :: GlobalStreamId -> Int -> FramerSession ()
startStreamOutputQueueIfNotExists stream_id priority = do
    table_mvar <- view stream2flow
    val <- liftIO . withMVar table_mvar  $ \ table -> H.lookup table stream_id
    case val of
        Nothing | stream_id /= 0 -> do
            startStreamOutputQueue stream_id priority
            return ()

        _ ->
            return ()


-- Handles only Data frames.
startStreamOutputQueue :: Int -> Int -> FramerSession (MVar LB.ByteString, MVar FlowControlCommand)
startStreamOutputQueue stream_id priority = do
    -- New thread for handling outputs of this stream is needed
    bytes_chan   <- liftIO newEmptyMVar
    ordinal_num  <- liftIO $ newMVar 0
    command_chan <- liftIO newEmptyMVar

    s2o_mvar <- view stream2outputBytes

    liftIO . withMVar s2o_mvar $ \ s2o ->  H.insert s2o stream_id (bytes_chan, ordinal_num)

    stream2flow_mvar <- view stream2flow


    liftIO . withMVar stream2flow_mvar  $ \ s2c ->  H.insert s2c stream_id command_chan

    --
    initial_cap_mvar <- view defaultStreamWindow
    initial_cap <- liftIO $ readMVar initial_cap_mvar
    close_action <- view closeAction
    sessions_context <- view sessionsContext
    session_id' <- view sessionIdAtFramer
    output_is_forbidden_mvar <- view outputIsForbidden

    -- And don't forget the thread itself
    let
        close_on_error session_id session_context comp =
            E.finally
                (E.catch
                    comp
                    (exc_handler session_id session_context)
                )
                close_action

        exc_handler :: Int -> SessionsContext -> IOProblem -> IO ()
        exc_handler x y e = do
            -- Let's also decree that other streams don't even try
            modifyMVar_ output_is_forbidden_mvar ( \ _ -> return True)
            sessionExceptionHandler Framer_HTTP2SessionComponent x y e


    read_state <- ask
    liftIO $ forkIO $ close_on_error session_id' sessions_context  $ runReaderT
        (flowControlOutput stream_id priority initial_cap 0 "" command_chan bytes_chan)
        read_state

    return (bytes_chan , command_chan)


-- This works in the output side of the HTTP/2 framing session, and it acts as a
-- semaphore ensuring that headers are output without any interleaved frames.
--
-- There are more synchronization mechanisms in Http2.Session that ensure we
-- only get here frames from one and the same stream.
handleHeadersOfStream :: NH2.EncodeInfo -> NH2.FramePayload -> FramerSession ()
handleHeadersOfStream p1@(NH2.EncodeInfo {}) frame_payload
    | frameIsHeadersAndOpensStream frame_payload && not (frameEndsHeaders p1 frame_payload) = do
        -- Take it
        no_headers <- view noHeadersInChannel
        liftIO $ takeMVar no_headers
        pushFrame p1 frame_payload
        -- Don't put the MvAR HERE

    | frameIsHeadersAndOpensStream frame_payload && frameEndsHeaders p1 frame_payload = do
        no_headers <- view noHeadersInChannel
        liftIO $ takeMVar no_headers
        pushFrame p1 frame_payload
        -- Since we finish....
        liftIO $ putMVar no_headers NoHeadersInChannel

    | frameEndsHeaders p1 frame_payload = do
        -- I can only get here for a continuation frame  after something else that is a headers
        no_headers <- view noHeadersInChannel
        pushFrame p1 frame_payload
        liftIO $ putMVar no_headers NoHeadersInChannel
        return ()

    | otherwise =
        -- Nothing to do with the mvar, the no_headers should be empty
        pushFrame p1 frame_payload


-- Used to know when we need to switch the channel to "exclusive" mode.
frameIsHeadersAndOpensStream :: NH2.FramePayload -> Bool
frameIsHeadersAndOpensStream (NH2.HeadersFrame _  _ )      = True
frameIsHeadersAndOpensStream (NH2.PushPromiseFrame _ _)    = True
frameIsHeadersAndOpensStream _                             = False


frameEndsHeaders  :: NH2.EncodeInfo -> NH2.FramePayload -> Bool
frameEndsHeaders (NH2.EncodeInfo flags _ _) (NH2.HeadersFrame _ _) = NH2.testEndHeader flags
frameEndsHeaders (NH2.EncodeInfo flags _ _) (NH2.ContinuationFrame _) = NH2.testEndHeader flags
frameEndsHeaders (NH2.EncodeInfo flags _ _) (NH2.PushPromiseFrame _ _) = NH2.testEndHeader flags
frameEndsHeaders _ _ = False


-- Push a frame into the output channel... this waits for the
-- channel to be free to send.
pushFrame :: NH2.EncodeInfo
             -> NH2.FramePayload -> FramerSession ()
pushFrame p1 p2 = do
    let bs = LB.fromStrict $ NH2.encodeFrame p1 p2
    sendBytes bs


sendGoAwayFrame :: NH2.ErrorCodeId -> FramerSession ()
sendGoAwayFrame error_code = do
    last_stream_id_mvar <- view lastStream
    last_stream_id <- liftIO $ readMVar last_stream_id_mvar
    pushFrame
        (NH2.EncodeInfo NH2.defaultFlags 0 Nothing)
        (NH2.GoAwayFrame last_stream_id error_code "")


-- From this point on data is really serialized,
sendBytes :: LB.ByteString -> FramerSession ()
sendBytes bs = do
    push_action <- view pushAction
    -- I don't think I need to lock here...
    can_output <- view canOutput
    liftIO $
        bs `seq`
            C.bracket
                (takeMVar   can_output)
                (const $ push_action bs)
                (putMVar can_output)


-- A thread in charge of doing flow control transmission....This sends already
-- formatted frames (ByteStrings), not the frames themselves. And it doesn't
-- mess with the structure of the packets.
--
-- There is one of these for each stream
--
flowControlOutput :: Int -> Int -> Int -> Int ->  LB.ByteString -> MVar FlowControlCommand -> MVar LB.ByteString ->  FramerSession ()
flowControlOutput stream_id priority capacity ordinal leftovers commands_chan bytes_chan =
    if leftovers == ""
      then do
        -- Get more data (possibly block waiting for it)
        bytes_to_send <- liftIO $ takeMVar bytes_chan
        flowControlOutput stream_id priority capacity ordinal  bytes_to_send commands_chan bytes_chan
      else do
        -- Length?
        let amount = fromIntegral  (LB.length leftovers - 9)
        if  amount <= capacity
          then do
            -- Is
            -- I can send ... if no headers are in process....
            -- liftIO . logit $ "set-priority (stream_id, prio, ordinal) " `mappend` (pack . show) (__stream_id, priority, ordinal)
            withPrioritySend priority stream_id ordinal leftovers
            flowControlOutput  stream_id priority (capacity - amount) (ordinal+1) "" commands_chan bytes_chan
          else do
            -- I can not send because flow-control is full, wait for a command instead
            command <- liftIO $ takeMVar commands_chan
            case command of
                AddBytes_FCM delta_cap ->
                    -- liftIO $ putStrLn $ "Flow control delta_cap stream " ++ (show stream_id)
                    flowControlOutput stream_id priority (capacity + delta_cap) ordinal leftovers commands_chan bytes_chan


releaseFramer :: FramerSession ()
releaseFramer =
    -- Release any resources pending...

    return ()


-- This prioritizes DATA packets, in some rudimentary way.
-- This code will be replaced in due time for something compliant.
withPrioritySend :: Int -> Int -> Int -> LB.ByteString -> FramerSession ()
withPrioritySend priority stream_id packet_ordinal datum = do
    PrioritySendState {_semToSend = s, _prioQ = pqm } <- view prioritySendState
    let
        new_record = PrioPacket  ( (priority, stream_id, packet_ordinal), datum)
    -- Now, for this to work, I need to have enough capacity to add something to the queue
    liftIO $ do
        -- We are using a semaphore to avoid overflowing this place. Notice that the flow
        -- control output is till using an unbounded queue!!!
        MS.wait s
        -- And add it to the queue....
        STM.atomically $ do
            pq <- takeTMVar pqm
            putTMVar pqm $ PQ.insert new_record pq


-- In charge of actually sending the data frames, in a special thread (create in the caller)
sendReordering :: FramerSession ()
sendReordering = do
    PrioritySendState {_semToSend = s, _prioQ = pqm } <- view prioritySendState
    no_headers <- view noHeadersInChannel
    -- Get a packet, if possible, or wait for it
    PrioPacket ( (priority_, stream_id, packed_ordinal_), datum) <- liftIO . STM.atomically $ do
        pq <- takeTMVar pqm
        if PQ.null pq
          then STM.retry
          else do
            let
              (record, thinner) =  PQ.deleteFindMin pq
            putTMVar pqm thinner
            return record
    -- liftIO . logit $ "prio-send (prio,stream_id, ordinal) " `mappend` (pack . show) (priority, stream_id, packed_ordinal_)
    -- Since I got something to send, I can let one of the guys to put more stuff
    liftIO $ MS.signal s
    -- Now we do the tries and locks for headers
    C.bracket
        (liftIO $ takeMVar no_headers)
        (\ _ -> liftIO $ putMVar no_headers NoHeadersInChannel)
        (\ _ -> sendBytes datum )

    -- Sleep for a little bit, we dont' want too many frames
    -- here too fast. BIG TODO: We need a better way to handle this
    liftIO $ replicateM_ 10 CC.yield
    --liftIO $ threadDelay 100

    -- And tail-recurse
    sendReordering