module Data.Conduit.INotify where

import Conduit (ConduitT, MonadIO, bracketP, lift, liftIO, (.|))
import qualified Conduit as C (await, awaitForever, mapInput, sourceHandle, yield)
import Control.Concurrent.STM (TVar, newTVar, newTVarIO, readTVarIO, writeTVar)
import Control.Concurrent.STM.TMQueue (TMQueue, closeTMQueue, newTMQueue, writeTMQueue)
import Control.Exception (tryJust)
import Control.Monad.Except (guard)
import Control.Monad.STM (STM, atomically)
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS (hGetSome)
import qualified Data.ByteString.Lazy.Internal as BS (defaultChunkSize)
import qualified Data.Conduit.List as C (catMaybes, map, mapMaybe)
import Data.Conduit.TQueue (sourceTMQueue)
import Data.Foldable (traverse_)
import System.FilePath.ByteString (encodeFilePath)
import qualified System.INotify as INotify (Event (DeletedSelf, Modified), EventVariety (DeleteSelf, Modify), INotify, WatchDescriptor, addWatch, initINotify, killINotify, removeWatch)
import qualified System.IO as IO (Handle, IOMode (ReadMode), SeekMode (AbsoluteSeek), hClose, hSeek, hTell, openFile)
import qualified System.IO.Error as IO (isEOFError)

-- | Run 'ConduitT' with 'INotify'
withINotify :: MonadResource m => (INotify.INotify -> ConduitT a b m r) -> ConduitT a b m r
withINotify :: (INotify -> ConduitT a b m r) -> ConduitT a b m r
withINotify = IO INotify
-> (INotify -> IO ())
-> (INotify -> ConduitT a b m r)
-> ConduitT a b m r
forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
bracketP IO INotify
INotify.initINotify INotify -> IO ()
INotify.killINotify

-- | Watch INotify events for given file
-- Does not support file rotation.
-- Once the watched file is removed, it will not emit any additional events and needs to be terminated via handle.
inotifyEventsSource ::
  (MonadResource m, Monad m) =>
  -- | events to watch for
  [INotify.EventVariety] ->
  -- | path to file to be watched
  FilePath ->
  -- | returns (source, handle to terminate the watch)
  STM (ConduitT () INotify.Event m (), STM ())
inotifyEventsSource :: [EventVariety] -> FilePath -> STM (ConduitT () Event m (), STM ())
inotifyEventsSource [EventVariety]
events FilePath
fp = do
  TMQueue Event
q <- STM (TMQueue Event)
forall a. STM (TMQueue a)
newTMQueue
  (ConduitT () Event m (), STM ())
-> STM (ConduitT () Event m (), STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((INotify -> ConduitT () Event m ()) -> ConduitT () Event m ()
forall (m :: * -> *) a b r.
MonadResource m =>
(INotify -> ConduitT a b m r) -> ConduitT a b m r
withINotify (\INotify
i -> IO WatchDescriptor
-> (WatchDescriptor -> IO ())
-> (WatchDescriptor -> ConduitT () Event m ())
-> ConduitT () Event m ()
forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
bracketP (INotify -> TMQueue Event -> IO WatchDescriptor
initialize INotify
i TMQueue Event
q) WatchDescriptor -> IO ()
cleanup (TMQueue Event -> WatchDescriptor -> ConduitT () Event m ()
forall (m :: * -> *) a p z.
MonadIO m =>
TMQueue a -> p -> ConduitT z a m ()
inside TMQueue Event
q)), TMQueue Event -> STM ()
forall a. TMQueue a -> STM ()
closeTMQueue TMQueue Event
q)
  where
    initialize :: INotify -> TMQueue Event -> IO WatchDescriptor
initialize INotify
i TMQueue Event
q = INotify
-> [EventVariety]
-> RawFilePath
-> (Event -> IO ())
-> IO WatchDescriptor
INotify.addWatch INotify
i [EventVariety]
events (FilePath -> RawFilePath
encodeFilePath FilePath
fp) (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Event -> STM ()) -> Event -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMQueue Event -> Event -> STM ()
forall a. TMQueue a -> a -> STM ()
writeTMQueue TMQueue Event
q)
    cleanup :: WatchDescriptor -> IO ()
