-- | Client for talking to a Faktory server: opening a connection (including
-- the @HI@/@HELLO@ handshake), closing it, pushing jobs, and sending raw
-- commands whose reply is ignored, checked for @OK@, or decoded as JSON.
module Faktory.Client
(
Client
, newClient
, closeClient
, pushJob
, flush
, command_
, commandOK
, commandJSON
) where
import Faktory.Prelude
import Control.Concurrent.MVar
import Crypto.Hash (Digest, SHA256(..), hashWith)
import Data.Aeson
import Data.ByteArray (ByteArrayAccess)
import Data.ByteString.Lazy (ByteString, fromStrict)
import qualified Data.ByteString.Lazy.Char8 as BSL8
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Faktory.Connection (connect)
import Faktory.Protocol
import Faktory.Settings
import GHC.Stack
import Network.Connection
import Network.Socket (HostName)
import System.Posix.Process (getProcessID)
-- | A handle on one open server connection together with the settings it was
-- created with.
data Client = Client
{ clientConnection :: MVar Connection
-- ^ The underlying connection. Every command in this module takes it with
-- 'withMVar', so one send/receive exchange completes before the next starts.
, clientSettings :: Settings
-- ^ The settings passed to 'newClient'; they supply the debug/error loggers
-- used when sending and receiving.
}
-- | The JSON payload that follows the @HI@ prefix of the server's greeting.
data HiPayload = HiPayload
{ hiVersion :: Int
-- ^ Protocol version announced by the server (JSON key @v@, required).
, hiNonce :: Maybe Text
-- ^ Optional JSON key @s@; appended to the password before hashing.
, hiIterations :: Maybe Int
-- ^ Optional JSON key @i@; passed to 'hashPassword' as the iteration count.
}
-- | Decodes the greeting object: @v@ is mandatory, while @s@ and @i@ may be
-- absent and then decode to 'Nothing'.
instance FromJSON HiPayload where
  parseJSON = withObject "HiPayload" $ \obj -> do
    version <- obj .: "v"
    nonce <- obj .:? "s"
    iterations <- obj .:? "i"
    pure HiPayload
      { hiVersion = version
      , hiNonce = nonce
      , hiIterations = iterations
      }
-- | The JSON payload this client sends as the argument of the @HELLO@ command.
data HelloPayload = HelloPayload
{ helloWorkerId :: Maybe WorkerId
-- ^ Serialized under @wid@; the optional worker id given to 'newClient'.
, helloHostname :: HostName
-- ^ Serialized under @hostname@.
, helloProcessId :: Integer
-- ^ Serialized under @pid@; 'newClient' fills it from 'getProcessID'.
, helloLabels :: [Text]
-- ^ Serialized under @labels@.
, helloVersion :: Int
-- ^ Serialized under @v@; 'newClient' sends 'expectedProtocolVersion'.
, helloPasswordHash :: Maybe Text
-- ^ Serialized under @pwdhash@; the result of 'hashPassword' when the server
-- sent a nonce and iteration count and a password is configured.
}
-- | Serializes the payload under the keys @wid@, @hostname@, @pid@, @labels@,
-- @v@ and @pwdhash@. 'toJSON' and 'toEncoding' emit the same fields; the
-- latter writes them directly in the order listed.
instance ToJSON HelloPayload where
  toJSON payload = object
    [ "wid" .= helloWorkerId payload
    , "hostname" .= helloHostname payload
    , "pid" .= helloProcessId payload
    , "labels" .= helloLabels payload
    , "v" .= helloVersion payload
    , "pwdhash" .= helloPasswordHash payload
    ]

  toEncoding payload = pairs
    (mconcat
      [ "wid" .= helloWorkerId payload
      , "hostname" .= helloHostname payload
      , "pid" .= helloProcessId payload
      , "labels" .= helloLabels payload
      , "v" .= helloVersion payload
      , "pwdhash" .= helloPasswordHash payload
      ]
    )
