Proposal for a unified treatment of output in Ganga
Current situation
In Ganga the schema of the Job class currently has the concepts of outputfiles and outputdata. The intention of the design was to distinguish between large data files (like DSTs) that would go to mass storage and small files (like ntuples and stdout) that would be returned in the sandbox. User experience has shown that more flexibility is required in this area: users want to put their ntuples (or even stdout!) into mass storage, or they want a small DST file returned through the sandbox.
In Ganga the responsibility for dealing with outputdata is currently placed in the scripts written by the runtime handlers rather than in the scripts written by the backends. This means that the behaviour differs depending on the type of job run; as an example, the ROOT application cannot deal with outputdata at all.
Proposal
The problems with the current implementation can be solved by generalising the concept of output from the job. A set of storage methods will be implemented and any output file will be declared as one of these types. Possible storage methods to implement would be:
- File: Return to the output workspace
- CompressedFile: Return gzip compressed to the output workspace
- CastorFile: Write file into Castor mass storage
- ScratchFile: Write file to some large scratch disk
- LHCbDataFile: Store file in a Storage Element and register in the LHCb file catalogue (LHCb only)
- DQ2File: Store file in an SE and register in DQ2 (ATLAS only)
- SEFile: Upload to a storage element.
For a given job it should be possible to mix and match these methods.
Usage
As an example, storing a DST file as an LHCb dataset, the ntuple on a scratch disk, and the stdout compressed in the output workspace would look like
j.outputfiles = [
LHCbDataFile('mydata.dst'),
ScratchFile('Ntuples.root'),
CompressedFile('stdout')]
In the configuration, automatic file types can be defined. So if your .gangarc file contains
[Output]
LHCbDataFile = ['*.dst','*.digi','*.raw']
ScratchFile = ['*.root']
CompressedFile = ['stdout','stderr']
then the example above would reduce to
j.outputfiles = ['mydata.dst','Ntuples.root','stdout']
Printing the object afterwards would show the default locations
[1] print j.outputfiles
j.outputfiles = [
LHCbDataFile('mydata.dst'),
ScratchFile('Ntuples.root'),
CompressedFile('stdout')]
In fact, since for an LHCb job all these files are discovered automatically, they would be stored in the correct places with no definition of j.outputfiles at all.
Placing the output in a specific location can happen in two ways. In the first, the files are uploaded directly from the worker node; examples would be DQ2File for LCG jobs or ScratchFile for the Local backend. In the second, the files are first returned to the client and then placed in their final location; an example here is ScratchFile from the LCG backend. Each backend will, through the configuration system, contain a list of the upload methods it can support directly from the worker node.
--
UlrikEgede - 23 Oct 2008
Prototype in Development 7.9.2009
A prototype to deal with this is in development. OutputFile objects have been created which contain information about how the output should be handled. The current types being looked at are:
- LocalFile: Returns files to the output workspace. Equivalent to 'File' from above.
- ScratchFile: Returns the file to a 'scratch' directory.
Also changed from the proposal: compressed is now a parameter of the OutputFile object, which gives the user the option of compressing the output no matter where it is going. gzip from the Python gzip library is currently used to do this.
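A minimal sketch of how such compression might be done with the standard gzip module follows; the function name and the convention of writing a .gz copy next to the original are illustrative assumptions, not the prototype's actual code.

import gzip
import shutil

def compress_output(path):
    # Write a gzip compressed copy next to the original, e.g. stdout -> stdout.gz
    src = open(path, 'rb')
    dst = gzip.open(path + '.gz', 'wb')
    shutil.copyfileobj(src, dst)
    dst.close()
    src.close()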
Necessary parameters for the OutputFile objects are:
- outpath: The defining property of the object, specifying the location the output should be sent to. In principle this should be protected from the user, but there may be cases where a user needs to change it.
- filenames: A list containing string filenames or wildcards, specifying which files should be sent to the outpath.
- compressed: True or False (default = False), specifying whether the output files should be compressed.
- copy_method: A string such as 'cp' or 'scp' determining how the files should be copied to their final destination. This point is being debated (see later).
The OutputFile objects are attached to a job by putting them in a list called 'output', e.g.:
j.output = [LocalFile(filenames = ['stdout']), ScratchFile(filenames = ['__jobstatus__'], compressed = False)]
Currently, the files are being copied from the outputdir. In something nearer the final prototype it would be more desirable for the output to be created in the outpath directory directly, with a report of where files have gone and what has failed sent back to the user.
This prototype uses only the Local backend thus far. It creates appropriate methods, lists of the information contained within the OutputFile objects attached to the job, and calls to the methods within the jobscript. A jobscript is not created for every backend, however, so alternative methods will have to be found.
Issue of Contention
When transferring the output files, should os.system('cp ' + string) or shutil.copy(src, dst) be used? This relates back to the copy_method parameter. If shutil.copy() is used by default across all objects, then there is no need for the copy_method parameter. However, some users might want to specify their copy method, in which case os.system() would be more appropriate. That method can, however, cause problems for users of other operating systems, such as Windows. Alternatively, users could create their own class which inherits from the base class, or even from the higher level objects, to get around this.
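A hedged sketch of how a copy_method parameter might be honoured, falling back to shutil.copy when none is given (the function and parameter names are illustrative, not the prototype's actual code):

import os
import shutil

def copy_output(src, dst, copy_method=None):
    if copy_method is None:
        # Portable default: no shell involved, so it also works on Windows
        shutil.copy(src, dst)
    else:
        # User specified command, e.g. 'cp' or 'scp'; relies on a Unix-like shell
        os.system('%s %s %s' % (copy_method, src, dst))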
Justin Whitehouse
Current status as of August 2012
This topic was discussed at the latest Ganga developers meeting in Birmingham in 2012, and there are now a number of storage methods implemented and ready to be validated/used.
- OutputSandboxFile: returns the file to the output workspace
- MassStorageFile: uploads the file to mass storage
- LCGStorageElementFile: uploads the file to an LCG storage element
- DiracFile: uploads the file to a Dirac storage element (LHCb only)
Configuring the output files in .gangarc
[Output]
MassStorageFile = {'fileExtensions':['*.dummy'], 'backendPostprocess':{'LSF':'WN', 'LCG':'client', 'CREAM':'client', 'Localhost':'WN'},
'uploadOptions':{'mkdir_cmd':'nsmkdir', 'cp_cmd':'rfcp', 'ls_cmd':'nsls', 'path':'/castor/cern.ch/user/i/idzhunov/ganga'}}
LCGStorageElementFile = {'fileExtensions':['*.root'], 'backendPostprocess':{'LSF':'client', 'LCG':'WN', 'CREAM':'WN', 'Localhost':'WN'},
'uploadOptions':{'LFC_HOST':'lfc-dteam.cern.ch', 'dest_SRM':'srm-public.cern.ch'}}
DiracFile = {'fileExtensions':['*.dst'], 'backendPostprocess':{'Dirac':'WN', 'LSF':'WN', 'LCG':'WN', 'CREAM':'WN', 'Localhost':'WN'}, 'uploadOptions':{}}
Configuration/automatic file type detection
If we have the above configuration, it will be possible to do
j.outputfiles = ['data.dummy','fillrandom.root', 'myfile.dst', 'data.raw']
which with the automatic file type detection will result in
print j.outputfiles
[MassStorageFile(namePattern='data.dummy'), LCGStorageElementFile(namePattern='fillrandom.root'), DiracFile(namePattern='myfile.dst'),
OutputSandboxFile(namePattern='data.raw')]
The file name 'data.raw' doesn't match any of the fileExtensions in the configuration, which is why it is associated with the default output file type OutputSandboxFile, which just sends the file to the output workspace.
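A minimal sketch of how this wildcard matching could work, assuming a dictionary holding the [Output] configuration shown above (the function name and dictionary layout are illustrative assumptions, not the actual Ganga code):

import fnmatch

def detect_file_type(filename, output_config, default='OutputSandboxFile'):
    # Compare the file name against each type's fileExtensions patterns
    for file_type, options in output_config.items():
        for pattern in options.get('fileExtensions', []):
            if fnmatch.fnmatch(filename, pattern):
                return file_type
    # Nothing matched, e.g. 'data.raw' above, so fall back to the default type
    return default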
Configuration/upload from the worker node or from the client for different backends
Every file type should be configured with a backendPostprocess dictionary that describes where the uploads should be done from for different backends. The key of this dictionary is the backend name; the value is 'WN' or 'client'. In the case of 'WN', the code provided by the getWNInjectedScript() method of the output file type will be injected into the job script of the backend and the upload will be done from the worker node. In the case of 'client', the put() method of the output file type will be called on job completion, once the output file has been downloaded into the job's output sandbox, and the upload will be done from the client.
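A hedged sketch of this dispatch, assuming the same configuration dictionary as in the previous example (the helper name is illustrative; the real Ganga code is organised differently):

def upload_mode(output_file, backend_name, output_config):
    # Look up 'WN' or 'client' for this file type and backend
    mapping = output_config[output_file.__class__.__name__]['backendPostprocess']
    return mapping.get(backend_name, 'client')

# if upload_mode(f, 'LSF', output_config) == 'WN':
#     inject the result of f.getWNInjectedScript() into the backend's job script
# else:
#     call f.put() on job completion, once the file is back in the output sandbox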
Configuration/storing upload options in configuration
In the uploadOptions dictionary a user can store configurables needed for the actual upload, like commands to be used for the upload, destination directory/SRM, LFC host, etc.
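As an illustration only, the mass storage uploadOptions shown in the configuration above might be consumed roughly like this (command handling is simplified and this is not the actual MassStorageFile implementation):

import os
import subprocess

def upload_to_mass_storage(local_file, upload_options):
    dest_dir = upload_options['path']
    # Create the destination directory and copy the file with the configured commands
    subprocess.call([upload_options['mkdir_cmd'], dest_dir])
    subprocess.call([upload_options['cp_cmd'], local_file,
                     os.path.join(dest_dir, os.path.basename(local_file))])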
Option to compress the file before uploading it to its destination
Every output file type (OutputSandboxFile, MassStorageFile, LCGStorageElementFile, DiracFile) has a compressed attribute. If you set this attribute to True, the file will first be compressed and then sent to the desired destination.
job.outputfiles use case
j = Job(application=Root(),backend=Local())
j.application.script = File('~/fillrandom2.py')
j.outputfiles = [MassStorageFile(namePattern='fillrandom.root', compressed=True),
LCGStorageElementFile(namePattern='fillrandom1.root'), OutputSandboxFile(namePattern='fillrandom2.root')]
j.submit()
j.status
completed
print j.outputfiles[0].location()
['/castor/cern.ch/user/i/idzhunov/ganga/fillrandom.root.gz']
print j.outputfiles[1].location()
['guid:b5a8fe4d-4877-4a14-8375-744e98e438b0']
The first output file is compressed and sent to mass storage (Castor), the second one is uploaded to the LCG SE, and the third one goes to the job's output sandbox. If the user wants to retrieve the first two files locally, they can do the following:
j.outputfiles[0].localDir = 'some dir'
j.outputfiles[0].get()
j.outputfiles[1].localDir = 'some dir'
j.outputfiles[1].get()
Adding a new file type object to the job.outputfiles mechanism
In principle what you have to do is to create a new class that inherits from OutputSandboxFile and override the following methods (a minimal skeleton is sketched below):
- put(): makes the upload from the client, assuming the file has been downloaded into the output sandbox
- getWNInjectedScript(): injects a script (for uploading the file) into the job script that is executed on the worker node
- setLocation(): method called on the client (in the job completed hook), setting the location of the output file processed on the worker node
- location(): returns the location of the uploaded file
- get(): retrieves locally the already uploaded (to mass storage, LCG SE, Dirac, etc.) file
You also need to configure the new file type in the Output configuration section.
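A bare skeleton of such a class; the class name is made up and the method signatures are simplified, so the real base class in the Ganga code base should be consulted for the exact interface.

class MyStorageFile(OutputSandboxFile):
    # Illustrative skeleton only; method bodies are left empty

    def put(self):
        # Upload from the client, assuming the file is already in the output sandbox
        pass

    def getWNInjectedScript(self):
        # Return the script fragment to be injected into the worker node job script
        return ''

    def setLocation(self):
        # Called on the client in the job completed hook; record where the
        # worker node upload put the file
        pass

    def location(self):
        # Return the location(s) of the uploaded file
        return []

    def get(self):
        # Retrieve the already uploaded file to the local directory
        pass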
Output file types as standalone objects (not part of the job.outputfiles mechanism)
The different file type objects can also be used as standalone objects. Imagine one wants to upload a file from a local directory to the LCG SE, mass storage or the Dirac SE. Here is an example:
a = MassStorageFile(localDir='/afs/cern.ch/user/i/idzhunov/outputfiles', namePattern='*.py')
a.put()
print a.location()
['/castor/cern.ch/user/i/idzhunov/ganga/fillrandom.py']
b = LCGStorageElementFile(localDir='/afs/cern.ch/user/i/idzhunov/outputfiles', namePattern='*.root')
b.put()
print b.location()
['guid:a7a7c3fb-559e-4e57-ba3b-01262a0a2e8f']