CMS Collectors review


Contents:

Basic understanding of the 3 CMS collectors running on the dashboard07 host.

General points to understand and/or to solve

  • The current collectors run on Python 2.3; I performed the tests of the new collectors with Python 2.5 and Python 2.6, so the ported collectors will depend on one of these (is this a problem?)
    • probably at some point all the collectors should switch to a more recent version of Python, with a significant performance improvement (though likely requiring a long period of integration and testing before reaching solid stability).
  • Need to have the collectors auto-clean old files (this could be made configurable).

The first collector: LogFileCollector

It collects the information from MonaLisa and passes the output to the second collector.

How it works
Starting from a given date, it reads the content of the MonaLisa service file corresponding to that date. The MonaLisa file is rotated each day (the old files are kept, but zipped... for how long are the zipped files kept?) and it contains the messages received by the MonaLisa server. Each message corresponds to a line of that file. For a typical working day the file has between 25,000,000 and 30,000,000 lines, which corresponds to a file of about 8GB or more. The steps this collector performs are:
  • understanding which file to start from, depending on:
    • the input date
    • if no input date is given, from the logfile.ini file that stores the last processed date and line (this even allows specifying an old zipped file, which this collector can unzip and process); an example of the file:
      date=2010_10_08
      line=100
      state=1
      
  • opens the file and reads all the lines until it arrives at the given one (if the input date is given it starts from the first line)
  • if it finds no more lines (1)
    • it looks for more files to read, with the date immediately following the one it started from
    • if it finds the next file it reads it, otherwise it sleeps for 60 seconds
  • if it finds lines
    • it processes all the lines, parsing them one by one to extract the various pieces of information: time, task name, rest of the data (stripping some Java strings from it)
    • it takes at most 1000 lines at a time, then it merges the lines according to a break-minutes/seconds rule (see the sketch after this list)
      • => this still needs to be fully understood: it seems to group lines within the same time range, collecting lines in 5-minute windows, according to this global variable
            DBOARD_COLLECTOR_BREAK_MINUTE_LIST = [0,5,10,15,20,25,30,35,40,45,50,55]
    • it writes the group of lines in files named ml_orig_YEARMONTHDATE-HOURSMINUTESECONDS_T1_T2_T3 each time it reaches a line in the next time range (T1 = timestamp of the first line, T2 = timestamp of the last line, T3 = not clear, but some timestamp greater than T2)
    • it loops again from (1).
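
As a reading aid, here is a minimal sketch of how grouping into 5-minute windows could work based on the break-minute list above; the function names and the tuple layout are illustrative, not the actual collector code:

import time

DBOARD_COLLECTOR_BREAK_MINUTE_LIST = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]

def break_window(timestamp):
    # Map a unix timestamp to the start of its 5-minute window by taking
    # the largest break minute that is <= the current minute.
    tm = time.localtime(timestamp)
    minute = max(m for m in DBOARD_COLLECTOR_BREAK_MINUTE_LIST if m <= tm.tm_min)
    return (tm.tm_year, tm.tm_mon, tm.tm_mday, tm.tm_hour, minute)

def group_lines(parsed_lines):
    # parsed_lines: iterable of (timestamp, taskname, rest) tuples.
    # Lines falling in the same window end up in the same ml_orig group.
    groups = {}
    for timestamp, taskname, rest in parsed_lines:
        groups.setdefault(break_window(timestamp), []).append((timestamp, taskname, rest))
    return groups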

Porting CollectorMonaLisaCMS
The collector has been ported into the new dedicated arda.dashboard.service-collector-cms package. No major changes have been made, since the code is quite fragile and sensitive to any change. The only changes are:
  • configurable input directory that contains the MonaLisa files
  • configurable output directory that will contain the files for the second collector (the ml_orig files)
  • added a configurable sleep time between one loop and the next (to avoid wasting resources)
  • configurable date: if specified, it processes just the log file corresponding to the given date.

In the todo list:

  • writing the ml_dict output files as pickle objects (cPickle Python module)
  • reduce the number of times the information is parsed/iterated over

Example of configuration:

                <service name="collector.cms.monalisa.first"
                        module="dashboard.collector.cms.CollectorMonaLisaCMS"
                        class="CollectorMonaLisaCMS">
                        <config>
                            <param name="input-directory">/Users/mcinquil/Collector/Running/MonalisaInputFiles/</param>
                            <param name="output-directory">/Users/mcinquil/Collector/Running/FirstCollectorOutput/</param>
                            <param name="start-date">2010-10-07</param>
                            <param name="run-interval">10</param>
                        </config>
                </service>

Performance after the porting
Since no major changes were made to the logic, performance should be the same.

The second collector: MonalisaCollector

This collector takes as input the ml_orig files prepared by the first collector and writes other files for the third collector.

A side note: the file that is supposed to run on the dashboard07 host, namely

/afs/cern.ch/user/d/dboard/cms-prod/dashboard-2006-07-03/python/MonalisaCollector.py
does not seem to be the one in use. The correct file, actually running on dashboard07, is the one located on that machine at the path
/opt/data/dboard_cms/ml-collector/python/MonalisaCollector.py
. Here are some details that led me to conclude which one is the right one:
-rw-r--r--  1 dboard 5.9K Jul  3  2006 /afs/cern.ch/user/d/dboard/cms-prod/dashboard-2006-07-03/python/MonalisaCollector.py
-rw-r--r--  1 dboard  11K Jan  6  2010 /opt/data/dboard_cms/ml-collector/python/MonalisaCollector.py
. Obviously the two files differ.

How it works
This collector is started by the corresponding bash script:
#!/bin/bash
. /data/dboard_cms/ml-collector/setup.sh
while [ 1 ]; do     
  python ml-collector/python/MonalisaCollector.py ml-data/ 
  rm -i -f ml-data/*{debug,filtered}* 
  sleep 6 #0
done

So after the Python script has finished running, some files are deleted: the debug file just contains some duplicated information that is also written in the final output, while the filtered file should contain duplicated or filtered data (for filtered data see the next steps of this collector). The bash script then sleeps 6 seconds and runs the Python script again. The main logic steps of the Python script, which takes a directory as input, are:

  • when started it checks the content of the input directory to understand which file it has to start with:
    • it looks for files with the ml_list prefix; if there are any, it proceeds
      • once it has found the most recent ml_list file, it looks for an ml_orig file (output of the first collector) with the same timestamp appended at the end of the file name
        • in a few words: this collector looks for files with Ta = Tb in ml_list_Ta and ml_orig_tt_tt_Tb, and it starts processing the first ml_orig file with a date after Ta (or Tb, which is the same value)
    • if there were no input files to start from, it tries to query a remote host to get some messages to be processed; the host URL is
      http://arda-ml.cern.ch:6004/axis/services/MLWebService
      taken from the monitor.conf file; that host does not seem to exist and, as far as I know, the MonaLisa server runs on dashboard07; this query fails, so the collector fails if it does not find any ml_list file
  • assuming it finds the input file, it processes it and then exits => each time the script is called it processes just one file and then finishes; processing a single file consists of the following steps:
    • loading the whole ml_orig file into memory, processing it line by line, parsing out the information (time, task-name, job-id, message-name, message-value) and composing a dictionary like this (see the sketch after this list):
      dictionary[task-name][job-id][message-name] = (message-value, time)
      ;
    • from the dictionary above it prepares a list of strings, each string representing the very same information, like this:
      stringline = '\t'.join([time, task-name, job-id, message-name, message-value])
      this yields a list of strings, with messages of the same task close together;
    • from this list of strings it prepares another list of strings, just replacing the tab separator with a double-pipe string and switching the position of some values
      finalLine = ' || '.join([task-name,job-id,time,message-name,message-value])
      ; this list of strings is the only one taken into account by the collector from this point on
    • creating a file named ml_list containing the last timestamp of the ml_orig file just processed
    • for each line of the final list of strings, it translates the line back from a string into a list and builds yet another dictionary, like this:
      {
        'pieta_crab_0_101004_203408_u7e3t1': 
          {
            '10_https://wms213.cern.ch:9000/K1z86Q9l8wPzVFNOtzY_RA_134.61.12.56':
              {
                'StatusValue':
                  {
                    'time': ('Retrieved', 1286433960)
                  }
              }
          }
      }
    • this newly built dictionary is then used to check for duplicates against the previous message parsed from ml_orig: new lines are kept and written as a dictionary in a text file called ml_dict; lines that were already in the previous message are instead written in the file named ml_filtered; the last lines are also written in a file named ml_latest.
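
To tie the steps above together, here is a minimal sketch of the parsing and reformatting, assuming tab-separated fields in the order described; the function names are illustrative and this is not the actual collector code:

def parse_ml_orig(path):
    # Build dictionary[task][jobid][message_name] = (message_value, time),
    # assuming one tab-separated message per line.
    data = {}
    f = open(path)
    for line in f:
        time_, task, jobid, msg_name, msg_value = line.rstrip('\n').split('\t')
        data.setdefault(task, {}).setdefault(jobid, {})[msg_name] = (msg_value, time_)
    f.close()
    return data

def to_pipe_lines(data):
    # Flatten the dictionary into the ' || '-separated final lines.
    lines = []
    for task, jobs in data.items():
        for jobid, messages in jobs.items():
            for msg_name, (msg_value, time_) in messages.items():
                lines.append(' || '.join([task, jobid, time_, msg_name, msg_value]))
    return lines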

Porting SkimCollectorMonaLisaCMS
The collector has been ported into the new dedicated arda.dashboard.service-collector-cms package. Some changes have been made, but only for the most obvious parts and after proper checks. Obviously, intensive integration tests need to be done before putting this into production. The changes are:
  • configurable input directory that contains the input (ml_orig) files
  • added a configurable sleep time between one loop and the next
  • configurable input date: if no ml_list file is present in the directory, it starts from that input date
  • avoided writing apparently not useful files (debug, latest, filtered) to disk
  • removed the check for duplicated messages to avoid losing time: in the tests done, duplicates never appeared
  • loading the file into memory one line at a time (a file can be more than 20MB)
  • deleting big variables that are no longer needed from memory
  • avoiding two consecutive loops when everything can be done in a single loop
  • writing the ml_dict file as a pickle (cPickle Python library): this gains some time, especially because the third collector will only need to load the pickle file instead of evaluating the content of the file with eval() (see the sketch after this list)
  • other minor changes.
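
A sketch of the pickle change mentioned above, assuming the ml_dict content is a plain nested dictionary (the function names here are mine, not the collector's):

try:
    import cPickle as pickle   # Python 2, as used by the collectors
except ImportError:
    import pickle

def write_ml_dict(path, data):
    # Dump the dictionary as a binary pickle instead of a repr()/eval() text file.
    f = open(path, 'wb')
    pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
    f.close()

def read_ml_dict(path):
    # The third collector can simply load the pickle, no eval() needed.
    f = open(path, 'rb')
    data = pickle.load(f)
    f.close()
    return data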

In the todo list:

  • configurable output directory that will contain the files for the third collector: this is actually difficult to change, since the paths are composed in many places and one change can lead to other errors... it is just a matter of trying, finding the errors, and iterating until it works fine;
  • reduce the number of times the dictionary/list of strings is processed
  • removing unneeded reformatting (e.g. using tab spacing and then replacing it with the double pipe)
  • other minor things.

Major changes todo:

  • using multiprocessing (see the sketch after this list)
    • worker processes handling several files in parallel (maximum number of parallel processes configurable)
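
A possible shape for this change, as a sketch only: process_file is a placeholder for the per-file work the collector already does, and the pool size would come from the service configuration (requires Python 2.6+ for the multiprocessing module):

import multiprocessing

def process_file(path):
    # Placeholder: parse one input file and write its output,
    # as the current single-process collector does.
    pass

def process_in_parallel(paths, max_processes=4):
    # Process several input files in parallel worker processes;
    # max_processes would be a configurable parameter.
    pool = multiprocessing.Pool(processes=max_processes)
    try:
        pool.map(process_file, paths)
    finally:
        pool.close()
        pool.join()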

Example of configuration:

                <service name="collector.cms.monalisa.second"
                        module="dashboard.collector.cms.SkimCollectorMonaLisaCMS"
                        class="SkimCollectorMonaLisaCMS">
                        <config>
                            <param name="input-directory">/Users/mcinquil/Collector/Running/FirstCollectorOutput/</param>
                            <param name="output-directory">/Users/mcinquil/Collector/Running/SecondCollectorOutput/</param>
                            <param name="start-timestamp">1287068692</param>
                            <param name="run-interval">6</param>
                        </config>
                </service>

Performance after the porting
  • For a ~20MB input file (~150,000 lines) it takes ~1.5 seconds less to run: an average of ~10.5s vs ~12.0s (tests performed on my laptop and desktop).
  • Reduced load on the filesystem.
  • Reduced usage of memory.

The third collector: MonalisaFeeder

How it works
Class/method workflow: DashboardFeeder -> DatabaseFeeder.feedFiles -> fileLoader==MonalisaFeeder.loadDataFile

MonalisaFeeder.loadDataFile => this method contains the logic that determines which kind of message (SUBMIT, JOBINFO, ...) is being processed, depending on the available information

  • dropExtraTimeStamp: it does some grouping by timestamp, and the result seems to be ignored => not true: the dictionary is changed in place (global scope)
  • preConvert: it should do some parameter-name conversion based on DboardPreConvertList, and again the result seems to be ignored => not true: the dictionary is changed in place (global scope; see the small illustration after this list)
    • polling a task
      • polling a job
        • polling a key/val pair
          • if the val contains a '\n' then it does some changes
        • polling DboardPreConvertList elements
          • it maps the field depending on the parName, defines a newName, and does some checks for specific parNames (e.g. RBname)
          • updates the dictionary with the new parName
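
The in-place behaviour noted above can be illustrated with a tiny sketch; the rename list below is invented, only the pattern of mutating the caller's dictionary is the point:

# Hypothetical rename list in the spirit of DboardPreConvertList.
RENAMES = {'RBname': 'ResourceBroker'}

def pre_convert(adict):
    # Rename parameters in place: the caller's dictionary is modified
    # even though no return value is used.
    for task in adict:
        for job in adict[task]:
            for old_name, new_name in RENAMES.items():
                if old_name in adict[task][job]:
                    adict[task][job][new_name] = adict[task][job].pop(old_name)
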
groupMessages: groups messages
  • iterating over tasks
    • iterating over jobs
      • iterating on DboardDescriptor (each element has: allParams, reqParams, allParamsRequired, preFilter, postFilter)
        • if reqParams is not None
          • if allParamsRequired is True
          • else at least 1 param is needed
        • if something is OK (some parameter has passed the requirements)
          • it iterates over all the paramNames and:
            • for each param name that is in allParams:
              • it calls filterParamValues(preFilter, paramName, paramValue, adict[taskId][jobId])
                • filterParamValues: filters the params with preFilter
                  • if there is no preFilter, or paramName is not in the list of known params: it returns {paramName : paramValue[0]} as it is
                  • checks whether it is a timestamp and, if so, converts it with DashboardTime.convertMLTimeStamp
                  • strips it, checks that it is not empty, then uses the preFilter dictionary to make some parameter changes
                  • does some sanity checks (number of parNames = number of parValues)
                  • rebuilds the dictionary and returns it
              • it calls filterParamValues(postFilter, 'DBOARD-TASKID', (taskId, '')) / (postFilter, 'DBOARD-JOBID', (jobId, ''), adict[taskId][jobId], taskId) / (postFilter, 'STATIC-ATTRS', ('dummy', ''))
                • filterParamValues: filters the params with postFilter:
                  • as above, but it works by calling getJobInfo5(msgParamValue, allValues, taskId)
                    • getJobInfo5( jobid, input={}, taskId='' ):
                      • guesses the grid flavour through getGridFlavour()
                      • removes the IP (using a regex) from the jobId
                      • checks whether the jobid has a grid job id
                      • grid flavour check through regexes (LCG/GLITE, ARC, CAF, PBS, Condor Glide-in, LocalFNAL, CONDOR_G, unknown); see the sketch after this block
                      • handles PA jobs whose id startswith('ProdAgent')
                      • Mattia added a wmagent check/hack to mix jobid-jobmonitorid-taskid
                  • returns the dictionary with the updated values
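
As an illustration of the flavour check, here is a rough regex-based classifier; the patterns are simplified guesses and do not reproduce the actual rules in getJobInfo5:

import re

# Simplified, guessed patterns; the real code distinguishes more flavours
# (ARC, CAF, PBS, Condor Glide-in, LocalFNAL, ...) with its own regexes.
GRID_FLAVOUR_PATTERNS = [
    ('LCG/GLITE', re.compile(r'https://[\w.-]+:9000/')),
    ('CONDOR_G',  re.compile(r'[\w.-]+#\d+\.\d+')),
    ('ProdAgent', re.compile(r'^ProdAgent')),
]

def guess_grid_flavour(jobid):
    # Return the first matching flavour name, or 'unknown'.
    for flavour, pattern in GRID_FLAVOUR_PATTERNS:
        if pattern.search(jobid):
            return flavour
    return 'unknown'
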
DatabaseFeeder.feedFiles
  • sets some flags + timestamps
  • calls MonalisaFeeder.loadDataFile and gets a dictionary (see above)
  • iterates on FEEDER_STEPS + ['TOTAL']
    • calls filteredDataDict = filter(step, dataDict[step])
      • filter
        • if step is in InputFilters it starts filtering
          • iterates on InputFilters[step] elements
            • for each filter in InputFilters[step]
              • filteredValuesDict = filterParam(paramName, paramValue, valueFilter, infodict)
                • filters based on valueFilter (a list of 4 elements): paramNameFilter, paramValueFilter
                • inserts into the result dictionary ( result[newParamName] = newParamValue )
                • then it applies the derivedNameFilter: it derives a new parameter from the current parameter being processed
              • checks and removes empty values
    • feed: takes the updated dictionary
      • if SchemaFeedingEntryPointDict has the current step of the dictionary being processed
        • handleTable(entryTable, infodict, ALL_ALLOWED) # DatabaseFeederDescr.ALL_ALLOWED = 100
          • uses SchemaDefinitionDict[entryTable] to do the message-to-database mapping (a simplified sketch of this mapping appears after this outline)
          • getUpdateDict(tableDef['mapping'], submitDict, updateAllowed): gets the 'mapping' part of the table and the dictionary as input
            • iterates on the table columns
              • it gets for each column the corresponding value of the submitDict
              • it uses the getInputParamList method when the second element of the mapping key is a tuple
                • getInputParamList(filTable): (filTable is the first column of the tuple)
                  • iterates on ['mapping', 'create']
                    • if 'mapping' or 'create' is in SchemaDefinitionDict[schemaKey]
                      • iterates on the SchemaDefinitionDict[schemaKey]['mapping' or 'create'] columns
                        • it builds a list with the third-column values, or through a recursive call to itself with the second column (its first element, since it has to be a tuple)
                        • puts them in a dictionary (this allows having unique keys, i.e. unique columns)
                  • returns a list of the columns
              • it translates the column names of submitDict into a new dictionary (newInput)
              • it calls the filter method on newInput
              • handleTable (filTable, newInput, updateAllowed, filCol): recursive call
            • returns an updateDict
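
To make the mapping step more concrete, here is a stripped-down sketch of translating a message dictionary into table columns through a schema mapping; the table and column names are invented, and the real SchemaDefinitionDict (with tuples, recursion and filters) is considerably more complex:

# Invented example schema: maps message parameter names to table columns.
SCHEMA_DEFINITION = {
    'job': {
        'mapping': [
            # (column name, message parameter name)
            ('TaskId', 'DBOARD-TASKID'),
            ('JobId',  'DBOARD-JOBID'),
            ('Status', 'StatusValue'),
        ],
    },
}

def get_update_dict(table, message):
    # Build {column: value} for the columns whose parameter is present
    # in the incoming message dictionary.
    update = {}
    for column, param_name in SCHEMA_DEFINITION[table]['mapping']:
        if param_name in message:
            update[column] = message[param_name]
    return update

# Example:
#   get_update_dict('job', {'DBOARD-TASKID': 't1', 'StatusValue': 'Retrieved'})
#   -> {'TaskId': 't1', 'Status': 'Retrieved'}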

Porting

-- MattiaCinquilli - 22-Oct-2010