cleanup = WatchDescriptor -> IO ()
INotify.removeWatch
    inside :: TMQueue a -> p -> ConduitT z a m ()
inside TMQueue a
q p
_ = TMQueue a -> ConduitT z a m ()
forall (m :: * -> *) a z.
MonadIO m =>
TMQueue a -> ConduitT z a m ()
sourceTMQueue TMQueue a
q

-- | Stream contents of a 'IO.Handle' as binary data.
-- Will yield Nothing after EOF is reached
sourceHandleEof :: MonadIO m => IO.Handle -> ConduitT () (Maybe ByteString) m ()
sourceHandleEof :: Handle -> ConduitT () (Maybe RawFilePath) m ()
sourceHandleEof Handle
h = Handle -> ConduitT () RawFilePath m ()
forall (m :: * -> *) i.
MonadIO m =>
Handle -> ConduitT i RawFilePath m ()
C.sourceHandle Handle
h ConduitT () RawFilePath m ()
-> ConduitM RawFilePath (Maybe RawFilePath) m ()
-> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (RawFilePath -> Maybe RawFilePath)
-> ConduitM RawFilePath (Maybe RawFilePath) m ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map RawFilePath -> Maybe RawFilePath
forall a. a -> Maybe a
Just ConduitM RawFilePath (Maybe RawFilePath) m ()
-> ConduitM RawFilePath (Maybe RawFilePath) m ()
-> ConduitM RawFilePath (Maybe RawFilePath) m ()
forall a. Semigroup a => a -> a -> a
<> Maybe RawFilePath -> ConduitM RawFilePath (Maybe RawFilePath) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Maybe RawFilePath
forall a. Maybe a
Nothing

-- | Stream contents of a file as binary data.
-- Once EOF is reached it waits for file modifications and streams data as they are appended to the file.
-- Once the watch is terminated, it will read the file until EOF is reached.
--
-- Source emits 'Nothing' when EOF is reached. For version emitting just data see 'sourceFileFollowModify\''
-- Does not support file rotations. For version supporing rotations see 'sourceFileFollowModifyRotateWithSeek'
sourceFileFollowModify ::
  (MonadResource m, MonadIO m) =>
  -- patch to file to be followed
  FilePath ->
  -- returns (source of binary data from file, handle to terminate the follow)
  STM (ConduitT () (Maybe ByteString) m (), STM ())
sourceFileFollowModify :: FilePath -> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
sourceFileFollowModify FilePath
fp =
  do
    (ConduitT () Event m ()
eventsSource, STM ()
closeWatch) <- [EventVariety] -> FilePath -> STM (ConduitT () Event m (), STM ())
forall (m :: * -> *).
(MonadResource m, Monad m) =>
[EventVariety] -> FilePath -> STM (ConduitT () Event m (), STM ())
inotifyEventsSource [EventVariety
INotify.Modify] FilePath
fp
    (ConduitT () (Maybe RawFilePath) m (), STM ())
-> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO Handle
-> (Handle -> IO ())
-> (Handle -> ConduitT () (Maybe RawFilePath) m ())
-> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
bracketP (FilePath -> IOMode -> IO Handle
IO.openFile FilePath
fp IOMode
IO.ReadMode) Handle -> IO ()
IO.hClose (ConduitT () Event m ()
-> Handle -> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
ConduitT () Event m ()
-> Handle -> ConduitT () (Maybe RawFilePath) m ()
inside ConduitT () Event m ()
eventsSource), STM ()
closeWatch)
  where
    inside :: MonadIO m => ConduitT () INotify.Event m () -> IO.Handle -> ConduitT () (Maybe ByteString) m ()
    inside :: ConduitT () Event m ()
-> Handle -> ConduitT () (Maybe RawFilePath) m ()
inside ConduitT () Event m ()
eventsSource Handle
h =
      Handle -> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
Handle -> ConduitT () (Maybe RawFilePath) m ()
sourceHandleEof Handle
h -- read file before any event appears
        ConduitT () (Maybe RawFilePath) m ()
