Event Service in PanDA Pilot 2
Introduction
This document describes the ATLAS Event Service implementation in the
PanDA Pilot 2.
Function calls until ESProcess starts
Event service mode is selected when the pilot downloads a job carrying one of the event service identifiers ('eventService' or 'eventServiceMerge') in the job description. Seeing one of these identifiers, the pilot internally sets the 'is_eventservice' or 'is_eventservicemerge' booleans that are data members of the JobData class (i.e. they end up in the job object as job.is_eventservice and job.is_eventservicemerge). The pilot later uses them in pilot/control/payload to select the proper payload executor (eventservice.Executor or eventservicemerge.Executor - the executor for running 'normal' jobs is called generic.Executor). The pilot starts the selected executor by calling its run() function. The main pilot keeps running its normal threads in the meantime for monitoring.
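The flag-based selection described above can be sketched as follows. This is a minimal illustration, not the pilot's actual code: the JobFlags class, the flags_from_description() and select_executor_name() helpers, and the assumed encoding of the identifiers in the job description (a key whose value is True or the string 'True') are all hypothetical. Only the identifier names, the flag names and the executor names come from the text.

```python
from dataclasses import dataclass


@dataclass
class JobFlags:
    """Hypothetical stand-in for the two JobData booleans named above."""
    is_eventservice: bool = False
    is_eventservicemerge: bool = False


def flags_from_description(description: dict) -> JobFlags:
    """Derive the two flags from a job description dictionary.

    Assumption: an identifier is 'present' when its key holds True or the
    string 'True'. The real job description may encode this differently.
    """
    def is_set(key: str) -> bool:
        return str(description.get(key, False)).lower() == "true"

    return JobFlags(is_eventservice=is_set("eventService"),
                    is_eventservicemerge=is_set("eventServiceMerge"))


def select_executor_name(job: JobFlags) -> str:
    """Return the name of the payload executor that matches the job flags."""
    if job.is_eventservice:
        return "eventservice.Executor"
    if job.is_eventservicemerge:
        return "eventservicemerge.Executor"
    # Jobs without event service identifiers use the 'normal' executor.
    return "generic.Executor"
```

With this sketch, a description such as {'eventService': True} selects eventservice.Executor, and an empty description falls through to generic.Executor.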
The eventServiceMerge workflow is very similar to the generic workflow, while the main eventService workflow is very different. The work executor type is selected in the run_payload() function of eventservice.Executor (called from its run() function). For a normal event service job to be run on a grid site, the 'generic' executor type is used. For the Raythena workflow, the executor type is 'raythena'. As of April 2020, the executor type has to be set in the pilot config file, although it might be preferable to use AGIS instead. Once the payload (job object) has been attached to the selected executor, the executor itself is launched by calling its start() function. Since the GenericExecutor is a thread, starting it causes its run() function to be called. The run() function in turn starts the ESProcess, described in the next section.
pilot.control.payload.execute_payloads()
'-> pilot.control.payloads.eventservice.Executor.run() (generic.Executor.run())
'-> pilot.control.payloads.eventservice.Executor.run_payload()
'-> pilot.eventservice.workexecutor.workexecutor.start() (plugin gets selected using config file, thread starts, run() gets executed)
'-> pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor.start() (basexecutor.BaseExecutor.start())
'-> pilot.eventservice.communicationmanager.communicationmanager.start() (thread starting)
'-> pilot.eventservice.workexecutor.plugins.genericexecutor.run()
'-> pilot.eventservice.esprocess.esprocess.ESProcess.start()
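The tail of the call chain above (plugin selection by executor type, start() on a thread, run() starting the ESProcess) can be sketched with the standard threading module. The class names, the PLUGINS table and the launch() helper are hypothetical placeholders, not the pilot's classes. The sketch only records the order of calls instead of starting a real ESProcess.

```python
import threading


class SketchExecutor(threading.Thread):
    """Hypothetical executor: a thread whose run() starts the ES process."""

    def __init__(self, payload):
        super().__init__()
        self.payload = payload   # the job object attached before start()
        self.calls = []          # records the call order, for illustration

    def run(self):
        # Invoked in the new thread as a consequence of calling start().
        self.calls.append("run")
        self.start_es_process()

    def start_es_process(self):
        # Placeholder for ESProcess.start(); nothing is launched here.
        self.calls.append("ESProcess.start")


class GenericSketch(SketchExecutor):
    """Placeholder for the 'generic' executor type."""


class RaythenaSketch(SketchExecutor):
    """Placeholder for the 'raythena' executor type."""


# Assumption: the executor type read from the config file is a plain string.
PLUGINS = {"generic": GenericSketch, "raythena": RaythenaSketch}


def launch(executor_type, payload):
    """Select the plugin for executor_type, attach the payload and start it."""
    try:
        plugin = PLUGINS[executor_type]
    except KeyError:
        raise ValueError(f"unknown executor type: {executor_type!r}") from None
    executor = plugin(payload)
    executor.start()
    return executor
```

A caller would use it as executor = launch("generic", job) followed by executor.join() when it needs to wait for the thread to finish.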
ESProcess
- ESProcess structure:
ESExecutor
- pilot.control.payloads.eventservice.py
- pilot.eventservice.workexecutor.plugins.genericexecutor.py
- Implements get_event_ranges_hook and handle_output_message_hook and links them to ESProcess.
- The handle_output_message_hook calls the ESStageOut client to stage out ES outputs.
For ESMerge jobs, the es_events_read activity is used to stage in ES files;
this is done by calling the ESStageIn client.
ESStaging
- ESStagingClient transfer_files
* for activity in activities:
* for protocol in activity.protocols:
* try to transfer the files; on success, return. Otherwise continue with the next protocol and then the next activity.
- ESStageIn
* process storage id: map storage id to ddmendpoint (storage id is from panda job payload).
* call ESStagingClient transfer_files
- ESStageOut
* switch ddmendpoint based on activity: for ES stage-out, different activities may define different ddmendpoints. For example, es_failover may point to a remote storage.
* call ESStagingClient transfer_files
- ESActivityMap: maps pilot activities to DDM activities
* es_events: write_lan, fallback to write_wan
* es_failover: write_lan, fallback to write_wan
* es_events_read: read_lan for local endpoints, read_wan for remote endpoints.
- ES copytool: by default the objectstore copytool (inherited from the rucio copytool) is used, unless special copytools are defined in AGIS for the es_events, es_failover or es_events_read activities.
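The fallback loop of transfer_files and the activity mapping listed above can be sketched as follows. This is an illustration under stated assumptions, not the ESStagingClient code: the function signatures, the protocols_by_activity argument and the copy callable are hypothetical. Only the activity names and the lan/wan fallback order are taken from the list.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Pilot activity -> DDM activities in fallback order, as listed above.
ES_ACTIVITY_MAP: Dict[str, List[str]] = {
    "es_events": ["write_lan", "write_wan"],
    "es_failover": ["write_lan", "write_wan"],
}


def read_activity(is_local_endpoint: bool) -> str:
    """DDM activity for es_events_read: read_lan if local, else read_wan."""
    return "read_lan" if is_local_endpoint else "read_wan"


def transfer_files(
    activities: List[str],
    protocols_by_activity: Dict[str, List[str]],
    copy: Callable[[str, str], None],
) -> Optional[Tuple[str, str]]:
    """Try every protocol of every activity until one transfer succeeds.

    'copy' is a hypothetical callable that raises an exception on failure.
    Returns the (activity, protocol) pair that worked, or None if all failed.
    """
    for activity in activities:
        for protocol in protocols_by_activity.get(activity, []):
            try:
                copy(activity, protocol)
            except Exception:
                # Broad catch on purpose in this sketch: try the next option.
                continue
            return activity, protocol
    return None
```

A stage-out for es_events would then call transfer_files(ES_ACTIVITY_MAP["es_events"], protocols, copy), so write_wan is only attempted after every write_lan protocol has failed.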
Event ranges
When the pilot is ready to process events, it asks the server for suitable event ranges by sending the following dictionary (example with values):
{'pandaID': 4512659616, 'nRanges': 64, 'jobsetID': 4512659613, 'taskID': 19366884}
The server responds by sending the following list of event ranges (example with values corresponding to the first event range in the list):
[{u'eventRangeID': u'19366884-4512659616-18883859031-8193-1', u'LFN': u'EVNT.19366871._000075.pool.root.1', u'lastEvent': 8193, u'startEvent': 8193, u'scope': u'mc16_13TeV', u'GUID': u'BDF920D7-B68A-E64A-8F89-EDD2E52DBDF1'}, ...]
Note: the number of event ranges (nRanges) is set to 2 * number of cores.
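The request dictionary and the nRanges rule above can be sketched as follows. The helper names and argument names are hypothetical. The dictionary keys are the ones shown in the example request, and treating startEvent and lastEvent as an inclusive range is an assumption based on the example, where both are 8193.

```python
def build_event_range_request(panda_id, jobset_id, task_id, n_cores):
    """Build the dictionary sent to the server to ask for event ranges.

    nRanges follows the rule in the note above: 2 * number of cores.
    """
    return {
        "pandaID": panda_id,
        "nRanges": 2 * n_cores,
        "jobsetID": jobset_id,
        "taskID": task_id,
    }


def events_in_range(event_range):
    """Number of events in one event range.

    Assumption: startEvent and lastEvent are both inclusive.
    """
    return event_range["lastEvent"] - event_range["startEvent"] + 1
```

For instance, calling build_event_range_request(4512659616, 4512659613, 19366884, 32) yields a dictionary equal to the example request, with nRanges set to 64.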
Major updates:
--
WenGuan - 2019-07-19
--
PaulNilsson - 2020-04-15