In the panda_server.cfg
configuration file, one can decide whether the Oracle or the MySQL database backend will be used for PanDA data storage.
WrappedCursor()
is a class defined in pandaserver.taskbuffer.WrappedCursor
. It alters Oracle's PL/SQL syntax, so that other DB backends can be used. WrappedCursor
class provides an interface to translate Oracle PL/SQL queries to MySQL (or keep them as Oracle PL/SQL queries).
cx_Oracle
and/or MySQLdb
libraries for python-DB backend interactions.
# connection object conn = None # cursor object cur = None # use special error codes for reconnection in querySQL useOtherError = False # backend backend = 'oracle' # constructor def __init__(self, connection) # iterator def __iter__(self) # string representation def __str__(self) def execute(self, sql, varDict=None, cur=None) def executemany(self, sql, params) def fetchall(self) def fetchmany(self, arraysize=1000) def fetchone(self) def var(self, dataType, *args, **kwargs) def getvalue(self,dataItem) def next(self) def close(self) def prepare(self, statement) # wrappers to handle Oracle's "RETURNING INTO" statement in different backends def _returningIntoOracle(self, returningInputData, varDict, cur, dryRun=False) def _returningIntoMySQLpre(self, returningInputData, varDict, cur) def _returningIntoMySQLpost(self, returningInputData, varDict, cur) @property def description(self) @property def rowcount(self) @property def arraysize(self) @arraysize.setter def arraysize(self,val)
# cat /etc/my.cnf [mysqld] ... lower_case_table_names=1 ...
panda_server.cfg
WrappedCursor()
the schema names in a SQL query are replaced with schema names from configuration: sql = re.sub('ATLAS_PANDA\.', panda_config.schemaPANDA + '.', sql) sql = re.sub('ATLAS_PANDAMETA\.', panda_config.schemaMETA + '.', sql) sql = re.sub('ATLAS_GRISLI\.', panda_config.schemaGRISLI + '.', sql) sql = re.sub('ATLAS_PANDAARCH\.', panda_config.schemaPANDAARCH + '.', sql) sql = re.sub('ATLAS_DEFT\.', panda_config.schemaJEDI + '.', sql)
CURRENT_DATE
term WrappedCursor()
.
CURRENT_DATE
for MySQL consists of 2 steps. sql = re.sub("CURRENT_DATE\s*-\s*(\d+|:[^\s\)]+)", "DATE_SUB(CURRENT_DATE,INTERVAL \g<1> DAY)", sql)
CURRENT_TIMESTAMP
instead of CURRENT_DATE
, because of different date string formats in Oracle and MySQL. sql = re.sub('CURRENT_DATE', 'CURRENT_TIMESTAMP', sql)
SYSDATE
term WrappedCursor()
.
SYSDATE
for MySQL consists of 2 steps. sql = re.sub("SYSDATE\s*-\s*(\d+|:[^\s\)]+)", "DATE_SUB(SYSDATE,INTERVAL \g<1> DAY)", sql)
SYSDATE()
instead of SYSDATE
. sql = re.sub('SYSDATE', 'SYSDATE()', sql)
EMPTY_CLOB()
term WrappedCursor()
.
EMPTY_CLOB()
in MySQL, therefore replacing it with an empty string: sql = re.sub('EMPTY_CLOB\(\)', "''", sql)
ROWNUM
term WrappedCursor()
.
LIMIT
instead of Oracle's ROWNUM
. sql = re.sub("(?i)(AND)*\s*ROWNUM\s*<=\s*(\d+)", " LIMIT \g<2>", sql) sql = re.sub("(?i)(WHERE)\s*LIMIT\s*(\d+)", " LIMIT \g<2>" , sql)
NOWAIT
term WrappedCursor()
.
SELECT ... FOR UPDATE NOWAIT
, however, MySQL does not support NOWAIT
.
NOWAIT
. sql = re.sub('NOWAIT', "", sql)
RETURNING INTO
term WrappedCursor()
.
RETURNING INTO
term is used to get an ID of the new job or file.
LAST_INSERT_ID()
together with an autoincremented column.
var
of data type number pandaserver.taskbuffer.OraDBProxy
.
cursor.var()
, the Oracle-backed PanDA server returns variable of a data type cx_Oracle.NUMBER
. However, MySQLdb
provides no such thing, so we use python's long
data type. if panda_config.backend == 'oracle': import cx_Oracle varNUMBER = cx_Oracle.NUMBER else: import MySQLdb varNUMBER = long
pandaserver.taskbuffer.OraDBProxy
, pandaserver.taskbuffer.{JobSpec,FileSpec,DatasetSpec}
, pandajedi.jedicore.{JediFileSpec,JediTaskSpec}
.
.nextval
or .currval
properties of a defined sequence are used.
.nextval
property is then replaced by INSERT
of a NULL
value into an autoincremented column of the "sequence table", combined together with SELECT LAST_INSERT_ID()
.
JobSpec
, FileSpec
, and DatasetSpec
classes, e.g. # return expression of bind values for INSERT def bindValuesExpression(cls,useSeq=False): from config import panda_config ret = "VALUES(" for attr in cls._attributes: if useSeq and cls._seqAttrMap.has_key(attr): if panda_config.backend == 'mysql': # mysql ret += "%s," % "NULL" else: # oracle ret += "%s," % cls._seqAttrMap[attr] else: ret += ":%s," % attr ret = ret[:-1] ret += ")" return ret bindValuesExpression = classmethod(bindValuesExpression)
ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ
. JOBSDEFINED4_PANDAID_SEQ
: CREATE TABLE JOBSDEFINED4_PANDAID_SEQ ( id BIGINT(20) NOT NULL AUTO_INCREMENT, col bit(1) NULL, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
### ATLAS_PANDA schema ### ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval ATLAS_PANDA.SUBCOUNTER_SUBID_SEQ.nextval ATLAS_PANDA.GROUP_JOBID_SEQ.nextval ATLAS_PANDA.CLOUDTASKS_ID_SEQ.nextval ### ATLAS_PANDAMETA schema ### ATLAS_PANDAMETA.USERS_ID_SEQ.nextval ATLAS_PANDAMETA.PROXYKEY_ID_SEQ.nextval ATLAS_PANDAMETA.SITEACCESS_ID_SEQ.nextval ### ATLAS_DEFT schema ### ATLAS_DEFT.PRODSYS2_TASK_ID_SEQ.nextval ATLAS_DEFT.PRODSYS2_TASK_ID_SEQ.currval
### ATLAS_PANDA schema ### ATLAS_PANDA.JEDI_DATASET_CONT_FILEID_SEQ.nextval ATLAS_PANDA.JEDI_DATASETS_ID_SEQ.nextval ATLAS_PANDA.JEDI_OUTPUT_TEMPLATE_ID_SEQ.nextval
ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval
if panda_config.backend == 'mysql': ### fake sequence sql = " INSERT INTO ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ (col) VALUES (NULL) " self.cur.arraysize = 10 self.cur.execute(sql + comment, {}) sql2 = """ SELECT LAST_INSERT_ID() """ self.cur.execute(sql2 + comment, {}) job.jobsetID, = self.cur.fetchone() else: # panda_config.backend == 'oracle': sqlESS = "SELECT ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval FROM dual "; self.cur.arraysize = 10 self.cur.execute(sqlESS + comment, {}) job.jobsetID, = self.cur.fetchone()
ATLAS_PANDA.CLOUDTASKS_ID_SEQ.nextval
within the original table, with RETURNING INTO
term if panda_config.backend == 'oracle': import cx_Oracle varNUMBER = cx_Oracle.NUMBER else: import MySQLdb varNUMBER = long ### ... if panda_config.backend == 'mysql': ### fake sequence sql = "INSERT INTO ATLAS_PANDA.cloudtasks (id,taskid,status,tmod,tenter) VALUES(NULL,:taskid,:status,CURRENT_DATE,CURRENT_DATE)" else: # panda_config.backend == 'oracle': sql = "INSERT INTO ATLAS_PANDA.cloudtasks (id,taskid,status,tmod,tenter) VALUES(ATLAS_PANDA.CLOUDTASKS_ID_SEQ.nextval,:taskid,:status,CURRENT_DATE,CURRENT_DATE)" sql += " RETURNING id INTO :newID" varMap = {} varMap[':taskid'] = cloudTask.taskid varMap[':status'] = cloudTask.status varMap[':newID'] = self.cur.var(varNUMBER) self.cur.execute(sql+comment, varMap) # get id cloudTask.id = long(self.cur.getvalue(varMap[':newID']))
mysqldump -d
schema: https://svnweb.cern.ch/trac/panda/browser/bigpanda-misc/panda-DB-migration/mysql-work-misc/schema/schema.pandadb1.sql