{-# LANGUAGE OverloadedStrings #-}
module Matterhorn.State.Setup.Threads
( startUserStatusUpdateThread
, startTypingUsersRefreshThread
, startSubprocessLoggerThread
, startTimezoneMonitorThread
, maybeStartSpellChecker
, startAsyncWorkerThread
, startSyntaxMapLoaderThread
, module Matterhorn.State.Setup.Threads.Logging
)
where
import Prelude ()
import Matterhorn.Prelude
import Brick.BChan ( BChan )
import Brick.Main ( invalidateCache )
import Control.Concurrent ( threadDelay, forkIO )
import qualified Control.Concurrent.STM as STM
import Control.Concurrent.STM.Delay
import Control.Exception ( SomeException, try, fromException, catch )
import Data.List ( isInfixOf )
import qualified Data.Map as M
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import Data.Time ( getCurrentTime, addUTCTime )
import Lens.Micro.Platform ( (.=), (%=), (%~), mapped )
import Skylighting.Loader ( loadSyntaxesFromDir )
import System.Directory ( getTemporaryDirectory )
import System.Exit ( ExitCode(ExitSuccess) )
import System.IO ( hPutStrLn, hFlush )
import System.IO.Temp ( openTempFile )
import System.Timeout ( timeout )
import Text.Aspell ( Aspell, AspellOption(..), startAspell )
import Network.Mattermost.Exceptions ( RateLimitException
, rateLimitExceptionReset )
import Network.Mattermost.Endpoints
import Network.Mattermost.Types
import Matterhorn.Constants
import Matterhorn.State.Editing ( requestSpellCheck )
import Matterhorn.State.Setup.Threads.Logging
import Matterhorn.TimeUtils ( lookupLocalTimeZone )
import Matterhorn.Types
-- | Build an 'MH' action that applies the statuses of the given users.
--
-- For an empty ID list no request is made and 'Nothing' is returned.
-- Otherwise 'mmGetUserStatusByIds' is called with the IDs and the
-- resulting action calls 'setUserStatus' once per returned entry.
updateUserStatuses :: [UserId] -> Session -> IO (Maybe (MH ()))
updateUserStatuses [] _ = return Nothing
updateUserStatuses uIds session = do
    fetched <- mmGetUserStatusByIds (Seq.fromList uIds) session
    let applyStatus st = setUserStatus (statusUserId st) (statusStatus st)
    return $ Just $ forM_ fetched applyStatus
-- | Fork a thread that keeps scheduling user status refreshes.
--
-- Each iteration waits up to 30 seconds for a new list of user IDs on
-- @zipperChan@. A refresh request is written to @requestChan@ when the
-- wait times out (re-using the previous list) or when the list that
-- arrived differs from the previous one. The request runs
-- 'updateUserStatuses'; any exception it raises is discarded and the
-- request then yields 'Nothing'.
startUserStatusUpdateThread :: STM.TChan [UserId] -> Session -> RequestChan -> IO ()
startUserStatusUpdateThread zipperChan session requestChan =
    void $ forkIO $ loop []
  where
    -- How long to wait for a new ID list before refreshing anyway
    -- (30 seconds, expressed in microseconds for 'timeout').
    refreshTimeoutMicros = 30 * (1000 * 1000)

    loop lastIds = do
        mIds <- timeout refreshTimeoutMicros
                        (STM.atomically $ STM.readTChan zipperChan)
        let curIds = maybe lastIds id mIds
            -- A timeout always refreshes; a new list refreshes only
            -- if it actually changed.
            shouldRefresh = maybe True (/= lastIds) mIds
        when shouldRefresh $
            STM.atomically $ STM.writeTChan requestChan (fetchStatuses curIds)
        loop curIds

    -- The request placed on the channel: best effort, errors ignored.
    fetchStatuses ids = do
        outcome <- try $ updateUserStatuses ids session
        return $ case outcome of
            Left (_ :: SomeException) -> Nothing
            Right upd -> upd
-- | Fork a thread that periodically expires stale "user is typing"
-- entries.
--
-- Forever: enqueue on @requestChan@ an 'MH' action that applies
-- 'expireTypingUsers' to the typing users of every channel, using a
-- cutoff of "now minus 'userTypingExpiryInterval'"; then sleep for
-- half of 'userTypingExpiryInterval'.
startTypingUsersRefreshThread :: RequestChan -> IO ()
startTypingUsersRefreshThread requestChan =
    void $ forkIO $ forever $ do
        STM.atomically $ STM.writeTChan requestChan (return $ Just expireStale)
        threadDelay pauseMicros
  where
    -- Half the expiry interval, converted to microseconds.
    pauseMicros = ceiling ((userTypingExpiryInterval / 2) * (1000 * 1000))

    -- The current time is read when the action runs, not when it is
    -- enqueued.
    expireStale = do
        now <- liftIO getCurrentTime
        let cutoff = addUTCTime (negate userTypingExpiryInterval) now
        csChannels . mapped %= (ccInfo.cdTypingUsers %~ expireTypingUsers cutoff)
-- | Fork a thread that records failed subprocess runs.
--
-- The thread reads 'ProgramOutput' values from @logChan@. Runs that
-- exited with 'ExitSuccess' are skipped. For any other exit code the
-- program name, arguments, exit code, stdout and stderr are appended
-- to a log file, and a 'ProgramExecutionFailed' error naming the
-- program and the log path is enqueued on @requestChan@.
--
-- The log file is created in the system temporary directory on the
-- first failure and the same path and handle are reused afterwards.
startSubprocessLoggerThread :: STM.TChan ProgramOutput -> RequestChan -> IO ()
startSubprocessLoggerThread logChan requestChan =
    void $ forkIO $ monitor Nothing
  where
    -- @mLog@ carries the (path, handle) of the log file once opened.
    monitor mLog = do
        ProgramOutput progName args out err ec <-
            STM.atomically $ STM.readTChan logChan
        if ec == ExitSuccess
            then monitor mLog
            else do
                logFile@(logPath, logHandle) <- maybe createLog return mLog
                hPutStrLn logHandle $ unlines
                    [ "Program: " <> progName
                    , "Arguments: " <> show args
                    , "Exit code: " <> show ec
                    , "Stdout:"
                    , out
                    , "Stderr:"
                    , err
                    ]
                hFlush logHandle
                STM.atomically $ STM.writeTChan requestChan $
                    return $ Just $ mhError $
                        ProgramExecutionFailed (T.pack progName) (T.pack logPath)
                monitor (Just logFile)

    -- Open a fresh log file in the temporary directory.
    createLog = do
        tmpDir <- getTemporaryDirectory
        openTempFile tmpDir "matterhorn-subprocess.log"
-- | Fork a thread that watches for changes to the local time zone.
--
-- Starting from @tz@, the thread sleeps five minutes, calls
-- 'lookupLocalTimeZone', and compares the result with the previously
-- seen value. On a change it enqueues on @requestChan@ an action that
-- stores the new value in 'timeZone' and invalidates the Brick
-- rendering cache.
startTimezoneMonitorThread :: TimeZoneSeries -> RequestChan -> IO ()
startTimezoneMonitorThread tz requestChan =
    void $ forkIO $ watch tz
  where
    -- Five minutes, in microseconds for 'threadDelay'.
    pollIntervalMicros = 5 * (60 * (1000 * 1000))

    watch knownTz = do
        threadDelay pollIntervalMicros
        currentTz <- lookupLocalTimeZone
        when (currentTz /= knownTz) $
            STM.atomically $ STM.writeTChan requestChan $
                return $ Just $ do
                    timeZone .= currentTz
                    mh invalidateCache
        watch currentTz
-- | Start Aspell and the spell check timer thread if the configuration
-- enables it.
--
-- Returns 'Nothing' when 'configEnableAspell' is off or when
-- 'startAspell' reports a failure. Otherwise returns the Aspell handle
-- together with an action that writes @()@ to the channel returned by
-- 'startSpellCheckerThread'.
maybeStartSpellChecker :: Config -> BChan MHEvent -> IO (Maybe (Aspell, IO ()))
maybeStartSpellChecker config eventQueue
    | configEnableAspell config = do
        started <- startAspell dictionaryOpts
        case started of
            Left _ -> return Nothing
            Right checker -> do
                resetChan <- startSpellCheckerThread eventQueue checkTimeout
                let resetTimer = STM.atomically $ STM.writeTChan resetChan ()
                return $ Just (checker, resetTimer)
    | otherwise = return Nothing
  where
    -- Only pass a dictionary option when one is configured.
    dictionaryOpts = case configAspellDictionary config of
        Nothing -> []
        Just dict -> [UseDictionary dict]

    -- Timeout handed to 'startSpellCheckerThread'.
    checkTimeout = 500 * 1000
-- | Start the two threads that debounce spell check requests.
--
-- The returned channel is the "wake-up" channel: each @()@ written to
-- it either starts a new delay of @spellCheckTimeout@ (the value is
-- passed straight to 'newDelay' / 'updateDelay') or, if the current
-- delay has not yet expired, pushes that delay back by the same
-- amount. When a delay expires, a 'requestSpellCheck' response event
-- is written to @eventChan@.
startSpellCheckerThread :: BChan MHEvent
                        -> Int
                        -> IO (STM.TChan ())
startSpellCheckerThread eventChan spellCheckTimeout = do
    wakeupChan <- STM.atomically STM.newTChan
    workerChan <- STM.atomically STM.newTChan
    currentDelay <- STM.atomically $ STM.newTVar Nothing

    let freshDelay = Just <$> newDelay spellCheckTimeout

        -- Worker: take the next delay handed over by the manager, block
        -- until it expires, then ask for a spell check.
        worker = forever $ do
            STM.atomically $ STM.readTChan workerChan >>= waitDelay
            writeBChan eventChan (RespEvent requestSpellCheck)

        -- Manager: on each wake-up, either extend the live delay or
        -- create a new one and hand it to the worker.
        manager = forever $ do
            () <- STM.atomically $ STM.readTChan wakeupChan
            existing <- STM.atomically $ STM.readTVar currentDelay
            replacement <- case existing of
                Nothing -> freshDelay
                Just del -> do
                    -- The delay may expire between this check and the
                    -- 'updateDelay' below, in which case the extension
                    -- has no effect; a later wake-up starts a new delay.
                    expired <- tryWaitDelayIO del
                    if expired
                        then freshDelay
                        else do
                            updateDelay del spellCheckTimeout
                            return Nothing
            case replacement of
                Nothing -> return ()
                Just newDel -> STM.atomically $ do
                    STM.writeTVar currentDelay (Just newDel)
                    STM.writeTChan workerChan newDel

    void $ forkIO worker
    void $ forkIO manager
    return wakeupChan
-- | Fork a thread that loads syntax definitions and installs them.
--
-- Every directory in 'configSyntaxDirs' is passed to
-- 'loadSyntaxesFromDir'. Directories whose load throws an exception or
-- returns a 'Left' are skipped (best effort). The remaining maps are
-- merged with 'M.unions', so for a key present in several directories
-- the entry from the earliest directory in the list wins. The merged
-- map is then stored in @csResources.crSyntaxMap@ via a response event
-- on @eventChan@, and the Brick rendering cache is invalidated.
startSyntaxMapLoaderThread :: Config -> BChan MHEvent -> IO ()
startSyntaxMapLoaderThread config eventChan = void $ forkIO $ do
    loaded <- forM (configSyntaxDirs config) $ \dir -> do
        result <- try $ loadSyntaxesFromDir dir
        return $ case result of
            Left (_ :: SomeException) -> Nothing
            Right (Left _) -> Nothing
            Right (Right m) -> Just m

    -- 'M.unions' is the strict, left-biased library equivalent of
    -- folding 'M.union' over the list starting from the empty map.
    let finalMap = M.unions (catMaybes loaded)

    writeBChan eventChan $ RespEvent $ do
        csResources.crSyntaxMap .= finalMap
        mh invalidateCache
-- | Fork a thread running 'asyncWorker' with the given configuration,
-- request channel and event channel.
startAsyncWorkerThread :: Config -> STM.TChan (IO (Maybe (MH ()))) -> BChan MHEvent -> IO ()
startAsyncWorkerThread config requestChan eventChan =
    void (forkIO (asyncWorker config requestChan eventChan))
-- | Run 'doAsyncWork' in an endless loop; this action never returns
-- normally.
asyncWorker :: Config -> STM.TChan (IO (Maybe (MH ()))) -> BChan MHEvent -> IO ()
asyncWorker config requestChan eventChan =
    forever (doAsyncWork config requestChan eventChan)
-- | Process exactly one request from @requestChan@.
--
-- Steps:
--
-- 1. Depending on 'configShowBackground', emit 'BGIdle' / 'BGBusy'
--    events describing the queue before blocking, and prepare an
--    action to run once a request has been obtained.
-- 2. Block until a request is available, run the prepared action, then
--    run the request through 'rateLimitRetry'.
-- 3. Report the outcome on @eventChan@: exceptions accepted by
--    'shouldIgnore' are dropped silently, a 'RateLimitException'
--    produces 'RequestDropped', other exceptions produce a
--    'DisplayError' event; a successful request that yields an 'MH'
--    action is forwarded as a 'RespEvent'.
doAsyncWork :: Config -> STM.TChan (IO (Maybe (MH ()))) -> BChan MHEvent -> IO ()
doAsyncWork config requestChan eventChan = do
    startWork <- case configShowBackground config of
        Disabled -> return noop
        Active -> do
            pending <- STM.atomically $ STM.tryPeekTChan requestChan
            case pending of
                -- Queue is empty: report idle now, busy once work arrives.
                Nothing -> do
                    notify BGIdle
                    return $ notify $ BGBusy Nothing
                Just _ -> return noop
        ActiveCount -> do
            -- Count queued requests on a clone so the real channel is
            -- left untouched.
            queued <- STM.atomically $
                STM.cloneTChan requestChan >>= countQueued
            if queued == 0
                then do
                    notify BGIdle
                    return $ notify $ BGBusy (Just 1)
                else do
                    notify $ BGBusy (Just queued)
                    return noop

    req <- STM.atomically $ STM.readTChan requestChan
    startWork
    outcome <- try $ rateLimitRetry (notify . RateLimitExceeded) req
    case outcome of
        Left e
            | shouldIgnore e -> return ()
            | Just (_ :: RateLimitException) <- fromException e ->
                notify RequestDropped
            | otherwise ->
                notify $ IEvent $ DisplayError $
                    maybe (AsyncErrEvent e) ServerError (fromException e)
        Right Nothing -> notify RateLimitSettingsMissing
        Right (Just Nothing) -> return ()
        Right (Just (Just action)) -> notify (RespEvent action)
  where
    notify ev = writeBChan eventChan ev

    noop :: IO ()
    noop = return ()

    -- Drain the given channel, returning how many items it held.
    countQueued chan = do
        next <- STM.tryReadTChan chan
        case next of
            Nothing -> return 0
            Just _ -> (1 +) <$> countQueued chan
-- | Run an action, retrying it once if it is rate limited.
--
-- If @act@ throws an exception handled here and
-- 'rateLimitExceptionReset' yields a reset value, that value plus one
-- is passed to @rateLimitNotify@, the thread sleeps for that many
-- seconds, and @act@ is run a second time. The second run is not
-- guarded, so an exception from it propagates to the caller.
--
-- Returns 'Nothing' when the exception carries no reset value;
-- otherwise returns the action's result wrapped in 'Just'.
rateLimitRetry :: (Int -> IO ()) -> IO a -> IO (Maybe a)
rateLimitRetry rateLimitNotify act = attempt `catch` onRateLimit
  where
    attempt = Just <$> act

    onRateLimit exc = case rateLimitExceptionReset exc of
        Nothing -> return Nothing
        Just resetSecs -> do
            -- Wait one second longer than the reported reset value.
            let waitSecs = resetSecs + 1
            rateLimitNotify waitSecs
            threadDelay (waitSecs * 1000000)
            attempt
-- | Decide whether an exception should be dropped without being
-- reported.
--
-- The exception is rendered with 'show' and ignored when the rendered
-- text contains any of the fragments listed below.
shouldIgnore :: SomeException -> Bool
shouldIgnore e = any (`isInfixOf` rendered) ignoredFragments
  where
    rendered = show e

    ignoredFragments :: [String]
    ignoredFragments =
        [ "getAddrInfo"
        , "Network.Socket.recvBuf"
        , "Network.Socket.sendBuf"
        , "resource vanished"
        , "timeout"
        , "partial packet"
        ]