module Faktory.Client
  (
  -- * Client operations
    Client(..)
  , newClient
  , closeClient

  -- * High-level Client API
  , command_
  , commandOK
  , commandJSON
  , commandByteString
  ) where

import Faktory.Prelude

import Control.Concurrent.MVar
import Crypto.Hash (Digest, SHA256(..), hashWith)
import Data.Aeson
import Data.Bitraversable (bimapM)
import Data.ByteArray (ByteArrayAccess)
import Data.ByteString.Lazy (ByteString, fromStrict)
import qualified Data.ByteString.Lazy.Char8 as BSL8
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Faktory.Connection (connect)
import Faktory.Protocol
import Faktory.Settings
import GHC.Stack
import Network.Connection
import Network.Socket (HostName)
import System.Posix.Process (getProcessID)

-- | Handle to a single server connection
data Client = Client
  { clientConnection :: MVar Connection
  -- ^ The underlying connection. The command functions in this module take
  -- this 'MVar' for the duration of each send/receive exchange.
  , clientSettings :: Settings
  -- ^ The 'Settings' the client was created with
  }

-- | Payload of the server's @HI@ greeting
--
-- <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#initial-handshake>
data HiPayload = HiPayload
  { hiVersion :: Int
  -- ^ Read from key @v@; compared against 'expectedProtocolVersion'
  , hiNonce :: Maybe Text
  -- ^ Read from optional key @s@; appended to the password before hashing
  , hiIterations :: Maybe Int
  -- ^ Read from optional key @i@; number of hashing rounds
  }

-- | Decodes the JSON object that follows the @HI@ prefix: @v@ is required,
-- @s@ and @i@ are optional.
instance FromJSON HiPayload where
  parseJSON = withObject "HiPayload" $ \obj -> do
    version <- obj .: "v"
    nonce <- obj .:? "s"
    iterations <- obj .:? "i"
    pure HiPayload
      { hiVersion = version
      , hiNonce = nonce
      , hiIterations = iterations
      }

-- | Payload sent with the @HELLO@ command during the handshake
data HelloPayload = HelloPayload
  { helloWorkerId :: Maybe WorkerId
  -- ^ Serialized under key @wid@
  , helloHostname :: HostName
  -- ^ Serialized under key @hostname@
  , helloProcessId :: Integer -- TODO: Orphan ToJSON ProcessID
  -- ^ Serialized under key @pid@
  , helloLabels :: [Text]
  -- ^ Serialized under key @labels@
  , helloVersion :: Int
  -- ^ Serialized under key @v@
  , helloPasswordHash :: Maybe Text
  -- ^ Serialized under key @pwdhash@
  }

-- | Serializes every field, in declaration order, under the keys @wid@,
-- @hostname@, @pid@, @labels@, @v@ and @pwdhash@.
instance ToJSON HelloPayload where
  toJSON payload = object
    [ "wid" .= helloWorkerId payload
    , "hostname" .= helloHostname payload
    , "pid" .= helloProcessId payload
    , "labels" .= helloLabels payload
    , "v" .= helloVersion payload
    , "pwdhash" .= helloPasswordHash payload
    ]

  -- Same keys and order as 'toJSON', written straight to an 'Encoding'.
  toEncoding payload =
    pairs
      $ "wid" .= helloWorkerId payload
      <> "hostname" .= helloHostname payload
      <> "pid" .= helloProcessId payload
      <> "labels" .= helloLabels payload
      <> "v" .= helloVersion payload
      <> "pwdhash" .= helloPasswordHash payload

-- | Open a new @'Client'@ connection with the given @'Settings'@
--
-- Connects using 'settingsConnection', reads the server's @HI@ greeting, and
-- answers with @HELLO@. If anything after the connect throws, the connection
-- is closed before the exception propagates.
--
-- Throws (via 'throwString') when the greeting is absent, lacks the @HI@
-- prefix, or cannot be decoded as a 'HiPayload'; 'commandOK' throws when the
-- server's reply to @HELLO@ is not @OK@.
--
-- A server protocol version higher than 'expectedProtocolVersion' is only
-- reported through 'settingsLogError'; the handshake still proceeds.
--
newClient :: HasCallStack => Settings -> Maybe WorkerId -> IO Client
newClient settings@Settings {..} mWorkerId =
  bracketOnError (connect settingsConnection) connectionClose $ \conn -> do
    client <- Client <$> newMVar conn <*> pure settings

    greeting <-
      fromJustThrows "Unexpected end of HI message"
      =<< fromRightThrows
      =<< recvUnsafe settings conn
    stripped <-
      fromJustThrows ("Missing HI prefix: " <> show greeting)
        $ BSL8.stripPrefix "HI" greeting
    HiPayload {..} <-
      fromJustThrows ("Failed to parse HI payload: " <> show stripped)
        $ decode stripped

    when (hiVersion > expectedProtocolVersion) $ settingsLogError $ concat
      [ "Server's protocol version "
      , show hiVersion
      , " higher than client's expected protocol version "
      , show expectedProtocolVersion
      ]

    processId <- toInteger <$> getProcessID

    let
      mPassword = connectionInfoPassword settingsConnection

      -- A hash is only produced when the server supplied both a nonce and an
      -- iteration count and a password is configured.
      mHashedPassword = hashPassword <$> hiNonce <*> hiIterations <*> mPassword

      helloPayload = HelloPayload
        { helloWorkerId = mWorkerId
        -- The host component of 'connectionID' is already a plain 'String';
        -- passing it through 'show' would embed literal quote characters in
        -- the value sent to the server.
        --
        -- NOTE(review): this is the host the connection was opened to, which
        -- is not necessarily the local machine's name - confirm intent.
        , helloHostname = fst $ connectionID conn
        , helloProcessId = processId
        , helloLabels = ["haskell"]
        , helloVersion = expectedProtocolVersion
        , helloPasswordHash = mHashedPassword
        }

    commandOK client "HELLO" [encode helloPayload]
    pure client
 where
  -- Unwrap a 'Just', or throw the given message on 'Nothing'
  fromJustThrows message = maybe (throwString message) pure

