JEDI (Job Execution and Definition Interface)

Introduction

JEDI is an intelligent component of the panda server that provides task-level workload management. The main goals of developing JEDI are to make the panda system more task-oriented and to integrate prodDB and PandaDB.


Main concepts

Work queues

JEDI works with work queues; for example, it submits tasks per work queue. Tasks are mapped to work queues based on their attributes, such as processingType, workingGroup, prodSourceLabel, and coreCount. Each task belongs to exactly one work queue, and all jobs in the task belong to that work queue.

There are two kinds of work queues: normal queues and partition queues. Each normal queue has its own selection criteria. Tasks that pass the selection criteria belong to the queue, and JEDI handles them together, e.g., it submits tasks in the queue together. Partition queues, on the other hand, don't have selection criteria themselves. A partition queue is composed of one or more sub queues, each of which has its own selection criteria. Tasks that pass the selection criteria of a sub queue belong to that sub queue in addition to the partition queue. JEDI works with the partition queue, e.g., tasks in the partition queue are submitted together regardless of sub queues. Partitions are useful for nested definitions of shares, as described in the next section.

Work queues have the order attribute. Each queue is evaluated before other queues which have higher order values. If a task passes the selection criteria of a queue, it is mapped to the queue and no more queues are evaluated.
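
The order-based evaluation can be illustrated with a minimal Python sketch. The WorkQueue class, the criteria callables, and the sample queue definitions are hypothetical stand-ins for illustration only; they are not JEDI's actual implementation.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class WorkQueue:
    # Hypothetical in-memory representation of a work queue.
    name: str
    order: int
    criteria: Callable[[dict], bool]  # stand-in for the selection criteria


def map_task_to_queue(task: dict, queues: List[WorkQueue]) -> Optional[WorkQueue]:
    """Return the first queue, in ascending order value, whose criteria the task passes."""
    for queue in sorted(queues, key=lambda q: q.order):
        if queue.criteria(task):
            return queue  # no more queues are evaluated
    return None


queues = [
    WorkQueue("group", 2, lambda t: t.get("workingGroup", "").startswith("GP_")),
    WorkQueue("evgensimul", 1, lambda t: t.get("processingType") in ("evgen", "simul")),
]
task = {"processingType": "simul", "workingGroup": "GP_Higgs"}
print(map_task_to_queue(task, queues).name)
```

The sample task passes the criteria of both queues, but it is mapped to evgensimul because that queue has the lower order value.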

Shares

Each normal or partition queue has a share to define how many CPU resources are allocated to the queue. The share of a partition queue is divided among the sub queues in the partition. If work queues have active tasks, they are regarded as active queues and CPU resources are distributed among them. This means that CPU resources which are originally allocated to a queue can be used by other queues if the queue doesn't have active tasks. If queues have the stretchable attribute, they are given priority in getting unused CPUs. If there is more than one queue with the stretchable attribute, unused CPUs are distributed among them according to their original shares. On the other hand, all queues get unused CPUs according to their original shares if there is no queue with the stretchable attribute.
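
One possible reading of these rules is sketched below in Python. The function name and the dictionary layout are hypothetical, and the sketch assumes that only active queues can receive unused shares and that "proportional to original shares" is the intended distribution; the real algorithm may differ.

```python
def effective_shares(queues):
    """Redistribute the shares of inactive queues among active queues.

    queues maps a queue name to a dict with the keys "share", "active",
    and "stretchable" (a hypothetical layout used only in this sketch).
    """
    active = {name: q for name, q in queues.items() if q["active"]}
    result = {name: float(q["share"]) for name, q in active.items()}
    unused = sum(q["share"] for q in queues.values() if not q["active"])
    # Stretchable active queues get priority; otherwise all active queues receive.
    stretchable = {name: q for name, q in active.items() if q["stretchable"]}
    receivers = stretchable or active
    total = sum(q["share"] for q in receivers.values())
    if total == 0:
        return result
    for name, q in receivers.items():
        result[name] += unused * q["share"] / total
    return result


example = {
    "A": {"share": 50, "active": True, "stretchable": False},
    "B": {"share": 30, "active": True, "stretchable": True},
    "C": {"share": 20, "active": False, "stretchable": False},
}
print(effective_shares(example))
```

In this example queue C has no active tasks, so its share of 20 goes entirely to B, the only stretchable active queue. If B were not stretchable, the 20 would be divided between A and B in the ratio 50:30.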

Task identifier

JEDITASKID is the unique task identifier in JEDI. It comes from ATLAS_DEFT.PRODSYS2_TASK_ID_SEQ independently of T_TASK_REQUEST.REQID. Until DEFT is ready, task requests come from AKTR. T_TASK_REQUEST.REQID is stored in JEDI_TASKS.REQID, which is mapped to job.TASKID.

Dataset attributes

Each dataset can define some attributes which give hints to JEDI for splitting and brokerage.

  • repeat
  • nosplit
  • manytime
  • cvmfs
  • ratio

Pseudo datasets and files

Some job attributes, such as random seeds and serial numbers for no-input tasks, are internally managed in JEDI by using separate file records, mainly for book-keeping. Their dataset or file names have the prefix "pseudo_" and can be ignored from the user's point of view.

Transition of task status

Task status changes as shown in the following chart.

jediTaskStatus.png

registered
the task information is inserted to the JEDI_Tasks table
defined
all task parameters are properly defined
assigning
the task brokerage is assigning the task to a cloud
ready
the task is ready to generate jobs. New jobs are generated once computing resources become available.
pending
the task has a temporary problem, e.g., there are no sites to assign jobs to.
scouting
the task is running scout jobs to collect job data
scouted
all scout jobs were successfully finished
running
the task is running jobs
prepared
the task is ready to go to a final status
done
all inputs of the task were successfully processed
failed
all inputs of the task failed
finished
some inputs of the task were successfully processed but others failed or were not processed since the task was terminated
aborting
the task is being killed
aborted
the task is killed
finishing
the task is forced to get finished
topreprocess
preprocess job is ready for the task
preprocessing
preprocess job is running for the task
tobroken
the task is going to the broken state
broken
the task is broken, e.g., the task definition is wrong
toretry
the retry command was received for the task
toincexec
the incexec command was received for the task
rerefine
task parameters are going to be changed for incremental execution
paused
the task is paused and doesn't do anything until it is resumed
throttled
job generation for the task is throttled since the largest attemptNr in the task is a multiple of 5. The throttled period is 120 x int(max(attemptNr)/5)**2 minutes
exhausted
For production tasks, all reattempts were done but some inputs were still unsuccessful, and the task is configured to go to this state instead of finished to wait for manual intervention or a timeout of 10 days. Also, if the cpuTime of scout jobs is more than 2x the cpuTime in the task definition, the task goes to exhausted. For analysis tasks, if there are more than 5 scout jobs with short execution time (< 4 min) and more than 1000 jobs are expected, the task goes to exhausted to prevent an automatic avalanche, since so many short jobs are problematic at grid sites. Analysis tasks in the exhausted state can be retried using pbook, but it is better to change some parameters such as nFilesPerJob to get longer execution times. Also, both production and analysis tasks go to exhausted if they time out while pending in the brokerage.
passed
the task is ready to go to a final state after manual intervention or timeout
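
The throttled period quoted for the throttled status can be worked out with a short Python sketch. The function name is hypothetical; only the formula 120 x int(max(attemptNr)/5)**2 comes from the description above.

```python
def throttled_period_minutes(attempt_numbers):
    """Return the throttled period in minutes for the given attemptNr values.

    Implements 120 x int(max(attemptNr)/5)**2 as stated in the status list.
    """
    max_attempt = max(attempt_numbers)
    return 120 * int(max_attempt / 5) ** 2


for max_attempt in (5, 10, 15):
    print(max_attempt, throttled_period_minutes([1, max_attempt]))
```

The period grows quadratically: 120 minutes when the largest attemptNr is 5, 480 minutes at 10, and 1080 minutes at 15.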

Transition of dataset status

Dataset status changes as shown in the following charts.

jediDatasetStatus.png

Input

defined
the dataset information is inserted to the JEDI_Datasets table
toupdate
the dataset information needs to be updated
pending
the dataset is temporarily unavailable
broken
the dataset is permanently unavailable
ready
the dataset is ready to be used
done
all files in the dataset were processed

Output

defined
the dataset information is inserted to the JEDI_Datasets table
ready
the dataset is ready in DDM
running
files are being added to the dataset
prepared
the dataset is ready for post-processing
done
the final status

Transition of file status

File status changes as shown in the following charts.

jediFiletStatus.png

Input

ready
the file information is correctly retrieved from DDM and is inserted to the JEDI_Dataset_Contents table
missing
the file is missing in the cloud/site where corresponding task is assigned
lost
the file was available in the previous lookup but is now unavailable
broken
the file is corrupted
picked
the file is picked up to generate jobs
running
one or more jobs are using the file
finished
the file was successfully used
failed
the file was tried multiple times but did not succeed
partial
the file was split at event-level and some of event chunks were successfully finished

Output

defined
the file information is inserted to the JEDI_Dataset_Contents table
running
the file is being produced
prepared
the file is produced
merging
the file is being merged
finished
the file was successfully processed
failed
the file was not produced or failed to be merged

Transition of event status

Event status changes as shown in the following chart.

jediEventStatus.png

ready
ready to be processed
sent
sent to the pilot
running
being processed on WN
finished
successfully processed and the corresponding ES job is still running
cancelled
the ES job was killed before the event range was successfully processed
discarded
the ES job was killed in the merging state after the event range had finished
done
successfully processed and waiting to be merged. The corresponding ES job went to a final job status.
failed
failed to be processed
fatal
failed with a fatal error or attempt number reached the max
merged
the corresponding ES merge job successfully finished
corrupted
the event is flagged as corrupted in order to be re-processed since corresponding zip file is problematic


Database design for JEDI

Here is an ER diagram for JEDI. It is assumed that current Job, File, and Dataset tables are basically reused although several columns are added and tables themselves may be further optimized in the future. The idea is to implement functions of JEDI on top of the current system while avoiding huge service disruptions. Once the whole system and clients migrate to the new task-oriented scheme, Job, File, and Dataset tables may be refactored. The Task table and Dataset table may be shared with DEFT (see Object Models for ProdSys II). JEDI works with queues, e.g., it counts the number of jobs per queue to estimate the workload in the queue. Tasks/jobs are mapped to one of the queues based on processingType, workingGroup, and so on. Queues are configurable and flexible, so they are defined in the WorkQueue table.

jedier.png

Notes on database design

  • JEDI has custody of the actual task/dataset tables which carry content, while DEFT will keep references to objects in its own (small) database
  • This page describes communication and interaction between DEFT and JEDI through the database tables
  • The Dataset table contains one record per task-dataset-type. E.g. when one task uses multiple input datasets and produces multiple output datasets, there could be (task-dataset1-input), (task-dataset2-input), ..., and (task-dataset3-output), (task-dataset4-output)...
  • The Dataset table contains datasets but not containers. When a task is submitted with an input container, datasets in the container are inserted, which would make incremental execution of the task easier when new datasets are added to the container
  • The Task, Dataset, and Contents tables could be partitioned by taskID

Table schemas

All of them have the JEDI_ prefix in table names for now. Note that they are not final versions.

The JEDI_Tasks table

Description: Table for hosting all production and analysis tasks.

Name Type Comment
JEDITASKID NOT NULL NUMBER(9) The task identifier coming from ATLAS_DEFT.PRODSYS2_TASK_ID_SEQ
CREATIONDATE NOT NULL DATE Set when the record is inserted at the first time
MODIFICATIONTIME NOT NULL DATE Set when the record is updated
STARTTIME DATE Set when the task gets started
ENDTIME DATE Set when the task is finished
FROZENTIME DATE Set when the record is frozen
PRODSOURCELABEL VARCHAR2(20) The source label, such as managed, user, prod_test, ...
TASKNAME VARCHAR2(132) The task name
USERNAME VARCHAR2(128) The name of the user who owns the task
WORKINGGROUP VARCHAR2(32) The name of the working group which owns the task
VO VARCHAR2(16) The name of virtual organization which owns the task
CORECOUNT NUMBER(9) The number of cores per job. Set 0 if it can vary
TASKTYPE VARCHAR2(64) The task type
PROCESSINGTYPE VARCHAR2(64) The type of the process
TASKPRIORITY NUMBER(9) The assigned priority of the task
CURRENTPRIORITY NUMBER(9) The current priority of the task. JEDI could increase/decrease priorities if necessary
STATUS NOT NULL VARCHAR2(64) The task status
OLDSTATUS VARCHAR2(64) Old task status
ARCHITECTURE VARCHAR2(256) The architecture on which the task runs. Eg, $CMTCONFIG
TRANSUSES VARCHAR2(64) The name of the software release
TRANSHOME VARCHAR2(128) The name of the software cache in which the transformation is included
TRANSPATH VARCHAR2(128) The name of the transformation
LOCKEDBY VARCHAR2(40) The name of process/thread which is taking care of the record
LOCKEDTIME DATE Set when the record is locked
TERMCONDITION VARCHAR2(100) Termination condition of the task. Eg, to terminate task when 90% of files are successfully processed
SPLITRULE VARCHAR2(100) Rules for job splitting. Eg, to use event-level splitting or not, to limit the number of files per job, etc
WALLTIME NUMBER(9) average walltime consumption measured by scouts when processing 1MB of input
WALLTIMEUNIT VARCHAR2(32) unit of WALLTIME
OUTDISKCOUNT NUMBER(9) average total size of outputs measured by scouts when processing 1MB of input
OUTDISKUNIT VARCHAR2(32) unit of OUTDISKCOUNT
WORKDISKCOUNT NUMBER(9) average size of work directory measured by scouts
WORKDISKUNIT VARCHAR2(32) unit of WORKDISKCOUNT
RAMCOUNT NUMBER(9) average memory consumption measured by scouts
RAMUNIT VARCHAR2(32) unit of RAMCOUNT
IOINTENSITY NUMBER(9) The characteristics of I/O patterns measured by scouts. (inSize+outSize)/wallTime/nCore
IOINTENSITYUNIT VARCHAR2(32) unit of IOINTENSITY
WORKQUEUE_ID NUMBER(5) The work queue identifier to which the task belongs
PROGRESS NUMBER(3) Percentage of completion of the task
FAILURERATE NUMBER(3) The frequency of job failures in the task (in %)
ERRORDIALOG VARCHAR2(255) Error diagnostics
CLOUD VARCHAR2(10) The cloud where the task is assigned
SITE VARCHAR2(60) The site where the task is assigned. Set NULL to use the normal brokerage
REQID NUMBER(9) See the Task identifier section
COUNTRYGROUP VARCHAR2(20) The name of the country group to which the user belongs
PARENT_TID NUMBER(12) The jediTaskID of the parent task. PARENT_TID=JEDITASKID if no parent
EVENTSERVICE NUMBER(1) The task uses Event Service if 1
TICKETID VARCHAR2(50) The identifier of the ticket to keep track of the task
TICKETSYSTEMTYPE VARCHAR2(16) The type of the ticket system
STATECHANGETIME DATE Updated when the task status is changed
SUPERSTATUS VARCHAR2(64) The super status (DEFT status) of the task which is explained in DEFT task status
CAMPAIGN VARCHAR2(32) Campaign to which the task belongs
MERGERAMCOUNT NUMBER(9) average memory consumption for merge jobs measured by scouts
MERGERAMUNIT VARCHAR2(32) unit of MERGERAMCOUNT
MERGEWALLTIME NUMBER(9) average walltime consumption for merge jobs measured by scouts when processing 1MB of input
MERGEWALLTIMEUNIT VARCHAR2(32) unit of MERGEWALLTIME
THROTTLEDTIME DATE Updated when the task is throttled
NUMTHROTTLED NUMBER(3) How many times the task was throttled so far
MERGECORECOUNT NUMBER(9) The number of cores per merge job. Set 0 if it can vary
GOAL NUMBER(4) Goal of the task completion in percentage multiplied by 10
ASSESSMENTTIME DATE Set when statistics is updated
CPUTIME NUMBER(9) The product of HS06 score and CPU consumption time per event
CPUTIMEUNIT VARCHAR2(32) Unit of CPUTIME (HS06sPerEvent)
CPUEFFICIENCY NUMBER(3) Percentage of single core efficiency
BASEWALLTIME NUMBER(9) Offset of walltime in sec
AMIFLAG VARCHAR2(10) Set to READY when the task is done or finished, so that AMI is aware of task completion
NUCLEUS VARCHAR2(52) Name of the site where the task is assigned in WORLD cloud
BASERAMCOUNT NUMBER(9) Shared memory size in MB
RESCUETIME DATE Updated when the rescue process checks the task
REQUESTTYPE VARCHAR2(32) type of the request
USEJUMBO CHAR(1) Set Y when using jumbo jobs
DISKIO NUMBER(9) Local disk access measured by scouts. (totWBytes+totRBytes)/(endTime-startTime)
DISKIOUNIT VARCHAR2(32) Unit of DISKIO

Typical usages

* Get N production tasks to be processed in a queue based on priority. Queues are defined in the JEDI_Work_Queue table
SELECT * FROM (SELECT * FROM JEDI_Tasks WHERE STATUS = ??? AND WORKINGGROUP IN (...) AND PROCESSINGTYPE IN (...) AND VO = ??? AND TASKTYPE = ??? ORDER BY CURRENTPRIORITY DESC,JEDITASKID) WHERE ROWNUM < ???;
* Get the list of tasks to be amended
SELECT JEDITASKID FROM JEDI_Tasks WHERE COMMAND IS NOT NULL;
* Get the list of users who have tasks to be processed
SELECT USERNAME,VO FROM JEDI_Tasks WHERE STATUS = ??? AND TASKTYPE = ???;
* Get N analysis tasks for a user to process them
SELECT * FROM (SELECT ??? FROM JEDI_Tasks WHERE STATUS = ??? AND USERNAME = ??? AND VO = ??? AND TASKTYPE = ??? ORDER BY CURRENTPRIORITY DESC,JEDITASKID) WHERE ROWNUM < ???;
* Find slow or delayed tasks
SELECT ??? FROM JEDI_Tasks WHERE STATUS = ??? AND TASKTYPE = ??? AND CREATIONDATE < ???;
* Find hung-up processes
SELECT ??? FROM JEDI_Tasks WHERE LOCKEDBY IS NOT NULL AND LOCKEDTIME < ???;

The JEDI_TaskParams Table

Description: Table with task parameters. Each task has its own parameters, which are described in a single row.

Name Type Comment
JEDITASKID NOT NULL NUMBER(11) The task identifier coming from the task table
TASKPARAMS CLOB Special task parameters. Eg, the list of lost files for a lost-file recovery task

The JEDI_JobParams_Template Table

Description: Table to define how job parameters are generated for each task.

Name Type Comment
TASKID NOT NULL NUMBER(11) The task identifier coming from the task table
JOBPARAMSTEMPLATE CLOB The template to generate job parameters

The JEDI_Datasets table

Description: Table to describe datasets and their relations with tasks. The same dataset name may appear in multiple rows since it may be used as input or output by multiple tasks.

Name Type Comment
DATASETID NOT NULL NUMBER(11) auto-incremented id of the dataset generated from a SEQ
DATASETNAME NOT NULL VARCHAR2(255) The dataset name
CONTAINERNAME VARCHAR2(132) The name of the container to which the dataset belongs. Set NULL when the dataset is not used as a part of a container
JEDITASKID NOT NULL NUMBER(11) The task identifier coming from the task table
MASTERID NUMBER(11) Set the DATASETID of the master dataset if the dataset is used as a secondary dataset of the master. Otherwise, set NULL
PROVENANCEID NUMBER(11) The DATASETID of the input dataset to which the output dataset is associated. Set NULL for input datasets
STATE VARCHAR2(20) The dataset state of the dataset when it was checked with DDM
STATECHECKTIME DATE Set when the dataset state is checked with DDM
STATECHECKEXPIRATION DATE The date when the dataset state check is over. Set NULL if the state check is not required any more
NFILES NUMBER(10) The total number of files in the dataset
NFILESTOBEUSED NUMBER(10) The number of files to be used
NFILESUSED NUMBER(10) The number of files used so far
NFILESFINISHED NUMBER(10) The number of files successfully finished so far
NFILESFAILED NUMBER(10) The number of files failed so far
NFILESONHOLD NUMBER(10) The number of files on hold
NEVENTS NUMBER(11) The total number of events in the dataset. Set NULL unless this info is required
NEVENTSTOBEUSED NUMBER(11) The number of events to be used. Set NULL unless this info is required
NEVENTSUSED NUMBER(11) The number of events used so far. Set NULL unless this info is required
STATUS VARCHAR2(20) Show how the dataset is currently being used in Panda
CREATIONTIME NOT NULL DATE Set when the record is inserted at the first time
MODIFICATIONTIME NOT NULL DATE Set when the record is updated
FROZENTIME DATE Set when the record is frozen
TYPE NOT NULL VARCHAR2(20) The type of the dataset
VO VARCHAR2(16) The name of virtual organization which owns the dataset
CLOUD VARCHAR2(10) The replica in the cloud is used. Set manually or by the task brokerage
SITE VARCHAR2(60) The replica at the site is used. Set NULL to use the normal brokerage
LOCKEDBY VARCHAR2(40) The name of process/thread which is taking care of the record
LOCKEDTIME DATE Set when the record is locked
ATTRIBUTES VARCHAR2(100) Describes how the dataset is split, e.g., the ratio to the number of master files, no-split, repeat, etc
STREAMNAME VARCHAR2(20) The name of stream for input files which is used as a placeholder in jobParamsTemplate. For output, STREAMNAME in JEDI_Output_Template is used
STORAGETOKEN VARCHAR2(100) The token in the storage element where input files are available or output files are put while the task is running
DESTINATION VARCHAR2(60) The final destination where the dataset is transferred once the task is finished
TEMPLATEID NUMBER(11) The DATASETID of the template dataset from which the dataset inherits
NFILESWAITING NUMBER(10) The number of files waiting for real co-jumbo jobs

Typical usages

* Get datasets for a task
SELECT DATASETNAME,TYPE FROM JEDI_Datasets WHERE JEDITASKID = ???;
* Get datasets to be processed in a task
SELECT DATASETNAME FROM JEDI_Datasets WHERE JEDITASKID = ??? AND STATUS = ??? AND NFILESTOBEUSED > NFILESUSED AND TYPE = ???;
* Get a task which uses or produces a dataset
SELECT JEDITASKID FROM JEDI_Datasets WHERE DATASETNAME = ??? AND TYPE = ???;
* Get tasks which contribute to a container
SELECT JEDITASKID FROM JEDI_Datasets WHERE CONTAINERNAME = ??? AND TYPE = ???;
* Get datasets to update the dataset state
SELECT DATASETNAME FROM JEDI_Datasets WHERE STATECHECKTIME < ??? AND STATECHECKEXPIRATION IS NOT NULL;
* Update a dataset
UPDATE JEDI_Datasets SET ??? WHERE DATASETID = ???;
* Update datasets in a task
UPDATE JEDI_Datasets SET ??? WHERE JEDITASKID = ???;
* Find hung-up processes
SELECT ??? FROM JEDI_Datasets WHERE LOCKEDBY IS NOT NULL AND LOCKEDTIME < ???;

The JEDI_Dataset_Contents table

Description: Table to describe files and how they are used by tasks. The same LFN may appear in multiple rows since it may be used by multiple tasks. Basically only the LFN is mandatory, but some metadata of each file, such as file size and checksum, is stored in the table to avoid frequent lookups in DDM.

Name Type Comment
FILEID NOT NULL NUMBER(11) auto-incremented ID of the file generated from a SEQ
DATASETID NOT NULL NUMBER(11) The dataset identifier coming from the dataset table
JEDITASKID NOT NULL NUMBER(11) The task identifier coming from the task table
CREATIONDATE NOT NULL DATE Set when the record is inserted at the first time
LASTATTEMPTTIME DATE Set when the file is tried
LFN NOT NULL VARCHAR2(256) The logical filename
GUID VARCHAR2(64) The GUID of the file
TYPE NOT NULL VARCHAR2(20) The type of the file
FSIZE NUMBER(11) The size of the file
CHECKSUM VARCHAR2(36) The checksum of the file
SCOPE VARCHAR2(30) The scope of the file
STATUS NOT NULL VARCHAR2(64) The status of the file
ATTEMPTNR NUMBER(3) How many times the file has been tried so far
MAXATTEMPT NUMBER(3) How many times the file can be tried at most
NEVENTS NUMBER(10) The number of events in the file
KEEPTRACK NUMBER(1) Set 1 when keeping track of the file usage
STARTEVENT NUMBER(10) The starting serial number used in the file
ENDEVENT NUMBER(10) The ending serial number used in the file
FIRSTEVENT NUMBER(10) The event number which is assigned to the first processed event
BOUNDARYID NUMBER(11) Splitting Input to respect this identifier if not NULL. e.g., used to specify lumi block boundaries
PANDAID NUMBER(11) PandaID of the job which uses the file
FAILEDATTEMPT NUMBER(3) How many times the file failed so far
LUMIBLOCKNR NUMBER(10) Lumiblock Number in the file
OUTPANDAID NUMBER(11) PandaID of the job which produced the file
MAXFAILURE NUMBER(3) How many times the file can be failed at most
IS_WAITING CHAR(1) Set Y if waiting for real co-jumbo jobs
JOBSETID NUMBER(11) jobSetID of the job which uses the file
PROC_STATUS VARCHAR2(64) Processing status of the file

Typical usages

* Get task and dataset which produced a file
SELECT JEDITASKID,DATASETID FROM JEDI_Dataset_Contents WHERE LFN = ??? AND SCOPE = ??? AND TYPE = ???;
* Get the list of files in a task
SELECT FILEID FROM JEDI_Dataset_Contents WHERE JEDITASKID = ??? AND TYPE = ???;
* Get the list of files in a particular state and type with a task and a dataset
SELECT FILEID FROM JEDI_Dataset_Contents WHERE JEDITASKID = ??? AND DATASETID = ??? AND TYPE = ??? AND STATUS = ???;
* Get the list of files to be processed
SELECT FILEID FROM JEDI_Dataset_Contents WHERE JEDITASKID = ??? AND DATASETID = ??? AND TYPE = ??? AND STATUS = ??? AND ATTEMPTNR < MAXATTEMPT;
* Update a file
UPDATE JEDI_Dataset_Contents SET ??? WHERE FILEID = ???;
* Update files in a dataset
UPDATE JEDI_Dataset_Contents SET ??? WHERE DATASETID = ???;

The JEDI_Output_Template table

Description: Table to define how output filename is generated for each output dataset. One production dataset has a single row while one analysis dataset may have multiple rows.

Name Type Comment
OUTTEMPID NOT NULL NUMBER(11) auto-incremented ID of the template generated from a SEQ
JEDITASKID NOT NULL NUMBER(11) The task identifier coming from the task table
DATASETID NOT NULL NUMBER(11) The dataset identifier coming from the dataset table
FILENAMETEMPLATE NOT NULL VARCHAR2(256) The template to generate output filenames
SOURCENAME VARCHAR2(256) source filename to be renamed, if any. Set NULL if renaming is unnecessary
SERIALNR NUMBER(7) The serial number to generate files using the template. When a new file is produced it would have SERIALNR+1 and this field is incremented
MAXSERIALNR NUMBER(7) The maximum serial number which can be used to generate files using the template
STREAMNAME VARCHAR2(20) The name of the stream which is used as a placeholder in jobParamsTemplate
OUTTYPE VARCHAR2(20) The type of the file

Possible example

* FILENAMETEMPLATE = EVNT.01206750._${SN}.pool.root, SOURCENAME = event.pool.root
An output file, event.pool.root, is renamed to EVNT.01206750._XYZ.pool.root
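
How FILENAMETEMPLATE, SERIALNR, and MAXSERIALNR could work together is sketched below in Python. The function name, the dictionary standing in for a table row, and the zero-padding width of 6 digits are assumptions made for illustration; the document only shows the placeholder ${SN} being replaced by a serial number.

```python
def next_output_filename(template_row, width=6):
    """Generate the next output filename from a JEDI_Output_Template-like row.

    template_row is a plain dict standing in for a table row (hypothetical).
    width is an assumed zero-padding width for the serial number.
    """
    serial = template_row["SERIALNR"] + 1
    max_serial = template_row.get("MAXSERIALNR")
    if max_serial is not None and serial > max_serial:
        raise ValueError("no serial numbers left for this template")
    # The new file gets SERIALNR+1 and the field is incremented.
    template_row["SERIALNR"] = serial
    return template_row["FILENAMETEMPLATE"].replace("${SN}", str(serial).zfill(width))


row = {
    "FILENAMETEMPLATE": "EVNT.01206750._${SN}.pool.root",
    "SOURCENAME": "event.pool.root",
    "SERIALNR": 0,
    "MAXSERIALNR": None,
}
first_name = next_output_filename(row)
print(row["SOURCENAME"], "->", first_name)
```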

The JEDI_Work_Queue table

Description: Table to define partitions and shares of workload.

Name Type Comment
QUEUE_ID NOT NULL NUMBER(5) unique identifier
QUEUE_NAME NOT NULL VARCHAR2(16) The name of the queue
QUEUE_TYPE NOT NULL VARCHAR2(16) The type of the queue which is mapped to prodSourceLabel in JEDI_Tasks and jobs*Table
VO NOT NULL VARCHAR2(16) The name of virtual organization for which the queue is used
QUEUE_SHARE NUMBER(3) Share for the queue. Set NULL if the queue is not throttled
QUEUE_ORDER UNIQUE NUMBER(3) The queue is evaluated before other queues which have higher ORDER values. If a task or a job passes the selection criteria, it is mapped to the queue and no more queues are evaluated
CRITERIA VARCHAR2(256) Selection criteria written in SQL to map tasks/jobs to the queue. It is applied to JEDI_Tasks
VARIABLES VARCHAR2(256) Valid values to replace bind-variables in CRITERIA
PARTITIONID NUMBER(5) The QUEUE_ID of the partition to which the queue belongs. Set NULL if the queue doesn't belong to any partition. Set -1 if the record stands for a partition
STRETCHABLE NUMBER(1) Set 1 if the queue can use shares of other queues when those queues don't have tasks/jobs
STATUS VARCHAR2(64) The queue status

Possible examples

* QUEUE_NAME = evgensimul, QUEUE_TYPE = production, ... , CRITERIA = processingType IN (:p1,:p2), VARIABLES = p1:evgen,p2:simul
* QUEUE_NAME = group, QUEUE_TYPE = production, ... , CRITERIA = workingGroup LIKE :p1, VARIABLES = p1:GP_%
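
The way CRITERIA and VARIABLES fit together can be tried out with a small Python sketch that uses an in-memory SQLite database in place of Oracle. The helper names and the two-column stand-in for JEDI_Tasks are hypothetical; the sketch only assumes that VARIABLES is a comma-separated list of name:value pairs, as in the examples above.

```python
import sqlite3


def parse_variables(variables: str) -> dict:
    """Turn 'p1:evgen,p2:simul' into {'p1': 'evgen', 'p2': 'simul'}."""
    binds = {}
    for item in variables.split(","):
        name, _, value = item.partition(":")
        binds[name.strip()] = value.strip()
    return binds


def task_matches(task: dict, criteria: str, variables: str) -> bool:
    """Check whether a single task row passes the selection criteria."""
    conn = sqlite3.connect(":memory:")
    try:
        # Minimal stand-in for JEDI_Tasks with only the columns used here.
        conn.execute("CREATE TABLE JEDI_Tasks (processingType TEXT, workingGroup TEXT)")
        conn.execute(
            "INSERT INTO JEDI_Tasks VALUES (:processingType, :workingGroup)", task
        )
        sql = "SELECT COUNT(*) FROM JEDI_Tasks WHERE " + criteria
        (count,) = conn.execute(sql, parse_variables(variables)).fetchone()
        return count > 0
    finally:
        conn.close()


task = {"processingType": "simul", "workingGroup": "GP_Higgs"}
print(task_matches(task, "processingType IN (:p1,:p2)", "p1:evgen,p2:simul"))
print(task_matches(task, "workingGroup LIKE :p1", "p1:GP_%"))
```

Keeping the values in VARIABLES and binding them at query time means the criteria text itself never contains literal values.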

The JEDI_Events table

Description: Table for the job progress tracking. This table may be used for Event Server as well.

Name Type Comment
JEDITASKID NOT NULL NUMBER(11) JediTaskID of the file which contains the events
DATASETID NOT NULL NUMBER(11) DatasetID of the file which contains the events
FILEID NOT NULL NUMBER(11) FileID of the file which contains the events
STATUS NOT NULL NUMBER(2) The status of the event range. Explained in the Transition of event status section
PANDAID NOT NULL NUMBER(11) PandaID of the job in which the events are processed
JOB_PROCESSID NOT NULL NUMBER(10) identifier of the slave process
DEF_MIN_EVENTID NUMBER(10) The minimum event number which is assigned to the slave process
DEF_MAX_EVENTID NUMBER(10) The maximum event number which is assigned to the slave process
PROCESSED_UPTO_EVENTID NUMBER(10) The event number which the slave process completed so far
ATTEMPTNR NUMBER(3) How many times the events have been retried so far
OBJSTORE_ID NUMBER(3) Identifier of the objectstore endpoint where pre-merged file is stored
EVENT_OFFSET NUMBER(11) The total offset of positional event numbers
IS_JUMBO NUMBER(1) Set 1 if events are processed by a jumbo job
ZIPROW_ID NUMBER(11) ROW_ID of the zip file in the file table. NULL if not zipped
FILE_NOT_DELETED CHAR(1) Set Y if corresponding pre-merged files can be deleted
ERROR_CODE NUMBER(5) Error code for the event range
PATH_CONVENTION NUMBER(2) Convention ID of file path

The JEDI_Job_Retry_History table

Description: Table to keep track of history of job reattempts.

Name Type Comment
JEDITASKID NOT NULL NUMBER(11) JediTaskID of the jobs
OLDPANDAID NOT NULL NUMBER(11) PandaID of the old job
NEWPANDAID NOT NULL NUMBER(11) PandaID of the new job
RELATIONTYPE VARCHAR2(16) The type of relation between old and new jobs
ORIGINPANDAID NUMBER(11) PandaID of the origin job

The JEDI_Process_Lock table

Description: Table to contain locks for JEDI processes

Name Type Comment
PRODSOURCELABEL NOT NULL VARCHAR2(20) The source label, such as managed, user, prod_test, ...
VO NOT NULL VARCHAR2(16) The name of virtual organization
WORKQUEUE_ID NOT NULL NUMBER(5) The work queue identifier
CLOUD VARCHAR2(10) The cloud name
LOCKEDBY NOT NULL VARCHAR2(40) The name of process/thread which uses the lock
LOCKEDTIME NOT NULL DATE Set when the lock is created


Functions of JEDI

This section describes functions of JEDI for production and analysis, and how JEDI would use the database and other info.

Production/Common

This section describes functions for production. Some of them could be used for analysis as well.

Workflow for tasks with input datasets using file-level splitting

Here is a simple workflow for a task with input datasets using file-level splitting. Input datasets are split into multiple chunks of files, and one job is generated per chunk.

  1. A task is defined
  2. Input and output dataset information are inserted to the Dataset table (by DEFT if a shared table is used, or by JEDI if own table is used)
  3. Information about files in input datasets is retrieved from DDM and inserted into the Contents table. Basically LFNs are mandatory, but some metadata of each file, such as size, the number of events, and checksum, could be stored in the table to avoid frequent lookups in DDM.
  4. JEDI takes a few files from the Contents table and runs the brokerage to choose a site, and generates jobs according to the site parameters, such as the number of cores per slot, memory size, and walltime limit.
  5. Jobs and files are inserted to the Job and File tables
  6. Jobs go through the present workflow (i.e., assigned→activated→...→transferring)
  7. If a job finishes/fails, files used by the job are flagged in the Contents table
  8. JEDI repeats 4.
  9. When all files in an input dataset are processed, the dataset status is changed to done
  10. When all input datasets are processed, the task status is changed to done
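
The loop in steps 4 to 9 can be sketched in Python as follows. This is a simplified illustration, not JEDI's implementation: the function name and the job_succeeds callback are hypothetical, brokerage and retries are omitted, and the Contents table is represented by a plain dictionary of file statuses.

```python
from typing import Callable, Dict, List, Tuple


def run_file_level_splitting(
    lfns: List[str],
    files_per_job: int,
    job_succeeds: Callable[[List[str]], bool] = lambda chunk: True,
) -> Tuple[List[List[str]], Dict[str, str], str]:
    """Split input files into chunks, one job per chunk, and flag file statuses."""
    contents = {lfn: "ready" for lfn in lfns}  # step 3: files in the Contents table
    dataset_status = "ready"
    jobs = []
    while True:
        # Step 4: take a few ready files to generate one job.
        chunk = [lfn for lfn, status in contents.items() if status == "ready"]
        chunk = chunk[:files_per_job]
        if not chunk:
            break
        for lfn in chunk:
            contents[lfn] = "picked"
        jobs.append(chunk)  # step 5: the job and its files are registered
        # Step 7: flag the files once the job finishes or fails.
        outcome = "finished" if job_succeeds(chunk) else "failed"
        for lfn in chunk:
            contents[lfn] = outcome
    # Step 9: the dataset is done when all files were successfully processed.
    if all(status == "finished" for status in contents.values()):
        dataset_status = "done"
    return jobs, contents, dataset_status


jobs, contents, status = run_file_level_splitting(["f1", "f2", "f3", "f4", "f5"], 2)
print(jobs, status)
```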

Output dataset registration

JEDI or DEFT registers output datasets when tasks are activated. Note that output datasets for analysis are registered by JEDI as shown in this section.

Job rebrokerage

Job rebrokerage is done periodically by killing old jobs that were assigned to a site but did not get CPUs there within a certain time period. It is essentially a reattempt. For example, analysis jobs are reattempted every 12h if they don't get started within 12h. If jobs are killed by the system due to a timeout etc., they go to the closed state, while they go to the cancelled state if they are killed manually. Generally, closed or cancelled jobs don't have log files since they didn't run on WNs. The relation between old and new jobs is recorded in the JEDI_Job_Retry_History table. Each row of the Contents table has the attemptNr column to record how many times the file was attempted and the maxAttempt column to limit the number of attempts. Every time a job is reattempted, attemptNr is incremented, and JEDI gives up when attemptNr=maxAttempt. maxAttempt is 3 by default and can be changed by a task parameter, maxAttempt.
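
The attemptNr/maxAttempt bookkeeping can be sketched in Python as follows. The function name and the dictionary standing in for a Contents row are hypothetical; the sketch only assumes that a file may be tried while attemptNr is below maxAttempt, which matches the ATTEMPTNR<MAXATTEMPT condition in the typical usages of the Contents table.

```python
DEFAULT_MAX_ATTEMPT = 3  # default stated above; changeable via the maxAttempt task parameter


def try_file(file_row):
    """Record one more attempt for a file, or refuse if the limit is reached.

    file_row is a plain dict standing in for a Contents table row (hypothetical).
    Returns True if a new attempt was recorded, False if JEDI would give up.
    """
    if file_row["ATTEMPTNR"] >= file_row["MAXATTEMPT"]:
        return False
    file_row["ATTEMPTNR"] += 1
    return True


row = {"LFN": "file1", "ATTEMPTNR": 0, "MAXATTEMPT": DEFAULT_MAX_ATTEMPT}
results = [try_file(row) for _ in range(4)]
print(results, row["ATTEMPTNR"])
```

With the default limit the file is tried three times and the fourth request is refused.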

Naming of output file

output2.png

With JEDI, naming output files might be tricky. As shown in the above figure, initially, JEDI would define Job1 and Job2 with File1-2 and File3-4, respectively. Job2 would successfully produce OutFile_00002, while Job1 would fail and JEDI would decide to split it for some reason into Job1' and Job1''. Finally, they would successfully produce OutFile1' and OutFile1''. The issue is what filenames are used for OutFile1' and OutFile1''. There are several possibilities:

  A. Job1' uses Output_00001 with an incremented attempt number and Job1'' uses a new name with an incremented serial number, i.e., Output_00001.1 and Output_00003
  B. Both Job1' and Job1'' use new filenames, i.e., Output_00003 and Output_00004
  C. All jobs use temporary output filenames first, and real filenames are assigned in such a way that output files with smaller serial numbers have input files with smaller serial numbers, i.e., OutFile_00001'→RealOutFile_00001, OutFile_00001''→RealOutFile_00002, OutFile_00002→RealOutFile_00003. The files are then renamed in SE
  D. Real filenames are assigned as in option C, but without renaming in SE, i.e., PFN and LFN are different.

The downside of options C and D is that real filenames cannot be defined until the task is finished. The simplest solution would be option B, but the sequential order of input and output files would be inconsistent and serial numbers in a dataset would not be continuous. Option A has the same problem with inconsistent sequential order between input and output files, and serial numbers have to jump when old jobs are combined into one new job. Option B was adopted after discussion in an ADC meeting.

Internal Merging

JEDI has a capability to merge output files for each task. If the task is configured with mergeOutput=True, jobs are generated to produce intermediate files and go to merging state once they are done on WNs. When all jobs assigned to a site are done, merge jobs are generated at the site to merge those intermediate files. Once the merge job is done and merged files are transferred to the final destination, jobs in merging state go to finished.

Open dataset handling

Some tasks could be submitted before input datasets are frozen. The issue here is that new files may be appended to an open dataset after the task is submitted. If the user wants to execute the task on all files including newly appended files, JEDI keeps the dataset status as 'running', periodically checks with DDM to find new files, adds new files to the Contents table, and changes the dataset status to 'done' once the dataset is frozen and all files are processed in Panda. This implies that the Dataset table requires two status attributes, one for the dataset status in DDM and the other for how the dataset is being used in Panda.

Lost file recovery

When some files need to be re-produced, a new task is submitted. The task parameters specify the names of the lost files in addition to the normal task parameters. Then,

  1. taskID is retrieved from the Dataset table using the output dataset name
  2. Jobs which produced lost files are retrieved from Job and File tables using taskID and names of lost files
  3. Input files of the jobs are inserted to the Contents table
  4. The task runs on those input files

New files are added to the existing output dataset or a new output dataset, according to the task definition.

If the input files for the lost files are lost as well (e.g., the dataset has already been deleted), another task which re-generates those input files needs to be submitted (or they need to be re-generated internally by JEDI), and the tasks are chained.

Protection against lost/broken input files

When JEDI finds that an input file is lost (i.e., the file is not found in DDM although it was there when the task was submitted) or is problematic, the file status in the Contents table is set to 'lost' or 'broken' and JEDI generates new jobs by skipping lost/broken files or updates the dataset status if no files remain.

Automatic priority boosting for quick completion of tasks

Tasks that are 95% complete and have fewer than 100 jobs left to do get

  • a priority boost to 900
  • gshare set to Express

This functionality may move to a TaskWatcher component in the future.
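
The boosting rule can be sketched as follows. This is only an illustration, not the JEDI implementation: the function name, the returned dictionary keys, and the reading of "95% complete" as (finished jobs)/(total jobs) ≥ 0.95 are assumptions.

```python
def boost_for_quick_completion(n_jobs_total, n_jobs_done):
    """Return the overrides for a nearly finished task, or None.

    Assumption: "95% complete" means n_jobs_done / n_jobs_total >= 0.95.
    """
    if n_jobs_total <= 0:
        return None
    n_jobs_remaining = n_jobs_total - n_jobs_done
    is_nearly_done = n_jobs_done / n_jobs_total >= 0.95
    # Both conditions must hold: almost complete AND a short tail of jobs.
    if is_nearly_done and n_jobs_remaining < 100:
        return {"priority": 900, "gshare": "Express"}
    return None
```

With these assumptions, a task with 960 of 1000 jobs done is boosted, while a task with 9600 of 10000 jobs done is not, since 400 jobs remain.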

Event-level splitting

For event-level job splitting, the Event table is used. The following figure shows an example of workflow with event-level splitting.

evtsplit2.png

Actions of JEDI are as follows

  1. A few files are taken from the Contents table
  2. They are split by N events (N=100 in the figure)
  3. One job is generated per N events
  4. One record is inserted to the Event table for each combination of (file, event range), e.g., (File1,0-99), (File1,100-149), (File2,0-49), (File2,50-149), (File3,0-99)
  5. The status of each event record is set when the corresponding job finishes/fails
  6. If all events in a file are done, the file status in the Contents table is set to 'done'
  7. Step 1 is repeated. The number of events per job is changed if necessary
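
Steps 2 to 4 can be sketched as below. This is a minimal illustration under stated assumptions, not JEDI code: the function name and the tuple layout are hypothetical, and the file sizes used in the usage note (150, 150 and 100 events) are inferred from the event ranges quoted in step 4.

```python
def split_by_events(files, n_events_per_job):
    """Split (file_name, n_events) pairs into jobs of n_events_per_job events.

    Returns one list per job; each list holds (file, first_event, last_event)
    records, i.e. the rows that would be inserted to the Event table.
    A job may span a file boundary.
    """
    if n_events_per_job <= 0:
        raise ValueError("n_events_per_job must be positive")
    jobs = []
    current = []             # event-range records of the job being filled
    room = n_events_per_job  # events still missing in the current job
    for name, n_events in files:
        start = 0
        while start < n_events:
            take = min(room, n_events - start)
            current.append((name, start, start + take - 1))
            start += take
            room -= take
            if room == 0:    # job is full: close it and open a new one
                jobs.append(current)
                current, room = [], n_events_per_job
    if current:              # the last job may hold fewer than N events
        jobs.append(current)
    return jobs
```

Calling split_by_events([("File1", 150), ("File2", 150), ("File3", 100)], 100) yields four jobs whose records are the five (file, event range) combinations listed in step 4; the second job combines (File1,100-149) and (File2,0-49).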

Event server (event engine)

The detail is described in this blog article and the EventServer twiki.

Task aborting and forced termination

Once the task status is set to TOBEABORTED or TOBEFINISHEDASAP, JEDI sends kill commands to queued or running jobs, deletes or freezes output datasets, and updates the task status.

Task brokerage

The task brokerage assigns tasks to nuclei per work queue (evgensimul, reprocessing, test, mcore, group, others, ...). One nucleus is chosen for each task based on data locality, total RW and available storage size in the nucleus, nucleus status and output file transfer queue to the nucleus. RW is "remaining work" and is calculated for each task using

  • nEvents … The total number of events in the task.
  • nEventsUsed … The number of events already used.
  • cpuTime … Described in this section.

Here is the list of selection criteria; a nucleus that fails any of them is skipped:

  • The nucleus status must be 'ACTIVE' (All nuclei are ACTIVE for now. Could be set to another status to be skipped by task brokerage in the future).
  • The size of free space, space_free+space_expired from DDM - (totalRW in the nucleus)/(corePower=10)/24/3600×0.25GB, associated to the output endpoint type (e.g. DATATAPE→XYZ_DATATAPE) must be larger than 5TB.
  • If the total input size is larger than 1GB, totalInputSize×10% must be available in the nucleus.
  • If the total number of input files is larger than 100, totalNumInputs×10% must be available in the nucleus.
  • The nucleus must be able to execute the workload.
  • The queue of output files transferring to the nucleus must be below 2000 files.

If multiple nuclei meet the selection criteria, one nucleus is chosen based on the weight:

  • totalRW … The total sum of task RWs in the nucleus. Their priorities are equal to or larger than that of the assigned task. If totalRW is less than 50, totalRW=1.
  • availableInputSize … The available input size for the assigned task in the nucleus.
  • totalInputSize … The total size of input for the task.
  • tapeWeight … 0.001 if input is available only on TAPE in the nucleus. Otherwise, 1.

Job brokerage explained

This is the general production job brokerage flow. For simplicity, the details of particular cases such as pre-assigned sites and test/validation tasks are left out.
  1. Generate the list of preliminary candidates, from one of the following:
    • queues in the cloud for non-WORLD tasks
    • all queues for WORLD tasks
    • exclude any queue with case-insensitive 'test' in the name
  2. Generate the list of T1 queues:
    • T1s and hospital queues for non-WORLD tasks
    • Nuclei for WORLD tasks
  3. Filter out preliminary candidates that fail any of the following checks:
    • Offline queues
    • For WORLD tasks: skip blacklisted satellite links or satellites with over max_queued (currently 300) files queued to the nucleus
    • If priority>900 or scout job: remove non T1 queues
    • If priority>800 or scout job or merging job or mergeOutput job: remove inactive queues (queues where no jobs started in the last 2 hours)
    • Zero Share which is defined in schedconfig.fairsharepolicy. For example type=evgensimul:100%, in this case only evgen or simul jobs can be assigned as others have zero shares
    • If IO Intensity (input size / execution time) of the task is larger than 200 kB/s: the total size of missing files must be less than 10GB and the number of missing files must be less than 100. That is, if a queue needs to transfer more than 10GB of input files or more than 100 input files, the queue is removed
    • Core count
    • ATLAS Release
    • RAM Count
    • DISK size check, free space has to be over 200GB
    • Blacklisted DDM endpoints
    • Walltime
    • IPConnectivity
    • Event Server settings
    • Too many transferring jobs: skip if transferring > max(transferring_limit, 2 x running), where transferring_limit is defined by site or 2000 if undefined
    • T1 Weight, see details
    • Queues without pilots
    • if processingType=urgent or priority >= 1000, the network weight (see next subsection) must be larger than or equal to NW_THRESHOLD × NW_WEIGHT_MULTIPLIER which are defined in gdp config
  4. Calculate brokerage weight for remaining candidates:
    • Initial weight based on running vs queued jobs. If nRunning is less than 20 and the number of running/submitted batch jobs at PQ is larger than nRunning, min(20, nBatchJob) is used as nRunning for bootstrap.
    • Take data availability into consideration
    • For WORLD tasks, apply a network weight based on connectivity between nucleus and satellite, since the OUTPUT files are collected in the nucleus (see next subsection)

  5. Apply further filters
    • Skip site if activated + starting > 2 x running
    • Skip site if defined+activated+assigned+starting > 2 x running
    • If all sites are skipped, candidates with the 3 best weights go to the next step.
  6. Remaining candidates are sorted by weight. For WORLD tasks, the best 10 candidates are taken.
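
Three of the numeric rules in the flow above (the nRunning bootstrap in step 4, the transferring check in step 3 and the queue-depth filters in step 5) can be sketched as follows. The function names and the layout of the counts dictionary are hypothetical; only the thresholds come from the text.

```python
def bootstrap_n_running(n_running, n_batch_jobs):
    """nRunning used for the initial weight (step 4)."""
    if n_running < 20 and n_batch_jobs > n_running:
        return min(20, n_batch_jobs)
    return n_running


def too_many_transferring(n_transferring, n_running, transferring_limit=None):
    """Transferring check of step 3; the limit is 2000 if the site sets none."""
    limit = 2000 if transferring_limit is None else transferring_limit
    return n_transferring > max(limit, 2 * n_running)


def skipped_in_step5(counts):
    """Queue-depth filters of step 5; counts maps job status to a job count."""
    n_running = counts.get("running", 0)
    if counts.get("activated", 0) + counts.get("starting", 0) > 2 * n_running:
        return True
    queued = sum(counts.get(status, 0)
                 for status in ("defined", "activated", "assigned", "starting"))
    return queued > 2 * n_running
```

For example, a queue with 5 running jobs and 50 batch jobs is treated as having 20 running jobs when the initial weight is computed, which lets nearly empty queues receive work.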

Network weight

The network data sources are Given the accuracy of the data and the timelapse from decision to action, the network weight only aims to provide a simple, dynamic classification of links. It is currently calculated as:
where the queued and throughput weight are calculated as in the plot below
  • queuedWeight:
    queued.png

  • throughputWeight:
    throughput.png

It uses the most recent available data: preferably data of the last 1h, if not available of the last 1d, and if not available of the last 1w. FTS Mbps are used, which are filled from Chicago elastic search.
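
The fallback between time windows can be sketched as below. The function name and the dictionary keys '1h', '1d' and '1w' are hypothetical; the actual layout of the stored metrics is not described here.

```python
def latest_throughput(metrics):
    """Pick the most recent FTS throughput (Mbps) available for a link.

    metrics maps a time window ('1h', '1d', '1w') to a value or None.
    Returns (window, value), or None if no metric is available.
    """
    for window in ("1h", "1d", "1w"):  # most recent window first
        value = metrics.get(window)
        if value is not None:
            return window, value
    return None
```

When the function returns None, the AGIS closeness described below would be used instead.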

If there are no available network metrics, the AGIS closeness (0 best to 11 worst) is used in a normalized way:

Periodical task execution

Currently each HC job is submitted independently, which is very inefficient. In the new system, one task would be defined per cloud or site to generate jobs periodically. A special plugin to generate jobs would be required. Also a special file status in the Contents table would be required.

Fairshare

It has been proposed to have a fairshare mechanism in one or more upstream components, while it is currently done in the job dispatcher. One possibility is that JEDI could generate jobs based on a fairshare policy. However, the fairshare mechanism in the job dispatcher is still required since fairshare is defined for the number of running jobs, and generated jobs may not start running immediately if they require input file transfers. Although job generation will take fairshare into account, which will help global fairshare, the implementation of fairshare management would not change significantly.

Different treatment of input datasets

In the task request, distinguish one-time input from many-time input. One-time input is treated as it is today, i.e., a subset is sent to proddisk with a short lifetime and deleted after usage. Many-time input is treated as long-standing at T2s, i.e., the whole(?) dataset is copied to datadisk without a lifetime but as 'secondary', or with a longer lifetime as 'input'. This will need some more consideration: the definition, the size, the workflow with Rucio, etc. It sounds similar to PD2P with JEDI but is somewhat different, especially in how to identify the datasets used by many tasks.

Tasks with incomplete log datasets

When jobs fail at T2, log files are immediately added to tid datasets before they are transferred to T1. This is required to trigger a reattempt immediately, but it causes incomplete log datasets when transfers of log files fail. JEDI would clean up such incomplete datasets before generating jobs for subsequent tasks. Alternatively, a new job status could be introduced to PanDA to transfer log files for failed jobs.

Validation in post-processing phase

For example, availability of output files in tid datasets may be checked. Also, consistency between contents in tid datasets and Dataset_Contents may be checked.

Random seed assignment

Each task can specify the initial value for the random seed and then it is incremented per job. If a job used a random seed value and failed, the value is reused by the next job regardless of whether the old job is combined or split. For example,

  1. A job failed with 2 input files and a random seed N at site X
  2. JEDI retries the 2 input files
  3. Site Y is free but each job can take only 1 input file due to some site specification
  4. Two jobs are generated at site Y
  5. The first job reuses the random seed N and the other uses a new random seed

If reproducibility is important, i.e., the retried job must use the random seed of the failed job, the task needs to use SEQNUMBER instead of RNDMSEED.

T1 weight

Each task can define t1Weight in task parameters to send more jobs to the nucleus. If t1Weight=0 or unspecified and task.ioIntensity≥500, cloudconfig.weight is used. If t1Weight=-1, all jobs go to the nucleus. If task.currentPriority≥900, t1Weight=max(t1Weight,100).

Timeout values for production jobs

  • 30 min for waiting jobs
  • 1 hour for pending jobs
  • 21 days for running jobs
  • 2 days for activated or starting jobs
  • 6 hours for activated evgen/simul jobs at T2 when nRunning=0 or nActivated/nRunning>0
  • 6 hours for activated evgen/simul jobs at T1 except US and ND
  • 12 hours for assigned jobs
  • 4 hours for defined jobs
  • 2 hours for heartbeats from running jobs
  • 3 hours for holding jobs
  • 30 min for sent jobs
  • schedconfig.transtimehi days for transferring jobs with priorities>=800
  • schedconfig.transtimelo days for transferring jobs with priorities<800
  • 4 hours for activated or starting jobs with priorities>=800 at sites where sitedata.laststart is older than 2 hours

Direct replica lookup for assigned jobs

Jobs usually go from the assigned to the activated state when a callback is received from Rucio indicating that a dis dataset is complete. In case the callback is lost, or some files take much longer than others, there is a periodic (hourly) direct lookup of the files for each assigned job with priority>800.

Final status of Event Service jobs

ES jobs go to status=merging if they successfully processed one or more events. Then they go to status=finished or failed (jobsubstatus=es_merge_failed) once the corresponding ES-merge jobs are finished or failed.

ES jobs have status=failed when

  • the number of attempts for some event ranges reached the limit, or
  • the pilot reported errors (jobsubstatus=pilot_failed), or
  • heartbeat was lost without successful event ranges (jobsubstatus=es_heartbeat)

ES jobs have status=merging when they have one or more successful event ranges and

  • they were retried to run unprocessed event ranges (jobsubstatus=es_retry), or
  • all event ranges were processed and ES merge jobs were generated (jobsubstatus=es_merge), or
  • all event ranges assigned to the job sets were done but they didn't generate ES merge jobs since other jobs were still running (jobsubstatus=es_wait), or
  • they didn't process any event ranges but were retried since the number of attempts didn't reach the limit (jobsubstatus=es_inaction)

ES jobs have status=closed when

  • they were killed since an associated ES job generated an ES merge job (jobsubstatus=es_killed), or
  • they were killed before being dispatched to WN (jobsubstatus=es_unused), or
  • they were killed since associated ES jobs failed (jobsubstatus=es_aborted)

ES jobs have status=cancelled when

  • they were killed by a human using the kill command

ES_job_final_state2.png ES_job_merge_state.png

Jumbo jobs and co-jumbo jobs

If a task is configured to enable event service and sets the nJumboJobs parameter to an integer, the task can generate jumbo jobs. Each jumbo job can process any events in the task across all jobsets. Note that a jumbo job doesn't have to process all events in the task: events are delivered on demand to CPUs while the job is running on them, i.e., events (workload, in other words) are not pre-assigned to resources. From a single task, one jumbo job is generated for each PQ which has useJumbo in schedconfig.catchall.

Co-jumbo jobs are very similar to conventional event service jobs. Each co-jumbo job can process only the events in a jobset and runs on PQs which enable event service with schedconfig.jobseeds, following the standard brokerage procedures for event service. Jumbo jobs and co-jumbo jobs share events in the same task, i.e., they are essentially consumers of different sizes.

Jumbo jobs can be generated when a PQ sets useJumbo and lacks jumbo jobs, even if other PQs are full and cannot accept more co-jumbo jobs. In this case, fake co-jumbo jobs are generated with computingSite=WAITING_CO_JUMBO, and they stay in the waiting state until real co-jumbo jobs are generated or all events in the corresponding jobsets are processed.

When jumbo jobs process events, those events go to the finished state. They are changed to done when co-jumbo jobs are finished, i.e., jumbo jobs don't change the event status to done when they are finished. An hourly cron checks co-jumbo jobs in the waiting/assigned/activated state and finishes them if all events in the corresponding jobsets are done. Co-jumbo jobs then generate merge jobs in the same way as conventional event service jobs. When the task is close to 100% done, it unsets nJumboJobs so that jumbo jobs are not generated with a small number of events. This machinery is disabled if the task sets the site parameter.

Jumbo job switcher

JumboWatchDog is a sub-component of the WatchDog agent which runs every 10 min to enable or disable jumbo jobs in event service tasks. The behavior of JumboWatchDog can be customized using the following parameters with component=jumbo_dog in the GDP config I/F:

| Name | Default Value | Description |
| JUMBO_MAX_TASKS | 1 | The total number of active tasks with jumbo jobs |
| JUMBO_MIN_EVENTS_DISABLE | 100000 | The minimum number of remaining events for each task to generate jumbo jobs |
| JUMBO_MIN_EVENTS_ENABLE | JUMBO_MIN_EVENTS_DISABLE × 10 | The minimum number of remaining events in each task which can enable jumbo jobs |
| JUMBO_MAX_EVENTS | JUMBO_MAX_TASKS × JUMBO_MIN_EVENTS_ENABLE / 2 | The upper limit on the total number of remaining events in active tasks with jumbo jobs |
| JUMBO_PER_TASK | 1 | The number of jumbo jobs per task |
| JUMBO_PER_SITE | 1 | The number of jumbo jobs per site for each task |
| JUMBO_MAX_CURR_PRIO | 500 | The upper limit on current priorities of tasks which can enable jumbo jobs |

JumboWatchDog tries to find event service tasks which have not enabled jumbo jobs yet, and have

  • more remaining events than JUMBO_MIN_EVENTS_ENABLE,
  • lower priorities than JUMBO_MAX_CURR_PRIO,
  • transUse which is available at one or more jumbo-job-enabled PQs,
  • and taskParam.inFilePosEvtNum=True, nFilesPerJob=1, or nEventsPerJob<nEventsPerInputFile, which is required since jumbo jobs work with in-file positional event numbers,

when the number of active tasks with jumbo jobs is less than JUMBO_MAX_TASKS and the total number of remaining events in active tasks with jumbo jobs is less than JUMBO_MAX_EVENTS. JumboWatchDog enables jumbo jobs in the task with the largest number of remaining events among the tasks which meet the above criteria, and then repeats the procedure until the system has enough tasks and events. At the same time, JumboWatchDog disables jumbo jobs in tasks which have fewer remaining events than JUMBO_MIN_EVENTS_DISABLE.
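
The enable/disable logic can be sketched as follows. This is an illustration under stated assumptions, not the JumboWatchDog code: the class, the function names and the task dictionary keys are hypothetical, the derived limits are modelled with their default formulas only, and the transUse and in-file positional event number checks are omitted.

```python
from dataclasses import dataclass


@dataclass
class JumboConfig:
    max_tasks: int = 1                 # JUMBO_MAX_TASKS
    min_events_disable: int = 100000   # JUMBO_MIN_EVENTS_DISABLE
    max_curr_prio: int = 500           # JUMBO_MAX_CURR_PRIO

    @property
    def min_events_enable(self):       # default of JUMBO_MIN_EVENTS_ENABLE
        return self.min_events_disable * 10

    @property
    def max_events(self):              # default of JUMBO_MAX_EVENTS
        return self.max_tasks * self.min_events_enable // 2


def select_tasks_to_enable(candidates, n_active_tasks, n_active_events, cfg):
    """Return IDs of tasks in which jumbo jobs would be enabled."""
    eligible = [t for t in candidates
                if t["remaining_events"] > cfg.min_events_enable
                and t["priority"] < cfg.max_curr_prio]
    # The task with the largest number of remaining events is taken first.
    eligible.sort(key=lambda t: t["remaining_events"], reverse=True)
    enabled = []
    for task in eligible:
        if n_active_tasks >= cfg.max_tasks or n_active_events >= cfg.max_events:
            break  # the system already has enough tasks or events
        enabled.append(task["id"])
        n_active_tasks += 1
        n_active_events += task["remaining_events"]
    return enabled


def should_disable(task, cfg):
    """True if jumbo jobs should be disabled in an active task."""
    return task["remaining_events"] < cfg.min_events_disable
```

With the default values, JUMBO_MIN_EVENTS_ENABLE is 1000000 and JUMBO_MAX_EVENTS is 500000, so at most one task is enabled at a time.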

Memory probe and brokerage

  • A MemoryMonitor running in the pilot records the maxrss and maxpss of each job, summed over all processes.

If ramunit='MBPerCore', the task ramcount is filled as follows:

  • Once the first 5 jobs are finished, the task ramcount is set to (max(maxpss)-baseramcount)/corecount, i.e. the RSS per core, allowing for some baseramcount independent of corecount.
  • Brokerage calculates the minramcount, customized to the site corecount, as baseramcount+(ramcount*corecount)
  • This is compared to the maxrss and minrss values set in AGIS
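
The two formulas can be written as a short sketch. The function names are hypothetical, and all values are assumed to be in MB.

```python
def task_ramcount(max_pss_values, base_ramcount, core_count):
    """Per-core ramcount derived from the maxpss of the first finished jobs."""
    return (max(max_pss_values) - base_ramcount) / core_count


def site_min_ramcount(ramcount, base_ramcount, site_core_count):
    """minramcount used by brokerage for a site with site_core_count cores."""
    return base_ramcount + ramcount * site_core_count
```

For example, if the largest maxpss among the first jobs of an 8-core task is 9000 and baseramcount is 1000, the task ramcount is 1000 per core, and a 4-core site is compared against a minramcount of 5000.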

Sites that kill on vmem should stop doing that if they can. Otherwise they need to set maxrss sufficiently small to attract jobs such that whatever vmem is measured by the batch system is below the threshold.

The baseramcount could perhaps be deduced from the job pss and rss values, and thus set by JEDI. It is not important for single-core tasks, as it cancels out, so we leave it as zero. Currently it is fixed per processingtype, but may need tuning:

| processingType | baseramcount |
| simul | 1000 |
| pile | 1500 |
| reco | 1000 |
Memory increase in job retry

When a job fails because it ran out of memory (e.g. memory leaks), the ramcount setting for successive jobs will be increased. The new value will be set to the next value in the list [1000,2000,3000,4000,6000,8000] above max(maxPSS, current ramCount).

When 30% of the jobs in a task have failed because of insufficient memory, the setting will be increased for the whole task.
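
A minimal sketch of both rules follows. The function names are hypothetical. The behaviour when no larger value exists in the list is not specified above, so returning None in that case is an assumption, as is treating "30% of the jobs" as a fraction of at least 0.3.

```python
import bisect

RAM_STEPS = [1000, 2000, 3000, 4000, 6000, 8000]


def next_ramcount(max_pss, current_ramcount):
    """Next value in RAM_STEPS strictly above max(max_pss, current_ramcount).

    Returns None when no larger step exists (assumption, see lead-in).
    Both arguments are assumed to be in the same unit as RAM_STEPS.
    """
    floor = max(max_pss, current_ramcount)
    idx = bisect.bisect_right(RAM_STEPS, floor)  # first step > floor
    if idx == len(RAM_STEPS):
        return None
    return RAM_STEPS[idx]


def increase_for_whole_task(n_failed_on_memory, n_jobs):
    """True when at least 30% of the jobs failed on insufficient memory."""
    return n_jobs > 0 and n_failed_on_memory / n_jobs >= 0.3
```

For instance, a job with ramCount 2000 that reached a maxPSS of 2500 is retried with 3000.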

IO intensity

The task parameter iointensity is set from the first 5 finished jobs. It is the total input plus output size divided by the walltime and the corecount. So the unit is kB/s per core, where the per-core normalization makes it possible to run the task on different numbers of cores. Perhaps it should be per HS06, but this is not critical.

This is some measure of the LAN traffic of stage in and out, which can be used to choose lower io jobs, e.g. evgensimul. The disk io on the node is also interesting, but for another day.

HS06sPerEvent

The cpuconsumption of the job at the 95th percentile of (endTime-startTime)*corepower among the first 5 finished jobs, together with the AGIS site HS06 corepower and a safety factor, is used to set HS06sPerEvent:
cpuTime = max(0,endTime-startTime-baseWalltime)*corePower*coreCount*cpuEfficiency/nEvents*1.5
where cpuEfficiency defaults to 90% and baseWalltime represents the part of the job not scaling with CPU power (default 600s). This is used to get the expected walltime on potential sites during brokerage:
wall = cpuTime/corePower/coreCount/cpuEfficiency*nEvents+baseWalltime
cpuEfficiency can be set to 0 to disable scaling with nEvents. In this case
wall = baseWalltime
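
The two formulas can be sketched as below. The function names are hypothetical. The sketch assumes that cpuEfficiency is given as a fraction (0.9 for 90%), that times are in seconds, and that the base time subtracted in the cpuTime formula is the same quantity as baseWalltime.

```python
def cpu_time_per_event(start_time, end_time, base_walltime, core_power,
                       core_count, n_events, cpu_efficiency=0.9,
                       safety_factor=1.5):
    """HS06sPerEvent (cpuTime) measured from one finished job."""
    elapsed = max(0, end_time - start_time - base_walltime)
    return (elapsed * core_power * core_count * cpu_efficiency
            / n_events * safety_factor)


def expected_walltime(cpu_time, core_power, core_count, n_events,
                      base_walltime=600, cpu_efficiency=0.9):
    """Expected walltime of a job on a site with the given core_power."""
    if cpu_efficiency == 0:
        # Scaling with nEvents is disabled.
        return base_walltime
    return (cpu_time / core_power / core_count / cpu_efficiency * n_events
            + base_walltime)
```

For a job that ran 4200 s on 8 cores with corePower 10 and processed 1000 events, cpuTime is 388.8. On the same site the expected walltime is then 6000 s (the measured 3600 s above the base time, multiplied by the safety factor, plus 600 s), and on a site with corePower 20 it is 3300 s.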

Express input/output transfers

processingType=urgent or priority>=1000 makes the FTS transfers of inputs and outputs use the express activity.

processingType=urgent also ignores fairshare.

nGBPerJob

This setting in the request interface is not the input GB per job. The disk usage is estimated as inputsize + max(outdisksize*inputsize/1MB, 2.5GB) + workdirsize. JEDI tries to generate jobs so that the estimate for each job is less than nGBPerJob (NG).
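
The estimate can be sketched as follows. The function names are hypothetical, and the units are assumptions: sizes are in MB, outdisksize is taken as MB of output per MB of input, and 1 GB is taken as 1024 MB.

```python
MB_PER_GB = 1024  # assumption: binary units


def estimated_disk_usage_mb(input_size_mb, out_disk_size, work_dir_size_mb):
    """inputsize + max(outdisksize*inputsize/1MB, 2.5GB) + workdirsize, in MB."""
    output_mb = max(out_disk_size * input_size_mb, 2.5 * MB_PER_GB)
    return input_size_mb + output_mb + work_dir_size_mb


def fits_in_ng(input_size_mb, out_disk_size, work_dir_size_mb, n_gb_per_job):
    """True if the estimate stays below nGBPerJob."""
    usage = estimated_disk_usage_mb(input_size_mb, out_disk_size,
                                    work_dir_size_mb)
    return usage < n_gb_per_job * MB_PER_GB
```

With 4 GB of input, outdisksize of 0.5 and 1 GB of work directory, the output term is clamped to 2.5 GB and the estimate is 7.5 GB, which fits under nGBPerJob=10 but not under nGBPerJob=5. This shows why nGBPerJob is not simply the input size per job.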

tgtMaxOutputForNG

The target size for the largest output format. NG is reset after the scouts in order to get towards this value. It is often set to 10GB to have large outputs while avoiding the hard limit in athena. It works together with maxWalltime (hrs) so that the jobs do not get too long.

Task retry

Tasks can be retried if they are in finished or exhausted state. Child tasks are automatically retried when the parent task is retried unless they are in aborted, broken, or failed state.

Job throttling

The job throttler helps to control the job generation and the amount of jobs that can sit in the queue. The job throttler will decide if there are enough jobs (SKIP) or if more jobs are needed (PASS). In the case of a PASS, it will also calculate the maximum number of jobs that can be submitted. The SKIP/PASS is decided based on the nQueued > 2 * nRunning condition and on lower and upper limits that can be set in the DPA configuration page.

The job throttler works at the level of workqueue and resource type (SCORE, MCORE, etc.). Currently there is one exception for Event Service, where it works at the workqueue level because tasks are not split by resource type, i.e. MCORE tasks can generate SCORE and MCORE jobs, which breaks the general use case.

The maximum number of jobs is calculated with these rules:

- Normal case: (500 < remaining jobs to the 2nRunning < 600) / 2

- A max queued cap exists: cap/5

- If there is a min queued limit and you are below 0.9*queue limit: max(queue limit/20, normal case). In this case the submission will not be locked and potentially all JEDI job generators could be generating jobs at the same time

Special permission

Special permission can be set for each user by appending the following letters in the GRIDPREF field of the ATLAS_PANDAMETA.users table:
  • k : the user can download key-pairs
  • p : the user can retrieve proxies
  • s : the user has super user privilege so that personal or group analysis tasks can jump over other tasks when generating jobs
  • g : the user has super group privilege so that group analysis tasks can jump over other tasks when generating jobs

Analysis

This section describes analysis specific functions. Instructions for JEDI testers can be found here.

Workflow for analysis

  1. A task is submitted with an input dataset container
  2. Information on the constituent datasets in the container is inserted into the Dataset table, i.e., one record per dataset.
  3. Information of files in the datasets are inserted to the Contents table
  4. JEDI takes a few files from the Contents table per dataset and runs the brokerage to choose a site
  5. If a new output dataset is required at the chosen site, it is created and is added to the output dataset container
  6. Jobs are generated according to the site parameters
  7. JEDI repeats 4-6
  8. When all files in an input dataset are processed, the dataset status is changed to done
  9. When all input datasets are processed, the task status is changed to done

There would be an upper limit on the number of sites where jobs are assigned.

Using scout jobs

Scout jobs have been introduced to analysis as well so that job splitting is dynamically optimized based on memory consumption, output size, execution time, etc.

Output merging

If the task is submitted with --mergeOutput and jobs successfully produce pre-merged files, their status will change to merging and those jobs will wait until other jobs at the same site are finished/failed/cancelled. Once all jobs at the site are done, merge jobs will be generated with priority=5000. When merge jobs successfully finish, merged files are added to output dataset containers and job status will change from merging to finished. This means that pre-merged files are hidden from users. Merge jobs themselves are tried at most three times. If the final attempt for the merge job fails, the status of merging jobs will change to cancelled and input files will be retried.

Task retry and resubmission

One can re-execute tasks using pbook or pathena/prun. When pathena/prun is executed with the same outDS, an old task is resubmitted to run on files which were unused or failed in the previous attempt. You can add or use a new input if necessary. On the other hand, retry with pbook runs only on the same input.

A typical use case for resubmission is that a user executes a task on a fraction of the files in a dataset and then executes the same task on the rest of the files. When tasks are resubmitted, some task parameters can be implicitly changed, such as site, cloud, includedSite, excludedSite, nFilesPerJob, nGBPerJob, nFiles, nEvents, and ignoreMissingInDS. Other parameters are not changed even if they show up in the cliParams field. If you need to change the input sandbox, use the --useNewCode option.

Note that some parameters like nEventsPerJob cannot be changed by retry or resubmission since they are evaluated only when the original task is submitted and some database records are inserted based on those parameters. If you want to change those parameters, you have to submit a new task.

Broken tasks

Tasks go to the broken state due to the following reasons:

  • Input datasets are unknown
  • Build job failed three times when tasks are in the scouting state
  • No scout jobs succeeded

Once tasks are broken, they cannot be retried or resubmitted by default since it is rather useless to simply repeat problematic tasks without changes (such as fixed code or new task parameters). If code and/or task parameters are changed, they are internally regarded as new tasks. Each task must have a unique outDS. Therefore, the outDS of a broken task cannot be reused by default, since you have to use a new outDS when submitting a new task. However, you can resubmit broken tasks using --allowTaskDuplication, provided that you understand the risk described in the option's help text:

  --allowTaskDuplication
                        As a general rule each task has a unique outDS and
                        history of file usage is recorded per task. This
                        option allows multiple tasks to contribute to the same
                        outDS. Typically useful to submit a new task with the
                        outDS which was used by another broken task. Use this
                        option very carefully at your own risk, since file
                        duplication happens when the second task runs on the
                        same input which the first task successfully processed

Input data transfer

Basically, analysis jobs go to data, i.e., they are sent to sites where input data are already available and thus they don't transfer input data. However, if all sites with data have one or more closed jobs from the same task in the last 6h, which implies that those sites were busy and jobs were not able to get CPUs for the last ANALY_TIMEOUT hours, new jobs are sent to idle sites together with input data.

Throttled jobs and tasks

If a user has more running jobs than CAP_RUNNING_USER_JOBS, activated jobs go to throttled so that job slots are released to other users. They are automatically changed back to activated once the number of the user's running jobs drops below the limit. Otherwise, they are closed after 7 days.

If the total size of staging/transferring data from TAPE/DISK is larger than PRESTAGE_LIMIT for a user, tasks for the user with data staging/transfers are throttled until the total size drops below the limit. This limit is required to protect the network and storage. Note that existing jobs can run even if tasks are throttled. For example, when one task uses 100 input datasets and 1 dataset is available only at busy sites, the first jobs go to data and thus jobs for the 99 datasets keep running there even if the task is throttled for 1 dataset.


Software structure

Notes on software design

Plugins to generate jobs based on various usecases

For maintainability it is good to have one plugin to generate jobs for each usecase. One typical case would be that one plugin is for CMS analysis and CMS colleagues maintain it. Usecases would be

  • ATLAS production
  • Athena-based analysis
  • ROOT-based or general analysis
  • PAT tool-based analysis? (eg. mana and EventLoop)
  • AMS analysis? (the above general analysis plugin might be enough)
  • CMS analysis
  • HammerCloud
  • ...


Major changes from old system

analysis

See JEDI-based Analysis.

Appendix


Links



Major updates:
-- TadashiMaeno - 17-Dec-2012



Responsible: TadashiMaeno

Never reviewed

Topic revision: r262 - 2019-11-21 - RodneyWalker
 