{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE Rank2Types            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE TupleSections         #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE ViewPatterns          #-}

-- |
-- Module      : Control.TimeWarp.Rpc.Transfer
-- Copyright   : (c) Serokell, 2016
-- License     : GPL-3 (see the file LICENSE)
-- Maintainer  : Serokell
-- Stability   : experimental
-- Portability : POSIX, GHC
--
-- This module provides an implementation of `MonadTransfer`.
--
-- It operates on so-called /lively sockets/: if an error occurs while sending
-- or receiving, a lively socket tries to restore the connection before
-- reporting the error.
--
-- When data is sent to a given address for the first time, a connection backed
-- by a single lively socket is created; it is reused for further sends until
-- it is closed.
--
-- When a server starts listening on some port, it creates a single thread to
-- handle incoming connections, and a lively socket is created for each
-- incoming connection.
--
-- TODO [TW-67]: close all connections upon quitting the `Transfer` monad.
--
--
-- About lively sockets:
--
-- A lively socket keeps queues of byte chunks inside.
-- For a given lively socket, the @send@ function just pushes chunks to the
-- send-queue, while @receive@ continuously takes chunks from the receive-queue.
-- Those queues are connected to a plain socket behind the scenes.
--
-- A lively socket is called /active/ if it is currently sending and receiving
-- the required data successfully.
-- Upon becoming active, a lively socket spawns a `processing-observer` thread,
-- which itself spawns 3 threads: one pushes chunks from the send-queue to the
-- socket, another pulls chunks from the socket to the receive-queue, and the
-- last tracks whether the socket was closed.
-- The processing thread finishes in one of the following cases:
--
--    * One of its child threads threw an error
--
--    * The socket was closed
--
-- If an error occurs, the lively socket goes to an exceptional state (which is
-- not expressed in code, however), where it can be closed or provided with a
-- newly created plain socket to continue working with, and thus become active
-- again.
--
-- UPGRADE-NOTE [TW-59]:
-- Currently, if an error occurs in a listener (e.g. a parse error), the socket
-- gets closed. Need to make it reconnect, if possible.

module Control.TimeWarp.Rpc.Transfer
       (
       -- * Transfer
         Transfer (..)
       , TransferException (..)
       , ConnectionPool
       , runTransfer
       , runTransferS
       , runTransferRaw
       , getConnPool

       -- * Settings
       , FailsInRow
       , Settings (..)
       ) where

import qualified Control.Concurrent                 as C
import           Control.Concurrent.STM             (STM, atomically, check)
import qualified Control.Concurrent.STM.TBMChan     as TBM
import qualified Control.Concurrent.STM.TChan       as TC
import qualified Control.Concurrent.STM.TVar        as TV
import           Control.Lens                       (at, at, each, makeLenses, use, view,
                                                     (.=), (?=), (^..))
import           Control.Monad                      (forM_, forever, guard, unless, when)
import           Control.Monad.Base                 (MonadBase)
import           Control.Monad.Catch                (Exception, MonadCatch, MonadMask (mask),
                                                     MonadThrow (..), bracket, bracketOnError,
                                                     catchAll, finally, handleAll, onException,
                                                     throwM)
import           Control.Monad.Morph                (hoist)
import           Control.Monad.Reader               (MonadReader (ask), ReaderT (..))
import           Control.Monad.State                (StateT (..))
import           Control.Monad.Trans                (MonadIO (..), lift)
import           Control.Monad.Trans.Control        (MonadBaseControl (..))
import           Control.Monad.Extra                (whenM)

import qualified Data.ByteString                    as BS
import qualified Data.ByteString.Lazy               as BL
import           Data.Conduit                       (Sink, Source, ($$))
import           Data.Conduit.Binary                (sinkLbs, sourceLbs)
import           Data.Conduit.Network               (sinkSocket, sourceSocket)
import           Data.Conduit.TMChan                (sinkTBMChan, sourceTBMChan)
import           Data.Default                       (Default (..))
import           Data.HashMap.Strict                (HashMap)
import qualified Data.IORef                         as IR
import           Data.List                          (intersperse)
import           Data.Streaming.Network             (acceptSafe, bindPortTCP,
                                                     getSocketFamilyTCP)
import           Data.Text                          (Text)
import           Data.Text.Buildable                (Buildable (build), build)
import           Data.Text.Encoding                 (decodeUtf8)
import           Data.Typeable                      (Typeable)
import           Formatting                         (bprint, builder, int, sformat, shown,
                                                     stext, string, (%))
import qualified Network.Socket                     as NS
import           Serokell.Util.Base                 (inCurrentContext)
import           Serokell.Util.Concurrent           (modifyTVarS)
import           System.Wlog                        (CanLog, HasLoggerName, LoggerNameBox,
                                                     Severity (..), WithLogger, logDebug,
                                                     logInfo, logMessage, logWarning)

import           Control.TimeWarp.Manager           (InterruptType (..), JobCurator (..),
                                                     addManagerAsJob, addSafeThreadJob,
                                                     addThreadJob, interruptAllJobs,
                                                     isInterrupted, jcIsClosed,
                                                     mkJobCurator, stopAllJobs,
                                                     unlessInterrupted)
import           Control.TimeWarp.Rpc.MonadTransfer (Binding (..), MonadTransfer (..),
                                                     NetworkAddress, Port,
                                                     ResponseContext (..), ResponseT,
                                                     commLog, runResponseT, runResponseT,
                                                     sendRaw)
import           Control.TimeWarp.Timed             (Microsecond, MonadTimed, ThreadId,
                                                     TimedIO, for, fork, fork_, interval,
                                                     killThread, sec, wait)

-- * Util

-- | Logs @msg@ with @severityIfNotClosed@ while the given job curator is still
-- running. Once the curator has been interrupted (e.g. the server is being
-- stopped on purpose), errors are expected, so the message is demoted to
-- 'Debug' instead.
logSeverityUnlessClosed :: (WithLogger m, MonadIO m)
                        => Severity -> JobCurator -> Text -> m ()
logSeverityUnlessClosed severityIfNotClosed jm msg = do
    closed <- isInterrupted jm
    -- Fixed: the branches were swapped, which logged at the requested severity
    -- only after the curator was closed and at 'Debug' during normal work.
    let severity = if closed then Debug else severityIfNotClosed
    logMessage severity msg

-- * Related datatypes

-- ** Exceptions

-- | Error thrown on an attempt to listen on a connection which already has a
-- listener.
data TransferException = AlreadyListeningOutbound Text
    deriving (Show, Typeable)

instance Exception TransferException

instance Buildable TransferException where
    build (AlreadyListeningOutbound addr) =
        bprint ("Already listening at outbound connection to "%stext) addr

-- | Error thrown when the peer is detected to have closed the connection.
data PeerClosedConnection = PeerClosedConnection
    deriving (Show, Typeable)

instance Exception PeerClosedConnection

instance Buildable PeerClosedConnection where
    build _ = "Peer closed connection"

-- ** Connections

-- | Textual representation of peer node. For debugging purposes only.
type PeerAddr = Text

-- | Handle to an outbound lively socket, as stored in the 'ConnectionPool'.
data OutputConnection s = OutputConnection
    { outConnSend       :: forall m . (MonadIO m, MonadMask m, WithLogger m)
                        => Source m BS.ByteString -> m ()
      -- ^ Function to send all data produced by the source
    , outConnRec        :: forall m . (MonadIO m, MonadMask m, MonadTimed m,
                                       MonadBaseControl IO m, WithLogger m)
                        => Sink BS.ByteString (ResponseT s m) () -> m ()
      -- ^ Function to start a sink-listener on incoming data. It does not
      -- return a closer; the listener is stopped through the connection's
      -- job curator (see 'outConnJobCurator').
    , outConnJobCurator :: JobCurator
      -- ^ Job manager for this connection
    , outConnAddr       :: PeerAddr
      -- ^ Address of the socket on the other side of the network
    , outConnUserState  :: s
      -- ^ State bound to the socket
    }

-- ** Settings

-- | Number of consecutive failures while trying to establish a connection.
type FailsInRow = Int

-- | Tunables of the 'Transfer' implementation.
data Settings = Settings
    { queueSize       :: Int
      -- ^ Capacity of each of a lively socket's incoming and outgoing queues
    , reconnectPolicy :: forall m . (HasLoggerName m, MonadIO m)
                      => FailsInRow -> m (Maybe Microsecond)
      -- ^ Given the number of consecutive failures so far, returns the delay
      -- before the next reconnection attempt, or 'Nothing' to give up
    }

-- | Default settings: queues hold up to 100 items; after a failure the
-- connection is retried in 3 seconds, unless 3 or more consecutive failures
-- have already happened. Override fields with record update,
-- e.g. @def { queueSize = 1 }@.
instance Default Settings where
    def = Settings
        { queueSize = 100
        , reconnectPolicy =
            \failsInRow -> return $ guard (failsInRow < 3) >> Just (interval 3 sec)
        }

-- ** ConnectionPool

-- | Outbound connections, keyed by peer 'NetworkAddress'.
newtype ConnectionPool s = ConnectionPool
    { _outputConn :: HashMap NetworkAddress (OutputConnection s)
    }

makeLenses ''ConnectionPool

-- | Connection pool with no connections.
initConnectionPool :: ConnectionPool s
initConnectionPool =
    ConnectionPool
    { _outputConn = mempty
    }

-- ** SocketFrame

-- | Keeps data required to implement so-called /lively socket/.
--
-- The user-facing functions ('sfSend', 'sfReceive') and the socket workers
-- started by 'sfProcessSocket' communicate only through the channels and the
-- job curator stored here, which lets one frame be reused with a new plain
-- socket after a reconnect (see @startWorker@ in 'getOutConnOrOpen').
data SocketFrame s = SocketFrame
    { sfPeerAddr   :: PeerAddr
    -- ^ Peer address, for debugging purposes only
    , sfInBusy     :: TV.TVar Bool
    -- ^ Whether someone already listens on this socket
    -- (set by 'sfReceive', which rejects a second listener)
    , sfInChan     :: TBM.TBMChan BS.ByteString
    -- ^ For incoming packs of bytes
    , sfOutChan    :: TBM.TBMChan (BL.ByteString, IO ())
    -- ^ For (packs of bytes to send, notification when bytes passed to socket)
    , sfJobCurator :: JobCurator
    -- ^ Job manager, tracks whether lively-socket wasn't closed.
    , sfUserState  :: s
    -- ^ User state, produced by the constructor passed to 'mkSocketFrame'
    }

-- | Allocates a new 'SocketFrame' for the given peer: the busy flag starts as
-- 'False', both channels are bounded by 'queueSize', a fresh 'JobCurator' is
-- created and the user state is obtained by running @mkUserState@.
mkSocketFrame :: MonadIO m => Settings -> IO s -> PeerAddr -> m (SocketFrame s)
mkSocketFrame settings mkUserState sfPeerAddr = liftIO $ do
    sfInBusy     <- TV.newTVarIO False
    sfInChan     <- TBM.newTBMChanIO (queueSize settings)
    sfOutChan    <- TBM.newTBMChanIO (queueSize settings)
    sfJobCurator <- mkJobCurator
    sfUserState  <- mkUserState
    return SocketFrame{..}

-- | Makes sender function in terms of @MonadTransfer@ for given `SocketFrame`.
-- This first extracts ready `Lazy.ByteString` from given source, and then passes it to
-- sending queue.
--
-- After enqueueing, it blocks until the sender worker of 'sfProcessSocket'
-- runs the attached notifier (i.e. the bytes were handed to the socket), or
-- returns early if the frame's job curator is, or becomes, closed.
-- A warning is logged beforehand if the send queue is full or closed.
sfSend :: (MonadIO m, WithLogger m)
       => SocketFrame s -> Source m BS.ByteString -> m ()
sfSend SocketFrame{..} src = do
    lbs <- src $$ sinkLbs
    logQueueState
    (notifier, awaiter) <- mkMonitor
    liftIO . atomically . TBM.writeTBMChan sfOutChan $ (lbs, atomically notifier)

    -- wait till data get consumed by socket, but immediately quit once the
    -- socket gets closed. Both conditions are read in one STM transaction, so
    -- a change of either one wakes the retry up.
    liftIO . atomically $ do
        let jm = getJobCurator sfJobCurator
        closed <- view jcIsClosed <$> TV.readTVar jm
        unless closed awaiter
  where
    -- creates pair (@notifier@, @awaiter@), where @awaiter@ blocks thread
    -- until @notifier@ is called.
    mkMonitor = do
        t <- liftIO $ TV.newTVarIO False
        return ( TV.writeTVar t True
               , check =<< TV.readTVar t
               )

    -- Diagnostics only: warns about a full or closed send queue.
    logQueueState = do
        whenM (liftIO . atomically $ TBM.isFullTBMChan sfOutChan) $
            commLog . logWarning $
                sformat ("Send channel for "%shown%" is full") sfPeerAddr
        whenM (liftIO . atomically $ TBM.isClosedTBMChan sfOutChan) $
            commLog . logWarning $
                sformat ("Send channel for "%shown%" is closed, message wouldn't be sent")
                    sfPeerAddr

-- | Constructs function which allows to indefinitely listen on given `SocketFrame`
-- in terms of `MonadTransfer`.
-- Attempt to use this function twice will end with `AlreadyListeningOutbound` error.
--
-- The listener runs as a thread job of a fresh child 'JobCurator', which is
-- registered in the frame's curator with a 3-second interrupt timeout. If the
-- sink throws while the frame is not interrupted, a warning is logged and the
-- whole frame is interrupted.
sfReceive :: (MonadIO m, MonadMask m, MonadTimed m, WithLogger m, MonadBaseControl IO m)
          => SocketFrame s -> Sink BS.ByteString (ResponseT s m) () -> m ()
sfReceive sf@SocketFrame{..} sink = do
    -- atomically claim the single listener slot
    busy <- liftIO . atomically $ TV.swapTVar sfInBusy True
    when busy $ throwM $ AlreadyListeningOutbound sfPeerAddr

    liManager <- mkJobCurator
    onTimeout <- inCurrentContext logOnInterruptTimeout
    let interruptType = WithTimeout (interval 3 sec) onTimeout
    -- registration and thread start happen under 'mask'; only the listener
    -- body itself runs unmasked
    mask $ \unmask -> do
        addManagerAsJob sfJobCurator interruptType liManager
        addThreadJob liManager $ unmask $ logOnErr $ do
            -- TODO: reconnect on error?
            (sourceTBMChan sfInChan $$ sink) `runResponseT` sfMkResponseCtx sf
            logListeningHappilyStopped
  where
    -- errors are reported only while the frame is still alive; any reported
    -- error closes the whole lively socket
    logOnErr = handleAll $ \e -> unlessInterrupted sfJobCurator $ do
        commLog . logWarning $ sformat ("Server error: "%shown) e
        interruptAllJobs sfJobCurator Plain

    logOnInterruptTimeout = commLog . logDebug $
        sformat ("While closing socket to "%stext%" listener "%
                 "worked for too long, closing with no regard to it") sfPeerAddr

    logListeningHappilyStopped =
        commLog . logDebug $
            sformat ("Listening on socket to "%stext%" happily stopped") sfPeerAddr

-- | Closes the lively socket: interrupts all jobs of its curator, then, in a
-- single STM transaction, closes both channels and discards any bytes still
-- waiting in the incoming channel.
sfClose :: SocketFrame s -> IO ()
sfClose SocketFrame{..} = do
    interruptAllJobs sfJobCurator Plain
    atomically $ do
        TBM.closeTBMChan sfInChan
        TBM.closeTBMChan sfOutChan
        clearInChan
  where
    -- keeps reading until 'tryReadTBMChan' reports the (now closed) channel
    -- as exhausted
    clearInChan = TBM.tryReadTBMChan sfInChan >>= maybe (return ()) (const clearInChan)

-- | Exposes a 'SocketFrame' as an 'OutputConnection': sending goes through
-- 'sfSend' and listening through 'sfReceive'.
sfMkOutputConn :: SocketFrame s -> OutputConnection s
sfMkOutputConn sf =
    OutputConnection
    { outConnSend       = sfSend sf
    , outConnRec        = sfReceive sf
    , outConnJobCurator = sfJobCurator sf
    , outConnAddr       = sfPeerAddr sf
    , outConnUserState  = sfUserState sf
    }

-- | Builds the 'ResponseContext' handed to listeners: replies go through
-- 'sfSend' and closing goes through 'sfClose'.
sfMkResponseCtx :: SocketFrame s -> ResponseContext s
sfMkResponseCtx sf =
    ResponseContext
    { respSend      = sfSend sf
    , respClose     = sfClose sf
    , respPeerAddr  = sfPeerAddr sf
    , respUserState = sfUserState sf
    }

-- | Starts workers, which connect channels in `SocketFrame` with real `NS.Socket`.
-- If error in any worker occurs, it's propagated.
--
-- Three threads are forked: a sender draining 'sfOutChan' into the socket, a
-- receiver filling 'sfInChan' from the socket, and a watcher that waits for
-- the frame's job curator to close. The calling thread blocks until the first
-- event: a worker error is rethrown after killing all three threads, while
-- closure of the curator makes this function return normally.
--
-- NOTE(review): if the calling thread itself receives an asynchronous
-- exception while waiting on @eventChan@, the forked threads are not killed
-- here; the watcher stops the sender and receiver only once the curator is
-- closed.
sfProcessSocket :: (MonadIO m, MonadMask m, MonadTimed m, WithLogger m)
                => SocketFrame s -> NS.Socket -> m ()
sfProcessSocket SocketFrame{..} sock = do
    -- TODO: rewrite to async when MonadTimed supports it
    -- create channel to notify about error
    eventChan  <- liftIO TC.newTChanIO
    -- create worker threads
    stid <- fork $ reportErrors eventChan foreverSend $
        sformat ("foreverSend on "%stext) sfPeerAddr
    rtid <- fork $ reportErrors eventChan foreverRec $
        sformat ("foreverRec on "%stext) sfPeerAddr
    commLog . logDebug $ sformat ("Start processing of socket to "%stext) sfPeerAddr
    -- check whether @isClosed@ keeps @True@
    ctid <- fork $ do
        let jm = getJobCurator sfJobCurator
        -- blocks (STM retry) until the curator is marked closed
        liftIO . atomically $ check . view jcIsClosed =<< TV.readTVar jm
        liftIO . atomically $ TC.writeTChan eventChan $ Right ()
        mapM_ killThread [stid, rtid]
    -- wait for error messages
    let onError e = do
            mapM_ killThread [stid, rtid, ctid]
            throwM e
    event <- liftIO . atomically $ TC.readTChan eventChan
    commLog . logDebug $ sformat ("Stop processing socket to "%stext) sfPeerAddr
    -- Left - worker error, Right - get closed
    either onError return event
    -- at this point workers are stopped
  where
    -- Sends queued messages one by one until 'sfOutChan' is closed and empty.
    -- Each message is written with async exceptions unmasked; if writing
    -- throws, the message is put back onto the channel so it will be the next
    -- one read. After a successful write the sender's notifier is run.
    foreverSend =
        mask $ \unmask -> do
            datm <- liftIO . atomically $ TBM.readTBMChan sfOutChan
            forM_ datm $
                \dat@(bs, notif) -> do
                    let pushback = liftIO . atomically $ TBM.unGetTBMChan sfOutChan dat
                    unmask (sourceLbs bs $$ sinkSocket sock) `onException` pushback
                    -- TODO: an async exception arriving inside 'unmask' after the
                    -- bytes were already written still triggers @pushback@, so
                    -- the message would be sent twice.
                    liftIO notif
                    unmask foreverSend

    -- Copies bytes from the socket into 'sfInChan' (the 'False' flag is meant
    -- to leave the channel open when the stream ends). An end of stream while
    -- the frame is not interrupted is reported as 'PeerClosedConnection'.
    foreverRec = do
        hoist liftIO (sourceSocket sock) $$ sinkTBMChan sfInChan False
        unlessInterrupted sfJobCurator $
            throwM PeerClosedConnection

    -- Runs a worker; any exception it throws is logged at debug level and
    -- forwarded to @eventChan@ as 'Left'.
    reportErrors eventChan action desc =
        action `catchAll` \e -> do
            commLog . logDebug $ sformat ("Caught error on "%stext%": " % shown) desc e
            liftIO . atomically . TC.writeTChan eventChan . Left $ e

-- * Transfer

-- | Monad implementing 'MonadTransfer'. Its reader layers hold, from outermost
-- to innermost, the 'Settings', the shared 'ConnectionPool' and the action
-- used to create user state for every new lively socket.
newtype Transfer s a = Transfer
    { getTransfer :: ReaderT Settings
                        (ReaderT (TV.TVar (ConnectionPool s))
                            (ReaderT (IO s)
                                (LoggerNameBox TimedIO))) a
    } deriving (Functor, Applicative, Monad, MonadIO, MonadBase IO,
                MonadThrow, MonadCatch, MonadMask, MonadTimed, CanLog, HasLoggerName)

-- 'Transfer' identifies its threads with ordinary 'C.ThreadId's.
type instance ThreadId (Transfer s) = C.ThreadId

-- | Runs 'Transfer' with explicit settings, an existing connection pool (for
-- instance one obtained with 'getConnPool') and a user-state constructor.
runTransferRaw :: Settings
               -> TV.TVar (ConnectionPool s)
               -> IO s
               -> Transfer s a
               -> LoggerNameBox TimedIO a
runTransferRaw s m us t =
    flip runReaderT us $ flip runReaderT m $ flip runReaderT s $ getTransfer t

-- | Run with specified settings.
-- A fresh, empty connection pool is created for this run.
runTransferS :: Settings -> IO s -> Transfer s a -> LoggerNameBox TimedIO a
runTransferS s us t = do
    m <- liftIO (TV.newTVarIO initConnectionPool)
    runTransferRaw s m us t

-- | Run `Transfer`, with a way to create initial state for socket.
-- Uses the default 'Settings'.
runTransfer :: IO s -> Transfer s a -> LoggerNameBox TimedIO a
runTransfer = runTransferS def

-- | Runs a 'StateT' action over the connection pool inside a single
-- 'atomically' block (via 'modifyTVarS').
modifyManager :: StateT (ConnectionPool s) STM a -> Transfer s a
modifyManager how = Transfer . lift $
    ask >>= liftIO . atomically . flip modifyTVarS how

-- | Returns the 'TV.TVar' holding this run's connection pool.
getConnPool :: Transfer s (TV.TVar (ConnectionPool s))
getConnPool = Transfer $ lift ask

-- * Logic

-- | Renders a socket address for logs.
--
-- NOTE(review): for IPv6 each 16-bit group is rendered with 'build' and the
-- groups are joined with ".", which differs from the conventional
-- colon-separated hexadecimal notation.
buildSockAddr :: NS.SockAddr -> PeerAddr
buildSockAddr (NS.SockAddrInet port host) =
    let buildHost = mconcat . intersperse "." . map build . (^.. each) . NS.hostAddressToTuple
    in  sformat (builder%":"%int) (buildHost host) port

buildSockAddr (NS.SockAddrInet6 port _ host _) =
    let buildHost6 = mconcat . intersperse "." . map build . (^.. each) . NS.hostAddress6ToTuple
    in  sformat (builder%":"%int) (buildHost6 host) port

buildSockAddr (NS.SockAddrUnix addr) = sformat string addr

buildSockAddr (NS.SockAddrCan addr) = sformat ("can:"%int) addr

-- | Renders a 'NetworkAddress' as @host:port@, decoding the host bytes with
-- 'decodeUtf8'.
-- NOTE(review): 'decodeUtf8' throws on invalid UTF-8 input.
buildNetworkAddress :: NetworkAddress -> PeerAddr
buildNetworkAddress (host, port) = sformat (stext%":"%int) (decodeUtf8 host) port

-- | Starts a TCP server on the given port (bound with host @"*"@). The accept
-- loop runs as a thread job of a fresh server 'JobCurator'. Every accepted
-- connection gets its own lively socket, listened to by @sink@. The returned
-- action stops all of the server's jobs.
listenInbound :: Port
              -> Sink BS.ByteString (ResponseT s (Transfer s)) ()
              -> Transfer s (Transfer s ())
listenInbound (fromIntegral -> port) sink = do
    serverJobCurator <- mkJobCurator

    -- launch server; the listening socket is closed if registration fails,
    -- and otherwise when the accept loop ends
    bracketOnError (liftIO $ bindPortTCP port "*") (liftIO . NS.close) $
        \lsocket -> mask $
            \unmask -> addThreadJob serverJobCurator $
                flip finally (liftIO $ NS.close lsocket) . unmask $
                    handleAll (logOnServerError serverJobCurator) $
                        serve lsocket serverJobCurator
    -- return closer
    inCurrentContext $ do
        commLog . logDebug $
            sformat ("Stopping server at "%int) port
        stopAllJobs serverJobCurator
        commLog . logDebug $
            sformat ("Server at "%int%" fully stopped") port
  where
    -- Accept loop: each accepted connection gets a new 'SocketFrame' whose
    -- curator is registered as a job of the server curator, and is processed
    -- in a forked thread; the accepted socket is closed when processing ends.
    serve lsocket serverJobCurator = forever $
        bracketOnError (liftIO $ acceptSafe lsocket) (liftIO . NS.close . fst) $
            \(sock, addr) -> mask $
                \unmask -> fork_ $ do
                    settings <- Transfer ask
                    us <- Transfer . lift . lift $ ask
                    sf@SocketFrame{..} <- mkSocketFrame settings us $ buildSockAddr addr
                    addManagerAsJob serverJobCurator Plain sfJobCurator
                    logNewInputConnection sfPeerAddr
                    unmask (processSocket sock sf serverJobCurator)
                        `finally` liftIO (NS.close sock)

    -- makes socket work, finishes once it's fully shutdown
    -- NOTE(review): 'NS.ReuseAddr' is set on an already accepted socket,
    -- where it likely has no effect.
    processSocket sock sf@SocketFrame{..} jc = do
        liftIO $ NS.setSocketOption sock NS.ReuseAddr 1

        sfReceive sf sink
        unlessInterrupted jc $
            handleAll (logErrorOnServerSocketProcessing jc sfPeerAddr) $ do
                sfProcessSocket sf sock
                logInputConnHappilyClosed sfPeerAddr

    -- * Logs

    logNewInputConnection addr =
        commLog . logDebug $
            sformat ("New input connection: "%int%" <- "%stext)
                port addr

    logErrorOnServerSocketProcessing jm addr e =
        logSeverityUnlessClosed Warning jm $
            sformat ("Error in server socket "%int%" connected with "%stext%": "%shown)
                port addr e

    logOnServerError jm e =
        logSeverityUnlessClosed Error jm $
            sformat ("Server at port "%int%" stopped with error "%shown) port e

    logInputConnHappilyClosed addr =
        commLog . logInfo $
            sformat ("Happily closing input connection "%int%" <- "%stext)
                port addr

-- | Listens for incoming bytes on outbound connection.
-- This doesn't block the current thread. Use the returned function to close
-- the relevant connection; it stops all jobs of the connection's curator.
listenOutbound :: NetworkAddress
               -> Sink BS.ByteString (ResponseT s (Transfer s)) ()
               -> Transfer s (Transfer s ())
listenOutbound addr sink = do
    conn <- getOutConnOrOpen addr
    outConnRec conn sink
    return $ stopAllJobs $ outConnJobCurator conn

-- | Returns the pooled connection to the given address, creating it if absent.
-- For a newly created connection a worker is started as a safe thread job of
-- its curator; the worker (re)connects according to 'reconnectPolicy' and
-- removes the connection from the pool when it finishes.
getOutConnOrOpen :: NetworkAddress -> Transfer s (OutputConnection s)
getOutConnOrOpen addr@(host, fromIntegral -> port) =
    mask $
        \unmask -> do
            (conn, sfm) <- ensureConnExist
            -- only the caller that inserted the connection gets @Just sf@
            -- and therefore starts the worker
            forM_ sfm $
                \sf -> addSafeThreadJob (sfJobCurator sf) $
                    unmask (startWorker sf) `finally` releaseConn sf
            return conn
  where
    addrName = buildNetworkAddress addr

    ensureConnExist = do
        settings <- Transfer ask
        let getOr m act = maybe act (return . (, Nothing)) m

        -- two-phase connection creation
        -- 1. check whether connection already exists: if doesn't, make `SocketFrame`.
        -- 2. check again, if still absent push connection to pool
        mconn <- modifyManager $ use $ outputConn . at addr
        getOr mconn $ do
            us <- Transfer . lift . lift $ ask
            sf <- mkSocketFrame settings us addrName
            let conn = sfMkOutputConn sf
            modifyManager $ do
                mres <- use $ outputConn . at addr
                getOr mres $ do
                    outputConn . at addr ?= conn
                    return (conn, Just sf)

    -- Connects to the peer and processes the socket; a successful connection
    -- resets the failure counter.
    startWorker sf = do
        failsInRow <- liftIO $ IR.newIORef 0
        commLog . logDebug $ sformat ("Lively socket to "%stext%" created, processing")
            (sfPeerAddr sf)
        withRecovery sf failsInRow $
            bracket (liftIO $ fst <$> getSocketFamilyTCP host port NS.AF_UNSPEC)
                    (liftIO . NS.close) $
                \sock -> do
                    liftIO $ IR.writeIORef failsInRow 0
                    commLog . logDebug $
                        sformat ("Established connection to "%stext) (sfPeerAddr sf)
                    sfProcessSocket sf sock

    -- Runs @action@. On an exception while the frame is not interrupted, it
    -- increments the failure counter and asks 'reconnectPolicy' whether (and
    -- after what delay) to retry. Giving up, or an already interrupted frame,
    -- makes it return normally.
    withRecovery sf failsInRow action =
        catchAll action $ \e -> do
            closed <- isInterrupted (sfJobCurator sf)
            unless closed $ do
                commLog . logWarning $
                    sformat ("Error while working with socket to "%stext%": "%shown)
                        addrName e
                reconnect <- reconnectPolicy <$> Transfer ask
                fails <- liftIO $ succ <$> IR.readIORef failsInRow
                liftIO $ IR.writeIORef failsInRow fails
                maybeReconnect <- reconnect fails
                case maybeReconnect of
                    Nothing ->
                        commLog . logWarning $
                            sformat ("Can't connect to "%shown%", closing connection") addr
                    Just delay -> do
                        commLog . logWarning $
                            sformat ("Reconnect in "%shown) delay
                        wait (for delay)
                        withRecovery sf failsInRow action

    -- Interrupts the frame's jobs and removes the connection from the pool.
    releaseConn sf = do
        interruptAllJobs (sfJobCurator sf) Plain
        modifyManager $ outputConn . at addr .= Nothing
        commLog . logDebug $
            sformat ("Socket to "%stext%" closed") addrName

instance MonadTransfer s (Transfer s) where
    -- opens a connection if none is pooled for @addr@
    sendRaw addr src = do
        conn <- getOutConnOrOpen addr
        outConnSend conn src

    listenRaw (AtPort   port) = listenInbound port
    listenRaw (AtConnTo addr) = listenOutbound addr

    -- closes asynchronously: only interrupts the connection's jobs; the pool
    -- entry is removed by the worker's @releaseConn@ when it finishes
    close addr = do
        maybeConn <- modifyManager . use $ outputConn . at addr
        forM_ maybeConn $
            \conn -> interruptAllJobs (outConnJobCurator conn) Plain

    -- opens a connection if none is pooled for @addr@
    userState addr =
        outConnUserState <$> getOutConnOrOpen addr

-- * Instances

-- Lifts through the underlying @LoggerNameBox TimedIO@ stack by unwrapping
-- 'getTransfer'.
instance MonadBaseControl IO (Transfer s) where
    type StM (Transfer s) a = StM (LoggerNameBox TimedIO) a
    liftBaseWith io =
        Transfer $ liftBaseWith $ \runInBase -> io $ runInBase . getTransfer
    restoreM = Transfer . restoreM