-> ConduitT () (Maybe RawFilePath) m ()
-> ConduitT () (Maybe RawFilePath) m ()
forall a. Semigroup a => a -> a -> a
<> (ConduitT () Event m ()
eventsSource ConduitT () Event m ()
-> ConduitM Event (Maybe RawFilePath) m ()
-> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (Event -> ConduitM Event (Maybe RawFilePath) m ())
-> ConduitM Event (Maybe RawFilePath) m ()
forall (m :: * -> *) i o r.
Monad m =>
(i -> ConduitT i o m r) -> ConduitT i o m ()
C.awaitForever (\Event
e -> (Event -> ())
-> (() -> Maybe Event)
-> ConduitT () (Maybe RawFilePath) m ()
-> ConduitM Event (Maybe RawFilePath) m ()
forall (m :: * -> *) i1 i2 o r.
Monad m =>
(i1 -> i2)
-> (i2 -> Maybe i1) -> ConduitT i2 o m r -> ConduitT i1 o m r
C.mapInput (() -> Event -> ()
forall a b. a -> b -> a
const ()) (Maybe Event -> () -> Maybe Event
forall a b. a -> b -> a
const (Maybe Event -> () -> Maybe Event)
-> Maybe Event -> () -> Maybe Event
forall a b. (a -> b) -> a -> b
$ Event -> Maybe Event
forall a. a -> Maybe a
Just Event
e) (ConduitT () (Maybe RawFilePath) m ()
 -> ConduitM Event (Maybe RawFilePath) m ())
-> ConduitT () (Maybe RawFilePath) m ()
-> ConduitM Event (Maybe RawFilePath) m ()
forall a b. (a -> b) -> a -> b
$ Handle -> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
Handle -> ConduitT () (Maybe RawFilePath) m ()
sourceHandleEof Handle
h)) -- reread from handle after each modify event
        ConduitT () (Maybe RawFilePath) m ()
-> ConduitT () (Maybe RawFilePath) m ()
-> ConduitT () (Maybe RawFilePath) m ()
forall a. Semigroup a => a -> a -> a
<> Handle -> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
Handle -> ConduitT () (Maybe RawFilePath) m ()
sourceHandleEof Handle
h -- read to the end of the file after the watch ends

-- | Version of 'sourceFileFollowModify' not notifying about EOF
sourceFileFollowModify' :: (MonadResource m, MonadIO m) => FilePath -> STM (ConduitT () ByteString m (), STM ())
sourceFileFollowModify' :: FilePath -> STM (ConduitT () RawFilePath m (), STM ())
sourceFileFollowModify' FilePath
fp = do
  (ConduitT () (Maybe RawFilePath) m ()
source, STM ()
close) <- FilePath -> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
forall (m :: * -> *).
(MonadResource m, MonadIO m) =>
FilePath -> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
sourceFileFollowModify FilePath
fp
  (ConduitT () RawFilePath m (), STM ())
-> STM (ConduitT () RawFilePath m (), STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitT () (Maybe RawFilePath) m ()
source ConduitT () (Maybe RawFilePath) m ()
-> ConduitM (Maybe RawFilePath) RawFilePath m ()
-> ConduitT () RawFilePath m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM (Maybe RawFilePath) RawFilePath m ()
forall (m :: * -> *) a. Monad m => ConduitT (Maybe a) a m ()
C.catMaybes, STM ()
close)

-- | Like 'bracketP', but resource can be released within 'in-between' computation.
-- Resource is recreated after release if needed
replacableBracketP ::
  MonadResource m =>
  -- acquire resource computation
  IO a ->
  -- release resource computation
  (a -> IO ()) ->
  -- computation to run in-between.
  -- first: acquires the resource if not available, otherwise just gets it
  -- second: releases the resource
  ((m a, m ()) -> ConduitT i o m ()) ->
  ConduitT i o m ()
replacableBracketP :: IO a
-> (a -> IO ())
-> ((m a, m ()) -> ConduitT i o m ())
-> ConduitT i o m ()
replacableBracketP IO a
initialize a -> IO ()
cleanup (m a, m ()) -> ConduitT i o m ()
inside =
  let getOrInitialize :: TVar (Maybe a) -> m a
