module Database.EventStore.Internal.Processor
( InternalException(..)
, Processor(..)
, DropReason(..)
, Subscription(..)
, newProcessor
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Data.Monoid ((<>))
import Data.Typeable
import Data.Word
import System.IO
import Text.Printf
import Control.Concurrent.Async
import Data.Text (Text)
import Data.UUID
import FRP.Sodium
import Network
import System.Random
import Database.EventStore.Internal.Manager.Operation
import Database.EventStore.Internal.Manager.Subscription
import Database.EventStore.Internal.Packages
import Database.EventStore.Internal.Reader
import Database.EventStore.Internal.Types hiding (Event, newEvent)
import Database.EventStore.Internal.Util.Sodium
import Database.EventStore.Internal.Writer
-- | Set of actions through which the rest of the driver talks to the
--   connection processor.
data Processor = Processor
    { processorConnect :: HostName -> Int -> IO ()
      -- ^ Fires a 'Connect' event for the given host and port.
    , processorShutdown :: IO ()
      -- ^ Fires a 'Cleanup' event, which runs the cleanup action held by the
      --   current state.
    , processorNewOperation :: OperationParams -> IO ()
      -- ^ Submits a new operation to the operation network.
    , processorNewSubcription :: (Subscription -> IO ())
                              -> Text
                              -> Bool
                              -> IO ()
      -- ^ Registers a new subscription; the callback receives the
      --   'Subscription'. NOTE(review): the 'Text' and 'Bool' arguments are
      --   presumably the stream name and a resolve-links flag; confirm
      --   against the subscription manager.
    }
-- | Creates a 'Processor' by building the reactive 'network' for the given
--   settings inside a single transaction.
newProcessor :: Settings -> IO Processor
newProcessor = sync . network
-- | Exceptions raised by the processor itself.
data ProcessorException
    = MaxAttempt HostName Int Int
      -- ^ Gave up connecting: host, port and the configured maximum number
      --   of attempts.
    deriving (Show, Typeable)

instance Exception ProcessorException
-- | Connection state accumulated by the reactive network.
data State
    = Offline
      { _maxAttempt :: !Int
        -- ^ Maximum number of connection attempts.
      }
      -- ^ No connection has been registered yet.
    | Online
      { _uuidCon :: !UUID
        -- ^ Identifier of the current connection.
      , _maxAttempt :: !Int
        -- ^ Maximum number of connection attempts.
      , _packageCount :: !Int
        -- ^ Number of packages received while online.
      , _host :: !HostName
        -- ^ Host of the server we are connected to.
      , _port :: !Int
        -- ^ Port of the server we are connected to.
      , _cleanup :: !(IO ())
        -- ^ Action tearing down the current connection.
      }
      -- ^ A connection has been registered.
-- | Initial state: offline, remembering the maximum number of connection
--   attempts.
initState :: Int -> State
initState = Offline
-- | Request to connect to the given host and port.
data Connect = Connect HostName Int
-- | Fired once a connection is set up: host, port, connection id and the
--   action that tears the connection down.
data Connected = Connected HostName Int UUID (IO ())
-- | Request to run the cleanup action of the current connection.
data Cleanup = Cleanup
-- | Request to connect again, using the host and port held in the state.
data Reconnect = Reconnect
-- | Fired once a reconnection is set up: new connection id and the action
--   that tears the new connection down.
data Reconnected = Reconnected UUID (IO ())
-- | Command byte identifying a heartbeat request package. Received packages
--   carrying this command are answered with a heartbeat response.
heartbeatRequestCmd :: Word8
heartbeatRequestCmd = 0x01
-- | Builds the reactive network backing a 'Processor'.
--
--   The network owns one event stream per kind of message ('Connect',
--   'Connected', 'Cleanup', 'Reconnect', 'Reconnected', received packages and
--   packages to send), accumulates a 'State' from them and registers the
--   listeners that perform the actual IO.
network :: Settings -> Reactive Processor
network sett = do
    (onConnect, pushConnect)         <- newEvent
    (onConnected, pushConnected)     <- newEvent
    (onCleanup, pushCleanup)         <- newEvent
    (onReconnect, pushReconnect)     <- newEvent
    (onReconnected, pushReconnected) <- newEvent
    (onReceived, pushReceived)       <- newEvent
    (onSend, pushSend)               <- newEvent

    -- Sub-networks: both can send packages and observe received ones; the
    -- operation network is also given a way to request a reconnection.
    pushNewOp <- operationNetwork sett
                                  pushSend
                                  (pushReconnect Reconnect)
                                  onReceived
    pushSub   <- subscriptionNetwork sett pushSend onReceived

    -- State transitions. NOTE(review): no transition is driven by 'Cleanup',
    -- so the state stays 'Online' after a shutdown; confirm this is intended.
    let transitions = fmap connected onConnected
                   <> fmap reconnected onReconnected
                   <> fmap received onReceived

    stateB <- accum (initState $ s_maxRetries sett) transitions

    let isHeartbeat = (== heartbeatRequestCmd) . packageCmd
        heartbeats  = filterE isHeartbeat onReceived

        -- Each request is paired with the state at the time it fires.
        connectS   = snapshot connectSnapshot onConnect stateB
        reconnectS = snapshot reconnectSnapshot onReconnect stateB
        cleanupS   = snapshot cleanupSnapshot onCleanup stateB

        -- A full reconnection first tears the current connection down.
        restart r = pushCleanup Cleanup >> pushReconnect r

        -- IO-side entry points handed to the connection threads.
        reconnectIO   = pushAsync restart Reconnect
        receivedIO    = sync . pushReceived
        reconnectedIO = pushAsync2 $ \uuid cl ->
                            pushReconnected (Reconnected uuid cl)
        sendIO        = pushAsync pushSend
        connectedIO   = pushAsync4 $ \h p uuid cl ->
                            pushConnected (Connected h p uuid cl)

        -- Opens a connection described by a snapshot; @onUp@ is told about
        -- the new connection once it is set up.
        dial onUp (ConnectionSnapshot max_a host port) =
            connection receivedIO onUp reconnectIO onSend max_a host port

    _ <- listen connectS $ \snap@(ConnectionSnapshot _ host port) ->
             dial (connectedIO host port) snap

    _ <- listen reconnectS $ \snap -> dial reconnectedIO snap

    _ <- listen cleanupS $ \(CleanupSnapshot finalizer) -> finalizer

    -- Answer every heartbeat request with the matching response.
    _ <- listen heartbeats $ \pkg ->
             sendIO $ heartbeatResponsePackage (packageCorrelation pkg)

    return Processor
        { processorConnect        = \h p -> sync $ pushConnect $ Connect h p
        , processorShutdown       = sync $ pushCleanup Cleanup
        , processorNewOperation   = \o -> sync $ pushNewOp o
        , processorNewSubcription = pushSub
        }
-- | Everything needed to open a connection, captured when a 'Connect' or
--   'Reconnect' request fires.
data ConnectionSnapshot = ConnectionSnapshot
    { _conMax :: !Int
      -- ^ Maximum number of connection attempts.
    , _conHost :: !HostName
      -- ^ Host to connect to.
    , _conPort :: !Int
      -- ^ Port to connect to.
    }
-- | Pairs a 'Connect' request with the current state: host and port come
--   from the request, the attempt limit from the state.
connectSnapshot :: Connect -> State -> ConnectionSnapshot
connectSnapshot (Connect hostname portNum) st =
    -- Field order: attempt limit, host, port.
    ConnectionSnapshot (_maxAttempt st) hostname portNum
-- | Pause between two connection attempts, as passed to 'threadDelay'
--   (microseconds, i.e. half a second).
reconnectDelay :: Int
reconnectDelay = 500 * 1000
-- | Connects to @host:port@ and starts the reader and writer threads of the
--   new connection, waiting 'reconnectDelay' and retrying whenever an attempt
--   fails.
--
--   Arguments, in order:
--
--   * @push_pkg@: handed to 'readerThread' together with the handle.
--
--   * @push_con@: called once the connection is set up, with a fresh random
--     'UUID' and the action that tears this connection down.
--
--   * @push_reco@: recovery action run by 'recovering' when the reader or the
--     writer thread ends with an exception other than 'Stopped'.
--
--   * @evt_pkg@: every package fired on this event is queued for
--     'writerThread'.
--
--   * @max_a@: maximum number of connection attempts.
--
--   * @host@, @port@: address of the server.
--
--   Throws 'MaxAttempt' once @max_a@ attempts have failed (immediately when
--   @max_a@ is not positive).
connection :: (Package -> IO ())
           -> (UUID -> IO () -> IO ())
           -> IO ()
           -> Event Package
           -> Int
           -> HostName
           -> Int
           -> IO ()
connection push_pkg push_con push_reco evt_pkg max_a host port = loop 1
  where
    -- Attempts are numbered from 1, so we only give up after attempt number
    -- @max_a@ has really been tried. Comparing with @==@ skipped the last
    -- attempt (no attempt at all for @max_a == 1@) and never terminated for
    -- a non-positive @max_a@.
    loop att
        | att > max_a =
              throwIO $ MaxAttempt host port max_a
        | otherwise =
              -- NOTE(review): this also catches asynchronous exceptions
              -- raised while connecting; narrow the handler if that matters.
              catch (doConnect att) $ \(_ :: SomeException) -> do
                  threadDelay reconnectDelay
                  loop (att + 1)

    doConnect att = do
        printf "Connecting...Attempt %d\n" att
        hdl <- connectTo host (PortNumber $ fromIntegral port)
        hSetBuffering hdl NoBuffering

        uuid  <- randomIO
        chan  <- newTChanIO
        -- Forward outgoing packages to the writer's queue; the async yields
        -- the action that unregisters this listener.
        as_rl <- async $ sync $ listen evt_pkg (atomically . writeTChan chan)
        rid   <- forkFinally (readerThread push_pkg hdl) (recovering push_reco)
        wid   <- forkFinally (writerThread chan hdl) (recovering push_reco)

        -- Publish the connection together with its teardown action: stop
        -- both threads, close the handle and unregister the listener.
        push_con uuid $ do
            throwTo rid Stopped
            throwTo wid Stopped
            hClose hdl
            rel_w <- wait as_rl
            rel_w
            printf "Disconnected %s\n" (toString uuid)
-- | Finalizer for the reader and writer threads. Given the recovery action
--   and the way the thread ended, it runs the recovery action unless the
--   thread finished normally or was deliberately stopped with 'Stopped'.
recovering :: IO () -> Either SomeException () -> IO ()
recovering recover = either onFailure (const $ return ())
  where
    onFailure err =
        case fromException err of
            Just Stopped                  -> return ()
            Just ConnectionClosedByServer -> recover
            -- Any other exception also triggers a recovery.
            _                             -> recover
-- | Pairs a 'Reconnect' request with the current state: attempt limit, host
--   and port are all read from the state.
--
--   NOTE(review): '_host' and '_port' only exist on 'Online'; if a
--   'Reconnect' fires while the state is still 'Offline' these selectors
--   fail. Confirm callers never reconnect before a first connection.
reconnectSnapshot :: Reconnect -> State -> ConnectionSnapshot
reconnectSnapshot _ st =
    -- Field order: attempt limit, host, port.
    ConnectionSnapshot (_maxAttempt st) (_host st) (_port st)
-- | Wraps the action to run when a 'Cleanup' event fires.
newtype CleanupSnapshot = CleanupSnapshot (IO ())
-- | Picks the action to run for a 'Cleanup' request: nothing while offline,
--   the connection's teardown action while online.
cleanupSnapshot :: Cleanup -> State -> CleanupSnapshot
cleanupSnapshot _ Offline {}                    = CleanupSnapshot (return ())
cleanupSnapshot _ Online { _cleanup = teardown } = CleanupSnapshot teardown
-- | State transition for a 'Connected' event: an offline state goes online
--   with a package count of zero; an online state is left untouched.
connected :: Connected -> State -> State
connected (Connected host port uuid cl) (Offline max_at) =
    Online
        { _uuidCon      = uuid
        , _maxAttempt   = max_at
        , _packageCount = 0
        , _host         = host
        , _port         = port
        , _cleanup      = cl
        }
connected _ st = st
-- | State transition for a 'Reconnected' event: an online state takes the
--   new connection id and teardown action; an offline state is left
--   untouched.
reconnected :: Reconnected -> State -> State
reconnected (Reconnected uuid cl) st@Online {} =
    st { _uuidCon = uuid, _cleanup = cl }
reconnected _ st = st
-- | State transition for a received package: bumps the package counter while
--   online; an offline state is left untouched.
received :: Package -> State -> State
received _ st@(Online { _packageCount = seen }) = st { _packageCount = seen + 1 }
received _ st                                   = st