{-# LANGUAGE CPP, DeriveDataTypeable, OverloadedStrings, ForeignFunctionInterface #-}
-- | Per-target tail loop: opens a file, pipe, socket, character device or
-- directory, reads newly appended lines (or directory changes) and hands them
-- to 'tailText'.  Wake-ups arrive as asynchronous 'TailSignal' exceptions
-- thrown to the polling thread by the reopen timer and, when built with
-- hinotify, by inotify callbacks.
module TailHandle
  ( runTail
  ) where

import Control.Concurrent (ThreadId, myThreadId, threadWaitRead, yield, killThread)
import Control.Concurrent.MVar (newEmptyMVar, readMVar)
import Control.Exception (Exception(..), asyncExceptionToException, asyncExceptionFromException, catch, finally, throwTo, mask)
import Control.Monad ((>=>), guard, when, unless, forever)
import qualified Data.ByteString.Char8 as BS
import Data.Functor (($>))
import qualified Data.HashSet as Set
import Data.List (genericTake, genericLength)
import Data.Maybe (isJust)
import Data.Monoid ((<>))
import Data.Typeable (Typeable)
import Foreign.C.Types (CInt(..))
import Foreign.C.Error (throwErrnoIfNull)
import Foreign.Marshal.Alloc (allocaBytes)
import Foreign.Ptr (Ptr, castPtr)
import System.FilePath ((</>))
#ifdef VERSION_hinotify
import qualified System.INotify as INotify
#endif
import System.IO.Error (isDoesNotExistError, isEOFError, isFullError)
import System.IO (SeekMode(AbsoluteSeek))
import System.Posix.Directory (DirStream, readDirStream, rewindDirStream, closeDirStream)
import System.Posix.Files (getFileStatus, getFdStatus, isRegularFile, isNamedPipe, isSocket, isCharacterDevice, isDirectory, fileID, fileSize)
import System.Posix.IO (openFd, OpenMode(ReadOnly), OpenFileFlags(..), fdReadBuf, fdSeek, setFdOption, FdOption(NonBlockingRead), closeFd)
import System.Posix.Types (Fd(..), FileOffset, FileID)
import Unsafe.Coerce (unsafeCoerce)

import Util
import TailTypes
import Tail

-- | Mutable-by-replacement state of one tail target.
data TailHandle = TailHandle
  { thTail :: Tail                       -- ^ configuration of this target
  , thRuntime :: TailRuntime             -- ^ shared runtime (error output, sub-tail creation, inotify)
  , thPoll :: ThreadId                   -- ^ thread running the poll loop; signals are thrown to it
  , thReopen :: Maybe ThreadId           -- ^ periodic reopen timer thread, if any
  , thFd :: Maybe Fd                     -- ^ open descriptor, or 'Nothing' while closed
  , thPos :: FileOffset                  -- ^ or -1 for blocking fd
  , thIno :: Maybe FileID                -- ^ inode of the open file, compared by 'reopenTail'
  , thBuf :: BS.ByteString               -- ^ trailing partial line not yet terminated by a newline
  , thDirStream :: Maybe DirStream       -- ^ directory stream when the target is a directory
  , thDirList :: Set.HashSet FilePath    -- ^ directory entries currently known
  , thAgain :: Bool                      -- ^ whether to read again without waiting
#ifdef VERSION_hinotify
  , thPollWatch :: Maybe INotify.WatchDescriptor
  , thReopenWatch :: Maybe INotify.WatchDescriptor
#endif
  }

-- | Asynchronous notifications delivered to the poll thread via 'throwTo'.
data TailSignal
  = SignalPoll                -- ^ read again now
  | SignalReopen              -- ^ re-check the path for a replaced file
  | SignalInsert String Bool  -- ^ directory entry appeared; the flag is passed to 'subTail' as @new@
  | SignalDelete String       -- ^ directory entry disappeared
  deriving (Show, Typeable, Eq, Ord)

-- Routed through the asynchronous-exception hierarchy.
instance Exception TailSignal where
  toException = asyncExceptionToException
  fromException = asyncExceptionFromException

-- | Report an error message on behalf of this handle's tail.
thErrMsg :: TailHandle -> BS.ByteString -> IO ()
thErrMsg t = tailErrMsg (thRuntime t) (thTail t)

-- | Run an action, yielding 'Nothing' when it fails with a does-not-exist
-- IO error.
catchDoesNotExist :: IO a -> IO (Maybe a)
catchDoesNotExist f = catchWhen isDoesNotExistError (Just <$> f) (return Nothing)

-- | Fail with a user error carrying the given message.
bad :: String -> IO a
bad = ioError . userError

-- | Close the handle's descriptor (a no-op when already closed), removing any
-- inotify watches and resetting position, inode and directory stream.
closeTail :: TailHandle -> IO TailHandle
closeTail th@TailHandle{ thFd = Nothing } = return th
closeTail th@TailHandle{ thFd = Just fd } = do
#ifdef VERSION_hinotify
  mapM_ INotify.removeWatch (thPollWatch th)
  mapM_ INotify.removeWatch (thReopenWatch th)
#endif
  -- A descriptor handed to fdopendir belongs to the directory stream and must
  -- be released through it; closing the raw fd would leak the stream.
  maybe (closeFd fd) closeDirStream (thDirStream th)
  return th
    { thFd = Nothing
    , thPos = 0
    , thIno = Nothing
    -- Forget the stream so a later reopen onto a non-directory is not read as
    -- a directory by 'readTail'.
    , thDirStream = Nothing
#ifdef VERSION_hinotify
    , thPollWatch = Nothing
    , thReopenWatch = Nothing
#endif
    }

-- | Seek the open descriptor to an absolute offset and record it in 'thPos'.
-- Fails with a user error when the handle is closed.
seekTail :: FileOffset -> TailHandle -> IO TailHandle
seekTail _ TailHandle{ thFd = Nothing } = bad "seek on closed fd"
seekTail c th@TailHandle{ thFd = Just fd } =
  fdSeek fd AbsoluteSeek c $> th{ thPos = c }

-- | Install inotify watches for a path-based tail when the runtime has an
-- inotify instance; otherwise (or without hinotify) return the handle as is.
inotifyTail :: TailHandle -> IO TailHandle
#ifdef VERSION_hinotify
inotifyTail th@TailHandle
  { thRuntime = TailRuntime{ trINotify = Just inotify }
  , thPoll = tid
  , thPos = pos
  , thTail = Tail
    { tailTarg = TailPath path
    , tailPollINotify = ipoll
    , tailReopenINotify = ireopen
    }
  } = do
  let -- Translate an inotify event into the signal to throw, if any.
      -- Entries whose name is empty or starts with a dot are ignored.
      sig (INotify.Modified {}) = Just SignalPoll
      sig (INotify.MovedSelf {}) = Just SignalReopen
      sig (INotify.MovedOut { INotify.filePath = f }) | notdot f = Just $ SignalDelete (fp f)
      sig (INotify.MovedIn { INotify.filePath = f }) | notdot f = Just $ SignalInsert (fp f) False
      sig (INotify.Created { INotify.filePath = f }) | notdot f = Just $ SignalInsert (fp f) True
      sig (INotify.Deleted { INotify.filePath = f }) | notdot f = Just $ SignalDelete (fp f)
      sig _ = Nothing
#if MIN_VERSION_hinotify(0,3,10)
      fp = BS.unpack
      notdot = maybe False (('.' /=) . fst) . BS.uncons
#else
      fp = id
      notdot "" = False
      notdot ('.':_) = False
      notdot _ = True
#endif
      add l = INotify.addWatch inotify l (
#if MIN_VERSION_hinotify(0,3,10)
        BS.pack
#endif
        path
        ) (mapM_ (throwTo tid) . sig)
  -- Poll watch only when configured and the position is not the -1 marker.
  poll <- justWhen (ipoll && pos >= 0) $
    if isJust (thDirStream th)
      then add [INotify.OnlyDir, INotify.Move, INotify.Create, INotify.Delete]
      else add [INotify.Modify]
  reopen <- justWhen ireopen $ add [INotify.MoveSelf, INotify.DeleteSelf]
  return th
    { thPollWatch = poll
    , thReopenWatch = reopen
    }
#endif
inotifyTail th = return th

foreign import ccall unsafe fdopendir :: CInt -> IO (Ptr ())

-- | Open a directory stream on an already-open descriptor via @fdopendir(3)@.
fdOpenDirStream :: Fd -> IO DirStream
fdOpenDirStream (Fd d) =
  -- NOTE(review): the coercion assumes 'DirStream' is represented as the raw
  -- DIR pointer in the unix package in use -- confirm on upgrade.
  (unsafeCoerce :: Ptr () -> DirStream) <$> throwErrnoIfNull "fdOpenDirStream" (fdopendir d)

-- | Read all remaining entries from a directory stream, skipping every name
-- that starts with a dot.  An empty name marks the end of the stream.
readDirStreamAll :: DirStream -> IO [FilePath]
readDirStreamAll d = readDirStream d >>= c
  where
    c [] = return []
    c ('.':_) = readDirStreamAll d
    c f = (f :) <$> readDirStreamAll d

-- | Register a tail for entry @f@ of a directory tail, when the parent is
-- configured to tail or list directory contents.  @new@ forces 'tailBegin'
-- on the child.
subTail :: TailHandle -> Bool -> FilePath -> IO ()
subTail TailHandle{ thRuntime = tr, thTail = t@Tail{ tailTarg = TailPath p } } new f
  | tailDirTail t || tailDirList t = trAddTail tr t
    { tailTarg = TailPath (p </> f)
    , tailFileTail = tailDirTail t
    , tailDirList = tailDirList t && tailDirRecursive t
    , tailDirTail = tailDirTail t && tailDirRecursive t
    , tailBegin = tailBegin t || new
    }
subTail _ _ _ = nop

-- | Open the target of a closed handle and position it according to 'thPos'
-- (negative values count back from the end).  A missing path or unsupported
-- file type is reported and leaves the handle closed.  Fails with a user
-- error when the handle is already open.
openTail :: TailHandle -> IO TailHandle
openTail th@TailHandle{ thFd = Nothing } = get (tailTarg (thTail th))
  where
    get (TailFd fd) = got (Just fd)
    get (TailPath path) = got =<< catchDoesNotExist (
      openFd path ReadOnly Nothing OpenFileFlags{ append = False, exclusive = False, noctty = False, nonBlock = True, trunc = False })
    got Nothing = thErrMsg th "No such file or directory" $> th{ thPos = 0 }
    got (Just fd) = do
      setFdOption fd NonBlockingRead True -- is this really necessary?
      inotifyTail =<< go fd =<< getFdStatus fd
    go fd stat
      | isRegularFile stat =
        seekTail (if pos < 0 then max 0 $ sz + 1 + pos else min sz pos) th'
      | isNamedPipe stat || isSocket stat || isCharacterDevice stat =
        return th'{ thPos = -1 }
      | isDirectory stat && (tailDirList (thTail th) || tailDirTail (thTail th)) = do
        ds <- fdOpenDirStream fd
        dl <- readDirStreamAll ds
        -- Entries taken here count as already known; the rest are reported
        -- as insertions by the first 'readTail'.
        let nl = genericTake (if pos < 0 then genericLength dl + 1 + pos else pos) dl
            th'' = th'{ thDirStream = Just ds, thDirList = Set.fromList nl }
        mapM_ (subTail th'' False) nl
        return th''
      | otherwise = closeFd fd >> thErrMsg th "Unsupported file type" $> th
      where
        th' = th{ thFd = Just fd, thIno = Just (fileID stat), thAgain = True, thPos = 0 }
        sz = fileSize stat
        pos = thPos th
openTail _ = bad "open on opened tail"

-- | For path targets, stat the path and switch to it when it now names a
-- different, non-empty file; open it when nothing is currently open.
reopenTail :: TailHandle -> IO TailHandle
reopenTail th@TailHandle{ thTail = Tail{ tailTarg = TailPath path }, thIno = ino } = do
  fstat <- catchDoesNotExist $ getFileStatus path
  case fstat of
    Nothing -> return th
    Just _ | ino == Nothing -> openTail th
    Just stat | ino == Just (fileID stat) -> return th
    Just stat | fileSize stat == 0 -> return th
    _ -> do
      thErrMsg th "Following new file"
      closeTail th >>= openTail
reopenTail th = return th

-- | Result of a read that produced nothing: clear 'thAgain', no lines.
noRead :: TailHandle -> (TailHandle, [BS.ByteString])
noRead th = (th{ thAgain = False }, [])

-- | Maximum number of bytes requested per read.
bufsiz :: Int
bufsiz = 8192

-- | Directory-listing messages: the entry name prefixed with @+@ or @-@.
insertMsg, deleteMsg :: FilePath -> BS.ByteString
insertMsg = BS.cons '+' . BS.pack
deleteMsg = BS.cons '-' . BS.pack

-- | Read whatever is newly available: complete lines for files and streams,
-- or insert/delete messages for directories.  Returns the updated handle and
-- the output lines.
readTail :: TailHandle -> IO (TailHandle, [BS.ByteString])
readTail th@TailHandle{ thFd = Nothing } = return (noRead th)
readTail th@TailHandle{ thDirStream = Just ds } = do
  dl <- rewindDirStream ds >> readDirStreamAll ds
  -- Accumulator: (new entries, old entries not seen in this scan, all entries).
  let df f (n, o, s)
        | Set.member f o = (n, Set.delete f o, s)
        | otherwise = (f : n, o, Set.insert f s)
      (nl, os, s) = foldr df ([], thDirList th, thDirList th) dl
  mapM_ (subTail th True) nl
  return
    ( th{ thDirList = Set.difference s os, thAgain = False }
    -- Messages are produced only when directory listing is enabled.
    , guard (tailDirList (thTail th)) >> map insertMsg nl ++ map deleteMsg (Set.toList os)
    )
readTail th@TailHandle{ thFd = Just fd, thPos = pos } =
  if pos == -1
    then catchWhen isEOFError readsock $ do
      noRead <$> checkbuf th
      -- thErrMsg th "closed?"
      -- closeTail th >.= noRead th
    else gotlen . fileSize =<< getFdStatus fd
  where
    -- Report and drop any buffered partial line.
    checkbuf th'@TailHandle{ thBuf = buf } = do
      unless (BS.null buf) $
        thErrMsg th' ("Unterminated line: " <> buf)
      return th'{ thBuf = BS.empty }
    -- Regular file: compare the current size with our position.
    gotlen len
      | len < pos = do
        thErrMsg th ("File truncated to " <> BS.pack (show len))
        seekTail 0 th >>= readTail
      | len == pos = do
        noRead <$> checkbuf th
      | otherwise = do
        let count = min (fromIntegral (len - pos)) bufsiz
        buf <- readit count
        let buflen = BS.length buf
        when (buflen /= count) $
          thErrMsg th ("Short read (" <> BS.pack (show buflen) <> BS.singleton '/' <> BS.pack (show count) <> BS.singleton ')')
        return $ gotbuf th{ thPos = pos + fromIntegral buflen, thAgain = buflen == bufsiz } buf
    -- Stream descriptor (position -1): read one buffer and try again after.
    readsock = gotbuf th{ thAgain = True } <$> readit bufsiz
    -- Split on newlines; the final unterminated piece stays in 'thBuf'.
    gotbuf th'@TailHandle{ thBuf = oldbuf } buf
      | BS.null buf = noRead th'
      | otherwise = case initLast $ BS.split '\n' buf of
        ([], r) -> (th'{ thBuf = oldbuf <> r }, [])
        (l1 : l, r) -> (th'{ thBuf = r }, (oldbuf <> l1) : l)
    -- Read up to @len@ bytes; a "full" IO error yields an empty buffer.
    readit len = catchWhen isFullError
      (allocaBytes len $ \p -> do
        l <- fdReadBuf fd p (fromIntegral len)
        BS.packCStringLen (castPtr p, fromIntegral l))
      (return BS.empty)

-- | Block forever on an MVar nobody fills (until an asynchronous exception
-- arrives).
pause :: IO ()
pause = readMVar =<< newEmptyMVar

-- | Wait until the handle should be read again: forever when closed, for
-- readability on a stream descriptor, just a 'yield' when 'thAgain' is set,
-- otherwise for the poll interval (forever when the interval is 0).
waitTail :: TailHandle -> IO ()
waitTail TailHandle{ thFd = Nothing } = pause
waitTail TailHandle{ thFd = Just fd, thPos = -1 } = threadWaitRead fd
waitTail TailHandle{ thAgain = True } = yield
waitTail TailHandle{ thTail = Tail{ tailPollInterval = i } }
  | i == 0 = pause
  | otherwise = threadDelayInterval i

-- | Timer loop: throw 'SignalReopen' to the given thread every interval.
reopenThread :: Interval -> ThreadId -> IO ()
reopenThread ri tid = forever $ do
  threadDelayInterval ri
  throwTo tid SignalReopen

-- | Build the initial closed handle for a tail on the calling thread, forking
-- the reopen timer when the reopen interval is non-zero.
newTail :: TailRuntime -> Tail -> IO TailHandle
newTail tr t@Tail{ tailReopenInterval = ri } = do
  tid <- myThreadId
  rid <- justWhen (ri /= 0) $ forkIOUnmasked $ reopenThread ri tid
  return TailHandle
    { thTail = t
    , thRuntime = tr
    , thPoll = tid
    , thReopen = rid
    , thFd = Nothing
    , thPos = if tailBegin t then 0 else -1
    , thIno = Nothing
    , thBuf = BS.empty
    , thDirStream = Nothing
    , thDirList = Set.empty
    , thAgain = True
#ifdef VERSION_hinotify
    , thPollWatch = Nothing
    , thReopenWatch = Nothing
#endif
    }

-- | Run the tail loop for one target on the calling thread.  The loop runs
-- with asynchronous exceptions masked and only unmasks while waiting, so
-- 'TailSignal's are handled between reads.  It returns once nothing could
-- wake the tail again; the reopen timer thread is killed on the way out,
-- whether the loop returns or throws.
runTail :: TailRuntime -> Tail -> IO ()
runTail tr tl = mask $ \unmask ->
  let
    -- Apply one signal to the handle state.
    signal th SignalReopen = reopenTail th -- >>= wait
    signal th SignalPoll = return th{ thAgain = True }
    signal th@TailHandle{ thDirStream = ~(Just _), thDirList = l, thTail = t } (SignalInsert f new) = do
        subTail th new f
        if tailDirList t
          then proc th' [insertMsg f]
          else return th'
      where th' = th{ thDirList = Set.insert f l }
    signal th@TailHandle{ thDirStream = ~(Just _), thDirList = l, thTail = t } (SignalDelete f) =
        if tailDirList t
          then proc th' [deleteMsg f]
          else return th'
      where th' = th{ thDirList = Set.delete f l }
    -- Wait unmasked; on a signal, apply it and wait again.
    wait th = catch (unmask $ waitTail th $> th) (signal th >=> wait)
    -- Stop when there is no reopen timer and either nothing is open or no
    -- poll interval / watch could ever trigger another read.
    poll th
      | thReopen th == Nothing && (thFd th == Nothing || (thAgain th == False && thPos th /= -1 && tailPollInterval (thTail th) == 0
#ifdef VERSION_hinotify
        && thPollWatch th == Nothing && thReopenWatch th == Nothing
#endif
        )) = return th
      | otherwise = wait th >>= go
    go th = readTail th >>= uncurry proc >>= poll
    -- Emit lines and pass the handle through.
    proc th s = mapM_ fun s $> th
    fun = tailText tr tl
  in do
    th0 <- newTail tr tl
    -- 'thReopen' is fixed by 'newTail', so the initial handle is enough to
    -- find the timer thread during cleanup.
    (openTail th0 >>= go >> return ()) `finally` mapM_ killThread (thReopen th0)