-- | Command processor built on a Sodium reactive network.
--
-- A processor is created with 'newProcessor' and is driven by feeding it
-- 'Cmd' values; each command is translated into events pushed into the
-- network (connection handling, operations and subscriptions).
module Database.EventStore.Internal.Processor
( ConnectionException(..)
, InternalException(..)
, Cmd(..)
, newProcessor
) where
import Control.Concurrent
import Control.Exception
import Data.Functor (void)
import Data.Int
import Data.Monoid ((<>))
import Data.Word
import Data.Text (Text)
import Data.UUID
import FRP.Sodium
import Network
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Manager.Operation
import Database.EventStore.Internal.Manager.Subscription
import Database.EventStore.Internal.Packages
import Database.EventStore.Internal.Reader
import Database.EventStore.Internal.Types hiding (Event, newEvent)
import Database.EventStore.Internal.Util.Sodium
import Database.EventStore.Internal.Writer
import Database.EventStore.Logging (Log(..), InfoMessage (Disconnected))
-- | Callback that consumes a value of type @a@.
type Result a = a -> IO ()
-- | Callback that receives either an 'OperationException' or a value of
-- type @a@.
type EResult a = Result (Either OperationException a)
-- | Commands understood by a 'Processor'.
data Cmd
    = DoConnect HostName Int
      -- ^ Open a connection to the given host and port.
    | DoShutdown
      -- ^ Run the cleanup action of the current connection, if any.
    | NewOperation OperationParams
      -- ^ Submit an operation to the operation network.
    | NewSub Text Bool (Result (Subscription Regular))
      -- ^ Subscribe to a stream (regular subscription); the callback receives
      --   the subscription. NOTE(review): the 'Bool' is forwarded untouched to
      --   @RegularSub@ — presumably a resolve-link-tos flag, confirm.
    | CreatePersist Text Text PersistentSubscriptionSettings (EResult ())
      -- ^ Submit a @PersistCreate@ action with the given settings. The two
      --   'Text' values are presumably group and stream names — confirm.
    | UpdatePersist Text Text PersistentSubscriptionSettings (EResult ())
      -- ^ Submit a @PersistUpdate@ action with the given settings.
    | DeletePersist Text Text (EResult ())
      -- ^ Submit a @PersistDelete@ action.
    | ConnectPersist Text Text Int32 (Result (Subscription Persistent))
      -- ^ Subscribe through a persistent subscription; the callback receives
      --   the subscription. NOTE(review): the 'Int32' is forwarded untouched
      --   to @PersistentSub@ — presumably a buffer size, confirm.
-- | A processor is an action that consumes one 'Cmd' at a time.
type Processor = Cmd -> IO ()
-- | Creates a new 'Processor'.
--
-- A fresh package channel is allocated and the reactive 'network' is built
-- around it inside a single 'sync' transaction.
newProcessor :: Settings -> IO Processor
newProcessor sett = do
    chan <- newChan
    sync (network sett chan)
-- | Connection state accumulated by the reactive network.
data State
    = Offline
      -- ^ No connection has been established yet (initial state).
    | Online
      { _uuidCon      :: !UUID
        -- ^ Identifier of the current connection.
      , _packageCount :: !Int
        -- ^ Number of packages received while online.
      , _host         :: !HostName
        -- ^ Host given when the connection was first established.
      , _port         :: !Int
        -- ^ Port given when the connection was first established.
      , _cleanup      :: !(IO ())
        -- ^ Action that tears the current connection down.
      }
