{-# LANGUAGE TypeFamilies     #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

-- | Simple lib to access Etcd over gRPC.
--
-- This library offers a low-on-details EtcdV3 APIs to easily add distributed
-- system mechanisms into your Haskell applications.
--
-- This library uses the 'Network.GRPC.Client.Helpers' module from
-- 'http2-client-grpc', which may trade some functionalities for the sake of
-- simplicity. A typically hidden functionality is the possibility to abort a
-- query upon a client decision (e.g., while waiting for some Lock).
--
-- In general, this library provides some simplifications to express the input
-- messages of RPCs. For instance, the MVCC capabilities are mostly hidden from
-- the _querying_ side of this library (i.e., queries will request the latest
-- revision of a key/value pair). It is pretty simple, however, to reuse the
-- smart constructors (such as 'putReq'), continue editing values with lens,
-- and then call the gRPC RPC in a one-liner.
--
-- A reason for this design is that the intended user of this library will run
-- a continuous loop to acquire some leadership or monitor some values for the
-- whole duration of a program.
module Network.EtcdV3
    (
    -- * Generalities.
      etcdClientConfigSimple
    , EtcdQuery
    -- * Reading.
    , KeyRange(..)
    , range
    , rangeReq
    , rangeResponsePairs
    -- * Granting leases.
    , grantLease
    , GrantedLease
    , fromLeaseGrantResponse
    , keepAlive
    -- * Writing.
    , put
    , putReq
    , delete
    , delReq
    -- * Locking.
    , AcquiredLock
    , fromLockResponse
    , lock
    , unlock
    -- * Transactions.
    , transaction
    -- * A Few Smart Constructors To Make Life Easy When Building Transactions.
    , successTxnReq
    , kvEq
    , kvNeq
    , kvGt
    , kvLt
    -- * Watches.
    , watchForever
    , watchReq
    -- * Leader election.
    , Election(..)
    , LeaderEvidence
    , runForLeadership
    , fromCampaignResponse
    , proclaim
    , resign
    , getProclaimedValue
    , observeProclaimedValues
    , proclaimReq
    , campaignReq
    , resignReq
    , leaderReq
    -- * re-exports
    , defMessage
    , module Control.Lens
    ) where

import Control.Lens
import Control.Exception (throwIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as C8
import Data.ProtoLens.Message (defMessage)
import Data.Semigroup (Endo)
import Data.String (IsString)
import GHC.Int (Int64)
import Network.GRPC.Client
import Network.GRPC.Client.Helpers
import Network.HTTP2.Client (TooMuchConcurrency)
import Network.Socket (HostName, PortNumber)
import qualified Proto.Etcd.Etcdserver.Etcdserverpb.Rpc as EtcdRPC
import qualified Proto.Etcd.Etcdserver.Etcdserverpb.Rpc_Fields as EtcdPB
import qualified Proto.Etcd.Etcdserver.Api.V3lock.V3lockpb.V3lock as LockRPC
import qualified Proto.Etcd.Etcdserver.Api.V3lock.V3lockpb.V3lock_Fields as LockPB
import qualified Proto.Etcd.Etcdserver.Api.V3election.V3electionpb.V3election as ElectionRPC
import qualified Proto.Etcd.Etcdserver.Api.V3election.V3electionpb.V3election_Fields as ElectionPB

-- | Build a gRPC client configuration for talking to Etcd.
--
-- Starts from 'grpcClientConfigSimple' for the given host, port and TLS
-- setting, then overrides the compression setting with 'uncompressed'.
etcdClientConfigSimple :: HostName -> PortNumber -> UseTlsOrNot -> GrpcClientConfig
etcdClientConfigSimple host port tls =
    baseConfig { _grpcClientConfigCompression = uncompressed }
  where
    baseConfig = grpcClientConfigSimple host port tls

-- | Type alias to simplify type signatures.
--
-- Every unary RPC wrapper in this module produces its result by previewing
-- 'unaryOutput' on the raw reply, hence the 'Maybe': 'Nothing' means that no
-- output message could be extracted from the reply.
type EtcdQuery a = IO (Maybe a)

-----------------------------------------------------------------------------------------

-- | Data type to unify the three addressing schemes in etcd.
--
-- Each constructor is translated into a @(key, range_end)@ pair before being
-- placed into a request.
--
-- See 'range'.
data KeyRange
  = SingleKey !ByteString
  -- ^ Exactly one key.
  | FromKey !ByteString
  -- ^ Range starting at the given key; encoded with a range end made of a
  -- single NUL byte (per the etcd API: all keys greater than or equal to the
  -- given key).
  | Prefixed !ByteString
  -- ^ Keys starting with the given prefix; the range end is derived from the
  -- prefix by dropping trailing @0xff@ bytes and incrementing the last
  -- remaining byte.
  deriving (Show, Eq, Ord)

-- | Look up a range of values.
--
-- Calls the KV @range@ RPC with the request built by 'rangeReq' and extracts
-- the response message from the raw reply, if any.
range
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> KeyRange
  -- ^ Looked-up range.
  -> EtcdQuery EtcdRPC.RangeResponse
range grpc r = do
    reply <- rawUnary (RPC :: RPC EtcdRPC.KV "range") grpc (rangeReq r)
    pure (preview unaryOutput reply)

-- | Smart constructor for a 'EtcdRPC.RangeRequest' covering the given
-- 'KeyRange'.
--
-- Only the @key@ and @range_end@ fields are set; every other field keeps its
-- default value.
rangeReq :: KeyRange -> EtcdRPC.RangeRequest
rangeReq r =
    let (startKey, endKey) = rangePairForRangeQuery r
    in  set EtcdPB.rangeEnd endKey
      . set EtcdPB.key startKey
      $ defMessage

-- | Fold over the key/value pairs contained in a 'EtcdRPC.RangeResponse'.
--
-- Typical usage is:
--
-- @
-- x <- range grpc (Prefixed "/some-dir/")
-- print $ x ^.. _Just . rangeResponsePairs
-- @
--
-- Only the @key@ and @value@ of each returned entry are kept. An Etcd
-- RangeResponse is a richer object: please refer to the Etcd documentation to
-- understand what you will miss out on (e.g., whether the list is complete or
-- not).
rangeResponsePairs
  :: Getting (Endo [(ByteString, ByteString)]) EtcdRPC.RangeResponse (ByteString, ByteString)
rangeResponsePairs = EtcdPB.kvs . traverse . to keyAndValue
  where
    -- Project one returned entry onto its (key, value) pair.
    keyAndValue kv = (view EtcdPB.key kv, view EtcdPB.value kv)

-- | Internal helper translating a 'KeyRange' into the @(key, range_end)@ pair
-- placed into range-like requests.
--
-- * 'SingleKey': empty range end.
-- * 'FromKey': range end made of a single NUL byte.
-- * 'Prefixed': range end is the prefix with its trailing @0xff@ bytes removed
--   and its last remaining byte incremented; when nothing remains (empty
--   prefix, or a prefix made only of @0xff@ bytes) a single NUL byte is used.
rangePairForRangeQuery :: KeyRange -> (ByteString, ByteString)
rangePairForRangeQuery keyRange = case keyRange of
    SingleKey k -> (k, "")
    FromKey k   -> (k, "\NUL")
    Prefixed k  -> (k, prefixEnd k)
  where
    -- Work on the reversed key so that the "last" byte sits at the head.
    prefixEnd k =
      case C8.uncons (C8.dropWhile (== '\xff') (C8.reverse k)) of
        Nothing             -> "\NUL"
        Just (lastByte, bs) -> C8.reverse (C8.cons (succ lastByte) bs)

-----------------------------------------------------------------------------------------

-- | Ask for a lease of a given duration.
--
-- Calls the Lease @leaseGrant@ RPC with a request whose only explicitly set
-- field is the TTL.
grantLease
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> Int64
  -- ^ TTL for the lease.
  -> EtcdQuery EtcdRPC.LeaseGrantResponse
grantLease grpc seconds = do
    reply <- rawUnary (RPC :: RPC EtcdRPC.Lease "leaseGrant") grpc (set EtcdPB.ttl seconds defMessage)
    pure (preview unaryOutput reply)

-- | Opaque lease result.
--
-- Wraps the lease identifier read from a 'EtcdRPC.LeaseGrantResponse' (see
-- 'fromLeaseGrantResponse').
--
-- Show instance used for printing purposes only.
newtype GrantedLease = GrantedLease { _getGrantedLeaseId :: Int64 }

-- | Human-readable rendering of the form @(lease #\<id\>)@.
instance Show GrantedLease where
  show lease = concat ["(lease #", show (_getGrantedLeaseId lease), ")"]

-- | Build a 'GrantedLease' out of the @id@ field of a lease-grant response.
fromLeaseGrantResponse :: EtcdRPC.LeaseGrantResponse -> GrantedLease
fromLeaseGrantResponse r = GrantedLease (view EtcdPB.id r)

-- | Keep a lease alive.
--
-- Calls the Lease @leaseKeepAlive@ RPC (through the unary helper) with a
-- request carrying the identifier of the lease.
keepAlive
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> GrantedLease
  -- ^ A previously-granted lease.
  -> EtcdQuery EtcdRPC.LeaseKeepAliveResponse
keepAlive grpc lease = do
    reply <- rawUnary (RPC :: RPC EtcdRPC.Lease "leaseKeepAlive") grpc
                 (set EtcdPB.id (_getGrantedLeaseId lease) defMessage)
    pure (preview unaryOutput reply)

-----------------------------------------------------------------------------------------

-- | Put one value.
--
-- Calls the KV @put@ RPC with the request built by 'putReq'.
put
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> ByteString
  -- ^ Key.
  -> ByteString
  -- ^ Value.
  -> Maybe GrantedLease
  -- ^ Lease on the key.
  -> EtcdQuery EtcdRPC.PutResponse
put grpc k v gl = do
    reply <- rawUnary (RPC :: RPC EtcdRPC.KV "put") grpc (putReq k v gl)
    pure (preview unaryOutput reply)

-- | Smart constructor for a 'EtcdRPC.PutRequest'.
--
-- Sets the key, the value and the lease identifier; when no lease is given
-- the lease field is set to @0@.
putReq :: ByteString -> ByteString -> Maybe GrantedLease -> EtcdRPC.PutRequest
putReq k v gl =
    set EtcdPB.lease leaseId
  . set EtcdPB.value v
  . set EtcdPB.key k
  $ defMessage
  where
    leaseId = maybe 0 _getGrantedLeaseId gl

-- | Delete a range of values.
--
-- Calls the KV @deleteRange@ RPC with the request built by 'delReq'.
delete
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> KeyRange
  -- ^ Deleted range.
  -> EtcdQuery EtcdRPC.DeleteRangeResponse
delete grpc r = do
    reply <- rawUnary (RPC :: RPC EtcdRPC.KV "deleteRange") grpc (delReq r)
    pure (preview unaryOutput reply)

-- | Smart constructor for a 'EtcdRPC.DeleteRangeRequest' covering the given
-- 'KeyRange'.
--
-- Only the @key@ and @range_end@ fields are set.
delReq :: KeyRange -> EtcdRPC.DeleteRangeRequest
delReq r =
    let (firstKey, lastKey) = rangePairForRangeQuery r
    in  defMessage
          & EtcdPB.key .~ firstKey
          & EtcdPB.rangeEnd .~ lastKey

-----------------------------------------------------------------------------------------

-- | Opaque lock.
--
-- Wraps the @key@ field read from a 'LockRPC.LockResponse' (see
-- 'fromLockResponse'); 'unlock' sends this key back.
--
-- Show instance used for printing purposes only.
newtype AcquiredLock = AcquiredLock { _getAcquiredLock :: ByteString }

-- | Human-readable rendering of the form @(lock #\<key\>)@, where the key is
-- rendered with the 'Show' instance of 'ByteString'.
instance Show AcquiredLock where
  show acquired = concat ["(lock #", show (_getAcquiredLock acquired), ")"]

-- | Build an 'AcquiredLock' out of the @key@ field of a lock response.
fromLockResponse :: LockRPC.LockResponse -> AcquiredLock
fromLockResponse l = AcquiredLock (view LockPB.key l)

-- | Request a named lock.
--
-- Calls the Lock service @lock@ RPC with a request carrying the lock name
-- and the identifier of the lease.
lock
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> ByteString
  -- ^ Lock Name
  -> GrantedLease
  -- ^ Previously-granted lease that will bound the lifetime of the lock ownership.
  -> EtcdQuery LockRPC.LockResponse
lock grpc n lease = do
    reply <- rawUnary (RPC :: RPC LockRPC.Lock "lock") grpc
                 (set LockPB.lease (_getGrantedLeaseId lease) (set LockPB.name n defMessage))
    pure (preview unaryOutput reply)

-- | Release a previously-acquired lock.
--
-- Calls the Lock service @unlock@ RPC with a request carrying the key held
-- by the 'AcquiredLock'.
unlock
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> AcquiredLock
  -- ^ Previously-acquired lock.
  -> EtcdQuery LockRPC.UnlockResponse
unlock grpc acquired = do
    reply <- rawUnary (RPC :: RPC LockRPC.Lock "unlock") grpc
                 (set LockPB.key (_getAcquiredLock acquired) defMessage)
    pure (preview unaryOutput reply)

-----------------------------------------------------------------------------------------

-- | Transaction comparison on the value stored under a key, using the
-- @EQUAL@ comparison result.
--
-- First argument is the key, second argument is the value to compare with.
kvEq :: ByteString -> ByteString -> EtcdRPC.Compare
kvEq k v =
    set EtcdPB.value v
  . set EtcdPB.key k
  . set EtcdPB.result EtcdRPC.Compare'EQUAL
  . set EtcdPB.target EtcdRPC.Compare'VALUE
  $ defMessage

-- | Transaction comparison on the value stored under a key, using the
-- @NOT_EQUAL@ comparison result.
--
-- First argument is the key, second argument is the value to compare with.
kvNeq :: ByteString -> ByteString -> EtcdRPC.Compare
kvNeq k v =
    set EtcdPB.value v
  . set EtcdPB.key k
  . set EtcdPB.result EtcdRPC.Compare'NOT_EQUAL
  . set EtcdPB.target EtcdRPC.Compare'VALUE
  $ defMessage

-- | Transaction comparison on the value stored under a key, using the
-- @GREATER@ comparison result.
--
-- First argument is the key, second argument is the value to compare with.
kvGt :: ByteString -> ByteString -> EtcdRPC.Compare
kvGt k v =
    set EtcdPB.value v
  . set EtcdPB.key k
  . set EtcdPB.result EtcdRPC.Compare'GREATER
  . set EtcdPB.target EtcdRPC.Compare'VALUE
  $ defMessage

-- | Transaction comparison on the value stored under a key, using the
-- @LESS@ comparison result.
--
-- First argument is the key, second argument is the value to compare with.
kvLt :: ByteString -> ByteString -> EtcdRPC.Compare
kvLt k v =
    set EtcdPB.value v
  . set EtcdPB.key k
  . set EtcdPB.result EtcdRPC.Compare'LESS
  . set EtcdPB.target EtcdRPC.Compare'VALUE
  $ defMessage

-- | Smart constructor for transaction requests, to use in conjunction with
-- the other smart constructors, which together give a small DSL for the
-- common cases. The proto-lens generated code is fine but requires qualified
-- imports to avoid namespace clashes.
--
-- The comparisons are stored as given. The operations of the success branch
-- are the puts, followed by the range deletions, followed by the range reads.
-- No other field of the request is set.
--
-- Example use:
-- @
-- putStrLn $ Data.ProtoLens.showMessage $ successTxnReq [kvEq "hello" "world"] [putReq "txn-written" "1234" Nothing] [] []
-- compare { target: VALUE key: "hello" value: "world" }
-- success { request_put { key: "txn-written" value: "1234" } }
-- @
successTxnReq
  :: [EtcdRPC.Compare]
  -> [EtcdRPC.PutRequest]
  -> [EtcdRPC.DeleteRangeRequest]
  -> [EtcdRPC.RangeRequest]
  -> EtcdRPC.TxnRequest
successTxnReq cmps rps drrs rrs =
    defMessage
      & EtcdPB.compare .~ cmps
      & EtcdPB.success .~ (map putOp rps ++ map deleteOp drrs ++ map rangeOp rrs)
  where
    -- Each helper wraps one request into the matching operation variant.
    putOp rp     = defMessage & EtcdPB.maybe'requestPut ?~ rp
    deleteOp drr = defMessage & EtcdPB.maybe'requestDeleteRange ?~ drr
    rangeOp rr   = defMessage & EtcdPB.maybe'requestRange ?~ rr

-- | Issue a transaction request, performing many modifications at once.
--
-- Calls the KV @txn@ RPC with the given request, unmodified.
--
-- See 'successTxnReq' for a smart constructor of transaction request.
transaction
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> EtcdRPC.TxnRequest
  -- ^ Transaction
  -> EtcdQuery EtcdRPC.TxnResponse
transaction grpc tx = do
    reply <- rawUnary (RPC :: RPC EtcdRPC.KV "txn") grpc tx
    pure (preview unaryOutput reply)

-----------------------------------------------------------------------------------------

-- | Smart constructor for a watch-creation request.
--
-- Builds a 'EtcdRPC.WatchRequest' whose create-request covers the given
-- 'KeyRange', applies the given filters, has its @fragment@ flag set to
-- 'True' and carries the given watch identifier.
watchReq
  :: KeyRange
  -> [EtcdRPC.WatchCreateRequest'FilterType]
  -> Int64
  -> EtcdRPC.WatchRequest
watchReq r fs wId =
    set EtcdPB.maybe'createRequest (Just creation) defMessage
  where
    (startKey, endKey) = rangePairForRangeQuery r
    creation =
        set EtcdPB.watchId wId
      . set EtcdPB.fragment True
      . set EtcdPB.filters fs
      . set EtcdPB.rangeEnd endKey
      . set EtcdPB.key startKey
      $ defMessage

-- | Starts some watches until an exception occurs.
--
-- The watch requests are sent one after the other on the @watch@ stream;
-- once all of them have been sent the client side of the stream is
-- finalized. Every received message is passed to the handler together with
-- the current state; an invalid incoming event is re-thrown with 'throwIO';
-- any other incoming event leaves the state untouched. The final state is
-- discarded.
--
-- Alas there is no simple way to discard a watch or stop a stream with this
-- method as http2-client-grpc does not yet expose an 'OutgoingEvent' to
-- forcefully close the client stream.
--
-- See 'watchReq' for building watch requests.
watchForever
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> [EtcdRPC.WatchRequest]
  -- ^ List of watches to register.
  -> (a -> EtcdRPC.WatchResponse -> IO a)
  -- ^ State-passing handler for handling watches.
  -> a
  -- ^ Initial state.
  -> IO (Either TooMuchConcurrency ())
watchForever grpc wcs handle v0 =
    fmap (() <$)
         (rawGeneralStream (RPC :: RPC EtcdRPC.Watch "watch") grpc v0 onIncoming wcs nextOutgoing)
  where
    -- Incoming side: thread the user state through received messages.
    onIncoming st event = case event of
        RecvMessage msg -> handle st msg
        Invalid err     -> throwIO err
        _               -> pure st
    -- Outgoing side: the state is the list of requests still to be sent.
    nextOutgoing pending = case pending of
        (w:ws) -> pure (ws, SendMessage Compressed w)
        []     -> pure ([], Finalize)

-----------------------------------------------------------------------------------------

-- | A newtype around ByteString to speak about election names.
--
-- The 'IsString' instance is derived from the underlying 'ByteString', so an
-- 'Election' can be written as a string literal when overloaded string
-- literals are enabled.
newtype Election = Election { _getElectionName :: ByteString }
  deriving (Show, Eq, Ord, IsString)

-- | An opaque type around the returned leader key.
--
-- Obtained from a campaign response with 'fromCampaignResponse' and passed
-- back to the server by 'proclaim' and 'resign'.
newtype LeaderEvidence = LeaderEvidence { _getLeaderKey :: ElectionRPC.LeaderKey }

-- | Waits until elected.
--
-- Calls the Election @campaign@ RPC with the request built by 'campaignReq'.
runForLeadership
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> Election
  -- ^ An election to run for.
  -> GrantedLease
  -- ^ A lease delimiting the leadership duration.
  -> ByteString
  -- ^ The initially-proclaimed value.
  -> EtcdQuery ElectionRPC.CampaignResponse
runForLeadership grpc el gl v = do
    reply <- rawUnary (RPC :: RPC ElectionRPC.Election "campaign") grpc (campaignReq el gl v)
    pure (preview unaryOutput reply)

-- | Smart constructor for a 'ElectionRPC.CampaignRequest'.
--
-- Sets the election name, the lease identifier and the initially-proclaimed
-- value.
campaignReq
  :: Election
  -> GrantedLease
  -> ByteString
  -> ElectionRPC.CampaignRequest
campaignReq (Election electionName) (GrantedLease leaseId) v =
    set ElectionPB.value v
  . set ElectionPB.lease leaseId
  . set ElectionPB.name electionName
  $ defMessage

-- | Helper to retrieve an evidence that we are a leader from a CampaignResponse.
--
-- Returns 'Nothing' when the response carries no @leader@ field.
--
-- The field is read through the @maybe'leader@ lens on purpose: the plain
-- @leader@ lens is total and yields a default 'ElectionRPC.LeaderKey' when the
-- field is unset, so previewing it would always produce a 'Just' and turn an
-- empty response into a bogus evidence.
fromCampaignResponse :: ElectionRPC.CampaignResponse -> Maybe LeaderEvidence
fromCampaignResponse cr = LeaderEvidence <$> cr ^. ElectionPB.maybe'leader

-- | As a leader, proclaim a new value.
--
-- Calls the Election @proclaim@ RPC with the request built by 'proclaimReq'.
--
-- Observers will be notified.
proclaim
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> LeaderEvidence
  -- ^ Proof of recent successful participation to a leader-election.
  -> ByteString
  -- ^ New value to proclaim.
  -> EtcdQuery ElectionRPC.ProclaimResponse
proclaim grpc le v = do
    reply <- rawUnary (RPC :: RPC ElectionRPC.Election "proclaim") grpc (proclaimReq le v)
    pure (preview unaryOutput reply)

-- | Smart constructor for a 'ElectionRPC.ProclaimRequest'.
--
-- Sets the leader key held by the evidence and the value to proclaim.
proclaimReq
  :: LeaderEvidence
  -> ByteString
  -> ElectionRPC.ProclaimRequest
proclaimReq (LeaderEvidence leaderKey) v =
    set ElectionPB.value v
  . set ElectionPB.leader leaderKey
  $ defMessage

-- | Resign from leadership.
--
-- Calls the Election @resign@ RPC with the request built by 'resignReq'.
--
-- If other campaigners are waiting for the role, they will get elected.
resign
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> LeaderEvidence
  -- ^ Proof of recent successful participation to a leader-election.
  -> EtcdQuery ElectionRPC.ResignResponse
resign grpc le = do
    reply <- rawUnary (RPC :: RPC ElectionRPC.Election "resign") grpc (resignReq le)
    pure (preview unaryOutput reply)

-- | Smart constructor for a 'ElectionRPC.ResignRequest'.
--
-- Sets the leader key held by the evidence.
resignReq
  :: LeaderEvidence
  -> ElectionRPC.ResignRequest
resignReq (LeaderEvidence leaderKey) =
    set ElectionPB.leader leaderKey defMessage

-- | As a bystander of an election, get the proclaimed value.
--
-- Calls the Election @leader@ RPC with the request built by 'leaderReq'.
getProclaimedValue
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> Election
  -- ^ Election to read value from.
  -> EtcdQuery ElectionRPC.LeaderResponse
getProclaimedValue grpc el = do
    reply <- rawUnary (RPC :: RPC ElectionRPC.Election "leader") grpc (leaderReq el)
    pure (preview unaryOutput reply)

-- | Smart constructor for a 'ElectionRPC.LeaderRequest'.
--
-- Sets the election name.
leaderReq
  :: Election
  -> ElectionRPC.LeaderRequest
leaderReq (Election electionName) =
    set ElectionPB.name electionName defMessage

-- | As a bystander of an election, get notified for every proclaimed value.
--
-- Calls the Election @observe@ server-streaming RPC with the request built
-- by 'leaderReq'. Each streamed response is passed to the handler together
-- with the current state; the first component of the stream result (the
-- final state) is returned.
observeProclaimedValues
  :: GrpcClient
  -- ^ Initialized gRPC client.
  -> Election
  -- ^ Election to wait for values from.
  -> (a -> ElectionRPC.LeaderResponse -> IO a)
  -- ^ State-passing handler for handling iteratively updated proclaimed values.
  -> a
  -- ^ An initial state.
  -> IO (Either TooMuchConcurrency a)
observeProclaimedValues grpc el handler v0 = do
    outcome <- rawStreamServer (RPC :: RPC ElectionRPC.Election "observe") grpc v0 (leaderReq el)
                   (\st _ msg -> handler st msg)
    pure (fmap (^. _1) outcome)