module Database.EventStore.Internal.Execution.Production
( Production
, newExecutionModel
, pushOperation
, shutdownExecutionModel
, pushConnectStream
, pushConnectPersist
, pushCreatePersist
, pushUpdatePersist
, pushDeletePersist
, pushAckPersist
, pushNakPersist
, pushUnsubscribe
, prodWaitTillClosed
) where
import Prelude hiding (take)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import Data.IORef
import Data.Int
import Data.Foldable
import Data.Text
import Data.UUID
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Manager.Subscription hiding
( submitPackage
, unsubscribe
, ackPersist
, nakPersist
, abort
)
import Database.EventStore.Internal.Operation hiding (retry)
import Database.EventStore.Internal.Processor
import Database.EventStore.Internal.Types
import Database.EventStore.Logging
-- | Tags the 'ThreadId' of a background thread with the role it was
--   forked for by 'spawn'.
data Worker
    = -- | Thread running 'reader'.
      Reader ThreadId
    | -- | Thread running 'runner'.
      Runner ThreadId
    | -- | Thread running 'writer'.
      Writer ThreadId
    deriving (Show)
-- | Element stored in a 'CycleQueue': either a (strict) payload or the
--   'End' marker that 'emptyCycleQueue' and 'updateCycleQueue' append to
--   know where to stop.
data Slot a
    = Slot !a
    | End
-- | A 'TQueue' of 'Slot's. Wrapping payloads in 'Slot' lets a transaction
--   append an 'End' marker and cycle through everything queued before it.
newtype CycleQueue a =
    CycleQueue (TQueue (Slot a))
-- | Creates an empty 'CycleQueue'.
newCycleQueue :: IO (CycleQueue a)
newCycleQueue = do
    q <- newTQueueIO
    return (CycleQueue q)
-- | Removes and returns the next payload of the queue, blocking (via
--   'readTQueue') while the queue is empty.
--
--   The match on the slot is written as a total @case@ rather than a
--   failable @Slot a <- ...@ bind: 'STM' has no 'MonadFail' instance, so the
--   failable form is rejected by recent GHC releases and would otherwise
--   crash through 'fail'. 'emptyCycleQueue' and 'updateCycleQueue' both
--   append and consume their 'End' marker inside a single transaction, so a
--   reader is not expected to see one; should it happen anyway the marker is
--   dropped and the next slot is read.
readCycleQueue :: CycleQueue a -> STM a
readCycleQueue (CycleQueue q) = next
  where
    next = do
        slot <- readTQueue q
        case slot of
            Slot a -> return a
            End    -> next
-- | Appends a payload at the back of the queue.
writeCycleQueue :: CycleQueue a -> a -> STM ()
writeCycleQueue (CycleQueue q) = writeTQueue q . Slot
-- | Discards everything currently in the queue. An 'End' marker is appended
--   first, then slots are read and thrown away until that marker comes out.
emptyCycleQueue :: CycleQueue a -> STM ()
emptyCycleQueue (CycleQueue q) = do
    writeTQueue q End
    drain
  where
    drain =
        readTQueue q >>= \slot ->
            case slot of
                End    -> return ()
                Slot _ -> drain
-- | Runs the given action over every payload currently queued. An 'End'
--   marker is appended first; each payload read before the marker is passed
--   to the action, and is re-appended (behind the marker) when the action
--   answers 'Just', or dropped when it answers 'Nothing'. The pass stops when
--   the marker is read.
updateCycleQueue :: CycleQueue a -> (a -> STM (Maybe a)) -> STM ()
updateCycleQueue (CycleQueue q) k = do
    writeTQueue q End
    loop
  where
    requeue = writeTQueue q . Slot

    loop = do
        slot <- readTQueue q
        case slot of
            End    -> return ()
            Slot a -> do
                kept <- k a
                maybe (return ()) requeue kept
                loop
-- | Tells whether the underlying 'TQueue' currently holds no slot.
isEmptyCycleQueue :: CycleQueue a -> STM Bool
isEmptyCycleQueue cq =
    case cq of
        CycleQueue q -> isEmptyTQueue q
-- | Records a worker's 'ThreadId' in the 'State' field matching its role.
wkUpdState :: Worker -> State -> State
wkUpdState worker st =
    case worker of
        Reader tid -> st { _reader = Just tid }
        Runner tid -> st { _runner = Just tid }
        Writer tid -> st { _writer = Just tid }
