{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}

module Network.HTTP2.Server.Receiver (frameReceiver) where

import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception as E
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C8
import Data.IORef
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2
import Network.HTTP2.Priority (toPrecedence, delete, prepare)

import Imports hiding (delete, insert)
import Network.HTTP2.Server.API
import Network.HTTP2.Server.EncodeFrame
import Network.HTTP2.Server.HPACK
import Network.HTTP2.Server.Types
import Network.HTTP2.Server.Context
import Network.HTTP2.Server.Stream
import Network.HTTP2.Server.Queue

----------------------------------------------------------------

-- | Type for input streaming.
--
-- A 'Source' pairs a mutable buffer with a reading action:
--
-- * the 'IORef' holds bytes that 'readSource' hands out first (it is
--   created empty by 'mkSource' and emptied again once drained);
-- * the 'IO' action is run by 'readSource' whenever the buffer is empty.
--
-- Both fields are strict.
data Source = Source !(IORef ByteString) !(IO ByteString)

-- | Build a 'Source' around the given reading action.
--
-- The source starts with an empty buffer, so the first 'readSource' goes
-- straight to the action. The 'Source' value is forced before it is returned.
mkSource :: IO ByteString -> IO Source
mkSource func =
    newIORef BS.empty >>= \buffer -> return $! Source buffer func

-- | Read the next chunk from a 'Source'.
--
-- If the buffer held by the source is non-empty it is returned and the
-- buffer is reset to empty; otherwise the underlying action is run and its
-- result is returned unchanged.
readSource :: Source -> IO ByteString
readSource (Source ref func) = readIORef ref >>= serve
  where
    serve pending
      | BS.null pending = func
      | otherwise       = writeIORef ref BS.empty >> return pending

