{-# LANGUAGE OverloadedStrings #-} module Network.MtGoxAPI.StreamConnection ( initMtGoxStream , mtGoxTickerChannelUSD , mtGoxDepthChannelUSD , mtGoxTickerChannelNameEUR , mtGoxDepthChannelNameEUR , mtGoxTradeChannel , MtGoxStreamSettings(..) , WalletNotifierSetting(..) , FullDepthSetting(..) ) where import Control.Applicative import Control.Concurrent import Control.Exception import Control.Monad import Control.Watchdog import GHC.IO.Handle import Network import System.Timeout import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL import qualified Data.Text as T import qualified System.IO as IO import Network.MtGoxAPI.Credentials import Network.MtGoxAPI.DepthStore.Adapter import Network.MtGoxAPI.Handles import Network.MtGoxAPI.StreamCommands import Network.MtGoxAPI.TickerMonitor import Network.MtGoxAPI.Types import Network.MtGoxAPI.WalletNotifier mtGoxStreamHost :: HostName mtGoxStreamHost = "127.0.0.1" mtGoxStreamPort :: PortID mtGoxStreamPort = PortNumber 10508 mtGoxTradeChannel :: T.Text mtGoxTradeChannel = "dbf1dee9-4f2e-4a08-8cb7-748919a71b21" mtGoxTickerChannelUSD :: T.Text mtGoxTickerChannelUSD = "d5f06780-30a8-4a48-a2f8-7ed181b4a13f" mtGoxDepthChannelUSD :: T.Text mtGoxDepthChannelUSD = "24e67e0d-1cad-4cc0-9e7a-f8523ef460fe" mtGoxTickerChannelNameEUR :: T.Text mtGoxTickerChannelNameEUR = "ticker.BTCEUR" mtGoxDepthChannelNameEUR :: T.Text mtGoxDepthChannelNameEUR = "depth.BTCEUR" mtGoxTimeout :: Int mtGoxTimeout = 2 * 60 * 10 ^ (6 :: Integer) wrapInTimeout :: Handle -> String -> IO a -> IO a wrapInTimeout h errMsg action = do resultM <- timeout mtGoxTimeout action case resultM of Just result -> return result Nothing -> do hClose h throw $ userError errMsg readNextStreamMessageWithTimeout :: Handle -> IO StreamMessage readNextStreamMessageWithTimeout h = parseStreamLine <$> wrapInTimeout h "MtGox did not send data for a while." (B.hGetLine h) sendStreamCommand :: Handle -> MtGoxCredentials -> StreamCommand -> IO () sendStreamCommand h creds cmd = do encodedCmd <- encodeStreamCommand cmd creds BL.hPutStr h encodedCmd >> B.hPutStr h "\n" openConnection :: HostName-> PortID-> MtGoxCredentials-> MtGoxStreamSettings-> MtGoxAPIHandles-> IO (Either String ()) openConnection host port creds streamSettings apiHandles = do status <- try go :: IO (Either IOException (Either String ())) case status of Right result -> return result Left ex -> return $ Left (show ex) where go = withSocketsDo $ do -- open connection h <- connectTo host port IO.hSetBuffering h IO.NoBuffering -- unsubscribe from default channels and subscribe to -- EUR channels mapM_ (sendStreamCommand h creds) [ UnsubscribeCmd mtGoxTradeChannel , UnsubscribeCmd mtGoxTickerChannelUSD , UnsubscribeCmd mtGoxDepthChannelUSD , SubscribeCmd mtGoxTickerChannelNameEUR , SubscribeCmd mtGoxDepthChannelNameEUR ] -- wait for healthy activity to appear before proceeding _ <- waitForActivityWithTimeout h -- prepare handles let tickerMonitorHandle = mtgoxTickerMonitorHandle apiHandles depthStoreHandle = mtgoxDepthStoreHandle apiHandles walletNotifierHandle = mtgoxWalletNotifierHandle apiHandles -- extract extra settings let (MtGoxStreamSettings walletNotifierSetting fullDepthSetting) = streamSettings -- wallet notifier step when (walletNotifierSetting == EnableWalletNotifications) $ do -- get idkey and subscribe to wallet operations channel sendStreamCommand h creds IDKeyCmd (buffer1, idKey) <- waitForCallResultWithTimeout h parseIDKeyCallResult sendStreamCommand h creds $ PrivateSubscribeCmd (idkKey idKey) -- rewind buffer forM_ buffer1 $ \streamMessage -> do updateTickerStatus tickerMonitorHandle streamMessage updateDepthStoreFromMessage depthStoreHandle streamMessage updateWalletNotifier walletNotifierHandle streamMessage -- full depth step case fullDepthSetting of RequestFullDepth -> do -- get full depth sendStreamCommand h creds FullDepthCmd (buffer2, fullDepth) <- waitForCallResultWithTimeout h parseFullDepthCallResult updateDepthStoreFromFullDepth depthStoreHandle fullDepth -- rewind buffer forM_ buffer2 $ \streamMessage -> do updateTickerStatus tickerMonitorHandle streamMessage updateDepthStoreFromMessage depthStoreHandle streamMessage updateWalletNotifier walletNotifierHandle streamMessage SkipFullDepth -> skipFullDepthRequest depthStoreHandle -- enter main loop and process incoming messages forever $ do streamMessage <- readNextStreamMessageWithTimeout h updateTickerStatus tickerMonitorHandle streamMessage updateDepthStoreFromMessage depthStoreHandle streamMessage updateWalletNotifier walletNotifierHandle streamMessage waitForCallResultWithTimeout :: Handle -> (StreamMessage -> Maybe a) -> IO ([StreamMessage], a) waitForCallResultWithTimeout h parser = wrapInTimeout h "Call result was not returned in time." (go []) where go buffer = do msg <- readNextStreamMessageWithTimeout h case msg of CallResult {} -> case parser msg of Just result -> return (reverse buffer, result) Nothing -> throw $ userError "Unexpected call result was returned." other -> go (other:buffer) waitForActivityWithTimeout :: Handle -> IO [StreamMessage] waitForActivityWithTimeout h = wrapInTimeout h "Healthy activity did not appear in time." $ go [] where go msgs = do msg <- readNextStreamMessageWithTimeout h let msgs' = msg : msgs checks = [ any isTickerUpdate msgs' , any isDepthUpdate msgs' ] if and checks then return $ reverse msgs' else go msgs' isTickerUpdate TickerUpdateUSD {} = True isTickerUpdate _ = False isDepthUpdate DepthUpdateUSD {} = True isDepthUpdate _ = False -- | Starts a thread that will connect to the data stream -- from Mt. Gox and supply the received data to the handles that are -- are passed in. A watchdog maintains the connection. initMtGoxStream :: MtGoxCredentials-> MtGoxStreamSettings -> MtGoxAPIHandles -> IO ThreadId initMtGoxStream mtgoxCreds streamSettings mtgoxAPIHandles = let task = openConnection mtGoxStreamHost mtGoxStreamPort mtgoxCreds streamSettings mtgoxAPIHandles watchdogConf = do setResetDuration $ 90 * 10 ^ (6 :: Integer) case mtgoxLogger mtgoxAPIHandles of Just logger -> setLoggingAction logger Nothing -> return () watch task in forkIO $ watchdog watchdogConf