Panda Pilot
Introduction
The PanDA pilot is an execution environment used to prepare the computing element, request the actual payload (a production or user analysis job), execute it, and clean up when the payload has finished. Input and output files are transferred from/to storage elements, including object stores.
The pilot jobs are sent from pilot factories to the batch systems at the grid sites.
The actual payload is scheduled when a CPU becomes available, leading to low latency for analysis tasks. Pre-placed jobs and input files on the worker node are also supported.
For robustness, the pilot jobs can be submitted either through Condor-G or locally.
For robustness, the pilot jobs can be submitted either through Condor-G or locally.
The PanDA pilot supports HPCs (including Titan using backfill, NERSC and others). Aside from traditional workflows ("normal jobs"), it also handles so-called event service jobs, where the pilot requests event ranges from a server; these are processed in parallel by AthenaMP (ATLAS). Outputs are staged out asynchronously to object stores, where they are later picked up by merge jobs. The event service runs both on normal grid sites and on HPCs.
This wiki primarily describes the original PanDA Pilot ("Pilot 1.0"). The PanDA Pilot 2.0 Project was formally launched in April 2016 and is documented here.
Pilot execution
The pilot is launched by a pilot wrapper sent to the batch system by a pilot factory. It supports the following options:
usage: python pilot.py -s <sitename> -d <workdir> -a <appdir> -w <url> -p <port> -u <user> -m <outputdir>
-g <inputdir> -r <rmwkdir> -j <jrflag> -n <jrmax> -c <jrmaxatt> -f <jreqflag> -e <logfiledir> -b <debuglevel>
-h <queuename> -x <stageinretry> -y <loggingMode> -z <updateserver> -k <memory> -t <proxycheckflag>
-l <wrapperflag> -i <pilotreleaseflag> -o <countrygroup> -v <workingGroup> -A <allowOtherCountry>
-B <allowSingleUser> -C <timefloor> -D <useCoPilot> -E <stageoutretry> -F <experiment> -G <getJobMaxTime>
-H <cache> -I <schedconfigURL> -N <yodaNodes> -Q <yodaQueue>
where:
<sitename> is the name of the site where this job lands, e.g. BNL_ATLAS_1
<workdir> is the pathname to the work directory of this job on the site
<appdir> is the pathname to the directory of the executables
<url> is the URL of the PanDA server
<port> is the port on which the web server listens
<user> is a flag; if set to user, this pilot will get a user analysis job from the dispatcher (test will return a test job)
<mtflag> controls whether this pilot runs in single- or multi-task mode; set it to true for multi-task mode, any other value means single-task mode
<outputDir> location of output files (destination for mv site mover)
<inputdir> location of input files (source for mv site mover)
<rmwkdir> controls if the workdir of this pilot should be removed in the end or not, true or false
<jrflag> turns on/off job recovery, true or false
<jrmax> maximum number of job recoveries
<jrmaxatt> maximum number of recovery attempts per lost job
<jreqflag> job request flag that controls whether the initial job is received from server (true/default) or file (false), jobrec for recovery mode
<logfiledir> if specified, the log file will only be copied to this directory (None means the log file will not be registered)
<debuglevel> 0: debug info off, 1: display function name when called, 2: full debug info
<queuename> name of queue to be used as an argument for downloading config info (e.g. UBC-lcgpbs)
<stageinretry> number of tries for stage-ins (default is 2)
<stageoutretry> number of tries for stage-outs (default is 2)
<loggingMode> True: pilot only reports space, etc, but does not run a job. False: normal pilot (default)
<updateserver> True (default) for normal running, False is used for interactive test runs
<memory> memory passed on to the dispatcher when asking for a job (MB; overrides queuedata)
<proxycheckflag> True (default): perform proxy validity checks, False: no check
<wrapperflag> True for wrappers that expect an exit code via return, False (default) when exit() can be used
<pilotreleaseflag> PR for production pilots, RC for release candidate pilots
<countrygroup> Country group selector for getJob request
<workinggroup> Working group selector for getJob request
<allowOtherCountry> True/False
<allowSingleUser> True/False; if True, multi-jobs must belong to the same user (the owner of the first downloaded job)
<timefloor> Time limit for multi-jobs in minutes
<useCoPilot> Expect CERNVM pilot to be executed by Co-Pilot (True: on, False: pilot will finish job (default))
<experiment> Current experiment (default: ATLAS)
<getJobMaxTime> The maximum time the pilot will attempt single job downloads (in minutes, default is 3 minutes, min value is 1)
<cache> is an optional URL used by some experiment classes (LSST)
<schedconfigURL> optional URL used by the pilot to download queuedata from the schedconfig server
<yodaNodes> The maximum nodes Yoda will start with
<yodaQueue> The queue Yoda jobs will be sent to
Example of a pilot launch with a minimal set of options:
python pilot.py -w https://pandaserver.cern.ch -p 25443 -u managed -s AGLT2_LMEM -h AGLT2_LMEM-condor -d /tmp
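For illustration, here is a minimal sketch of how a subset of these options could be parsed with Python's getopt module. The chosen subset and the defaults below are assumptions for illustration only; the actual option handling in pilot.py may differ.

# Minimal sketch: parse a subset of the pilot options with getopt (illustration only).
import getopt
import sys

def parse_pilot_options(argv):
    options = {'sitename': '', 'workdir': '/tmp', 'url': '', 'port': '25443',
               'user': 'managed', 'queuename': ''}
    try:
        opts, _ = getopt.getopt(argv, 's:d:w:p:u:h:')
    except getopt.GetoptError as error:
        print('pilot option error: %s' % error)
        sys.exit(1)
    flag_map = {'-s': 'sitename', '-d': 'workdir', '-w': 'url',
                '-p': 'port', '-u': 'user', '-h': 'queuename'}
    for flag, value in opts:
        options[flag_map[flag]] = value
    return options

# Example (same options as the launch command above):
if __name__ == '__main__':
    print(parse_pilot_options(sys.argv[1:]))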
Pilot workflow
When a pilot launches, it starts by scanning the local disk for remaining work directories of previously
failed jobs, referred to here as lost jobs. If it finds any, it will try to update the Panda server and
create and/or register the log files. Remaining output data files will be transferred to the local SE.
It then connects to the Job Dispatcher (Panda server) to ask for a payload to execute. If it doesn't get a job, it will
try again a second time after waiting 100 seconds. If it doesn't get a job the second time either, it will
finish and remove its work directory. Normally it will receive a job and will continue to set up the runtime
environment. It will copy the input files from the local SE through the DDM client to the job work directory
which is created in the pilot work directory. The pilot will then fork the real job and monitor it. The forked
child process can communicate with the pilot via TCP messages. The pilot job monitoring sends
heartbeat
messages to the Job Dispatcher every 30 minutes until the job is done. The initial job is considered a
production job. In multi-job mode (enabled when schedconfig.timefloor is set to a non-zero value), the pilot will
execute several jobs sequentially until it runs out of time/jobs. When a job has finished,
the pilot will create a tar ball of the work directory and will copy it to the local SE. It then updates the Panda
server with the status of the job. Output data files will be transferred to the
local SE. After a successful job completion, it will remove the job work directory. When all jobs have
finished, the pilot will remove its own work directory and will then end.
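The multi-job loop described above can be summarised with a rough sketch. All helper functions (get_job(), fork_payload(), send_heartbeat(), finalize_job()) are hypothetical placeholders, not the actual pilot API.

# Rough sketch of the sequential multi-job workflow; helpers are hypothetical placeholders.
import time

HEARTBEAT_PERIOD = 30 * 60      # server heartbeat every 30 minutes
GETJOB_RETRY_WAIT = 100         # wait 100 s before the second getJob attempt

def run_pilot(timefloor_minutes, get_job, fork_payload, send_heartbeat, finalize_job):
    start = time.time()
    while True:
        job = get_job()
        if job is None:
            time.sleep(GETJOB_RETRY_WAIT)
            job = get_job()
            if job is None:
                break                         # no job on the second attempt: finish
        payload = fork_payload(job)           # forked payload (Popen-like object)
        while payload.poll() is None:         # payload still running
            send_heartbeat(job, 'running')    # update the Job Dispatcher
            time.sleep(HEARTBEAT_PERIOD)
        finalize_job(job)                     # stage-out, log tarball, final server update
        elapsed_minutes = (time.time() - start) / 60.0
        if not timefloor_minutes or elapsed_minutes > timefloor_minutes:
            break                             # single-job mode, or out of time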
In event service mode, the job definition is downloaded as normal, but the pilot uses a special module for executing the job (RunJobEvent for normal grid sites, RunJobHpcEvent on HPCs). The job definition only contains an initial set of parameters to launch the job. Information about which events to process (event ranges) is downloaded separately. AthenaMP (ATLAS) is launched as a subprocess and waits until the pilot sends it an event range for processing. The pilot keeps downloading and sending event ranges until all event ranges have been processed. When AthenaMP has finished processing an event range, it sends a message to the pilot with information about the output file (and file id, CPU and wall time info). An asynchronous stage-out thread transfers the file to an object store. When all output files have been transferred (and the event service job has completed),
a merge job is triggered which merges all the output into a single file that is placed in a "normal" (i.e. non-object store) SE.
(See also EventServiceOperations).
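A schematic view of the event service loop described above follows. The helper names (download_event_ranges(), send_to_athenamp(), wait_for_output_message(), stage_out_to_objectstore()) are hypothetical simplifications of what RunJobEvent does.

# Schematic event service loop; all helpers are hypothetical placeholders.
import threading

def run_event_service(download_event_ranges, send_to_athenamp,
                      wait_for_output_message, stage_out_to_objectstore):
    def async_stageout(output_file):
        # stage-out runs in its own thread so that event processing is not blocked
        threading.Thread(target=stage_out_to_objectstore, args=(output_file,)).start()

    while True:
        event_ranges = download_event_ranges()        # ask the server for more work
        if not event_ranges:
            break                                     # all event ranges have been processed
        for event_range in event_ranges:
            send_to_athenamp(event_range)             # yampl message to AthenaMP
            output_file = wait_for_output_message()   # file name, file id, CPU/wall time info
            async_stageout(output_file)               # asynchronous transfer to an object store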
Special algorithms
- Job state, updates a special file that contains the current state of a job
- Job recovery, the ability to recover lost jobs
- Looping job killer, aborting hanging jobs
- FAX, retry mechanism for stage-in using FAX technology
Pilot errors
The full list of pilot error codes and texts can be found on the
PandaErrorCodes page.
Time-outs
The table contains a list of various time-outs either used in, or relevant for, the pilot. A sketch of the looping-job check follows the table.
| Time-out | Limit | Purpose |
| Looping job | 12 h or maxCpuCount | When a job has not modified its output files within this time limit, the pilot will kill the job and send a "looping job" error message to the job dispatcher. maxCpuCount is defined in the task request. |
| File transfer | 30 min (per try) | In case of staging problems (hanging file transfer), the file copy will be aborted after the given time limit. An aborted file transfer can be recovered by the job recovery algorithm of a later pilot. |
| Source | 5 min | Execution of the CVMFS test (checkValidity.sh) |
| Source | 1000 s (500 s for paths on CVMFS) | Various: cmtconfig verification which needs full asetup sourcing, python identification, cmtsite setup (deprecated) |
| Source | 60 s | Setup plus which MemoryMonitor (ND) |
| tar and gzip | 55 min | Tarring and zipping of the log file |
| Benchmark | 120 s | Special benchmark test prior to downloading a job |
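As an illustration of the looping-job time-out above, the check can be thought of as comparing the newest modification time among the payload's output files with the allowed limit. This is a simplified sketch, not the pilot's actual implementation.

# Simplified sketch of a looping-job check.
import os
import time

LOOPING_LIMIT = 12 * 3600   # 12 h default; in practice overridden by maxCpuCount from the task request

def is_looping(output_files, limit=LOOPING_LIMIT):
    # Return True if no output file has been modified within the time limit.
    existing = [path for path in output_files if os.path.exists(path)]
    if not existing:
        return False
    newest = max(os.path.getmtime(path) for path in existing)
    return (time.time() - newest) > limit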
Quotas
Space limits enforced by the pilot. A sketch of the local disk space check follows the table.
| Name | Limit | Explanation |
| Initial space | 5 GB | Initial space limit before asking for a job. Checked before the payload is executed. |
| Current space | 2 GB | Limit on the remaining local disk space during running. Normally checked every 10 minutes. |
| Payload stdout | 2 GB | Size limit of the payload stdout during running (athena_stdout.txt or prun_stdout.txt). Normally checked every 10 minutes. |
| User work directory | 7 GB | Maximum size of the user work area (<path to pilot work directory for user job>/workDir). Normally checked every 10 minutes. |
| Max total input size | 14 GB or schedconfig.maxinputsize | Total size of the input files must be less than this limit. Verified once the size of the input files is known (i.e. after the LFC query prior to stage-in). |
| Input space | Total size of input files | The locally available space needs to be at least as large as the total size of the input files. Checked prior to stage-in. |
| Output limit | 500 GB | Maximum allowed size of an output file. The current large limit means this limit is not effective. It can be put into use rapidly (requires a pilot update). |
| Max file size for CHIRP | 200 MB | The maximum allowed size of an output file transported by the CHIRP site mover. |
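As an illustration of the "Current space" limit above, the remaining local disk space can be checked with os.statvfs roughly as follows. This is a sketch only, not the pilot's actual implementation.

# Rough sketch of a local disk space check against the 2 GB running limit.
import os

CURRENT_SPACE_LIMIT = 2 * 1024**3   # 2 GB, normally checked every 10 minutes while running

def enough_local_space(path, limit=CURRENT_SPACE_LIMIT):
    stats = os.statvfs(path)
    remaining = stats.f_bavail * stats.f_frsize   # bytes available to unprivileged users
    return remaining >= limit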
Release comments
For the latest version information and release comments, please see the Panda Pilot Blog.
Alternative stage-outs
The pilot supports stage-out to alternative storage if the transfer to the primary storage fails. As of June 2016, the pilot can only do transfers from a T-2 to the T-1 of the cloud that the job originally came from. Turning on alternative stage-out for a queue is done by adding "allow_alt_stageout" (for transfers to normal storage) and "allow_alt_os_stageout" (for transfers to objectstore) to schedconfig.catchall. For transfers to normal storage, the pilot also supports server instructions: on (allow alternative storage transfers), off (do not allow alternative storage transfers), force (all transfers will go to alternative storage).
Warning: currently alternative stage-out to normal storage (i.e. non-objectstore) does not support transfer to the nucleus. It will be handled by the new site movers that are currently in development (June 2016).
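The decision whether to attempt an alternative stage-out can be sketched as below. The function and argument names are illustrative only; the real pilot logic is more involved.

# Illustrative sketch of the alternative stage-out decision; not the actual pilot code.
def allow_alternative_stageout(catchall, objectstore=False, server_instruction=None):
    # server_instruction only applies to transfers to normal storage: 'on', 'off' or 'force'
    if not objectstore:
        if server_instruction in ('on', 'force'):
            return True            # 'force' means all transfers go to alternative storage
        if server_instruction == 'off':
            return False
    token = 'allow_alt_os_stageout' if objectstore else 'allow_alt_stageout'
    return token in (catchall or '')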
Structure of a panda server update
The following node structure is sent to the Panda server by pilot.updatePandaServer(). A minimum heartbeat message would consist of 'jobId' and 'state'.
node['jobId'] = Panda job id, e.g. 11263391
node['state'] = current job state, e.g. running, finished, failed, holding
node['node'] = node name/modification host, e.g. lx64e54
node['siteName'] = site name, e.g. LRZ
node['timestamp'] = current time, e.g. 2008-05-10T16:05:04+-100 (local time, defined in pUtil.timeStamp())
node['schedulerID'] = job scheduler id, e.g. rod, lyon, gridui07
node['workdir'] = job work directory of the form [path]/Panda_Pilot_[pid]_[timestamp]/PandaJob_[jobId]_[timestamp2],
e.g. /lustre/uh35398/condorg_SlL11551/pilot3/Panda_Pilot_11571_1210428134/PandaJob_11263391_1210428316
if batchSystemType can be determined (currently Condor, PBS, Grid Engine, LSF, BQS):
node['pilotID'] = pilot id (see below)|batch system id (e.g. os.environ["clusterid"])|batch system type (e.g. Condor)
else:
node['pilotID'] = pilot id string, e.g. tp_gridui07_25222_20080512-091002_213
if there are log extracts and job has either failed or is holding:
node['pilotLog'] = log extracts string
if job has finished or failed or is holding:
node['pilotErrorDiag'] = detailed pilot error diagnostics string
if job has finished or failed:
if there are exeErrorDiag's:
node['exeErrorCode'] = exeErrorCode
node['exeErrorDiag'] = exeErrorDiag
else:
node['transExitCode'] = transExitCode from payload
if job has failed and exeErrorCode is different from transExitCode:
if there are log extracts:
mismatch = "MISMATCH | Trf error code mismatch: exeErrorCode = %d, transExitCode = %d" %\
(exeErrorCode, transExitCode)
node['pilotLog'] = mismatch + node['pilotLog']
node['pilotErrorCode'] = pilot error code as defined in PilotErrors.py
node['cpuConsumptionTime'] = cpuConsumptionTime
node['cpuConsumptionUnit'] = cpuConsumptionUnit + cpuModel
node['cpuConversionFactor'] = cpuConversionFactor
node['pilotTiming'] = timeGetJob|timeStageIn|timeExe|timeStageOut
elif job is holding:
node['exeErrorCode'] = pilot error code
node['exeErrorDiag'] = currently less detailed pilot error diags (PilotErrors.getPilotErrorDiag(pilot error code))
if a space report needs to be sent (beginning of pilot):
node['remainingSpace'] = remaining space
node['messageLevel'] = message
if job is holding:
do not send xml for holding jobs
else:
if job has failed:
node['xml'] = xml for log files only
else:
node['xml'] = xml for a normal, finished job
if there is a metadata-[jobId].xml.ATHENA file (original xml, at end of job):
node['metaData'] = xml from athena
# make the actual update, repeatedly if necessary (for the final update)
ret = pUtil.makeHTTPUpdate(node)
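For illustration, a minimal heartbeat update containing only 'jobId' and 'state' could be sent roughly as below. The endpoint path and the plain urllib transport are assumptions; the real pilot goes through pUtil.makeHTTPUpdate(), which also handles certificates/proxies and retries.

# Minimal heartbeat sketch (Python 3); endpoint path and transport details are assumptions.
import ssl
import urllib.parse
import urllib.request

def send_minimal_heartbeat(server, port, job_id, state):
    node = {'jobId': job_id, 'state': state}                 # minimum heartbeat content
    data = urllib.parse.urlencode(node).encode('ascii')
    url = '%s:%s/server/panda/updateJob' % (server, port)    # assumed endpoint path
    context = ssl.create_default_context()
    context.check_hostname = False                           # sketch only: no real
    context.verify_mode = ssl.CERT_NONE                      # certificate/proxy handling
    request = urllib.request.Request(url, data=data)
    with urllib.request.urlopen(request, context=context) as response:
        return response.read()

# Example (illustrative values):
# send_minimal_heartbeat('https://pandaserver.cern.ch', 25443, 11263391, 'running')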
Pilot components
The following are the main pilot modules.
- pilot; the main component, communicates with Panda server, retrieves and updates job information, monitors job execution
- RunJob, RunJobEvent (more soon); payload execution on normal grid sites and HPCs, normal jobs and event service
- EventService; event service helper methods
- EventRanges; helper methods for event range handling
- ErrorDiagnosis; interpretation of payload output for diagnosing special errors
- FAXTools; helper functions for FAX usage
- pUtil; pilot utilities and helper functions
- Mover; pilot interface to the site movers (responsible for get/put). Loops over all input/output files and calls the relevant site mover per file
- SiteMover; main site mover class, all specific site movers inherit from this one. Specific site movers implement the copy tools
- SiteMoverFarm; copy tool definitions, etc
- environment; global variables
- DeferredStageout; stage-out of output files that can be used for job recovery and on HPC file transfer nodes
- Site; site information
- Job; pilot job definition
- JobLog; functions for log transfers
- TimerCommand; time-out class used by site movers
- PilotErrors; pilot error code definitions
- JobState, JobStateClient; classes for storing current job state, used by job recovery
- Node; worker node information
- Monitor; sub process (RunJob*) monitoring
- FileHandling; functions related to file handling
- glexec_*; modules for glExec (deprecated - no longer supported by ATLAS)
- PilotTCPServer; TCP server used for sending TCP messages between main pilot and sub processes (RunJob and RunJobEvent)
- PilotYamplServer; Yampl server used for sending yampl messages between RunJobEvent and sub process (AthenaMP)
- Experiment; classes for experiment specific code (ATLASExperiment, etc)
- SiteInformation; classes for site and experiment specific code (ATLASSiteInformation, etc)
Role of the pilot in Distributed Analysis
Distributed Analysis (DA) jobs are latency sensitive. Panda reduces this
latency by bypassing the usual obstacles of acquisition of storage and
Computing Element (CE) resources. This is achieved by pre-staging the
input data into the local CE storage, and with a late-binding of the job
payload with CPU slots using pilots. Currently 10% of the pilots are
allocated to DA users by the Job Scheduler. DA computing has a chaotic
behavior, so there is no guarantee of a steady stream of DA pilots to the
Panda server. Problems would occur when long-running production jobs, with
a typical wall time of several days, occupy all available CPUs. No new
pilot requests would arrive at the Job Dispatcher, and any pre-slot
allocation would not work. This does not meet the demands of analysis
jobs, since they require low latency. To address these difficulties, we
have two alternative solutions that are currently being implemented in the
Panda system. One solution has the pilot running in multitasking mode,
serving production jobs at the same time as user analysis jobs, as
described above. The other involves defining special queues on the batch
system for running different jobs. The jobs asking for an especially low
latency have to be short, while jobs with less priority can be run for a
longer time. It thus seems natural to define short and long queues to
which the users can submit their jobs.
These problems however require a study of the chaotic user behaviors and
demands, and weigh that against the needs of the production system. We
need to study which of the alternatives will work best in practice.
Possibly using a combination of the two will be the optimum choice, since
implementing new queues might not be possible on all sites. Another
drawback is that the queues become idle when users do not send enough DA
jobs. A solution might be to define virtual short queues under the pilot
pool which would provide extra slots where two analysis jobs could run in
parallel. Both jobs would be slowed down compared to sequential running
and there is a potential risk of overloading the worker node due to the
high memory usage of ATLAS jobs.
Direct access vs. stage-in mode
The pilot has the ability to skip the input file transfer stage and leave data access to runAthena/runGen. Direct access mode is selected by setting special schedconfig variables.
New style:
direct_access_lan = True/False
direct_access_wan = True/False
The former (direct_access_lan) is sufficient to turn direct access on or off. If the latter (direct_access_wan) is True, it means
that direct access is allowed over WAN. If a site does NOT want to use any type of direct access, both of these variables have to be set to False.
Direct access over WAN uses the geoip-sorting capability of Rucio's list_replicas() function. The first replica in the sorted list, i.e. the nearest one, is chosen.
Direct access for user jobs is activated for the entire queue by setting direct_access_lan = True.
Direct access for production jobs is activated on the task level by setting allowInputLAN = 'only'. The pilot then receives transferType = 'direct' and uses direct access if direct_access_lan/wan = True.
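The direct access decision described above can be expressed roughly as follows (illustrative only; the real checks live in the pilot's mover code):

# Illustrative sketch of the direct access decision; not the actual pilot code.
def use_direct_access(direct_access_lan, direct_access_wan, transfer_type, is_user_job):
    if not (direct_access_lan or direct_access_wan):
        return False                      # site does not want any type of direct access
    if is_user_job:
        return direct_access_lan          # user jobs: controlled per queue
    # production jobs: the task sets allowInputLAN='only', so the pilot receives transferType='direct'
    return transfer_type == 'direct'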
Metadata files produced or processed by the pilot
metadata.xml
: Metadata created by the trf/athena. Renamed in runJob moveTrfMetadata() to metadata-<jobId>.xml.PAYLOAD. Located in the job work directory.
metadata-<jobId>.xml
: Metadata containing output file information (LFN, GUID, file size and checksum), created by runJob createFileMetadata(). Log file information is added to this file after the log has been created. Copied to the site work directory by runJob cleanup().
OutputFiles-<jobId>.xml
: Special XML file for Nordugrid and CERNVM. The file is converted from metadata-<jobId>.xml by
RunJobUtilities convertMetadata4NG().
OutPutFileCatalog.xml
: Metadata for output file transfer and registration, created by runJob stageOut(). Used by the site mover.
Pilot testing: The Release Candidate Test Framework
Before a new pilot is released, it is tested in the Release Candidate (RC) Test Framework, using special
HammerCloud test jobs that are only picked up by RC pilots. Once a development version of the pilot is stable enough for more thorough testing on the grid, it is released into the RC Test Framework by setting the pilot_version_tag = 'RC' (currently in pilot.py), and calling the source tarball 'pilotcode-rc.tar.gz'. Currently the source location must be in Paul's private www area (assumed by the pilot factories). Progress of RC testing can be followed from the special
PanDA monitor RC page
.
Generic PanDA Pilot
The aim of the Generic PanDA Pilot project is to refactor the ATLAS-specific PanDA pilot into a version that is not bound to ATLAS or any other experiment. Code that is specific to a given experiment will be placed in plug-in classes (currently there are two). An experiment that wants to use PanDA and the PanDA Pilot should only have to implement its own plug-ins. The current status and documentation is located here.
For a standard grid job, the pilot's Monitor module forks a subprocess and monitors it for the duration of the job. The subprocess, RunJob (formerly called 'runJob'), is responsible for determining the payload setup, performing any stage-ins, executing the payload and staging out its output at the end of the job. To facilitate the introduction of other subprocess modules with different workflows, RunJob has been refactored into a class and has the same plug-in structure as the Experiment and SiteInformation classes. Given the type of subprocess (e.g. 'standard', 'hpc', 'titan', 'eventservice', etc.), a factory is used to return the proper subclass object.
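The plug-in structure of RunJob can be illustrated with a small factory sketch. The class and method names below are simplified assumptions rather than the exact pilot API.

# Simplified sketch of a RunJob-style plug-in factory; names are illustrative assumptions.
class RunJob(object):
    def get_run_job_file_name(self):
        return 'RunJob.py'

class RunJobEvent(RunJob):
    def get_run_job_file_name(self):
        return 'RunJobEvent.py'

class RunJobHpcEvent(RunJob):
    def get_run_job_file_name(self):
        return 'RunJobHpcEvent.py'

class RunJobFactory(object):
    _classes = {'standard': RunJob,
                'eventservice': RunJobEvent,
                'hpcevent': RunJobHpcEvent}

    def new_run_job(self, subprocess_type):
        # return an instance of the proper subclass, falling back to the standard RunJob
        return self._classes.get(subprocess_type, RunJob)()

# Usage: RunJobFactory().new_run_job('eventservice').get_run_job_file_name()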
Middleware used by the PanDA Pilot
The following is a list of the currently used middleware in the pilot.
lcg-cp, rfcp, chirp (deprecated), gfal-copy, gfal-mkdir, gfal-sum, gfal-ls, gfal-rm, globus-url-copy (deprecated), aria2c, dccp, xrdcp, arcproxy, voms-proxy-info, voms-proxy-init, grid-proxy-info, lcg-getturls (deprecated), lcg-cr (deprecated), lcg-get-checksum, lcg-ls, lcg-del, lcg-gt, lcg-sd, lcg-rf (deprecated), pacman (deprecated).
Q&A
Q: Why are pilots failing jobs with "Required CMTCONFIG (required cmtconfig not set) incompatible with that of local system (i686-slc4-gcc34-opt)"?
A: The schedconfig field
cmtconfig has not been set. Check the queue info page, and also make sure that the site name is what you think it is (e.g. jobs are sent to ANALY_SLAC-lsf, but the cmtconfig field has only been set for site ANALY_SLAC).
Q: Why is it necessary to set the sched config cmtconfig field?
A: The variable was introduced as a protective measure since some sites have both SLC3 and SLC4. Release 14 jobs will only run on SLC4.
Q: What does it mean that a job has lost its heartbeat?
A: When the pilot executes a payload, it monitors the job and sends an update to the Panda server every 30 minutes to let it know that the job is still running. If for some reason the pilot loses contact with the server, the job will be timed out after six hours, i.e. it will be considered to have lost its heartbeat. There are several reasons why this can happen: e.g. a WN power cut or reboot, the batch system killing the job too hard (SIGKILL) or too quickly (SIGTERM followed too soon by SIGKILL), or an internal failure in the pilot itself.
Q: If there are several replicas available, which one will the pilot choose for transfer?
A: The pilot sorts the replica list in the following way:
replicas = _replicas_hotdisk + _replicas_mcdisk_datadisk + _replicas_disk + _replicas_tape
unless you have specified a special space token the file should be copied from (in which case a tape replica will be the primary choice). After the initial sorting, the pilot will choose the replicas from the list above according to what's in schedconfig.copyprefix. If there are several replicas in the matched list, the first replica in the list will be chosen. Note: only DBRelease replicas are randomized.
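The ordering and copyprefix matching above can be sketched as follows. The helper is illustrative only, and copyprefix is treated as a simple prefix string here; the real schedconfig.copyprefix syntax and the selection code in the pilot's Mover module are richer.

# Illustrative sketch of replica ordering and copyprefix matching.
def choose_replica(replicas_hotdisk, replicas_mcdisk_datadisk, replicas_disk,
                   replicas_tape, copyprefix):
    ordered = replicas_hotdisk + replicas_mcdisk_datadisk + replicas_disk + replicas_tape
    matched = [replica for replica in ordered if replica.startswith(copyprefix)]
    return matched[0] if matched else None   # the first replica in the matched list is chosen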
Q: From which version is the pilot setting up jobs with asetup instead of setup?
A: 16.1.0.
Q: At the end of a job the pilot makes a final status update to the
PanDA server. What happens if the server is not reachable at that time?
A: If the pilot cannot make the final update, the job will eventually become a 'lost heartbeat' job, i.e. the server has lost contact with it. The pilot will make ten attempts, each separated by a two-minute pause, to send the final update.
Q: Which are the timings in pilotTiming?
A: Job execution time (including all necessary setup source commands and the pathena/prun/trf command), how long the job definition download took, stage-in time, how long it took to figure out how to set up the job (including test sourcings of setup scripts), and the stage-out time. E.g.
2|76|401|128|37 execute: 6.7 mins getJob: 2 secs stagein: 1.3 mins setup: 37 secs stageout: 2.1 mins
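The pilotTiming string can be unpacked into the human-readable form shown above with a small helper. This is a sketch; the field order (getJob, stage-in, execute, stage-out, setup) is inferred from the example values.

# Sketch: unpack a pilotTiming string such as '2|76|401|128|37' (values in seconds).
def format_pilot_timing(pilot_timing):
    get_job, stage_in, execute, stage_out, setup = [int(t) for t in pilot_timing.split('|')]
    def pretty(seconds):
        return '%.1f mins' % (seconds / 60.0) if seconds >= 60 else '%d secs' % seconds
    return 'execute: %s getJob: %s stagein: %s setup: %s stageout: %s' % (
        pretty(execute), pretty(get_job), pretty(stage_in), pretty(setup), pretty(stage_out))

# format_pilot_timing('2|76|401|128|37')
# -> 'execute: 6.7 mins getJob: 2 secs stagein: 1.3 mins setup: 37 secs stageout: 2.1 mins'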
Q: How can I test a change to the schedconfig DB without disrupting production?
A: For a production job, add the following command to the job parameters: "--overwriteQueuedata={field1=value1,field2=value2,..}". For a user analysis job, the corresponding prun command is: "--queueData={field1=value1,field2=value2,..}". If a field should be reset to an empty string, leave the value blank, i.e. use "field=". Using these commands will not affect the schedconfig DB itself; the overrides are only used by the pilot for the job(s) in question.
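A small sketch of how an "--overwriteQueuedata={field1=value1,...}" string can be turned into a dictionary of overrides. The parsing below is illustrative; the actual handling is done inside the pilot, and the field names in the example are arbitrary.

# Sketch: parse an overwriteQueuedata-style string into a dict of overrides.
def parse_queuedata_overrides(argument):
    # argument example: '--overwriteQueuedata={field1=value1,field2=}'
    inner = argument.split('=', 1)[1].strip('{}')
    overrides = {}
    for pair in inner.split(','):
        if not pair:
            continue
        field, _, value = pair.partition('=')
        overrides[field] = value          # an empty value means reset the field to ''
    return overrides

# parse_queuedata_overrides('--overwriteQueuedata={field1=value1,field2=}')
# -> {'field1': 'value1', 'field2': ''}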
Links
Publications, posters and presentations
- Experience from a pilot based system for ATLAS, CHEP 2007 (slides)
- Experience from a pilot based system for ATLAS, CHEP 2007 (letter)
- The PanDA System in the ATLAS Experiment, ACAT 2008 (slides)
- The PanDA System in the ATLAS Experiment, ACAT 2008 (paper)
- Distributed Data Analysis in ATLAS, ICCMSE 2009 (slides)
- Distributed Data Analysis in ATLAS, ICCMSE 2009 (letter)
- Distributed Analysis and Error Debugging, UTA Jamboree 2010 (slides)
- The ATLAS PanDA Pilot in Operation, CHEP 2010 (poster)
- The ATLAS PanDA Pilot in Operation, CHEP 2010 (paper)
- PanDA Pilot Ideas, TIM Annecy 2012 (slides)
- Improvements in the ATLAS PanDA Pilot, CHEP 2012 (poster)
- Improvements in the ATLAS PanDA Pilot, CHEP 2012 (paper)
- Next Generation PanDA Pilot for ATLAS and Other Experiments, CHEP 2013 (paper)
- Next Generation PanDA Pilot for ATLAS and Other Experiments, CHEP 2013 (poster)
- Extending ATLAS Computing to Commercial Clouds and Supercomputers, ISGC 2014 (paper)
- Extending ATLAS Computing to Commercial Clouds and Supercomputers, ISGC 2014 (slides)
- Event Service in the Pilot, TIM Berkeley 2014 (slides)
- Event Service Pilot Plans, TIM Berkeley 2014 (slides)
- HPC Pilot, TIM Berkeley 2014 (slides)
Major updates:
-- PaulNilsson - 05 Oct 2006
-- PaulNilsson - 12 May 2008
Responsible: PaulNilsson