-- | Receive loop of an HTTP/2 server connection.
--
-- Repeatedly reads a frame header with @recvN@, validates it and hands the
-- payload either to 'control' (when 'isControl' holds for the stream
-- identifier) or to 'stream'.
--
-- * The loop stops when @recvN@ returns an empty header (a 'CFinish' is
--   queued) or when a handler returns 'False'.
-- * A 'StreamError' is answered with RST_STREAM and the loop continues.
-- * A 'ConnectionError' terminates the loop and a GOAWAY is queued; any
--   other exception terminates the loop without queueing anything.
--
-- The second argument must return exactly the requested number of bytes,
-- or an empty string at end of input (only the header read checks for it).
frameReceiver :: Context -> (BufferSize -> IO ByteString) -> IO ()
frameReceiver ctx recvN = loop 0 `E.catch` sendGoaway
  where
    Context{ http2settings
           , streamTable
           , concurrency
           , continued
           , clientStreamId
           , inputQ
           , controlQ
           } = ctx
    -- Report a connection error to the peer; the GOAWAY carries the last
    -- client stream identifier recorded so far.
    sendGoaway e
      | Just (ConnectionError err msg) <- E.fromException e = do
          csid <- readIORef clientStreamId
          let !frame = goawayFrame csid err msg
          enqueueControl controlQ $ CGoaway frame
      | otherwise = return ()

    -- Queue a RST_STREAM for a single failed stream.
    sendReset err sid = do
        let !frame = resetFrame err sid
        enqueueControl controlQ $ CFrame frame

    -- The counter is the number of frames handled since the last 'yield';
    -- after six frames other threads get a chance to run.
    loop :: Int -> IO ()
    loop !n
      | n == 6 = do
          yield
          loop 0
      | otherwise = do
        hd <- recvN frameHeaderLength
        if BS.null hd then
            enqueueControl controlQ CFinish
          else do
            cont <- processStreamGuardingError $ decodeFrameHeader hd
            when cont $ loop (n + 1)

    -- Validate one frame header and process the frame. Returns whether the
    -- receive loop should continue.
    processStreamGuardingError (fid, FrameHeader{streamId})
      | isResponse streamId &&
        (fid `notElem` [FramePriority,FrameRSTStream,FrameWindowUpdate]) =
        E.throwIO $ ConnectionError ProtocolError "stream id should be odd"
    processStreamGuardingError (FrameUnknown _, FrameHeader{payloadLength}) = do
        mx <- readIORef continued
        case mx of
            Nothing -> do
                -- ignoring unknown frame
                consume payloadLength
                return True
            -- nothing but CONTINUATION may interrupt a header block
            Just _  -> E.throwIO $ ConnectionError ProtocolError "unknown frame"
    processStreamGuardingError (FramePushPromise, _) =
        E.throwIO $ ConnectionError ProtocolError "push promise is not allowed"
    processStreamGuardingError typhdr@(ftyp, header@FrameHeader{payloadLength}) = do
        settings <- readIORef http2settings
        case checkFrameHeader settings typhdr of
            Left h2err -> case h2err of
                StreamError err sid -> do
                    sendReset err sid
                    -- skip the payload so that the next read starts at a
                    -- frame boundary
                    consume payloadLength
                    return True
                connErr -> E.throwIO connErr
            Right _ -> do
                ex <- E.try $ controlOrStream ftyp header
                case ex of
                    Left (StreamError err sid) -> do
                        sendReset err sid
                        return True
                    -- re-raise connection errors for 'sendGoaway'
                    Left connErr -> E.throwIO connErr
                    Right cont -> return cont

    -- Read the payload and dispatch it. Every path that raises a
    -- 'StreamError' must have consumed the payload first, because the
    -- caller keeps reading frames after resetting the stream.
    controlOrStream ftyp header@FrameHeader{streamId, payloadLength}
      | isControl streamId = do
          pl <- recvN payloadLength
          control ftyp header pl ctx
      | otherwise = do
          checkContinued
          !mstrm <- getStream
          pl <- recvN payloadLength
          case mstrm of
            Nothing -> do
                -- for h2spec only
                when (ftyp == FramePriority) $ do
                    PriorityFrame newpri <- guardIt $ decodePriorityFrame header pl
                    checkPriority newpri streamId
                return True -- just ignore this frame
            Just strm@Stream{streamPrecedence} -> do
              state <- readStreamState strm
              state' <- stream ftyp header pl ctx state strm
              case state' of
                  -- complete header block, END_STREAM set: request without body
                  Open (NoBody tbl@(_,reqvt) pri) -> do
                      resetContinued
                      let mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt)
                      -- a body-less request must not announce a non-zero length
                      when (just mcl (/= (0 :: Int))) $
                          E.throwIO $ StreamError ProtocolError streamId
                      writeIORef streamPrecedence $ toPrecedence pri
                      halfClosedRemote ctx strm
                      tlr <- newIORef Nothing
                      atomically $ writeTQueue inputQ $ Input strm $ Request tbl (Just 0) (return "") tlr
                  -- complete header block, body follows in DATA frames
                  Open (HasBody tbl@(_,reqvt) pri) -> do
                      resetContinued
                      q <- newTQueueIO
                      let !mcl = fst <$> (getHeaderValue tokenContentLength reqvt >>= C8.readInt)
                      writeIORef streamPrecedence $ toPrecedence pri
                      bodyLength <- newIORef 0
                      tlr <- newIORef Nothing
                      setStreamState ctx strm $ Open (Body q mcl bodyLength tlr)
                      readQ <- newReadBody q
                      bodySource <- mkSource readQ
                      atomically $ writeTQueue inputQ $ Input strm $ Request tbl mcl (readSource bodySource) tlr
                  -- header block still incomplete: expect CONTINUATION next
                  s@(Open Continued{}) -> do
                      setContinued
                      setStreamState ctx strm s
                  HalfClosedRemote -> do
                      resetContinued
                      halfClosedRemote ctx strm
                  s -> do -- Idle, Open Body, Closed
                      resetContinued
                      setStreamState ctx strm s
              return True
       where
         setContinued = writeIORef continued (Just streamId)
         resetContinued = writeIORef continued Nothing
         -- While a header block is open, only a CONTINUATION frame on the
         -- same stream is acceptable.
         checkContinued = do
             mx <- readIORef continued
             case mx of
                 Nothing  -> return ()
                 Just sid
                   | sid == streamId && ftyp == FrameContinuation -> return ()
                   | otherwise -> E.throwIO $ ConnectionError ProtocolError "continuation frame must follow"
         -- Look the stream up, creating it when the frame legitimately
         -- refers to an idle stream. 'Nothing' means "ignore this frame".
         -- NOTE: this runs before the payload has been read.
         getStream = do
             mstrm0 <- search streamTable streamId
             case mstrm0 of
                 js@(Just strm0) -> do
                     when (ftyp == FrameHeaders) $ do
                         st <- readStreamState strm0
                         when (isHalfClosedRemote st) $ E.throwIO $ ConnectionError StreamClosed "header must not be sent to half or fully closed stream"
                         -- Priority made an idle stream
                         when (isIdle st) $ opened ctx strm0
                     return js
                 Nothing
                   | isResponse streamId -> return Nothing
                   | otherwise           -> do
                         csid <- readIORef clientStreamId
                         if streamId <= csid then -- consider the stream closed
                             if ftyp `elem` [FrameWindowUpdate, FrameRSTStream, FramePriority] then
                                 return Nothing -- will be ignored
                               else
                                 E.throwIO $ ConnectionError ProtocolError "stream identifier must not decrease"
                           else do -- consider the stream idle
                             when (ftyp `notElem` [FrameHeaders,FramePriority]) $
                                 E.throwIO $ ConnectionError ProtocolError $ "this frame is not allowed in an idle stream: " `BS.append` C8.pack (show ftyp)
                             when (ftyp == FrameHeaders) $ do
                                 writeIORef clientStreamId streamId
                                 cnt <- readIORef concurrency
                                 -- Checking the limitation of concurrency
                                 when (cnt >= maxConcurrency) $ do
                                     -- The payload of this HEADERS frame is
                                     -- still unread here. Skip it before
                                     -- refusing the stream; otherwise the
                                     -- receive loop would parse the payload
                                     -- bytes as the next frame header.
                                     -- NOTE(review): the refused header block
                                     -- is skipped without HPACK decoding;
                                     -- confirm this cannot desynchronise the
                                     -- dynamic table for later requests.
                                     consume payloadLength
                                     E.throwIO $ StreamError RefusedStream streamId
                             ws <- initialWindowSize <$> readIORef http2settings
                             newstrm <- newStream streamId (fromIntegral ws)
                             when (ftyp == FrameHeaders) $ opened ctx newstrm
                             insert streamTable streamId newstrm
                             return $ Just newstrm

    -- Read and discard the given number of payload bytes.
    consume = void . recvN

