{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
module Kubernetes.Client.Watch
  ( WatchEvent
  , eventType
  , eventObject
  , dispatchWatch
  ) where

import Control.Monad
import Control.Monad.Trans (lift)
import Data.Aeson
import qualified Data.ByteString as B
import qualified Data.ByteString.Streaming.Char8 as Q
import qualified Data.Text as T
import Kubernetes.OpenAPI.Core
import Kubernetes.OpenAPI.Client
import Kubernetes.OpenAPI.MimeTypes
import Kubernetes.OpenAPI.Model (Watch(..))
import Network.HTTP.Client

-- | A single event taken from a watch stream: a textual type tag paired with
-- the object the event refers to. The payload type @a@ is chosen by the caller.
data WatchEvent a = WatchEvent
  { _eventType :: T.Text -- ^ Event type tag (the @type@ key in the JSON form).
  , _eventObject :: a -- ^ Event payload (the @object@ key in the JSON form).
  } deriving (Eq, Show)

-- | Decodes a JSON object by reading its @type@ key into the event type and
-- its @object@ key into the payload. Any value that is not a JSON object is
-- rejected with a parse failure.
instance FromJSON a => FromJSON (WatchEvent a) where
  parseJSON (Object x) = WatchEvent <$> x .: "type" <*> x .: "object"
  parseJSON _ = fail "Expected an object"

-- | Encodes an event as a JSON object with two keys: @type@ holding the event
-- type and @object@ holding the encoded payload. This mirrors the keys read by
-- the 'FromJSON' instance.
instance ToJSON a => ToJSON (WatchEvent a) where
  toJSON x = object
    [ "type" .= _eventType x
    , "object" .= _eventObject x
    ]

-- | Type of the 'WatchEvent'.
--
-- Public accessor for the record's event-type field; the record constructor
-- and its field selectors are not part of the module's export list.
eventType :: WatchEvent a -> T.Text
eventType = _eventType

-- | Object within the 'WatchEvent'.
--
-- Public accessor for the record's payload field; the record constructor and
-- its field selectors are not part of the module's export list.
eventObject :: WatchEvent a -> a
eventObject = _eventObject

{-| Dispatch a request with the watch parameter set to true. Takes a consumer
function which consumes the 'Q.ByteString' stream of the response body.
Following is a simple example which just streams to stdout. First some setup -
this assumes kubernetes is accessible at http://localhost:8001, e.g. after
running /kubectl proxy/:

@
import qualified Data.ByteString.Streaming.Char8 as Q

manager <- newManager defaultManagerSettings
defaultConfig <- newConfig
let config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
let request = listEndpointsForAllNamespaces (Accept MimeJSON)
@

Launching 'dispatchWatch' with the above we get a stream of endpoints data:

@
 > dispatchWatch manager config request Q.stdout
 {"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
@
-}
dispatchWatch ::
  (HasOptionalParam req Watch, MimeType accept, MimeType contentType) =>
    Manager -- ^ HTTP connection manager used to issue the request.
    -> KubernetesClientConfig -- ^ Client configuration used to build the HTTP request.
    -> KubernetesRequest req contentType resp accept -- ^ Request to send; @Watch True@ is applied to it first.
    -> (Q.ByteString IO () -> IO a) -- ^ Consumer of the streamed response body.
    -> IO a -- ^ Whatever the consumer returns.
dispatchWatch manager config request apply = do
  -- Force the watch parameter on, regardless of what the caller set.
  let watchRequest = applyOptionalParam request (Watch True)
  (InitRequest req) <- _toInitRequest config watchRequest
  -- Only the body stream is handed to the consumer; status and headers are dropped.
  withHTTP req manager $ \resp -> apply $ responseBody resp

-- | Perform an HTTP request via 'withResponse' and pass the response to the
-- continuation with its body exposed as a streaming 'Q.ByteString' instead of
-- a raw 'BodyReader'. All other response fields are passed through unchanged.
withHTTP ::
  Request
  -> Manager
  -> (Response (Q.ByteString IO ()) -> IO a)
  -> IO a
withHTTP request manager f = withResponse request manager f'
  where
    -- Swap the body reader for a stream before invoking the continuation.
    f' resp = do
      let p = (from . brRead . responseBody) resp
      f (resp {responseBody = p})
    -- Turn a chunk-producing action into a stream: run the action repeatedly,
    -- emitting each chunk, and stop at the first empty chunk.
    from :: IO B.ByteString -> Q.ByteString IO ()
    from io = go
      where
        go = do
          bs <- lift io
          unless (B.null bs) $ do
            Q.chunk bs
            go