-- | An Hworker job type for sending email through Amazon SES.
--
-- A worker holds an 'SESState' and processes 'SESJob's. Each job sends one
-- email; the 'Job' instance in this module delays a job while the number of
-- sends recorded in the last second has reached the configured limit.
--
-- 'create' and 'createWith' are this module's own constructors (the ones from
-- "System.Hworker" are hidden on import). The remaining lowercase exports
-- ('queue', 'worker', 'monitor', 'jobs') are not defined here and are
-- presumably re-exported from "System.Hworker" -- confirm against that module.
module System.Hworker.SES ( SESWorker
, SESWorkerWith
, SESState(..)
, SESJob(..)
, SESConfig(..)
, RedisConnection(..)
, defaultSESConfig
, create
, createWith
, queue
, worker
, monitor
, jobs
) where
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, putMVar,
                                takeMVar)
import Control.Exception (SomeException, catch)
import Control.Lens (set)
import Control.Monad (mzero, void)
import Data.Aeson (FromJSON (..), ToJSON (..),
                   Value (Object, String), object, (.:),
                   (.=))
import qualified Data.HashMap.Strict as HashMap
import Data.Monoid ((<>))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime)
import GHC.Generics
import Network.AWS (Credentials (Discover), newEnv,
                    runAWS, runResourceT, send)
import Network.AWS.SES hiding (Success)
import Network.AWS.Types (Error)
import System.Hworker hiding (create, createWith)
import qualified System.Hworker as Hworker (createWith)
-- | An Hworker whose state is an 'SESState' and whose jobs are 'SESJob's,
-- both carrying a payload of type @a@.
type SESWorkerWith a = Hworker (SESState a) (SESJob a)
-- | An 'SESWorkerWith' whose payload type is unit.
type SESWorker = SESWorkerWith ()
-- | The state an SES worker hands to every job it runs.
data SESState a = SESState
  { sesLimit   :: Int
    -- ^ Number of sends recorded within the last second at which a new
    -- job is delayed instead of sent.
  , sesSource  :: Text
    -- ^ Passed as the source argument of the SES @sendEmail@ request.
  , sesRecents :: MVar [UTCTime]
    -- ^ Timestamps of recently admitted sends, used for rate limiting.
  , sesAfter   :: SESJob a -> IO ()
    -- ^ Callback run with the job after a successful send.
  , sesLogger  :: forall b. Show b => b -> IO ()
    -- ^ Receives AWS errors and exceptions raised by 'sesAfter'.
  }
-- | One email to send, together with a user-defined payload.
data SESJob a = SESJob
  { sesEmTo       :: Text
    -- ^ The single address placed in the request's to-addresses.
  , sesEmSubj     :: Text
    -- ^ Subject line.
  , sesEmBodyText :: Maybe Text
    -- ^ Plain-text body, if any.
  , sesEmBodyHtml :: Maybe Text
    -- ^ HTML body, if any.
  , sesPayload    :: Either Value a
    -- ^ 'Right' holds a payload of type @a@; 'Left' holds a raw JSON value
    -- (the 'FromJSON' instance produces 'Left' when the stored payload
    -- does not decode as @a@).
  }
  deriving (Generic, Show)
-- | Encodes a job as an object with the keys @v@ (always @1@), @t@ (to),
-- @s@ (subject), @x@ (text body), @h@ (HTML body) and @p@ (payload).
-- A 'Left' payload is written out unchanged; a 'Right' payload is encoded
-- with its own 'ToJSON' instance.
instance ToJSON a => ToJSON (SESJob a) where
  toJSON (SESJob recipient subject textBody htmlBody payload) =
    object
      [ "v" .= (1 :: Int)
      , "t" .= recipient
      , "s" .= subject
      , "x" .= textBody
      , "h" .= htmlBody
      , ("p", encodedPayload)
      ]
    where
      encodedPayload =
        case payload of
          Left raw    -> raw
          Right typed -> toJSON typed
-- | Decodes the object written by the 'ToJSON' instance. The keys @t@, @s@,
-- @x@ and @h@ are required. If @p@ decodes as @a@ it becomes a 'Right'
-- payload; otherwise the payload is 'Left' of the raw @p@ value, or of the
-- string @"No 'p' field."@ when the key is absent. The @v@ key is not read.
-- Any non-object input fails with 'mzero'.
instance FromJSON a => FromJSON (SESJob a) where
  parseJSON input =
    case input of
      Object fields ->
        do recipient <- fields .: "t"
           subject   <- fields .: "s"
           textBody  <- fields .: "x"
           htmlBody  <- fields .: "h"
           let rawPayload =
                 HashMap.lookupDefault (String "No 'p' field.") "p" fields
           payload <- (Right <$> (fields .: "p")) <|> pure (Left rawPayload)
           return (SESJob recipient subject textBody htmlBody payload)
      _ -> mzero