-- | Maximum number of concurrent streams accepted on a connection.
--
-- The value is advertised to the peer through 'initialFrame' and checked
-- in 'frameReceiver' whenever a HEADERS frame would open a new stream.
maxConcurrency :: Int
maxConcurrency = recommendedConcurrency

-- | The server's own SETTINGS frame, announcing 'maxConcurrency' as
-- SETTINGS_MAX_CONCURRENT_STREAMS. It is queued together with the
-- acknowledgement of the first SETTINGS frame received from the peer.
initialFrame :: ByteString
initialFrame = settingsFrame id advertised
  where
    advertised = [(SettingsMaxConcurrentStreams, maxConcurrency)]

----------------------------------------------------------------

-- | Handle a frame addressed to the connection itself.
--
-- Returns 'True' when the receive loop should go on and 'False' when it
-- should stop. Decoding failures and invalid settings are raised as
-- exceptions.
control :: FrameTypeId -> FrameHeader -> ByteString -> Context -> IO Bool
-- SETTINGS: apply the peer's parameters, adjust every stream window by the
-- change of the initial window size and acknowledge. The acknowledgement of
-- the very first SETTINGS also carries our own 'initialFrame'.
control FrameSettings header@FrameHeader{flags} bs Context{http2settings, controlQ, firstSettings, streamTable} = do
    SettingsFrame params <- guardIt $ decodeSettingsFrame header bs
    traverse_ E.throwIO $ checkSettingsList params
    -- an ACK from the peer needs no further processing
    unless (testAck flags) $ do
        wsBefore <- initialWindowSize <$> readIORef http2settings
        modifyIORef' http2settings (`updateSettings` params)
        wsAfter <- initialWindowSize <$> readIORef http2settings
        let delta = wsAfter - wsBefore
        unless (delta == 0) $ updateAllStreamWindow (+ delta) streamTable
        let !ack = settingsFrame setAck []
        alreadySent <- readIORef firstSettings
        let !reply = if alreadySent
                       then CSettings ack params
                       else CSettings0 initialFrame ack params
        unless alreadySent $ writeIORef firstSettings True
        enqueueControl controlQ reply
    return True

-- PING: echo the payload back unless this is already an acknowledgement.
control FramePing FrameHeader{flags} bs Context{controlQ} = do
    unless (testAck flags) $ do
        let !pong = pingFrame bs
        enqueueControl controlQ $ CFrame pong
    return True

-- GOAWAY: ask the sender side to finish and stop receiving.
control FrameGoAway _ _ Context{controlQ} =
    enqueueControl controlQ CFinish >> return False

-- WINDOW_UPDATE on the connection: enlarge the connection window and fail
-- the connection if the new size overflows.
control FrameWindowUpdate header bs Context{connectionWindow} = do
    WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs
    !enlarged <- atomically $ do
      bigger <- (+ n) <$> readTVar connectionWindow
      writeTVar connectionWindow $! bigger
      return bigger
    when (isWindowOverflow enlarged) $
        E.throwIO $ ConnectionError FlowControlError "control window should be less than 2^31"
    return True

-- must not reach here
control _ _ _ _ = return False

----------------------------------------------------------------

