{-# LANGUAGE RecordWildCards #-}

-- | The rationale behind this buffer not tracking any size parameter, is that
-- it will always be used as part of the receive window. As such, there should
-- only ever be at most a full window's worth of data queued, as other data will
-- be rejected.

module Hans.Buffer.Stream (
    Buffer(),
    newBuffer,
    closeBuffer,
    bytesAvailable,
    putBytes,
    takeBytes,
    tryTakeBytes,
  ) where

import Hans.Buffer.Signal

import           Control.Monad (when,unless)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import           Data.IORef (IORef,newIORef,atomicModifyIORef',readIORef)
import qualified Data.Sequence as Seq


-- Stream Buffers --------------------------------------------------------------

data Buffer = Buffer { bufState :: !(IORef State)
                       -- ^ The buffer and current size

                     , bufSignal :: !Signal
                       -- ^ Data available signal
                     }

newBuffer :: IO Buffer
newBuffer  =
  do bufState  <- newIORef emptyState
     bufSignal <- newSignal
     return Buffer { .. }

closeBuffer :: Buffer -> IO ()
closeBuffer Buffer { .. } =
  do atomicModifyIORef' bufState sClose
     signal bufSignal

bytesAvailable :: Buffer -> IO Bool
bytesAvailable Buffer { .. } =
  do st <- readIORef bufState
     return (not (sNull st))

putBytes :: S.ByteString -> Buffer -> IO ()
putBytes bytes Buffer { .. } =
  do unless (S.null bytes) (atomicModifyIORef' bufState (sPut bytes))
     signal bufSignal

-- | Take up to n bytes from the buffer, blocking until some data is ready.
takeBytes :: Int -> Buffer -> IO L.ByteString
takeBytes n Buffer { .. }
  | n <= 0    = return L.empty
  | otherwise = loop
  where
  loop =
    do (bytes,more,closed) <- atomicModifyIORef' bufState (sTake n)
       if L.null bytes && not closed
          then do waitSignal bufSignal
                  loop
          else do when (more || closed) (signal bufSignal)
                  return bytes

-- | Take up to n bytes from the buffer, returning immediately if no data is
-- available.
tryTakeBytes :: Int -> Buffer -> IO (Maybe L.ByteString)
tryTakeBytes n Buffer { .. }
  | n <= 0    = return Nothing
  | otherwise =
    do (bytes,more,closed) <- atomicModifyIORef' bufState (sTake n)
       when (more || closed) (signal bufSignal)
       if L.null bytes
          then return Nothing
          else return (Just bytes)


-- Internal State --------------------------------------------------------------

data State = State { stBuf    :: !(Seq.Seq S.ByteString)
                   , stClosed :: !Bool
                   }

emptyState :: State
emptyState  = State { stBuf = Seq.empty, stClosed = False }

sNull :: State -> Bool
sNull State { .. } = Seq.null stBuf

sClose :: State -> (State, ())
sClose State { .. } = (State { stClosed = True, .. }, ())

-- | Remove up to n bytes of data from the internal state.
sTake :: Int -> State -> (State, (L.ByteString,Bool,Bool))
sTake n0 State { .. } = go [] n0 stBuf
  where

  go acc n mem
    | n > 0 =
      case Seq.viewl mem of

        buf Seq.:< mem'

          | S.length buf > n ->
            let (as,bs) = S.splitAt n buf
             in finalize (L.fromStrict as:acc) (bs Seq.<| mem')

          | otherwise ->
            go (L.fromStrict buf:acc) (n - S.length buf) mem'


        Seq.EmptyL ->
          finalize acc Seq.empty

    | otherwise =
      finalize acc mem

  finalize acc mem =
    ( State { stBuf = mem, .. }
    , (L.concat (reverse acc), not (Seq.null mem), stClosed)
    )


sPut :: S.ByteString -> State -> (State, ())
sPut bytes State { .. } = (State { stBuf = stBuf Seq.|> bytes, .. }, ())