-- | Public surface of the flow library.
--
-- Exposes the workflow entry points defined in this module ('register',
-- 'execute', 'act', 'decide'), the decision helpers ('runDecide',
-- 'nextEvent', 'select', 'maybeThrow'), and a set of names that are not
-- defined here and are re-exported from the imported modules (the
-- environment runners 'flowEnv' / 'runFlowT' and the plan, task and
-- payload types).
module Network.AWS.Flow
( register
, execute
, act
, decide
, flowEnv
, runFlowT
, runDecide
, nextEvent
, select
, maybeThrow
, Uid
, Queue
, Metadata
, Artifact
, Blob
, Task (..)
, Timer (..)
, Start (..)
, Spec (..)
, End (..)
, Plan (..)
) where
import Network.AWS.Flow.Env
import Network.AWS.Flow.Logger
import Network.AWS.Flow.S3
import Network.AWS.Flow.SWF
import Network.AWS.Flow.Types
import Network.AWS.Flow.Uid
import Network.AWS.Flow.Prelude hiding ( ByteString, Metadata, handle )
import Control.Monad.Catch
import qualified Data.HashMap.Strict as Map
import Formatting
import Network.AWS.SWF
import Network.HTTP.Types
import Safe
-- | Exception handler that forgives exactly one kind of service error.
--
-- A 'ServiceError' is swallowed (the handler yields @()@) only when its
-- status is 'badRequest400', its service abbreviation is @"SWF"@ and its
-- error code equals @code@.  Every other 'Error' is rethrown with 'throwM'.
serviceError :: MonadFlow m => ErrorCode -> Error -> m ()
serviceError code err =
  case err of
    ServiceError s
      | tolerated s -> return ()
    _ -> throwM err
  where
    -- All three fields must match for the error to be ignored.
    tolerated s =
      s ^. serviceStatus == badRequest400
        && s ^. serviceAbbrev == "SWF"
        && s ^. serviceCode == code
-- | Register everything a 'Plan' refers to.
--
-- Logs @event=register@, then registers the domain, the workflow type of
-- the plan's start task, and one activity type for every 'Work' spec.
-- 'Sleep' specs are skipped.  Each registration is wrapped with
-- 'serviceError' so that the matching "already exists" error code is
-- ignored; any other error propagates to the caller.
register :: MonadFlow m => Plan -> m ()
register Plan{..} = do
  logInfo' "event=register"
  handle (serviceError "DomainAlreadyExists") registerDomainAction
  handle (serviceError "TypeAlreadyExists") $
    registerWorkflowTypeAction (tskName root) (tskVersion root) (tskTimeout root)
  forM_ plnSpecs $ \case
    Work{ wrkTask = task } ->
      handle (serviceError "TypeAlreadyExists") $
        registerActivityTypeAction (tskName task) (tskVersion task) (tskTimeout task)
    Sleep{} ->
      return ()
  where
    -- The task that the workflow type itself is registered from.
    root = strtTask plnStart
-- | Start a workflow execution for the given task.
--
-- Draws a fresh uid from 'newUid', logs it as @event=execute@, and starts
-- the execution with the task's name, version and queue plus the supplied
-- input.
execute :: MonadFlow m => Task -> Metadata -> m ()
execute Task{..} input =
  newUid >>= \uid -> do
    logInfo' (sformat ("event=execute uid=" % stext) uid)
    startWorkflowExecutionAction uid tskName tskVersion tskQueue input
-- | Exception handler that forgives one specific deserialisation failure.
--
-- A 'SerializeError' is swallowed only when its status is 'ok200', its
-- service abbreviation is @"SWF"@ and its message says the @taskToken@ key
-- is missing.  Every other 'Error' is rethrown with 'throwM'.
--
-- NOTE(review): presumably this is what a poll that returned no task looks
-- like -- confirm against the SWF polling behaviour.
serializeError :: MonadFlow m => Error -> m ()
serializeError err =
  case err of
    SerializeError s
      | tolerated s -> return ()
    _ -> throwM err
  where
    -- All three fields must match for the error to be ignored.
    tolerated s =
      s ^. serializeStatus == ok200
        && s ^. serializeAbbrev == "SWF"
        && s ^. serializeMessage == "key \"taskToken\" not present"
-- | Handler used when running an activity throws.
--
-- Logs the shown exception at error level, bracketed by
-- @event=act-exception-begin@ / @event=act-exception-finish@ marker lines,
-- and then responds that the activity task identified by @token@ failed.
actException :: MonadFlow m => Token -> SomeException -> m ()
actException token e = do
  mapM_ logError'
    [ "event=act-exception-begin"
    , show e
    , "event=act-exception-finish"
    ]
  respondActivityTaskFailedAction token
