-- | Builds 'Multiplexer's whose producer is a PostgreSQL connection:
-- the producer issues a @LISTEN@ on a channel and forwards every
-- notification it receives to the multiplexer's message queue.
module PostgresWebsockets.HasqlBroadcast
( newHasqlBroadcaster
, newHasqlBroadcasterOrError
-- The names below are not defined in the visible part of this module;
-- presumably they are re-exports from "Hasql.Connection" and
-- "PostgresWebsockets.Broadcast" (TODO confirm).
, acquire
, relayMessages
, relayMessagesForever
) where
import Protolude hiding (putErrLn)
import Hasql.Connection
import Hasql.Notifications
import Data.Aeson (decode, Value(..))
import Data.HashMap.Lazy (lookupDefault)
import Data.Either.Combinators (mapBoth)
import Data.Function (id)
import Control.Retry (RetryStatus(..), retrying, capDelay, exponentialBackoff)
import PostgresWebsockets.Broadcast
-- | Create a 'Multiplexer' fed by database notifications on channel @ch@,
-- connecting with the given connection string.
--
-- The connection is obtained through 'tryUntilConnected', so failed
-- connection attempts are retried according to @maxRetries@.
--
-- @onConnectionFailure@ is handed to 'newMultiplexer' by
-- 'newHasqlBroadcasterForChannel'; presumably it runs when the producer
-- fails (confirm against "PostgresWebsockets.Broadcast").
newHasqlBroadcaster :: IO () -> Text -> Int -> ByteString -> IO Multiplexer
newHasqlBroadcaster onConnectionFailure ch maxRetries conStr =
  newHasqlBroadcasterForChannel onConnectionFailure ch connectWithRetries
  where
    -- Action (not yet executed) that yields a connection, retrying on failure.
    connectWithRetries = tryUntilConnected maxRetries conStr
-- | Like 'newHasqlBroadcaster', but makes a single connection attempt and
-- reports a connection failure as a value instead of retrying.
--
-- Returns @Left@ with the shown connection error when 'acquire' fails, or
-- @Right@ with a 'Multiplexer' built on the acquired connection.
newHasqlBroadcasterOrError :: IO () -> Text -> ByteString -> IO (Either ByteString Multiplexer)
newHasqlBroadcasterOrError onConnectionFailure ch conStr = do
  attempt <- acquire conStr
  case attempt of
    Left err ->
      pure . Left $ show err
    Right con ->
      -- The already-open connection is wrapped in a trivial action so it
      -- fits the @IO Connection@ parameter of the channel broadcaster.
      Right <$> newHasqlBroadcasterForChannel onConnectionFailure ch (pure con)
-- | Acquire a database connection, retrying failed attempts.
--
-- At most @maxRetries@ attempts are made in total (the retry predicate stops
-- once the iteration number reaches @maxRetries - 1@). Delays between
-- attempts follow an exponential backoff starting at one second and capped
-- at 32 seconds. Every failed attempt is reported on stderr.
--
-- If the last attempt still fails, the action itself panics with
-- @"Failure on connection retry"@. The failure is raised inside this 'IO'
-- action rather than being returned as a lazily-failing 'Connection', so it
-- surfaces here and not at some later point where the connection is used.
tryUntilConnected :: Int -> ByteString -> IO Connection
tryUntilConnected maxRetries conStr = do
  outcome <- retrying retryPolicy shouldRetry (const $ acquire conStr)
  case outcome of
    Right con -> pure con
    Left _ -> panic "Failure on connection retry"
  where
    maxDelayInMicroseconds :: Int
    maxDelayInMicroseconds = 32000000

    firstDelayInMicroseconds :: Int
    firstDelayInMicroseconds = 1000000

    retryPolicy = capDelay maxDelayInMicroseconds $ exponentialBackoff firstDelayInMicroseconds

    -- Log each failure and keep going while attempts remain; a successful
    -- connection never triggers a retry.
    shouldRetry :: RetryStatus -> Either ConnectionError Connection -> IO Bool
    shouldRetry status con =
      case con of
        Left err -> do
          putErrLn $ "Error connecting notification listener to database: " <> show err
          pure $ rsIterNumber status < maxRetries - 1
        Right _ -> pure False
-- | Build a 'Multiplexer' whose producer listens on database channel @ch@
-- and start relaying its messages via 'relayMessagesForever'.
--
-- @getCon@ is run each time the producer is opened to obtain the connection
-- to listen on. @onConnectionFailure@ is passed (ignoring its argument) as
-- the second argument of 'newMultiplexer'.
--
-- Each notification becomes a 'Message' carrying the raw payload. Its
-- channel is the payload's @"channel"@ field when the payload decodes as a
-- JSON object holding a string under that key; otherwise it is the first
-- value handed to the notification callback (presumably the name of the
-- database channel the notification arrived on — confirm against
-- "Hasql.Notifications").
newHasqlBroadcasterForChannel :: IO () -> Text -> IO Connection -> IO Multiplexer
newHasqlBroadcasterForChannel onConnectionFailure ch getCon = do
  multi <- newMultiplexer openProducer (const onConnectionFailure)
  _ <- relayMessagesForever multi
  pure multi
  where
    -- Producer: connect, LISTEN on the channel, then push every
    -- notification onto the multiplexer's queue.
    openProducer msgQ = do
      con <- getCon
      listen con (toPgIdentifier ch)
      waitForNotifications
        (\origin payload -> atomically (writeTQueue msgQ (toMsg origin payload)))
        con

    -- Wrap a notification, choosing the channel from the payload if possible.
    toMsg :: ByteString -> ByteString -> Message
    toMsg origin payload =
      Message (maybe origin (channelDef origin) (decode (toS payload))) payload

    -- Read the "channel" key of a JSON object, falling back to the given
    -- default when the value is not an object or the key is not a string.
    channelDef :: ByteString -> Value -> ByteString
    channelDef fallback (Object obj)
      | String s <- lookupDefault (String (toS fallback)) "channel" obj = toS s
    channelDef fallback _ = fallback
-- | Write a line of text to stderr.
putErrLn :: Text -> IO ()
putErrLn msg = hPutStrLn stderr msg