getOrInitialize TVar (Maybe a)
var = IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> m a) -> IO a -> m a
forall a b. (a -> b) -> a -> b
$ do
        Maybe a
maybeA <- TVar (Maybe a) -> IO (Maybe a)
forall a. TVar a -> IO a
readTVarIO TVar (Maybe a)
var
        case Maybe a
maybeA of
          Just a
a -> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
          Maybe a
Nothing -> do
            a
a <- IO a
initialize
            STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe a) -> Maybe a -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe a)
var (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
            a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
      cleanupAndUnset :: TVar (Maybe a) -> m ()
cleanupAndUnset TVar (Maybe a)
var = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        Maybe a
maybeA <- TVar (Maybe a) -> IO (Maybe a)
forall a. TVar a -> IO a
readTVarIO TVar (Maybe a)
var
        (a -> IO ()) -> Maybe a -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ a -> IO ()
cleanup Maybe a
maybeA
        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe a) -> Maybe a -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe a)
var Maybe a
forall a. Maybe a
Nothing
   in IO (TVar (Maybe a))
-> (TVar (Maybe a) -> IO ())
-> (TVar (Maybe a) -> ConduitT i o m ())
-> ConduitT i o m ()
forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
bracketP
        (IO a
initialize IO a -> (a -> IO (TVar (Maybe a))) -> IO (TVar (Maybe a))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe a -> IO (TVar (Maybe a))
forall a. a -> IO (TVar a)
newTVarIO (Maybe a -> IO (TVar (Maybe a)))
-> (a -> Maybe a) -> a -> IO (TVar (Maybe a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Maybe a
forall a. a -> Maybe a
Just)
        TVar (Maybe a) -> IO ()
forall (m :: * -> *). MonadIO m => TVar (Maybe a) -> m ()
cleanupAndUnset
        (\TVar (Maybe a)
var -> (m a, m ()) -> ConduitT i o m ()
inside (TVar (Maybe a) -> m a
forall (m :: * -> *). MonadIO m => TVar (Maybe a) -> m a
getOrInitialize TVar (Maybe a)
var, TVar (Maybe a) -> m ()
forall (m :: * -> *). MonadIO m => TVar (Maybe a) -> m ()
cleanupAndUnset TVar (Maybe a)
var))

-- | Watch INotify events for given file.
-- Interprets file removal as file rotation and tries to recreate the watch again.
inotifyEventsSourceRotate :: MonadResource m => [INotify.EventVariety] -> FilePath -> STM (ConduitT () INotify.Event m (), STM ())
inotifyEventsSourceRotate :: [EventVariety] -> FilePath -> STM (ConduitT () Event m (), STM ())
inotifyEventsSourceRotate [EventVariety]
events FilePath
fp = do
  TMQueue Event
q <- STM (TMQueue Event)
forall a. STM (TMQueue a)
newTMQueue
  let c :: ConduitM a Event m ()
c = TMQueue Event -> ConduitM a Event m ()
forall (m :: * -> *) a z.
MonadIO m =>
TMQueue a -> ConduitT z a m ()
sourceTMQueue TMQueue Event
q ConduitM a Event m ()
-> ConduitM Event Event m () -> ConduitM a Event m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (INotify -> ConduitM Event Event m ()) -> ConduitM Event Event m ()
forall (m :: * -> *) a b r.
MonadResource m =>
(INotify -> ConduitT a b m r) -> ConduitT a b m r
withINotify (\INotify
i -> IO (TVar (Maybe WatchDescriptor))
-> (TVar (Maybe WatchDescriptor) -> IO ())
-> ((m (TVar (Maybe WatchDescriptor)), m ())
    -> ConduitM Event Event m ())
-> ConduitM Event Event m ()
forall (m :: * -> *) a i o.
MonadResource m =>
IO a
-> (a -> IO ())
-> ((m a, m ()) -> ConduitT i o m ())
-> ConduitT i o m ()
replacableBracketP (INotify -> TMQueue Event -> IO (TVar (Maybe WatchDescriptor))
initialize INotify
i TMQueue Event
q) TVar (Maybe WatchDescriptor) -> IO ()
cleanup (m (TVar (Maybe WatchDescriptor)), m ())
-> ConduitM Event Event m ()
forall (m :: * -> *).
MonadIO m =>
(m (TVar (Maybe WatchDescriptor)), m ())
-> ConduitT Event Event m ()
inside)
  (ConduitT () Event m (), STM ())
-> STM (ConduitT () Event m (), STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitT () Event m ()
forall a. ConduitM a Event m ()
c, TMQueue Event -> STM ()
forall a. TMQueue a -> STM ()
closeTMQueue TMQueue Event
q)
  where
    -- WatchDescriptior is stored within TVar because it destroys itself when the watched file is deleted
    initialize :: INotify.INotify -> TMQueue INotify.Event -> IO (TVar (Maybe INotify.WatchDescriptor))
    initialize :: INotify -> TMQueue Event -> IO (TVar (Maybe WatchDescriptor))
initialize INotify
i TMQueue Event
q = do
      WatchDescriptor
w <- INotify
-> [EventVariety]
-> RawFilePath
-> (Event -> IO ())
-> IO WatchDescriptor
INotify.addWatch INotify
i (EventVariety
INotify.DeleteSelf EventVariety -> [EventVariety] -> [EventVariety]
forall a. a -> [a] -> [a]
: [EventVariety]
events) (FilePath -> RawFilePath
encodeFilePath FilePath
fp) (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Event -> STM ()) -> Event -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMQueue Event -> Event -> STM ()
forall a. TMQueue a -> a -> STM ()
writeTMQueue TMQueue Event
q)
      Maybe WatchDescriptor -> IO (TVar (Maybe WatchDescriptor))
forall a. a -> IO (TVar a)
newTVarIO (Maybe WatchDescriptor -> IO (TVar (Maybe WatchDescriptor)))
-> Maybe WatchDescriptor -> IO (TVar (Maybe WatchDescriptor))
forall a b. (a -> b) -> a -> b
$ WatchDescriptor -> Maybe WatchDescriptor
forall a. a -> Maybe a
Just WatchDescriptor
w

    cleanup :: TVar (Maybe INotify.WatchDescriptor) -> IO ()
    cleanup :: TVar (Maybe WatchDescriptor) -> IO ()
cleanup TVar (Maybe WatchDescriptor)
var = TVar (Maybe WatchDescriptor) -> IO (Maybe WatchDescriptor)
forall a. TVar a -> IO a
readTVarIO TVar (Maybe WatchDescriptor)
var IO (Maybe WatchDescriptor)
-> (Maybe WatchDescriptor -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (WatchDescriptor -> IO ()) -> Maybe WatchDescriptor -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ WatchDescriptor -> IO ()
INotify.removeWatch

    inside :: MonadIO m => (m (TVar (Maybe INotify.WatchDescriptor)), m ()) -> ConduitT INotify.Event INotify.Event m ()
    inside :: (m (TVar (Maybe WatchDescriptor)), m ())
-> ConduitT Event Event m ()
inside (m (TVar (Maybe WatchDescriptor))
getOrInit, m ()
unset) = do
      TVar (Maybe WatchDescriptor)
var <- m (TVar (Maybe WatchDescriptor))
-> ConduitT Event Event m (TVar (Maybe WatchDescriptor))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m (TVar (Maybe WatchDescriptor))
getOrInit
      Maybe Event
event <- ConduitT Event Event m (Maybe Event)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await

      case Maybe Event
event of
        Just e :: Event
e@INotify.DeletedSelf {} -> do
          Event -> ConduitT Event Event m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Event
e
          IO () -> ConduitT Event Event m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT Event Event m ())
-> IO () -> ConduitT Event Event m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe WatchDescriptor) -> Maybe WatchDescriptor -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe WatchDescriptor)
var Maybe WatchDescriptor
forall a. Maybe a
Nothing -- WatchDescriptor is deleted implicitly
          m () -> ConduitT Event Event m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m ()
