{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} {- | This module contains a client library for communicating with the 'legion-discovery' service discovery program. -} module Network.Legion.Discovery.Client ( -- * Establishing a Connection. connect, -- * Performing Queries. query, -- * Registering a Service. withService, -- * HTTP Utilities. newLB, withResponse, httpLbs, -- * Other Types. LBHttp, Discovery, Name(..), ServiceAddr(..), ) where import Control.Concurrent (newEmptyMVar, MVar, putMVar, tryTakeMVar, forkIO, threadDelay) import Control.Concurrent.LoadDistribution (withResource, evenlyDistributed, LoadBalanced) import Control.Exception.Safe (tryAny, MonadCatch) import Control.Monad (void) import Control.Monad.IO.Class (liftIO) import Control.Monad.Logger (MonadLoggerIO, askLoggerIO, runLoggingT, Loc, LogSource, LogLevel, LogStr, logError) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Resource (runResourceT, allocate) import Data.Aeson (eitherDecode, Value, encode, object, (.=)) import Data.ByteString.Lazy (fromChunks) import Data.Map (Map) import Data.Monoid ((<>)) import Data.Set (Set) import Data.String (IsString) import Data.Text (Text, unpack, pack) import Data.Text.Encoding (encodeUtf8) import Distribution.Text (display) import Distribution.Version (VersionRange, Version) import Network.HTTP.Client (Request, Response, BodyReader, parseRequest, host, secure, port, Manager, requestHeaders, responseStatus, brConsume, responseBody, path, method, RequestBody(RequestBodyLBS), requestBody, defaultRequest) import Network.HTTP.Types (urlEncode, statusIsSuccessful) import TextShow (showt) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import qualified Data.Map as Map import qualified Data.Set as Set import qualified Network.HTTP.Client as C {- | This type represents a handle on the discovery service. -} data Discovery = D { dName :: Name, dVersion :: Version, dLb :: LBHttp } {- | Create a connection to the discovery service. -} connect :: Name {- ^ The name of the local program. This is used to track and display a network graph of service dependencies. -} -> Version {- ^ The version of the local program. -} -> Set ServiceAddr {- ^ The well-known set of urls where the discovery service lives. -} -> Manager {- ^ The http manager used to manage communication to the discovery service, and also for communication with any services accessed via the 'withResponse' function. -} -> IO Discovery connect name version urls manager = do lb <- evenlyDistributed (return urls) return $ D name version (LB manager lb (pack (show urls))) {- | Run a registered service, making sure to unregister upon ternination. -} withService :: (MonadLoggerIO m) => ServiceAddr {- ^ The service address on which your service instance can be contacted. -} -> Discovery {- ^ A handle on the discovery service, obtained via `connect`. -} -> IO a {- ^ The IO action to perform while registered as a service. -} -> m a withService addy d io = do logging <- askLoggerIO liftIO . runResourceT $ do void $ allocate (launchPing logging) stopPing lift io where launchPing :: (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> IO (MVar ()) launchPing logging = do stop <- newEmptyMVar void . forkIO $ let loop = tryTakeMVar stop >>= \case Nothing -> runLoggingT ping logging >> threadDelay tenSeconds >> loop Just () -> return () in loop return stop stopPing :: MVar () -> IO () stopPing stop = putMVar stop () ping :: (MonadLoggerIO m, MonadCatch m) => m () ping = do let userAgent = encodeUtf8 $ unName (dName d) <> "/" <> pack (display (dVersion d)) req = defaultRequest { path = "/v1/ping", method = "POST", requestHeaders = [ ("user-agent", userAgent), ("content-type", pingRequestCT) ], requestBody = RequestBodyLBS . encode . object $ [ "serviceAddress" .= unServiceAddr addy ] } {- TODO figure out what we want to do when the ping fails. -} tryAny (liftIO (withResponse req (dLb d) (const (return ())))) >>= \case Left err -> $(logError) $ "Can't ping legion-discovery service: " <> showt err Right () -> return () tenSeconds :: Int tenSeconds = 10000000 {- in microseconds. -} {- | Query the discovery service. -} query :: Name {- ^ The name of the service you are looking for. -} -> VersionRange {- ^ The range of service versions with which you are compatible. -} -> Discovery {- ^ A handle on the discovery service. -} -> IO (Set ServiceAddr) query name range d = do let userAgent = encodeUtf8 $ unName (dName d) <> "/" <> pack (display (dVersion d)) req = defaultRequest { path = "/v1/services/" <> urlEncode False (encodeUtf8 (unName name)) <> "/" <> urlEncode False (encodeUtf8 (pack (display range))), method = "GET", requestHeaders = [("user-agent", userAgent)] } withResponse req (dLb d) (\case Nothing -> fail "No Discovery instances available." Just resp -> do body <- brConsume (responseBody resp) case responseStatus resp of status | statusIsSuccessful status -> case eitherDecode (fromChunks body) of Left err -> fail $ "Couldn't decode Discovery response: " ++ show err Right instances -> return ( Set.fromList (ServiceAddr <$> Map.keys (instances :: Map Text Value)) ) status -> fail $ "Bad response from Discovery:" ++ show (status, body) ) {- | The name of a service. -} newtype Name = Name {unName :: Text} deriving (IsString) {- | The type of a service address. A 'ServiceAddr' is a wrapper around an unstructured text value. The meaning of the text value is dependent on the registered service, as it is the one responsible for publishing the service address. We __highly__ recommend that registered services choose to publish a fully qualified URL as their service address. -} newtype ServiceAddr = ServiceAddr {unServiceAddr :: Text} deriving (IsString, Eq, Ord, Show) {- | Analog of 'http-client''s 'C.withResponse', but automatically replaces the host, port, and scheme portions of the 'Request' with values obtained from the load balancer. If a 'Nothing' value is passed to the response handler, that means that there are no available instances that match the query params that were passed to 'newLB'. -} withResponse :: Request -> LBHttp -> (Maybe (Response BodyReader) -> IO a) -> IO a withResponse r lb f = withResource (lbLb lb) (\case Nothing -> f Nothing Just url -> do lbReq <- parseRequest (unpack (unServiceAddr url)) let req = r { host = host lbReq, secure = secure lbReq, port = port lbReq } C.withResponse req (lbManager lb) (f . Just) ) {- | Analog of 'http-client''s 'C.httpLbs', but automatically replaces the host, port, and scheme portions of the 'Request' with values obtained from the load balancer. If a 'Nothing' value is returned, that means that there are no available instances that match the query params that were passed to 'newLB'. -} httpLbs :: Request -> LBHttp -> IO (Maybe (Response BSL.ByteString)) httpLbs r lb = withResponse r lb (\case Nothing -> return Nothing Just res -> do chunks <- consumeAll (responseBody res) return (Just res {responseBody = BSL.fromChunks chunks}) ) where consumeAll :: IO BS.ByteString -> IO [BS.ByteString] consumeAll io = do chunk <- io if BS.null chunk then return [] else do more <- consumeAll io return (chunk:more) {- | A handle on the load balancing context. -} data LBHttp = LB { lbManager :: Manager, lbLb :: LoadBalanced ServiceAddr, lbDesc :: Text } instance Show LBHttp where show = unpack . lbDesc {- | Create a new load balanced http client, for use with 'withResponse'. -} newLB :: Discovery {- ^ A handle on the discovery service, obtained via 'connect'. -} -> Name {- ^ The name of the target service. -} -> VersionRange {- ^ The range of service versions with which you are compatible. -} -> IO LBHttp {- ^ Returns a load balanced http client, for use with 'withResponse'. -} newLB d n r = LB <$> pure (lbManager (dLb d)) <*> evenlyDistributed (query n r d) <*> pure (unName n <> " => " <> pack (display r)) {- | The content type of a ping request. -} pingRequestCT :: (IsString a) => a pingRequestCT = "application/vnd.legion-discovery.ping-request+json"