Event Service in PanDA Pilot 2


This document describes the ATLAS Event Service implementation in the PanDA Pilot 2.

Function calls until ESProcess starts

Event service mode is selected when the pilot downloads a job carrying one of the event service identifiers ('eventService' or 'eventServiceMerge') in the job description. Seeing one of these identifiers, the pilot internally sets the 'is_eventservice' or 'is_eventservicemerge' booleans that are data members of the JobData class (i.e. they end up in the job object as job.is_eventservice and job.is_eventservicemerge). The pilot later uses them in pilot/control/payload to select the proper payload executor (eventservice.Executor or eventservicemerge.Executor - the executor for running 'normal' jobs is called generic.Executor). The pilot starts the selected executor by calling its run() function. The main pilot keeps running its normal threads in the meantime for monitoring.

The eventServiceMerge workflow is very similar to the generic workflow, while the main eventService workflow is very different. In it's run_payload() function (called from the run() function), the executor type is selected. For a normal event service job to be run on a grid site, the 'generic' is used. For the Raythena workflow, the executor type is 'raythena'. As of April 2020, the executor type has to be set in the pilot config file although it might be preferable to use AGIS instead. When the payload (job object) has been attached to the selected executor, the executor itself is launched by calling its start() function. When the GenericExecutor starts, its run() function gets called since it is a thread. The run() function in turn starts the ESProcess, described in the next section.

'-> pilot.control.payloads.eventservice.Executor.run()   (generic.Executor.run())
   '-> pilot.control.payloads.eventservice.Executor.run_payload()
       '-> pilot.eventservice.workexecutor.workexecutor.start()   (plugin gets selected using config file, thread starts, run() gets executed)
            '-> pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor.start() (basexecutor.BaseExecutor.start())
                '-> pilot.eventservice.communicationmanager.communicationmanager.start() (thread starting)
            '-> pilot.eventservice.workexecutor.plugins.genericexecutor.run()
                '-> pilot.eventservice.esprocess.esprocess.ESProcess.start()


  • ESProcess works as an independent thread with two hooks to communicate without outside. Callers needs to implement these two hooks and link them to the ESProcess when using it.
         * get_event_ranges_hook: to get events
         * handle_output_message_hook: to handle outputs

  • ESProcess structure:


  • pilot.control.payloads.eventservice.py
  • pilot.eventservice.workexecutor.plugins.genericexecutor.py
  • Implement get_event_ranges_hook and handle_output_message_hook and link them to ESProcess.
    • The handle_output_message_hook calls ESStageOut client to stageout ES outputs.


For ESMerge jobs, es_events_read activity is used to stagein es files. It will call ESStageIn client to stagein files.
  • pilot.control.payloads.eventservicemerge.py
  • override utility_before_payload to
          * untar esmerge files
          * process_writetofile


  • ESStagingClient transfer_files
          * for activity in activities
               * for protocol in activity.protocols:
                 * try to transfer files, if success then return; otherwise continue to try different protocols and activities.
  • ESStageIn
          * process storage id: map storage id to ddmendpoint (storage id is from panda job payload).
          * call ESStagingClient transfer_files
  • ESStageOut
          * switch ddempoint based on activity: for ES stagout, different activity may define different ddmendpoint. For example, es_failover may point to a remote storage.
          * call ESStagingClient transfer_files
  • ESActivityMap: to map pilot activity to ddm activity
          * es_events: write_lan, fallback to write_wan
          * es_failover: write_lan, fallback to write_wan
          * es_events_read: read_lan for local endpoints, read_wan for remote endpoints.
  • ES copytool:default will use objectstore copytool(inherited from rucio copytool), except special copytools defined in agis for activity es_events, es_failover or es_events_read.

Event ranges

When the pilot is ready to process events, it asks the server for suitable event ranges by sending the following dictionary (example with values):

{'pandaID': 4512659616, 'nRanges': 64, 'jobsetID': 4512659613, 'taskID': 19366884}

The server reponds by sending the following list of event ranges (example with values corresponding to the first event range in the list):

[{u'eventRangeID': u'19366884-4512659616-18883859031-8193-1', u'LFN': u'EVNT.19366871._000075.pool.root.1', u'lastEvent': 8193, u'startEvent': 8193, u'scope': u'mc16_13TeV', u'GUID': u'BDF920D7-B68A-E64A-8F89-EDD2E52DBDF1'}, ..}]

Note: the number of event ranges (nRanges) is set to 2 * number of cores.

Major updates:
-- WenGuan - 2019-07-19 -- PaulNilsson - 2020-04-15

Topic attachments
I Attachment History Action Size Date Who Comment
PNGpng ESProcess.png r1 manage 67.0 K 2019-07-19 - 11:28 WenGuan  
Edit | Attach | Watch | Print version | History: r5 < r4 < r3 < r2 < r1 | Backlinks | Raw View | WYSIWYG | More topic actions
Topic revision: r5 - 2020-04-15 - PaulNilsson
    • Cern Search Icon Cern Search
    • TWiki Search Icon TWiki Search
    • Google Search Icon Google Search

    PanDA All webs login

This site is powered by the TWiki collaboration platform Powered by PerlCopyright & 2008-2020 by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
or Ideas, requests, problems regarding TWiki? use Discourse or Send feedback