unset
          (m (TVar (Maybe WatchDescriptor)), m ())
-> ConduitT Event Event m ()
forall (m :: * -> *).
MonadIO m =>
(m (TVar (Maybe WatchDescriptor)), m ())
-> ConduitT Event Event m ()
inside (m (TVar (Maybe WatchDescriptor))
getOrInit, m ()
unset)
        Just Event
other -> do
          Event -> ConduitT Event Event m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Event
other
          (m (TVar (Maybe WatchDescriptor)), m ())
-> ConduitT Event Event m ()
forall (m :: * -> *).
MonadIO m =>
(m (TVar (Maybe WatchDescriptor)), m ())
-> ConduitT Event Event m ()
inside (m (TVar (Maybe WatchDescriptor))
getOrInit, m ()
unset)
        Maybe Event
Nothing ->
          () -> ConduitT Event Event m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

data FollowFileEvent = Replaced | Modified deriving (FollowFileEvent -> FollowFileEvent -> Bool
(FollowFileEvent -> FollowFileEvent -> Bool)
-> (FollowFileEvent -> FollowFileEvent -> Bool)
-> Eq FollowFileEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FollowFileEvent -> FollowFileEvent -> Bool
$c/= :: FollowFileEvent -> FollowFileEvent -> Bool
== :: FollowFileEvent -> FollowFileEvent -> Bool
$c== :: FollowFileEvent -> FollowFileEvent -> Bool
Eq, Int -> FollowFileEvent -> ShowS
[FollowFileEvent] -> ShowS
FollowFileEvent -> FilePath
(Int -> FollowFileEvent -> ShowS)
-> (FollowFileEvent -> FilePath)
-> ([FollowFileEvent] -> ShowS)
-> Show FollowFileEvent
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
showList :: [FollowFileEvent] -> ShowS
$cshowList :: [FollowFileEvent] -> ShowS
show :: FollowFileEvent -> FilePath
$cshow :: FollowFileEvent -> FilePath
showsPrec :: Int -> FollowFileEvent -> ShowS
$cshowsPrec :: Int -> FollowFileEvent -> ShowS
Show)

