{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell     #-}
{-# LANGUAGE TypeApplications    #-}

{-|
Module      : Reflex.Backend.Socket
Description : Wrap a TCP socket to communicate through @Event t ByteString@
Copyright   : (c) 2018-2019, Commonwealth Scientific and Industrial Research Organisation
License     : BSD3
Maintainer  : dave.laing.80@gmail.com, jack.kelly@data61.csiro.au
Stability   : experimental
Portability : non-portable

Use 'socket' to wrap a network 'Socket' so that it sends out the
firings of an @'Event' t 'ByteString'@, and fires any data that it
receives on another @'Event' t 'ByteString'@.
-}

module Reflex.Backend.Socket
  ( socket

    -- * Socket configuration
  , SocketConfig(..)

    -- * Socket output events
  , Socket(..)

    -- * Lenses
    -- ** 'SocketConfig'
  , scInitSocket
  , scMaxRx
  , scSend
  , scClose

    -- ** 'Socket'
  , sReceive
  , sOpen
  , sClose
  , sError

    -- * Convenience re-exports
  , module Reflex.Backend.Socket.Accept
  , module Reflex.Backend.Socket.Connect
  , module Reflex.Backend.Socket.Error
  ) where

import           Control.Concurrent (forkIO)
import qualified Control.Concurrent.STM as STM
import           Control.Exception (IOException, try)
import           Control.Lens.TH (makeLenses)
import           Control.Monad.IO.Class (MonadIO(..))
import           Control.Monad.STM (atomically)
import           Data.Align (align)
import           Data.ByteString (ByteString)
import qualified Data.ByteString as B
import           Data.Functor (($>), (<&>), void)
import           Data.These
import qualified Network.Socket as NS
import           Network.Socket.ByteString (sendAll, recv)
import           Reflex
import           Reflex.Backend.Socket.Accept
import           Reflex.Backend.Socket.Connect
import           Reflex.Backend.Socket.Error

-- | Configuration for 'socket': the network socket to wire into the
-- FRP network, together with the events that drive it.
data SocketConfig t = SocketConfig
  { -- | The socket being wrapped.
    _scInitSocket :: NS.Socket
  , -- | Upper bound on the number of bytes read in a single receive.
    _scMaxRx :: Int
  , -- | Each firing is queued to be written out on the socket.
    _scSend :: Event t ByteString
  , -- | Request that the socket be closed. Receiving stops (so
    -- '_sReceive' stops firing) and the socket is "drained": later
    -- firings of '_scSend' are ignored, and the socket closes once
    -- all pending data has been written. When '_scSend' and
    -- '_scClose' fire in the same frame, that frame's data is still
    -- queued for sending.
    _scClose :: Event t ()
  }

makeLenses ''SocketConfig

-- | The output events of a socket that has been wired into the FRP
-- network.
data Socket t = Socket
  { -- | Fires with each chunk of data that arrives.
    _sReceive :: Event t ByteString
  , -- | Fires once the socket is open and its receive/send loops are
    -- running.
    _sOpen :: Event t ()
  , -- | Fires exactly once when the socket closes, whatever the
    -- cause: your '_scClose' event fired, the other end
    -- disconnected, or the socket was closed in response to a
    -- caught exception.
    _sClose :: Event t ()
  , -- | Fires when an exception occurs. Treat the socket as closed
    -- once you see this. If the socket was open, '_sClose' fires as
    -- well, though not necessarily in the same frame.
    _sError :: Event t IOException
  }

makeLenses ''Socket

-- | Internal lifecycle of a wrapped socket, shared between the
-- worker threads that 'socket' forks.
data SocketState
  = -- | Data flows in both directions.
    Open
  | -- | A close has been requested: all pending data will still be
    -- transmitted, but no more is accepted.
    Draining
  | -- | Hard close. Pending data is not transmitted.
    Closed

-- | Wire a socket into the FRP network. You will likely use this to
-- attach events to a socket that you just connected (from
-- 'Reflex.Backend.Socket.Connect.connect'), or a socket that you just
-- accepted (from the 'Reflex.Backend.Socket.Accept._aAcceptSocket'
-- event you got when you called
-- 'Reflex.Backend.Socket.Accept.accept').
--
-- When the post-build event fires, three threads are forked: a
-- transmit loop that writes queued payloads to the socket, a receive
-- loop that fires '_sReceive' for each non-empty chunk read, and a
-- sentinel that closes the socket and fires '_sClose' once the shared
-- state becomes 'Closed'. '_sOpen' fires after the threads have been
-- forked.
--
-- Exceptions from sending are always reported on '_sError'.
-- Exceptions from receiving are reported on '_sError' unless the
-- socket has already been shut down, in which case the exception is
-- only a side effect of our own close and is dropped.
socket
  :: forall t m.
     ( Reflex t
     , PerformEvent t m
     , PostBuild t m
     , TriggerEvent t m
     , MonadIO (Performable m)
     , MonadIO m
     )
  => SocketConfig t
  -> m (Socket t)
socket (SocketConfig sock maxRx eTx eClose) = do
  (eRx, onRx) <- newTriggerEvent
  (eOpen, onOpen) <- newTriggerEvent
  (eClosed, onClosed) <- newTriggerEvent
  (eError, onError) <- newTriggerEvent

  -- Payloads waiting to be written, and the lifecycle state shared by
  -- the worker threads and the FRP-side handlers below.
  payloadQueue <- liftIO STM.newTQueueIO
  state <- liftIO $ STM.newTVarIO Open

  let
    start = liftIO $ do
      void $ forkIO txLoop
      void $ forkIO rxLoop
      void $ forkIO closeSentinel
      onOpen ()

      where
        -- Write queued payloads until told to stop.
        txLoop =
          let
            loop = do
              mBytes <- atomically $
                STM.readTVar state >>= \case
                  -- Hard close: abandon anything still queued.
                  Closed -> pure Nothing
                  -- Draining: take what is left; an empty queue
                  -- yields Nothing, which ends the drain.
                  Draining -> STM.tryReadTQueue payloadQueue
                  -- Open: block until a payload is queued or the
                  -- state changes.
                  Open -> STM.tryReadTQueue payloadQueue
                    >>= maybe STM.retry (pure . Just)

              case mBytes of
                Nothing -> shutdown
                Just bs ->
                  try (sendAll sock bs) >>= \case
                    Left exc -> onError exc *> shutdown
                    Right () -> loop
          in loop

        -- Read from the socket for as long as the state is Open.
        rxLoop =
          let
            loop = atomically (STM.readTVar state) >>= \case
              Open -> try (recv sock maxRx) >>= \case
                Left exc -> reportRxError exc *> shutdown
                Right bs
                  -- An empty read means the other end disconnected.
                  | B.null bs -> shutdown
                  | otherwise -> onRx bs *> loop
              _ -> pure ()
          in loop

        -- Report a receive failure, unless the socket has already
        -- been shut down. Once the state is Closed, 'closeSentinel'
        -- closes the socket, which can wake a 'recv' that is still
        -- blocked by throwing an exception into it; that exception is
        -- a consequence of our own close, not a fault worth
        -- reporting on '_sError'.
        reportRxError exc = atomically (STM.readTVar state) >>= \case
          Closed -> pure ()
          _ -> onError exc

        -- Wait for the state to become Closed, then close the socket
        -- (ignoring any IOException from the close itself) and
        -- announce the closure.
        closeSentinel = do
          atomically $ STM.readTVar state >>= \case
            Closed -> pure ()
            _ -> STM.retry

          void . try @IOException $ NS.close sock
          onClosed ()

        shutdown = atomically $ STM.writeTVar state Closed

  ePostBuild <- getPostBuild
  performEvent_ $ ePostBuild $> start

  -- If we see a tx and a close event in the same frame, we want to
  -- process the tx before the close, so it doesn't get lost.
  let
    eTxOrClose :: Event t (These ByteString ())
    eTxOrClose = align eTx eClose

    -- Payloads are only accepted while the socket is Open.
    queueSend bs = STM.readTVar state >>= \case
      Open -> STM.writeTQueue payloadQueue bs
      _ -> pure ()

    -- A close request only has an effect on an Open socket. The
    -- strict modify avoids leaving an unevaluated thunk in the TVar.
    queueClose = STM.modifyTVar' state $ \case
      Open -> Draining
      s -> s

  performEvent_ $ eTxOrClose <&> liftIO . atomically . \case
    This bs -> queueSend bs
    That () -> queueClose
    These bs () -> queueSend bs *> queueClose

  pure $ Socket eRx eOpen eClosed eError