module Faktory.Client
(
Client
, newClient
, closeClient
, pushJob
, flush
, command_
, commandOK
, commandJSON
) where
import Faktory.Prelude
import Control.Concurrent.MVar
import qualified Control.Exception as Exception
import Crypto.Hash (Digest, SHA256(..), hashWith)
import Data.Aeson
import Data.ByteArray (ByteArrayAccess)
import Data.ByteString.Lazy (ByteString, fromStrict)
import qualified Data.ByteString.Lazy.Char8 as BSL8
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Faktory.Connection (connect)
import Faktory.Protocol
import Faktory.Settings
import GHC.Stack
import Network.Connection
import Network.Socket (HostName)
import System.Posix.Process (getProcessID)
-- | A connected client: the underlying connection plus the settings it was
-- created with.
data Client = Client
  { clientConnection :: MVar Connection
    -- ^ The connection, held in an 'MVar'; every command in this module
    -- accesses it through 'withMVar'.
  , clientSettings :: Settings
    -- ^ Settings passed along to the send/receive helpers (used there for
    -- logging).
  }
-- | Payload of the server's @HI@ greeting, as read by 'newClient'.
data HiPayload = HiPayload
  { hiVersion :: Int
    -- ^ Protocol version announced by the server (JSON key @v@).
  , hiNonce :: Maybe Text
    -- ^ Optional nonce (JSON key @s@); mixed into the password hash.
  , hiIterations :: Maybe Int
    -- ^ Optional hash iteration count (JSON key @i@).
  }
-- | Decode the greeting object: @v@ is required, @s@ and @i@ are optional.
instance FromJSON HiPayload where
  parseJSON = withObject "HiPayload" $ \obj -> do
    version <- obj .: "v"
    nonce <- obj .:? "s"
    iterations <- obj .:? "i"
    pure HiPayload
      { hiVersion = version
      , hiNonce = nonce
      , hiIterations = iterations
      }
-- | Payload sent with the @HELLO@ command during the handshake.
data HelloPayload = HelloPayload
  { helloWorkerId :: Maybe WorkerId
    -- ^ Worker identifier, if any (JSON key @wid@).
  , helloHostname :: HostName
    -- ^ Host name reported to the server (JSON key @hostname@).
  , helloProcessId :: Integer
    -- ^ Process id reported to the server (JSON key @pid@).
  , helloLabels :: [Text]
    -- ^ Free-form labels (JSON key @labels@).
  , helloVersion :: Int
    -- ^ Protocol version the client speaks (JSON key @v@).
  , helloPasswordHash :: Maybe Text
    -- ^ Hashed password, when one is available (JSON key @pwdhash@).
  }
-- | Serialise every field under its wire key. 'toJSON' and 'toEncoding' emit
-- the same keys in the same order.
instance ToJSON HelloPayload where
  toJSON hello = object
    [ "wid" .= helloWorkerId hello
    , "hostname" .= helloHostname hello
    , "pid" .= helloProcessId hello
    , "labels" .= helloLabels hello
    , "v" .= helloVersion hello
    , "pwdhash" .= helloPasswordHash hello
    ]

  toEncoding hello =
    pairs
      $ "wid" .= helloWorkerId hello
      <> "hostname" .= helloHostname hello
      <> "pid" .= helloProcessId hello
      <> "labels" .= helloLabels hello
      <> "v" .= helloVersion hello
      <> "pwdhash" .= helloPasswordHash hello
-- | Connect to the server and perform the @HI@ / @HELLO@ handshake.
--
-- Reads the server's @HI@ greeting, logs an error when the server announces a
-- protocol version above 'expectedProtocolVersion', and replies with @HELLO@.
-- A password hash is sent only when the greeting carries both a nonce and an
-- iteration count and the connection settings contain a password.
--
-- Throws (via 'throwString') when the greeting is missing, lacks the @HI@
-- prefix or cannot be decoded, and when the reply to @HELLO@ is not @OK@.
-- The connection is closed if any step after connecting throws.
newClient :: HasCallStack => Settings -> Maybe WorkerId -> IO Client
newClient settings@Settings {..} mWorkerId =
  bracketOnError (connect settingsConnection) connectionClose $ \conn -> do
    client <- Client <$> newMVar conn <*> pure settings

    greeting <-
      fromJustThrows "Unexpected end of HI message" =<< recvUnsafe settings conn
    stripped <-
      fromJustThrows ("Missing HI prefix: " <> show greeting)
        $ BSL8.stripPrefix "HI" greeting
    HiPayload {..} <-
      fromJustThrows ("Failed to parse HI payload: " <> show stripped)
        $ decode stripped

    when (hiVersion > expectedProtocolVersion) $ settingsLogError $ concat
      [ "Server's protocol version "
      , show hiVersion
      , " higher than client's expected protocol version "
      , show expectedProtocolVersion
      ]

    let
      mPassword = connectionInfoPassword settingsConnection
      mHashedPassword = hashPassword <$> hiNonce <*> hiIterations <*> mPassword
      -- The host name is already a 'String'; passing it through 'show' would
      -- wrap it in literal quote characters on the wire.
      hostname = fst $ connectionID conn

    processId <- toInteger <$> getProcessID

    let
      helloPayload = HelloPayload
        { helloWorkerId = mWorkerId
        , helloHostname = hostname
        , helloProcessId = processId
        , helloLabels = ["haskell"]
        , helloVersion = expectedProtocolVersion
        , helloPasswordHash = mHashedPassword
        }

    commandOK client "HELLO" [encode helloPayload]
    pure client
  where fromJustThrows message = maybe (throwString message) pure