-- | Stream contents of a file as binary data.
-- Once EOF is reached it waits for file modifications and streams data as they are appended to the file.
-- Once the watch is terminated, it will read the file until EOF is reached.
--
-- Interprets file removal as file rotation and tries to recreate the watch and continue to follow the file from last position (expects just rotation that resembles append to file).
-- Source emits 'Nothing' when EOF is reached. For version emitting just data see 'sourceFileFollowModifyRotateWithSeek\''
sourceFileFollowModifyRotateWithSeek :: (MonadResource m, MonadIO m) => FilePath -> STM (ConduitT () (Maybe ByteString) m (), STM ())
sourceFileFollowModifyRotateWithSeek :: FilePath -> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
sourceFileFollowModifyRotateWithSeek FilePath
fp = do
  (ConduitT () Event m ()
eventsSource, STM ()
closeWatch) <- [EventVariety] -> FilePath -> STM (ConduitT () Event m (), STM ())
forall (m :: * -> *).
MonadResource m =>
[EventVariety] -> FilePath -> STM (ConduitT () Event m (), STM ())
inotifyEventsSourceRotate [EventVariety
INotify.Modify] FilePath
fp
  TVar (Maybe Integer)
positionVar <- Maybe Integer -> STM (TVar (Maybe Integer))
forall a. a -> STM (TVar a)
newTVar Maybe Integer
forall a. Maybe a
Nothing
  (ConduitT () (Maybe RawFilePath) m (), STM ())
-> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitT () Event m ()
eventsSource ConduitT () Event m ()
-> ConduitM Event (Maybe RawFilePath) m ()
-> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (Event -> Maybe FollowFileEvent)
-> ConduitT Event FollowFileEvent m ()
forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> ConduitT a b m ()
C.mapMaybe Event -> Maybe FollowFileEvent
handleINotifyEvent ConduitT Event FollowFileEvent m ()
-> ConduitM FollowFileEvent (Maybe RawFilePath) m ()
-> ConduitM Event (Maybe RawFilePath) m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| IO Handle
-> (Handle -> IO ())
-> ((m Handle, m ())
    -> ConduitM FollowFileEvent (Maybe RawFilePath) m ())
