{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
module Network.Legion.Discovery.Client (
connect,
query,
withService,
newLB,
withResponse,
httpLbs,
LBHttp,
Discovery,
Name(..),
ServiceAddr(..),
) where
import Control.Concurrent (newEmptyMVar, MVar, putMVar, tryTakeMVar,
forkIO, threadDelay)
import Control.Concurrent.LoadDistribution (withResource,
evenlyDistributed, LoadBalanced)
import Control.Exception.Safe (tryAny, MonadCatch)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Logger (MonadLoggerIO, askLoggerIO, runLoggingT,
Loc, LogSource, LogLevel, LogStr, logError)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (runResourceT, allocate)
import Data.Aeson (eitherDecode, Value, encode, object, (.=))
import Data.ByteString.Lazy (fromChunks)
import Data.Map (Map)
import Data.Monoid ((<>))
import Data.Set (Set)
import Data.String (IsString)
import Data.Text (Text, unpack, pack)
import Data.Text.Encoding (encodeUtf8)
import Distribution.Text (display)
import Distribution.Version (VersionRange, Version)
import Network.HTTP.Client (Request, Response, BodyReader, parseRequest,
host, secure, port, Manager, requestHeaders, responseStatus, brConsume,
responseBody, path, method, RequestBody(RequestBodyLBS), requestBody,
defaultRequest)
import Network.HTTP.Types (urlEncode, statusIsSuccessful)
import TextShow (showt)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.HTTP.Client as C
data Discovery = D {
dName :: Name,
dVersion :: Version,
dLb :: LBHttp
}
connect
:: Name
-> Version
-> Set ServiceAddr
-> Manager
-> IO Discovery
connect name version urls manager = do
lb <- evenlyDistributed (return urls)
return $ D name version (LB manager lb (pack (show urls)))
withService :: (MonadLoggerIO m)
=> ServiceAddr
-> Discovery
-> IO a
-> m a
withService addy d io = do
logging <- askLoggerIO
liftIO . runResourceT $ do
void $ allocate (launchPing logging) stopPing
lift io
where
launchPing
:: (Loc -> LogSource -> LogLevel -> LogStr -> IO ())
-> IO (MVar ())
launchPing logging = do
stop <- newEmptyMVar
void . forkIO $
let
loop =
tryTakeMVar stop >>= \case
Nothing -> runLoggingT ping logging >> threadDelay tenSeconds >> loop
Just () -> return ()
in loop
return stop
stopPing :: MVar () -> IO ()
stopPing stop = putMVar stop ()
ping :: (MonadLoggerIO m, MonadCatch m) => m ()
ping = do
let
userAgent = encodeUtf8
$ unName (dName d) <> "/" <> pack (display (dVersion d))
req = defaultRequest {
path = "/v1/ping",
method = "POST",
requestHeaders = [
("user-agent", userAgent),
("content-type", pingRequestCT)
],
requestBody = RequestBodyLBS . encode . object $ [
"serviceAddress" .= unServiceAddr addy
]
}
tryAny (liftIO (withResponse req (dLb d) (const (return ())))) >>= \case
Left err -> $(logError)
$ "Can't ping legion-discovery service: " <> showt err
Right () -> return ()
tenSeconds :: Int
tenSeconds = 10000000
query
:: Name
-> VersionRange
-> Discovery
-> IO (Set ServiceAddr)
query name range d = do
let
userAgent = encodeUtf8
$ unName (dName d) <> "/" <> pack (display (dVersion d))
req = defaultRequest {
path =
"/v1/services/"
<> urlEncode False (encodeUtf8 (unName name))
<> "/"
<> urlEncode False (encodeUtf8 (pack (display range))),
method = "GET",
requestHeaders = [("user-agent", userAgent)]
}
withResponse req (dLb d) (\case
Nothing -> fail "No Discovery instances available."
Just resp -> do
body <- brConsume (responseBody resp)
case responseStatus resp of
status | statusIsSuccessful status ->
case eitherDecode (fromChunks body) of
Left err -> fail
$ "Couldn't decode Discovery response: " ++ show err
Right instances -> return (
Set.fromList
(ServiceAddr <$> Map.keys (instances :: Map Text Value))
)
status -> fail $ "Bad response from Discovery:" ++ show (status, body)
)
newtype Name = Name {unName :: Text} deriving (IsString)
newtype ServiceAddr = ServiceAddr {unServiceAddr :: Text}
deriving (IsString, Eq, Ord, Show)
withResponse
:: Request
-> LBHttp
-> (Maybe (Response BodyReader) -> IO a)
-> IO a
withResponse r lb f = withResource (lbLb lb) (\case
Nothing -> f Nothing
Just url -> do
lbReq <- parseRequest (unpack (unServiceAddr url))
let
req = r {
host = host lbReq,
secure = secure lbReq,
port = port lbReq
}
C.withResponse req (lbManager lb) (f . Just)
)
httpLbs
:: Request
-> LBHttp
-> IO (Maybe (Response BSL.ByteString))
httpLbs r lb = withResponse r lb (\case
Nothing -> return Nothing
Just res -> do
chunks <- consumeAll (responseBody res)
return (Just res {responseBody = BSL.fromChunks chunks})
)
where
consumeAll :: IO BS.ByteString -> IO [BS.ByteString]
consumeAll io = do
chunk <- io
if BS.null chunk
then return []
else do
more <- consumeAll io
return (chunk:more)
data LBHttp = LB {
lbManager :: Manager,
lbLb :: LoadBalanced ServiceAddr,
lbDesc :: Text
}
instance Show LBHttp where
show = unpack . lbDesc
newLB
:: Discovery
-> Name
-> VersionRange
-> IO LBHttp
newLB d n r =
LB
<$> pure (lbManager (dLb d))
<*> evenlyDistributed (query n r d)
<*> pure (unName n <> " => " <> pack (display r))
pingRequestCT :: (IsString a) => a
pingRequestCT = "application/vnd.legion-discovery.ping-request+json"