{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
module Reflex.Backend.Socket
( socket
, SocketConfig(..)
, Socket(..)
, scInitSocket
, scMaxRx
, scSend
, scClose
, sReceive
, sOpen
, sClose
, sError
, module Reflex.Backend.Socket.Accept
, module Reflex.Backend.Socket.Connect
, module Reflex.Backend.Socket.Error
) where
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.STM as STM
import Control.Exception (IOException, try)
import Control.Lens.TH (makeLenses)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.STM (atomically)
import Data.Align (align)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Functor (($>), (<&>), void)
import Data.These
import qualified Network.Socket as NS
import Network.Socket.ByteString (sendAll, recv)
import Reflex
import Reflex.Backend.Socket.Accept
import Reflex.Backend.Socket.Connect
import Reflex.Backend.Socket.Error
-- | Inputs to 'socket': the socket to drive and the events that feed it.
data SocketConfig t = SocketConfig
{ _scInitSocket :: NS.Socket
-- ^ Socket that 'socket' sends on, receives from, and eventually closes.
, _scMaxRx :: Int
-- ^ Passed to 'recv' as the maximum number of bytes to read per call.
, _scSend :: Event t ByteString
-- ^ Payloads to transmit. A payload is queued only while the socket is
-- still open; after a close request or shutdown it is dropped.
, _scClose :: Event t ()
-- ^ Close request: no further payloads are queued, the ones already
-- queued are still sent, and then the socket is closed.
}
-- Generates the lenses exported above ('scInitSocket', 'scMaxRx', 'scSend',
-- 'scClose') from the underscore-prefixed fields.
$(makeLenses ''SocketConfig)
-- | Events produced by 'socket'.
data Socket t = Socket
{ _sReceive :: Event t ByteString
-- ^ Fires with each non-empty chunk read from the socket.
, _sOpen :: Event t ()
-- ^ Fires once, after post-build, when the worker threads have been forked.
, _sClose :: Event t ()
-- ^ Fires after the close attempt on the underlying socket.
, _sError :: Event t IOException
-- ^ Fires with the 'IOException' raised by a failed send or receive.
}
-- Generates the lenses exported above ('sReceive', 'sOpen', 'sClose',
-- 'sError') from the underscore-prefixed fields.
$(makeLenses ''Socket)
-- | Lifecycle of the socket, shared between the worker threads of 'socket'
-- through a 'STM.TVar'.
data SocketState
= Open
-- ^ Sends are queued and the receive loop keeps reading.
| Draining
-- ^ A close was requested: new sends are dropped, payloads already queued
-- are still transmitted, and the receive loop stops.
| Closed
-- ^ Terminal state: both loops stop and the socket gets closed.
-- | Drive an already-initialised 'NS.Socket' from a Reflex network.
--
-- Once the post-build event fires, three threads are forked:
--
-- * a transmit loop that sends payloads queued from '_scSend';
-- * a receive loop that reads up to '_scMaxRx' bytes at a time and fires
--   '_sReceive' for every non-empty chunk;
-- * a sentinel that waits for the 'Closed' state, closes the socket and then
--   fires '_sClose'.
--
-- '_sOpen' fires right after the threads have been forked.
--
-- Firing '_scClose' moves the socket to 'Draining': nothing new is queued,
-- the transmit loop flushes what is already queued and then shuts down.
-- An empty read (end of stream) or an 'IOException' from send/receive also
-- shuts the socket down; exceptions are reported through '_sError'.
socket
  :: forall t m.
  ( Reflex t
  , PerformEvent t m
  , PostBuild t m
  , TriggerEvent t m
  , MonadIO (Performable m)
  , MonadIO m
  )
  => SocketConfig t
  -> m (Socket t)
socket (SocketConfig sock maxRx eTx eClose) = do
  (eRx, onRx) <- newTriggerEvent
  (eOpen, onOpen) <- newTriggerEvent
  (eClosed, onClosed) <- newTriggerEvent
  (eError, onError) <- newTriggerEvent

  -- Payloads waiting to be sent, and the lifecycle state shared by all threads.
  payloadQueue <- liftIO STM.newTQueueIO
  state <- liftIO $ STM.newTVarIO Open

  let
    start = liftIO $ do
        void $ forkIO txLoop
        void $ forkIO rxLoop
        void $ forkIO closeSentinel
        onOpen ()
      where
        txLoop =
          let
            loop = do
              -- Open: block until a payload is available (or the state
              -- changes). Draining: take what is left, stop when the queue
              -- is empty. Closed: stop immediately.
              mBytes <- atomically $
                STM.readTVar state >>= \case
                  Closed -> pure Nothing
                  Draining -> STM.tryReadTQueue payloadQueue
                  Open -> STM.tryReadTQueue payloadQueue
                    >>= maybe STM.retry (pure . Just)
              case mBytes of
                Nothing -> shutdown
                Just bs ->
                  try (sendAll sock bs) >>= \case
                    Left exc -> reportError exc *> shutdown
                    Right () -> loop
          in loop

        rxLoop =
          let
            loop = atomically (STM.readTVar state) >>= \case
              Open -> try (recv sock maxRx) >>= \case
                Left exc -> reportError exc *> shutdown
                Right bs
                  -- An empty read is treated as end of stream.
                  | B.null bs -> shutdown
                  | otherwise -> onRx bs *> loop
              _ -> pure ()
          in loop

        -- Wait (via STM retry) until the state is Closed, then release the
        -- socket. Failures from close itself are deliberately ignored.
        closeSentinel = do
          atomically $ STM.readTVar state >>= \case
            Closed -> pure ()
            _ -> STM.retry
          void . try @IOException $ NS.close sock
          onClosed ()

        -- Report an I/O failure unless the socket has already been shut
        -- down. Once the state is Closed the sentinel closes the descriptor,
        -- which can make a send/recv still blocked in another thread fail;
        -- such a failure is a consequence of our own shutdown and must not
        -- show up as an error event.
        reportError :: IOException -> IO ()
        reportError exc = atomically (STM.readTVar state) >>= \case
          Closed -> pure ()
          _ -> onError exc

        shutdown = atomically $ STM.writeTVar state Closed

  ePostBuild <- getPostBuild
  performEvent_ $ ePostBuild $> start

  let
    -- Merge sends and close requests so that a simultaneous occurrence is
    -- handled in a single transaction (payload first, then the close).
    eTxOrClose :: Event t (These ByteString ())
    eTxOrClose = align eTx eClose

    -- Payloads are accepted only while Open; otherwise they are dropped.
    queueSend bs = STM.readTVar state >>= \case
      Open -> STM.writeTQueue payloadQueue bs
      _ -> pure ()

    -- Strict modify so no thunk is left behind in the TVar.
    queueClose = STM.modifyTVar' state $ \case
      Open -> Draining
      s -> s

  performEvent_ $ eTxOrClose <&> liftIO . atomically . \case
    This bs -> queueSend bs
    That () -> queueClose
    These bs () -> queueSend bs *> queueClose

  pure $ Socket eRx eOpen eClosed eError