-- -*- indent-tabs-mode: nil -*- -- -- | Creation and operations on a 'WorkQueue' module Control.Distributed.CCTools.WorkQueue.Internal.WorkQueue where import Control.Distributed.CCTools.WorkQueue.Internal.Cast import Control.Distributed.CCTools.WorkQueue.Internal.Types import Control.Distributed.CCTools.WorkQueue.Internal.CastInstances () import Bindings.CCTools.WorkQueue import Foreign.C import Foreign.Ptr import Foreign.ForeignPtr import Foreign.Storable import Foreign.Marshal.Utils (toBool) import Control.Applicative ((<$>)) import Control.Lens import Control.Monad ((>=>), void) import System.IO.Unsafe import Control.Monad.Loops withJust :: Maybe a -> (a -> IO ()) -> IO () withJust m f = case m of Just a -> f a _ -> return () -- * WorkQueue creation/deletion -- | The default parameters for initializing a 'WorkQueue' -- -- > defaultQParams :: QueueParams -- > defaultQParams = QP { -- > _qport = Just . port $ fromIntegral c'WORK_QUEUE_DEFAULT_PORT -- > , _name = Nothing -- > , _fastabort = Nothing -- > , _taskordering = Nothing -- > , _priority = Nothing -- > , _mode = Nothing -- > , _logfile = Nothing -- > , _scheduler = Nothing -- > } defaultQParams :: QueueParams defaultQParams = QP { _qport = Just . port $ fromIntegral c'WORK_QUEUE_DEFAULT_PORT , _name = Nothing , _fastabort = Nothing , _taskordering = Nothing , _priority = Nothing , _mode = Nothing , _logfile = Nothing , _scheduler = Nothing } workqueue :: QueueParams -> IO WorkQueue workqueue p = do ptr <- c'work_queue_create (cast $ p^.qport) q <- WQ <$> newForeignPtr p'work_queue_delete ptr withJust (p^.fastabort ) (setFastAbort q) withJust (p^.taskordering) (setTaskOrdering q) withJust (p^.priority ) (setPriority q) withJust (p^.mode ) (setMasterMode q) withJust (p^.name ) (setName q) withJust (p^.logfile ) (setLog q) withJust (p^.scheduler ) (setScheduleAlg q) return q setName :: WorkQueue -> String -> IO () setName q n = withForeignPtr (q^.unWQ) $ \q' -> do n' <- newCString n c'work_queue_specify_name q' n' setFastAbort :: WorkQueue -> FastAbort -> IO () setFastAbort q a = withForeignPtr (q^.unWQ) (void . flip c'work_queue_activate_fast_abort (cast a)) setTaskOrdering :: WorkQueue -> TaskOrdering -> IO () setTaskOrdering q o = withForeignPtr (q^.unWQ) (`c'work_queue_specify_task_order` cast o) setPriority :: WorkQueue -> Int -> IO () setPriority q p = withForeignPtr (q^.unWQ) (`c'work_queue_specify_priority` cast p) setMasterMode :: WorkQueue -> MasterMode -> IO () setMasterMode q m = withForeignPtr (q^.unWQ) (`c'work_queue_specify_master_mode` cast m) setLog :: WorkQueue -> FilePath -> IO () setLog q p = withForeignPtr (q^.unWQ) $ \q' -> c'work_queue_specify_log q' =<< newCString p setScheduleAlg :: WorkQueue -> WorkerScheduleAlg -> IO () setScheduleAlg q a = withForeignPtr (q^.unWQ) (`c'work_queue_specify_algorithm` cast a) shutdownWorkers :: WorkQueue -> NPlus Int -> IO (NPlus Int) shutdownWorkers q n = withForeignPtr (q^.unWQ) (fmap cast . flip c'work_queue_shut_down_workers (cast n)) -- * Task submission/retrieval submit :: WorkQueue -> Task -> IO TaskID submit q t = withForeignPtr (q^.unWQ) (fmap cast . flip c'work_queue_submit (t^.unT)) wait :: WorkQueue -> Timeout -> IO (Maybe Task) wait q t = withForeignPtr (q^.unWQ) $ \q' -> do r <- c'work_queue_wait q' $ cast t return $ if r == nullPtr then Nothing else Just $ T r cancelTaskBy :: (Ptr C'work_queue -> a -> IO (Ptr C'work_queue_task)) -> (b -> IO a) -> WorkQueue -> b -> IO (Maybe Task) cancelTaskBy c'cancel mkAttr q attribute = withForeignPtr (q^.unWQ) $ \q' -> do a <- mkAttr attribute r <- c'cancel q' a return $ if r == nullPtr then Nothing else Just $ T r cancelTaskID :: WorkQueue -> TaskID -> IO (Maybe Task) cancelTaskID = cancelTaskBy c'work_queue_cancel_by_taskid (return . cast) cancelTaskTag :: WorkQueue -> String -> IO (Maybe Task) cancelTaskTag = cancelTaskBy c'work_queue_cancel_by_tasktag newCString -- | Main pattern for processing tasks after submission -- -- Basically: -- -- > eventLoop q timeout update process = -- > while (q is not empty) -- > update q -- > r <- wait q timeout -- > when r is Just: process q r eventLoop :: WorkQueue -> Timeout -> (WorkQueue -> IO ()) -- ^ "update": called before 'wait' -> (WorkQueue -> Task -> IO ()) -- ^ "process" a returned 'Task' -> IO () eventLoop q timeout update process = whileM_ (not <$> isEmpty q) $ do update q r <- wait q timeout maybe (return ()) (process q) r -- * WorkQueue properties getHunger :: WorkQueue -> IO Hunger getHunger q = cast <$> withForeignPtr (q^.unWQ) c'work_queue_hungry isFull :: WorkQueue -> IO Bool isFull = fmap (Full ==) . getHunger isEmpty :: WorkQueue -> IO Bool isEmpty q = toBool <$> withForeignPtr (q^.unWQ) c'work_queue_empty getPort :: WorkQueue -> IO Port getPort q = P . cast <$> withForeignPtr (q^.unWQ) c'work_queue_port getName :: WorkQueue -> IO String getName q = peekCString =<< withForeignPtr (q^.unWQ) c'work_queue_name -- * WorkQueue Runtime Statistics getWorkerSummary :: WorkQueue -> IO String getWorkerSummary q = peekCString =<< withForeignPtr (q^.unWQ) c'work_queue_get_worker_summary getStats :: WorkQueue -> IO Stats getStats q = withForeignPtr (q^.unWQ) $ \q' -> do fptr <- mallocForeignPtr withForeignPtr fptr $ c'work_queue_get_stats q' return $ S fptr withStats :: (a -> b) -> (C'work_queue_stats -> a) -> Stats -> IO b withStats g f s = withForeignPtr (s^.unS) (peek >=> (return . g . f)) unsafeWithStats :: (a -> b) -> (C'work_queue_stats -> a) -> Stats -> b unsafeWithStats g f s = unsafeDupablePerformIO $ withStats g f s workersInit :: Stats -> Int workersInit = unsafeWithStats fromIntegral c'work_queue_stats'workers_init workersReady :: Stats -> Int workersReady = unsafeWithStats fromIntegral c'work_queue_stats'workers_ready workersBusy :: Stats -> Int workersBusy = unsafeWithStats fromIntegral c'work_queue_stats'workers_busy workersCancelling :: Stats -> Int workersCancelling = unsafeWithStats fromIntegral c'work_queue_stats'workers_cancelling tasksRunning :: Stats -> Int tasksRunning = unsafeWithStats fromIntegral c'work_queue_stats'tasks_running tasksWaiting :: Stats -> Int tasksWaiting = unsafeWithStats fromIntegral c'work_queue_stats'tasks_waiting tasksComplete :: Stats -> Int tasksComplete = unsafeWithStats fromIntegral c'work_queue_stats'tasks_complete totalTasksDispatched :: Stats -> Int totalTasksDispatched = unsafeWithStats fromIntegral c'work_queue_stats'total_tasks_dispatched totalTasksComplete :: Stats -> Int totalTasksComplete = unsafeWithStats fromIntegral c'work_queue_stats'total_tasks_complete totalWorkersJoined :: Stats -> Int totalWorkersJoined = unsafeWithStats fromIntegral c'work_queue_stats'total_workers_joined totalWorkersRemoved :: Stats -> Int totalWorkersRemoved = unsafeWithStats fromIntegral c'work_queue_stats'total_workers_removed totalBytesSent :: Stats -> Integer totalBytesSent = unsafeWithStats fromIntegral c'work_queue_stats'total_bytes_sent totalBytesReceived :: Stats -> Integer totalBytesReceived = unsafeWithStats fromIntegral c'work_queue_stats'total_bytes_received startTime :: Stats -> EpochTime MicroSeconds startTime = unsafeWithStats (ET . fromIntegral) c'work_queue_stats'start_time totalSendTime :: Stats -> DiffTime MicroSeconds totalSendTime = unsafeWithStats (DT . fromIntegral) c'work_queue_stats'total_send_time totalReceiveTime :: Stats -> DiffTime MicroSeconds totalReceiveTime = unsafeWithStats (DT . fromIntegral) c'work_queue_stats'total_receive_time