-> ConduitM FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *) a i o.
MonadResource m =>
IO a
-> (a -> IO ())
-> ((m a, m ()) -> ConduitT i o m ())
-> ConduitT i o m ()
replacableBracketP (TVar (Maybe Integer) -> IO Handle
initialize TVar (Maybe Integer)
positionVar) Handle -> IO ()
cleanup (TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitM FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
inside TVar (Maybe Integer)
positionVar), STM ()
closeWatch)
  where
    handleINotifyEvent :: Event -> Maybe FollowFileEvent
handleINotifyEvent INotify.Modified {} = FollowFileEvent -> Maybe FollowFileEvent
forall a. a -> Maybe a
Just FollowFileEvent
Modified
    handleINotifyEvent INotify.DeletedSelf {} = FollowFileEvent -> Maybe FollowFileEvent
forall a. a -> Maybe a
Just FollowFileEvent
Replaced
    handleINotifyEvent Event
_ = Maybe FollowFileEvent
forall a. Maybe a
Nothing

    initialize :: TVar (Maybe Integer) -> IO IO.Handle
    initialize :: TVar (Maybe Integer) -> IO Handle
initialize TVar (Maybe Integer)
positionVar = do
      Handle
newHandle <- IO Handle -> IO Handle
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Handle -> IO Handle) -> IO Handle -> IO Handle
forall a b. (a -> b) -> a -> b
$ FilePath -> IOMode -> IO Handle
IO.openFile FilePath
fp IOMode
IO.ReadMode
      Maybe Integer
maybePosition <- TVar (Maybe Integer) -> IO (Maybe Integer)
forall a. TVar a -> IO a
readTVarIO TVar (Maybe Integer)
positionVar
      (Integer -> IO ()) -> Maybe Integer -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Handle -> SeekMode -> Integer -> IO ()
IO.hSeek Handle
newHandle SeekMode
IO.AbsoluteSeek) Maybe Integer
maybePosition -- seek to original position
      Handle -> IO Handle
forall (m :: * -> *) a. Monad m => a -> m a
return Handle
newHandle

    cleanup :: IO.Handle -> IO ()
    cleanup :: Handle -> IO ()
cleanup = Handle -> IO ()
IO.hClose

    inside :: MonadIO m => TVar (Maybe Integer) -> (m IO.Handle, m ()) -> ConduitT FollowFileEvent (Maybe ByteString) m ()
    inside :: TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
inside TVar (Maybe Integer)
positionVar (m Handle
getOrInit, m ()
unset) = do
      Handle
handle <- m Handle -> ConduitT FollowFileEvent (Maybe RawFilePath) m Handle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m Handle
getOrInit
      Either () RawFilePath
line <- IO (Either () RawFilePath)
-> ConduitT
     FollowFileEvent (Maybe RawFilePath) m (Either () RawFilePath)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either () RawFilePath)
 -> ConduitT
      FollowFileEvent (Maybe RawFilePath) m (Either () RawFilePath))
-> IO (Either () RawFilePath)
-> ConduitT
     FollowFileEvent (Maybe RawFilePath) m (Either () RawFilePath)
forall a b. (a -> b) -> a -> b
$ (IOError -> Maybe ())
-> IO RawFilePath -> IO (Either () RawFilePath)
forall e b a.
Exception e =>
(e -> Maybe b) -> IO a -> IO (Either b a)
tryJust (Bool -> Maybe ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (Bool -> Maybe ()) -> (IOError -> Bool) -> IOError -> Maybe ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IOError -> Bool
IO.isEOFError) (IO RawFilePath -> IO (Either () RawFilePath))
-> IO RawFilePath -> IO (Either () RawFilePath)
forall a b. (a -> b) -> a -> b
$ Handle -> Int -> IO RawFilePath
BS.hGetSome Handle
handle Int
BS.defaultChunkSize

      case Either () RawFilePath
line of
        Right RawFilePath
l -> do
          Maybe RawFilePath
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield (Maybe RawFilePath
 -> ConduitT FollowFileEvent (Maybe RawFilePath) m ())
-> Maybe RawFilePath
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall a b. (a -> b) -> a -> b
$ RawFilePath -> Maybe RawFilePath
forall a. a -> Maybe a
Just RawFilePath
l
          TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
