{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Hercules.Agent.Socket
  ( withReliableSocket,
    checkVersion',
    Socket (..),
    syncIO,
    SocketConfig (..),
  )
where

import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, newTBQueue, writeTBQueue)
import Control.Concurrent.STM.TChan (TChan, writeTChan)
import Control.Concurrent.STM.TVar (TVar, modifyTVar, readTVar, writeTVar)
import Control.Monad.IO.Unlift
import qualified Data.Aeson as A
import Data.DList (DList, fromList)
import Data.List (dropWhileEnd, splitAt)
import Data.Semigroup
import Data.Time (NominalDiffTime, addUTCTime, diffUTCTime, getCurrentTime)
import Data.Time.Extras
import Hercules.API.Agent.LifeCycle.ServiceInfo (ServiceInfo)
import qualified Hercules.API.Agent.LifeCycle.ServiceInfo as ServiceInfo
import Hercules.API.Agent.Socket.Frame (Frame)
import qualified Hercules.API.Agent.Socket.Frame as Frame
import Hercules.Agent.STM (atomically, newTChanIO, newTVarIO)
import Katip (KatipContext, Severity (..), katipAddContext, katipAddNamespace, logLocM, sl)
import Network.URI (URI, uriAuthority, uriPath, uriPort, uriQuery, uriRegName, uriScheme)
import Network.WebSockets (Connection, runClientWith)
import qualified Network.WebSockets as WS
import Protolude hiding (atomically, handle, race, race_)
import qualified UnliftIO
import UnliftIO.Async (race, race_)
import UnliftIO.Exception (handle)
import UnliftIO.STM (readTVarIO)
import UnliftIO.Timeout (timeout)
import Wuss (runSecureClientWith)

-- | Handle for a reliable socket, as handed to the callback of
-- 'withReliableSocket'. @r@ is the type of payloads that are read, @w@ the
-- type of payloads that are written.
data Socket r w = Socket
  { -- | Queue a payload for sending. 'withReliableSocket' implements this by
    -- tagging the payload with the next sequence number and writing the
    -- resulting frame to a bounded queue.
    write :: w -> STM (),
    -- | Channel on which received payloads are delivered.
    serviceChan :: TChan r,
    -- | Two-stage synchronisation. Running the outer transaction captures
    -- the sequence number of the most recently written message; the
    -- transaction it returns retries until that number has been
    -- acknowledged.
    sync :: STM (STM ())
  }

-- | Run both stages of 'sync' in 'IO': first capture the current write
-- position, then block until everything up to that position has been
-- acknowledged.
syncIO :: Socket r w -> IO ()
syncIO socket = do
  -- Stage one: snapshot the position; yields the transaction to wait on.
  waitUntilAcked <- atomically (sync socket)
  -- Stage two: retries until the snapshot position is acknowledged.
  atomically waitUntilAcked

-- | Parameters to start 'withReliableSocket'.
--
-- @ap@ is the payload type sent by this side, @sp@ the payload type received
-- from the other side, and @m@ the monad the callbacks run in.
data SocketConfig ap sp m = SocketConfig
  { -- | Produces the hello payload that is sent as an out-of-band frame
    -- during every handshake, after the version check has passed.
    makeHello :: m ap,
    -- | Applied to the first out-of-band payload received during the
    -- handshake. A 'Left' aborts the handshake; its text is sent back in an
    -- exception frame.
    checkVersion :: sp -> m (Either Text ()),
    -- | NOTE(review): presumably the base address to connect to; it is
    -- consumed outside the visible part of this module - confirm.
    baseURL :: URI,
    -- | NOTE(review): presumably the request path of the socket endpoint;
    -- consumed outside the visible part of this module - confirm.
    path :: Text,
    -- | NOTE(review): presumably the credential presented when connecting;
    -- consumed outside the visible part of this module - confirm.
    token :: ByteString
  }

-- | Lowest service version accepted by 'checkVersion''. Versions are
-- compared as tuples, i.e. lexicographically (presumably @(major, minor)@).
requiredServiceVersion :: (Int, Int)
requiredServiceVersion = (firstComponent, secondComponent)
  where
    firstComponent = 2
    secondComponent = 0

