-- |
-- Module : Web.WindowsAzure.ServiceBus.Queue
-- Description : API for reading from and writing to ServiceBus Queue
-- Copyright : (c) Hemanth Kapila, 2014
-- License : BSD3
-- Maintainer : saihemanth@gmail.com
-- Stability  : Experimental
-- Provides an API to pull from and push to a ServiceBus queue.
-- Please refer to the <http://msdn.microsoft.com/en-us/library/hh780726.aspx Service Bus Rest API> for information on the API provided by
-- Microsoft Service Bus.
-- A simple example of how to use this library is given below:
-- @
-- import Web.WindowsAzure.ServiceBus.Queue
-- import Web.WindowsAzure.ServiceBus
-- import qualified Data.ByteString.Char8 as C
-- queueName = "queueName"
-- sbNamespace = "namespace"
-- sbIssuerKey = C.pack "1287361251262as="
-- sbIssuerName = C.pack "owner"
-- sbinfo = SBInfo sbNamespace sbIssuerName sbIssuerKey 
-- message = C.pack "Hello from Haskell"
-- main = do
--   sbContext <- sbContext sbinfo
--   enQueueBS queueName message sbContext
--   res <- deQueue queueName 30 sbContext
--   print res
-- @
-- See the examples (available as part of the distribution) for a more detailed example.

