{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Internals for exposing the contents of a directory tree to Reflex:
-- reading a tree into a list of updates, watching it for changes with
-- fsnotify, and folding those updates into a 'DynDirTree'.
module Reflex.Filesystem.Internal
  ( dirSource
  , DataUpdate(..)
  , readTree, watchDir, e2e
  , readAsUpdate, followDir
  ) where

import Reflex
import Control.Concurrent
import Control.DeepSeq
import Control.Concurrent.EQueue
import Control.Exception.Lens
import Control.Lens
import Control.Monad
import qualified Control.Monad.Catch as E
import Control.Monad.Fix
import Control.Monad.Trans
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Dependent.Sum (DSum ((:=>)))
import qualified Data.DList as DL
import Data.Foldable
import Data.Functor.Misc (Const2(..))
import qualified Data.ListTrie.Patricia.Map.Ord as LT
import qualified Data.Map as Map
import System.FilePath ((</>))
import qualified System.FilePath as FP
import qualified System.FSNotify as FSN
import Reflex.Filesystem.DirTree
import Reflex.Host.Class
import System.Directory
import System.IO.Error (isDoesNotExistError)
import System.IO.Error.Lens
import Data.Text (Text)
import qualified Data.Text as T

-- The problem with making this based on Reflex's Incremental is that a delete removes all paths
-- below it, and we can't know what those paths are in the patch. Which is all fine and good until
-- an addition occurs after the delete below said path. In which case we need to preserve ordering.
-- Using the list just works, but introducing the fanned events in an Incremental still poses
-- a challenge.

-- Idea:
-- TimeBits t -> Event t Patch -> Event t Patch
-- Event t Patch -> Dynamic t (DirTree Data)
-- Self fan

-- | A single change to one path in the watched tree.
data DataUpdate a
  = DataMod { newData :: a }
    -- ^ The path now holds this data.
  -- DataDel Removed because we can't tell if the file deleted was a directory or file
  -- without the DirTree which is in 'reflex' land and we don't want to keep a duplicate.
  | PathDel
    -- ^ The path is gone; consumers ('followDir') also drop everything below it.
  deriving (Eq, Ord, Show)

-- | The path @fp@ made relative to @pf@, split into its directory components.
-- Any @"."@ components are dropped.
localPath :: FP.FilePath -> FP.FilePath -> [Text]
localPath pf fp =
  filter (/=".") . map T.pack . FP.splitDirectories . FP.makeRelative pf $ fp

-- | Read the file at @basePath@ combined with @fp@ and report it as a single
-- 'DataMod', keyed by its path relative to @basePath@. The contents are forced
-- before returning. If the read fails because the file does not exist, a
-- 'PathDel' for the same key is reported instead.
readAsUpdate :: FP.FilePath -> FP.FilePath -> IO [([Text], DataUpdate ByteString)]
readAsUpdate basePath fp = do
  -- Handle the file being deleted since we listed the directory contents
  E.handleIf isDoesNotExistError (\_ -> pure [(lp, PathDel)]) $ do
    d <- BS.readFile (FP.combine basePath fp)
    d `deepseq` pure [(lp, DataMod d)]
  where
    lp = localPath basePath fp

-- | Given a path reads that source and any paths descending from it, into the update format.
--   To handle weird events with deletion, we PathDel if we try looking at a path that does
--   not exist.
readTree :: FP.FilePath -> FP.FilePath -> IO [([Text], DataUpdate ByteString)]
readTree basePath dir =
    -- This is a TOCTOU issue, but sadly I don't see a complete way around it.
    -- I can't both list a directory, and read the file contents if it isn't one
    -- as a single operation. Since we're based on filesystem watches though,
    -- we hope to just narrow this condition and work around it by assuming
    -- our watch observes the change and thus gives us eventual consistency.
    -- Side note: POSIX.1-2008 adds fdopendir so we might be able to work around
    -- this issue at the expense of platform specific code.
    go (basePath </> dir)
  where
    -- pf is the full path at all times.
    go pf = do
      -- We return IO actions to run so that we aren't catching our children's exceptions.
      act::IO [([Text], DataUpdate ByteString)] <-
        (`E.catches`
          [ handler_ (_IOException.errorType._InappropriateType) . pure $
              -- It's not a directory, so read it as an update if we can.
              readAsUpdate basePath pf
          , handler_ (_IOException.errorType._NoSuchThing) . pure $
              -- The file/directory was deleted while we were getting around to reading it
              pure [(localPath basePath pf, PathDel)]
          ]) $ do
            ents <- listDirectory pf
            pure (fmap mconcat . forM ents $ \fp -> go (pf </> fp))
      act

-- | Produce an Event of the changes to the state of files in the directory specified.
--
--   We reread the files to collect their new contents, but we do it at the time of the event.
--   As such, many small writes may come through as many small writes, not one large one.
--   For Reflex apps, debounce across the entire directory, and ones that would work with
--   network latency might both be desirable.
--
--   Processing on these events should be idempotent.
watchDir :: (EQueue eq, Reflex t, MonadReflexCreateTrigger t m, MonadIO m)
         => eq (DSum (EventTrigger t) Identity) -> FilePath
         -> m (Event t [([Text], DataUpdate ByteString)])
watchDir eq dir' = do
  -- Resolve the requested directory against the current working directory.
  dir <- liftIO $ (</> dir') <$> getCurrentDirectory
  newEventWithTrigger $ \et -> do
    -- Note: This isn't exception safe.
    fsnm <- FSN.startManagerConf (FSN.defaultConfig
              { FSN.confDebounce = FSN.NoDebounce
              , FSN.confUsePolling = False
              --, FSN.confPollInterval = 10
              })
    (enqueue, dereg) <- registerSemi eq ((:=>) et . Identity . toList)
    -- Forks off a thread to receive the events and reread the files in the background.
    stopListen <- FSN.watchTree fsnm dir (const True) $ \upEvent -> do
      --print upEvent
      e2e dir upEvent >>= enqueue . DL.fromList
    -- The threadDelay here is because stopping the watch
    -- and directory deletion which autoremoves some watches, races.
    -- This race is observed by printing some warnings to the console,
    -- which a delay should almost always suppress.
    -- (threadDelay takes microseconds, so this waits 250 microseconds.)
    return (dereg >> threadDelay 250 >> stopListen >> FSN.stopManager fsnm)

-- | Translate one fsnotify event for a path under @dir@ into updates:
-- additions and modifications reread the affected path, removals become a
-- 'PathDel', and unknown events reread the whole tree.
e2e :: FilePath -> FSN.Event -> IO [([Text], DataUpdate ByteString)]
e2e dir (FSN.Added fp _ _) = readTree dir fp
e2e dir (FSN.Modified fp _ _) = readTree dir fp
e2e dir (FSN.Removed fp _ _) = pure [(localPath dir fp, PathDel)]
e2e dir (FSN.Unknown _ _ _) =
  -- Got an unknown from fsnotify which we literally can't know how to handle.
  -- So (re)readTree to get a complete state.
  readTree dir ""

-- | Given an initial state, and an Event of updates, maintain a DynDirTree.
followDir :: forall t m a . (Reflex t, MonadFix m, MonadHold t m)
          => [([Text], DataUpdate a)] -- ^ The initial state of the directory.
          -> Event t [([Text], DataUpdate a)]
             -- ^ An Event of the changes to the directory tree.
             --   It's ok that this contains changes already represented in the initial state,
             --   but it must not be missing any.
          -> m (DynDirTree t a)
followDir initDirList dirEvents = do
    initDir <- doDirTree initDirList mempty
    foldDynM doDirTree initDir dirEvents
  where
    -- The update batches, fanned out so each path can listen for just its own updates.
    des = fanMap (fmap Map.fromList dirEvents)
    flEvent = select des . Const2
    -- A Dynamic for one path: starts at the given data and takes each later DataMod for it.
    doDyn :: (MonadHold t mi, MonadFix mi) => [Text] -> a -> mi (Dynamic t a)
    doDyn fl di = foldDyn (\d _ -> d) di . fmapMaybe maybeMod . flEvent $ fl
    maybeMod :: DataUpdate a -> Maybe a
    maybeMod (DataMod d) = Just d
    maybeMod _ = Nothing
    -- Apply a batch of updates, in order, to the structure of the tree.
    doDirTree :: (MonadHold t mi, MonadFix mi)
              => [([Text], DataUpdate a)] -> DirTree (Dynamic t a)
              -> mi (DirTree (Dynamic t a))
    doDirTree es tOrig =
      foldlM (\t c ->
        case c of
          (fp, PathDel) ->
            -- foldTrie would make this O(1).
            -- This can be updated once my list-trie patches are merged.
            pure $ t `LT.difference` (LT.lookupPrefix fp t)
          (fp, DataMod _) | LT.member fp t -> pure t -- It'll update itself.
          (fp, DataMod d) -> (\v -> LT.insert fp v t) <$> doDyn fp d)
        tOrig es

-- | Watch @dir'@ (resolved against the current working directory) and maintain
-- a 'DynDirTree' of its file contents. The watch is set up and subscribed to
-- before the initial tree is read, and the updates are then folded in with
-- 'followDir'.
dirSource :: (EQueue eq, Reflex t, MonadFix m, MonadIO m, MonadHold t m, MonadReflexCreateTrigger t m
             ,MonadSubscribeEvent t m)
          => eq (DSum (EventTrigger t) Identity) -> FP.FilePath -> m (DynDirTree t ByteString)
dirSource eq dir' = do
  dir <- liftIO $ (</> dir') <$> getCurrentDirectory
  de <- watchDir eq dir
  -- Force the event to be setup, but we have to make sure it isn't cleaned up too early.
  -- TODO, make sure of that.
  void $ subscribeEvent de
  -- Place de has to be active by.
  initS <- liftIO $ readTree dir ""
  ddt <- followDir initS de
  -- We need to make sure 'h' isn't cleaned up by here, or the watcher might have been
  -- shut down. If we were to read it though, we'd have to correctly process the events.
  return ddt