Post processor objects in Ganga 6

Postprocessors are a new concept introduced in Ganga 6. They include the mergers that were previously available and, at the same time, introduce a whole new set of actions that can be carried out when a job has finished. This means that j.merger is now obsolete; from Ganga 6 onwards you must use j.postprocessors instead.

To do exactly the same job in Ganga 6 as you would in Ganga 5, just replace

j.merger = MyFavouriteMerger()

with

j.postprocessors = MyFavouriteMerger()

and the job will run as before. Old jobs will not be able to merge unless j.postprocessors is set.


For experienced users: The interface for mergers is the same as in Ganga 5.

A merger is an object which will merge files from each subjob and place the result in the master job's output folder. The method used to merge depends on the type of merger object (and hence the file type).

For example, if each subjob produces a root file 'thesis_data.root' and you would like these to be merged, you can attach a RootMerger object to your job:

j.postprocessors.append(RootMerger(files = ['thesis_data.root'], ignorefailed = True, overwrite = True))

When the job is finished this merger object will then merge the root files and place them in j.outputdir. The ignorefailed flag toggles whether the merge can proceed if a subjob has failed. The overwrite flag toggles whether to overwrite the output if it already exists.

There are several mergers available:

TextMerger(compress = True) 

Used for merging text files (.txt, .log, stdout, etc.). In addition to the normal attributes, you can also choose to compress the output with TextMerger().compress = True.

RootMerger(args = '-T')

Used for root files. In addition to the normal attributes, you can also pass additional arguments to hadd.


CustomMerger()

A custom merger where you can define your own merge function. For this merger to work you must supply the path to a Python module which carries out the merge:

CustomMerger().module = '~/'

In this module you must define a function called merge(file_list, output_file), e.g.:

import os

def merge(file_list, output_file):
    # Concatenate each input file into the output file
    with open(output_file, 'w') as f_out:
        for f in file_list:
            with open(f) as f_in:
                f_out.write(f_in.read())
This function would mimic the TextMerger, but with more control to the user. Note that the overwrite and ignorefailed flags will still work here as a normal merger object.
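To see the merge contract in action outside Ganga, here is a minimal self-contained sketch. The file names ('stdout.0', 'stdout.1', 'stdout.merged') are hypothetical stand-ins for subjob outputs; a real CustomMerger receives the actual subjob files from Ganga:

```python
import os
import tempfile

def merge(file_list, output_file):
    # Concatenate every input file into the output file, as a TextMerger would
    with open(output_file, 'w') as f_out:
        for name in file_list:
            with open(name) as f_in:
                f_out.write(f_in.read())

# Create two throwaway "subjob" outputs (hypothetical names)
tmpdir = tempfile.mkdtemp()
inputs = []
for i, text in enumerate(['subjob 0 output\n', 'subjob 1 output\n']):
    path = os.path.join(tmpdir, 'stdout.%d' % i)
    with open(path, 'w') as f:
        f.write(text)
    inputs.append(path)

merged = os.path.join(tmpdir, 'stdout.merged')
merge(inputs, merged)
```

After this runs, the merged file contains the two subjob outputs concatenated in list order.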

The final merger object which can be used is the SmartMerger(), which will choose a merger object based on the output file extension, and so can handle several file types in a single object.

For example, the following SmartMerger would use a RootMerger for 'thesis_data.root' and a TextMerger for 'stdout':

SmartMerger(files = ['thesis_data.root','stdout'],overwrite = True)

Note that:

j.postprocessors.append(SmartMerger(files = ['thesis_data.root','stdout'],overwrite = True))

is equivalent to doing:

j.postprocessors.append(TextMerger(files = ['stdout'],overwrite = True))

j.postprocessors.append(RootMerger(files = ['thesis_data.root'],overwrite = True))

However, in the second instance you gain more control, as you have access to the RootMerger/TextMerger-specific attributes, at the expense of more code. Choose whichever objects work best for you.
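SmartMerger's dispatch on file extension can be pictured with a plain-Python sketch. The extension-to-merger mapping below is illustrative only (the real table lives inside Ganga); the merger names mirror the classes described above:

```python
import os

# Illustrative extension -> merger mapping (an assumption, not Ganga's actual table)
MERGER_FOR_EXTENSION = {
    '.root': 'RootMerger',
    '.txt': 'TextMerger',
    '.log': 'TextMerger',
}

def choose_merger(filename):
    # Pick a merger class name from the file extension
    ext = os.path.splitext(filename)[1]
    # Files with no recognised extension (e.g. 'stdout') are treated as text here
    return MERGER_FOR_EXTENSION.get(ext, 'TextMerger')

print(choose_merger('thesis_data.root'))  # RootMerger
print(choose_merger('stdout'))            # TextMerger
```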

If a merger fails to merge, it will fail the job and subsequent postprocessors will not run.


A checker is an object which will fail otherwise completed jobs based on certain conditions. Currently there are four Checkers:

  • FileChecker - checks the list of output files and fails the job if a particular string is found (or not found).

  • MetaDataChecker - simple checker to compare against the metadata of a job. The base class is hidden and has to be overridden in the experiment-specific plugin.

  • RootFileChecker - checks that all your ROOT files are closed properly and have nonzero size. Also checks the merging procedure worked properly.

  • CustomChecker - probably the most useful checker, allows the user to use private python code to decide if a job should fail or not.

Note: if a checker is misconfigured, the default is to do nothing (pass the job); this differs from the mergers.


For example, to configure a FileChecker you could do:

fc = FileChecker(files = ['stdout'], searchStrings = ['Segmentation'])

You can also enforce that your file must exist, by setting filesMustExist to True.

fc.filesMustExist = True

If a job does not produce a stdout file, the checker will fail the job.

This FileChecker will grep your stdout file for the string 'Segmentation'. If it finds it, the job will fail.

If instead you want to fail the job when a string does not exist, you can do

fc.searchStrings = [ 'SUCCESS' ]
fc.failIfFound = False

In this case the FileChecker will fail the job if the string 'SUCCESS' is not found.
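The failIfFound semantics can be sketched in plain Python. This is a simplified stand-in for illustration, not Ganga's actual FileChecker implementation, and the log file name is hypothetical:

```python
import os
import tempfile

def file_check_passes(path, search_strings, fail_if_found=True):
    # Return True if the job should pass, False if it should fail.
    with open(path) as f:
        contents = f.read()
    found = any(s in contents for s in search_strings)
    # fail_if_found=True:  finding any string fails the job
    # fail_if_found=False: *not* finding any string fails the job
    return not found if fail_if_found else found

# A throwaway stdout file for demonstration
log = os.path.join(tempfile.mkdtemp(), 'stdout')
with open(log, 'w') as f:
    f.write('Job ended with SUCCESS\n')

print(file_check_passes(log, ['Segmentation']))                   # True: no crash string
print(file_check_passes(log, ['SUCCESS'], fail_if_found=False))   # True: success string present
```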


The MetaDataChecker allows you to test a very simple expression which compares some of the job's metadata. These metadata are experiment specific.

For LHCb, you would do for example:

mc = LHCbMetaDataChecker(expression = 'inputevents >= 1000')

This checker would fail the job if the number of input events is less than 1000.
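Conceptually, the checker evaluates the comparison against the job's metadata values. The sketch below is a simplified illustration (Ganga's own expression handling differs; the use of eval here is purely for demonstration):

```python
def metadata_check(expression, metadata):
    # Evaluate a simple comparison expression against job metadata values.
    # eval() with emptied builtins is used only to illustrate the idea.
    return bool(eval(expression, {'__builtins__': {}}, dict(metadata)))

print(metadata_check('inputevents >= 1000', {'inputevents': 2500}))  # True: job passes
print(metadata_check('inputevents >= 1000', {'inputevents': 400}))   # False: job fails
```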

The LHCbMetaDataChecker understands five keywords, which map onto the job's metadata:

  • inputevents - the number of input events
  • outputevents - the number of output events
  • lumi - j.lumi (as a float)
  • nskipped - j.xmlskippedfiles
  • nfiles - j.xmldatanumbers['full']


Adding a RootFileChecker to your job will add some protection against hadd failures, and ensure that your ROOT files are mergeable.

If you do

rfc = RootFileChecker(files = ["*.root"])


This checker will check that each ROOT file has non-zero file size and is not a zombie. If you also have a merger, it will check the output from hadd, ensure that the sum of the subjob entries is the same as the master job entries, and check that each ROOT file has the same file structure. RootFileChecker inherits from FileChecker so you can also ensure that the ROOT files must exist.
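The non-zero-size part of this check is easy to sketch in plain Python (the zombie test needs ROOT itself and is omitted here; the file names are hypothetical placeholders, not real ROOT files):

```python
import glob
import os
import tempfile

def nonzero_root_files(pattern):
    # Pass only if at least one file matches and every match has non-zero size
    matches = glob.glob(pattern)
    return bool(matches) and all(os.path.getsize(f) > 0 for f in matches)

tmpdir = tempfile.mkdtemp()
good = os.path.join(tmpdir, 'a.root')
with open(good, 'w') as f:
    f.write('placeholder contents, not real ROOT data')
empty = os.path.join(tmpdir, 'b.root')
open(empty, 'w').close()  # zero-length file, would fail the check

print(nonzero_root_files(os.path.join(tmpdir, '*.root')))  # False: b.root is empty
```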


The CustomChecker will execute your script and fail the job based on the output.

For example, you can write a checker module in your home directory.

In this file you must define a function called check(j), which takes in your job as input and returns True (pass) or False (fail):

import os

def check(j):
    outputfile = os.path.join(j.outputdir,'thesis_data.root')
    return os.path.exists(outputfile)        

Then, in Ganga, do

cc = CustomChecker(module = '~/')

This checker will then fail jobs which don't produce a 'thesis_data.root' file.
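The check(j) contract can be exercised outside Ganga with a stand-in job object. FakeJob below is hypothetical and exposes only the one attribute this check uses; a real CustomChecker receives the actual Ganga job:

```python
import os
import tempfile

def check(j):
    # Pass the job only if the expected ROOT file was produced
    outputfile = os.path.join(j.outputdir, 'thesis_data.root')
    return os.path.exists(outputfile)

class FakeJob(object):
    # Minimal stand-in for a Ganga job (assumption for illustration)
    def __init__(self, outputdir):
        self.outputdir = outputdir

empty_dir = tempfile.mkdtemp()
print(check(FakeJob(empty_dir)))  # False: no thesis_data.root produced

full_dir = tempfile.mkdtemp()
open(os.path.join(full_dir, 'thesis_data.root'), 'w').close()
print(check(FakeJob(full_dir)))   # True
```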


The notifier is an object which will email you about your jobs upon completion. The default behaviour is to email when master jobs have finished and when subjobs have failed. Emails are not sent upon failure if the auto-resubmit feature is used.

Important note: Emails will only be sent when ganga is running, and so the Notifier is only useful if you have ganga running in the background (e.g. screen session, GangaService).

To make a notifier, just do something like:

n = Notifier(address = '')

If you want emails about every subjob, do

n.verbose = True

Management of post processors with your job

Important Note: Ganga will order your postprocessors to some degree: mergers appear first, then checkers, then finally the notifier. The order within each class is preserved, though (e.g. the ordering of the checkers is as defined by the user).
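This ordering rule behaves like a stable sort on a per-class priority. The sketch below is illustrative, not Ganga's implementation; the priority values are assumptions:

```python
# Assumed priorities mirroring the documented order: mergers < checkers < notifier
PRIORITY = {'merger': 0, 'checker': 1, 'notifier': 2}

def order_postprocessors(pps):
    # sorted() is stable, so user-defined order within each class is preserved
    return sorted(pps, key=lambda item: PRIORITY[item[1]])

pps = [('FileChecker', 'checker'), ('TextMerger', 'merger'),
       ('Notifier', 'notifier'), ('CustomChecker', 'checker'),
       ('RootMerger', 'merger')]
print([name for name, kind in order_postprocessors(pps)])
```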

To add some postprocessors to your job, you can do something like

tm = TextMerger(files=['stdout'],compress = True)

rm = RootMerger(files=['thesis_data.root'],args = '-f6')

fc = FileChecker(files = ['stdout'],searchStrings = ['Segmentation'])

cc = CustomChecker(module = '~/')

n = Notifier(address = '')

j.postprocessors = [tm,rm,fc,cc,n]



You can also remove postprocessors:

In [21]:j.postprocessors
Out[21]: [SmartMerger (
 files = [] ,
 ignorefailed = False ,
 overwrite = False 
 ), FileChecker (
 files = [] ,
 checkSubjobs = False ,
 searchStrings = [] ,
 failIfFound = True 
 ), Notifier (
 verbose = False ,
 address = '' 
 )]
In [22]:j.postprocessors.remove(FileChecker())

In [23]:j.postprocessors
Out[23]: [SmartMerger (
 files = [] ,
 ignorefailed = False ,
 overwrite = False 
 ), Notifier (
 verbose = False ,
 address = '' 
 )]
Topic revision: r9 - 2013-05-21 - PatrickOwen