Introduction

Historically within the Ganga framework, responsibility for handling job output data has been with the scripts created by runtime-handlers, rather than those created by backends. The result of this is that behaviour differs, depending on the type of job executed - for example, the ROOT application could not deal with output data at all.

Development focussed on designing a solution which allows users to handle both large data files (like DSTs) that would ordinarily be sent to mass storage devices in addition to smaller files (such as ntuples and stdout) that would be returned in the sandbox. The flexibility of the solution means that, if desired, users may store their ntuples/stdout files on mass storage devices, or even return small DST files back to the Ganga client machine via the Ganga sandbox mechanism.

Current status as of September 2012

To date (September 2012) the following storage methods have been implemented and are ready to be tested.

  • SandboxFile : returns file to the output workspace
  • MassStorageFile : uploads file to mass storage
  • LCGSEFile : uploads file to LCG storage element
  • DiracFile : uploads file to Dirac storage element (LHCb users only)

Configuration

Configuration of this functionality is controlled by a new section of attributes in the user's ~/.gangarc file.

[Output]
MassStorageFile = {'fileExtensions':['*.dummy'], 'backendPostprocess':{'LSF':'WN', 'LCG':'client', 'CREAM':'client', 'Localhost':'WN'}, 
'uploadOptions':{'mkdir_cmd':'nsmkdir', 'cp_cmd':'rfcp', 'ls_cmd':'nsls', 'path':/castor/cern.ch/user/i/idzhunov/ganga}}
LCGSEFile = {'fileExtensions':['*.root'], 'backendPostprocess':{'LSF':'client', 'LCG':'WN', 'CREAM':'WN', 'Localhost':'WN'},
 'uploadOptions':{'LFC_HOST':'lfc-dteam.cern.ch', 'dest_SRM':'srm-public.cern.ch'}}
DiracFile = {'fileExtensions':['*.dst'], 'backendPostprocess':{'Dirac':'WN', 'LSF':'WN', 'LCG':'WN', 'CREAM':'WN', 'Localhost':'WN'}, 'uploadOptions':{}}

Automatic file type detection

Given the above configuration, the automatic file-type detection of the output file handling mechanism can be demonstrated:
j=Job()
j.outputfiles = ['data.dummy','fillrandom.root', 'myfile.dst', 'data.raw']
print j.outputfiles
[MassStorageFile(namePattern='data.dummy'), LCGSEFile(namePattern='fillrandom.root'), DiracFile(namePattern='myfile.dst'),
 SandboxFile(namePattern='data.raw')]
The file name data.raw doesn't match any of the fileExtensions in the ~/.gangarc configuration and so is associated with the default output file type SandboxFile, which will simply send the file to the output workspace.

Upload from worker nodes or from the client for different backends

Every file type should be configured with the backendPostprocess dictionary that describes from where the uploads should be done for different backends:
'backendPostprocess':{'LSF':'WN', 'LCG':'client', 'CREAM':'client', 'Localhost':'WN'},

Key attributes of this dictionary describe the backend name (e.g. LSF, LCG, etc) and are mapped to either the value WN or client. In the case of using the WN attribute, the code provided by the getWNInjectedScript() method of the output file type will be injected in the job script of the backend and the upload performed from the worker node. When the client value is used, the put() method of the output file type will be called upon job completion, when the output file has been downloaded into the job's output sandbox; the subsequent upload will be performed from the Ganga client.

Storing upload options in configuration

The uploadOptions dictionary holds options which define items needed to perform the upload, such as commands to be used for the upload, destination directory/SRM, LFC host, etc:
'uploadOptions':{'LFC_HOST':'lfc-dteam.cern.ch', 'dest_SRM':'srm-public.cern.ch'}}

Compressing files before uploading it to its destination

Every output file type (SandboxFile, MassStorageFile, LCGSEFile, DiracFile) has a compressed attribute. If you set this attribute to True, the file will be first compressed and after that sent to the desired destination.

job.outputfiles use case

j = Job(application=Root(),backend=Local())
j.application.script = File('~/fillrandom2.py')
j.outputfiles = [MassStorageFile(namePattern='fillrandom.root', compressed=True),
LCGSEFile(namePattern='fillrandom1.root'), SandboxFile(namePattern='fillrandom2.root')]
j.submit()
j.status
 completed
