module Faktory.Client
  (
  -- * Client operations
    Client
  , newClient
  , closeClient

  -- * High-level Job operations
  , pushJob
  , flush

  -- * High-level Client API
  , command_
  , commandOK
  , commandJSON
  ) where

import Faktory.Prelude

import Control.Concurrent.MVar
import Crypto.Hash (Digest, SHA256(..), hashWith)
import Data.Aeson
import Data.ByteArray (ByteArrayAccess)
import Data.ByteString.Lazy (ByteString, fromStrict)
import qualified Data.ByteString.Lazy.Char8 as BSL8
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Faktory.Connection (connect)
import Faktory.Protocol
import Faktory.Settings
import GHC.Stack
import Network.Connection
import Network.Socket (HostName)
import System.Posix.Process (getProcessID)

-- | A connection to the server together with the @'Settings'@ used to open it
--
-- The connection lives in an @'MVar'@ so that each command (a send followed
-- by a receive) runs with exclusive access to the socket.
--
data Client = Client
  { clientConnection :: MVar Connection
  , clientSettings :: Settings
  }

-- | Payload of the @HI@ message the server sends when a connection is opened
--
-- The nonce and iteration count are optional; when both are present and a
-- password is configured, they are fed to @'hashPassword'@ during the
-- handshake.
--
data HiPayload = HiPayload
  { hiVersion :: Int
  -- ^ Protocol version announced by the server (JSON key @v@)
  , hiNonce :: Maybe Text
  -- ^ Nonce appended to the password before hashing (JSON key @s@)
  , hiIterations :: Maybe Int
  -- ^ Number of hash iterations to apply (JSON key @i@)
  }

instance FromJSON HiPayload where
  parseJSON = withObject "HiPayload" $ \o ->
    HiPayload <$> o .: "v" <*> o .:? "s" <*> o .:? "i"

-- | Payload sent as the argument of the @HELLO@ command
data HelloPayload = HelloPayload
  { helloWorkerId :: Maybe WorkerId
  , helloHostname :: HostName
  , helloProcessId :: Integer -- TODO: Orphan ToJSON ProcessID
  , helloLabels :: [Text]
  , helloVersion :: Int
  , helloPasswordHash :: Maybe Text
  }

instance ToJSON HelloPayload where
  toJSON HelloPayload{..} = object
    [ "wid" .= helloWorkerId
    , "hostname" .= helloHostname
    , "pid" .= helloProcessId
    , "labels" .= helloLabels
    , "v" .= helloVersion
    , "pwdhash" .= helloPasswordHash
    ]
  toEncoding HelloPayload{..} = pairs $ mconcat
    [ "wid" .= helloWorkerId
    , "hostname" .= helloHostname
    , "pid" .= helloProcessId
    , "labels" .= helloLabels
    , "v" .= helloVersion
    , "pwdhash" .= helloPasswordHash
    ]

-- | Open a new @'Client'@ connection with the given @'Settings'@
--
-- Performs the handshake: reads the server's @HI@ message, logs an error if
-- the server's protocol version is higher than @'expectedProtocolVersion'@,
-- then replies with @HELLO@ and requires an @OK@ response.
--
-- Throws (via @'throwString'@) if the @HI@ message is missing, lacks the @HI@
-- prefix, or cannot be parsed, and if the server does not answer @HELLO@ with
-- @OK@. The connection is closed if any step of the handshake throws.
--
newClient :: HasCallStack => Settings -> Maybe WorkerId -> IO Client
newClient settings@Settings {..} mWorkerId =
  bracketOnError (connect settingsConnection) connectionClose $ \conn -> do
    client <- Client <$> newMVar conn <*> pure settings

    greeting <-
      fromJustThrows "Unexpected end of HI message"
        =<< recvUnsafe settings conn
    stripped <-
      fromJustThrows ("Missing HI prefix: " <> show greeting)
        $ BSL8.stripPrefix "HI" greeting
    HiPayload {..} <-
      fromJustThrows ("Failed to parse HI payload: " <> show stripped)
        $ decode stripped

    when (hiVersion > expectedProtocolVersion) $ settingsLogError $ concat
      [ "Server's protocol version "
      , show hiVersion
      , " higher than client's expected protocol version "
      , show expectedProtocolVersion
      ]

    processId <- toInteger <$> getProcessID

    let
      mPassword = connectionInfoPassword settingsConnection

      -- Only produced when the server supplied both a nonce and an
      -- iteration count, and a password is configured.
      mHashedPassword =
        hashPassword <$> hiNonce <*> hiIterations <*> mPassword

      helloPayload = HelloPayload
        { helloWorkerId = mWorkerId
        -- The host name is already a String; passing it through 'show'
        -- would wrap it in literal quote characters on the wire.
        , helloHostname = fst $ connectionID conn
        , helloProcessId = processId
        , helloLabels = ["haskell"]
        , helloVersion = expectedProtocolVersion
        , helloPasswordHash = mHashedPassword
        }

    commandOK client "HELLO" [encode helloPayload]
    pure client
 where
  fromJustThrows message = maybe (throwString message) pure