-- | Poll a queue for one activity task and run the supplied action on it.
--
-- Steps, in order:
--
-- 1. poll @queue@ for an activity task (token, uid, input);
-- 2. list and fetch the objects stored under the uid and pass them to
--    @action@ as blobs;
-- 3. store every artifact the action returns and respond that the task
--    completed with the action's output.
--
-- Any exception raised from step 3 onwards (running the action, storing
-- artifacts, responding) is routed to 'actException'.  The whole body is
-- wrapped in 'serializeError'.
act :: MonadFlow m => Queue -> (Uid -> Metadata -> [Blob] -> m (Metadata, [Artifact])) -> m ()
act queue action =
  handle serializeError $ do
    logInfo' "event=act"
    (token, uid, input) <- pollForActivityTaskAction queue
    -- Every remaining log line has the shape "<label><uid>".
    let note label = logInfo' (sformat (label % stext) uid)
    note "event=act-begin uid="
    keys <- listObjectsAction uid
    unless (null keys) $ note "event=list-blobs uid="
    blobs <- mapM (getObjectAction uid) keys
    unless (null blobs) $ note "event=blobs uid="
    handle (actException token) $ do
      (output, artifacts) <- action uid input blobs
      note "event=act-finish uid="
      mapM_ (putObjectAction uid) artifacts
      unless (null artifacts) $ note "event=artifacts uid="
      respondActivityTaskCompletedAction token output
-- | Poll for one decision task and answer it according to the plan.
--
-- Polls the queue of the plan's start task, throws
-- @userError "No Token"@ if the poll result carries no token, runs
-- 'select' over the returned history via 'runDecide', and responds with
-- the resulting decisions.  The whole body is wrapped in 'serializeError'.
decide :: MonadFlow m => Plan -> m ()
decide plan@Plan{..} =
  handle serializeError $ do
    logInfo' "event=decide"
    (mtoken, events) <- pollForDecisionTaskAction queue
    token <- maybeThrow (userError "No Token") mtoken
    logger <- asks feLogger
    runDecide logger plan events select
      >>= respondDecisionTaskCompletedAction token
  where
    -- Decision tasks are polled on the start task's queue.
    queue = tskQueue (strtTask plnStart)
-- | Run a 'DecideT' computation against a plan and an event history.
--
-- Builds the 'DecideEnv' from the logger, the plan, the events, and a
-- lookup function that finds an event by its id.
runDecide :: Log -> Plan -> [HistoryEvent] -> DecideT m a -> m a
runDecide logger plan events = runDecideT env
  where
    -- Events keyed by id, built once per call and shared by every lookup.
    index = Map.fromList [ (e ^. heEventId, e) | e <- events ]
    findEvent eid = Map.lookup eid index
    env = DecideEnv logger plan events findEvent
-- | Return the first event in the environment's history whose type is one
-- of @ets@.
--
-- Throws @userError "No Next Event"@ when no event matches.
nextEvent :: MonadDecide m => [EventType] -> m HistoryEvent
nextEvent ets =
  asks deEvents >>= maybeThrow (userError "No Next Event") . find wanted
  where
    wanted e = (e ^. heEventType) `elem` ets
-- | Find the spec that follows the 'Work' spec whose task is called @name@.
--
-- Yields 'Nothing' when the plan has no such 'Work' spec or when it is the
-- last spec in the plan.
workNext :: MonadDecide m => Name -> m (Maybe Spec)
workNext name =
  asks (following . plnSpecs . dePlan)
  where
    -- Skip up to the matching spec, then take the one right after it.
    following specs =
      case dropWhile other specs of
        _ : next : _ -> Just next
        _            -> Nothing
    other Work{..} = tskName wrkTask /= name
    other _        = True
-- | Find the spec that follows the 'Sleep' spec whose timer is called
-- @name@.
--
-- Yields 'Nothing' when the plan has no such 'Sleep' spec or when it is the
-- last spec in the plan.
sleepNext :: MonadDecide m => Name -> m (Maybe Spec)
sleepNext name =
  asks (following . plnSpecs . dePlan)
  where
    -- Skip up to the matching spec, then take the one right after it.
    following specs =
      case dropWhile other specs of
        _ : next : _ -> Just next
        _            -> Nothing
    other Sleep{..} = tmrName slpTimer /= name
    other _         = True