{-# INLINE guardIt #-}
-- | Turn a decoding result into an 'IO' action: a 'Left' error is thrown
-- as an exception, a 'Right' value is returned.
guardIt :: Either HTTP2Error a -> IO a
guardIt = either E.throwIO return


{-# INLINE checkPriority #-}
-- | Reject a priority that makes a stream depend on itself.
--
-- Throws @StreamError ProtocolError@ for the given stream when the
-- dependency of the priority equals that stream; does nothing otherwise.
checkPriority :: Priority -> StreamId -> IO ()
checkPriority p me =
    when (streamDependency p == me) $
        E.throwIO $ StreamError ProtocolError me

-- | Process one frame for a stream and compute the stream's next state.
--
-- Arguments: frame type, frame header, raw payload, connection context,
-- current state of the stream and the stream itself. The result is the new
-- state; the caller stores it and reacts to it.
--
-- Errors are raised as exceptions: 'StreamError' when only this stream is
-- affected, 'ConnectionError' when the whole connection must be closed.
-- The equations are tried in order, and the ordering of the catch-all
-- equations at the end matters.
stream :: FrameTypeId -> FrameHeader -> ByteString -> Context -> StreamState -> Stream -> IO StreamState
-- HEADERS on a freshly opened stream: start of a request.
stream FrameHeaders header@FrameHeader{flags} bs ctx (Open JustOpened) Stream{streamNumber} = do
    HeadersFrame mp frag <- guardIt $ decodeHeadersFrame header bs
    pri <- case mp of
        Nothing -> return defaultPriority
        Just p  -> do
            checkPriority p streamNumber
            return p
    let !endOfStream = testEndStream flags
        !endOfHeader = testEndHeader flags
    if endOfHeader then do
        tbl <- hpackDecodeHeader frag ctx -- fixme
        return $ if endOfStream then
                    Open (NoBody tbl pri)
                   else
                    Open (HasBody tbl pri)
      else do
        -- the header block goes on in CONTINUATION frames; start collecting
        let !siz = BS.length frag
        return $ Open $ Continued [frag] siz 1 endOfStream pri

-- HEADERS while the body is being received: trailers.
stream FrameHeaders header@FrameHeader{flags} bs ctx (Open (Body q _ _ tlr)) _ = do
    HeadersFrame _ frag <- guardIt $ decodeHeadersFrame header bs
    let !endOfStream = testEndStream flags
    if endOfStream then do
        tbl <- hpackDecodeTrailer frag ctx
        writeIORef tlr (Just tbl)
        -- an empty chunk tells the body reader that the body is complete
        atomically $ writeTQueue q ""
        return HalfClosedRemote
      else
        -- we don't support continuation here.
        E.throwIO $ ConnectionError ProtocolError "continuation in trailer is not supported"

-- ignore data-frame except for flow-control when we're done locally
stream FrameData
       FrameHeader{flags,payloadLength}
       _bs
       Context{controlQ} s@(HalfClosedLocal _)
       Stream{streamNumber} = do
    let !endOfStream = testEndStream flags
    when (payloadLength /= 0) $ do
        -- give the consumed bytes back to both the connection window
        -- (stream 0) and the stream window
        let !frame1 = windowUpdateFrame 0 payloadLength
            !frame2 = windowUpdateFrame streamNumber payloadLength
            !frame = frame1 `BS.append` frame2
        enqueueControl controlQ $ CFrame frame
    if endOfStream then do
        return HalfClosedRemote
      else
        return s

-- DATA while the body is being received: pass the chunk to the body queue.
stream FrameData
       header@FrameHeader{flags,payloadLength,streamId}
       bs
       Context{controlQ} s@(Open (Body q mcl bodyLength _))
       Stream{streamNumber} = do
    DataFrame body <- guardIt $ decodeDataFrame header bs
    let !endOfStream = testEndStream flags
    len0 <- readIORef bodyLength
    -- Count the decoded body only. 'payloadLength' also covers the
    -- pad-length octet and the padding of a padded DATA frame, which are
    -- not request content and must not count towards content-length.
    let !len = len0 + BS.length body
    writeIORef bodyLength len
    -- Flow control, in contrast, is charged for the entire payload.
    when (payloadLength /= 0) $ do
        let !frame1 = windowUpdateFrame 0 payloadLength
            !frame2 = windowUpdateFrame streamNumber payloadLength
            !frame = frame1 `BS.append` frame2
        enqueueControl controlQ $ CFrame frame
    atomically $ writeTQueue q body
    if endOfStream then do
        -- the announced content-length, if any, must match what arrived
        case mcl of
            Nothing -> return ()
            Just cl -> when (cl /= len) $ E.throwIO $ StreamError ProtocolError streamId
        -- no trailers
        atomically $ writeTQueue q ""
        return HalfClosedRemote
      else
        return s

-- CONTINUATION: append the fragment; decode once END_HEADERS is seen.
-- Fragments are kept in reverse order of arrival.
stream FrameContinuation FrameHeader{flags} frag ctx (Open (Continued rfrags siz n endOfStream pri)) _ = do
    let !endOfHeader = testEndHeader flags
        !rfrags' = frag : rfrags
        !siz' = siz + BS.length frag
        !n' = n + 1
    when (siz' > 51200) $ -- fixme: hard coding: 50K
      E.throwIO $ ConnectionError EnhanceYourCalm "Header is too big"
    when (n' > 10) $ -- fixme: hard coding
      E.throwIO $ ConnectionError EnhanceYourCalm "Header is too fragmented"
    if endOfHeader then do
        let !hdrblk = BS.concat $ reverse rfrags'
        tbl <- hpackDecodeHeader hdrblk ctx
        return $ if endOfStream then
                    Open (NoBody tbl pri)
                   else
                    Open (HasBody tbl pri)
      else
        return $ Open $ Continued rfrags' siz' n' endOfStream pri

-- WINDOW_UPDATE on a stream: enlarge the stream window; the state is kept.
stream FrameWindowUpdate header@FrameHeader{streamId} bs _ s Stream{streamWindow} = do
    WindowUpdateFrame n <- guardIt $ decodeWindowUpdateFrame header bs
    !w <- atomically $ do
      w0 <- readTVar streamWindow
      let !w1 = w0 + n
      writeTVar streamWindow w1
      return w1
    when (isWindowOverflow w) $
        E.throwIO $ StreamError FlowControlError streamId
    return s

-- RST_STREAM: the peer closed the stream.
stream FrameRSTStream header bs ctx _ strm = do
    RSTStreamFrame e <- guardIt $ decoderstStreamFrame header bs
    let !cc = Reset e
    closed ctx strm cc
    return $ Closed cc -- will be written to streamState again

-- PRIORITY: record the new precedence. For an idle stream an entry is
-- prepared in the output queue (the number of such entries is limited);
-- otherwise an output queued under the old precedence is re-enqueued.
stream FramePriority header bs Context{outputQ,priorityTreeSize} s Stream{streamNumber,streamPrecedence} = do
    PriorityFrame newpri <- guardIt $ decodePriorityFrame header bs
    checkPriority newpri streamNumber
    oldpre <- readIORef streamPrecedence
    let !newpre = toPrecedence newpri
    writeIORef streamPrecedence newpre
    if isIdle s then do
        n <- atomicModifyIORef' priorityTreeSize (\x -> (x+1,x+1))
        -- fixme hard coding
        when (n >= 20) $ E.throwIO $ ConnectionError EnhanceYourCalm "too many idle priority frames"
        prepare outputQ streamNumber newpri
      else do
        mout <- delete outputQ streamNumber oldpre
        traverse_ (enqueueOutput outputQ) mout
    return s

-- this ordering is important
stream FrameContinuation _ _ _ _ _ = E.throwIO $ ConnectionError ProtocolError "continue frame cannot come here"
stream _ _ _ _ (Open Continued{}) _ = E.throwIO $ ConnectionError ProtocolError "an illegal frame follows header/continuation frames"
-- Ignore frames to streams we have just reset, per section 5.1.
stream _ _ _ _ st@(Closed (ResetByMe _)) _ = return st
stream FrameData FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamError StreamClosed streamId
stream _ FrameHeader{streamId} _ _ _ _ = E.throwIO $ StreamError ProtocolError streamId

----------------------------------------------------------------

{-# INLINE newReadBody #-}
-- | Create a reader for the given body queue.
--
-- The returned action is 'readBody' bound to the queue and to a fresh
-- end-of-body flag that starts out as 'False'.
newReadBody :: TQueue ByteString -> IO (IO ByteString)
newReadBody q = readBody q <$> newIORef False

{-# INLINE readBody #-}
-- | Take the next body chunk from the queue.
--
-- An empty chunk marks the end of the body: it is returned and the flag is
-- set. Once the flag is set, every later call returns an empty chunk
-- without touching the queue.
readBody :: TQueue ByteString -> IORef Bool -> IO ByteString
readBody q ref = readIORef ref >>= next
  where
    -- the end of the body has already been seen
    next True  = return ""
    -- wait for the next chunk from the queue
    next False = do
        chunk <- atomically $ readTQueue q
        when (chunk == "") $ writeIORef ref True
        return chunk