-- | Sends one email through SES, subject to the worker's rate limit.
--
-- Each attempt prunes 'sesRecents' down to the timestamps less than one
-- second old. If 'sesLimit' or more remain, the job sleeps for 100ms and
-- tries again; otherwise the current time is recorded and the email is sent.
--
-- An AWS 'Error' raised by the send is logged and reported as a 'Failure'
-- carrying the shown error. After a successful send the 'sesAfter' callback
-- is run; an exception escaping the callback is logged and does not change
-- the 'Success' result.
instance (ToJSON a, FromJSON a, Show a) => Job (SESState a) (SESJob a) where
  job state@(SESState limit source recents after log') j@(SESJob to' subj btxt bhtml _payload) =
    do now <- getCurrentTime
       -- 'modifyMVar' puts the previous contents back if an exception
       -- (including an asynchronous one) interrupts the update. A bare
       -- takeMVar/putMVar pair would leave the MVar empty in that case and
       -- block every later job.
       admitted <- modifyMVar recents $ \rs ->
         do let active = filter ((< 1) . diffUTCTime now) rs
            return $ if length active >= limit
                       then (active, False)
                       else (now : active, True)
       if not admitted
         then threadDelay 100000 >> job state j
         else do awsenv <- newEnv Discover
                 r <- catch (runResourceT $ runAWS awsenv $
                               do void $ send (sendEmail source
                                                 (set dToAddresses [to'] destination)
                                                 (message (content subj)
                                                    (set bHTML (content <$> bhtml) $
                                                     set bText (content <$> btxt)
                                                     body)))
                                  return Success)
                            (\(err::Error) ->
                               do log' err
                                  return (Failure (T.pack (show err))))
                 case r of
                   -- Best effort: a failing callback must not turn a
                   -- delivered email into a failed job.
                   Success -> catch (after j)
                                    (\(e::SomeException) ->
                                       log' ("hworker-ses callback raised exception: " <> show e))
                   _ -> return ()
                 return r
-- | Everything 'createWith' needs to build an SES worker.
data SESConfig a = SESConfig
  { sesconfigName             :: Text
    -- ^ Name given to the underlying Hworker configuration.
  , sesconfigLimit            :: Int
    -- ^ Becomes 'sesLimit' of the worker state.
  , sesconfigSource           :: Text
    -- ^ Becomes 'sesSource' of the worker state.
  , sesconfigAfter            :: SESJob a -> IO ()
    -- ^ Becomes 'sesAfter' of the worker state.
  , sesconfigLogger           :: forall b. Show b => b -> IO ()
    -- ^ Used both as 'sesLogger' and as the Hworker logger.
  , sesconfigFailedQueueSize  :: Int
    -- ^ Copied into the Hworker configuration's failed queue size.
  , sesconfigRedisConnectInfo :: RedisConnection
    -- ^ Copied into the Hworker configuration's Redis connection setting.
  }
-- | Builds an 'SESConfig' from a worker name, rate limit, source and success
-- callback. The logger, failed queue size and Redis connection are taken
-- from @defaultHworkerConfig name ()@.
defaultSESConfig :: (ToJSON a, FromJSON a, Show a)
                 => Text
                 -> Int
                 -> Text
                 -> (SESJob a -> IO ())
                 -> SESConfig a
defaultSESConfig name limit source after =
  SESConfig
    { sesconfigName             = name
    , sesconfigLimit            = limit
    , sesconfigSource           = source
    , sesconfigAfter            = after
    , sesconfigLogger           = hwconfigLogger hworkerDefaults
    , sesconfigFailedQueueSize  = hwconfigFailedQueueSize hworkerDefaults
    , sesconfigRedisConnectInfo = hwconfigRedisConnectInfo hworkerDefaults
    }
  where
    -- Only consulted for its default settings; the unit state is never used.
    hworkerDefaults = defaultHworkerConfig name ()
-- | Creates an SES worker from a name, rate limit, source and success
-- callback, using 'defaultSESConfig' for every other setting.
create :: (ToJSON a, FromJSON a, Show a)
       => Text
       -> Int
       -> Text
       -> (SESJob a -> IO ())
       -> IO (SESWorkerWith a)
create name limit source =
  createWith . defaultSESConfig name limit source
-- | Creates an SES worker from an explicit 'SESConfig'. A fresh, empty list
-- of recent send times is allocated for the worker state. The Hworker
-- configuration starts from 'defaultHworkerConfig' and has its Redis
-- connection, failed queue size and logger overridden from the 'SESConfig'.
createWith :: (ToJSON a, FromJSON a, Show a)
           => SESConfig a
           -> IO (SESWorkerWith a)
createWith (SESConfig name limit source after logger failed' redis) =
  newMVar [] >>= \recents ->
    let state = SESState limit source recents after logger
        base  = defaultHworkerConfig name state
    in Hworker.createWith
         base { hwconfigRedisConnectInfo = redis
              , hwconfigFailedQueueSize  = failed'
              , hwconfigLogger           = logger
              }