{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}

module Web.Twitter.Conduit.Stream (
    -- * StreamingAPI
    Userstream,
    userstream,
    StatusesFilter,
    FilterParameter (..),
    statusesFilter,
    statusesFilterByFollow,
    statusesFilterByTrack,
    -- , statusesFilterByLocation
    -- , statusesSample
    -- , statusesFirehose
    -- , sitestream
    -- , sitestream'
    stream,
    stream',
) where

import Web.Twitter.Conduit.Base
import Web.Twitter.Conduit.Request
import Web.Twitter.Conduit.Request.Internal
import Web.Twitter.Conduit.Response
import Web.Twitter.Conduit.Types
import Web.Twitter.Types

import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource (MonadResource)
import Data.Aeson
import qualified Data.ByteString.Char8 as S8
import Data.Char
import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL
import qualified Data.List as L
import qualified Data.Text as T
import qualified Network.HTTP.Conduit as HTTP

-- | Open a streaming connection for the given request and decode each
-- element of the response as the request's own @responseType@.
--
-- This is 'stream'' with the decoded element type fixed to the phantom
-- @responseType@ carried by the 'APIRequest'.
stream ::
    ( MonadResource m
    , FromJSON responseType
    , MonadThrow m
    ) =>
    TWInfo ->
    HTTP.Manager ->
    APIRequest apiName responseType ->
    m (C.ConduitM () responseType m ())
stream = stream'

-- | Open a streaming connection for the given request and decode each
-- element of the response as an arbitrary 'FromJSON' type @value@.
--
-- Unlike 'stream', the decoded type is independent of the request's
-- @responseType@, so the caller chooses what to parse the body into.
--
-- The request is built with 'makeRequest' (in 'IO') and sent with
-- 'getResponse'; the returned conduit is a source over the response body
-- that repeatedly runs the JSON sink, yielding one decoded value per run.
stream' ::
    ( MonadResource m
    , FromJSON value
    , MonadThrow m
    ) =>
    TWInfo ->
    HTTP.Manager ->
    APIRequest apiName responseType ->
    m (C.ConduitM () value m ())
stream' info mgr req = do
    rsrc <- getResponse info mgr =<< liftIO (makeRequest req)
    return $ responseBody rsrc C..| CL.sequence sinkFromJSONIgnoreSpaces
  where
    -- Drop chunks made up solely of whitespace (this includes empty chunks)
    -- before handing the input to the JSON sink.
    -- NOTE(review): presumably these are the blank keep-alive lines sent on
    -- the streaming endpoints -- confirm against the API documentation.
    -- Deliberately left without a local type signature: it would have to
    -- mention the outer @m@ / @value@, which needs ScopedTypeVariables.
    sinkFromJSONIgnoreSpaces = CL.filter (not . S8.all isSpace) C..| sinkFromJSON

-- | Request description for the user stream endpoint: a @GET@ to
-- @https:\/\/userstream.twitter.com\/1.1\/user.json@ with an initially empty
-- query.
userstream :: APIRequest Userstream StreamingAPI
userstream = APIRequest "GET" "https://userstream.twitter.com/1.1/user.json" []
-- | Type-level list of the named parameters (and their value types) attached
-- to the 'userstream' request as the first type argument of 'APIRequest'.
type Userstream =
    '[ "language" ':= T.Text
     , "filter_level" ':= T.Text
     , "stall_warnings" ':= Bool
     , "replies" ':= T.Text
     ]

-- | A single filter criterion for 'statusesFilter'.
--
-- See <https://dev.twitter.com/streaming/overview/request-parameters>
data FilterParameter
    = Follow [UserId] -- ^ user ids; sent as the @follow@ query item
    | Track [T.Text] -- ^ texts; sent as the @track@ query item

-- | Returns statuses/filter.json API query data.
--
-- Builds a @POST@ request to 'statusesFilterEndpoint' whose query contains
-- one item per given 'FilterParameter', in the order supplied.
--
-- >>> statusesFilter [Follow [1,2,3]]
-- APIRequest "POST" "https://stream.twitter.com/1.1/statuses/filter.json" [("follow","1,2,3")]
-- >>> statusesFilter [Track ["haskell","functional"]]
-- APIRequest "POST" "https://stream.twitter.com/1.1/statuses/filter.json" [("track","haskell,functional")]
-- >>> statusesFilter [Follow [1,2,3],Track ["haskell","functional"]]
-- APIRequest "POST" "https://stream.twitter.com/1.1/statuses/filter.json" [("follow","1,2,3"),("track","haskell,functional")]
statusesFilter :: [FilterParameter] -> APIRequest StatusesFilter StreamingAPI
statusesFilter fs = APIRequest "POST" statusesFilterEndpoint (L.map paramToQueryItem fs)

-- | Convert one 'FilterParameter' into the query item sent to the filter
-- endpoint: 'Follow' becomes the @follow@ key with an integer-array value,
-- 'Track' becomes the @track@ key with a string-array value.
paramToQueryItem :: FilterParameter -> APIQueryItem
paramToQueryItem (Follow userIds) = ("follow", PVIntegerArray userIds)
paramToQueryItem (Track texts) = ("track", PVStringArray texts)

-- | URL of the statuses/filter.json streaming endpoint used by
-- 'statusesFilter'.
statusesFilterEndpoint :: String
statusesFilterEndpoint = "https://stream.twitter.com/1.1/statuses/filter.json"

-- | Returns statuses/filter.json API query data.
--
-- Shorthand for 'statusesFilter' with a single 'Follow' parameter.
--
-- >>> statusesFilterByFollow [1,2,3]
-- APIRequest "POST" "https://stream.twitter.com/1.1/statuses/filter.json" [("follow","1,2,3")]
statusesFilterByFollow :: [UserId] -> APIRequest StatusesFilter StreamingAPI
statusesFilterByFollow userIds = statusesFilter [Follow userIds]

-- | Returns statuses/filter.json API query data.
--
-- Shorthand for 'statusesFilter' with a single 'Track' parameter holding
-- exactly one keyword.
--
-- >>> statusesFilterByTrack "haskell"
-- APIRequest "POST" "https://stream.twitter.com/1.1/statuses/filter.json" [("track","haskell")]
statusesFilterByTrack ::
    -- | keyword
    T.Text ->
    APIRequest StatusesFilter StreamingAPI
statusesFilterByTrack keyword = statusesFilter [Track [keyword]]

-- | Type-level list of the named parameters (and their value types) attached
-- to 'statusesFilter' requests as the first type argument of 'APIRequest'.
type StatusesFilter =
    '[ "language" ':= T.Text
     , "filter_level" ':= T.Text
     , "stall_warnings" ':= Bool
     ]