-- | Connect using the given settings and complete the @HI@/@HELLO@ handshake.
--
-- The first reply from the server must be @HI@ followed by a JSON payload.
-- When that payload carries both a nonce and an iteration count, and a
-- password is configured in the connection settings, the hashed password is
-- included in the @HELLO@ payload. A server protocol version higher than
-- 'expectedProtocolVersion' is logged as an error but does not abort.
--
-- Throws (via 'throwString') when no greeting is received, when it lacks the
-- @HI@ prefix, when its payload cannot be decoded, or when the server does
-- not answer @HELLO@ with @OK@. If anything throws, the freshly opened
-- connection is closed before the exception propagates.
newClient :: HasCallStack => Settings -> Maybe WorkerId -> IO Client
newClient settings@Settings {..} mWorkerId =
  bracketOnError (connect settingsConnection) connectionClose $ \conn -> do
    client <- Client <$> newMVar conn <*> pure settings

    greeting <-
      fromJustThrows "Unexpected end of HI message" =<< recvUnsafe settings conn
    stripped <-
      fromJustThrows ("Missing HI prefix: " <> show greeting)
        $ BSL8.stripPrefix "HI" greeting
    HiPayload {..} <-
      fromJustThrows ("Failed to parse HI payload: " <> show stripped)
        $ decode stripped

    when (hiVersion > expectedProtocolVersion) $ settingsLogError $ concat
      [ "Server's protocol version "
      , show hiVersion
      , " higher than client's expected protocol version "
      , show expectedProtocolVersion
      ]

    let
      mPassword = connectionInfoPassword settingsConnection
      -- Only hash when nonce, iteration count and password are all present.
      mHashedPassword = hashPassword <$> hiNonce <*> hiIterations <*> mPassword
      -- 'HostName' is already a 'String'; running it through 'show' would
      -- embed literal quote characters in the hostname sent to the server.
      -- NOTE(review): this is the host part of the connection's id, which is
      -- presumably the remote endpoint -- confirm whether the worker's own
      -- hostname is what should be reported here.
      hostname = fst (connectionID conn)

    helloPayload <-
      HelloPayload mWorkerId hostname
        <$> (toInteger <$> getProcessID)
        <*> pure ["haskell"]
        <*> pure expectedProtocolVersion
        <*> pure mHashedPassword

    commandOK client "HELLO" [encode helloPayload]
    pure client
  where
    -- Unwrap a 'Just', or throw the given message on 'Nothing'.
    fromJustThrows message = maybe (throwString message) pure
-- | Tell the server the session is over (@END@) and close the connection.
--
-- The connection is closed even when sending @END@ throws (for example
-- because the peer has already gone away); the exception from the send is
-- then re-raised after the close.
closeClient :: Client -> IO ()
closeClient Client {..} = withMVar clientConnection $ \conn -> do
  -- Error path: release the connection before the exception propagates.
  bracketOnError (pure conn) connectionClose $ \c ->
    sendUnsafe clientSettings c "END" []
  -- Success path: END was written, now release the connection.
  connectionClose conn
-- | Submit a job: the value is JSON-encoded and sent as the single argument
-- of a @PUSH@ command. Throws if the server's reply is not @OK@.
pushJob :: (HasCallStack, ToJSON a) => Client -> a -> IO ()
pushJob client job = commandOK client "PUSH" payload
  where
    payload = [encode job]
-- | Send a @FLUSH@ command with no arguments. Throws if the server's reply
-- is not @OK@.
flush :: HasCallStack => Client -> IO ()
flush client = commandOK client "FLUSH" noArgs
  where
    noArgs = []
-- | Send a command with its arguments, then read one reply and discard it.
--
-- The connection is held for the whole exchange, so the reply that is read
-- belongs to the command that was just sent.
command_ :: Client -> ByteString -> [ByteString] -> IO ()
command_ client cmd args =
  withMVar (clientConnection client) $ \conn ->
    sendUnsafe settings conn cmd args >> void (recvUnsafe settings conn)
  where
    settings = clientSettings client