-- | Close a @'Client'@
--
-- Sends @END@ and then closes the connection. The connection is closed even
-- if sending @END@ throws; the exception is still propagated.
--
closeClient :: Client -> IO ()
closeClient Client {..} = withMVar clientConnection $ \conn ->
  sendUnsafe clientSettings conn "END" [] `finally` connectionClose conn

-- | Push a Job to the Server
--
-- The job is JSON-encoded and sent as the argument of @PUSH@. Throws if the
-- server does not reply @OK@.
--
pushJob :: (HasCallStack, ToJSON a) => Client -> a -> IO ()
pushJob client job = commandOK client "PUSH" [encode job]

-- | Clear all job data in the Faktory server
--
-- Use with caution!
--
flush :: HasCallStack => Client -> IO ()
flush client = commandOK client "FLUSH" []

-- | Send a command, read and discard the response
command_ :: Client -> ByteString -> [ByteString] -> IO ()
command_ Client {..} cmd args = withMVar clientConnection $ \conn -> do
  sendUnsafe clientSettings conn cmd args
  void $ recvUnsafe clientSettings conn

-- | Send a command, assert the response is @OK@
--
-- Throws (via @'throwString'@) when the reply is anything else, including no
-- reply at all. The exception message includes the reply that was received.
--
commandOK :: HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
commandOK Client {..} cmd args = withMVar clientConnection $ \conn -> do
  sendUnsafe clientSettings conn cmd args
  response <- recvUnsafe clientSettings conn
  unless (response == Just "OK")
    $ throwString
    $ "Server not OK. Reply was: " <> show response

-- | Send a command, parse the response as JSON
--
-- Returns @Right Nothing@ when no reply was received, @Left@ with the decoder
-- error when a reply was received but could not be decoded, and
-- @Right (Just a)@ otherwise.
--
commandJSON
  :: FromJSON a
  => Client
  -> ByteString
  -> [ByteString]
  -> IO (Either String (Maybe a))
commandJSON Client {..} cmd args = withMVar clientConnection $ \conn -> do
  sendUnsafe clientSettings conn cmd args
  mByteString <- recvUnsafe clientSettings conn
  pure $ traverse eitherDecode mByteString

-- | Send a command to the Server socket
--
-- Do not use outside of @'withMVar'@, this is not threadsafe.
--
-- The command and its arguments are joined with single spaces and the line
-- is terminated with a newline before being written to the connection.
--
sendUnsafe :: Settings -> Connection -> ByteString -> [ByteString] -> IO ()
sendUnsafe Settings {..} conn cmd args = do
  settingsLogDebug $ "> " <> show line
  void $ connectionPut conn $ BSL8.toStrict (line <> "\n")
 where
  line = BSL8.unwords (cmd : args)

-- | Receive data from the Server socket
--
-- Do not use outside of @'withMVar'@, this is not threadsafe.
--
-- A reply that fails to parse is logged through the error logger and
-- reported as @Nothing@.
--
recvUnsafe :: Settings -> Connection -> IO (Maybe ByteString)
recvUnsafe Settings {..} conn = do
  reply <- readReply $ connectionGet conn 4096
  settingsLogDebug $ "< " <> show reply
  either onFailure onSuccess reply
 where
  onFailure err = Nothing <$ settingsLogError err
  onSuccess = pure . fmap fromStrict

-- | Iteratively apply a function @n@ times
--
-- This is like @iterate f s !! n@ but strict in @s@
--
-- A non-positive @n@ returns @s@ unchanged (after forcing it).
--
times :: Int -> (s -> s) -> s -> s
times n f s = go n s
 where
  -- The bang keeps the accumulator evaluated at every step.
  go remaining !acc
    | remaining > 0 = go (remaining - 1) (f acc)
    | otherwise = acc

-- | Hash password using provided @nonce@ for @n@ iterations
--
-- The password and nonce are concatenated, UTF-8 encoded and hashed once;
-- the resulting digest is then re-hashed @n - 1@ more times. The result is
-- the @'show'@ rendering of the final digest.
--
hashPassword :: Text -> Int -> String -> Text
hashPassword nonce n password = T.pack $ show finalDigest
 where
  salted = T.encodeUtf8 $ T.pack password <> nonce
  firstDigest = sha256 salted
  finalDigest = times (n - 1) sha256 firstDigest

  -- Note that we use sha256 at two different types above.
  --
  -- 1. sha256 :: ByteString -> Digest SHA256
  -- 2. sha256 :: Digest SHA256 -> Digest SHA256
  sha256 :: ByteArrayAccess b => b -> Digest SHA256
  sha256 = hashWith SHA256

-- | Protocol version the client expects
expectedProtocolVersion :: Int
expectedProtocolVersion = 2