-- | Time allowed, in seconds, for an acknowledgement to arrive after a
-- message was sent. The same limit is applied to each individual receive.
ackTimeout :: NominalDiffTime
ackTimeout = 60

-- | Run an action with a 'Socket' whose connection is maintained in the
-- background by 'runReliableSocket'.
--
-- The connection thread and the supplied action are raced against each
-- other, so the connection thread is stopped as soon as the action returns.
-- Outgoing payloads are numbered consecutively from 0 and buffered in a
-- queue bounded at 100 frames.
withReliableSocket :: (A.FromJSON sp, A.ToJSON ap, MonadIO m, MonadUnliftIO m, KatipContext m) => SocketConfig ap sp m -> (Socket sp ap -> m a) -> m a
withReliableSocket socketConfig f = do
  writeQueue <- atomically (newTBQueue 100)
  -- Sequence number that the next outgoing message will receive.
  nextSeqVar <- newTVarIO 0
  inbox <- newTChanIO
  -- Highest acknowledged sequence number; -1 means nothing acknowledged yet.
  highestAcked <- newTVarIO (-1)
  let -- Wrap a payload in a message frame carrying a fresh sequence number.
      nextFrame payload = do
        seqNo <- readTVar nextSeqVar
        writeTVar nextSeqVar (seqNo + 1)
        pure Frame.Msg {n = seqNo, p = payload}
      -- Snapshot the last assigned sequence number now; the returned
      -- transaction retries until the acknowledgements have caught up.
      waitForAcks = do
        lastAssigned <- readTVar nextSeqVar <&> \next -> next - 1
        pure do
          acked <- readTVar highestAcked
          guard (acked >= lastAssigned)
      socket =
        Socket
          { write = \payload -> nextFrame payload >>= writeTBQueue writeQueue,
            serviceChan = inbox,
            sync = waitForAcks
          }
  -- The connection thread never returns a value of its own, so both sides
  -- of the 'Either' carry the callback's result type.
  either identity identity
    <$> race
      (runReliableSocket socketConfig writeQueue inbox highestAcked)
      (f socket)

-- | Default version check: reject a service whose reported version is lower
-- than 'requiredServiceVersion'.
--
-- Returns @Left@ with a description of the expected version on rejection and
-- @Right ()@ otherwise. No effects of @m@ are used; the result is only
-- lifted with 'pure'.
checkVersion' :: Applicative m => ServiceInfo -> m (Either Text ())
checkVersion' si = pure verdict
  where
    verdict :: Either Text ()
    verdict
      | ServiceInfo.version si < requiredServiceVersion =
        Left ("Expected service version " <> show requiredServiceVersion)
      | otherwise = Right ()

