{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Legion.Discovery.Client (
connect,
query,
withService,
newLB,
withResponse,
LBHttp,
Discovery,
Name(..),
ServiceAddr(..),
) where
import Control.Concurrent (newEmptyMVar, MVar, putMVar, tryTakeMVar,
forkIO, threadDelay)
import Control.Concurrent.LoadDistribution (withResource,
evenlyDistributed, LoadBalanced)
import Control.Exception (bracket)
import Control.Monad (void)
import Data.Aeson (eitherDecode, Value, encode, object, (.=))
import Data.ByteString.Lazy (fromChunks)
import Data.Default.Class (def)
import Data.Map (Map)
import Data.Monoid ((<>))
import Data.Set (Set)
import Data.String (IsString)
import Data.Text (Text, unpack, pack)
import Data.Text.Encoding (encodeUtf8)
import Distribution.Text (display)
import Distribution.Version (VersionRange, Version)
import Network.HTTP.Client (Request, Response, BodyReader,
parseRequest, host, secure, port, Manager, requestHeaders,
checkStatus, responseStatus, brConsume, responseBody, path, method,
RequestBody(RequestBodyLBS), requestBody)
import Network.HTTP.Types (urlEncode, statusIsSuccessful)
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.HTTP.Client as C
data Discovery = D {
dName :: Name,
dVersion :: Version,
dLb :: LBHttp
}
connect
:: Name
-> Version
-> Set ServiceAddr
-> Manager
-> IO Discovery
connect name version urls manager =
D name version . LB manager <$> evenlyDistributed (return urls)
withService
:: ServiceAddr
-> Discovery
-> IO a
-> IO a
withService addy d =
bracket launchPing stopPing . const
where
launchPing :: IO (MVar ())
launchPing = do
stop <- newEmptyMVar
void . forkIO $
let
loop =
tryTakeMVar stop >>= \case
Nothing -> ping >> threadDelay tenSeconds >> loop
Just () -> return ()
in loop
return stop
stopPing :: MVar () -> IO ()
stopPing stop = putMVar stop ()
ping :: IO ()
ping = do
let
userAgent = encodeUtf8
$ unName (dName d) <> "-" <> pack (display (dVersion d))
req = def {
path =
"/v1/ping/"
<> urlEncode False (encodeUtf8 (unName (dName d)))
<> "/"
<> urlEncode False (encodeUtf8 (pack (display (dVersion d)))),
method = "POST",
requestHeaders = [
("user-agent", userAgent),
("content-type", pingRequestCT)
],
checkStatus = const . const . const $ Nothing,
requestBody = RequestBodyLBS . encode . object $ [
"serviceAddress" .= unServiceAddr addy
]
}
withResponse req (dLb d) (const (return ()))
tenSeconds :: Int
tenSeconds = 10000000
query
:: Name
-> VersionRange
-> Discovery
-> IO (Set ServiceAddr)
query name range d = do
let
userAgent = encodeUtf8
$ unName (dName d) <> "-" <> pack (display (dVersion d))
req = def {
path =
"/v1/services/"
<> urlEncode False (encodeUtf8 (unName name))
<> "/"
<> urlEncode False (encodeUtf8 (pack (display range))),
method = "GET",
requestHeaders = [("user-agent", userAgent)],
checkStatus = const . const . const $ Nothing
}
withResponse req (dLb d) (\case
Nothing -> fail "No Discovery instances available."
Just resp -> do
body <- brConsume (responseBody resp)
case responseStatus resp of
status | statusIsSuccessful status ->
case eitherDecode (fromChunks body) of
Left err -> fail
$ "Couldn't decode Discovery response: " ++ show err
Right instances -> return (
Set.fromList
(ServiceAddr <$> Map.keys (instances :: Map Text Value))
)
status -> fail $ "Bad response from Discovery:" ++ show (status, body)
)
newtype Name = Name {unName :: Text} deriving (IsString)
newtype ServiceAddr = ServiceAddr {unServiceAddr :: Text}
deriving (IsString, Eq, Ord, Show)
withResponse
:: Request
-> LBHttp
-> (Maybe (Response BodyReader) -> IO a)
-> IO a
withResponse r lb f = withResource (lbLb lb) (\case
Nothing -> f Nothing
Just url -> do
lbReq <- parseRequest (unpack (unServiceAddr url))
let
req = r {
host = host lbReq,
secure = secure lbReq,
port = port lbReq
}
C.withResponse req (lbManager lb) (f . Just)
)
data LBHttp = LB {
lbManager :: Manager,
lbLb :: LoadBalanced ServiceAddr
}
newLB
:: Discovery
-> Name
-> VersionRange
-> IO LBHttp
newLB d n r = LB (lbManager (dLb d)) <$> evenlyDistributed (query n r d)
pingRequestCT :: (IsString a) => a
pingRequestCT = "application/vnd.legion-discovery.ping-request+json"