DIRAC Taskqueue Director

This page describes the algorithms used for Pilot Submission in the DIRAC TaskqueueDirector Agent.


The aim of the DIRAC Director is to place in the gLite/LCG WMS system enough pilots to execute the tasks pending in DIRAC in such a way that the priorities defined can be followed. Several aspects need to be considered:

  • Priorities of Pending tasks in DIRAC.
  • Number of Pending tasks in DIRAC.
  • Already submitted pilots that are still in the gLite/LCG WMS system waiting for execution.

In DIRAC pending tasks are organized in TaskQueues, each one holding all the ones that have an identical set of requirements.

If PilotType is not defined (the only allowed value is "private"), a mixture of generic and private pilots are submitted following what it is currently supported at the different sites. Generic pilots will be able to match any non-"private" task in the system (for any user or group), while private pilots will only match for a certain user/group combination.

TaskqueueDirector Components and Parameters

The functionality of the TaskqueueDirector is achieve using a number of components whose behavior can be controlled via a set of Configuration parameters.

Pilot Directors

These are the components that execute the submission of the pilots to the middleware gLite WMS and LCG RB are currently supported, but only gLite WMS is used at the moment. For each of these Directors a list of WMS/RB servers can be defined in the DIRAC Configuration System, as well as other parameters like the Ranking expression used in the middleware matching, the CPU Normalization expression, or whether a resource list-match is attempted prior to the pilot submission.

Submit Pools

As the TaskqueueDirector goes over the different TaskQueues and decides to submit a certain number of pilots for a given one, the actual submission is handled to one "Submit Pool", ie to a separated thread that makes use one of the above Pilot Directors to perform the submission. The list of available Pools in the system and as well as the ones used by default are defined in the Agent Configuration (see bellow). Each Submit Pool is implemented as a DIRAC ThreadPool object with a Min, Max executing threads and a Total number tasks in the Queue that can be set via the DIRAC Configuration System ( minThreadsInPool, maxThreadsInPool, totalThreadsInPool ).


  • pilotsPerIteration: Number of pilots to be submitted per iteration of the Agent (in the near future this will be made more dynamic allowing it to vary depending on the current number of Waiting and Running jobs in the system with respect to some predefined targets).
  • lowestCPUBoost: [7200] Lower limit for boosting pilot submission for TaskQueues with lower CPU requirements.
  • extraPilotFraction and extraPilots: [0.2 and 4] extra fraction of waiting pilots waiting in middleware with respect to the number of waiting tasks in a given TaskQueue. For TaskQueues with a small number of waiting tasks at least this extra number of pilots are allowed in the middleware.
  • maxPilotWaitingHours: [6] time after which a pilot waiting in the middleware is no longer considered in this state. There are some pathological behaviours of the middleware that can leave pilots in Waiting state when they should actually be Aborted.
  • SubmitPools: list of Submit Pools known to the system, currently DIRAC, Local, gLite, SAM. From this only the last 2 are handle by the TaskqueueDirector.
  • DefaultSubmitPools: Pools to use if nothing is specified by the TaskQueue (ie the Job).
  • GenericPilotDN and GenericPilotGroup: Identity used for the submission of generic pilots.
  • PrivatePilotFraction: Fraction of private pilots submitted for a TaskQueue that is not "private".

TaskqueueDirector Algorithm

The pilot submission on the the TaskQueueDirector Agent proceeds as follows:

  • self.__checkSubmitPools()
Reload configuration and apply any necessary changes to Pilot Directors and Submit Pools.

  • taskQueueDB.retrieveTaskQueues()
Retrieve the current list of TaskQueues with pending requests in the System.

  • prioritySum += taskQueueDict[taskQueueID]['Priority']
Add up Priorities for all TaskQueues.

  • waitingJobs += taskQueueDict[taskQueueID]['Jobs']
Add up number of pending tasks for all TaskQueues.

  • self.pilotsPerPriority  = self.am_getOption('pilotsPerIteration') / prioritySum
Determine the number of pilots to submit per unit of priority, given a target number of pilots to be submitted in the current iteration.

  • self.pilotsPerJob       = self.am_getOption('pilotsPerIteration') / waitingJobs
Determine the number of pilots to submit per pending task, given a target number of pilots to be submitted in the current iteration.

  • pilotAgentsDB.countPilots( {'TaskQueueID': taskQueueID, 'Status': waitingStatusList}, newer = timeLimitToConsider )
Get current number of pilots already submitted for a given TaskQueue that are still waiting to run.

  • pilotsToSubmit = poisson( ( self.pilotsPerPriority * taskQueuePriority + self.pilotsPerJob * taskQueueJobs ) * maxCPU / taskQueueCPU )
    pilotsToSubmit = min( pilotsToSubmit, int( ( 1 + extraPilotFraction ) * taskQueueJobs ) + extraPilots - waitingPilots )
Determine the number of pilots to be submitted for each TaskQueue as the sum of two contributions: one the the Priority and the other proportional to the number of pending tasks. Low CPU tasks are given an extra boost until lowestCPUBoost is reached.

  • if 'SubmitPools' in taskQueueDict:
      submitPools = taskQueueDict[ 'SubmitPools' ]
      submitPools = self.am_getOption('DefaultSubmitPools')
    submitPools = List.randomize( submitPools )
Decide which Submit Pool should be used.

Now the execution goes into the Submit Pool thread using the Pilot Director object.

  • privateIfGenericTQ = self.privatePilotFraction < random.random()
    privateTQ = ( 'PilotTypes' in taskQueueDict and 'private' in [ t.lower() for t in taskQueueDict['PilotTypes'] ] )
    forceGeneric = 'ForceGeneric' in taskQueueDict
    submitPrivatePilot = ( privateIfGenericTQ or privateTQ ) and not forceGeneric
Decide if private pilots are to be submitted, if PilotType is not given a mixture of private and generic pilots are submitted.

  • self._prepareJDL( taskQueueDict, workingDirectory, pilotOptions, pilotsToSubmit, ceMask, submitPrivatePilot, privateTQ )
Prepare JDL for pilot submission.

  • gProxyManager.getPilotProxyFromDIRACGroup( ownerDN, ownerGroup, requiredTimeLeft = 86400 * 5 )
Get a valid proxy for the necessary user and group. Currently we do not rely on MyProxy to extend the proxy of the pilots, thus we request and use a long lived proxy.

  • availableCEs = self._listMatch( proxy, jdl, taskQueueID )
If thus configured, check if there are resources available for the pilots to be submitted before doing the actual submission.

  • self._submitPilot( proxy, pilotsToSubmit, jdl, taskQueueID )
Pilot submission.

  • pilotAgentsDB.addPilotTQReference(pilotReference, taskQueueID, ownerDN, vomsGroup, broker=resourceBroker, gridType=self.gridMiddleware, requirements=pilotRequirements )
Finally, update the PilotAgents table with the information about the pilots just submitted.

Proposed fixes

Adapt the target number of pilots to be submitted per iteration depending on the current number of Waiting and Running Jobs in the system.

Change Logger name for Pilot Director to reflect the Submit Pool being used.

-- RicardoGraciani - 16 Feb 2009

Edit | Attach | Watch | Print version | History: r2 < r1 | Backlinks | Raw View | WYSIWYG | More topic actions
Topic revision: r2 - 2009-02-17 - RicardoGraciani
    • Cern Search Icon Cern Search
    • TWiki Search Icon TWiki Search
    • Google Search Icon Google Search

    LHCb All webs login

This site is powered by the TWiki collaboration platform Powered by PerlCopyright & 2008-2019 by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
Ideas, requests, problems regarding TWiki? Send feedback