-- | Maintain the connection forever: connect, perform the handshake, then
-- run the reader, the writer and the acknowledgement watchdog until one of
-- them stops or throws, log the reason, pause and start over.
--
-- Arguments:
--
--   * the socket configuration (handshake callbacks and connection details),
--   * the queue of outgoing frames,
--   * the channel on which received payloads are delivered,
--   * the variable holding the highest acknowledged sequence number, which
--     is raised whenever an acknowledgement frame arrives.
--
-- This function does not return normally; it only ends by being cancelled.
runReliableSocket :: forall ap sp m. (A.ToJSON ap, A.FromJSON sp, MonadUnliftIO m, KatipContext m) => SocketConfig ap sp m -> TBQueue (Frame ap ap) -> TChan sp -> TVar Integer -> forall a. m a
runReliableSocket socketConfig writeQueue serviceMessageChan highestAcked = katipAddNamespace "Socket" do
  -- Sequence number and send time of the newest message awaiting an ack.
  expectedAck <- liftIO $ newTVarIO Nothing
  -- Frames that were handed to the writer but not acknowledged yet; they are
  -- sent again after every successful handshake.
  (unacked :: TVar (DList (Frame Void ap))) <- newTVarIO mempty
  -- Sequence number of the last message delivered to 'serviceMessageChan';
  -- used to drop duplicates.
  (lastServiceN :: TVar Integer) <- newTVarIO (-1)
  let -- Attach an exception's display text and its 'show' output to the
      -- logging context.
      katipExceptionContext e =
        katipAddContext (sl "message" (displayException e))
          . katipAddContext (sl "exception" (show e :: [Char]))
      -- Log why the connection ended and wait ten seconds before the next
      -- attempt. Regular and premature closes are logged at info level,
      -- everything else as a warning.
      logWarningPause :: SomeException -> m ()
      logWarningPause e | Just WS.ConnectionClosed <- fromException e = do
        katipExceptionContext e $ logLocM InfoS "Socket closed. Reconnecting."
        liftIO $ threadDelay 10_000_000
      logWarningPause e | Just (WS.ParseException "not enough bytes") <- fromException e = do
        katipExceptionContext e $ logLocM InfoS "Socket closed prematurely. Reconnecting."
        liftIO $ threadDelay 10_000_000
      logWarningPause e = do
        katipExceptionContext e $ logLocM WarningS "Recovering from exception in socket handler. Reconnecting."
        liftIO $ threadDelay 10_000_000
      -- Record the highest message sequence number in the batch, if the
      -- batch contains any message frame, as the acknowledgement to expect.
      setExpectedAckForMsgs :: [Frame ap ap] -> m ()
      setExpectedAckForMsgs msgs =
        msgs
          & foldMap (\case Frame.Msg {n = n} -> Just $ Max n; _ -> mempty)
          & traverse_ (\(Max n) -> setExpectedAck n)
      -- Send frames ordered by 'msgN', in batches of at most 100 frames,
      -- updating the expected acknowledgement after each batch.
      send :: Connection -> [Frame ap ap] -> m ()
      send conn = sendSorted . sortBy (compare `on` msgN)
        where
          sendRaw :: [Frame ap ap] -> m ()
          sendRaw msgs = do
            liftIO $ WS.sendDataMessages conn (WS.Binary . A.encode <$> msgs)
            setExpectedAckForMsgs msgs
          sendSorted :: [Frame ap ap] -> m ()
          sendSorted [] = pass
          sendSorted msgs = do
            let (msgsNow, msgsLater) = Data.List.splitAt 100 msgs
            sendRaw msgsNow
            sendSorted msgsLater
      -- Receive and decode one frame, giving up after 'ackTimeout'. A frame
      -- that cannot be decoded raises a 'FatalError'.
      recv :: Connection -> m (Frame sp sp)
      recv conn = do
        withTimeout ackTimeout (FatalError "Hercules.Agent.Socket.recv timed out") $
          liftIO (A.eitherDecode <$> WS.receiveData conn) >>= \case
            Left e -> liftIO $ throwIO (FatalError $ "Error decoding service message: " <> toS e)
            Right r -> pure r
      -- Handshake: expect an out-of-band frame and check its version, send
      -- our hello, expect an acknowledgement, cancel the handshake timeout
      -- and finally resend whatever is still unacknowledged.
      handshake conn (removeHandshakeTimeout :: IO ()) = katipAddNamespace "Handshake" do
        siMsg <- recv conn
        case siMsg of
          Frame.Oob {o = o'} ->
            checkVersion socketConfig o' >>= \case
              Left e -> do
                -- Tell the other side why we are giving up.
                send conn [Frame.Exception e]
                throwIO $ FatalError "It looks like you're running a development version of hercules-ci-agent that is not yet supported on hercules-ci.com. Please use the stable branch or a tag."
              Right _ -> pass
          _ -> throwIO $ FatalError "Unexpected message. This is either a bug or you might need to update your agent."
        hello <- makeHello socketConfig
        send conn [Frame.Oob hello]
        ackMsg <- recv conn
        case ackMsg of
          Frame.Ack {n = n} -> cleanAcknowledged n
          _ -> throwIO $ FatalError "Expected acknowledgement. This is either a bug or you might need to update your agent."
        liftIO removeHandshakeTimeout
        sendUnacked conn
      -- Send every frame that has not been acknowledged so far.
      sendUnacked :: Connection -> m ()
      sendUnacked conn = do
        unackedNow <- readTVarIO unacked
        send conn $ Frame.mapOob absurd <$> toList unackedNow
      -- Drop all message frames numbered up to and including @newAck@ from
      -- 'unacked' and raise 'highestAcked' accordingly.
      cleanAcknowledged newAck = atomically do
        unacked0 <- readTVar unacked
        let stillUnacked =
              filter
                ( \case
                    Frame.Msg {n = n} -> n > newAck
                    Frame.Oob x -> absurd x
                    Frame.Ack {} -> False
                    Frame.Exception {} -> False
                )
                (toList unacked0)
        -- Force the spine of the filtered list before storing it. Otherwise
        -- the variable holds an unevaluated chain of filter/append thunks
        -- that keeps every frame ever sent alive until the next reconnect,
        -- which is the only place the list is consumed.
        length stillUnacked `seq` writeTVar unacked (fromList stillUnacked)
        -- Strict update, so no chain of pending 'max' applications builds up.
        previousAck <- readTVar highestAcked
        writeTVar highestAcked $! max newAck previousAck
      -- TODO (performance) IntMap?

      -- Reader: deliver incoming payloads, acknowledge incoming messages and
      -- process incoming acknowledgements.
      readThread conn = katipAddNamespace "Reader" do
        forever $ do
          msg <- recv conn
          case msg of
            Frame.Msg {p = pl, n = n} -> atomically do
              lastN <- readTVar lastServiceN
              -- when recent
              when (n > lastN) do
                writeTChan serviceMessageChan pl
                writeTVar lastServiceN n
              -- Acknowledge even if the message was a duplicate.
              writeTBQueue writeQueue (Frame.Ack {n = n})
            Frame.Ack {n = n} ->
              cleanAcknowledged n
            Frame.Oob o -> atomically do
              writeTChan serviceMessageChan o
            Frame.Exception e -> katipAddContext (sl "message" e) $ logLocM WarningS "Service exception"
      -- Writer: wait for queued frames, remember the ones that need an
      -- acknowledgement and send the whole batch.
      writeThread conn = katipAddNamespace "Writer" do
        forever do
          msgs <- atomically do
            -- TODO: make unacked bounded
            allMessages <- flushTBQueue writeQueue
            -- Block until there is something to send.
            when (null allMessages) retry
            modifyTVar unacked (<> (allMessages >>= Frame.removeOob & fromList))
            pure allMessages
          send conn msgs
      -- Remember that an acknowledgement for @n@ is expected, stamped with
      -- the current time.
      setExpectedAck :: Integer -> m ()
      setExpectedAck n = do
        now <- liftIO getCurrentTime
        atomically do
          writeTVar expectedAck $ Just (n, now)
      -- Watchdog: returns when an expected acknowledgement has not arrived
      -- within 'ackTimeout' of its send time, which ends the race below.
      noAckCleanupThread = noAckCleanupThread' (-1)
      noAckCleanupThread' confirmedLastTime = do
        -- Wait for an expectation newer than the one confirmed last time.
        (expectedN, sendTime) <- atomically do
          readTVar expectedAck
            >>= \case
              Nothing -> retry
              Just (expectedN, _) | expectedN <= confirmedLastTime -> retry
              Just a -> pure a
        let expectedArrival = ackTimeout `addUTCTime` sendTime
        -- Sleep until the deadline for that acknowledgement.
        liftIO do
          now <- getCurrentTime
          let waitTime = expectedArrival `diffUTCTime` now
          delayNominalDiffTime waitTime
        currentHighestAck <- atomically do
          readTVar highestAcked
        if expectedN > currentHighestAck
          then do
            katipAddContext (sl "expectedAck" expectedN <> sl "highestAck" currentHighestAck) do
              logLocM Katip.DebugS "Did not receive ack in time. Will reconnect."
            -- terminate other threads via race_
            pass
          else noAckCleanupThread' expectedN
  forever do
    -- Any exception ends this connection attempt; it is logged, followed by
    -- a pause, and the loop starts a new attempt.
    handle logWarningPause $
      withCancelableTimeout handshakeTimeoutMicroseconds HandshakeTimeout \removeTimeout -> do
        withConnection' socketConfig $
          \conn -> do
            katipAddNamespace "Handshake" do
              handshake conn removeTimeout
            -- The first of the three to finish cancels the other two.
            readThread conn `race_` writeThread conn `race_` noAckCleanupThread

-- | Time budget for the handshake, in microseconds (30 seconds).
handshakeTimeoutMicroseconds :: Int
handshakeTimeoutMicroseconds = 30 * 1_000_000

-- | Exception value handed to 'withCancelableTimeout' for the handshake
-- phase; it is thrown to the connecting thread when the handshake timer
-- fires before being cancelled.
data HandshakeTimeout = HandshakeTimeout
  deriving (Show)

-- All 'Exception' methods use their defaults.
instance Exception HandshakeTimeout

-- | Run @cont@ under a timer that can be switched off from the inside.
--
-- A background thread sleeps for @delay@ microseconds and then throws @exc@
-- to the thread that called 'withCancelableTimeout'. The continuation
-- receives an 'IO' action that cancels that background thread, which
-- disarms the timeout. The background thread is also cleaned up by
-- 'UnliftIO.withAsync' when the continuation returns or throws.
withCancelableTimeout :: (Exception e, MonadUnliftIO m) => Int -> e -> (IO () -> m a) -> m a
withCancelableTimeout delay exc cont = do
  caller <- liftIO myThreadId
  let watchdog = liftIO (threadDelay delay >> throwTo caller exc)
  UnliftIO.withAsync watchdog \timer ->
    cont (cancel timer)

-- | The @n@ field of a 'Frame.Msg' frame; 'Nothing' for every other kind of
-- frame.
msgN :: Frame o a -> Maybe Integer
msgN = \case
  Frame.Msg {n = number} -> Just number
  _ -> Nothing

-- | Open a websocket connection as described by the 'SocketConfig' and run
-- the given action with it.
--
-- The target URL is 'baseURL' with 'path' appended to its path component
-- (joined with 'slash'). The URI scheme selects the transport: @http:@ uses
-- 'runClientWith' and @https:@ uses 'runSecureClientWith'; any other scheme
-- panics. The request carries an @Authorization: Bearer <token>@ header.
--
-- When the URL has no port, or the port does not parse as a number, the
-- scheme's default (80 or 443) is used. A URL without an authority part
-- panics when the host name is needed.
withConnection' :: (MonadUnliftIO m) => SocketConfig any0 any1 m -> (Connection -> m a) -> m a
withConnection' socketConfig m =
  withRunInIO \runInIO -> connect (runInIO . m)
  where
    connectionOptions = WS.defaultConnectionOptions
    authHeaders = [("Authorization", "Bearer " <> token socketConfig)]

    root = baseURL socketConfig
    target = root {uriPath = uriPath root `slash` toS (path socketConfig)}
    scheme = uriScheme target
    authority = uriAuthority target

    fallbackPort :: Int
    fallbackPort
      | scheme == "http:" = 80
      | scheme == "https:" = 443
      | otherwise = panic "Hercules.Agent.Socket: invalid uri scheme"

    -- 'uriPort' includes the leading colon, which has to go before parsing.
    portNumber :: Int
    portNumber =
      fromMaybe fallbackPort (authority >>= readMaybe . dropWhile (== ':') . uriPort)

    host = maybe (panic "Hercules.Agent.Socket: url has no regname") uriRegName authority

    requestPath = uriPath target <> uriQuery target

    connect
      | scheme == "http:" =
          runClientWith host portNumber requestPath connectionOptions authHeaders
      | scheme == "https:" =
          runSecureClientWith host (fromIntegral portNumber) requestPath connectionOptions authHeaders
      | otherwise = panic "Hercules.Agent.Socket: invalid uri scheme"

-- | Join two path fragments with exactly one @/@ between them: trailing
-- slashes of the first and leading slashes of the second are dropped before
-- the separator is inserted.
slash :: [Char] -> [Char] -> [Char]
slash prefix suffix = trimmedPrefix <> "/" <> trimmedSuffix
  where
    isSlash = (== '/')
    trimmedPrefix = dropWhileEnd isSlash prefix
    trimmedSuffix = dropWhile isSlash suffix

-- | Run an action with a time limit given as a 'NominalDiffTime' (seconds).
--
-- If the limit is zero or negative, @e@ is thrown without running the
-- action. Otherwise the action runs under 'timeout', with the limit rounded
-- up to whole microseconds, and @e@ is thrown if it does not finish in time.
withTimeout :: (Exception e, MonadIO m, MonadUnliftIO m) => NominalDiffTime -> e -> m a -> m a
withTimeout t e m
  | t <= 0 = throwIO e
  | otherwise = timeout limitMicros m >>= maybe (throwIO e) pure
  where
    limitMicros = ceiling (t * 1_000_000)