-- | Choose the decisions for the current history.
--
-- Looks up the first event (via 'nextEvent') that is a workflow start, an
-- activity completion, a fired timer, or a child-workflow initiation, and
-- dispatches to the handler for that event type.
select :: MonadDecide m => m [Decision]
select =
  nextEvent handled >>= \event ->
    case event ^. heEventType of
      WorkflowExecutionStarted             -> start event
      ActivityTaskCompleted                -> completed event
      TimerFired                           -> timer event
      StartChildWorkflowExecutionInitiated -> child
      _                                    -> throwM (userError "Unknown Select Event")
  where
    -- The event types this decider knows how to react to.
    handled =
      [ WorkflowExecutionStarted
      , ActivityTaskCompleted
      , TimerFired
      , StartChildWorkflowExecutionInitiated
      ]
-- | React to a workflow-started event by scheduling the plan's first spec.
--
-- The input is read from the event's started attributes; when those
-- attributes are absent, @userError "No Start Information"@ is thrown.
-- An empty spec list is handed to 'schedule' as 'Nothing'.
start :: MonadDecide m => HistoryEvent -> m [Decision]
start event = do
  logInfo' "event=start"
  input <- maybeThrow (userError "No Start Information") $
    fmap (^. weseaInput) (event ^. heWorkflowExecutionStartedEventAttributes)
  firstSpec <- asks (headMay . plnSpecs . dePlan)
  schedule input firstSpec
-- | React to an activity-completed event by scheduling the next spec.
--
-- The completed activity's result becomes the next input.  Its name is
-- recovered from the scheduled event that the completion refers to, and
-- 'workNext' picks the spec that follows it.  Throws
-- @userError "No Completed Information"@ if any of the required
-- attributes or the referenced event is missing.
completed :: MonadDecide m => HistoryEvent -> m [Decision]
completed event = do
  logInfo' "event=completed"
  lookupEvent <- asks deFindEvent
  (result, name) <-
    maybeThrow (userError "No Completed Information") (details lookupEvent)
  workNext name >>= schedule result
  where
    -- Runs in 'Maybe': any missing piece collapses the whole lookup.
    details lookupEvent = do
      done <- event ^. heActivityTaskCompletedEventAttributes
      scheduled <-
        lookupEvent (done ^. atceaScheduledEventId)
          >>= (^. heActivityTaskScheduledEventAttributes)
      return (done ^. atceaResult, scheduled ^. atseaActivityType . atName)
-- | React to a timer-fired event.
--
-- The timer's name is read from the control field of the timer-started
-- event that the fired event refers to; if any step of that lookup yields
-- nothing, @userError "No Timer Information"@ is thrown.  The input for
-- the next step is then taken from the first workflow-started or
-- activity-completed event that 'nextEvent' finds.
timer :: MonadDecide m => HistoryEvent -> m [Decision]
timer event = do
  logInfo' "event=timer"
  lookupEvent <- asks deFindEvent
  name <- maybeThrow (userError "No Timer Information") $
    event ^. heTimerFiredEventAttributes
      >>= lookupEvent . (^. tfeaStartedEventId)
      >>= (^. heTimerStartedEventAttributes)
      >>= (^. tseaControl)
  origin <- nextEvent [WorkflowExecutionStarted, ActivityTaskCompleted]
  case origin ^. heEventType of
    WorkflowExecutionStarted -> timerStart origin name
    ActivityTaskCompleted    -> timerCompleted origin name
    _                        -> throwM (userError "Unknown Timer Event")
-- | Continue after the timer called @name@, taking the input from a
-- workflow-started event.
--
-- Throws @userError "No Timer Start Information"@ when the event has no
-- started attributes; otherwise schedules whatever 'sleepNext' finds after
-- the named timer.
timerStart :: MonadDecide m => HistoryEvent -> Name -> m [Decision]
timerStart event name = do
  logInfo' (sformat ("event=timer-start name=" % stext) name)
  input <- maybeThrow (userError "No Timer Start Information") $
    fmap (^. weseaInput) (event ^. heWorkflowExecutionStartedEventAttributes)
  sleepNext name >>= schedule input
-- | Continue after the timer called @name@, taking the input from an
-- activity-completed event's result.
--
-- Throws @userError "No Timer Completed Information"@ when the event has
-- no completed attributes; otherwise schedules whatever 'sleepNext' finds
-- after the named timer.
timerCompleted :: MonadDecide m => HistoryEvent -> Name -> m [Decision]
timerCompleted event name = do
  logInfo' (sformat ("event=timer-completed name=" % stext) name)
  input <- maybeThrow (userError "No Timer Completed Information") $
    fmap (^. atceaResult) (event ^. heActivityTaskCompletedEventAttributes)
  sleepNext name >>= schedule input
