module Database.EventStore.Internal.Execution.Production
( Production
, newExecutionModel
, pushOperation
, shutdownExecutionModel
, pushConnectStream
, pushConnectPersist
, pushCreatePersist
, pushUpdatePersist
, pushDeletePersist
, pushAckPersist
, pushNakPersist
, pushUnsubscribe
, prodWaitTillClosed
) where
import Control.Exception (AsyncException(..), asyncExceptionFromException)
import Control.Monad.Fix
import Data.Int
import ClassyPrelude
import Data.UUID
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Manager.Subscription hiding
( submitPackage
, unsubscribe
, ackPersist
, nakPersist
, abort
)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Processor
import Database.EventStore.Internal.Types
import Database.EventStore.Logging
data Worker
= Reader ThreadId
| Runner ThreadId
| Writer ThreadId
deriving Show
data Slot a = Slot !a | End
newtype CycleQueue a = CycleQueue (TQueue (Slot a))
newCycleQueue :: IO (CycleQueue a)
newCycleQueue = fmap CycleQueue newTQueueIO
readCycleQueue :: CycleQueue a -> STM a
readCycleQueue (CycleQueue q) = do
Slot a <- readTQueue q
return a
writeCycleQueue :: CycleQueue a -> a -> STM ()
writeCycleQueue (CycleQueue q) a = writeTQueue q (Slot a)
emptyCycleQueue :: CycleQueue a -> STM ()
emptyCycleQueue (CycleQueue q) = writeTQueue q End >> go
where
go = do
s <- readTQueue q
case s of
End -> return ()
_ -> go
updateCycleQueue :: CycleQueue a -> (a -> STM (Maybe a)) -> STM ()
updateCycleQueue (CycleQueue q) k = writeTQueue q End >> go
where
go = do
s <- readTQueue q
case s of
End -> return ()
Slot a -> do
r <- k a
case r of
Nothing -> go
Just a' -> writeTQueue q (Slot a') >> go
isEmptyCycleQueue :: CycleQueue a -> STM Bool
isEmptyCycleQueue (CycleQueue q) = isEmptyTQueue q
wkUpdState :: Worker -> State -> State
wkUpdState (Reader tid) s = s { _reader = Just tid }
wkUpdState (Runner tid) s = s { _runner = Just tid }
wkUpdState (Writer tid) s = s { _writer = Just tid }
data Production =
Prod
{ _submit :: TVar (Msg -> IO ())
, _waitClosed :: STM ()
}
data Env =
Env
{ _setts :: Settings
, _queue :: CycleQueue Msg
, _pkgQueue :: CycleQueue Package
, _jobQueue :: CycleQueue Job
, _state :: TVar State
, _nextSubmit :: TVar (Msg -> IO ())
, _connRef :: IORef InternalConnection
, _disposed :: TMVar ()
}
data Msg
= Stopped Worker SomeException
| Arrived Package
| Shutdown
| forall a.
NewOperation (Either OperationError a -> IO ()) (Operation a)
| ConnectStream (SubConnectEvent -> IO ()) Text Bool
| ConnectPersist (SubConnectEvent -> IO ()) Text Text Int32
| Unsubscribe Running
| CreatePersist (Either PersistActionException ConfirmedAction -> IO ())
Text Text PersistentSubscriptionSettings
| UpdatePersist (Either PersistActionException ConfirmedAction -> IO ())
Text Text PersistentSubscriptionSettings
| DeletePersist (Either PersistActionException ConfirmedAction -> IO ())
Text Text
| AckPersist Running [UUID]
| NakPersist Running NakAction (Maybe Text) [UUID]
pushCmd :: Production -> Msg -> IO ()
pushCmd (Prod _sender _) msg = do
push <- readTVarIO _sender
push msg
shutdownExecutionModel :: Production -> IO ()
shutdownExecutionModel prod = pushCmd prod Shutdown
pushOperation :: Production
-> (Either OperationError a -> IO ())
-> Operation a
-> IO ()
pushOperation prod k op = pushCmd prod (NewOperation k op)
pushConnectStream :: Production
-> (SubConnectEvent -> IO ())
-> Text
-> Bool
-> IO ()
pushConnectStream prod k n tos = pushCmd prod (ConnectStream k n tos)
pushConnectPersist :: Production
-> (SubConnectEvent -> IO ())
-> Text
-> Text
-> Int32
-> IO ()
pushConnectPersist prod k g n buf = pushCmd prod (ConnectPersist k g n buf)
pushCreatePersist :: Production
-> (Either PersistActionException ConfirmedAction -> IO ())
-> Text
-> Text
-> PersistentSubscriptionSettings
-> IO ()
pushCreatePersist prod k g n setts = pushCmd prod (CreatePersist k g n setts)
pushUpdatePersist :: Production
-> (Either PersistActionException ConfirmedAction -> IO ())
-> Text
-> Text
-> PersistentSubscriptionSettings
-> IO ()
pushUpdatePersist prod k g n setts = pushCmd prod (UpdatePersist k g n setts)
pushDeletePersist :: Production
-> (Either PersistActionException ConfirmedAction -> IO ())
-> Text
-> Text
-> IO ()
pushDeletePersist prod k g n = pushCmd prod (DeletePersist k g n)
pushAckPersist :: Production -> Running -> [UUID] -> IO ()
pushAckPersist prod run evts = pushCmd prod (AckPersist run evts)
pushNakPersist :: Production
-> Running
-> NakAction
-> Maybe Text
-> [UUID]
-> IO ()
pushNakPersist prod run act res evts =
pushCmd prod (NakPersist run act res evts)
pushUnsubscribe :: Production -> Running -> IO ()
pushUnsubscribe prod r = pushCmd prod (Unsubscribe r)
prodWaitTillClosed :: Production -> IO ()
prodWaitTillClosed (Prod _ disposed) = atomically disposed
newtype Job = Job (IO ())
data State =
State
{ _proc :: !(Processor (IO ()))
, _reader :: !(Maybe ThreadId)
, _runner :: !(Maybe ThreadId)
, _writer :: !(Maybe ThreadId)
}
emptyState :: Settings -> Generator -> State
emptyState setts gen = State (newProcessor setts gen) Nothing Nothing Nothing
updateProc :: Processor (IO ()) -> State -> State
updateProc p s = s { _proc = p }
reader :: Settings -> CycleQueue Msg -> InternalConnection -> IO ()
reader sett queue c = forever $ do
pkg <- connRecv c
let cmd = packageCmd pkg
uuid = packageCorrelation pkg
atomically $ writeCycleQueue queue (Arrived pkg)
_settingsLog sett $ Info $ PackageReceived cmd uuid
writer :: Settings -> CycleQueue Package -> InternalConnection -> IO ()
writer setts pkg_queue conn = forever $ do
pkg <- atomically $ readCycleQueue pkg_queue
connSend conn pkg
let cmd = packageCmd pkg
uuid = packageCorrelation pkg
_settingsLog setts $ Info $ PackageSent cmd uuid
runner :: CycleQueue Job -> IO ()
runner job_queue = forever $ do
Job j <- atomically $ readCycleQueue job_queue
j
spawn :: Env -> (ThreadId -> Worker) -> IO Worker
spawn Env{..} mk = do
conn <- readIORef _connRef
tid <- mfix $ \tid ->
let worker = mk tid
action =
case worker of
Reader _ -> reader _setts _queue conn
Runner _ -> runner _jobQueue
Writer _ -> writer _setts _pkgQueue conn in
forkFinally action $ \r ->
case r of
Left e ->
case asyncExceptionFromException e of
Just ThreadKilled -> return ()
_ -> atomically $ writeCycleQueue _queue
$ Stopped worker e
_ -> return ()
return $ mk tid
runTransition :: Env -> Transition (IO ()) -> STM (Processor (IO ()))
runTransition Env{..} = go
where
go (Produce j nxt) = do
let job = Job j
writeCycleQueue _jobQueue job
go nxt
go (Transmit pkg nxt) = do
writeCycleQueue _pkgQueue pkg
go nxt
go (Await new_proc) = return new_proc
bootstrap :: Env -> IO ()
bootstrap env@Env{..} = do
rew <- spawn env Reader
ruw <- spawn env Runner
wrw <- spawn env Writer
let _F = wkUpdState rew .
wkUpdState ruw .
wkUpdState wrw
atomically $ modifyTVar' _state _F
cruising env
cruising :: Env -> IO ()
cruising env@Env{..} = do
msg <- atomically $ readCycleQueue _queue
s <- readTVarIO _state
case msg of
Stopped _ e -> throwIO e
Arrived pkg -> do
let sm = submitPackage pkg $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
Shutdown -> throwIO ClosedConnection
NewOperation k op -> do
let sm = newOperation k op $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
ConnectStream k n tos -> do
let sm = connectRegularStream k n tos $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
ConnectPersist k g n b -> do
let sm = connectPersistent k g n b $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
Unsubscribe r -> do
let sm = unsubscribe r $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
CreatePersist k g n psetts -> do
let sm = createPersistent k g n psetts $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
UpdatePersist k g n psetts -> do
let sm = updatePersistent k g n psetts $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
DeletePersist k g n -> do
let sm = deletePersistent k g n $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
AckPersist run evts -> do
let sm = ackPersist (return ()) run evts $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
NakPersist run act res evts -> do
let sm = nakPersist (return ()) run act res evts $ _proc s
atomically $ do
new_proc <- runTransition env sm
modifyTVar' _state $ updateProc new_proc
cruising env
closing :: Env -> IO ()
closing env@Env{..} = do
State _ retid rutid wutid <- readTVarIO _state
traverse_ killThread retid
traverse_ killThread wutid
atomically $ emptyCycleQueue _pkgQueue
atomically $ updateCycleQueue _queue $ \nxt ->
case nxt of
Arrived pkg -> do
s <- readTVar _state
let sm = submitPackage pkg $ _proc s
nxt_proc <- runTransition env sm
modifyTVar' _state $ updateProc nxt_proc
return Nothing
Shutdown -> return Nothing
AckPersist _ _ -> return Nothing
NakPersist _ _ _ _ -> return Nothing
Unsubscribe _ -> return Nothing
Stopped _ _ -> return Nothing
_ -> return $ Just nxt
conn <- readIORef _connRef
_ <- try $ connClose conn :: (IO (Either ConnectionException ()))
atomically $ do
s <- readTVar _state
_ <- runTransition env $ abort $ _proc s
return ()
atomically $ do
end <- isEmptyCycleQueue _jobQueue
unless end retrySTM
traverse_ killThread rutid
raiseException :: Exception e => e -> Msg -> IO ()
raiseException e _ = throwIO e
newExecutionModel :: Settings -> Discovery -> IO Production
newExecutionModel setts disc = do
gen <- newGenerator
queue <- newCycleQueue
pkg_queue <- newCycleQueue
job_queue <- newCycleQueue
conn <- newConnection setts disc
conn_ref <- newIORef conn
var <- newTVarIO $ emptyState setts gen
nxt_sub <- newTVarIO (atomically . writeCycleQueue queue)
disposed <- newEmptyTMVarIO
let env = Env setts queue pkg_queue job_queue var nxt_sub conn_ref disposed
handler res = do
closing env
case res of
Left e -> do
_settingsLog setts (Error $ UnexpectedException e)
case fromException e of
Just (_ :: ConnectionException) -> atomically $ do
writeTVar nxt_sub (raiseException e)
putTMVar disposed ()
_ -> do new_conn <- newConnection setts disc
writeIORef conn_ref new_conn
_ <- forkFinally (bootstrap env) handler
return ()
_ -> atomically $ putTMVar disposed ()
_ <- forkFinally (bootstrap env) handler
return $ Prod nxt_sub $ do
closed <- connIsClosed conn
unless closed retrySTM
readTMVar disposed