module SecondTransfer.Http1.Session(
http11Attendant
) where
import Control.Lens
import Control.Exception (catch)
import Control.Concurrent (forkIO)
import qualified Data.ByteString as B
import Data.Conduit
import Data.Conduit.List (consume)
import SecondTransfer.MainLoop.CoherentWorker
import SecondTransfer.MainLoop.PushPullType
import SecondTransfer.Sessions.Internal (SessionsContext, acquireNewSessionTag, sessionsConfig)
import System.Log.Logger
import System.Clock
import SecondTransfer.Http1.Parse
import SecondTransfer.Exception (IOProblem)
import SecondTransfer.Sessions.Config
import qualified SecondTransfer.Utils.HTTPHeaders as He
-- | Attendant that serves HTTP/1.1 requests over the given callbacks.
--
-- A new session tag is acquired and a thread is forked which loops: parse one
-- request from the incoming bytes, hand it to @coherent_worker@, serialize the
-- response and push it back. The loop ends as soon as 'add_data' yields
-- 'Nothing', which happens when an 'IOProblem' is caught or when a request
-- with a body arrives (not implemented). In both cases the close action has
-- already been invoked.
http11Attendant :: SessionsContext -> AwareWorker -> Attendant
http11Attendant sessions_context coherent_worker attendant_callbacks = do
    new_session_tag <- acquireNewSessionTag sessions_context
    infoM "Session.Session_HTTP11" $ "Starting new session with tag: " ++(show new_session_tag)
    -- The thread id is not needed: the session thread ends by itself.
    _ <- forkIO $ go new_session_tag (Just "") 1
    return ()
  where
    push_action = attendant_callbacks ^. pushAction_AtC
    close_action = attendant_callbacks ^. closeAction_AtC
    best_effort_pull_action = attendant_callbacks ^. bestEffortPullAction_AtC

    -- Session loop. The second argument carries the bytes left over from the
    -- previous request; 'Nothing' means the session is finished. The third
    -- argument counts requests served on this session, starting at 1, and is
    -- reported to the worker as the stream id.
    go :: Int -> Maybe B.ByteString -> Int -> IO ()
    go session_tag (Just leftovers) reuse_no = do
        infoM "Session.Session_HTTP11" $ "(Re)Using session with tag: " ++ (show session_tag)
        maybe_leftovers <- add_data newIncrementalHttp1Parser leftovers session_tag reuse_no
        go session_tag maybe_leftovers (reuse_no + 1)
    go _ Nothing _ =
        return ()

    -- Shared 'IOProblem' handler: log the reason, close the connection and
    -- signal the session loop to stop.
    abandon :: String -> IOProblem -> IO (Maybe B.ByteString)
    abandon reason _problem = do
        debugM "Session.HTTP1" reason
        close_action
        return Nothing

    -- Feeds bytes to the parser until one request is complete, then serves it.
    -- Returns @Just leftovers@ when the response was pushed and the session
    -- can be reused, or 'Nothing' when the session must end.
    add_data :: IncrementalHttp1Parser -> B.ByteString -> Int -> Int -> IO (Maybe B.ByteString)
    add_data parser bytes session_tag reuse_no =
        case addBytes parser bytes of

            MustContinue_H1PC new_parser -> do
                -- Only the pull itself is guarded, so that failures which
                -- happen later while serving the request are not reported as
                -- receive errors and no handler frames pile up across pulls.
                maybe_new_bytes <- catch
                    (fmap Just (best_effort_pull_action True))
                    (abandon "Could not receive data")
                case maybe_new_bytes of
                    Just new_bytes -> add_data new_parser new_bytes session_tag reuse_no
                    Nothing        -> return Nothing

            OnlyHeaders_H1PC headers leftovers ->
                -- Any 'IOProblem' raised while producing or pushing the
                -- response closes the connection and ends the session.
                catch
                    (do
                        let
                            modified_headers = addExtraHeaders sessions_context headers
                        started_time <- getTime Monotonic
                        principal_stream <- coherent_worker Request {
                            _headers_RQ = modified_headers,
                            _inputData_RQ = Nothing,
                            _perception_RQ = Perception {
                                _startedTime_Pr = started_time,
                                _streamId_Pr = reuse_no,
                                _sessionId_Pr = session_tag
                                }
                            }
                        let
                            data_and_conclusion = principal_stream ^. dataAndConclusion_PS
                            response_headers = principal_stream ^. headers_PS
                        -- The whole response body is collected before sending.
                        (_, fragments) <- runConduit $ fuseBoth data_and_conclusion consume
                        let
                            response_text =
                                serializeHTTPResponse response_headers fragments
                        push_action response_text
                        return $ Just leftovers
                    )
                    (abandon "Session abandoned")

            HeadersAndBody_H1PC _headers _stopcondition _recv_leftovers -> do
                -- Requests with bodies are not supported yet: close the
                -- connection and end the session cleanly instead of killing
                -- the session thread with an uncaught 'error'.
                close_action
                errorM "Session.HTTP1" "NotImplemented requests with bodies"
                return Nothing
-- | Optionally adds the @second-transfer-eh--used-protocol@ header.
--
-- When the @addUsedProtocol@ setting found under the enriched-headers part of
-- the sessions configuration is on, the headers are loaded into a header
-- editor, the protocol header is set to @HTTP/1.1@ through 'He.headerLens',
-- and the editor contents are returned as a list. Otherwise the incoming
-- headers are returned exactly as received.
addExtraHeaders :: SessionsContext -> Headers -> Headers
addExtraHeaders sessions_context headers
    | wants_protocol_header = He.toList tagged_editor
    | otherwise             = headers
  where
    enriched_headers_lens :: Lens' SessionsContext SessionsEnrichedHeaders
    enriched_headers_lens = sessionsConfig . sessionsEnrichedHeaders

    -- Configuration switch deciding whether the extra header is wanted.
    wants_protocol_header =
        sessions_context ^. (enriched_headers_lens . addUsedProtocol)

    -- Only forced when the switch above is on.
    tagged_editor =
        set
            (He.headerLens "second-transfer-eh--used-protocol")
            (Just "HTTP/1.1")
            (He.fromList headers)