Concurrent Job Access in Ganga 5.0

This is a summary of the discussions held at CERN on 27.11.2007 and 28.11.2007.

We identified that in Ganga 4.4 there are a number of race conditions, both within a single Ganga session (concurrent GPI and monitoring threads) and in the case of multiple sessions. For example:

  • the j.kill() operation in the GPI thread may overlap with a backend.updateMonitoringInformation() call (for the same job) in the monitoring thread
  • the advisory locking mechanism in the repository does not prevent two concurrent sessions from both calling backend.updateMonitoringInformation() on the same job

Alexander: If two (monitoring) threads ask to update monitoring information for the same job it should not lead to a race condition, as long as the exchange of information goes through the same repository object, whose methods are thread-safe. If two concurrent Ganga processes try to update the repository, the atomicity of this operation is guaranteed either by the backend DB (remote repository) or by the file locking mechanism (local repository). So the net effect of two concurrent calls would be as if the polling time for that particular backend were shortened. I would consider any other side effects as bugs in our implementation. In order to resolve possible problems with the j.kill() operation I would recommend implementing the updateMonitoringInformation() method so that:

  • first it checks the job status set in the repository (possibly by another thread/process); if the status is completed or failed it does not proceed;
  • if it gets an empty/unrecognizable response from the backend (the job was killed or the connection to the backend is down) it does not change the status of the job (see the sketch below).
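
A minimal sketch of these two checks, assuming hypothetical callables repository_status(job) and query_backend(job) that stand in for the repository read and the backend query (neither name is part of the actual Ganga interface):

 FINAL_STATES = ('completed', 'failed')

 def update_monitoring_information(job, repository_status, query_backend):
     """Defensive single-job update following the two rules above."""
     # 1. re-read the status from the repository first; another thread or
     #    process (e.g. a j.kill() call) may already have finalised the job
     if repository_status(job) in FINAL_STATES:
         return
     # 2. an empty/unrecognizable backend response (job killed, backend down)
     #    must not change the status of the job
     backend_status = query_backend(job)
     if not backend_status:
         return
     job.status = backend_status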

Session locks

Implemented as described here since 5.5.0

In order to cope with these issues we want to introduce mandatory locking at the master job level between concurrent Ganga sessions, following the single-writer, multiple-readers principle. The mechanism works as follows:

  • a session tries to acquire a write lock for a particular job only when it is needed, i.e. in the following cases:
    • a user action, i.e. the job is modified by attribute assignment or by a method call such as submit()/resubmit()/fail()
    • the job is monitorable and the monitoring loop is on
  • the lock guarantees that there is only one session which modifies the job (or a master job and all its subjobs); a sketch of this policy follows the list
  • when another session is started it checks which jobs are locked and does not try to monitor them at all (it ignores them); user actions on them fail with an exception
  • the other session may periodically check whether the locks of monitorable jobs have been released by the original session; if so, the jobs are locked and updated before entering the monitoring thread
  • when the session exits (or is deactivated) the lock is released
  • there is a sensible timeout mechanism to remove leftover locks in case of abrupt session termination
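
A possible sketch of this policy, assuming a hypothetical acquire_write_lock(master_job, session_id) helper that returns True on success; the state names and the exception type are illustrative only:

 FINAL_STATES = ('completed', 'failed')

 def needs_write_lock(job, user_action=False, monitoring_on=True):
     """Decide whether this session has to acquire the write lock for `job`."""
     if user_action:          # attribute assignment, submit()/resubmit()/fail()
         return True
     return monitoring_on and job.status not in FINAL_STATES

 def lock_for_user_action(master_job, session_id, acquire_write_lock):
     """User actions fail with an exception if another session holds the lock;
     the monitoring loop would instead silently skip the job."""
     if not acquire_write_lock(master_job, session_id):
         raise RuntimeError('job %s is locked by another session' % master_job.id)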

Since the session locks guarantee that only one session is modifying a job at the same time, the remaining discussion on concurrent job access from GPI and monitoring thread is simplified.

The session lock for the job is accessible as the j.session_lock attribute (not exposed in the GPI).

See below for the proposed implementation details in case of local repository.

Ulrik: We need to think very carefully about how to implement some kind of file-based session locks. We know from previous experience that it is hard to get this correct on distributed file systems. I suggest that session-based locking should only be implemented if we carefully review what locking we are already performing in the file system.

Alexander: To my mind we should use the full power of the atomic updates provided by the AMGA/repository interface (which may be conditional if required) to resolve any race condition issues, rather than introduce an additional locking mechanism. For example, in the implementation of the j.submit() method we have to retrieve the job status from the repository before any further action. If it is set to submitting or any subsequent state, no action should be taken except possibly a warning that the job has already been submitted and returning “success”. Similar solutions can be provided for other user actions like j.resubmit() and j.fail().
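
A sketch of this conditional-update idea for submit(), assuming hypothetical repository methods get_status() and conditional_set_status() standing in for an atomic (possibly conditional) AMGA/repository update, and a hypothetical job.backend_submit() call:

 SUBMITTED_OR_LATER = ('submitting', 'submitted', 'running',
                       'completing', 'completed', 'failed')

 def submit(job, repository):
     """Submit only if the repository still shows the job as 'new'."""
     if repository.get_status(job.id) in SUBMITTED_OR_LATER:
         print('warning: job %s has already been submitted' % job.id)
         return True   # report "success", as suggested above
     # atomically flip 'new' -> 'submitting'; fails if another session won the race
     if not repository.conditional_set_status(job.id, expected='new', new='submitting'):
         print('warning: job %s has already been submitted' % job.id)
         return True
     job.backend_submit()
     repository.conditional_set_status(job.id, expected='submitting', new='submitted')
     return True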

In order to deal with the issue of job attribute assignment within concurrent Ganga sessions I would propose to scrap the current locking mechanism (in which the last started session is effectively the only writer) and to use long-awaited time stamps instead. If the timestamp of the last job modification recorded in the repository (as job metadata, for performance reasons) is newer than the timestamp of the job object within the current session, then an attempt to change the job has to raise a warning or even an error: “job has been modified outside the current session”. The same warning/error can be issued in the case of the j.submit() operation.
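
A sketch of the timestamp comparison, assuming hypothetical names job._session_timestamp (kept by the session) and repository.get_timestamp() (the last-modification time kept as job metadata):

 class JobModifiedOutsideSession(Exception):
     """The repository copy of the job is newer than the in-session copy."""

 def check_before_modify(job, repository):
     """Called before an attribute assignment or a submit() attempt."""
     if repository.get_timestamp(job.id) > job._session_timestamp:
         raise JobModifiedOutsideSession(
             'job %s has been modified outside the current session' % job.id)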

I would also recommend that within each thread/process the status of a master job is derived dynamically from the statuses of its subjobs recorded in the repository whenever it is required, rather than set in the repository by a selected “writing” process or thread. In turn the subjob status, like the status of any regular job, can be updated by any monitoring thread. Since on the repository level the subjobs are treated as separate entries (with the atomic update mechanism in place) I believe this approach will alleviate most of the problems related to an inconsistent master job status.
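
A sketch of such a dynamic derivation; the precedence order used below is an illustrative assumption, since the text does not fix the exact reduction rule:

 def master_status(subjob_statuses):
     """Derive the master job status from the statuses of its subjobs."""
     statuses = set(subjob_statuses)
     # any active subjob keeps the master job active
     for state in ('submitting', 'submitted', 'running', 'completing'):
         if state in statuses:
             return state
     if 'failed' in statuses:
         return 'failed'
     if statuses == {'completed'}:
         return 'completed'
     return 'new'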

I recognize however that in some cases we need a sort of locking mechanism in place. In particular it may be required for the automatic output retrieval and/or merging operations. I believe that the conditional update mechanism, with the introduction of time stamps, can solve this problem in the most efficient and safe way (from the point of view of synchronization between different sessions and of avoiding abandoned locks). For example, an internal job state in the repository (not necessarily propagated to the job object level) like getting output, together with a corresponding timestamp, may provide such a solution if the corresponding j.getoutput() method will:

  • check if the getting output status of the job is recorded in the repository;
  • check the relevant timestamp;
  • start getting the output only if the job is not in the getting output status, or if the timestamp is so old that it may indicate that the previous update failed (see the sketch below).
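
A sketch of these three checks, assuming hypothetical conditional-update methods on the repository (get_internal_state(), conditional_set_internal_state(), clear_internal_state()) and a hypothetical retrieve_output_files() helper; the timeout value is arbitrary:

 import time

 GETOUTPUT_TIMEOUT = 600   # seconds; an older 'getting output' entry is considered stale

 def getoutput(job, repository):
     """Retrieve the output only if no other session is (recently) doing the same."""
     state, timestamp = repository.get_internal_state(job.id)
     if state == 'getting output' and time.time() - timestamp < GETOUTPUT_TIMEOUT:
         return   # another session is already retrieving the output
     # record the internal state atomically; give up if another session wins the race
     if not repository.conditional_set_internal_state(job.id, 'getting output', time.time()):
         return
     retrieve_output_files(job)
     repository.clear_internal_state(job.id)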

Please note that within this solution the job status reported to Ganga can still be running if we want it to. So in contrast to the proposal below I would not diminish the importance of the internal states like submitting, getting output etc., but rather use them where appropriate. Moreover, in contrast to this proposal, I would introduce more “democracy” among threads and processes and diminish the concept that a particular thread/process owns a particular job state.

Internal job locking

The basic assumption is that a job in the new state is owned by the GPI thread. All other states are owned by the monitoring thread. However, there are the following exceptions:

  • j.kill() may be called by the GPI thread in the middle of a backend.updateMonitoringInformation() call in the monitoring thread
  • j.fail(): idem
Ulrik: The completed and failed states are also owned by the GPI thread.

Even worse, if j.kill() or j.fail() is called when the job is already scheduled in the monitoring thread, a race condition may occur as the job is killed and monitored at the same time.

To protect from this, an internal job lock (based on the threading module) should be used. This lock should be available at the subjob level. Critical sections in the monitoring loop (typically a single iteration of the updateMonitoringInformation(jobs) function) should be protected by the lock. If the lock cannot be acquired, this means that the GPI thread is doing kill() or fail() on the job and the monitoring action for this job should be skipped. Conversely, if the fail() or kill() operation fails to acquire the lock (with a short timeout), this means that a monitoring action for this job is taking place.
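
A minimal runnable sketch of such a per-job lock based on the threading module (Python 3 semantics assumed): acquire() reports failure by returning False, which plays the role of the NotAcquired exception used in the pseudocode below. The method names are illustrative, not part of the proposed API:

 import threading

 class InternalJobLock:
     """Per-(sub)job lock shared by the GPI thread and the monitoring thread."""

     def __init__(self):
         self._lock = threading.Lock()

     def for_monitoring(self):
         # non-blocking: if the GPI thread holds the lock (kill()/fail() in
         # progress) the monitoring action for this job is skipped
         return self._lock.acquire(blocking=False)

     def for_user_action(self, timeout=1.0):
         # short timeout: if the monitoring thread holds the lock, the user
         # action only schedules the state change instead of applying it
         return self._lock.acquire(timeout=timeout)

     def release(self):
         self._lock.release()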

The submitting and completing states have only an informative meaning and are no longer used for locking the critical sections.

Ulrik: I believe we should completely get rid of these states then.

The internal lock for the job is accessible as j.lock (not in GPI).

Ulrik: I would call the method j._lock to make it clear for developers that it is not a GPI method.

Ulrik: The merging process should make use of this locking mechanism as well.

Internal job locking: details

Logic of user operations on monitorable jobs:

j.fail():

 j.scheduled = 'fail'  # mark the job internally as scheduled for failure
 try:
   j.lock.acquire(timeout=1)  # seconds
   try:
     if j.status not in FINAL_STATES:  # final states: e.g. completed, failed
       j.status = 'failed'
     # else: keep the old status (e.g. completed)
   finally:
     j.lock.release()
 except NotAcquired:
   pass
 # if the lock is not acquired, the monitoring loop will automatically check at the
 # end of its critical section whether the job is scheduled for failure and fail it

Ulrik: It should be possible to fail a job in a final state (like completed) as well.

Ulrik: I am a bit worried that a race condition can still exist for setting/unsetting the j.scheduled flag. Also if the j.scheduled flag is already set, the code above should probably not run at all (imagine the user repeating j.kill() many times on a job while it is in the monitoring loop).

The implementation of kill() is very similar. Both calls are non-blocking, but the status change to 'killed' or 'failed' may occur with a short delay (the time to complete the concurrent monitoring action, if there is one).

The j.resubmit() method would require that the user first fails or kills the job. Alternatively, resubmit() could block until the concurrent monitoring action completes and only then perform the resubmission.

In order to support the internal locking, we need to change the skeleton of the backend.updateMonitoringInformation() method. Each monitoring action must be guarded by the lock, and the job status check must be done after the lock is acquired:

 for j in jobs:
   try:
     j.lock.acquire(nonblocking)
   except NotAcquired:
     continue  # GPI thread is doing kill()/fail() on this job; skip it
   try:
     do_backend_specific_monitoring_action_for_single_job()
     # perform the pending j.scheduled action (if any)
   except:
     pass      # continue monitoring the remaining jobs
   finally:
     j.lock.release()

Since the majority of the backends only emulate bulk monitoring, we put this logic into IBackend.bulk_updateMonitoringInformation() so that do_backend_specific_monitoring_action_for_single_job() may be implemented by overriding IBackend.single_updateMonitoringInformation(j), which takes a single job as the argument.

If bulk_updateMonitoringInformation() is overridden to perform real bulk monitoring, it is up to the backend developer to guarantee the locking.
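
A sketch of how the emulated bulk method could wrap every single-job update in the lock. The classmethod layout and the lock helper names (taken from the InternalJobLock sketch above) are assumptions, not the actual IBackend code:

 class IBackend(object):

     @classmethod
     def bulk_updateMonitoringInformation(cls, jobs):
         """Emulated bulk monitoring: one guarded single-job update per job."""
         for j in jobs:
             if not j.lock.for_monitoring():
                 continue                 # GPI thread is acting on this job
             try:
                 cls.single_updateMonitoringInformation(j)
                 # a pending j.scheduled action would be performed here
             except Exception:
                 pass                     # keep monitoring the remaining jobs
             finally:
                 j.lock.release()

     @classmethod
     def single_updateMonitoringInformation(cls, j):
         raise NotImplementedError('to be overridden by each backend')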

Migration:

  • Most of the backends will have to rename the existing updateMonitoringInformation(jobs) into single_updateMonitoringInformation(j) and remove the internal for j in jobs loop (as sketched after this list).
  • Explicit locking will have to be added in the bulk monitoring function in the LCG backend.
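
For illustration, a migrated backend could look as follows (building on the IBackend sketch above; LocalBackend and query_local_status() are hypothetical names):

 class LocalBackend(IBackend):

     @classmethod
     def single_updateMonitoringInformation(cls, j):
         # the body of the old 'for j in jobs:' loop moves here unchanged
         new_status = query_local_status(j)
         if new_status:
             j.status = new_status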

Session locks: implementation details

A proposal for the discussion:

Each session maintains a lock table file. This is a text file which contains the ids of the locked jobs. The timestamp of the lock table file is refreshed automatically by the session every few seconds. In this way the leftover locks from crashed sessions may be easily identified. A session may create/remove multiple lock entries in the table in bulk. In order to protect the lock tables there is a master lock file. If a session wants to lock a job or check whether a job is already locked, it first acquires the master lock. The master lock contains the session id, so in case the master lock is not released, it may easily be cleaned up (with a few seconds' delay) using the timestamp mechanism described above.

Note: The locks are acquired for the lifetime of the session (and typically in bulk), so the master lock is not acquired very often and collisions are rare. A sketch of the file-based mechanism follows.
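
A sketch of this mechanism, using the file modification time as the refreshed timestamp; the staleness timeout and the file layout (one job id per line) are assumptions, and the master-lock handling is omitted for brevity:

 import os, time

 STALE_AFTER = 30.0   # seconds without a refresh => session presumed dead

 def refresh_lock_table(path):
     """Called by the session every few seconds to prove that it is still alive."""
     os.utime(path, None)

 def is_stale(path):
     """A lock table whose modification time is too old belongs to a crashed session."""
     return time.time() - os.path.getmtime(path) > STALE_AFTER

 def locked_job_ids(lock_dir, own_table):
     """Collect the ids locked by other live sessions (one job id per line)."""
     ids = set()
     for name in os.listdir(lock_dir):
         path = os.path.join(lock_dir, name)
         if path == own_table or is_stale(path):
             continue
         with open(path) as f:
             ids.update(int(line) for line in f if line.strip())
     return ids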