-- | Close a @'Client'@
--
-- Sends @END@ to the server and then closes the underlying connection, all
-- while holding the client's connection 'MVar'.
--
-- The connection is closed even when sending @END@ throws (for example
-- because the socket is already broken); the exception is still propagated
-- to the caller afterwards.
--
closeClient :: Client -> IO ()
closeClient Client {..} = withMVar clientConnection $ \conn -> do
  -- Failure path: release the socket before the exception escapes.
  bracketOnError (pure conn) connectionClose
    $ \c -> sendUnsafe clientSettings c "END" []
  -- Success path: 'bracketOnError' does not run its cleanup, so close here.
  connectionClose conn

-- | Send a command, read and discard the response
--
-- The reply obtained from 'commandByteString' is handed to
-- 'fromRightThrows', so only a 'Right' reply is silently discarded.
--
command_ :: Client -> ByteString -> [ByteString] -> IO ()
command_ client cmd args =
  void . fromRightThrows =<< commandByteString client cmd args

-- | Send a command, assert the response is @OK@
--
-- Any other reply (an error, an empty reply, or a different payload) is
-- reported by throwing with 'throwString', including the reply in the
-- message.
--
commandOK :: HasCallStack => Client -> ByteString -> [ByteString] -> IO ()
commandOK client cmd args = do
  reply <- commandByteString client cmd args
  case reply of
    Right (Just "OK") -> pure ()
    _ -> throwString $ "Server not OK. Reply was: " <> show reply

-- | Send a command, parse the response as JSON
--
-- A 'Left' from 'commandByteString' is returned unchanged. A reply without a
-- payload yields @'Right' 'Nothing'@. Otherwise the payload is decoded with
-- 'eitherDecode', and a decoding failure is returned as 'Left'.
--
commandJSON
  :: FromJSON a
  => Client
  -> ByteString
  -> [ByteString]
  -> IO (Either String (Maybe a))
commandJSON client cmd args = decodeReply <$> commandByteString client cmd args
 where
  -- Decode the optional payload inside the 'Either', keeping earlier errors
  decodeReply reply = reply >>= traverse eitherDecode

-- | Send a command, return the raw reply
--
-- The send and the following receive both happen while holding the client's
-- connection 'MVar', so one exchange completes before another caller's
-- begins.
--
commandByteString
  :: Client
  -> ByteString
  -> [ByteString]
  -> IO (Either String (Maybe ByteString))
commandByteString client cmd args = withMVar (clientConnection client) exchange
 where
  settings = clientSettings client

  -- One request/response round trip on the locked connection
  exchange conn = sendUnsafe settings conn cmd args *> recvUnsafe settings conn

-- | Send a command to the Server socket
--
-- The command and its arguments are joined with single spaces and terminated
-- with a newline. The line (without the newline) is also passed to
-- 'settingsLogDebug' before it is written.
--
-- Do not use outside of @'withMVar'@, this is not threadsafe.
--
sendUnsafe :: Settings -> Connection -> ByteString -> [ByteString] -> IO ()
sendUnsafe settings conn cmd args = do
  settingsLogDebug settings $ "> " <> show line
  connectionPut conn $ BSL8.toStrict (line <> "\n")
 where
  line = BSL8.unwords $ cmd : args

-- | Receive data from the Server socket
--
-- Runs 'readReply' over reads of up to 4096 bytes from the connection,
-- passes the reply to 'settingsLogDebug', and returns it with any payload
-- converted to a lazy 'ByteString'.
--
-- Do not use outside of @'withMVar'@, this is not threadsafe.
--
recvUnsafe :: Settings -> Connection -> IO (Either String (Maybe ByteString))
recvUnsafe settings conn = do
  reply <- readReply $ connectionGet conn chunkSize
  settingsLogDebug settings $ "< " <> show reply
  bimapM pure toLazy reply
 where
  chunkSize = 4096

  -- Convert the optional strict payload to a lazy one
  toLazy = pure . fmap fromStrict

-- | Iteratively apply a function @n@ times
--
-- This is like @iterate f s !! n@ but strict in @s@: the running value is
-- forced to weak head normal form before every step. A count of zero or less
-- returns @s@ unchanged.
--
times :: Int -> (s -> s) -> s -> s
times n f s = s `seq` step
 where
  step
    | n <= 0 = s
    | otherwise = times (n - 1) f (f s)

-- | Hash password using provided @nonce@ for @n@ iterations
--
-- The password and nonce are concatenated and UTF-8 encoded, hashed once
-- with SHA256, and the resulting digest is then re-hashed @n - 1@ more
-- times. The result is the 'show' rendering of the final digest.
--
hashPassword :: Text -> Int -> String -> Text
hashPassword nonce n password = T.pack $ show finalDigest
 where
  -- Used at two different types below:
  --
  -- 1. sha256 :: ByteString    -> Digest SHA256
  -- 2. sha256 :: Digest SHA256 -> Digest SHA256
  sha256 :: (ByteArrayAccess b) => b -> Digest SHA256
  sha256 = hashWith SHA256

  salted = T.encodeUtf8 $ T.pack password <> nonce
  firstDigest = sha256 salted
  finalDigest = times (n - 1) sha256 firstDigest

-- | Protocol version the client expects
--
-- Sent as the @v@ field of the @HELLO@ payload and compared against the
-- version announced in the server's @HI@ greeting.
--
expectedProtocolVersion :: Int
expectedProtocolVersion = 2