JEDI (Job Execution and Definition Interface)
Introduction
JEDI is an intelligent component in the PanDA server that provides task-level workload management.
The main goals of JEDI are to make the PanDA system more task-oriented and to integrate prodDB and
PandaDB.
Main concepts
Work queues
JEDI works with work queues, e.g., it submits tasks per work queue.
Tasks are mapped to work queues based on their attributes, such as processingType,
workingGroup, prodSourceLabel, and coreCount.
Each task belongs to only one work queue, and all jobs in the task belong to that work queue.
There are two kinds of work queues: normal queues and partition queues.
Each normal queue has its own selection criteria; tasks that pass the
selection criteria belong to the queue, and JEDI submits the tasks in the queue together.
Partition queues, on the other hand, don't have selection criteria themselves.
One partition queue is composed of one or more sub queues, and each sub queue
has selection criteria. Tasks that pass the selection criteria of a sub queue
belong to the sub queue in addition to the partition queue. JEDI
works with the partition queue, e.g., tasks in the partition queue are
submitted together regardless of sub queues.
Partitions are useful for nested definitions of shares, as described in the next
section.
Work queues have the order attribute.
Each queue is evaluated before other queues with higher order values.
If a task passes the selection criteria of a queue, it is mapped to that queue and no further queues are evaluated.
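As an illustration of this ordered mapping, here is a minimal Python sketch; the queue definitions and task attributes are hypothetical and not the actual JEDI implementation:

  # Minimal sketch of ordered work-queue mapping (hypothetical data, not JEDI code).
  queues = [  # each queue has an "order" attribute and selection criteria
      {"name": "evgensimul", "order": 1,
       "criteria": lambda t: t["processingType"] in ("evgen", "simul")},
      {"name": "group", "order": 2,
       "criteria": lambda t: t["workingGroup"].startswith("GP_")},
      {"name": "others", "order": 99,
       "criteria": lambda t: True},  # catch-all queue
  ]

  def map_task_to_queue(task):
      # evaluate queues in ascending order; the first queue whose criteria pass wins
      for queue in sorted(queues, key=lambda q: q["order"]):
          if queue["criteria"](task):
              return queue["name"]
      return None

  print(map_task_to_queue({"processingType": "simul", "workingGroup": "AP_PHYS"}))  # evgensimul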
Shares
Each normal or partition queue has a share which defines how many CPU resources are
allocated to the queue.
The share of a partition queue is divided among the sub queues in the partition.
Work queues with active tasks are regarded as active queues, and
CPU resources are distributed among them.
This means that CPU resources originally allocated to a queue
can be used by other queues if the queue doesn't have active tasks.
Queues with the stretchable attribute are given priority in getting unused CPUs.
If there is more than one queue with the stretchable attribute, unused CPUs are distributed among them
according to their original shares.
If there is no queue with the stretchable attribute, all queues get unused CPUs according to their original shares.
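A minimal sketch of this redistribution, under the simplifying assumptions that shares are percentages and that the whole share of inactive queues is what gets redistributed (not the actual JEDI code):

  # Simplified sketch of share redistribution among active queues (illustrative only).
  def redistribute(queues):
      """queues: list of dicts with 'share' (percent), 'active', 'stretchable'."""
      active = [q for q in queues if q["active"]]
      unused = sum(q["share"] for q in queues if not q["active"])
      stretchable = [q for q in active if q["stretchable"]]
      # stretchable queues get priority for unused CPUs; otherwise all active queues share them
      receivers = stretchable if stretchable else active
      total = sum(q["share"] for q in receivers)
      for q in active:
          extra = unused * q["share"] / total if q in receivers else 0
          q["effective_share"] = q["share"] + extra
      return queues

  qs = redistribute([
      {"name": "evgensimul", "share": 50, "active": True, "stretchable": True},
      {"name": "reprocessing", "share": 30, "active": True, "stretchable": False},
      {"name": "group", "share": 20, "active": False, "stretchable": False},
  ])
  print([(q["name"], q.get("effective_share")) for q in qs])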
Task identifier
JEDITASKID is the unique task identifier in JEDI. It comes from ATLAS_DEFT.PRODSYS2_TASK_ID_SEQ independently of T_TASK_REQUEST.REQID.
Until DEFT is ready, task requests come from AKTR; T_TASK_REQUEST.REQID is stored in JEDI_Tasks.REQID, which is mapped to job.TASKID.
Dataset attributes
Each dataset can define some attributes which give hints to JEDI for splitting and brokerage.
- repeat
- nosplit
- manytime
- cvmfs
- ratio
Pseudo datasets and files
Some job attributes, such as random seeds and serial numbers for no-input tasks, are internally managed in JEDI using separate file records.
The main reason is book-keeping. Their dataset or file names have the "pseudo_" prefix and can be ignored from the user's point of view.
Transition of task status
Task status changes as shown in the following chart.
- registered
- the task information is inserted to the JEDI_Tasks table
- defined
- all task parameters are properly defined
- assigning
- the task brokerage is assigning the task to a cloud
- ready
- the task is ready to generate jobs. New jobs are generated once computing resources become available.
- pending
- the task has a temporary problem, e.g. there are no sites to assign jobs to
- scouting
- the task is running scout jobs to collect job data
- scouted
- all scout jobs were successfully finished
- running
- the task is running jobs
- prepared
- the task is ready to go to a final status
- done
- all inputs of the task were successfully processed
- failed
- all inputs of the task failed
- finished
- some inputs of the task were successfully processed but others failed or were not processed because the task was terminated
- aborting
- the task is being killed
- aborted
- the task is killed
- finishing
- the task is forced to get finished
- topreprocess
- a preprocess job is ready for the task
- preprocessing
- a preprocess job is running for the task
- tobroken
- the task is going to the broken state
- broken
- the task is broken, e.g., the task definition is wrong
- toretry
- the retry command was received for the task
- toincexec
- the incexec command was received for the task
- rerefine
- task parameters are going to be changed for incremental execution
- paused
- the task is paused and doesn't do anything until it is resumed
- throttled
- the task is throttled from generating jobs since the largest attemptNr in the task is a multiple of 5. The throttled period is 120 x int(max(attemptNr)/5)**2 minutes (see the sketch after this list)
- exhausted
- For production tasks, all reattempts were done but some inputs were still unsuccessful, and the task is configured to go to this state instead of finished, to wait for manual intervention or a 10-day timeout. A task also goes to exhausted if the cpuTime of the scout jobs is more than 2x the cpuTime of the task definition. For analysis tasks, if there are more than 5 scout jobs with short execution time (< 4 min) and more than 1000 jobs are expected, the task goes to exhausted to prevent an automatic avalanche, since so many short jobs are problematic at grid sites. Analysis tasks in the exhausted state can be retried using pbook, but it is better to change some parameters like nFilesPerJob to get a longer execution time. Both production and analysis tasks also go to exhausted if they time out while pending in the brokerage.
- passed
- the task is ready to go to a final state after manual intervention or timeout
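The throttling period quoted for the throttled state above translates directly into a small helper (a sketch of the formula only, not the actual code):

  # Throttled period in minutes: 120 x int(max(attemptNr)/5)**2, as quoted above.
  def throttled_period_minutes(max_attempt_nr):
      return 120 * (max_attempt_nr // 5) ** 2

  for n in (5, 10, 15):
      print(n, throttled_period_minutes(n), "min")  # 120, 480, 1080 min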
Transition of dataset status
Dataset status changes as shown in the following charts.
Input
- defined
- the dataset information is inserted to the JEDI_Datasets table
- toupdate
- the dataset information needs to be updated
- pending
- the dataset is temporarily unavailable
- broken
- the dataset is permanently unavailable
- ready
- the dataset is ready to be used
- done
- all files in the dataset were processed
Output
- defined
- the dataset information is inserted to the JEDI_Datasets table
- ready
- the dataset is ready in DDM
- running
- files are being added to the dataset
- prepared
- the dataset is ready for post-processing
- done
- the final status
Transition of file status
File status changes as shown in the following charts.
Input
- ready
- the file information is correctly retrieved from DDM and is inserted to the JEDI_Dataset_Contents table
- missing
- the file is missing in the cloud/site where the corresponding task is assigned
- lost
- the file was available in the previous lookup but is now unavailable
- broken
- the file is corrupted
- picked
- the file is picked up to generate jobs
- running
- one or more jobs are using the file
- finished
- the file was successfully used
- failed
- the file was tried multiple times but did not succeed
- partial
- the file was split at event level and some of the event chunks successfully finished
Output
- defined
- the file information is inserted to the JEDI_Dataset_Contents table
- running
- the file is being produced
- prepared
- the file is produced
- merging
- the file is being merged
- finished
- the file was successfully processed
- failed
- the file was not produced or failed to be merged
Transition of event status
Event status changes as shown in the following chart.
- ready
- ready to be processed
- sent
- sent to the pilot
- running
- being processed on WN
- finished
- successfully processed and the corresponding ES job is still running
- cancelled
- the ES job was killed before the event range was successfully processed
- discarded
- the ES job was killed in the merging state after the event range had finished
- done
- successfully processed and waiting to be merged. The corresponding ES job went to a final job status.
- failed
- failed to be processed
- fatal
- failed with a fatal error or attempt number reached the max
- merged
- the corresponding ES merge job successfully finished
- corrupted
- the event is flagged as corrupted so that it is re-processed, since the corresponding zip file is problematic
Database design for JEDI
Here is an ER diagram for JEDI. It is assumed that the current
Job, File, and Dataset tables are basically reused, although several columns are added and the tables themselves may be further optimized in the future.
The idea is to implement the functions of JEDI on top of the current system while avoiding huge service disruptions.
Once the whole system and clients migrate to the new task-oriented scheme, the
Job, File, and Dataset tables may be refactored.
The Task and Dataset tables may be shared with DEFT (see
Object Models for ProdSys II).
JEDI works with queues, e.g., it counts the number of jobs per queue to estimate the workload in the queue. Tasks/jobs are mapped to one of the queues based on processingType, workingGroup, and so on. Queues are configurable and flexible, so they are defined in the
WorkQueue table.
Notes on database design
- JEDI has custody of the actual task/dataset tables which carry content, while DEFT keeps references to objects in its own (small) database
- This page describes communication and interaction between DEFT and JEDI through the database tables
- The Dataset table contains one record per task-dataset-type combination. E.g., when one task uses multiple input datasets and produces multiple output datasets, there could be (task,dataset1,input), (task,dataset2,input), ..., and (task,dataset3,output), (task,dataset4,output), ...
- The Dataset table contains datasets but not containers. When a task is submitted with an input container, the datasets in the container are inserted, which makes incremental execution of the task easier when new datasets are added to the container
- The Task, Dataset, and Contents tables could be partitioned by taskID
Table schemas
All of them have the JEDI_ prefix in their table names for now. Note that they are not final versions.
The JEDI_Tasks table
Description: Table for hosting all production and analysis tasks.
| Name | Type | Comment |
| JEDITASKID | NOT NULL NUMBER(9) | The task identifier coming from ATLAS_DEFT.PRODSYS2_TASK_ID_SEQ |
| CREATIONDATE | NOT NULL DATE | Set when the record is inserted for the first time |
| MODIFICATIONTIME | NOT NULL DATE | Set when the record is updated |
| STARTTIME | DATE | Set when the task gets started |
| ENDTIME | DATE | Set when the task is finished |
| FROZENTIME | DATE | Set when the record is frozen |
| PRODSOURCELABEL | VARCHAR2(20) | The source label, such as managed, user, prod_test, ... |
| TASKNAME | VARCHAR2(132) | The task name |
| USERNAME | VARCHAR2(128) | The name of the user who owns the task |
| WORKINGGROUP | VARCHAR2(32) | The name of the working group which owns the task |
| VO | VARCHAR2(16) | The name of the virtual organization which owns the task |
| CORECOUNT | NUMBER(9) | The number of cores per job. Set 0 if it can vary |
| TASKTYPE | VARCHAR2(64) | The task type |
| PROCESSINGTYPE | VARCHAR2(64) | The type of the process |
| TASKPRIORITY | NUMBER(9) | The assigned priority of the task |
| CURRENTPRIORITY | NUMBER(9) | The current priority of the task. JEDI may increase/decrease priorities if necessary |
| STATUS | NOT NULL VARCHAR2(64) | The task status |
| OLDSTATUS | VARCHAR2(64) | The old task status |
| ARCHITECTURE | VARCHAR2(256) | The architecture on which the task runs, e.g. $CMTCONFIG |
| TRANSUSES | VARCHAR2(64) | The name of the software release |
| TRANSHOME | VARCHAR2(128) | The name of the software cache in which the transformation is included |
| TRANSPATH | VARCHAR2(128) | The name of the transformation |
| LOCKEDBY | VARCHAR2(40) | The name of the process/thread which is taking care of the record |
| LOCKEDTIME | DATE | Set when the record is locked |
| TERMCONDITION | VARCHAR2(100) | Termination condition of the task, e.g. to terminate the task when 90% of files are successfully processed |
| SPLITRULE | VARCHAR2(100) | Rules for job splitting, e.g. whether to use event-level splitting, the limit on the number of files per job, etc. |
| WALLTIME | NUMBER(9) | Average walltime consumption measured by scouts when processing 1MB of input |
| WALLTIMEUNIT | VARCHAR2(32) | Unit of WALLTIME |
| OUTDISKCOUNT | NUMBER(9) | Average total size of outputs measured by scouts when processing 1MB of input |
| OUTDISKUNIT | VARCHAR2(32) | Unit of OUTDISKCOUNT |
| WORKDISKCOUNT | NUMBER(9) | Average size of the work directory measured by scouts |
| WORKDISKUNIT | VARCHAR2(32) | Unit of WORKDISKCOUNT |
| RAMCOUNT | NUMBER(9) | Average memory consumption measured by scouts |
| RAMUNIT | VARCHAR2(32) | Unit of RAMCOUNT |
| IOINTENSITY | NUMBER(9) | The characteristics of I/O patterns measured by scouts: (inSize+outSize)/wallTime/nCore |
| IOINTENSITYUNIT | VARCHAR2(32) | Unit of IOINTENSITY |
| WORKQUEUE_ID | NUMBER(5) | The identifier of the work queue to which the task belongs |
| PROGRESS | NUMBER(3) | Percentage of completion of the task |
| FAILURERATE | NUMBER(3) | The frequency of job failures in the task (in %) |
| ERRORDIALOG | VARCHAR2(255) | Error diagnostics |
| CLOUD | VARCHAR2(10) | The cloud where the task is assigned |
| SITE | VARCHAR2(60) | The site where the task is assigned. Set NULL to use the normal brokerage |
| REQID | NUMBER(9) | See this section |
| COUNTRYGROUP | VARCHAR2(20) | The name of the country group to which the user belongs |
| PARENT_TID | NUMBER(12) | The jediTaskID of the parent task. PARENT_TID=JEDITASKID if there is no parent |
| EVENTSERVICE | NUMBER(1) | The task uses Event Service if 1 |
| TICKETID | VARCHAR2(50) | The identifier of the ticket to keep track of the task |
| TICKETSYSTEMTYPE | VARCHAR2(16) | The type of the ticket system |
| STATECHANGETIME | DATE | Updated when the task status is changed |
| SUPERSTATUS | VARCHAR2(64) | The super status (DEFT status) of the task, explained in DEFT task status |
| CAMPAIGN | VARCHAR2(32) | The campaign to which the task belongs |
| MERGERAMCOUNT | NUMBER(9) | Average memory consumption for merge jobs measured by scouts |
| MERGERAMUNIT | VARCHAR2(32) | Unit of MERGERAMCOUNT |
| MERGEWALLTIME | NUMBER(9) | Average walltime consumption for merge jobs measured by scouts when processing 1MB of input |
| MERGEWALLTIMEUNIT | VARCHAR2(32) | Unit of MERGEWALLTIME |
| THROTTLEDTIME | DATE | Updated when the task is throttled |
| NUMTHROTTLED | NUMBER(3) | How many times the task has been throttled so far |
| MERGECORECOUNT | NUMBER(9) | The number of cores per merge job. Set 0 if it can vary |
| GOAL | NUMBER(4) | Goal of the task completion in percentage multiplied by 10 |
| ASSESSMENTTIME | DATE | Set when statistics are updated |
| CPUTIME | NUMBER(9) | The product of the HS06 score and the CPU consumption time per event |
| CPUTIMEUNIT | VARCHAR2(32) | Unit of CPUTIME (HS06sPerEvent) |
| CPUEFFICIENCY | NUMBER(3) | Percentage of single-core efficiency |
| BASEWALLTIME | NUMBER(9) | Offset of walltime in sec |
| AMIFLAG | VARCHAR2(10) | Set to READY when the task is done or finished, so that AMI is aware of task completion |
| NUCLEUS | VARCHAR2(52) | Name of the site where the task is assigned in the WORLD cloud |
| BASERAMCOUNT | NUMBER(9) | Shared memory size in MB |
| RESCUETIME | DATE | Updated when the rescue process checks the task |
| REQUESTTYPE | VARCHAR2(32) | The type of the request |
| USEJUMBO | CHAR(1) | Set to Y when using jumbo jobs |
| DISKIO | NUMBER(9) | Local disk access measured by scouts: (totWBytes+totRBytes)/(endTime-startTime) |
| DISKIOUNIT | VARCHAR2(32) | Unit of DISKIO |
Typical usages
- Get N production tasks to be processed in a queue based on priority. Queues are defined in the JEDI_Work_Queue table
  SELECT * FROM (SELECT * FROM JEDI_Tasks WHERE STATUS = ??? AND WORKINGGROUP IN (...) AND PROCESSINGTYPE IN (...) AND VO = ??? AND TASKTYPE = ??? ORDER BY CURRENTPRIORITY DESC, JEDITASKID) WHERE ROWNUM < ???;
- Get the list of tasks to be amended
  SELECT JEDITASKID FROM JEDI_Tasks WHERE COMMAND IS NOT NULL;
- Get the list of users who have tasks to be processed
  SELECT USERNAME, VO FROM JEDI_Tasks WHERE STATUS = ??? AND TASKTYPE = ???;
- Get N analysis tasks for a user to process
  SELECT * FROM (SELECT ??? FROM JEDI_Tasks WHERE STATUS = ??? AND USERNAME = ??? AND VO = ??? AND TASKTYPE = ??? ORDER BY CURRENTPRIORITY DESC, JEDITASKID) WHERE ROWNUM < ???;
- Find slow or delayed tasks
  SELECT ??? FROM JEDI_Tasks WHERE STATUS = ??? AND TASKTYPE = ??? AND CREATIONDATE < ???;
- Find hung-up processes
  SELECT ??? FROM JEDI_Tasks WHERE LOCKEDBY IS NOT NULL AND LOCKEDTIME < ???;
The JEDI_TaskParams Table
Description: Table with task parameters. Each task has its own parameters, described in a single row.
| Name | Type | Comment |
| JEDITASKID | NOT NULL NUMBER(11) | The task identifier coming from the task table |
| TASKPARAMS | CLOB | Special task parameters, e.g. the list of lost files for a lost-file recovery task |
The JEDI_JobParams_Template Table
Description: Table to define how job parameters are generated for each task.
| Name | Type | Comment |
| TASKID | NOT NULL NUMBER(11) | The task identifier coming from the task table |
| JOBPARAMSTEMPLATE | CLOB | The template to generate job parameters |
The JEDI_Datasets table
Description: Table to describe datasets and their relations with tasks. The same dataset name may appear in multiple rows since it may be used as input or output by multiple tasks.
| Name | Type | Comment |
| DATASETID | NOT NULL NUMBER(11) | Auto-incremented ID of the dataset generated from a SEQ |
| DATASETNAME | NOT NULL VARCHAR2(255) | The dataset name |
| CONTAINERNAME | VARCHAR2(132) | The name of the container to which the dataset belongs. Set NULL when the dataset is not used as part of a container |
| JEDITASKID | NOT NULL NUMBER(11) | The task identifier coming from the task table |
| MASTERID | NUMBER(11) | Set to the DATASETID of the master dataset if this dataset is used as a secondary dataset of the master. Otherwise, set NULL |
| PROVENANCEID | NUMBER(11) | The DATASETID of the input dataset to which the output dataset is associated. Set NULL for input datasets |
| STATE | VARCHAR2(20) | The state of the dataset when it was last checked with DDM |
| STATECHECKTIME | DATE | Set when the dataset state is checked with DDM |
| STATECHECKEXPIRATION | DATE | The date when the dataset state check is over. Set NULL if the state check is no longer required |
| NFILES | NUMBER(10) | The total number of files in the dataset |
| NFILESTOBEUSED | NUMBER(10) | The number of files to be used |
| NFILESUSED | NUMBER(10) | The number of files used so far |
| NFILESFINISHED | NUMBER(10) | The number of files successfully finished so far |
| NFILESFAILED | NUMBER(10) | The number of files failed so far |
| NFILESONHOLD | NUMBER(10) | The number of files on hold |
| NEVENTS | NUMBER(11) | The total number of events in the dataset. Set NULL unless this info is required |
| NEVENTSTOBEUSED | NUMBER(11) | The number of events to be used. Set NULL unless this info is required |
| NEVENTSUSED | NUMBER(11) | The number of events used so far. Set NULL unless this info is required |
| STATUS | VARCHAR2(20) | Shows how the dataset is currently being used in PanDA |
| CREATIONTIME | NOT NULL DATE | Set when the record is inserted for the first time |
| MODIFICATIONTIME | NOT NULL DATE | Set when the record is updated |
| FROZENTIME | DATE | Set when the record is frozen |
| TYPE | NOT NULL VARCHAR2(20) | The type of the dataset |
| VO | VARCHAR2(16) | The name of the virtual organization which owns the dataset |
| CLOUD | VARCHAR2(10) | The replica in this cloud is used. Set manually or by the task brokerage |
| SITE | VARCHAR2(60) | The replica at this site is used. Set NULL to use the normal brokerage |
| LOCKEDBY | VARCHAR2(40) | The name of the process/thread which is taking care of the record |
| LOCKEDTIME | DATE | Set when the record is locked |
| ATTRIBUTES | VARCHAR2(100) | Describes how the dataset is split, e.g. the ratio to the number of master files, no-split, repeat, etc. |
| STREAMNAME | VARCHAR2(20) | The name of the stream for input files, used as a placeholder in jobParamsTemplate. For output, STREAMNAME in JEDI_Output_Template is used |
| STORAGETOKEN | VARCHAR2(100) | The token in the storage element where input files are available or output files are put while the task is running |
| DESTINATION | VARCHAR2(60) | The final destination to which the dataset is transferred once the task is finished |
| TEMPLATEID | NUMBER(11) | The DATASETID of the template dataset from which the dataset inherits |
| NFILESWAITING | NUMBER(10) | The number of files waiting for real co-jumbo jobs |
Typical usages
- Get the datasets for a task
  SELECT DATASETNAME, TYPE FROM JEDI_Datasets WHERE JEDITASKID = ???;
- Get the datasets to be processed in a task
  SELECT DATASETNAME FROM JEDI_Datasets WHERE JEDITASKID = ??? AND STATUS = ??? AND NFILESTOBEUSED > NFILESUSED AND TYPE = ???;
- Get the task which uses or produces a dataset
  SELECT JEDITASKID FROM JEDI_Datasets WHERE DATASETNAME = ??? AND TYPE = ???;
- Get the tasks which contribute to a container
  SELECT JEDITASKID FROM JEDI_Datasets WHERE CONTAINERNAME = ??? AND TYPE = ???;
- Get the datasets whose dataset state needs to be updated
  SELECT DATASETNAME FROM JEDI_Datasets WHERE STATECHECKTIME < ??? AND STATECHECKEXPIRATION IS NOT NULL;
- Update a dataset
  UPDATE JEDI_Datasets SET ??? WHERE DATASETID = ???;
- Update the datasets in a task
  UPDATE JEDI_Datasets SET ??? WHERE JEDITASKID = ???;
- Find hung-up processes
  SELECT ??? FROM JEDI_Datasets WHERE LOCKEDBY IS NOT NULL AND LOCKEDTIME < ???;
The JEDI_Dataset_Contents table
Description: Table to describe files and how they are used by tasks. The same LFN may appear in multiple rows since it may be used by multiple tasks. Basically only the LFN is mandatory, but some metadata of each file, such as file size and checksum, are stored in the table to avoid frequent lookups to DDM.
| Name | Type | Comment |
| FILEID | NOT NULL NUMBER(11) | Auto-incremented ID of the file generated from a SEQ |
| DATASETID | NOT NULL NUMBER(11) | The dataset identifier coming from the dataset table |
| JEDITASKID | NOT NULL NUMBER(11) | The task identifier coming from the task table |
| CREATIONDATE | NOT NULL DATE | Set when the record is inserted for the first time |
| LASTATTEMPTTIME | DATE | Set when the file is tried |
| LFN | NOT NULL VARCHAR2(256) | The logical filename |
| GUID | VARCHAR2(64) | The GUID of the file |
| TYPE | NOT NULL VARCHAR2(20) | The type of the file |
| FSIZE | NUMBER(11) | The size of the file |
| CHECKSUM | VARCHAR2(36) | The checksum of the file |
| SCOPE | VARCHAR2(30) | The scope of the file |
| STATUS | NOT NULL VARCHAR2(64) | The status of the file |
| ATTEMPTNR | NUMBER(3) | How many times the file has been tried so far |
| MAXATTEMPT | NUMBER(3) | How many times the file can be tried at most |
| NEVENTS | NUMBER(10) | The number of events in the file |
| KEEPTRACK | NUMBER(1) | Set to 1 when keeping track of the file usage |
| STARTEVENT | NUMBER(10) | The starting serial number used in the file |
| ENDEVENT | NUMBER(10) | The ending serial number used in the file |
| FIRSTEVENT | NUMBER(10) | The event number assigned to the first processed event |
| BOUNDARYID | NUMBER(11) | Input splitting respects this identifier if not NULL, e.g. used to specify lumi block boundaries |
| PANDAID | NUMBER(11) | PandaID of the job which uses the file |
| FAILEDATTEMPT | NUMBER(3) | How many times the file has failed so far |
| LUMIBLOCKNR | NUMBER(10) | Lumi block number in the file |
| OUTPANDAID | NUMBER(11) | PandaID of the job which produced the file |
| MAXFAILURE | NUMBER(3) | How many times the file can fail at most |
| IS_WAITING | CHAR(1) | Set to Y if waiting for real co-jumbo jobs |
| JOBSETID | NUMBER(11) | jobSetID of the job which uses the file |
| PROC_STATUS | VARCHAR2(64) | Processing status of the file |
Typical usages
- Get the task and dataset which produced a file
  SELECT JEDITASKID, DATASETID FROM JEDI_Dataset_Contents WHERE LFN = ??? AND SCOPE = ??? AND TYPE = ???;
- Get the list of files in a task
  SELECT FILEID FROM JEDI_Dataset_Contents WHERE JEDITASKID = ??? AND TYPE = ???;
- Get the list of files in a particular state and type for a task and a dataset
  SELECT FILEID FROM JEDI_Dataset_Contents WHERE JEDITASKID = ??? AND DATASETID = ??? AND TYPE = ??? AND STATUS = ???;
- Get the list of files to be processed
  SELECT FILEID FROM JEDI_Dataset_Contents WHERE JEDITASKID = ??? AND DATASETID = ??? AND TYPE = ??? AND STATUS = ??? AND ATTEMPTNR < MAXATTEMPT;
- Update a file
  UPDATE JEDI_Dataset_Contents SET ??? WHERE FILEID = ???;
- Update the files in a dataset
  UPDATE JEDI_Dataset_Contents SET ??? WHERE DATASETID = ???;
The JEDI_Output_Template table
Description: Table to define how output filenames are generated for each output dataset. One production dataset has a single row while one analysis dataset may have multiple rows.
| Name | Type | Comment |
| OUTTEMPID | NOT NULL NUMBER(11) | Auto-incremented ID of the template generated from a SEQ |
| JEDITASKID | NOT NULL NUMBER(11) | The task identifier coming from the task table |
| DATASETID | NOT NULL NUMBER(11) | The dataset identifier coming from the dataset table |
| FILENAMETEMPLATE | NOT NULL VARCHAR2(256) | The template to generate output filenames |
| SOURCENAME | VARCHAR2(256) | The source filename to be renamed, if any. Set NULL if renaming is unnecessary |
| SERIALNR | NUMBER(7) | The serial number to generate files using the template. When a new file is produced it gets SERIALNR+1 and this field is incremented |
| MAXSERIALNR | NUMBER(7) | The maximum serial number which can be used to generate files using the template |
| STREAMNAME | VARCHAR2(20) | The name of the stream which is used as a placeholder in jobParamsTemplate |
| OUTTYPE | VARCHAR2(20) | The type of the file |
Possible example
- FILENAMETEMPLATE = EVNT.01206750._${SN}.pool.root, SOURCENAME = event.pool.root
- An output file, event.pool.root, is renamed to EVNT.01206750._XYZ.pool.root
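A minimal sketch of how such a template could be expanded; the zero-padding width and the helper name are illustrative assumptions, not the actual JEDI formatting:

  # Sketch of filename generation from FILENAMETEMPLATE and SERIALNR (illustrative only).
  import string

  def next_output_name(template, serial_nr):
      # e.g. "EVNT.01206750._${SN}.pool.root" with serial_nr=2 -> "EVNT.01206750._000003.pool.root"
      return string.Template(template).substitute(SN="%06d" % (serial_nr + 1))

  print(next_output_name("EVNT.01206750._${SN}.pool.root", 2))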
The JEDI_Work_Queue table
Description: Table to define partitions and shares of workload.
| Name | Type | Comment |
| QUEUE_ID | NOT NULL NUMBER(5) | Unique identifier |
| QUEUE_NAME | NOT NULL VARCHAR2(16) | The name of the queue |
| QUEUE_TYPE | NOT NULL VARCHAR2(16) | The type of the queue, mapped to prodSourceLabel in JEDI_Tasks and the jobs* tables |
| VO | NOT NULL VARCHAR2(16) | The name of the virtual organization for which the queue is used |
| QUEUE_SHARE | NUMBER(3) | The share for the queue. Set NULL if the queue is not throttled |
| QUEUE_ORDER | UNIQUE NUMBER(3) | The queue is evaluated before other queues with higher ORDER values. If a task or a job passes the selection criteria, it is mapped to the queue and no more queues are evaluated |
| CRITERIA | VARCHAR2(256) | Selection criteria written in SQL to map tasks/jobs to the queue. It is applied to JEDI_Tasks |
| VARIABLES | VARCHAR2(256) | Valid values to replace bind variables in CRITERIA |
| PARTITIONID | NUMBER(5) | The QUEUE_ID of the partition to which the queue belongs. Set NULL if the queue doesn't belong to any partition. Set -1 if the record stands for a partition |
| STRETCHABLE | NUMBER(1) | Set to 1 if the queue can use shares of other queues when those queues don't have tasks/jobs |
| STATUS | VARCHAR2(64) | The queue status |
Possible examples
- QUEUE_NAME = evgensimul, QUEUE_TYPE = production, ..., CRITERIA = processingType IN (:p1,:p2), VARIABLES = p1:evgen,p2:simul
- QUEUE_NAME = group, QUEUE_TYPE = production, ..., CRITERIA = workingGroup LIKE :p1, VARIABLES = p1:GP_%
The JEDI_Events table
Description: Table for job progress tracking. This table may also be used for the Event Server.
| Name | Type | Comment |
| JEDITASKID | NOT NULL NUMBER(11) | JediTaskID of the file which contains the events |
| DATASETID | NOT NULL NUMBER(11) | DatasetID of the file which contains the events |
| FILEID | NOT NULL NUMBER(11) | FileID of the file which contains the events |
| STATUS | NOT NULL NUMBER(2) | The status of the event range. Explained in this section |
| PANDAID | NOT NULL NUMBER(11) | PandaID of the job in which the events are processed |
| JOB_PROCESSID | NOT NULL NUMBER(10) | Identifier of the slave process |
| DEF_MIN_EVENTID | NUMBER(10) | The minimum event number assigned to the slave process |
| DEF_MAX_EVENTID | NUMBER(10) | The maximum event number assigned to the slave process |
| PROCESSED_UPTO_EVENTID | NUMBER(10) | The event number up to which the slave process has completed so far |
| ATTEMPTNR | NUMBER(3) | How many times the events have been retried so far |
| OBJSTORE_ID | NUMBER(3) | Identifier of the objectstore endpoint where the pre-merged file is stored |
| EVENT_OFFSET | NUMBER(11) | The total offset of positional event numbers |
| IS_JUMBO | NUMBER(1) | Set to 1 if the events are processed by a jumbo job |
| ZIPROW_ID | NUMBER(11) | ROW_ID of the zip file in the file table. NULL if not zipped |
| FILE_NOT_DELETED | CHAR(1) | Set to Y if the corresponding pre-merged files can be deleted |
| ERROR_CODE | NUMBER(5) | Error code for the event range |
| PATH_CONVENTION | NUMBER(2) | Convention ID of the file path |
The JEDI_Job_Retry_History table
Description: Table to keep track of the history of job reattempts.
| Name | Type | Comment |
| JEDITASKID | NOT NULL NUMBER(11) | JediTaskID of the jobs |
| OLDPANDAID | NOT NULL NUMBER(11) | PandaID of the old job |
| NEWPANDAID | NOT NULL NUMBER(11) | PandaID of the new job |
| RELATIONTYPE | VARCHAR2(16) | The type of relation between the old and new jobs |
| ORIGINPANDAID | NUMBER(11) | PandaID of the origin job |
The JEDI_Process_Lock table
Description: Table to contain locks for JEDI processes.
| Name | Type | Comment |
| PRODSOURCELABEL | NOT NULL VARCHAR2(20) | The source label, such as managed, user, prod_test, ... |
| VO | NOT NULL VARCHAR2(16) | The name of the virtual organization |
| WORKQUEUE_ID | NOT NULL NUMBER(5) | The work queue identifier |
| CLOUD | VARCHAR2(10) | The cloud name |
| LOCKEDBY | NOT NULL VARCHAR2(40) | The name of the process/thread which uses the lock |
| LOCKEDTIME | NOT NULL DATE | Set when the lock is created |
Functions of JEDI
This section describes functions of JEDI for production and analysis, and how JEDI would use the database and other info.
Production/Common
This section describes functions for production. Some of them could be used for analysis as well.
Workflow for tasks with input datasets using file-level splitting
Here is a simple workflow for a task with input datasets using file-level splitting.
Input datasets are split into multiple chunks of files, and one job is generated per chunk.
- A task is defined
- Input and output dataset information is inserted into the Dataset table (by DEFT if a shared table is used, or by JEDI if JEDI has its own table)
- Information about the files in the input datasets is retrieved from DDM and inserted into the Contents table. Basically only LFNs are mandatory, but some metadata of each file, such as size, the number of events, and checksum, can be stored in the table to avoid frequent lookups to DDM
- JEDI takes a few files from the Contents table, runs the brokerage to choose a site, and generates jobs according to the site parameters, such as the number of cores per slot, memory size, and walltime limit
- Jobs and files are inserted into the Job and File tables
- Jobs go through the present workflow (i.e., assigned→activated→...→transferring)
- When a job finishes/fails, the files used by the job are flagged in the Contents table
- JEDI repeats step 4
- When all files in an input dataset are processed, the dataset status is changed to done
- When all input datasets are processed, the task status is changed to done
Output dataset registration
JEDI or DEFT registers output datasets when tasks are activated. Note that output datasets for analysis are registered by JEDI as shown in
this section.
Job rebrokerage
Job rebrokerage is done periodically by killing old jobs that have been assigned to a site but
have not obtained CPUs at the site for a certain period; this is essentially a reattempt. For example, analysis jobs are reattempted every 12h if they don't get started within 12h.
If jobs are killed by the system due to timeout etc., they go to the closed state,
while they go to the cancelled state if they are killed manually.
Generally, closed or cancelled jobs don't have log files since they didn't run on WNs.
The relation between old and new jobs is recorded in JEDI_Job_Retry_History.
Each row of the Contents table has the attemptNr column to record how many times the file was attempted and
the maxAttempt column to limit the number of attempts. Every time a job is reattempted, attemptNr is incremented, and JEDI gives up when attemptNr=maxAttempt.
maxAttempt is 3 by default and can be changed by the task parameter maxAttempt.
Naming of output file
With JEDI, naming of output files can be tricky. As shown in the above figure, initially JEDI would define Job1 and Job2
with File1-2 and File3-4, respectively. Job2 would successfully produce OutFile_00002, while
Job1 would fail and JEDI would decide, for some reason, to split it into Job1' and Job1''.
Finally, they would successfully produce OutFile1' and OutFile1''.
The issue is which filenames are used for OutFile1' and OutFile1''. There are several possibilities:
- A: Job1' uses Output_00001 with an incremented attempt number and Job1'' uses a new name with an incremented serial number, i.e., Output_00001.1 and Output_00003
- B: Both Job1' and Job1'' use new filenames, i.e., Output_00003 and Output_00004
- C: All jobs use temporary output filenames first, and real filenames are assigned in such a way that output files with smaller serial numbers have input files with smaller serial numbers, i.e., OutFile_00001'→RealOutFile_00001, OutFile_00001''→RealOutFile_00002, OutFile_00002→RealOutFile_00003. Then the files are renamed in the SE
- D: Real filenames are assigned as in option C, but without renaming in the SE, i.e., PFN and LFN are different
The downside of options C and D is that real filenames cannot be defined until the task is finished.
The simplest solution would be option B, but the sequential order of input and output files would be
inconsistent and serial numbers in a dataset would not be continuous.
Option A has the same problem with inconsistent sequential order between input and output files, and serial numbers have to jump when old jobs are combined into one new job.
Option B has been adopted after discussion at an ADC meeting.
Internal Merging
JEDI has the capability to merge output files for each task. If the task is configured with mergeOutput=True,
jobs are generated to produce intermediate files and go to the merging state once they are done on WNs.
When all jobs assigned to a site are done, merge jobs are generated at the site to merge those intermediate files.
Once the merge jobs are done and the merged files are transferred to the final destination, jobs in the
merging state go to finished.
Open dataset handling
Some tasks may be submitted before their input datasets are frozen.
The issue here is that new files may be appended to an open dataset
after the task is submitted. If the user wants to execute the task
on all files including the newly appended ones, JEDI keeps the
dataset status at 'running', periodically checks with DDM to find
new files, adds the new files to the Contents table, and changes the dataset status
to 'done' once the dataset is frozen and all files are processed in
PanDA. This implies that the Dataset table requires two status
attributes: one for the dataset status in DDM and the other for how
the dataset is being used in PanDA.
Lost file recovery
When some files need to be re-produced, a new task is submitted.
The task parameters specify the names of the lost files in addition to
the normal task parameters. Then,
- the taskID is retrieved from the Dataset table using the output dataset name
- the jobs which produced the lost files are retrieved from the Job and File tables using the taskID and the names of the lost files
- the input files of those jobs are inserted into the Contents table
- the task runs on those input files
New files are added to the existing output dataset or a new output dataset,
according to the task definition.
If the input files for the lost files are lost as well (e.g., the dataset has already been deleted),
another task which re-generates those input files needs to be submitted (or re-generated internally by JEDI) and the tasks are chained.
Protection against lost/broken input files
When JEDI finds that an input file is lost (i.e., the file is not found
in DDM although it was there when the task was submitted) or is problematic,
the file status in the Contents table is set to 'lost' or 'broken', and
JEDI generates new jobs skipping the lost/broken files, or updates the dataset status
if no files remain.
Automatic priority boosting for quick completion of tasks
Tasks that are 95% complete and have fewer than 100 jobs to do will get
- priority boost to 900
- gshare set to Express
This functionality may move to a TaskWatcher component in the future.
Event-level splitting
For event-level job splitting, the
Event table is used.
The following figure shows an example of workflow with event-level splitting.
The actions of JEDI are as follows:
- A few files are taken from the Contents table
- They are split by N events (N=100 in the figure)
- One job is generated per N events
- One record is inserted into the Event table for each combination of (file, event range), e.g., (File1,0-99), (File1,100-149), (File2,0-49), (File2,50-149), (File3,0-99)
- The status of each event record is set when the corresponding job finishes/fails
- If all events in a file are done, the file status in the Contents table is set to 'done'
- Step 1 is repeated. The number of events per job is changed if necessary
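A simplified sketch of the splitting step above (hypothetical input; event ranges are 0-based and inclusive, as in the example):

  # Sketch of event-level splitting: one job per chunk of N events (illustrative only).
  def split_by_events(files, n_events_per_job):
      """files: list of (lfn, nEvents); yields lists of (lfn, firstEvent, lastEvent)."""
      job, room = [], n_events_per_job
      for lfn, n_events in files:
          offset = 0
          while offset < n_events:
              take = min(room, n_events - offset)
              job.append((lfn, offset, offset + take - 1))
              offset += take
              room -= take
              if room == 0:           # the chunk is full -> one job
                  yield job
                  job, room = [], n_events_per_job
      if job:
          yield job

  for chunk in split_by_events([("File1", 150), ("File2", 150), ("File3", 100)], 100):
      print(chunk)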
Event server (event engine)
The detail is described in
this blog article
and the
EventServer twiki.
Task aborting and forced termination
Once the task status is set to TOBEABORTED or TOBEFINISHEDASAP, JEDI sends the kill command to queued or running jobs,
deletes or freezes output datasets, and updates the task status.
Task brokerage
The task brokerage assigns tasks to nuclei per work queue
(evgensimul, reprocessing, test, mcore, group, others, ...).
One nucleus is chosen for each task based on data locality, the total RW and available storage size in the nucleus, the nucleus status, and the output file transfer queue to the nucleus.
RW is "remaining work" and is calculated for each task using
- nEvents … The total number of events in the task
- nEventsUsed … The number of events already used
- cpuTime … Described in this section
Here is the list of selection criteria; nuclei that fail any of the criteria are skipped (a sketch of the free-space check follows the list):
- The nucleus status must be 'ACTIVE' (all nuclei are ACTIVE for now; a nucleus could be set to another status to be skipped by the task brokerage in the future)
- The free space associated with the output endpoint type (e.g. DATATAPE→XYZ_DATATAPE), i.e. space_free+space_expired from DDM minus (totalRW in the nucleus)/(corePower=10)/24/3600×0.25GB, must be larger than 5TB
- If the total input size is larger than 1GB, totalInputSize×10% must be available in the nucleus
- If the total number of input files is larger than 100, totalNumInputs×10% must be available in the nucleus
- The nucleus must be able to execute the workload
- The queue of output files transferring to the nucleus must be below 2000 files
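As an illustration, the free-space criterion above could look roughly as follows; this is a sketch with made-up variable names and sample numbers, not the actual brokerage code:

  # Rough sketch of the free-space criterion for a nucleus (illustrative only).
  def enough_free_space(space_free_gb, space_expired_gb, total_rw_in_nucleus):
      # expected output volume: 0.25 GB per 24h of remaining work at corePower=10
      expected_output_gb = total_rw_in_nucleus / 10.0 / 24 / 3600 * 0.25
      free_gb = space_free_gb + space_expired_gb - expected_output_gb
      return free_gb > 5 * 1024  # must be larger than 5 TB

  print(enough_free_space(space_free_gb=8000, space_expired_gb=500, total_rw_in_nucleus=1e9))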
If multiple nuclei meet the selection criteria, one nucleus is chosen based on a weight using:
- totalRW … The total sum of RW over tasks in the nucleus whose priorities are equal to or larger than that of the task being assigned. If totalRW is less than 50, totalRW=1
- availableInputSize … The input size available in the nucleus for the task being assigned
- totalInputSize … The total input size of the task
- tapeWeight … 0.001 if the input is available only on TAPE in the nucleus, otherwise 1
Job brokerage explained
See
https://panda-wms.readthedocs.io/en/latest/advanced/brokerage.html#atlas-production-job-brokerage
Periodical task execution
Currently each HC job is submitted independently, which is very inefficient.
In the new system, one task would be defined per cloud or site to generate jobs periodically.
A special plugin to generate jobs would be required, as well as a special file status in the Contents table.
Fairshare
It has been proposed to have a fairshare mechanism
in one or more upstream components, while it is currently done
in the job dispatcher.
One possibility is that JEDI could generate jobs based on a fairshare policy.
However, the fairshare mechanism in the job dispatcher is still
required, since fairshare is defined in terms of the number of running
jobs, and generated jobs may not start running immediately if
they require input file transfers.
Although job generation will take fairshare into account,
which will help global fairshare,
the implementation of fairshare management would not change significantly.
Different treatment of input datasets
In the task request, one-time input and many-time input are distinguished.
One-time input is treated as today, i.e., a subset is sent to proddisk with a short lifetime and deleted after usage.
Many-time input is treated as long-standing at T2s, i.e., the whole(?) dataset is copied to datadisk without a lifetime but as 'secondary', or with a longer lifetime as 'input'.
This needs some more consideration: the definition, the size, the workflow with Rucio, etc.
It sounds similar to PD2P with JEDI, but is somewhat different, especially in how to identify the datasets used by many tasks.
Tasks with incomplete log datasets
When jobs fail at a T2, log files are immediately added to tid datasets before they are transferred to the T1.
This is required to immediately trigger a reattempt, but it causes incomplete log datasets when transfers
of log files fail. JEDI would clean up this kind of incomplete dataset before generating jobs for subsequent
tasks. Alternatively, a new job status could be introduced to PanDA to transfer log files for failed jobs.
Validation in post-processing phase
For example, availability of output files in tid datasets may be checked. Also, consistency between contents in tid datasets and Dataset_Contents may be checked.
Random seed assignment
Each task can specify the initial value for the random seed, which is then incremented per job. If a job used a random seed value and failed, the value is reused by the next job regardless of whether the old job is combined or split. For example,
- A job failed with 2 input files and random seed N at site X
- JEDI retries the 2 input files
- Site Y is free but each job can take only 1 input file due to some site restriction
- Two jobs are generated at site Y
- The first job reuses the random seed N and the other uses a new random seed
If reproducibility is important, i.e., the retried job must use the random seed of the failed job, the task needs to use SEQNUMBER instead of RNDMSEED.
T1 weight
Each task can define t1Weight in its task parameters to send more jobs to the nucleus. If t1Weight=0 or unspecified and task.ioIntensity≥500, cloudconfig.weight is used.
If t1Weight=-1, all jobs go to the nucleus. If task.currentPriority≥900, t1Weight=max(t1Weight,100).
Timeout values for production jobs
See
https://panda-wms.readthedocs.io/en/latest/advanced/brokerage.html#timeout-rules
Direct replica lookup for assigned jobs
Jobs usually go from the assigned to the activated state when a callback is received from Rucio indicating that a dis dataset is complete. In case the callback is lost, or some files take much longer than others, there is a periodic (hourly) direct lookup of the files for each assigned job with priority>800.
Final status of Event Service jobs
ES jobs go to status=merging if they successfully processed one or more events. They then go to status=finished or failed (jobsubstatus=es_merge_failed) once the corresponding ES-merge jobs finish or fail.
ES jobs have status=failed when
- the number of attempts for some event ranges reached the limit, or
- the pilot reported errors (jobsubstatus=pilot_failed), or
- the heartbeat was lost without successful event ranges (jobsubstatus=es_heartbeat)
ES jobs have status=merging when they have one or more successful event ranges and
- they were retried to run unprocessed event ranges (jobsubstatus=es_retry), or
- all event ranges were processed and ES merge jobs were generated (jobsubstatus=es_merge), or
- all event ranges assigned to the jobsets were done but they didn't generate ES merge jobs since other jobs were still running (jobsubstatus=es_wait), or
- they didn't process any event ranges but were retried since the number of attempts didn't reach the limit (jobsubstatus=es_inaction)
ES jobs have status=closed when
- they were killed since an associated ES job generated an ES merge job (jobsubstatus=es_killed), or
- they were killed before being dispatched to a WN (jobsubstatus=es_unused), or
- they were killed since associated ES jobs failed (jobsubstatus=es_aborted)
ES jobs have status=cancelled when
- they were killed manually using the kill command
Jumbo jobs and co-jumbo jobs
If a task is configured to enable Event Service and sets the nJumboJobs parameter to an integer, the task can generate jumbo jobs.
Each jumbo job can process any events in the task across all jobsets. Note that a jumbo job doesn't have to process all events in the task:
events are delivered on demand to CPUs while the job is being processed, i.e., events (in other words, workload) are not pre-assigned
to resources. From a single task, one jumbo job is generated for each PQ which has useJumbo in schedconfig.catchall.
Co-jumbo jobs are very similar to conventional Event Service jobs. Each co-jumbo job can process only the events in one jobset
and runs on PQs which enable Event Service with schedconfig.jobseed, following the standard brokerage procedures for Event Service.
Jumbo jobs and co-jumbo jobs share events in the same task, i.e., they are essentially consumers of different sizes.
Jumbo jobs can be generated when a PQ sets useJumbo and lacks jumbo jobs, even if other PQs are full and cannot accept more co-jumbo jobs.
In this case, fake co-jumbo jobs are generated with computingSite=WAITING_CO_JUMBO and stay in the waiting state until real co-jumbo jobs are
generated or all events in the corresponding jobsets are processed. When jumbo jobs process events, those events go to the finished state. The events
are changed to done when co-jumbo jobs are finished, i.e., jumbo jobs don't change event status to done when they finish. An hourly cron checks
co-jumbo jobs in the waiting/assigned/activated state and finishes them if all events in the corresponding jobsets
are done. Co-jumbo jobs then generate merge jobs in the same way as conventional Event Service jobs. When the task is close to 100% done, it unsets
nJumboJobs so that jumbo jobs are not generated for a small number of events. This machinery is disabled if the task sets the site parameter.
Jumbo job switcher
JumboWatchDog is a sub-component of the WatchDog agent which runs every 10 min to enable or disable jumbo jobs in Event Service tasks.
Its behavior can be customized using the following parameters with component=jumbo_dog in the
GDP config I/F:
| Name | Default Value | Description |
| JUMBO_MAX_TASKS | 1 | The total number of active tasks with jumbo jobs |
| JUMBO_MIN_EVENTS_DISABLE | 100000 | The minimum number of remaining events for each task to generate jumbo jobs |
| JUMBO_MIN_EVENTS_ENABLE | JUMBO_MIN_EVENTS_DISABLE × 10 | The minimum number of remaining events in each task which can enable jumbo jobs |
| JUMBO_MAX_EVENTS | JUMBO_MAX_TASKS × JUMBO_MIN_EVENTS_ENABLE / 2 | The upper limit on the total number of remaining events in active tasks with jumbo jobs |
| JUMBO_PER_TASK | 1 | The number of jumbo jobs per task |
| JUMBO_PER_SITE | 1 | The number of jumbo jobs per site for each task |
| JUMBO_MAX_CURR_PRIO | 500 | The upper limit on current priorities of tasks which can enable jumbo jobs |
JumboWatchDog tries to find Event Service tasks which have not enabled jumbo jobs yet, and which have
- more remaining events than JUMBO_MIN_EVENTS_ENABLE,
- lower priorities than JUMBO_MAX_CURR_PRIO,
- a transUses release which is available at one or more jumbo-job-enabled PQs,
- and taskParam.inFilePosEvtNum=True, nFilesPerJob=1, or nEventsPerJob<nEventsPerInputFile, which is required since jumbo jobs work with in-file positional event numbers,
when the number of active tasks with jumbo jobs is less than JUMBO_MAX_TASKS and the total number of remaining events in active tasks with jumbo jobs is less than JUMBO_MAX_EVENTS.
JumboWatchDog enables jumbo jobs in the task with the largest number of remaining events among the tasks which meet the above criteria, and then repeats the procedure until the system has enough tasks and events.
At the same time, JumboWatchDog disables jumbo jobs in tasks which have fewer remaining events than JUMBO_MIN_EVENTS_DISABLE.
Memory probe and brokerage
- A MemoryMonitor running in the pilot records the maxrss and maxpss of each job, summed over all processes.
If ramUnit='MBPerCore', the task ramCount is filled as follows:
- Once the first 5 jobs are finished, the task ramCount is set to (max(maxPSS)-baseRamCount)/coreCount, i.e. the RSS per core, allowing for some baseRamCount independent of coreCount.
- The brokerage calculates the minRamCount, customized to the site coreCount, as baseRamCount+(ramCount*coreCount).
- This is compared to the AGIS-set values of maxrss and minrss (a sketch follows the table below).
Sites that kill on vmem should stop doing that if they can. Otherwise they need to set maxrss sufficiently small to attract jobs such that whatever vmem is measured by the batch system stays below the threshold.
The baseRamCount could perhaps be deduced from the job pss and rss values, and thus set by JEDI. (insert maths) It is not important for single-core tasks, as it cancels out, so we leave it as zero. Currently it is fixed per processingType, but may need tuning:
| processingType | baseRamCount (MB) |
| simul | 1000 |
| pile | 1500 |
| reco | 1000 |
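A sketch of the scout-based ramCount setting and the brokerage comparison described above; the function names, sample numbers, and the exact form of the min/max check are illustrative assumptions, not the actual code:

  # Sketch of the scout-based ramCount and the brokerage check (illustrative only).
  def task_ramcount(maxpss_of_scouts_mb, baseramcount_mb, corecount):
      # memory per core, allowing for a shared baseRamCount independent of coreCount
      return (max(maxpss_of_scouts_mb) - baseramcount_mb) / corecount

  def site_accepts(ramcount_mb, baseramcount_mb, site_corecount, site_maxrss_mb, site_minrss_mb=0):
      # brokerage computes minRamCount customized to the site coreCount
      minramcount = baseramcount_mb + ramcount_mb * site_corecount
      return site_minrss_mb <= minramcount <= site_maxrss_mb

  rc = task_ramcount([3800, 4000, 4200, 3900, 4100], baseramcount_mb=1000, corecount=8)
  print(rc, site_accepts(rc, 1000, 8, site_maxrss_mb=16000))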
Memory increase in job retry
When a job fails because it ran out of memory (e.g. due to memory leaks), the ramCount setting for successive jobs is increased. The new value is set to the next value in the list
[1000, 2000, 3000, 4000, 6000, 8000] above max(maxPSS, current ramCount).
When 30% of the jobs in a task have failed because of insufficient memory, the setting is increased for the whole task.
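A minimal sketch of picking the new ramCount from the quoted list:

  # Sketch of the ramCount increase on memory-related retries (step values as quoted above).
  RAM_STEPS = [1000, 2000, 3000, 4000, 6000, 8000]  # MB

  def next_ramcount(max_pss_mb, current_ramcount_mb):
      floor = max(max_pss_mb, current_ramcount_mb)
      for step in RAM_STEPS:
          if step > floor:
              return step
      return RAM_STEPS[-1]  # already at the top of the list

  print(next_ramcount(max_pss_mb=2300, current_ramcount_mb=2000))  # 3000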
IO intensity
The task parameter ioIntensity is set from the first 5 finished jobs. It is the total input plus output size divided by the walltime and the coreCount, so the unit is kB/s per core; the per-core normalization makes this scalable when running the task on different numbers of cores. Perhaps it should be per HS06, but that is not so critical.
This is a measure of the LAN traffic from stage-in and stage-out, which can be used to choose lower-I/O jobs, e.g. evgensimul. The disk I/O on the node is also interesting, but for another day.
The CPU consumption of the job at the 95th percentile of (endTime-startTime)*corePower among the first 5 finished jobs, together with the AGIS site HS06 corePower and a safety factor, is used to set HS06sPerEvent:
cpuTime = max(0, endTime - startTime - baseWalltime) * corePower * coreCount * cpuEfficiency / nEvents * 1.5
where cpuEfficiency defaults to 90% and baseWalltime represents the part of the job that does not scale with CPU power (default 600s).
This is used to get the expected walltime on potential sites during brokerage:
wall = cpuTime / corePower / coreCount / cpuEfficiency * nEvents + baseWalltime
cpuEfficiency can be set to 0 to disable scaling with nEvents. In this case
wall = baseWalltime
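The two formulas translate directly into a short sketch (illustrative variable names and sample numbers; cpuEfficiency is given as a fraction):

  # Sketch of the scout-based cpuTime and the expected walltime used in brokerage (illustrative only).
  def cputime_hs06s_per_event(end_time, start_time, base_walltime, core_power,
                              core_count, n_events, cpu_efficiency=0.9, safety=1.5):
      return max(0, end_time - start_time - base_walltime) * core_power * core_count \
          * cpu_efficiency / n_events * safety

  def expected_walltime(cputime, core_power, core_count, n_events,
                        cpu_efficiency=0.9, base_walltime=600):
      if cpu_efficiency == 0:  # scaling with nEvents disabled
          return base_walltime
      return cputime / core_power / core_count / cpu_efficiency * n_events + base_walltime

  cpu = cputime_hs06s_per_event(20000, 0, 600, core_power=10, core_count=8, n_events=1000)
  print(cpu, expected_walltime(cpu, core_power=12, core_count=8, n_events=1000))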
Express input/output transfers
processingType=urgent or priority>=1000 makes the FTS transfers of inputs and outputs use the express activity.
processingType=urgent also ignores fairshare.
nGBPerJob
This setting in the request interface is not the input GB per job. The disk usage is estimated as inputSize + max(outDiskSize*inputSize/1MB, 2.5GB) + workDirSize. JEDI tries to generate jobs so that the estimate for each job is less than nGBPerJob.
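A sketch of that disk-usage estimate; the sizes are in GB and the names and sample values are illustrative:

  # Sketch of the per-job disk-usage estimate used with nGBPerJob (illustrative only).
  def estimated_disk_gb(input_gb, outdisk_gb_per_mb_input, workdir_gb):
      # output scales with the input size (outDiskSize is per 1MB of input), with a 2.5 GB floor
      out_gb = max(outdisk_gb_per_mb_input * input_gb * 1024, 2.5)
      return input_gb + out_gb + workdir_gb

  def fits(input_gb, outdisk_gb_per_mb_input, workdir_gb, n_gb_per_job):
      return estimated_disk_gb(input_gb, outdisk_gb_per_mb_input, workdir_gb) <= n_gb_per_job

  print(fits(input_gb=4.0, outdisk_gb_per_mb_input=0.0005, workdir_gb=1.0, n_gb_per_job=10))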
tgtMaxOutputForNG
The target size for the largest output format. nGBPerJob is reset after the scouts in order to approach this value. It is often set to 10GB to get large outputs while avoiding the hard limit in Athena. It works together with maxWalltime (hrs) so that jobs do not get too long.
Task retry
Tasks can be retried if they are in finished or exhausted state. Child tasks are automatically retried when the parent task is retried unless they are in aborted, broken, or failed state.
Job throttling
The job throttler helps control job generation and the number of jobs that can sit in the queue. The job throttler decides whether there are enough jobs (SKIP) or more jobs are needed (PASS). In the case of a PASS, it also calculates the maximum number of jobs that can be submitted. SKIP/PASS is decided based on the nQueued > 2 * nRunning condition and on lower and upper limits that can be set in the DPA configuration page.
The job throttler operates at the work queue + resource type level (SCORE, MCORE, etc.). Currently there is one exception for Event Service, where it works at the work queue level, because those tasks are not split by resource type, i.e. MCORE tasks can generate SCORE and MCORE jobs, which breaks the general use case.
The maximum number of jobs is calculated with these rules (see the sketch after the list):
- Normal case: (the number of jobs remaining up to 2*nRunning, clipped to between 500 and 600) / 2
- If a max queued cap exists: cap/5
- If there is a min queued limit and the queue is below 0.9*the queue limit: max(queue limit/20, normal case). In this case the submission is not locked, and potentially all JEDI job generators could be generating jobs at the same time
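A rough sketch of the PASS/SKIP decision and the job cap; this is a simplification with assumed argument names, and the actual throttler also takes locks and the DPA limits into account:

  # Rough sketch of the job throttler decision (illustrative only).
  def throttle(n_queued, n_running, max_queued_cap=None, min_queued_limit=None):
      if n_queued > 2 * n_running:
          return "SKIP", 0                         # enough jobs are already queued
      remaining = 2 * n_running - n_queued         # room up to the 2 x nRunning target
      n_jobs = min(max(remaining, 500), 600) // 2  # normal case
      if max_queued_cap is not None:
          n_jobs = max_queued_cap // 5
      if min_queued_limit is not None and n_queued < 0.9 * min_queued_limit:
          n_jobs = max(min_queued_limit // 20, n_jobs)
      return "PASS", n_jobs

  print(throttle(n_queued=300, n_running=1000))  # ('PASS', 300)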
Special permission
Special permissions can be set for each user by appending the following letters to the GRIDPREF field of the ATLAS_PANDAMETA.users table:
- k : the user can download key-pairs
- p : the user can retrieve proxies
- s : the user has super user privilege, so that personal or group analysis tasks can jump over other tasks when generating jobs
- g : the user has super group privilege, so that group analysis tasks can jump over other tasks when generating jobs
Analysis
This section describes analysis-specific functions. Instructions for JEDI testers can be found
here.
Workflow for analysis
- A task is submitted with an input dataset container
- Information about the constituent datasets in the container is inserted into the Dataset table, i.e., one record per dataset
- Information about the files in the datasets is inserted into the Contents table
- JEDI takes a few files from the Contents table per dataset and runs the brokerage to choose a site
- If a new output dataset is required at the chosen site, it is created and added to the output dataset container
- Jobs are generated according to the site parameters
- JEDI repeats steps 4-6
- When all files in an input dataset are processed, the dataset status is changed to done
- When all input datasets are processed, the task status is changed to done
There would be an upper limit on the number of sites where jobs are assigned.
Using scout jobs
Scout jobs have been introduced to analysis as well so that job splitting is dynamically optimized based on memory consumption, output size, execution time, etc.
Output merging
If the task is submitted with --mergeOutput and jobs successfully produce pre-merged files, their status changes to merging and those jobs wait until the other jobs at the same site are finished/failed/cancelled.
Once all jobs at the site are done, merge jobs are generated with priority=5000. When merge jobs successfully finish, the merged files are added to the output dataset containers and the job status changes from merging to finished. This means that pre-merged files are hidden from users.
Merge jobs themselves are tried at most three times. If the final attempt of a merge job fails, the status of the merging jobs changes to cancelled and their input files are retried.
Task retry and resubmission
One can re-execute tasks using pbook or pathena/prun.
When pathena/prun is executed with the same outDS, the old task is resubmitted to run on files which were unused or failed in the previous attempt.
You can add or use a new input if necessary.
On the other hand, retry with pbook runs only on the same input.
A typical use case for resubmission is that a user executes a task on a fraction of the files
in a dataset and then executes the same task on the rest of the files.
When tasks are resubmitted, some task parameters can be implicitly changed, such as site, cloud, includedSite, excludedSite, nFilesPerJob, nGBPerJob, nFiles, nEvents, and ignoreMissingInDS.
Other parameters are not changed even if they show up in the cliParams field.
If you need to change the input sandbox, use the --useNewCode option.
Note that some parameters like nEventsPerJob cannot be changed by retry or resubmission since they are evaluated only when the original task is submitted
and some database records are inserted based on those parameters. If you want to change those parameters you have to submit a new task.
Broken tasks
Tasks go to the broken state for the following reasons:
- Input datasets are unknown
- The build job failed three times while the task was in the scouting state
- No scout jobs succeeded
Once tasks are broken, they cannot be retried or resubmitted by default, since it is rather useless to simply repeat problematic tasks without changes (such as fixed code, new task parameters, etc).
If code and/or task parameters are changed, they are internally regarded as new tasks, and each task must have a unique outDS.
Therefore, the outDS of a broken task cannot be reused by default, since you have to use a new outDS when submitting a new task.
However, you can resubmit broken tasks using --allowTaskDuplication; make sure you understand what the option does:
  As a general rule each task has a unique outDS and the history of file usage is recorded per task. This option allows multiple tasks to contribute to the same outDS. It is typically useful to submit a new task with the outDS which was used by another broken task. Use this option very carefully at your own risk, since file duplication happens when the second task runs on the same input which the first task successfully processed.
Input data transfer
Basically analysis jobs go to the data, i.e., they are sent to sites where the input data are already available, and thus they don't transfer input data. However, if all sites with the data have one or more closed jobs from the same task in the last 6h,
which implies that those sites were busy and jobs were not able to get CPUs for the last ANALY_TIMEOUT hours, new jobs are sent to idle sites together with the input data.
Throttled jobs and tasks
If a user has more running jobs than CAP_RUNNING_USER_JOBS, activated jobs go to throttled so that job slots are released to other users. They are automatically changed back to activated; otherwise, they are closed after 7 days.
If the total size of staging/transferring data from TAPE/DISK is larger than PRESTAGE_LIMIT for a user, the user's tasks with data staging/transfers are throttled until the total size drops below the limit. This limit is required to protect the network and storage. Note that existing jobs can run even if tasks are throttled. For example, when one task uses 100 input datasets and 1 dataset is available only at busy sites, the first jobs go to the data, and thus jobs for the 99 datasets keep running there even if the task
is throttled for the 1 dataset.
Software structure
Notes on software design
Plugins to generate jobs based on various usecases
For maintainability it is good to have one plugin to generate jobs for each use case.
One typical case would be that one plugin is for CMS analysis and CMS colleagues
maintain it. Use cases would be
- ATLAS production
- Athena-based analysis
- ROOT-based or general analysis
- PAT tool-based analysis? (e.g. mana and EventLoop)
- AMS analysis? (the above general analysis plugin might be enough)
- CMS analysis
- HammerCloud
- ...
Major changes from old system
Analysis
See JEDI-based Analysis.
Appendix
Major updates:
-- TadashiMaeno - 17-Dec-2012
Responsible: TadashiMaeno
Never reviewed