-- | Request to connect to the given host and port.
data Connect = Connect HostName Int
-- | A connection has been established: host, port, connection id and the
-- action that tears the connection down.
data Connected = Connected HostName Int UUID (IO ())
-- | Request to run the cleanup action of the current connection.
data Cleanup = Cleanup
-- | Request to reconnect.
data Reconnect = Reconnect
-- | A reconnection succeeded: new connection id and its teardown action.
data Reconnected = Reconnected UUID (IO ())
-- | Command byte used to recognise heartbeat request packages; received
-- packages carrying it are answered with a heartbeat response.
heartbeatRequestCmd :: Word8
heartbeatRequestCmd = 0x01
-- | Builds the reactive network behind a 'Processor'.
--
-- The network owns one event per concern (connect, connected, cleanup,
-- reconnect, reconnected, received, send), folds the connection related
-- events into a 'State' behaviour and registers the listeners that perform
-- the actual IO. Outgoing packages are written to @chan@.
--
-- The returned function dispatches a 'Cmd' into the network.
network :: Settings -> Chan Package -> Reactive Processor
network sett chan = do
    (onConnect, pushConnect)         <- newEvent
    (onConnected, pushConnected)     <- newEvent
    (onCleanup, pushCleanup)         <- newEvent
    (onReconnect, pushReconnect)     <- newEvent
    (onReconnected, pushReconnected) <- newEvent
    (onReceived, pushReceived)       <- newEvent
    (onSend, pushSend)               <- newEvent

    push_new_op <- operationNetwork sett
                                    pushSend
                                    (pushReconnect Reconnect)
                                    onReceived

    runSubCmd <- subscriptionNetwork sett pushSend onReceived

    -- Every connection related event is turned into a state transition.
    let stateE = fmap connected onConnected     <>
                 fmap reconnected onReconnected <>
                 fmap received onReceived

    stateB <- accum Offline stateE

    let heartbeatP pkg = packageCmd pkg == heartbeatRequestCmd
        onlyHeartbeats = filterE heartbeatP onReceived
        con_snap       = fmap connectSnapshot onConnect

        -- 'reconnectSnapshot' reads '_host' and '_port', which exist only on
        -- 'Online'. A reconnect request can be observed while the state is
        -- still 'Offline' (the request comes from the operation network or
        -- from a connection thread, independently of the 'Connected' event),
        -- so such requests are dropped instead of failing on a record
        -- selector: there is no known endpoint to reconnect to.
        reco_target r s =
            case s of
                Online {} -> Just (reconnectSnapshot r s)
                Offline   -> Nothing
        reco_snap      = filterJust $ snapshot reco_target onReconnect stateB
        clean_snap     = snapshot cleanupSnapshot onCleanup stateB

        -- A full reconnection first tears the current connection down.
        full_reco c    = do
            pushCleanup Cleanup
            pushReconnect c

        push_reco_io  = pushAsync full_reco Reconnect
        push_recv_io  = \pkg -> sync $ pushReceived pkg
        push_recod_io = pushAsync2 $ \u c -> pushReconnected $ Reconnected u c
        push_send_io  = pushAsync pushSend
        push_con_io   = pushAsync4 $ \h p u c ->
            pushConnected $ Connected h p u c

    -- First connection: success is reported as 'Connected'.
    _ <- listen con_snap $ \(ConnectionSnapshot host port) ->
        connection sett
                   chan
                   push_recv_io
                   (push_con_io host port)
                   push_reco_io
                   host
                   port

    -- Reconnection: success is reported as 'Reconnected'.
    _ <- listen reco_snap $ \(ConnectionSnapshot host port) ->
        connection sett
                   chan
                   push_recv_io
                   push_recod_io
                   push_reco_io
                   host
                   port

    _ <- listen clean_snap $ \(CleanupSnapshot finalizer) -> finalizer

    -- Heartbeat requests are answered using the request's correlation id.
    _ <- listen onlyHeartbeats $ \pkg ->
        push_send_io $ heartbeatResponsePackage (packageCorrelation pkg)

    let runCmd (DoConnect h p) =
            void $ forkIO $ sync $ pushConnect $ Connect h p
        runCmd DoShutdown =
            void $ forkIO $ sync $ pushCleanup Cleanup
        runCmd (NewOperation o) =
            void $ forkIO $ sync $ push_new_op o
        runCmd (NewSub stream tos cb) =
            runSubCmd (SubscribeTo (RegularSub stream tos) cb)
        runCmd (CreatePersist g s stgs cb) =
            runSubCmd (SubmitPersistAction g s (PersistCreate stgs) cb)
        runCmd (UpdatePersist g s stgs cb) =
            runSubCmd (SubmitPersistAction g s (PersistUpdate stgs) cb)
        runCmd (DeletePersist g s cb) =
            runSubCmd (SubmitPersistAction g s PersistDelete cb)
        runCmd (ConnectPersist g s b cb) =
            runSubCmd (SubscribeTo (PersistentSub g s b) cb)

    -- Everything pushed on the send event ends up in the package channel.
    _ <- listen onSend (writeChan chan)

    return runCmd
