module Network.Pubnub
(
PN(..)
, defaultPN
, Timestamp(..)
, time
, publish
, subscribe
, hereNow
, presence
, history
, leave
, getUuid
, unsubscribe
, audit
, auth
, grant
) where
import Network.Pubnub.Types
import Data.Default (def)
import Data.Aeson
import Data.UUID.V4
import Network.HTTP.Conduit
import Network.HTTP.Types
import Data.Conduit
import Control.Monad.Trans
import Control.Concurrent.Async
import Control.Applicative ((<$>))
import Control.Exception.Lifted (try)
import Data.Text.Encoding
import Data.Maybe
import Data.Time.Clock.POSIX
import Data.Digest.Pure.SHA
import Crypto.Cipher.AES
import Crypto.Padding
import qualified Data.Text as T
import qualified Data.UUID as U
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString.Base64 as B64
time :: IO (Maybe Timestamp)
time = do
let req = buildRequest defaultPN ["time", "0"] [] Nothing
res <- withManager $ httpLbs req
return (decode $ responseBody res :: Maybe Timestamp)
subscribe :: (FromJSON b) => PN -> SubscribeOptions b -> IO (Async ())
subscribe pn subOpts =
case onPresence subOpts of
Nothing -> subscribeInternal pn subOpts
Just onPresenceCallback ->
case uuid_key pn of
Nothing -> getUuid >>= \u -> subscribe' pn{uuid_key = Just u}
_ -> subscribe' pn
where
subscribe' pn' =
do
a <- presence pn' (uuid_key pn') onPresenceCallback
b <- subscribeInternal pn' subOpts
link2 a b
return a
subscribeInternal :: (FromJSON b) => PN -> SubscribeOptions b -> IO (Async ())
subscribeInternal pn subOpts =
async (withManagerSettings conduitManagerSettings $ \manager -> connect pn{time_token = Timestamp 0} manager False)
where
connect pn' manager isReconnect = do
let req = buildSubscribeRequest pn' "0"
eres <- try $ httpLbs req manager
case eres of
Right res -> do
case decode $ responseBody res of
Just (ConnectResponse ([], t)) -> do
liftIO (if isReconnect
then onReconnect subOpts
else onConnect subOpts)
subscribe' manager (if resumeOnReconnect subOpts && isReconnect
then pn
else pn{time_token=t})
_ -> do
liftIO (onDisconnect subOpts)
subscribe' manager pn
Left (StatusCodeException (Status code msg) _ _) -> do
liftIO $ onError subOpts (Just code) (Just msg)
connect pn' manager isReconnect
Left _ -> do
liftIO $ onError subOpts Nothing Nothing
connect pn' manager isReconnect
subscribe' manager pn' = do
let req = buildSubscribeRequest pn' $ L.toStrict $ encode (time_token pn')
eres <- try $ http req manager
case eres of
Right res -> do
case (ctx pn', iv pn') of
(Just c, Just i) -> do
responseBody res $$+- encryptedSubscribeSink c i
(_, _) -> do
responseBody res $$+- subscribeSink
reconnect pn' manager
Left (ResponseTimeout :: HttpException) ->
subscribe' manager pn'
Left (StatusCodeException (Status code msg) _ _) -> do
liftIO $ onError subOpts (Just code) (Just msg)
reconnect pn' manager
Left _ -> do
liftIO $ onError subOpts Nothing Nothing
reconnect pn' manager
encryptedSubscribeSink c i =
awaitForever $ (\x ->
case decode (L.fromStrict x) of
Just (EncryptedSubscribeResponse (resp, _)) -> do
_ <- liftIO $ mapM (onMsg subOpts . decodeEncrypted c i) resp
return ()
Nothing ->
return ())
subscribeSink =
awaitForever $ (\x ->
case decode (L.fromStrict x) of
Just (SubscribeResponse (resp, _)) -> do
_ <- liftIO $ mapM (onMsg subOpts) resp
return ()
Nothing ->
return ())
reconnect pn' manager = connect pn' manager True
buildSubscribeRequest pn' tt =
buildRequest pn' [ "stream"
, encodeUtf8 $ sub_key pn'
, encodeUtf8 $ T.intercalate "," (channels pn')
, bsFromInteger $ jsonp_callback pn'
, tt ]
(userIdOptions pn')
(subTimeout subOpts)
decodeEncrypted c i m = decodeUnencrypted $ decrypt c i m
decodeUnencrypted :: (FromJSON b) => B.ByteString -> b
decodeUnencrypted m = fromJust $ decode $ L.fromStrict m
decrypt c i m = unpadPKCS5 $ decryptCBC c i $ B64.decodeLenient $ decodeJson m
decodeJson :: T.Text -> B.ByteString
decodeJson s = case decode (L.fromStrict (encodeUtf8 s)) of
Nothing -> encodeUtf8 s
Just l -> encodeUtf8 l
publish :: ToJSON a => PN -> T.Text -> a -> IO (Maybe PublishResponse)
publish pn channel msg = do
let encoded_msg = L.toStrict $ encode msg
let sig = signature (sec_key pn) encoded_msg
let req = buildRequest pn [ "publish"
, encodeUtf8 $ pub_key pn
, encodeUtf8 $ sub_key pn
, sig
, encodeUtf8 channel
, bsFromInteger $ jsonp_callback pn
, encrypt (ctx pn) (iv pn) encoded_msg]
(userIdOptions pn)
Nothing
res <- withManager $ httpLbs req
return (decode $ responseBody res)
where
signature "0" _ = "0"
signature secret m = B.pack $ showDigest $ hmacSha256 (L.fromStrict $ encodeUtf8 secret) (L.fromStrict m)
encrypt (Just c) (Just i) m = encodeJson $ B64.encode $ encryptCBC c i (padPKCS5 16 m)
encrypt Nothing _ m = m
encrypt _ Nothing m = m
encodeJson s = L.toStrict $ encode (decodeUtf8 s)
hereNow :: PN -> T.Text -> IO (Maybe HereNow)
hereNow pn channel = do
let req = buildRequest pn [ "v2"
, "presence"
, "sub-key"
, encodeUtf8 $ sub_key pn
, "channel"
, encodeUtf8 channel] [] Nothing
res <- withManager $ httpLbs req
return (decode $ responseBody res)
presence :: (FromJSON b) => PN -> Maybe UUID -> (b -> IO ()) -> IO (Async ())
presence pn u fn =
let subOpts = defaultSubscribeOptions{ onMsg = fn }
pn' = pn { uuid_key=u, ctx=Nothing, channels=presence_channels }
in
subscribeInternal pn' subOpts
where
presence_channels = map (prepend "-pnpres") (channels pn)
prepend = flip T.append
history :: FromJSON b => PN -> T.Text -> HistoryOptions -> IO (Maybe (History b))
history pn channel options = do
let req = buildRequest pn [ "v2"
, "history"
, "sub-key"
, encodeUtf8 $ sub_key pn
, "channel"
, encodeUtf8 channel]
((convertHistoryOptions options) ++
(userIdOptions pn))
Nothing
res <- withManager $ httpLbs req
return (decode $ responseBody res)
leave :: PN -> T.Text -> UUID -> IO ()
leave pn channel u = do
let req = buildRequest pn [ "v2"
, "presence"
, "sub-key"
, encodeUtf8 $ sub_key pn
, "channel"
, encodeUtf8 channel
, "leave"] [("uuid", encodeUtf8 u)] Nothing
_ <- withManager $ httpLbs req
return ()
getUuid :: IO UUID
getUuid =
T.pack . U.toString <$> nextRandom
unsubscribe :: Async () -> IO ()
unsubscribe = cancel
audit :: PN -> Auth -> IO (Maybe Value)
audit = pamDo "audit"
grant :: PN -> Auth -> IO (Maybe Value)
grant = pamDo "grant"
auth :: PN -> T.Text -> PN
auth pn k = pn{auth_key = (Just k)}
pamDo :: B.ByteString -> PN -> Auth -> IO (Maybe Value)
pamDo pamMethod pn authR = do
ts <- (bsFromInteger . round) <$> getPOSIXTime
let req = buildRequest pn pamURI ((pamQS ts) ++ [("signature", signature ts)]) Nothing
res <- withManager $ httpLbs req
return (decode $ responseBody res)
where
authK = T.intercalate "," $ authKeys authR
channel = fromJust $ chan authR
authRead = r authR
authWrite = w authR
pubKey = encodeUtf8 $ pub_key pn
subKey = encodeUtf8 $ sub_key pn
secKey = L.fromStrict . encodeUtf8 $ sec_key pn
signature ts = B64.encode . L.toStrict . bytestringDigest $ hmacSha256 secKey (msg ts)
msg ts = L.fromStrict $ B.intercalate B.empty [ subKey, "\n"
, pubKey, "\n"
, pamMethod, "\n"
, B.tail $ renderSimpleQuery True $ pamQS ts]
pamURI = [ "v1"
, "auth"
, pamMethod
, "sub-key"
, subKey]
pamQS ts = [ ("auth", encodeUtf8 authK)
, ("channel", encodeUtf8 channel)
, ("r", if authRead then "1" else "0")
, ("timestamp", ts)
, ("w", if authWrite then "1" else "0")]
userIdOptions :: PN -> [(B.ByteString, B.ByteString)]
userIdOptions pn =
(maybe [] (\u -> [("uuid", encodeUtf8 u)]) (uuid_key pn)) ++
(maybe [] (\a -> [("auth", encodeUtf8 a)]) (auth_key pn))
buildRequest :: PN -> [B.ByteString] -> SimpleQuery -> Maybe Int -> Request
buildRequest pn elems qs timeout =
def { host = encodeUtf8 $ origin pn
, path = B.intercalate "/" elems
, method = "GET"
, port = if ssl pn then 443 else 80
, requestHeaders = [ ("V", "3.1")
, ("User-Agent", "Haskell")
, ("Accept", "*/*")]
, queryString = renderSimpleQuery True qs
, secure = ssl pn
, responseTimeout = Just (maybe 5000000 (* 1000000) timeout) }
bsFromInteger :: Integer -> B.ByteString
bsFromInteger = B.pack . show