print j.outputfiles[0].location()
 ['/castor/cern.ch/user/i/idzhunov/ganga/fillrandom.root.gz']
print j.outputfiles[1].location()
 ['guid:b5a8fe4d-4877-4a14-8375-744e98e438b0']
The first output file is compressed and sent to mass storage (Castor), the second one is uploaded to LCG SE and the third one goes to the job's outputsandbox. In case the user wants to retrieve locally the first two files, he can do the following
j.outputfiles[0].get()
j.outputfiles[1].get()

Adding new file type object to the job.outputfiles mechanism

In principle what you have to do is to create a new class that inherits from IOutputFile and override the following methods :
  • put() : makes upload from the client, considering the file has been downloaded in the output sandbox
  • getWNInjectedScript() : injects script (for uploading the file) into the job script that is executed on the worker node
  • setLocation() : method called on the client (on job completed hook), setting the location of the output file processed on the worker node
  • location() : returns the location of the uploaded file
  • get() : retrieves locally the already uploaded (to mass storage, LCG SE, Dirac, etc. ) file

You also need to configure the new file type in the [Output] configuration section

Processing on the WN (pseudo code)

In every backend class there is submit method

def submit(self, jobconfig, inputsandbox):
   ....
   self.preparejob(jobconfig)
   ....

def preparejob(jobconfig):
   ...
   text = '''some long script
   ...........................
'''
   ...
   ...
   text = text.replace('###OUTPUTUPLOADPOSTPROCESSING###', getWNCodeForOutputPostprocessing(job))
   
   return job.getInputWorkspace().writefile(FileBuffer('__jobscript__', text), executable=1) 

In OutputFileManager class

def getWNCodeForOutputPOstprocessing(job):
   injectScript = ''
   
   for outputfile in job.outputfiles:
      if process on WN:
         injectScript += outputfile.getWNInjectedScript()

   return injectScript

Processing on the client (pseudo code)

In Job.py in job completed hook

def postprocessoutput(self, outputfiles):
   for outputfile in outputfiles:
      if process on client:
         outputfile.put()
      elif process on WN:
         outputfile.setLocation()
Put() method calls internally setLocation(). In case of processing on the WN, locations are read from a file (passed within the outputsandbox to the client), where the locations were written during the uploads from the WN

Tests for the new mechanism

Tests and examples how to use the job.outputfiles mechanism can be found in /trunk/ganga/python/Ganga/test/GPI/OutputFiles directory

Output file types as standalone objects (not part of the job.outputfiles mechanism)

The different file type objects can be used also as standalone objects. Imagine one wants to upload a file from a local dir to a LCG SE, mass storage device or Dirac storage element:
a = MassStorageFile(localDir='/afs/cern.ch/user/i/idzhunov/outputfiles', namePattern='*.py')
a.put()
print a.location()
 ['/castor/cern.ch/user/i/idzhunov/ganga/fillrandom.py']

b = LCGSEFile(localDir='/afs/cern.ch/user/i/idzhunov/outputfiles', namePattern='*.root')
b.put()
print b.location()
 ['guid:a7a7c3fb-559e-4e57-ba3b-01262a0a2e8f']

Comments and questions from our group at CERN

  • Users have to know where their job is running and what commands they will be able to execute (this comment was about the configuration of MassStorageFile). From where the user will know if the commands for copying the file will be available for example on LSF???
  • Is there a possibility to have a file replica, i.e. send the output file to 2 places (mass storage and LCG SE), answer - currently no
  • What happens if the upload to LCG SE fails, answer currently nothing, suggestion to store the file on the scratch dir of the SE
  • Comment from Mike: Is the configuration required for the MassStorageFile object unnecessarily complex? We could hide the copy/mkdir/etc commands from the user, and they just define the underlying technology: Castor/DPM/Hadoop. Ganga will have an internal dictionary of the necessary copy/mkdir commands for each of these storage technologies.

Comments from the Ganga developers (changes are implemented)

  • In automatic file type detection if a file name is matching two fileExtensions patterns from the configuration, don't throw a Config error, but assign the filename to the last match and issue a warning, for example like this
