module Database.EventStore.Internal.Processor
( ConnectionException(..)
, InternalException(..)
, Processor(..)
, DropReason(..)
, NewSubscriptionCB
, Subscription
, subAwait
, subId
, subStream
, subResolveLinkTos
, subLastCommitPos
, subLastEventNumber
, subUnsubscribe
, newProcessor
) where
import Control.Concurrent
import Control.Exception
import Data.Functor (void)
import Data.Monoid ((<>))
import Data.Word
import Text.Printf
import Data.UUID
import FRP.Sodium
import Network
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Manager.Operation
import Database.EventStore.Internal.Manager.Subscription
import Database.EventStore.Internal.Packages
import Database.EventStore.Internal.Reader
import Database.EventStore.Internal.Types hiding (Event, newEvent)
import Database.EventStore.Internal.Util.Sodium
import Database.EventStore.Internal.Writer
data Processor
= Processor
{ processorConnect :: HostName -> Int -> IO ()
, processorShutdown :: IO ()
, processorNewOperation :: OperationParams -> IO ()
, processorNewSubcription :: NewSubscriptionCB
}
newProcessor :: Settings -> IO Processor
newProcessor sett = sync . network sett =<< newChan
data State
= Offline
| Online
{ _uuidCon :: !UUID
, _packageCount :: !Int
, _host :: !HostName
, _port :: !Int
, _cleanup :: !(IO ())
}
data Connect = Connect HostName Int
data Connected = Connected HostName Int UUID (IO ())
data Cleanup = Cleanup
data Reconnect = Reconnect
data Reconnected = Reconnected UUID (IO ())
heartbeatRequestCmd :: Word8
heartbeatRequestCmd = 0x01
network :: Settings -> Chan Package -> Reactive Processor
network sett chan = do
(onConnect, pushConnect) <- newEvent
(onConnected, pushConnected) <- newEvent
(onCleanup, pushCleanup) <- newEvent
(onReconnect, pushReconnect) <- newEvent
(onReconnected, pushReconnected) <- newEvent
(onReceived, pushReceived) <- newEvent
(onSend, pushSend) <- newEvent
push_new_op <- operationNetwork sett
pushSend
(pushReconnect Reconnect)
onReceived
push_sub <- subscriptionNetwork sett pushSend onReceived
let stateE = fmap connected onConnected <>
fmap reconnected onReconnected <>
fmap received onReceived
stateB <- accum Offline stateE
let heartbeatP pkg = packageCmd pkg == heartbeatRequestCmd
onlyHeartbeats = filterE heartbeatP onReceived
con_snap = fmap connectSnapshot onConnect
reco_snap = snapshot reconnectSnapshot onReconnect stateB
clean_snap = snapshot cleanupSnapshot onCleanup stateB
full_reco c = do
pushCleanup Cleanup
pushReconnect c
push_reco_io = pushAsync full_reco Reconnect
push_recv_io = \pkg -> sync $ pushReceived pkg
push_recod_io = pushAsync2 $ \u c -> pushReconnected $ Reconnected u c
push_send_io = pushAsync pushSend
push_con_io = pushAsync4 $ \h p u c ->
pushConnected $ Connected h p u c
_ <- listen con_snap $ \(ConnectionSnapshot host port) ->
connection sett
chan
push_recv_io
(push_con_io host port)
push_reco_io
host
port
_ <- listen reco_snap $ \(ConnectionSnapshot host port) ->
connection sett
chan
push_recv_io
push_recod_io
push_reco_io
host
port
_ <- listen clean_snap $ \(CleanupSnapshot finalizer) -> finalizer
_ <- listen onlyHeartbeats $ \pkg ->
push_send_io $ heartbeatResponsePackage (packageCorrelation pkg)
let processor =
Processor
{ processorConnect = \h p -> void $ forkIO $
sync $ pushConnect $ Connect h p
, processorShutdown = void $ forkIO $ sync $
pushCleanup Cleanup
, processorNewOperation = \o -> void $ forkIO $
sync $ push_new_op o
, processorNewSubcription = push_sub
}
_ <- listen onSend (writeChan chan)
return processor
data ConnectionSnapshot
= ConnectionSnapshot
{ _conHost :: !HostName
, _conPort :: !Int
}
connectSnapshot :: Connect -> ConnectionSnapshot
connectSnapshot (Connect host port) =
ConnectionSnapshot
{ _conHost = host
, _conPort = port
}
connection :: Settings
-> Chan Package
-> (Package -> IO ())
-> (UUID -> IO () -> IO ())
-> IO ()
-> HostName
-> Int
-> IO ()
connection sett chan push_pkg push_con push_reco host port = do
conn <- newConnection sett host port
rid <- forkFinally (readerThread push_pkg conn) (recovering push_reco)
wid <- forkFinally (writerThread chan conn) (recovering push_reco)
push_con (connUUID conn) $ do
throwTo rid Stopped
throwTo wid Stopped
connClose conn
printf "Disconnected %s\n" (toString $ connUUID conn)
recovering :: IO () -> Either SomeException () -> IO ()
recovering recover (Left some_ex) = do
case fromException some_ex of
Just e ->
case e of
ConnectionClosedByServer
-> recover
Stopped
-> return ()
_ -> recover
recovering _ _ = return ()
reconnectSnapshot :: Reconnect -> State -> ConnectionSnapshot
reconnectSnapshot _ s =
ConnectionSnapshot
{ _conHost = _host s
, _conPort = _port s
}
newtype CleanupSnapshot = CleanupSnapshot (IO ())
cleanupSnapshot :: Cleanup -> State -> CleanupSnapshot
cleanupSnapshot _ s =
case s of
Offline {}
-> CleanupSnapshot (return ())
Online {}
-> CleanupSnapshot $ _cleanup s
connected :: Connected -> State -> State
connected (Connected host port uuid cl) s =
case s of
Offline
-> Online
{ _uuidCon = uuid
, _packageCount = 0
, _host = host
, _port = port
, _cleanup = cl
}
_ -> s
reconnected :: Reconnected -> State -> State
reconnected (Reconnected uuid cl) s =
case s of
Online {}
-> s { _uuidCon = uuid
, _cleanup = cl
}
_ -> s
received :: Package -> State -> State
received _ s =
case s of
Online {}
-> let cnt = _packageCount s in s { _packageCount = cnt + 1 }
_ -> s