-- | Endpoint a connection attempt should target.
data ConnectionSnapshot
    = ConnectionSnapshot
      { _conHost :: !HostName
        -- ^ Host to connect to.
      , _conPort :: !Int
        -- ^ Port to connect to.
      }
-- | Extracts the target endpoint carried by a 'Connect' request.
connectSnapshot :: Connect -> ConnectionSnapshot
connectSnapshot (Connect host port) = ConnectionSnapshot host port
-- | Opens a connection to @host@:@port@ and starts its reader and writer
-- threads.
--
-- * @push_pkg@ is handed to the reader thread.
-- * @push_con@ is called once both threads are started, with the connection
--   id and an action that stops both threads, closes the connection and logs
--   the disconnection.
-- * @push_reco@ is run (through 'recovering') when one of the threads ends
--   with an exception other than 'Stopped'.
connection :: Settings
           -> Chan Package
           -> (Package -> IO ())
           -> (UUID -> IO () -> IO ())
           -> IO ()
           -> HostName
           -> Int
           -> IO ()
connection sett chan push_pkg push_con push_reco host port = do
    conn <- newConnection sett host port

    let onExit = recovering push_reco

    readerId <- forkFinally (readerThread sett push_pkg conn) onExit
    writerId <- forkFinally (writerThread chan conn) onExit

    let connId   = connUUID conn
        teardown = do
            throwTo readerId Stopped
            throwTo writerId Stopped
            connClose conn
            _settingsLog sett (Info $ Disconnected connId)

    push_con connId teardown
-- | Exit handler for the connection threads.
--
-- A thread that ended normally, or that was ended with 'Stopped', needs no
-- action. Any other exception runs the supplied recovery action.
recovering :: IO () -> Either SomeException () -> IO ()
recovering _       (Right _)  = return ()
recovering recover (Left err) =
    case fromException err of
        Just Stopped -> return ()
        _            -> recover
-- | Derives the reconnection target from the current 'State' by reading its
-- '_host' and '_port' fields.
--
-- Both selectors exist only on the 'Online' constructor, so this function is
-- partial: forcing the result for an 'Offline' state fails with a
-- record-selector error.
reconnectSnapshot :: Reconnect -> State -> ConnectionSnapshot
reconnectSnapshot _ s = ConnectionSnapshot (_host s) (_port s)
-- | Wraps the action to run when a cleanup is requested.
newtype CleanupSnapshot = CleanupSnapshot (IO ())
-- | Picks the action to run for a 'Cleanup' request: nothing while
-- 'Offline', the stored '_cleanup' action while 'Online'.
cleanupSnapshot :: Cleanup -> State -> CleanupSnapshot
cleanupSnapshot _ Offline                   = CleanupSnapshot (return ())
cleanupSnapshot _ Online { _cleanup = fin } = CleanupSnapshot fin
-- | State transition for a 'Connected' event.
--
-- An 'Offline' state becomes 'Online' with a package count of zero; any
-- other state is left untouched.
connected :: Connected -> State -> State
connected (Connected host port uuid cl) Offline =
    Online
    { _uuidCon      = uuid
    , _packageCount = 0
    , _host         = host
    , _port         = port
    , _cleanup      = cl
    }
connected _ other = other
-- | State transition for a 'Reconnected' event.
--
-- While 'Online', the connection id and cleanup action are replaced by the
-- new ones; host, port and package count are kept. Any other state is left
-- untouched.
reconnected :: Reconnected -> State -> State
reconnected (Reconnected uuid cl) online@Online {} =
    online { _uuidCon = uuid, _cleanup = cl }
reconnected _ other = other
-- | State transition for a received package: bumps the package counter
-- while 'Online' and leaves any other state untouched. The package itself
-- is not inspected.
received :: Package -> State -> State
received _ online@Online { _packageCount = cnt } =
    online { _packageCount = cnt + 1 }
received _ other = other