-- | Handle given to callers of the execution model.
data Production = Prod
    { _submit :: TVar (Msg -> IO ())
      -- ^ Function currently used by 'pushCmd' to submit a message. It lives
      --   in a 'TVar' so 'newExecutionModel' can swap it.
    , _waitClosed :: STM ()
      -- ^ Transaction run by 'prodWaitTillClosed'.
    }
-- | Everything shared between the main loop and its worker threads.
data Env = Env
    { _setts :: Settings
      -- ^ Settings; 'reader' and 'writer' use them for logging.
    , _queue :: CycleQueue Msg
      -- ^ Messages consumed by 'cruising'.
    , _pkgQueue :: CycleQueue Package
      -- ^ Packages consumed by 'writer'.
    , _jobQueue :: CycleQueue Job
      -- ^ Jobs consumed by 'runner'.
    , _state :: TVar State
      -- ^ Processor and worker thread ids.
    , _nextSubmit :: TVar (Msg -> IO ())
      -- ^ Submit function; 'newExecutionModel' stores the same 'TVar' in the
      --   'Production' it returns.
    , _connRef :: IORef InternalConnection
      -- ^ Connection read by 'spawn' and 'closing'; replaced by
      --   'newExecutionModel' when it restarts the model.
    , _disposed :: TMVar ()
      -- ^ Filled by 'newExecutionModel' once the model stops for good.
    }
-- | Messages processed by the main loop ('cruising').
data Msg
    = -- | A worker thread ended with an exception (enqueued by 'spawn').
      Stopped Worker SomeException
    | -- | A package was received (enqueued by 'reader').
      Arrived Package
    | -- | Stop request; 'cruising' answers it by throwing 'ClosedConnection'.
      Shutdown
    | -- | Result callback and operation, handed to 'newOperation'.
      forall a.
      NewOperation (Either OperationError a -> IO ()) (Operation a)
    | -- | Arguments handed to 'connectRegularStream'.
      ConnectStream (SubConnectEvent -> IO ()) Text Bool
    | -- | Arguments handed to 'connectPersistent'.
      ConnectPersist (SubConnectEvent -> IO ()) Text Text Int32
    | -- | Subscription handed to 'unsubscribe'.
      Unsubscribe Running
    | -- | Arguments handed to 'createPersistent'.
      CreatePersist
          (Either PersistActionException ConfirmedAction -> IO ())
          Text
          Text
          PersistentSubscriptionSettings
    | -- | Arguments handed to 'updatePersistent'.
      UpdatePersist
          (Either PersistActionException ConfirmedAction -> IO ())
          Text
          Text
          PersistentSubscriptionSettings
    | -- | Arguments handed to 'deletePersistent'.
      DeletePersist
          (Either PersistActionException ConfirmedAction -> IO ())
          Text
          Text
    | -- | Arguments handed to 'ackPersist'.
      AckPersist Running [UUID]
    | -- | Arguments handed to 'nakPersist'.
      NakPersist Running NakAction (Maybe Text) [UUID]
-- | Reads the submit function currently stored in the 'Production' and
--   applies it to the message.
pushCmd :: Production -> Msg -> IO ()
pushCmd (Prod submitVar _) msg =
    readTVarIO submitVar >>= \submit -> submit msg
-- | Submits a 'Shutdown' message to the execution model.
shutdownExecutionModel :: Production -> IO ()
shutdownExecutionModel = flip pushCmd Shutdown
-- | Submits a 'NewOperation' message built from the result callback and the
--   operation.
pushOperation
    :: Production
    -> (Either OperationError a -> IO ())
    -> Operation a
    -> IO ()
pushOperation prod callback operation =
    pushCmd prod $ NewOperation callback operation
-- | Submits a 'ConnectStream' message built from the given arguments.
pushConnectStream
    :: Production
    -> (SubConnectEvent -> IO ())
    -> Text
    -> Bool
    -> IO ()
pushConnectStream prod callback name flag =
    pushCmd prod $ ConnectStream callback name flag
-- | Submits a 'ConnectPersist' message built from the given arguments.
pushConnectPersist
    :: Production
    -> (SubConnectEvent -> IO ())
    -> Text
    -> Text
    -> Int32
    -> IO ()
