{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
-- | Client for a discovery service spoken to over HTTP.
--
-- Use 'connect' to obtain a 'Discovery' handle, 'withService' to keep a
-- service address registered (by periodic pings) while an action runs,
-- and 'query' to look up the addresses of other services. 'newLB',
-- 'withResponse' and 'httpLbs' issue HTTP requests that are load
-- balanced across the addresses returned by 'query'.
module Network.Legion.Discovery.Client (
connect,
query,
withService,
newLB,
withResponse,
httpLbs,
LBHttp,
Discovery,
Name(..),
ServiceAddr(..),
) where
import Control.Concurrent (newEmptyMVar, MVar, putMVar, tryTakeMVar,
  forkIO, threadDelay)
import Control.Concurrent.LoadDistribution (withResource,
  evenlyDistributed, LoadBalanced)
import Control.Exception (try)
import Control.Monad (void)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (runResourceT, allocate)
import Data.Aeson (eitherDecode, Value, encode, object, (.=))
import Data.ByteString.Lazy (fromChunks)
import Data.Map (Map)
import Data.Monoid ((<>))
import Data.Set (Set)
import Data.String (IsString)
import Data.Text (Text, unpack, pack)
import Data.Text.Encoding (encodeUtf8)
import Distribution.Text (display)
import Distribution.Version (VersionRange, Version)
import Network.HTTP.Client (Request, Response, BodyReader, parseRequest,
  host, secure, port, Manager, requestHeaders, responseStatus, brConsume,
  responseBody, path, method, RequestBody(RequestBodyLBS), requestBody,
  defaultRequest)
import Network.HTTP.Types (urlEncode, statusIsSuccessful)
import System.IO (hPutStrLn, stderr)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.HTTP.Client as C
-- | A handle on the discovery service, as produced by 'connect'.
data Discovery = D
  { dName :: Name
    -- ^ Name of the calling service; sent in the @user-agent@ header.
  , dVersion :: Version
    -- ^ Version of the calling service; sent in the @user-agent@ header.
  , dLb :: LBHttp
    -- ^ Load balancer over the discovery instances themselves.
  }
-- | Build a 'Discovery' handle.
--
-- Requests to the discovery service are load balanced over the given
-- fixed set of addresses and sent through the supplied 'Manager'.
connect
  :: Name -- ^ Name of the calling service.
  -> Version -- ^ Version of the calling service.
  -> Set ServiceAddr -- ^ Addresses of the discovery instances.
  -> Manager -- ^ HTTP connection manager used for every request.
  -> IO Discovery
connect name version urls manager =
    mkDiscovery <$> evenlyDistributed (return urls)
  where
    -- The balancer's description is simply the rendered address set.
    mkDiscovery balancer = D
      { dName = name
      , dVersion = version
      , dLb = LB
          { lbManager = manager
          , lbLb = balancer
          , lbDesc = pack (show urls)
          }
      }
{- |
  Run an 'IO' action while advertising a service address to discovery.

  A background thread POSTs a ping for the address to @\/v1\/ping@, sleeps
  for ten seconds, and repeats. The thread is asked to stop when the
  action finishes; because the stop flag is only inspected between
  pings, the thread may linger for up to one sleep interval.

  A ping that fails with an 'C.HttpException' is reported on stderr and
  retried on the next tick, so a transient network failure does not end
  the keep-alive loop.
-}
withService
  :: ServiceAddr -- ^ The address to advertise.
  -> Discovery -- ^ Handle obtained from 'connect'.
  -> IO a -- ^ Action to run while the address is being advertised.
  -> IO a
withService addy d io =
    runResourceT $ do
      void $ allocate launchPing stopPing
      lift io
  where
    {- Fork the keep-alive loop and return the MVar used to stop it. -}
    launchPing :: IO (MVar ())
    launchPing = do
      stop <- newEmptyMVar
      void . forkIO $
        let
          loop =
            tryTakeMVar stop >>= \case
              Nothing -> safePing >> threadDelay tenSeconds >> loop
              Just () -> return ()
        in loop
      return stop

    {- Signal the keep-alive loop to exit at its next check. -}
    stopPing :: MVar () -> IO ()
    stopPing stop = putMVar stop ()

    {-
      Send one ping, but never let an HTTP failure escape: an uncaught
      exception would terminate the forked thread and silently stop all
      future pings.
    -}
    safePing :: IO ()
    safePing =
      try ping >>= \case
        Left err ->
          hPutStrLn stderr
            ("Discovery ping failed: " ++ show (err :: C.HttpException))
        Right () -> return ()

    {- Send a single ping; the response (if any) is ignored. -}
    ping :: IO ()
    ping = do
      let
        userAgent = encodeUtf8
          $ unName (dName d) <> "/" <> pack (display (dVersion d))
        req = defaultRequest {
            path = "/v1/ping",
            method = "POST",
            requestHeaders = [
                ("user-agent", userAgent),
                ("content-type", pingRequestCT)
              ],
            requestBody = RequestBodyLBS . encode . object $ [
                "serviceAddress" .= unServiceAddr addy
              ]
          }
      withResponse req (dLb d) (const (return ()))

    {- Delay between pings, in microseconds. -}
    tenSeconds :: Int
    tenSeconds = 10000000
{- |
  Look up the addresses registered for a service.

  Sends @GET \/v1\/services\/\<name\>\/\<range\>@ (both segments URL-encoded)
  to a discovery instance. A successful response must be a JSON object;
  its keys are returned as the set of service addresses.

  Fails (via 'fail' in 'IO') when no discovery instance is available,
  when the response status is not successful, or when the body cannot
  be decoded.
-}
query
  :: Name -- ^ Name of the service to look up.
  -> VersionRange -- ^ Version range sent as part of the request path.
  -> Discovery -- ^ Handle obtained from 'connect'.
  -> IO (Set ServiceAddr)
query name range d =
    withResponse req (dLb d) (\case
        Nothing -> fail "No Discovery instances available."
        Just resp -> do
          body <- brConsume (responseBody resp)
          case responseStatus resp of
            status | statusIsSuccessful status ->
              case eitherDecode (fromChunks body) of
                Left err -> fail
                  $ "Couldn't decode Discovery response: " ++ show err
                Right instances ->
                  return . Set.fromList
                    $ ServiceAddr <$> Map.keys (instances :: Map Text Value)
            status -> fail $ "Bad response from Discovery:" ++ show (status, body)
      )
  where
    {-
      Identify the caller as @name/version@, the standard product-token
      form and the same form the ping request in 'withService' sends.
    -}
    userAgent = encodeUtf8
      $ unName (dName d) <> "/" <> pack (display (dVersion d))

    req = defaultRequest {
        path =
          "/v1/services/"
          <> urlEncode False (encodeUtf8 (unName name))
          <> "/"
          <> urlEncode False (encodeUtf8 (pack (display range))),
        method = "GET",
        requestHeaders = [("user-agent", userAgent)]
      }
-- | The name of a service, used both to identify the caller and to name
-- the service being looked up.
newtype Name = Name
  { unName :: Text
  }
  deriving (IsString)
-- | The address of a service instance, held as text. When a request is
-- routed to an instance, this text is parsed as a URL to obtain the
-- host, port and whether to use TLS.
newtype ServiceAddr = ServiceAddr
  { unServiceAddr :: Text
  }
  deriving (IsString, Eq, Ord, Show)
{- |
  Perform an HTTP request against an instance picked by the load
  balancer and hand the streaming response to a callback.

  The callback receives 'Nothing' when the balancer has no instance to
  offer. Otherwise the chosen address is parsed as a URL and its host,
  port and secure flag replace those of the supplied request; every
  other part of the request is sent unchanged.
-}
withResponse
  :: Request
  -> LBHttp
  -> (Maybe (Response BodyReader) -> IO a)
  -> IO a
withResponse r lb f =
    withResource (lbLb lb) (maybe (f Nothing) viaInstance)
  where
    {- Send the request to one concrete instance. -}
    viaInstance addr = do
      target <- parseRequest (unpack (unServiceAddr addr))
      C.withResponse (retarget target) (lbManager lb) (f . Just)

    {- Copy only the connection coordinates from the parsed address. -}
    retarget target = r
      { host = host target
      , secure = secure target
      , port = port target
      }
{- |
  Like 'withResponse', but read the whole response body into memory and
  return it as a lazy 'BSL.ByteString'.

  Returns 'Nothing' when the load balancer has no instance to offer.
-}
httpLbs
  :: Request
  -> LBHttp
  -> IO (Maybe (Response BSL.ByteString))
httpLbs r lb =
    withResponse r lb (maybe (return Nothing) (fmap Just . buffer))
  where
    {- Drain the body reader until it yields an empty chunk. -}
    buffer :: Response BodyReader -> IO (Response BSL.ByteString)
    buffer res = do
      chunks <- brConsume (responseBody res)
      return res {responseBody = BSL.fromChunks chunks}
-- | An HTTP client that spreads requests over a set of service addresses.
data LBHttp = LB
  { lbManager :: Manager
    -- ^ Connection manager used to send requests.
  , lbLb :: LoadBalanced ServiceAddr
    -- ^ Balancer that picks the address for each request.
  , lbDesc :: Text
    -- ^ Human-readable description; this is what 'show' renders.
  }
-- | Renders only the balancer's description text.
instance Show LBHttp where
  show lb = unpack (lbDesc lb)
{- |
  Create a load-balanced HTTP client for the named service.

  The balancer is given the 'query' action for that name and version
  range as its source of addresses, and it reuses the 'Manager' of the
  'Discovery' handle. The description is @\<name\> => \<range\>@.
-}
newLB
  :: Discovery
  -> Name
  -> VersionRange
  -> IO LBHttp
newLB d n r = do
  balancer <- evenlyDistributed (query n r d)
  return $ LB
    { lbManager = lbManager (dLb d)
    , lbLb = balancer
    , lbDesc = unName n <> " => " <> pack (display r)
    }
-- | Value of the @content-type@ header sent with ping requests.
-- Polymorphic in the string type so it can be used wherever a string
-- literal could.
pingRequestCT :: (IsString a) => a
pingRequestCT = "application/vnd.legion-discovery.ping-request+json"