Ganga.GPIDev.Lib.File              : WARNING  file name pattern test.root matched ['MassStorageFile', 'LCGSEFile'], assigning to LCGSEFile

  • Before the SandboxFile was base class for the other file types. Now an abstract class IOutputFile is put in place that defines the methods that have to be overwritten. All file types (including SandboxFile) now inherit from this abstract class.

  • Added the possibility for having replicas of the output file on different LCG SEs, we can define 2 job.outputfiles with the same name but with different SEs where to end up

j.outputfiles = 
[LCGSEFile(namePattern='fillrandom1.root', lfc_host='lfc-dteam.cern.ch', se='srm-dteam.gridpp.rl.ac.uk'), 
LCGSEFile(namePattern='fillrandom1.root', lfc_host='lfc-dteam.cern.ch', se='srm-public.cern.ch')]

  • Fail the job if one of the output files doesn't end up where it was required. Issue an error message to the client and also set the failureReason for this output file

Ganga.GPIDev.Lib.File              
: ERROR    Job 785 failed. One of the job.outputfiles couldn't be uploaded because of /bin/sh: asdlcg-cr: command not found

jobs[-1].outputfiles
[LCGSEFile (
 compressed = False ,
 srm_token = '' ,
 se_type = '' ,
 SURL = '' ,
 joboutputdir = '/afs/cern.ch/user/i/idzhunov/gangadir/workspace/idzhunov/LocalXML/785/output/' ,
 locations = [] ,
 se_rpath = '' ,
 namePattern = 'fillrandom1.root' ,
 lfc_host = 'lfc-dteam.cern.ch' ,
 localDir = '' ,
 port = '' ,
 se = 'srm-public.cern.ch' ,
 failureReason = '/bin/sh: asdlcg-cr: command not found' 
 )]

..................

Ganga.GPIDev.Schema                
: ERROR    Job 789 failed. One of the job.outputfiles couldn't be uploaded because of /castor/cern.ch/user/g/gangage/ganga : Permission denied

jobs[-2].outputfiles
[MassStorageFile (
 joboutputdir = '/afs/cern.ch/user/i/idzhunov/gangadir/workspace/idzhunov/LocalXML/790/output/' ,
 locations = [] ,
 namePattern = 'fillrandom.root' ,
 compressed = False ,
 localDir = '' ,
 failureReason = '/castor/cern.ch/user/g/gangage/ganga : Permission denied' 
 )]

Backward compatibility with the old output structure

Ganga chooses which output mechanism to be used , old with outputdata/outputsandbox or new with outputfiles, depending on the output fields that are set.

  • If any of the outputsandbox and outputdata is set, writing to outputfiles is forbidden and the old output mechanism will be used

In [6]:j.outputdata = AthenaMCOutputDatasets()

In [7]:j.outputsandbox = ['fillrandom.root']

In [8]:j.outputfiles = ['test.root']

Ganga.GPIDev.Lib.Job               : ERROR    job.outputdata is set, you can't set.outputfiles

In [9]:print j.outputdata

AthenaMCOutputDatasets (
 outdirectory = '' ,
 expected_output = [] ,
 actual_output = [] ,
 output_dataset = '' ,
 output_firstfile = 1 ,
 store_datasets = [] ,
 logfile = '' ,
 outrootfiles = {} 
 ) 

In [10]:print j.outputsandbox
[fillrandom.root]

In [11]:print j.outputfiles
[]

  • If outputfiles is set, writing to outputsandbox and outputdata is forbidden and the new output mechanism is used


In [1]:j.outputfiles = [SandboxFile('fillrandom.root')]

In [2]:j.outputsandbox = ['test.root']

Ganga.GPIDev.Lib.Job               : ERROR    job.outputfiles is set, you can't set.outputsandbox

In [3]:j.outputdata = AthenaMCOutputDatasets()

Ganga.GPIDev.Lib.Job               : ERROR    job.outputfiles is set, you can't set.outputdata

In [4]:print j.outputfiles
[SandboxFile (
 namePattern = 'fillrandom.root' ,
 compressed = False ,
 localDir = '' 
 )]

In [5]:print j.outputsandbox
[]

In [6]:print j.outputdata
None

Forbid the use of the old output fields