-- | Send @END@ to the server and close the connection.
--
-- The connection is closed even when sending @END@ throws (for example on an
-- already-broken socket), so it is not leaked; the exception from the send
-- still propagates to the caller.
closeClient :: Client -> IO ()
closeClient Client {..} = withMVar clientConnection $ \conn ->
  sendUnsafe clientSettings conn "END" []
    `Exception.finally` connectionClose conn
-- | Issue @PUSH@ with the JSON encoding of the given job as its only
-- argument. Throws, via 'commandOK', unless the server answers @OK@.
pushJob :: (HasCallStack, ToJSON a) => Client -> a -> IO ()
pushJob client job = commandOK client "PUSH" payload
  where payload = [encode job]
-- | Issue @FLUSH@ with no arguments. Throws, via 'commandOK', unless the
-- server answers @OK@.
flush :: HasCallStack => Client -> IO ()
flush client = commandOK client "FLUSH" noArguments
  where noArguments = []
-- | Send a command, then read one reply and discard it.
--
-- The connection's 'MVar' is held for the whole exchange.
command_ :: Client -> ByteString -> [ByteString] -> IO ()
command_ client cmd args =
  withMVar (clientConnection client) $ \conn -> do
    sendUnsafe settings conn cmd args
    _ <- recvUnsafe settings conn
    pure ()
  where settings = clientSettings client
-- | Send a command and require the reply to be exactly @OK@.
--
-- Any other reply, including no reply at all, is raised with 'throwString'.
-- The connection's 'MVar' is held for the whole exchange.
commandOK :: HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
commandOK client cmd args =
  withMVar (clientConnection client) $ \conn -> do
    sendUnsafe settings conn cmd args
    reply <- recvUnsafe settings conn
    case reply of
      Just "OK" -> pure ()
      _ -> throwString $ "Server not OK. Reply was: " <> show reply
  where settings = clientSettings client
-- | Send a command and decode the reply as JSON.
--
-- Yields @Right Nothing@ when 'recvUnsafe' produced no payload, @Right (Just
-- x)@ when the payload decoded, and @Left message@ when decoding failed.
commandJSON
  :: FromJSON a
  => Client
  -> ByteString
  -> [ByteString]
  -> IO (Either String (Maybe a))
commandJSON client cmd args =
  withMVar (clientConnection client) $ \conn -> do
    sendUnsafe settings conn cmd args
    traverse eitherDecode <$> recvUnsafe settings conn
  where settings = clientSettings client
-- | Write one command line to the connection.
--
-- The command and its arguments are joined with single spaces, logged at
-- debug level, and written followed by a newline. This function takes the
-- raw 'Connection' and does no locking itself.
sendUnsafe :: Settings -> Connection -> ByteString -> [ByteString] -> IO ()
sendUnsafe Settings {..} conn cmd args = do
  settingsLogDebug ("> " <> show line)
  void $ connectionPut conn (BSL8.toStrict (line <> "\n"))
  where line = BSL8.unwords (cmd : args)
-- | Read one reply from the connection, fetching input in 4096-byte requests.
--
-- The raw result of 'readReply' is logged at debug level. A 'Left' is logged
-- as an error and reported as 'Nothing'; a 'Right' payload is returned
-- converted to a lazy 'ByteString'. This function takes the raw 'Connection'
-- and does no locking itself.
recvUnsafe :: Settings -> Connection -> IO (Maybe ByteString)
recvUnsafe Settings {..} conn = do
  reply <- readReply (connectionGet conn 4096)
  settingsLogDebug ("< " <> show reply)
  either failed (pure . fmap fromStrict) reply
  where failed err = Nothing <$ settingsLogError err
-- | Apply a function to a value @n@ times, forcing the intermediate value
-- before every step. A count of zero or less returns the value unchanged.
times :: Int -> (s -> s) -> s -> s
times n f = go n
  where
    -- Force the accumulator first so no chain of thunks builds up.
    go remaining acc = acc `seq` step
      where
        step
          | remaining > 0 = go (remaining - 1) (f acc)
          | otherwise = acc
-- | Compute the password hash sent in the @HELLO@ payload.
--
-- The password with the nonce appended is UTF-8 encoded and hashed with
-- SHA-256; the resulting digest is then re-hashed @n - 1@ more times. The
-- final digest is rendered with 'show'. At least one hash is always applied,
-- even when @n@ is zero or negative.
hashPassword :: Text -> Int -> String -> Text
hashPassword nonce n password = T.pack (show finalDigest)
  where
    salted = T.encodeUtf8 (T.pack password <> nonce)
    firstDigest = sha256 salted
    finalDigest = times (n - 1) sha256 firstDigest

    sha256 :: (ByteArrayAccess b) => b -> Digest SHA256
    sha256 = hashWith SHA256
-- | Protocol version this client speaks: sent in @HELLO@ and compared
-- against the version announced in the server's @HI@ greeting.
expectedProtocolVersion :: Int
expectedProtocolVersion = 2