-- | Schedule the given spec, or finish the plan when there is none left.
schedule :: MonadDecide m => Metadata -> Maybe Spec -> m [Decision]
schedule input = \case
  Nothing   -> scheduleEnd input
  Just spec -> scheduleSpec input spec
-- | Produce the single decision that starts one spec.
--
-- A fresh uid is generated and logged as @event=schedule-spec@.  A 'Work'
-- spec becomes a schedule-activity decision carrying @input@; a 'Sleep'
-- spec becomes a start-timer decision (the input is not passed to it).
scheduleSpec :: MonadDecide m => Metadata -> Spec -> m [Decision]
scheduleSpec input spec = do
  uid <- newUid
  logInfo' (sformat ("event=schedule-spec uid=" % stext) uid)
  case spec of
    Work{ wrkTask = task } ->
      return
        [ scheduleActivityTaskDecision
            uid (tskName task) (tskVersion task) (tskQueue task) input
        ]
    Sleep{ slpTimer = tmr } ->
      return [ startTimerDecision uid (tmrName tmr) (tmrTimeout tmr) ]
-- | Decide what happens once the plan has no specs left.
--
-- With a 'Stop' ending the workflow is completed with @input@ as its
-- result.  With a 'Continue' ending control passes to 'scheduleContinue',
-- which does not receive @input@.
scheduleEnd :: MonadDecide m => Metadata -> m [Decision]
scheduleEnd input = do
  logInfo' "event=schedule-end"
  asks (plnEnd . dePlan) >>= \case
    Stop     -> return [completeWorkflowExecutionDecision input]
    Continue -> scheduleContinue
-- | Produce the decision that starts a child workflow for the plan's start
-- task.
--
-- The child's input is the input recorded on the workflow-started event
-- found by 'nextEvent'; @userError "No Continue Start Information"@ is
-- thrown when that event has no started attributes.
scheduleContinue :: MonadDecide m => m [Decision]
scheduleContinue = do
  logInfo' "event=schedule-continue"
  started <- nextEvent [WorkflowExecutionStarted]
  input <- maybeThrow (userError "No Continue Start Information") $
    fmap (^. weseaInput) (started ^. heWorkflowExecutionStartedEventAttributes)
  uid <- newUid
  task <- asks (strtTask . plnStart . dePlan)
  return
    [ startChildWorkflowExecutionDecision
        uid (tskName task) (tskVersion task) (tskQueue task) input
    ]
-- | React to a child-workflow-initiated event.
--
-- Finds the first workflow-started or activity-completed event via
-- 'nextEvent' and hands it to the matching child handler.
child :: MonadDecide m => m [Decision]
child =
  nextEvent [WorkflowExecutionStarted, ActivityTaskCompleted] >>= \event ->
    case event ^. heEventType of
      WorkflowExecutionStarted -> childStart event
      ActivityTaskCompleted    -> childCompleted event
      _                        -> throwM (userError "Unknown Child Event")
-- | Complete the workflow using the input of a workflow-started event.
--
-- Throws @userError "No Child Start Information"@ when the event has no
-- started attributes.
childStart :: MonadDecide m => HistoryEvent -> m [Decision]
childStart event = do
  logInfo' "event=child-start"
  input <- maybeThrow (userError "No Child Start Information") $
    fmap (^. weseaInput) (event ^. heWorkflowExecutionStartedEventAttributes)
  return [completeWorkflowExecutionDecision input]
-- | Complete the workflow using the result of an activity-completed event.
--
-- Throws @userError "No Child Completed Information"@ when the event has
-- no completed attributes.
childCompleted :: MonadDecide m => HistoryEvent -> m [Decision]
childCompleted event = do
  logInfo' "event=child-completed"
  result <- maybeThrow (userError "No Child Completed Information") $
    fmap (^. atceaResult) (event ^. heActivityTaskCompletedEventAttributes)
  return [completeWorkflowExecutionDecision result]
-- | Unwrap a 'Maybe', throwing the given exception when it is 'Nothing'.
--
-- @maybeThrow e (Just x)@ returns @x@; @maybeThrow e Nothing@ throws @e@
-- via 'throwM'.
maybeThrow :: (MonadThrow m, Exception e) => e -> Maybe a -> m a
maybeThrow e Nothing  = throwM e
maybeThrow _ (Just x) = return x