pushConnectPersist prod callback grp name bufSize =
    pushCmd prod $ ConnectPersist callback grp name bufSize
-- | Submits a 'CreatePersist' message built from the given arguments.
pushCreatePersist
    :: Production
    -> (Either PersistActionException ConfirmedAction -> IO ())
    -> Text
    -> Text
    -> PersistentSubscriptionSettings
    -> IO ()
pushCreatePersist prod callback grp name psetts =
    pushCmd prod $ CreatePersist callback grp name psetts
-- | Submits an 'UpdatePersist' message built from the given arguments.
pushUpdatePersist
    :: Production
    -> (Either PersistActionException ConfirmedAction -> IO ())
    -> Text
    -> Text
    -> PersistentSubscriptionSettings
    -> IO ()
pushUpdatePersist prod callback grp name psetts =
    pushCmd prod $ UpdatePersist callback grp name psetts
-- | Submits a 'DeletePersist' message built from the given arguments.
pushDeletePersist
    :: Production
    -> (Either PersistActionException ConfirmedAction -> IO ())
    -> Text
    -> Text
    -> IO ()
pushDeletePersist prod callback grp name =
    pushCmd prod $ DeletePersist callback grp name
-- | Submits an 'AckPersist' message for the given subscription and ids.
pushAckPersist :: Production -> Running -> [UUID] -> IO ()
pushAckPersist prod sub ids = pushCmd prod $ AckPersist sub ids
-- | Submits a 'NakPersist' message built from the given arguments.
pushNakPersist
    :: Production
    -> Running
    -> NakAction
    -> Maybe Text
    -> [UUID]
    -> IO ()
pushNakPersist prod sub action reason ids =
    pushCmd prod msg
  where
    msg = NakPersist sub action reason ids
-- | Submits an 'Unsubscribe' message for the given subscription.
pushUnsubscribe :: Production -> Running -> IO ()
pushUnsubscribe prod sub = pushCmd prod $ Unsubscribe sub
-- | Blocks until the wait transaction stored in the 'Production' commits.
prodWaitTillClosed :: Production -> IO ()
prodWaitTillClosed prod =
    case prod of
        Prod _ waitClosed -> atomically waitClosed
-- | An 'IO' action queued for the 'runner' thread.
newtype Job =
    Job (IO ())
-- | Mutable part of the execution model, kept in a 'TVar' by 'Env'.
--   All fields are strict.
data State = State
    { _proc :: !(Processor (IO ()))
      -- ^ Current processor.
    , _reader :: !(Maybe ThreadId)
      -- ^ Thread id recorded for the 'Reader' worker, if any.
    , _runner :: !(Maybe ThreadId)
      -- ^ Thread id recorded for the 'Runner' worker, if any.
    , _writer :: !(Maybe ThreadId)
      -- ^ Thread id recorded for the 'Writer' worker, if any.
    }
-- | Initial 'State': a fresh processor and no worker thread recorded.
emptyState :: Settings -> Generator -> State
emptyState setts gen =
    State
    { _proc   = newProcessor setts gen
    , _reader = Nothing
    , _runner = Nothing
    , _writer = Nothing
    }
-- | Replaces the processor held by a 'State'.
updateProc :: Processor (IO ()) -> State -> State
updateProc newProc st =
    st { _proc = newProc }
-- | Body of the 'Reader' worker: forever receives a package from the
--   connection, enqueues it as an 'Arrived' message, then logs it.
reader :: Settings -> CycleQueue Msg -> InternalConnection -> IO ()
reader sett queue c = forever step
  where
    step = do
        pkg <- connRecv c
        atomically $ writeCycleQueue queue (Arrived pkg)
        _settingsLog sett (Info (received pkg))

    received pkg = PackageReceived (packageCmd pkg) (packageCorrelation pkg)
-- | Body of the 'Writer' worker: forever takes a package from the package
--   queue, sends it over the connection, then logs it.
writer :: Settings -> CycleQueue Package -> InternalConnection -> IO ()
writer setts pkg_queue conn = forever step
  where
    step = do
        pkg <- atomically (readCycleQueue pkg_queue)
        connSend conn pkg
        _settingsLog setts (Info (sent pkg))

    sent pkg = PackageSent (packageCmd pkg) (packageCorrelation pkg)