module Web.WindowsAzure.ServiceBus.Queue( 
  -- Locked Message Info
  -- * Pushing data to Queue
  -- * Reading data from Queue

import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as L
import Data.Conduit
import Data.Int

import Web.WindowsAzure.ACS
import Web.WindowsAzure.ServiceBus.SBTypes
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as L
import Network.HTTP.Conduit hiding (requestBodySource)
import Network.HTTP.Client.Conduit hiding (httpLbs)
import Network.HTTP.Types.Method (methodDelete, methodPost,methodPut)
import Network.HTTP.Types.Header
import Network.HTTP.Types.Method

import qualified Data.CaseInsensitive as CI
import Network.Connection (TLSSettings (..))
import Data.Aeson
import Network(withSocketsDo)

-- | 'QLockedMsgInfo' describes a message that has been locked on a queue.
--
-- It pairs a 'String' with the 'BrokerProperties' of the message. Within this
-- module the string is filled in from the @Location@ header of the peek-lock
-- response and is later used as the request URL when the message is unlocked,
-- deleted or has its lock renewed.
data QLockedMsgInfo
  = QLockedMsgInfo String BrokerProperties
  deriving (Show)

-- | Internal low-level helper that POSTs a request body to the
-- @\/messages@ resource of a queue. Not exposed to the user.
--
-- The header returned by 'acsToken' is sent as the only request header and
-- the HTTP response is discarded.
enQueueRequest :: String -> RequestBody -> SBContext -> IO ()
enQueueRequest queueName body (SBContext baseUrl manager aContext) = do
  token <- acsToken manager aContext
  reqInit <- parseUrl messagesUrl
  let request = reqInit
        { method = methodPost
        , requestHeaders = [token]
        , requestBody = body
        }
  -- Only the side effect matters here; the response itself is not needed.
  _ <- withSocketsDo (httpLbs request manager)
  return ()
  where
    messagesUrl = baseUrl ++ "/" ++ queueName ++ "/messages"

-- | Internal low-level helper that issues a DELETE against the
-- @\/messages\/head@ resource of a queue, passing the given timeout as the
-- @timeout@ query parameter, and returns the response body.
-- Intended for internal use; most users should call 'deQueue' instead.
deQueueRequest :: String -> Int -> SBContext -> IO L.ByteString
deQueueRequest queueName timeout (SBContext baseUrl manager aContext) = do
  token <- acsToken manager aContext
  reqInit <- parseUrl headUrl
  let request = reqInit { method = methodDelete, requestHeaders = [token] }
  fmap responseBody (withSocketsDo (httpLbs request manager))
  where
    headUrl =
      concat [baseUrl, "/", queueName, "/messages/head?timeout=", show timeout]

-- | Publish a message whose body is a strict 'C.ByteString' to a queue.
--
-- The following publishes the strict bytestring @bs@ to the queue @q@:
--
-- @
-- enQueueBS q bs ctx
-- @
enQueueBS :: String -> C.ByteString -> SBContext -> IO ()
enQueueBS queueName content context = enQueueRequest queueName body context
  where
    body = RequestBodyBS content
-- | Publish a message whose body is a lazy 'L.ByteString' to a queue.
--
-- The following publishes the lazy bytestring @lbs@ to the queue @q@:
--
-- @
-- enQueueLBS q lbs ctx
-- @
enQueueLBS :: String -> L.ByteString -> SBContext -> IO ()
enQueueLBS queueName content context =
  let body = RequestBodyLBS content
   in enQueueRequest queueName body context
-- | Publish a message whose body is produced by a 'Source' of strict
-- bytestrings. The 'Int64' length and the source are handed to
-- 'requestBodySource' (refer to its documentation) to build the request body.
enQueueBodySrc :: String -> Int64 -> Source IO C.ByteString -> SBContext -> IO ()
enQueueBodySrc queueName len bodysrc context =
  enQueueRequest queueName streamedBody context
  where
    streamedBody = requestBodySource len bodysrc

-- | Reads and deletes the message from a queue.
--
-- In order to destructively read the latest message from the queue (with a
-- time out of @n@ seconds):
--
-- @
-- deQueue queueName n context
-- @
--
-- Note that the timeout can be at most 55 seconds. Timeouts greater than 55
-- are silently capped at 55, and negative timeouts are treated as 0.
--
-- Returns the body of the HTTP response as a lazy 'L.ByteString'.
deQueue :: String -> Int -> SBContext -> IO (L.ByteString)
deQueue queueName timeout context =
  deQueueRequest queueName boundedTimeout context
  where
    -- Clamp rather than wrap: using `mod` 55 here would turn a requested
    -- timeout of 55 into 0 and 60 into 5, contradicting the documented limit.
    boundedTimeout = max 0 (min 55 timeout)

-- | Peek-lock a message from a queue (non-destructive read).
--
-- Issues a POST against the @\/messages\/head@ resource of the queue with the
-- given timeout and returns the lock information extracted from the response
-- headers together with the response body.
--
-- According to the Service Bus documentation, this atomically retrieves the
-- next message from the queue and locks it for further processing, so that it
-- is not delivered to other receivers for the duration of the lock.
-- Refer to the <http://msdn.microsoft.com/en-us/library/hh780735.aspx ServiceBus documentation>
-- for the semantics of the underlying REST API.
peekLockQueue :: String -> Int -> SBContext -> IO (QLockedMsgInfo,L.ByteString)
peekLockQueue qName timeout (SBContext baseUrl manager aContext) = do
    token <- acsToken manager aContext
    reqInit <- parseUrl (concat [baseUrl, "/", qName, "/messages/head?timeout=", show timeout])
    let request = reqInit { method = methodPost, requestHeaders = [token] }
    res <- withSocketsDo (httpLbs request manager)
    return (getQLI res, responseBody res)

-- | Build a 'QLockedMsgInfo' from the headers of a peek-lock response.
--
-- * The @Location@ header supplies the string stored in the result. If the
--   header is absent, evaluating that field calls 'error'.
-- * The @BrokerProperties@ header is decoded as JSON. If the header is absent
--   or cannot be decoded, 'emptyBP' is used instead.
getQLI :: Response L.ByteString -> QLockedMsgInfo
getQLI res = QLockedMsgInfo loc bp
  where
    headers = responseHeaders res

    loc = case lookup hLocation headers of
      Nothing -> error "Expected Location Header in the response!"
      Just x  -> C.unpack x

    -- The header value is a strict bytestring; wrap it as a single chunk so
    -- that it can be fed to the JSON decoder, which expects a lazy one.
    decodedProps =
      lookup (CI.mk (C.pack "BrokerProperties")) headers
        >>= decode . L.fromChunks . (: [])

    bp = case decodedProps of
      Nothing -> emptyBP
      Just b  -> b

-- | Unlock a message that has been locked earlier.
--
-- Given the 'QLockedMsgInfo' of a previously locked message, 'unlockMessage'
-- sends a PUT request to the URL stored in it, so that (per the Service Bus
-- documentation) the message can be consumed by other consumers.
-- The queue name argument is currently not used by the implementation, and
-- the HTTP response is discarded.
--
-- See 'peekLockQueue' and the <http://msdn.microsoft.com/en-us/library/hh780723.aspx ServiceBus documentation>
-- for details on the underlying API.
unlockMessage :: String -> QLockedMsgInfo -> SBContext -> IO ()
unlockMessage _ (QLockedMsgInfo url _) (SBContext _ manager acsContext) = do
    token <- acsToken manager acsContext
    reqInit <- parseUrl url
    let request = reqInit { method = methodPut, requestHeaders = [token] }
    _ <- withSocketsDo (httpLbs request manager)
    return ()

-- | Delete a message that has been locked earlier.
--
-- Given the 'QLockedMsgInfo' of a previously locked message, 'deleteMessage'
-- sends a DELETE request to the URL stored in it, which (per the Service Bus
-- documentation) removes the message from the queue.
-- The queue name argument is currently not used by the implementation, and
-- the HTTP response is discarded.
--
-- See 'peekLockQueue' and the <http://msdn.microsoft.com/en-us/library/hh780767.aspx ServiceBus documentation>
-- for details on the underlying API.
deleteMessage :: String -> QLockedMsgInfo -> SBContext -> IO ()
deleteMessage _ (QLockedMsgInfo url _) (SBContext _ manager acsContext) = do
    token <- acsToken manager acsContext
    reqInit <- parseUrl url
    let request = reqInit { method = methodDelete, requestHeaders = [token] }
    _ <- withSocketsDo (httpLbs request manager)
    return ()

-- | Renew the lock on a locked message.
--
-- Given the 'QLockedMsgInfo' of a previously locked message, 'renewLock'
-- sends a POST request to the URL stored in it, which (per the Service Bus
-- documentation) renews the lock on the message.
-- The queue name argument is currently not used by the implementation, and
-- the HTTP response is discarded.
--
-- See 'peekLockQueue' and the <http://msdn.microsoft.com/en-us/library/jj839741.aspx ServiceBus documentation>
-- for details on the underlying API.
renewLock :: String -> QLockedMsgInfo -> SBContext -> IO ()
renewLock _ (QLockedMsgInfo url _) (SBContext _ manager acsContext) = do
    token <- acsToken manager acsContext
    reqInit <- parseUrl url
    let request = reqInit { method = methodPost, requestHeaders = [token] }
    _ <- withSocketsDo (httpLbs request manager)
    return ()