{-# LANGUAGE DerivingVia #-}

-- | Support for the @BATCH@ command (Enterprise only)
--
-- <https://github.com/contribsys/faktory/wiki/Ent-Batches>
--
-- Batches allow multiple Jobs to be enqueued as a group, with a description
-- (visible in the admin UI) and Jobs attached to run on completion of all Jobs
-- within the group (always, or only if all were successful).
--
-- Usage:
--
-- @
-- -- Build a Job to run at completion of the Batch. Arguments are the same as
-- -- you would pass to 'perform' the Job.
-- onComplete <- buildJob mempty producer myJob
--
-- 'runBatch' ('complete' onComplete <> 'description' "My Batch") producer $ do
--   -- Use 'batchPerform' instead of 'perform'
--   void $ 'batchPerform' mempty producer myBatchedJob1
--   void $ 'batchPerform' mempty producer myBatchedJob2
-- @
--
-- __/NOTE/__: This module does not support batched Jobs dynamically adding more
-- Jobs to the Batch. PRs welcome.
--
module Faktory.Ent.Batch
  (
  -- * Options
    BatchOptions
  , description
  , complete
  , success

  -- * Running
  , runBatch
  , Batch
  , batchPerform

  -- * Low-level
  , BatchId(..)
  , CustomBatchId(..)
  , newBatch
  , commitBatch
  ) where

import Faktory.Prelude

import Control.Monad.Reader
import Data.Aeson
import Data.Aeson.Casing
import Data.ByteString.Lazy as BSL
import Data.Semigroup (Last(..))
import Data.Semigroup.Generic
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import Faktory.Client
import Faktory.Job
import Faktory.Producer
import GHC.Generics
import GHC.Stack

-- | Context in which the Jobs of a single Batch are enqueued
--
-- A wrapper around @'ReaderT' 'BatchId' 'IO'@: the 'BatchId' of the Batch being
-- built is carried implicitly and can be read with 'ask'. Run with 'runBatch'.
--
newtype Batch a = Batch (ReaderT BatchId IO a)
  deriving newtype (Functor, Applicative, Monad, MonadIO, MonadReader BatchId)

-- | Identifier of a Batch
--
-- The JSON instances are those of the wrapped 'Text'.
--
newtype BatchId = BatchId Text
  deriving newtype (FromJSON, ToJSON)

-- | Options for a new Batch
--
-- Build values with 'description', 'complete' and 'success', and combine them
-- with '<>'. Fields are merged independently; because each is a
-- @'Maybe' ('Last' _)@, the right-most value that sets a field wins.
--
data BatchOptions arg = BatchOptions
  { boDescription :: Maybe (Last Text)
  -- ^ Set by 'description'
  , boSuccess :: Maybe (Last (Job arg))
  -- ^ Set by 'success'
  , boComplete :: Maybe (Last (Job arg))
  -- ^ Set by 'complete'
  }
  deriving stock Generic
  deriving (Semigroup, Monoid) via GenericSemigroupMonoid (BatchOptions arg)

-- | Generic encoding with field names passed through @'aesonPrefix'
-- 'snakeCase'@, which removes the @bo@ prefix (e.g. @boDescription@ becomes
-- @description@)
instance ToJSON arg => ToJSON (BatchOptions arg) where
  toJSON = genericToJSON $ aesonPrefix snakeCase
  toEncoding = genericToEncoding $ aesonPrefix snakeCase

-- | Set the Batch's description; every other option is left unset
description :: Text -> BatchOptions arg
description d = mempty { boDescription = Just $ Last d }

-- | Attach the 'Job' stored as the Batch's @complete@ callback; every other
-- option is left unset
complete :: Job arg -> BatchOptions arg
complete job = mempty { boComplete = Just $ Last job }

-- | Attach the 'Job' stored as the Batch's @success@ callback; every other
-- option is left unset
success :: Job arg -> BatchOptions arg
success job = mempty { boSuccess = Just $ Last job }

-- | Create a Batch, run the given action within it, then commit it
--
-- The Batch is created with 'newBatch', the action runs with the resulting
-- 'BatchId' as its environment, and 'commitBatch' is called afterwards. The
-- action's result is returned.
--
-- There is no exception handling here: if the action throws, 'commitBatch' is
-- never reached.
--
runBatch :: ToJSON arg => BatchOptions arg -> Producer -> Batch a -> IO a
runBatch options producer (Batch f) = do
  batchId <- newBatch producer options
  result <- runReaderT f batchId
  result <$ commitBatch producer batchId

-- | Wrapper used to place a 'BatchId' in a Job's custom data
--
-- The 'ToJSON' instance is the 'Generic' default, so the record field name
-- (@bid@) is what names the value in the encoded output; renaming the field
-- changes the wire format.
--
newtype CustomBatchId = CustomBatchId
  { bid :: BatchId
  }
  deriving stock Generic
  deriving anyclass ToJSON

-- | Like 'perform', but for use within a 'Batch'
--
-- Reads the current 'BatchId' from the environment and appends it, wrapped in
-- 'CustomBatchId', to the given 'JobOptions' via 'custom' before delegating to
-- 'perform'.
--
batchPerform
  :: (HasCallStack, ToJSON arg) => JobOptions -> Producer -> arg -> Batch JobId
batchPerform options producer arg = do
  batchId <- ask
  Batch $ lift $ perform (options <> custom (CustomBatchId batchId)) producer arg

-- | Send @BATCH NEW@ and return the 'BatchId' from the reply
--
-- The JSON-encoded 'BatchOptions' are the command's only argument. Throws (via
-- 'throwString', with a @BATCH NEW error: @ prefix) if the command returns an
-- error or no payload.
--
newBatch :: ToJSON arg => Producer -> BatchOptions arg -> IO BatchId
newBatch producer options = do
  result <- commandByteString
    (producerClient producer)
    "BATCH NEW"
    [encode options]
  case result of
    Left err -> batchNewError err
    Right Nothing -> batchNewError "No BatchId returned"
    -- Force the decode here: 'decodeUtf8' throws from pure code on invalid
    -- input, and if left lazy that exception would only surface wherever the
    -- 'BatchId' is first inspected.
    Right (Just bs) -> pure $! BatchId $ decodeUtf8 $ BSL.toStrict bs
  where batchNewError err = throwString $ "BATCH NEW error: " <> err

-- | Send @BATCH COMMIT@ for the given 'BatchId'
--
-- The identifier is UTF-8 encoded and passed as the command's only argument.
--
commitBatch :: Producer -> BatchId -> IO ()
commitBatch producer (BatchId batchId) = command_
  (producerClient producer)
  "BATCH COMMIT"
  [BSL.fromStrict $ encodeUtf8 batchId]