-- | Body of the 'Runner' worker: forever takes a job from the job queue and
--   executes it.
runner :: CycleQueue Job -> IO ()
runner job_queue =
    forever $ atomically (readCycleQueue job_queue) >>= \(Job job) -> job
-- | Forks the worker thread selected by the given constructor and returns
--   the constructor applied to the new thread's id.
--
--   The thread's finaliser needs the 'Worker' value, which itself contains
--   the id of the thread being forked, hence the 'mfix'. When the thread ends
--   with an exception other than 'ThreadKilled', a 'Stopped' message is
--   enqueued on the message queue; a normal return or 'ThreadKilled' is
--   ignored.
spawn :: Env -> (ThreadId -> Worker) -> IO Worker
spawn Env{..} mk = do
    conn <- readIORef _connRef
    let body (Reader _) = reader _setts _queue conn
        body (Runner _) = runner _jobQueue
        body (Writer _) = writer _setts _pkgQueue conn

        report worker (Left e)
            | Just ThreadKilled <- asyncExceptionFromException e =
                return ()
            | otherwise =
                atomically $ writeCycleQueue _queue (Stopped worker e)
        report _ (Right _) = return ()
    tid <- mfix $ \tid ->
        let worker = mk tid in
        forkFinally (body worker) (report worker)
    return (mk tid)
-- | Interprets a 'Transition' inside 'STM': each 'Produce' step enqueues its
--   action as a 'Job' on the job queue, each 'Transmit' step enqueues its
--   package on the package queue, and 'Await' ends the walk by yielding the
--   processor it carries.
runTransition :: Env -> Transition (IO ()) -> STM (Processor (IO ()))
runTransition env@Env{..} step =
    case step of
        Produce j nxt ->
            writeCycleQueue _jobQueue (Job j) >> runTransition env nxt
        Transmit pkg nxt ->
            writeCycleQueue _pkgQueue pkg >> runTransition env nxt
        Await new_proc ->
            return new_proc
-- | Forks the reader, runner and writer workers (in that order), records
--   their thread ids in the shared 'State', then enters the main loop.
bootstrap :: Env -> IO ()
bootstrap env@Env{..} = do
    readerW <- spawn env Reader
    runnerW <- spawn env Runner
    writerW <- spawn env Writer
    atomically $ modifyTVar' _state $ \st ->
        wkUpdState readerW (wkUpdState runnerW (wkUpdState writerW st))
    cruising env
-- | Main loop of the execution model. Takes the next message off the
--   message queue and reacts to it:
--
--   * 'Stopped' rethrows the exception carried by the message;
--
--   * 'Shutdown' throws 'ClosedConnection';
--
--   * every other message is turned into a 'Transition' by the matching
--     processor function, which is then run before looping.
--
--   The loop only ends by throwing.
cruising :: Env -> IO ()
cruising env@Env{..} = do
    msg <- atomically $ readCycleQueue _queue
    s   <- readTVarIO _state
    let prc = _proc s
    case msg of
        Stopped _ e ->
            throwIO e
        Shutdown ->
            throwIO ClosedConnection
        Arrived pkg ->
            advance $ submitPackage pkg prc
        NewOperation k op ->
            advance $ newOperation k op prc
        ConnectStream k n tos ->
            advance $ connectRegularStream k n tos prc
        ConnectPersist k g n b ->
            advance $ connectPersistent k g n b prc
        Unsubscribe r ->
            advance $ unsubscribe r prc
        CreatePersist k g n psetts ->
            advance $ createPersistent k g n psetts prc
        UpdatePersist k g n psetts ->
            advance $ updatePersistent k g n psetts prc
        DeletePersist k g n ->
            advance $ deletePersistent k g n prc
        AckPersist run evts ->
            advance $ ackPersist (return ()) run evts prc
        NakPersist run act res evts ->
            advance $ nakPersist (return ()) run act res evts prc
  where
    -- Runs a transition, stores the processor it ends on (both in a single
    -- transaction) and goes back to waiting for the next message.
    advance :: Transition (IO ()) -> IO ()
    advance sm = do
        atomically $ do
            new_proc <- runTransition env sm
            modifyTVar' _state $ updateProc new_proc
        cruising env