The idea of the new outputfiles mechanism is to replace (fully in the future) the current outputdata/outputsandbox mechanism. A config option has been added that forbids the use of outputdata and outputsandbox fields.

If ForbidLegacyOutput is set to True and the user tries to set job.outputsandbox or job.outputdata, an error message will be issued

Ganga.GPIDev.Lib.Job               : ERROR    Use of job.outputsandbox is forbidden, please use job.outputfiles

Ganga.GPIDev.Lib.Job               : ERROR    Use of job.outputdata is forbidden, please use job.outputfiles

and job.outputsandbox and job.outputdata will have their values set to the default ones [] and None.

Copying of the 2 fields will also be forbidden (if ForbidLegacyOutput is true).

Like this the user will be obliged to use the new outputfiles mechanism.

The Idea of input files

On the last Ganga developer days at CERN, the idea of using the output files as input for other jobs was discussed.

A new inputfiles field should be added to the Job.py class which will be a list of file objects : SandboxFile, MassStorageFile, LCGSEFile, DiracFile

In the input files mechanism we should use the same objects, same configuration and same automatic file type detection as in the output files mechanism.

Inputfiles field targets to replace the inputsandbox job field. Backward compatibility should be possible as in the outputfiles - per job the user can use either the inputfiles or the inputsandbox field.

These requirements are implemented now.

Adapting file type for the inputfiles mechanism

Imagine we have a file type class that was used as part of the outputfiles mechanism (above sections are describing this) that now we want to adapt for using in the input files mechanism.

What you have to do in your file type class is to override the getWNScriptDownloadCommand method

  • getWNScriptDownloadCommand() : returns the script that needs to be injected in the job's script for downloading the input files before running the application's script

MassStorageFile.py and LCGSEFile.py can be used as reference as this method is implemented there.

Input files mechanism

File objects (either standalone or used as part of the outputfiles mechanism) can be stored in the box and used as input for other jobs

In [2]:j.inputfiles = [box[-1], box[-2]]

In [3]:j.inputfiles
Out[3]: [MassStorageFile (
 namePattern = 'input1.txt' ,
 locations = ['/afs/cern.ch/user/i/idzhunov/gangamass/1074/input1.txt'] 
 ), LCGSEFile (
 locations = ['guid:8b55a764-8bee-46ae-be8c-cf7bd803fa81'] ,
 namePattern = 'input.txt' ,
 lfc_host = 'lfc-dteam.cern.ch' ,
 se = 'srm-public.cern.ch' 
 )]

In IBackend.py in master_prepare method we go throught the inputfiles list and :

  • if the file is SandboxFile we add it to the inputsandbox
  • if the file is different from SandboxFile and has to be processed on the client for the job's backend (this comes from the configuration) we download the file (calling get() method) in a temp dir and add it to the inputsandbox. Temp dir is deleted after inputsandbox has been tar archived.

In every backend class, in creation of the job script, we should make sure to include the script for downloading input files (those that should be processed on the WN according to the configuration)

For the purpose we call getWNCodeForDownloadingInputFiles method from OutputFileManager.py. This is done for Localhost, Batch, LCG and CREAM backends. For Dirac and other backends it should be done in the same manner.

Definition of getWNCodeForDownloadingInputFiles method :

def getWNCodeForDownloadingInputFiles(job, indent):

    if len(job.inputfiles) == 0:
        return ""

    insertScript = """\n"""

    for inputFile in job.inputfiles:  

        inputfileClassName = inputFile.__class__.__name__

        if outputFilePostProcessingOnWN(job, inputfileClassName):
            insertScript += inputFile.getWNScriptDownloadCommand(indent)

    return insertScript

-- MikeKenyon - 18-Sep-2012

Edit | Attach | Watch | Print version | History: r14 < r13 < r12 < r11 < r10 | Backlinks | Raw View | WYSIWYG | More topic actions
Topic revision: r14 - 2013-05-27 - IvanDzhunov
 
    • Cern Search Icon Cern Search
    • TWiki Search Icon TWiki Search
    • Google Search Icon Google Search

    ArdaGrid All webs login

This site is powered by the TWiki collaboration platform Powered by PerlCopyright & 2008-2022 by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
or Ideas, requests, problems regarding TWiki? use Discourse or Send feedback