inside TVar (Maybe Integer)
positionVar (m Handle
getOrInit, m ()
unset)
        Left ()
_ -> do
          -- eof reached
          Maybe RawFilePath
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield Maybe RawFilePath
forall a. Maybe a
Nothing
          Maybe FollowFileEvent
event <- ConduitT
  FollowFileEvent (Maybe RawFilePath) m (Maybe FollowFileEvent)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
C.await
          case Maybe FollowFileEvent
event of
            Just FollowFileEvent
Replaced -> do
              -- store current position
              Integer
pos <- IO Integer
-> ConduitT FollowFileEvent (Maybe RawFilePath) m Integer
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Integer
 -> ConduitT FollowFileEvent (Maybe RawFilePath) m Integer)
-> IO Integer
-> ConduitT FollowFileEvent (Maybe RawFilePath) m Integer
forall a b. (a -> b) -> a -> b
$ Handle -> IO Integer
IO.hTell Handle
handle
              IO () -> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT FollowFileEvent (Maybe RawFilePath) m ())
-> IO () -> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe Integer) -> Maybe Integer -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe Integer)
positionVar (Maybe Integer -> STM ()) -> Maybe Integer -> STM ()
forall a b. (a -> b) -> a -> b
$ Integer -> Maybe Integer
forall a. a -> Maybe a
Just Integer
pos
              -- remove current handle
              m () -> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift m ()
unset
              TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
inside TVar (Maybe Integer)
positionVar (m Handle
getOrInit, m ()
unset)
            Just FollowFileEvent
Modified ->
              TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
TVar (Maybe Integer)
-> (m Handle, m ())
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
inside TVar (Maybe Integer)
positionVar (m Handle
getOrInit, m ()
unset)
            Maybe FollowFileEvent
Nothing ->
              -- read the file until EOF after the watch is terminated
              (FollowFileEvent -> ())
-> (() -> Maybe FollowFileEvent)
-> ConduitT () (Maybe RawFilePath) m ()
-> ConduitT FollowFileEvent (Maybe RawFilePath) m ()
forall (m :: * -> *) i1 i2 o r.
Monad m =>
(i1 -> i2)
-> (i2 -> Maybe i1) -> ConduitT i2 o m r -> ConduitT i1 o m r
C.mapInput (() -> FollowFileEvent -> ()
forall a b. a -> b -> a
const ()) (Maybe FollowFileEvent -> () -> Maybe FollowFileEvent
forall a b. a -> b -> a
const Maybe FollowFileEvent
event) (Handle -> ConduitT () (Maybe RawFilePath) m ()
forall (m :: * -> *).
MonadIO m =>
Handle -> ConduitT () (Maybe RawFilePath) m ()
sourceHandleEof Handle
handle)

-- | Version of 'sourceFileFollowModifyRotateWithSeek' not notifying about EOF
sourceFileFollowModifyRotateWithSeek' :: (MonadResource m, MonadIO m) => FilePath -> STM (ConduitT () ByteString m (), STM ())
sourceFileFollowModifyRotateWithSeek' :: FilePath -> STM (ConduitT () RawFilePath m (), STM ())
sourceFileFollowModifyRotateWithSeek' FilePath
fp = do
  (ConduitT () (Maybe RawFilePath) m ()
source, STM ()
close) <- FilePath -> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
forall (m :: * -> *).
(MonadResource m, MonadIO m) =>
FilePath -> STM (ConduitT () (Maybe RawFilePath) m (), STM ())
sourceFileFollowModifyRotateWithSeek FilePath
fp
  (ConduitT () RawFilePath m (), STM ())
-> STM (ConduitT () RawFilePath m (), STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitT () (Maybe RawFilePath) m ()
source ConduitT () (Maybe RawFilePath) m ()
-> ConduitM (Maybe RawFilePath) RawFilePath m ()
-> ConduitT () RawFilePath m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM (Maybe RawFilePath) RawFilePath m ()
forall (m :: * -> *) a. Monad m => ConduitT (Maybe a) a m ()
C.catMaybes, STM ()
close)