-- adopted from Bardur Arantsson's MIT mongrel2-handler

{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE OverloadedStrings #-}

module Hack2.Handler.Mongrel2.IO where

import Blaze.ByteString.Builder (Builder,fromByteString)
import Data.Attoparsec
import Hack2.Handler.Mongrel2.MessageParser (messageParser)
import Hack2.Handler.Mongrel2.Response
import Hack2.Handler.Mongrel2.Types
import qualified System.ZMQ as ZMQ
import Data.ByteString.Char8 (ByteString, unpack)
import Control.Monad (forM_)
import qualified Data.ByteString.Char8 as B
import Data.Enumerator hiding (map)
import Air.Env hiding (log)
import Prelude ()
import Hack2.Handler.Mongrel2.Utils
import Data.Attoparsec
import Data.Aeson (Value(..), json)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.Aeson as Aeson
import Data.Maybe (listToMaybe, fromMaybe, isJust, fromJust)
import qualified Data.Map as Map
import Control.Concurrent.STM
import Data.Int
import qualified Data.Set as Set

-- | Create a new handler. @mkHandler pullFromAddress publishToAddress id@
-- creates a handler which pulls requests from @pullFromAddress@ and publishes
-- replies to @publishToAddress@. The optional @id@ is applied as the identity
-- of the publish socket by 'withConnectedHandler'.
mkHandler :: String -> String -> Maybe String -> Handler
mkHandler = Handler

-- | Run an IO action with a connected handler.
--
-- Opens a ZeroMQ context with a 'ZMQ.Pull' socket and a 'ZMQ.Pub' socket,
-- connects them to the handler's pull and publish addresses, and passes the
-- resulting 'ConnectedHandler' to the supplied action. The action's result is
-- returned.
withConnectedHandler :: Handler -> (ConnectedHandler -> IO a) -> IO a
withConnectedHandler h io =
  ZMQ.withContext 1 - \c ->
    ZMQ.withSocket c ZMQ.Pull - \s ->
      ZMQ.withSocket c ZMQ.Pub - \ps -> do
        -- Connect to the pull socket.
        ZMQ.connect s - handlerPullFrom h
        -- The identity has to be set before connecting: ZeroMQ applies
        -- socket options to connections made afterwards, so setting it
        -- after 'ZMQ.connect' would leave this connection without it.
        case handlerId h of
          Nothing -> return ()
          Just uuid -> ZMQ.setOption ps - ZMQ.Identity uuid
        -- Connect (not bind) the publish socket.
        ZMQ.connect ps - handlerPublishTo h
        -- Run the action with the connected handler.
        io - ConnectedHandler s ps

-- | Receive a parsed request from the Mongrel2 server. Blocks until
-- a message is received.
receiveRequest :: ConnectedHandler -> IO Request
receiveRequest handler = do
  rawMessage <- ZMQ.receive (chPullSocket handler) []
  log ("ZeroMQ received: " + unpack rawMessage)
  -- Turn the raw frame into a structured request.
  case parseOnly messageParser rawMessage of
    Right request ->
      return request
    Left errMsg ->
      -- An unparsable message points at a defect in our parser or in
      -- Mongrel2 itself, so it is reported as a failure of the action.
      fail ("Couldn't parse message: " + show errMsg)

-- | Flatten a JSON object into key/value pairs, ordered by key.
-- Only members whose value is a JSON string are kept; both key and value
-- are UTF-8 encoded. Any non-object value yields the empty list.
jsonToList :: Value -> [(ByteString, ByteString)]
jsonToList (Aeson.Object headerMap) =
  [ (encodeUtf8 key, encodeUtf8 text)
  | (key, Aeson.String text) <- Map.toAscList headerMap
  ]
jsonToList _ = []

-- | Decode the body of a request as a JSON object of string fields.
--
-- The request counts as JSON when its @METHOD@ header is @JSON@; a missing
-- @METHOD@ header is treated as @JSON@. For any other method the result is
-- 'Nothing'. A body that fails to parse gives @Just []@.
requestJsonBody :: Request -> Maybe [(ByteString, ByteString)]
requestJsonBody request =
  if is "JSON" method
    then
      case parseOnly json (requestBody request) of
        Right value ->
          Just (jsonToList value)
        Left _ ->
          -- The parse error is dropped here (no logging is done in this
          -- pure function); the caller sees an empty field list.
          Just []
    else Nothing
  where
    headers = jsonToList (requestHeaders request)
    method = fromMaybe "JSON" (lookup "METHOD" headers)

-- | Stream the body of a response to the publish socket, one ZeroMQ message
-- per body chunk, each wrapped with 'mkResponse' for the response's uuid and
-- client id. Before each batch of chunks the set held in @disconnected@ is
-- consulted; once the client id is found there, sending stops.
sendResponse :: ConnectedHandler -> TVar (Set.Set Int64) -> Response -> IO ()
sendResponse handler disconnected response =
  run_ (responseBody response $$ sendChunks)
  where
    clientId = responseClientId response
    publishSocket = chPublishSocket handler

    -- Wrap a payload for this client and publish it as a complete message.
    -- No flags are passed: with SndMore, memory hogs at the socket buffer.
    publish payload =
      ZMQ.send publishSocket (mkResponse (responseUuid response) clientId payload) []

    -- Unused: an explicit empty terminating message is not needed, since
    -- according to HTTP content-length should always be set, which causes
    -- the connection to be disconnected automatically.
    _done = publish ""

    sendChunks = continue step
      where
        step (Chunks chunks) = do
          gone <- io (atomically (readTVar disconnected))
          if has clientId gone
            then
              -- Finish here: this branch does not loop back for more chunks.
              io (log "client disconnected, ignore rest of message")
            else do
              -- (per-chunk logging of sent data is left disabled)
              mapM_ (\chunk -> io (publish chunk)) chunks
              sendChunks
        step EOF = yield () EOF