-- | Tears the execution model down. In order:
--
--   1. kills the reader and writer threads recorded in the state;
--
--   2. discards every package still waiting in the package queue;
--
--   3. sweeps the message queue: 'Arrived' packages are submitted to the
--      processor and removed; 'Shutdown', 'AckPersist', 'NakPersist',
--      'Unsubscribe' and 'Stopped' are dropped; anything else stays queued;
--
--   4. closes the connection, ignoring a 'ConnectionException';
--
--   5. runs the processor's 'abort' transition (the resulting processor is
--      not stored);
--
--   6. waits until the job queue is empty, then kills the runner thread.
closing :: Env -> IO ()
closing env@Env{..} = do
    st <- readTVarIO _state
    traverse_ killThread (_reader st)
    traverse_ killThread (_writer st)
    atomically $ emptyCycleQueue _pkgQueue
    atomically $ updateCycleQueue _queue sweep
    conn <- readIORef _connRef
    _ <- try (connClose conn) :: IO (Either ConnectionException ())
    atomically $ do
        cur <- readTVar _state
        _ <- runTransition env (abort (_proc cur))
        return ()
    -- NOTE(review): within this file only 'runner' takes jobs off the job
    -- queue, so if the runner thread is already gone and jobs are pending
    -- this wait has nothing to wake it up. Confirm this cannot happen.
    atomically $ do
        drained <- isEmptyCycleQueue _jobQueue
        unless drained retry
    traverse_ killThread (_runner st)
  where
    -- Decides the fate of one pending message; 'Nothing' removes it.
    sweep (Arrived pkg) = do
        cur <- readTVar _state
        nxt_proc <- runTransition env (submitPackage pkg (_proc cur))
        modifyTVar' _state (updateProc nxt_proc)
        return Nothing
    sweep Shutdown             = return Nothing
    sweep (AckPersist _ _)     = return Nothing
    sweep (NakPersist _ _ _ _) = return Nothing
    sweep (Unsubscribe _)      = return Nothing
    sweep (Stopped _ _)        = return Nothing
    sweep other                = return (Just other)
-- | Submit function that ignores the message and throws the given exception
--   instead.
raiseException :: Exception e => e -> Msg -> IO ()
raiseException e = const (throwIO e)
-- | Creates the queues, connection and shared state, forks the thread
--   running 'bootstrap', and returns the 'Production' handle.
--
--   When the 'bootstrap' thread ends, 'closing' is run and then:
--
--   * if it ended with an exception, the exception is logged. When it is a
--     'ConnectionException', the submit function is replaced by one that
--     rethrows it and the model is marked as disposed; for any other
--     exception a new connection is created, stored, and 'bootstrap' is
--     forked again;
--
--   * if it returned normally, the model is marked as disposed.
--
--   The wait transaction placed in the returned handle retries until the
--   connection created here reports closed, then reads the disposed marker.
newExecutionModel :: Settings -> Discovery -> IO Production
newExecutionModel setts disc = do
    gen       <- newGenerator
    msgQueue  <- newCycleQueue
    pkgQueue  <- newCycleQueue
    jobQueue  <- newCycleQueue
    conn      <- newConnection setts disc
    connRef   <- newIORef conn
    stateVar  <- newTVarIO (emptyState setts gen)
    submitVar <- newTVarIO (atomically . writeCycleQueue msgQueue)
    disposed  <- newEmptyTMVarIO
    let env =
            Env setts msgQueue pkgQueue jobQueue stateVar submitVar connRef
                disposed

        launch = do
            _ <- forkFinally (bootstrap env) onExit
            return ()

        onExit outcome = do
            closing env
            case outcome of
                Right _ -> atomically $ putTMVar disposed ()
                Left e  -> do
                    _settingsLog setts (Error $ UnexpectedException e)
                    case (fromException e :: Maybe ConnectionException) of
                        Just _ -> atomically $ do
                            writeTVar submitVar (raiseException e)
                            putTMVar disposed ()
                        Nothing -> do
                            newConn <- newConnection setts disc
                            writeIORef connRef newConn
                            launch
    launch
    return $ Prod submitVar $ do
        closed <- connIsClosed conn
        unless closed retry
        readTMVar disposed