module Stream
where

  import qualified Data.Conduit as C
  import           Data.Conduit (($$),(=$))
  import           Data.Conduit.Network

  import qualified Data.ByteString.Char8 as B
  import qualified Data.ByteString.UTF8  as U

  import           Network.Mom.Stompl.Parser (stompParser)
  import qualified Network.Mom.Stompl.Frame as F
  import           Network.Mom.Stompl.Client.Exception

  import           Control.Monad (forever)
  import           Control.Monad.Trans (liftIO)
  import           Control.Concurrent

  import qualified Data.Attoparsec.ByteString as A 

  ------------------------------------------------------------------------
  -- Error Handler
  ------------------------------------------------------------------------
  type EH = StomplException -> IO ()

  ------------------------------------------------------------------------
  -- A TCP/IP fragment read by the Conduit Client has 4096 bytes.
  -- We allow 1000 fragments = 1024 * 4096 Bytes = 4MB
  ------------------------------------------------------------------------
  maxStep :: Int
  maxStep = 1024

  ------------------------------------------------------------------------
  -- Sender thread: get a Frame from a pipe, convert it into a ByteString
  --                and send it through a socket 
  ------------------------------------------------------------------------
  sender :: AppData -> Chan F.Frame -> IO ()
  sender ad ip =  pipeSource ip $$ stream =$ appSink ad

  ------------------------------------------------------------------------
  -- Receiver thread: get a ByteStream through a socket,
  --                  parse it to a Frame and send it through a pipe
  ------------------------------------------------------------------------
  receiver :: AppData -> Chan F.Frame -> EH -> IO ()
  receiver ad ip eh = appSource ad $$ parseC eh =$ pipeSink ip 

  ------------------------------------------------------------------------
  -- Put a frame into a pipe (a channel)
  ------------------------------------------------------------------------
  pipeSink :: Chan F.Frame -> C.Sink F.Frame IO ()
  pipeSink ch = C.awaitForever (liftIO . writeChan ch)

  ------------------------------------------------------------------------
  -- Read a frame from a pipe (a channel)
  ------------------------------------------------------------------------
  pipeSource :: Chan F.Frame -> C.Source IO F.Frame
  pipeSource ch = forever (liftIO (readChan ch) >>= C.yield)

  ------------------------------------------------------------------------
  -- Convert a frame to a ByteString
  ------------------------------------------------------------------------
  stream :: C.ConduitM F.Frame B.ByteString IO ()
  stream = C.awaitForever (C.yield . F.putFrame)

  ------------------------------------------------------------------------
  -- Parse a Frame from a ByteString
  ------------------------------------------------------------------------
  parseC :: EH -> C.ConduitM B.ByteString F.Frame IO ()
  parseC eh = goOn
    where goOn = go (A.parse stompParser) 0 -- start with a clean parser
          go prs step = do
            mbNew <- C.await
            case mbNew of 
              Nothing -> return () -- socket was closed
              Just s  -> case parseAll prs s of
                           -- parse error: call the error handler ---------
                           Left e -> liftIO (eh $ ProtocolException e)
                                     >> goOn
                           -- we got a result -----------------------------
                           Right (prs', fs) -> do
                             -- Do we have (at least) 1 frame to send? ----
                             step' <- if null fs then return (step+1) 
                                                 else mapM_ C.yield fs >>
                                                      return 0
                             -- Too many fragments ------------------------
                             if step' > maxStep 
                               then liftIO (eh $ ProtocolException 
                                                 "Message too long!") 

                             -- Continue with the current parser ----------
                               else go prs' step'

  ------------------------------------------------------------------------
  -- A parser is something that converts a ByteString into a Frame
  ------------------------------------------------------------------------
  type Parser = B.ByteString -> A.Result F.Frame

  ------------------------------------------------------------------------
  -- Continue parsing until we have a complete frame
  ------------------------------------------------------------------------
  parseAll :: Parser -> B.ByteString -> 
              Either String (Parser, [F.Frame])
  parseAll prs s = case prs s of
                     -- We failed ----------------------------------------
                     A.Fail _ _   e  -> Left $ U.toString s ++ ": " ++ e

                     -- We have a partial result and continue -------------
                     --    feeding this partial result --------------------
                     r@(A.Partial _) -> Right (A.feed r, [])

                     -- We are done ---------------------------------------
                     A.Done s' f     -> 
                       if B.null s' 
                         then Right (A.parse stompParser, [f])
                         -- but there may be a leftover -------------------
                         else case parseAll (A.parse stompParser) s' of
                                Left e           -> Left e
                                Right (prs', fs) -> Right (prs',f:fs)