Job Splitting Design Document

This document was revised for Ganga 5.0.

The original Ganga 4.1.0 content may be retrieved as revision 3 (r3) of the CERN wiki site.

GPI interface for splitting

Example

Example of an LHCb job.

Note: GPI shortcuts may be implemented so that LHCbDataset appears as a simple list of files and the splitter as a simpler object (such as a number of files). Here we ignore any possible shortcuts in order to better explain the structure of the interface.

j = Job(application=DaVinci(...))
j.inputdata = LHCbDataset(['file1','file2'])
j.splitter = LHCbDataSplitter(file_bunch_size=1,event_bunch_size=500)

# at this point subjobs do not exist yet, so the following asserts hold

assert(j.subjobs == [])
assert(j.parent is None)

# the submission step creates the subjobs according to the split policy
# if the splitter is not compatible with the dataset and application then
# the submission fails (a rollback to the point before submission)
j.submit()

# now the subjobs have been created

assert(j.subjobs != [])

for s in j.subjobs:
    assert(s.parent is j)

GPI operations on subjobs

Subjobs may not be removed or submitted individually.

assert(not j.subjobs[i].remove())
assert(not j.subjobs[i].submit())

The copy operation creates another job (not subjob):

j2 = j.subjobs[i].copy()
assert(j2.parent is None)

Subjobs may be killed or resubmitted individually (if supported by the backend implementation):

assert(j.subjobs[i].kill())
assert(j.subjobs[i].resubmit())

Access to subjobs, indices etc.

Job id is always an integer (for subjobs as well). The id is relative to the container in which the job lives:

  • a job repository for top-level jobs,
  • a parent job for subjobs.

The j.subjobs attribute is a repository slice object.

The following holds:

  • The ids of the subjobs are [0,...,N] where N is defined by the splitter.
  • The ids are ordered (ascending).
  • The subjobs are accessible from the parent job.

for i in range(len(j.subjobs)):
    assert(j.subjobs[i].id == i) # FIXME: this may fail with Oracle remote job repository

for s in j.subjobs:
    assert(j.subjobs(s.id) is s)

# Jobs and subjobs are accessible from the jobs registry.
# Tuples may be used to index the subjobs.

assert(jobs(j.id) is j)

for i in range(len(j.subjobs)):
    assert(jobs((j.id,i)) is j.subjobs[i]) # FIXME: this may fail with Oracle remote job repository

# An index-tuple for any job may be created automatically by a
# convenience function:

def fully_qualified_id(j):
    index = []
    while j:
        index.append(j.id)
        j = j.parent
    index.reverse()
    return tuple(index)

# For convenience the tuples may be also represented as dot separated strings,
# for instance (i,k) is equivalent to "i.k"
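The equivalence between index tuples and dot-separated strings can be sketched with two small helpers. These are illustrative only and not part of the Ganga GPI:

```python
# Hypothetical helpers showing the dot-separated string form of an
# index tuple, e.g. (3, 7) <-> "3.7". Not part of the Ganga GPI.

def id_tuple_to_string(index):
    # (3, 7) -> "3.7"
    return ".".join(str(i) for i in index)

def string_to_id_tuple(s):
    # "3.7" -> (3, 7)
    return tuple(int(part) for part in s.split("."))

assert id_tuple_to_string((3, 7)) == "3.7"
assert string_to_id_tuple("3.7") == (3, 7)
```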

Subjob status transitions

The general rule is that if there exists a monitorable subjob (submitted, running) then the master job status is also monitorable. This optimizes the internal monitoring.

The master job status (j.status) is correlated with the subjob status in the following way:

  1. if there are submitted or submitting (1) subjobs then the master job is submitted, else
  2. if there are running or completing subjobs then the master job is running, else
  3. if there are failed subjobs then the master job is failed, else
  4. if there are completed subjobs then the master job is completed, else
  5. all subjobs must be killed so the master is killed.

(1) this is the case when the subjob is resubmitted
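The precedence rules above can be sketched as follows. The function name and the plain-string statuses are illustrative, not the actual Ganga internals:

```python
# Sketch of the master-status precedence rules; statuses are plain
# strings and derive_master_status is a hypothetical name.

def derive_master_status(subjob_statuses):
    # rule 1: submitting counts as submitted (the resubmit case)
    if any(s in ('submitted', 'submitting') for s in subjob_statuses):
        return 'submitted'
    # rule 2
    if any(s in ('running', 'completing') for s in subjob_statuses):
        return 'running'
    # rule 3
    if any(s == 'failed' for s in subjob_statuses):
        return 'failed'
    # rule 4
    if any(s == 'completed' for s in subjob_statuses):
        return 'completed'
    # rule 5: all subjobs must be killed
    return 'killed'

assert derive_master_status(['completed', 'running']) == 'running'
assert derive_master_status(['failed', 'completed']) == 'failed'
```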

Notes:

  • points 1 and 2 may be swapped in order to achieve behaviour requested by https://savannah.cern.ch/bugs/?29843
  • on any update of the status of a subjob the master job status must be recalculated (using all other subjob statuses) with the help of the procedure above.
  • the transitory states (submitting, completing) are not applied to the master job because they are not monitorable; if a transitory state were left over at the master due to some problem, it would prevent the monitoring of the other subjobs
  • as a possible implementation improvement in the LCG backend, a subjob may stay in the submitting state until the NODE_ID is harvested from monitoring and the backend.id is retrieved; the current implementation blocks the submission until all subjob ids are harvested. Note, however, that a top-level job (non-split or master) may not stay in the submitting state because submitting is not a monitorable state. For the same reason, in the default implementation of the emulated bulk submission mode this is not possible for a subjob (it would not be selected for the monitoring action).

transitions on subjob resubmit()

Because any subjob may be resubmitted, the master job may go from any state to submitted, so all such transitions must be allowed. In the case of subjob resubmit() there is no need for the master to go to the submitting state; internally, however, locking must be used to prevent others (e.g. the monitoring loop or another session) from modifying the subjob at the same time. See GangaThreadsAndConcurrentJobAccess. Since the status of the master is always derived from the status of the subjobs, it is enough to lock access to the subjobs (unless a particular backend implementation involves special actions which should not be performed concurrently).
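A minimal sketch of a lock-guarded subjob resubmit, assuming one threading.Lock per master job. The class and method names are hypothetical, not the Ganga implementation:

```python
# Sketch: lock access to the subjobs during resubmit so the monitoring
# loop (or another session) cannot modify them concurrently.
# MasterJob and resubmit_subjob are illustrative names only.
import threading

class MasterJob:
    def __init__(self, subjob_statuses):
        self._lock = threading.Lock()
        self.subjob_statuses = subjob_statuses
        self.status = 'failed'

    def resubmit_subjob(self, i):
        # hold the lock for the whole update; the master never enters
        # the transitory 'submitting' state
        with self._lock:
            self.subjob_statuses[i] = 'submitted'
            # a resubmitted subjob makes the master submitted (rule 1)
            self.status = 'submitted'

m = MasterJob(['failed', 'completed'])
m.resubmit_subjob(0)
assert m.status == 'submitted'
```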

Subjob submission

Relevant discussions in savannah:

splitting

The splitting procedure is triggered by master_job.submit():

  1. the splitter is used to create internally a list of subjobs and the application is configured
  2. the subjobs are submitted in bulk to the backend
  3. if the backend does not support real bulk submission then it is emulated (an internal loop in the Ganga client)

Note: this is client-side splitting. A server-side splitting scenario is not currently supported.
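The splitter's role in step 1 can be sketched as a simple partitioning of the input dataset. The class name and split() method are assumptions for illustration, not the actual splitter API:

```python
# Hypothetical splitter sketch: partition the input file list into
# per-subjob bunches of file_bunch_size files (client-side splitting).

class FileBunchSplitter:
    def __init__(self, bunch_size):
        self.bunch_size = bunch_size

    def split(self, files):
        # one sublist per subjob; the last bunch may be shorter
        return [files[i:i + self.bunch_size]
                for i in range(0, len(files), self.bunch_size)]

splitter = FileBunchSplitter(bunch_size=2)
subjob_inputs = splitter.split(['f1', 'f2', 'f3', 'f4', 'f5'])
assert subjob_inputs == [['f1', 'f2'], ['f3', 'f4'], ['f5']]
```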

error handling

The master_job.submit() handles errors in the following way:

  1. if failures occur before the subjobs are submitted to the backend (splitter, application or runtime handler errors) then JobError is raised and the master remains in the new state:
    • status=="new"
    • subjobs==[]
  2. if the bulk (or emulated bulk) submission to the backend fails, the master is in the submitted state and subjobs!=[]. The error reporting depends on the keep_going hint.

The master_job.submit(keep_going=True/False) is a hint on subjob submission behaviour and reporting. It follows the logic of collective operations using slices and is roughly equivalent to master_job.subjobs.submit().

Note: a subjob may never be in the 'new' state

emulated bulk case

Emulated bulk backend submit (e.g. Local or Batch):

  • if keep_going is True then any failures are ignored and submission of all subjobs is attempted; master_job.submit() always succeeds
  • if keep_going is False then submission is stopped at the first subjob which fails and JobError is raised
  • subjobs which were not submitted, or which failed submission, are left in the new state. Note: this is different from the behaviour of simple jobs (which go back to the new state if they cannot be submitted). The difference is deliberate: subjobs cannot be edited and are created automatically by the splitter. For a simple job, a requirement to go to the failed state would be too heavy, as it would force the user to make a copy in order to modify the parameters and submit the job again. For a subjob it is fine, because the subjob may be resubmitted in case of transient backend problems, or the splitter may be modified (and a new master created), or the subjob may be copied out of the master and modified.
  • currently, if all subjobs would be in the new state after submission, the entire master job is rolled back to the new state
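The emulated bulk loop with the keep_going hint can be sketched as below; backend_submit and the dict-based subjobs are stand-ins for the real backend interface and job objects:

```python
# Sketch of the emulated bulk submission loop (e.g. Local or Batch).
# backend_submit is a stand-in for the real per-subjob backend call.

class JobError(Exception):
    pass

def emulated_bulk_submit(subjobs, backend_submit, keep_going):
    failed = []
    for sj in subjobs:
        try:
            backend_submit(sj)
            sj['status'] = 'submitted'
        except Exception:
            # failed subjobs stay in the 'new' state
            failed.append(sj)
            if not keep_going:
                # keep_going=False: stop at the first failure
                raise JobError('subjob submission failed')
    return failed

subjobs = [{'id': 0, 'status': 'new'}, {'id': 1, 'status': 'new'}]

def backend_submit(sj):
    if sj['id'] == 1:
        raise RuntimeError('backend unavailable')

# keep_going=True: the failure of subjob 1 is ignored
failed = emulated_bulk_submit(subjobs, backend_submit, keep_going=True)
assert subjobs[0]['status'] == 'submitted'
assert failed == [{'id': 1, 'status': 'new'}]
```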

real bulk case

Real bulk backend submit (e.g. gLite)

Since Ganga has no control over how bulk submission is implemented by the backend service, no special behaviour may be guaranteed.

If Ganga is able to detect during submission that some subjobs failed to submit, then the error reporting should follow the same rules as for emulated bulk:

  • if keep_going is True then master_job.submit() always succeeds
  • if keep_going is False then JobError is raised
  • subjobs which were not submitted, or which failed submission, are in the 'failed' state

It may happen that bulk submission failures for subjobs are only detected via the monitoring loop. In such cases the subjobs will first appear as submitted and only later become failed.

Subjob monitoring, kill, resubmit ...

Collective monitoring and killing of subjobs is implemented via a bulk or emulated-bulk backend interface.

kill()

Any errors are reported according to the keep_going hint. If the bulk backend interface has keep_going==True behaviour, then an emulated bulk operation may be needed to implement the keep_going=False hint. This may affect performance. The reverse case is not a problem from the performance point of view.

Hence keep_going=True is guaranteed to be always faster.
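The trade-off can be sketched as below: a bulk kill that already has keep_going==True behaviour must be driven subjob-by-subjob to emulate keep_going=False. All names here are illustrative, not the Ganga backend interface:

```python
# Sketch: emulating keep_going=False on top of a bulk kill interface
# that itself behaves like keep_going=True. Names are hypothetical.

def bulk_kill(subjobs):
    # stand-in bulk backend kill: ignores individual failures and
    # returns the list of subjobs it failed to kill
    return [sj for sj in subjobs if not sj.get('killable', True)]

def kill(subjobs, keep_going):
    if keep_going:
        # single fast bulk call
        return bulk_kill(subjobs)
    # keep_going=False must stop at the first failure, so fall back
    # to an emulated per-subjob loop (potentially slower)
    for sj in subjobs:
        if bulk_kill([sj]):
            raise RuntimeError('kill failed for subjob %s' % sj['id'])
    return []
```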

resubmit()

It may be hard (as we saw in the LCG backend) to implement individual subjob resubmission, because LCG does not support it directly and it must therefore be emulated. The result is that some subjobs are operated with the native LCG bulk interface while others are operated in the emulated mode.


GangaSplittersInternalJunkToBeReviewed

-- JakubMoscicki - 14 Jun 2006

Topic revision: r7 - 2008-02-28 - JakubMoscicki