module PostgresWebsockets.HasqlBroadcast
( newHasqlBroadcaster
, newHasqlBroadcasterOrError
, acquire
, relayMessages
, relayMessagesForever
) where
import Protolude
import Hasql.Connection
import Data.Either.Combinators (mapBoth)
import Data.Function (id)
import Control.Retry (RetryStatus, retrying, capDelay, exponentialBackoff)
import PostgresWebsockets.Database
import PostgresWebsockets.Broadcast
newHasqlBroadcaster :: ByteString -> IO Multiplexer
newHasqlBroadcaster = newHasqlBroadcasterForConnection . tryUntilConnected
newHasqlBroadcasterOrError :: ByteString -> IO (Either ByteString Multiplexer)
newHasqlBroadcasterOrError =
acquire >=> (sequence . mapBoth show (newHasqlBroadcasterForConnection . return))
tryUntilConnected :: ByteString -> IO Connection
tryUntilConnected =
fmap (either (panic "Failure on connection retry") id) . retryConnection
where
retryConnection conStr = retrying retryPolicy shouldRetry (const $ acquire conStr)
maxDelayInMicroseconds = 32000000
firstDelayInMicroseconds = 1000000
retryPolicy = capDelay maxDelayInMicroseconds $ exponentialBackoff firstDelayInMicroseconds
shouldRetry :: RetryStatus -> Either ConnectionError Connection -> IO Bool
shouldRetry _ con =
case con of
Left err -> do
putErrLn $ "Error connecting notification listener to database: " <> show err
return True
_ -> return False
newHasqlBroadcasterForConnection :: IO Connection -> IO Multiplexer
newHasqlBroadcasterForConnection getCon = do
multi <- newMultiplexer openProducer closeProducer
void $ relayMessagesForever multi
return multi
where
closeProducer _ = putErrLn "Broadcaster is dead"
openProducer cmds msgs = do
con <- getCon
waitForNotifications
(\c m-> atomically $ writeTQueue msgs $ Message c m)
con
forever $ do
cmd <- atomically $ readTQueue cmds
case cmd of
Open ch -> listen con $ toPgIdentifier ch
Close ch -> unlisten con $ toPgIdentifier ch
putErrLn :: Text -> IO ()
putErrLn = hPutStrLn stderr