-- | Send a command with its arguments and require the reply to be exactly
-- @OK@.
--
-- Throws (via 'throwString') on any other reply, including a missing one.
-- The message names the command and shows the reply that was received, so a
-- failure can be diagnosed from the exception alone. Arguments are left out
-- on purpose, because they may carry sensitive data such as the password
-- hash sent with @HELLO@.
commandOK :: HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
commandOK Client {..} cmd args = withMVar clientConnection $ \conn -> do
  sendUnsafe clientSettings conn cmd args
  response <- recvUnsafe clientSettings conn
  unless (response == Just "OK") $ throwString $ concat
    [ "Server not OK: command "
    , show cmd
    , " received reply "
    , show response
    ]
-- | Send a command with its arguments and decode the reply as JSON.
--
-- Returns @Right Nothing@ when no reply body was received, @Right (Just a)@
-- when the body decoded successfully, and @Left message@ when decoding
-- failed.
commandJSON
  :: FromJSON a
  => Client
  -> ByteString
  -> [ByteString]
  -> IO (Either String (Maybe a))
commandJSON client cmd args =
  withMVar (clientConnection client) $ \conn -> do
    sendUnsafe settings conn cmd args
    traverse eitherDecode <$> recvUnsafe settings conn
  where
    settings = clientSettings client
-- | Write one command line to the connection: the command and its arguments
-- joined by single spaces and terminated with a newline. The line (without
-- the newline) is passed to the debug logger first.
--
-- This takes a bare 'Connection' and does no locking itself; the callers in
-- this module hold the client's 'MVar' around it.
sendUnsafe :: Settings -> Connection -> ByteString -> [ByteString] -> IO ()
sendUnsafe Settings {..} conn cmd args = do
  settingsLogDebug ("> " <> show line)
  void $ connectionPut conn (BSL8.toStrict (line <> "\n"))
  where
    line = BSL8.unwords (cmd : args)
-- | Read one reply from the connection, fetching input in chunks of up to
-- 4096 bytes as 'readReply' asks for them.
--
-- The raw result is passed to the debug logger. If reading produced an
-- error, it is passed to the error logger and 'Nothing' is returned;
-- otherwise the (possibly absent) reply body is returned as a lazy
-- 'ByteString'.
recvUnsafe :: Settings -> Connection -> IO (Maybe ByteString)
recvUnsafe Settings {..} conn = do
  reply <- readReply (connectionGet conn 4096)
  settingsLogDebug ("< " <> show reply)
  either onFailure onSuccess reply
  where
    onFailure err = settingsLogError err >> pure Nothing
    onSuccess mBody = pure (fmap fromStrict mBody)
-- | @times n f s@ applies @f@ to @s@ @n@ times; for @n <= 0@ it returns @s@
-- unchanged. The running value is forced at every step, so no chain of
-- unevaluated applications builds up.
times :: Int -> (s -> s) -> s -> s
times n f s = s `seq` next
  where
    next
      | n > 0 = times (n - 1) f (f s)
      | otherwise = s
-- | Derive the password hash sent during the handshake.
--
-- The password followed by the nonce is UTF-8 encoded and hashed with
-- SHA-256; the resulting digest is then re-hashed @n - 1@ more times (for
-- @n <= 1@ the input is hashed exactly once). The final digest is rendered
-- with its 'Show' instance and packed into 'Text'.
hashPassword :: Text -> Int -> String -> Text
hashPassword nonce n password = T.pack (show finalDigest)
  where
    salted = T.encodeUtf8 (T.pack password <> nonce)
    firstDigest = sha256 salted
    finalDigest = times (n - 1) sha256 firstDigest

    -- Used on both the initial bytes and on intermediate digests, hence the
    -- polymorphic signature.
    sha256 :: (ByteArrayAccess b) => b -> Digest SHA256
    sha256 = hashWith SHA256
-- | The protocol version this client sends in its @HELLO@ payload and
-- compares against the version the server announces in @HI@.
expectedProtocolVersion :: Int
expectedProtocolVersion = 2