{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeOperators         #-}
-- |
-- Conduit bindings for AMQP (see amqp package) https://hackage.haskell.org/package/amqp
--
-- Create a AMQP connection, a channel, declare a queue and an exchange
-- and run the given action.
--
-- /Example/:
--
-- Connect to a server, declare a queue and an exchange and setup a callback for messages coming in the queue, then publish a message to our new exchange
--
-- >{-# LANGUAGE OverloadedStrings #-}
-- >
-- >import           Control.Concurrent           (threadDelay)
-- >import           Control.Monad.IO.Class       (MonadIO, liftIO)
-- >import           Control.Monad.Trans.Resource (runResourceT)
-- >import qualified Data.ByteString.Lazy.Char8   as BL
-- >import           Data.Conduit
-- >import           Network.AMQP
-- >import           Network.AMQP.Conduit
-- >import           Test.Hspec
-- >
-- >
-- >main :: IO ()
-- >main = hspec $ do
-- >    describe "produce and consume test" $ do
-- >        it "send a message and recieve the message" $ do
-- >            runResourceT $ withChannel config $ \conn -> do
-- >                sendMsg str $$ amqpSendSink conn "myExchange" "myKey"
-- >            amqp <- createChannel config
-- >            amqp' <- createConsumer amqp "myQueue" Ack $ \(msg,env) -> do
-- >                amqpReceiveSource (msg,env) $$ printMsg
-- >            -- | NOTE: RabbitMQ 1.7 doesn't implement this command.
-- >            -- amqp'' <- pauseConsumers amqp'
-- >            -- amqp''' <- resumeConsumers amqp''
-- >            threadDelay $ 15  * 1000000
-- >            _ <- deleteConsumer amqp'
-- >            return ()
-- >
-- >str :: String
-- >str = "This is a test message"
-- >
-- >config :: AmqpConf
-- >config = AmqpConf "amqp://guest:guest@localhost:5672/" queue exchange "myKey"
-- >    where
-- >        exchange = newExchange {exchangeName = "myExchange", exchangeType = "direct"}
-- >        queue = newQueue {queueName = "myQueue"}
-- >
-- >sendMsg :: (Monad m, MonadIO m) => String -> Source m Message
-- >sendMsg msg = do
-- >    yield (newMsg {msgBody = (BL.pack msg),msgDeliveryMode = Just Persistent} )
-- >
-- >printMsg :: (Monad m, MonadIO m) => Sink (Message, Envelope) m ()
-- >printMsg = do
-- >    m <- await
-- >    case m of
-- >       Nothing -> printMsg
-- >       Just (msg,env) -> do
-- >           liftIO $ ackEnv env
-- >           liftIO $ (BL.unpack $ msgBody msg) `shouldBe` str
-- >           liftIO $ putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg)
-- >           -
-- >


module Network.AMQP.Conduit (
    -- * Conduit
      amqpReceiveSource
    , amqpSendSink
    -- * Data type
    , AmqpConf (..)
    , AmqpConn (..)
    , ExchangeKey
    , Exchange
    , QueueName
    , AmqpURI
    -- * Connection and Channel
    , withChannel
    , createConnectionChannel
    , destoryConnection
    -- * Exchange and Queue utils
    , createQueue
    , createExchange
    , bindQueueExchange
    -- * Consumer utils
    , createConsumer
    , deleteConsumer
    , pauseConsumer
    , resumeConsumer
    ) where

import           Control.Exception           (throwIO)
import           Control.Exception.Lifted    (bracket)
import           Control.Monad.IO.Class      (MonadIO, liftIO)
import           Control.Monad.Trans.Control (MonadBaseControl)
import           Data.Conduit                (Sink, Source, await, yield)
import           Data.Text                   (Text)
import           Network.AMQP                (AMQPException (ConnectionClosedException),
                                              Ack, Channel, Connection,
                                              ConsumerTag, Envelope,
                                              ExchangeOpts, Message, QueueOpts,
                                              addConnectionClosedHandler,
                                              bindQueue, cancelConsumer,
                                              closeConnection, consumeMsgs,
                                              declareExchange, declareQueue,
                                              exchangeName, flow, fromURI,
                                              openChannel, openConnection'',
                                              publishMsg, queueName)

-- |  Amqp Connection and Channel
data AmqpConn = AmqpConn
    { amqpConn :: Connection
    , amqpChan :: (Channel, Maybe ConsumerTag)
    }

-- | Amqp connection configuration. queue name, exchange name, exchange key name, and amqp URI.
data AmqpConf = AmqpConf
    {
     -- | Connection string to the database.
      amqpUri       :: AmqpURI
    , amqpQueue     :: QueueOpts
    , amqpExchange  :: ExchangeOpts
    , amqpExchanKey :: ExchangeKey
    }

type ExchangeKey = Text
type Exchange = Text
type QueueName = Text
type AmqpURI = String

-- |Create a AMQP connection and a channel and run the given action. The connetion and channnel are properly released after the action finishes using it. Note that you should not use the given Connection, channel outside the action since it may be already been released.
withChannel:: (MonadIO m, MonadBaseControl IO m)
        => AmqpConf
         -- ^ Connection config to the AMQP server.
        -> (AmqpConn -> m a)
        -- ^ Action to be executed that uses the connection.
        -> m a
withChannel conf f = do
    bracket
        (do
            -- liftIO $ putStrLn "connecting.."
            liftIO $ connect (amqpUri conf))
        (\conn -> do
            -- liftIO $ putStrLn "disconnecting.."
            liftIO $ disconnect conn)
        (\conn -> do
            -- liftIO $ putStrLn "calling function"
            f conn)


-- | Create a connection and a channel. Note that it's your responsability to properly close the connection and the channels when unneeded. Use withAMQPChannels for an automatic resource control.
createConnectionChannel :: AmqpConf
        -- ^ Connection config to the AMQP server.
        -> IO AmqpConn
createConnectionChannel conf = connect $ amqpUri conf

-- | Close a connection
destoryConnection :: AmqpConn
        -> IO ()
destoryConnection conn = do
    addConnectionClosedHandler (amqpConn conn) False (return ())
    closeConnection (amqpConn conn)

-- Consumer utils
createConsumer ::  AmqpConn
        -> QueueName
        -> Ack
        -> ((Message, Envelope) -> IO ())
        -> IO AmqpConn
createConsumer conn queue ack f = do
    tag' <- getTag chan
    return $ conn {amqpChan =(chan, Just tag')}
    where
    getTag chan' = consumeMsgs chan' queue ack f
    chan = fst $ amqpChan conn

deleteConsumer :: AmqpConn -> IO AmqpConn
deleteConsumer conn =
    case tag of
        Nothing -> return conn
        Just s -> do
            putStrLn "cancel cunsumer channel."
            cancelConsumer chan s
            return $ conn {amqpChan = (chan, Nothing)}
    where
        (chan, tag) = amqpChan conn

pauseConsumer :: AmqpConn
        -> IO AmqpConn
pauseConsumer chan = flowConsumer chan False

resumeConsumer :: AmqpConn
        -> IO AmqpConn
resumeConsumer chan = flowConsumer chan True

flowConsumer :: AmqpConn
        -> Bool
        -> IO AmqpConn
flowConsumer conn flag =
    case tag of
        Nothing -> return conn
        Just _ -> do
            flow chan flag
            return conn
    where
        (chan, tag) = amqpChan conn

-- utils
--
createQueue :: AmqpConf -> AmqpConn -> IO (QueueName, Int, Int)
createQueue conf conn = declareQueue (fst $ amqpChan conn) (amqpQueue conf)

createExchange :: AmqpConf -> AmqpConn -> IO ()
createExchange conf conn = declareExchange (fst $ amqpChan conn) (amqpExchange conf)

bindQueueExchange :: AmqpConf -> AmqpConn -> IO ()
bindQueueExchange conf conn =
    bindQueue (fst $ amqpChan conn) (queueName (amqpQueue conf)) (exchangeName (amqpExchange conf)) (amqpExchanKey conf)



-- internal
connect :: AmqpURI
        -> IO AmqpConn
connect uri = do
    -- make a connection and a channel of the connection.
    conn <- openConnection'' $ fromURI uri
    chan <- openChannel conn

    -- set a excetion when closeing the connection
    addConnectionClosedHandler conn True (throwIO (ConnectionClosedException "Connection Closed."))
    return $ AmqpConn conn (chan, Nothing)

disconnect :: AmqpConn
        -> IO ()
disconnect conn = do
    closeConnection (amqpConn conn)

sendMsg :: AmqpConn
         -> Exchange
         -> ExchangeKey
         -> Message
         -> IO (Maybe Int)
sendMsg conn exchange key msg = do
    publishMsg chan exchange key msg
    where
        chan = fst (amqpChan conn)

-- | Source as consuming data pushed.
amqpReceiveSource :: MonadIO m
        => (Message, Envelope)
        -> Source m (Message, Envelope)
amqpReceiveSource (msg, env) = loop
    where
        loop = do
            yield (msg, env)
            loop

-- | Sink as sending data.
amqpSendSink :: MonadIO m
        => AmqpConn
        -> Exchange
        -> ExchangeKey
        -> Sink Message m ()
amqpSendSink conn exchange key = loop
    where
        loop = await >>= maybe (return ()) (\v -> (liftIO $ sendMsg conn exchange key v) >> loop)