#! /usr/bin/env python
# Plugin for the ARC middleware to generate usage records on job completion.
# Currently only supports arc.conf style config (no XML config support)
#
# Author: Henrik Thostrup Jensen

import os
import sys
import re
import pwd
import time
import logging
import datetime
import urlparse
import tempfile
import subprocess
import ConfigParser

try:
    from xml.etree import ElementTree as ET
except ImportError:
    # Python 2.4 compatibility
    from elementtree import ElementTree as ET


# constants / defaults
DEFAULT_LOG_DIR = "/var/spool/arc/usagerecords"
DEFAULT_LOGLEVEL = logging.INFO

UR_DIRECTORY = "urs"

GM_DEFAULT_PORT = 2811
GM_DEFAULT_MOUNT = "/jobs"

ARC_CONFIG = "ARC_CONFIG"
NORDUGRID_CONFIG = "NORDUGRID_CONFIG"

ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
ISOZ_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
ARC_TIME_FORMAT = "%Y%m%d%H%M%SZ"

XML_HEADER = '''<?xml version="1.0" encoding="UTF-8" ?>''' + "\n"

LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"

LOGGER_NAME_VALUE = 'ARC1-URLogger'
LOGGER_VERSION_VALUE = '1.2.0'

# arc.conf configuration constants
CONFIG_COMMON_SECTION = 'common'
CONFIG_CLUSTER_SECTION = 'cluster'
CONFIG_LOGGER_SECTION = 'logger'

CONFIG_HOSTNAME = 'hostname'
CONFIG_LOG_DIR = 'log_dir'
CONFIG_LOGFILE = 'urlogger_logfile'
CONFIG_LOGLEVEL = 'urlogger_loglevel'  # valid options: debug, info, warning

LOGLEVELS = {
    'debug'   : logging.DEBUG,
    'info'    : logging.INFO,
    'warning' : logging.WARNING
}
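
# For reference, an illustrative arc.conf fragment with the options this
# plugin reads. The option names match the constants above; the values shown
# are simply the defaults and serve only as an example:
#
#   [logger]
#   log_dir="/var/spool/arc/usagerecords"
#   urlogger_logfile="/var/log/arc/ur-logger.log"
#   urlogger_loglevel="info"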

# static file paths
ARC_DEFAULT_CONFIG_FILE = "/etc/arc.conf"
NORDUGRID_DEFAULT_CONFIG_FILE = "/etc/nordugrid.conf"
GRID_VO_MAPFILE = "/etc/grid-security/grid-vo-mapfile"
DEFAULT_LOGFILE = "/var/log/arc/ur-logger.log"

# executables
VOMS_PROXY_INFO = 'voms-proxy-info'
VOMS_PROXY_INFO_PATH = '/opt/voms/bin'

# xml namespaces
OGF_UR_NAMESPACE = "http://schema.ogf.org/urf/2003/09/urf"
DEISA_NAMESPACE = "http://rmis.deisa.org/acct"
SGAS_VO_NAMESPACE = "http://www.sgas.se/namespaces/2009/05/ur/vo"
SGAS_AT_NAMESPACE = "http://www.sgas.se/namespaces/2009/07/ur"
LOGGER_NAMESPACE = "http://www.sgas.se/namespaces/2010/08/logger"
TRANSFER_NAMESPACE = "http://www.sgas.se/namespaces/2010/10/filetransfer"

# job usage element/attribute names
JOB_USAGE_RECORD = ET.QName("{%s}JobUsageRecord" % OGF_UR_NAMESPACE)
RECORD_IDENTITY = ET.QName("{%s}RecordIdentity" % OGF_UR_NAMESPACE)
RECORD_ID = ET.QName("{%s}recordId" % OGF_UR_NAMESPACE)
CREATE_TIME = ET.QName("{%s}createTime" % OGF_UR_NAMESPACE)
JOB_IDENTITY = ET.QName("{%s}JobIdentity" % OGF_UR_NAMESPACE)
GLOBAL_JOB_ID = ET.QName("{%s}GlobalJobId" % OGF_UR_NAMESPACE)
LOCAL_JOB_ID = ET.QName("{%s}LocalJobId" % OGF_UR_NAMESPACE)
USER_IDENTITY = ET.QName("{%s}UserIdentity" % OGF_UR_NAMESPACE)
GLOBAL_USER_NAME = ET.QName("{%s}GlobalUserName" % OGF_UR_NAMESPACE)
LOCAL_USER_ID = ET.QName("{%s}LocalUserId" % OGF_UR_NAMESPACE)
JOB_NAME = ET.QName("{%s}JobName" % OGF_UR_NAMESPACE)
STATUS = ET.QName("{%s}Status" % OGF_UR_NAMESPACE)
CHARGE = ET.QName("{%s}Charge" % OGF_UR_NAMESPACE)
WALL_DURATION = ET.QName("{%s}WallDuration" % OGF_UR_NAMESPACE)
CPU_DURATION = ET.QName("{%s}CpuDuration" % OGF_UR_NAMESPACE)
NODE_COUNT = ET.QName("{%s}NodeCount" % OGF_UR_NAMESPACE)
PROCESSORS = ET.QName("{%s}Processors" % OGF_UR_NAMESPACE)
START_TIME = ET.QName("{%s}StartTime" % OGF_UR_NAMESPACE)
END_TIME = ET.QName("{%s}EndTime" % OGF_UR_NAMESPACE)
PROJECT_NAME = ET.QName("{%s}ProjectName" % OGF_UR_NAMESPACE)
SUBMIT_HOST = ET.QName("{%s}SubmitHost" % OGF_UR_NAMESPACE)
MACHINE_NAME = ET.QName("{%s}MachineName" % OGF_UR_NAMESPACE)
HOST = ET.QName("{%s}Host" % OGF_UR_NAMESPACE)
QUEUE = ET.QName("{%s}Queue" % OGF_UR_NAMESPACE)

SUBMIT_TIME = ET.QName("{%s}SubmitTime" % DEISA_NAMESPACE)

VO = ET.QName("{%s}VO" % SGAS_VO_NAMESPACE)
VO_TYPE = ET.QName("{%s}type" % SGAS_VO_NAMESPACE)
VO_NAME = ET.QName("{%s}Name" % SGAS_VO_NAMESPACE)
VO_ISSUER = ET.QName("{%s}Issuer" % SGAS_VO_NAMESPACE)
VO_ATTRIBUTE = ET.QName("{%s}Attribute" % SGAS_VO_NAMESPACE)
VO_GROUP = ET.QName("{%s}Group" % SGAS_VO_NAMESPACE)
VO_ROLE = ET.QName("{%s}Role" % SGAS_VO_NAMESPACE)
VO_CAPABILITY = ET.QName("{%s}Capability" % SGAS_VO_NAMESPACE)

SGAS_USER_TIME = ET.QName("{%s}UserTime" % SGAS_AT_NAMESPACE)
SGAS_KERNEL_TIME = ET.QName("{%s}KernelTime" % SGAS_AT_NAMESPACE)
SGAS_EXIT_CODE = ET.QName("{%s}ExitCode" % SGAS_AT_NAMESPACE)
SGAS_MAJOR_PAGE_FAULTS = ET.QName("{%s}MajorPageFaults" % SGAS_AT_NAMESPACE)
SGAS_RUNTIME_ENVIRONMENT = ET.QName("{%s}RuntimeEnvironment" % SGAS_AT_NAMESPACE)

# logger elements and attributes
LOGGER_NAME = ET.QName("{%s}LoggerName" % LOGGER_NAMESPACE)
LOGGER_VERSION = ET.QName("{%s}version" % LOGGER_NAMESPACE)

# elements for transfer stats
FILE_TRANSFERS = ET.QName("{%s}FileTransfers" % TRANSFER_NAMESPACE)
FILE_DOWNLOAD = ET.QName("{%s}FileDownload" % TRANSFER_NAMESPACE)
FILE_UPLOAD = ET.QName("{%s}FileUpload" % TRANSFER_NAMESPACE)
TRANSFER_URL = ET.QName("{%s}URL" % TRANSFER_NAMESPACE)
TRANSFER_SIZE = ET.QName("{%s}Size" % TRANSFER_NAMESPACE)
TRANSFER_START_TIME = ET.QName("{%s}StartTime" % TRANSFER_NAMESPACE)
TRANSFER_END_TIME = ET.QName("{%s}EndTime" % TRANSFER_NAMESPACE)
TRANSFER_BYPASS_CACHE = ET.QName("{%s}BypassCache" % TRANSFER_NAMESPACE)
TRANSFER_RETRIEVED_FROM_CACHE = ET.QName("{%s}RetrievedFromCache" % TRANSFER_NAMESPACE)

# register namespaces in elementtree so we get more readable xml files
# the semantics of the xml files do not change due to this
try:
    register_namespace = ET.register_namespace
except AttributeError:
    def register_namespace(prefix, uri):
        ET._namespace_map[uri] = prefix

register_namespace('ur', OGF_UR_NAMESPACE)
register_namespace('deisa', DEISA_NAMESPACE)
register_namespace('vo', SGAS_VO_NAMESPACE)
register_namespace('sgas', SGAS_AT_NAMESPACE)
register_namespace('logger', LOGGER_NAMESPACE)
register_namespace('transfer', TRANSFER_NAMESPACE)


class ConfigurationError(Exception):
    pass


def setElement(parent, name, text):
    # utility function for creating a subelement with text content, very handy
    element = ET.SubElement(parent, name)
    element.text = str(text)


class VOInformation:

    def __init__(self):
        self.type = None
        self.name = None
        self.issuer = None
        self.attributes = []  # list of (group, role, capability)

    def generateTree(self):
        vo = ET.Element(VO)
        if self.type is not None:
            vo.attrib[VO_TYPE] = self.type
        setElement(vo, VO_NAME, self.name)
        if self.issuer is not None:
            setElement(vo, VO_ISSUER, self.issuer)
        for attrs in self.attributes:
            group, role, capability = attrs
            attr = ET.SubElement(vo, VO_ATTRIBUTE)
            setElement(attr, VO_GROUP, group)
            if role is not None:
                setElement(attr, VO_ROLE, role)
            if capability is not None:
                setElement(attr, VO_CAPABILITY, capability)
        return vo


class Download:

    def __init__(self, url, size=None, start_time=None, end_time=None,
                 bypass_cache=None, retrieved_from_cache=None):
        self.url = url
        self.size = size
        self.start_time = start_time
        self.end_time = end_time
        self.bypass_cache = bypass_cache
        self.retrieved_from_cache = retrieved_from_cache

    def generateTree(self):
        download = ET.Element(FILE_DOWNLOAD)
        setElement(download, TRANSFER_URL, self.url)
        if self.size is not None:
            setElement(download, TRANSFER_SIZE, self.size)
        if self.start_time is not None:
            setElement(download, TRANSFER_START_TIME, self.start_time)
        if self.end_time is not None:
            setElement(download, TRANSFER_END_TIME, self.end_time)
        if self.bypass_cache is not None:
            setElement(download, TRANSFER_BYPASS_CACHE, self.bypass_cache)
        if self.retrieved_from_cache is not None:
            setElement(download, TRANSFER_RETRIEVED_FROM_CACHE, self.retrieved_from_cache)
        return download


class Upload:

    def __init__(self, url, size=None, start_time=None, end_time=None):
        self.url = url
        self.size = size
        self.start_time = start_time
        self.end_time = end_time

    def generateTree(self):
        upload = ET.Element(FILE_UPLOAD)
        setElement(upload, TRANSFER_URL, self.url)
        if self.size is not None:
            setElement(upload, TRANSFER_SIZE, self.size)
        if self.start_time is not None:
            setElement(upload, TRANSFER_START_TIME, self.start_time)
        if self.end_time is not None:
            setElement(upload, TRANSFER_END_TIME, self.end_time)
        return upload


class UsageRecord:

    def __init__(self):
        self.record_id = None
        self.global_job_id = None
        self.local_job_id = None
        self.global_user_name = None
        self.local_user_id = None
        self.job_name = None
        self.status = None
        self.machine_name = None
        self.queue = None
        self.host = None
        self.node_count = None
        self.processors = None
        self.submit_time = None
        self.end_time = None
        self.start_time = None
        self.project_name = None
        self.submit_host = None
        self.wall_duration = None
        self.cpu_duration = None
        self.charge = None
        self.vo_info = []  # list of VOInformation
        # sgas attributes
        self.user_time = None
        self.kernel_time = None
        self.exit_code = None
        self.major_page_faults = None
        self.runtime_environments = []
        # logger attributes
        self.logger_name = LOGGER_NAME_VALUE
        self.logger_version = LOGGER_VERSION_VALUE
        # transfers
        self.downloads = []
        self.uploads = []

    def generateTree(self):
        """
        Generates the XML tree for the usage record.
        """
        ur = ET.Element(JOB_USAGE_RECORD)

        assert self.record_id is not None, "No recordId specified, cannot generate usage record"
        record_identity = ET.SubElement(ur, RECORD_IDENTITY)
        record_identity.set(RECORD_ID, self.record_id)
        record_identity.set(CREATE_TIME, time.strftime(ISO_TIME_FORMAT, time.gmtime()) + 'Z')

        if self.global_job_id is not None or self.local_job_id is not None:
            job_identity = ET.SubElement(ur, JOB_IDENTITY)
            if self.global_job_id is not None:
                setElement(job_identity, GLOBAL_JOB_ID, self.global_job_id)
            if self.local_job_id is not None:
                setElement(job_identity, LOCAL_JOB_ID, self.local_job_id)

        if self.global_user_name is not None or self.local_user_id is not None:
            user_identity = ET.SubElement(ur, USER_IDENTITY)
            if self.local_user_id is not None:
                setElement(user_identity, LOCAL_USER_ID, self.local_user_id)
            if self.global_user_name is not None:
                setElement(user_identity, GLOBAL_USER_NAME, self.global_user_name)
            # vo information belongs under user identity
            for voi in self.vo_info:
                user_identity.append(voi.generateTree())

        if self.job_name is not None:
            setElement(ur, JOB_NAME, self.job_name)
        if self.charge is not None:
            setElement(ur, CHARGE, self.charge)
        if self.status is not None:
            setElement(ur, STATUS, self.status)
        if self.machine_name is not None:
            setElement(ur, MACHINE_NAME, self.machine_name)
        if self.queue is not None:
            setElement(ur, QUEUE, self.queue)
        if self.host is not None:
            setElement(ur, HOST, self.host)
        if self.node_count is not None:
            setElement(ur, NODE_COUNT, self.node_count)
        if self.processors is not None:
            setElement(ur, PROCESSORS, self.processors)
        if self.submit_host is not None:
            setElement(ur, SUBMIT_HOST, self.submit_host)
        if self.project_name is not None:
            setElement(ur, PROJECT_NAME, self.project_name)
        if self.submit_time is not None:
            setElement(ur, SUBMIT_TIME, self.submit_time)
        if self.start_time is not None:
            setElement(ur, START_TIME, self.start_time)
        if self.end_time is not None:
            setElement(ur, END_TIME, self.end_time)
        if self.wall_duration is not None:
            setElement(ur, WALL_DURATION, "PT%iS" % self.wall_duration)
        if self.cpu_duration is not None:
            setElement(ur, CPU_DURATION, "PT%iS" % self.cpu_duration)

        # sgas attributes
        if self.user_time is not None:
            setElement(ur, SGAS_USER_TIME, "PT%iS" % self.user_time)
        if self.kernel_time is not None:
            setElement(ur, SGAS_KERNEL_TIME, "PT%iS" % self.kernel_time)
        if self.exit_code is not None:
            setElement(ur, SGAS_EXIT_CODE, self.exit_code)
        if self.major_page_faults is not None:
            setElement(ur, SGAS_MAJOR_PAGE_FAULTS, self.major_page_faults)
        for renv in self.runtime_environments:
            setElement(ur, SGAS_RUNTIME_ENVIRONMENT, renv)

        # set logger name and version
        logger_name = ET.SubElement(ur, LOGGER_NAME)
        logger_name.text = LOGGER_NAME_VALUE
        logger_name.set(LOGGER_VERSION, LOGGER_VERSION_VALUE)

        if self.downloads or self.uploads:
            tri = ET.SubElement(ur, FILE_TRANSFERS)
            for dl in self.downloads:
                tri.append(dl.generateTree())
            for ul in self.uploads:
                tri.append(ul.generateTree())

        return ET.ElementTree(ur)

    def writeXML(self, filename):
        tree = self.generateTree()
        f = file(filename, 'w')
        f.write(XML_HEADER)
        tree.write(f, encoding='utf-8')
        f.close()

# end usage record


class ARCConfigFileReader:
    # Wrapper around the stock file object with slightly altered readline
    # functionality (the Python ConfigParser only uses readline).
    # The wrapper "fixes" odd config entries allowed by ARC, but which
    # fall outside the allowed language defined in RFC 822.

    def __init__(self, fp):
        self._fp = fp

    def readline(self):
        line = self._fp.readline()
        # return empty lines and comments as-is
        if not line.strip() or line.strip().startswith('#'):
            return line
        # if the line is not a config block and does not have a '=' in it the
        # config parser will break, however ARC allows this. We just add
        # '=true' to the line, so the config value can even be read in Python.
        if not line.startswith('[') and '=' not in line:
            return line.rstrip('\n') + '=true\n'
        # regular line, just return it
        return line


def cleanString(value):
    # strip whitespace and quote characters from a config value
    if type(value) == str:
        return value.strip().replace('"', '').replace("'", '')
    return value


def getARCConfigurationFilePath():
    # first try to get the file path from the environment
    if ARC_CONFIG in os.environ and os.path.exists(os.environ[ARC_CONFIG]):
        return os.environ[ARC_CONFIG]
    if NORDUGRID_CONFIG in os.environ and os.path.exists(os.environ[NORDUGRID_CONFIG]):
        return os.environ[NORDUGRID_CONFIG]
    # then try the default file paths
    if os.path.exists(ARC_DEFAULT_CONFIG_FILE):
        return ARC_DEFAULT_CONFIG_FILE
    if os.path.exists(NORDUGRID_DEFAULT_CONFIG_FILE):
        return NORDUGRID_DEFAULT_CONFIG_FILE
    raise ConfigurationError("No ARC/NorduGrid configuration file found")


def getARCConfig(filepath=None):
    if filepath is None:
        filepath = getARCConfigurationFilePath()
    arc_cfg_file = ARCConfigFileReader(file(filepath))
    arc_cfg = ConfigParser.SafeConfigParser()
    arc_cfg.readfp(arc_cfg_file)
    return arc_cfg


def getLogDirectory(arc_cfg):
    """
    Returns the logging directory.
    """
    try:
        logdir = arc_cfg.get(CONFIG_LOGGER_SECTION, CONFIG_LOG_DIR)
        logdir = cleanString(logdir)
        return logdir
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        pass
    return DEFAULT_LOG_DIR


def getCommonHostName(arc_cfg):
    # returns the value of hostname from the common section
    hostname = None
    if arc_cfg.has_section(CONFIG_COMMON_SECTION):
        options = dict(arc_cfg.items(CONFIG_COMMON_SECTION))
        hostname = cleanString(options.get(CONFIG_HOSTNAME))
    return hostname


def getClusterHostName(arc_cfg):
    # returns the value of hostname from the cluster section,
    # falling back to the common section
    hostname = None
    if arc_cfg.has_section(CONFIG_CLUSTER_SECTION):
        options = dict(arc_cfg.items(CONFIG_CLUSTER_SECTION))
        hostname = cleanString(options.get(CONFIG_HOSTNAME))
    if hostname is None:
        hostname = getCommonHostName(arc_cfg)
    return hostname


def getLoggerHostName(arc_cfg):
    # returns the value of hostname from the logger section, falling back to
    # the common section, then to the fully qualified domain name
    hostname = None
    if arc_cfg.has_section(CONFIG_LOGGER_SECTION):
        options = dict(arc_cfg.items(CONFIG_LOGGER_SECTION))
        hostname = cleanString(options.get(CONFIG_HOSTNAME))
    if hostname is None:
        hostname = getCommonHostName(arc_cfg)
    if hostname is None:
        import socket
        hostname = socket.getfqdn()
    return hostname


def parseFile(filename):
    # parse a name=value style file (job.<id>.diag / job.<id>.local) into a
    # dictionary of name -> list of values
    parse_results = {}
    for line in file(filename).readlines():
        line = line.strip()
        if not line:
            continue  # filter out empty/blank lines
        try:
            name, value = line.split("=", 1)
            parse_results.setdefault(name, []).append(value)
        except ValueError:
            logging.warning('Error parsing line "%s" in file %s' % (line, filename))
    return parse_results


def parseARCTime(date_string):
    return time.strptime(date_string, ARC_TIME_FORMAT)


def gm2isoTime(gm_time):
    return time.strftime(ISO_TIME_FORMAT, gm_time) + "Z"


def arc2isoTime(date_string):
    gm_time = parseARCTime(date_string)
    iso_date = gm2isoTime(gm_time)
    return iso_date

def parseDuration(times):
    # a time entry can occur multiple times (e.g. once per node);
    # we report the largest value
    def parseTimeEntry(timestr):
        assert timestr.endswith("s"), "Time entry is in invalid (or new) format"
        return float(timestr[:-1])
    return max([parseTimeEntry(time_entry) for time_entry in times])


def assembleGlobalJobID(job_id, arc_cfg):

    gm_port = None
    gm_hostname = None
    gm_mount_point = None

    gm_hostname = getClusterHostName(arc_cfg)

    if arc_cfg.has_section(CONFIG_CLUSTER_SECTION):
        cluster_options = dict(arc_cfg.items(CONFIG_CLUSTER_SECTION))
        gm_port = cleanString(cluster_options.get('gm_port'))
        gm_mount_point = cleanString(cluster_options.get('gm_mount_point'))

    if gm_port is None:
        gm_port = GM_DEFAULT_PORT
    if gm_hostname is None:
        import socket
        gm_hostname = socket.getfqdn()
    if gm_mount_point is None:
        gm_mount_point = GM_DEFAULT_MOUNT

    global_job_id = "gsiftp://%s:%s%s/%s" % (gm_hostname, gm_port, gm_mount_point, job_id)
    return global_job_id


def executeVOMSProxyInfo(voms_proxy_file):

    stdout_output = tempfile.TemporaryFile()
    stderr_output = tempfile.TemporaryFile()

    # the next two statements might raise OSError
    try:
        p = subprocess.Popen([VOMS_PROXY_INFO, '-all', '-file', voms_proxy_file],
                             stdout=stdout_output, stderr=stderr_output)
        p.wait()
    except OSError, e:
        if e.errno == 2:  # probably couldn't find voms-proxy-info
            logging.warning("Probably couldn't find voms-proxy-info, trying with path %s" % VOMS_PROXY_INFO_PATH)
            vpi = os.path.join(VOMS_PROXY_INFO_PATH, VOMS_PROXY_INFO)
            p = subprocess.Popen([vpi, '-all', '-file', voms_proxy_file],
                                 stdout=stdout_output, stderr=stderr_output)
            p.wait()
        else:
            raise

    stdout_output.seek(0)
    s = stdout_output.read()
    return s


def parseVOMSProxyInfoOutput(voms_info_output):
    # VOMS parsing notes:
    # The '/Capability=' attribute is deprecated, so we do not report it
    # anymore. It is assumed that entries starting with '/' (slash) in the
    # 'attribute : ' lines are reserved for VOMS groups (i.e. this is not
    # allowed for generic VOMS attributes).

    vo_info = []
    voi = None

    for line in voms_info_output.split("\n"):
        if line.startswith("=== VO"):
            # new vo
            if voi is not None:
                vo_info.append(voi)
            voi = VOInformation()
            voi.type = "voms"
        if voi is None:
            continue  # only parse stuff after we have detected a vo
        if line.startswith("VO"):
            vo_name = line.split(':', 1)[-1].strip()
            voi.name = vo_name
        elif line.startswith("issuer"):
            vo_issuer = line.split(':', 1)[-1].strip()
            voi.issuer = vo_issuer
        elif line.startswith("attribute"):
            grc = line.split(':', 1)[-1].strip()
            # skip if the attribute entry does not start with /
            if not grc.startswith('/'):
                continue
            if '/Capability=' in grc:
                grc = grc.rsplit('/Capability=', 1)[0]
            role = None
            if '/Role=' in grc:
                grc, role = grc.rsplit('/Role=', 1)
                if role == 'NULL':
                    role = None
            group = grc[1:]  # remove initial /
            voi.attributes.append((group, role, None))

    if voi is not None:
        vo_info.append(voi)

    return vo_info


def getVOMSesFromProxy(proxyfile):

    import arc

    config = arc.UserConfig()
    cafile = config.CACertificatePath()
    cadir = config.CACertificatesDirectory()
    holder = arc.Credential(proxyfile, "", cadir, cafile)
    vomsdir = ""
    trustlist = arc.VOMSTrustList()
    outputs = arc.VOMSACInfoVector()

    vos = []
    if arc.parseVOMSAC(holder, cadir, cafile, vomsdir, trustlist, outputs, True, True):
        for vo in outputs:
            voi = VOInformation()
            voi.type = "voms"
            voi.name = vo.voname
            voi.issuer = vo.issuer
            for attribute in vo.attributes:
                if attribute.startswith('/voname=') or not attribute.startswith('/'):
                    continue
                # capabilities (parsed, but deprecated and not reported)
                capability = None
                if '/Capability=' in attribute:
                    attribute, capability = attribute.rsplit('/Capability=', 1)
                    if capability == 'NULL':
                        capability = None
                # roles
                role = None
                if '/Role=' in attribute:
                    attribute, role = attribute.rsplit('/Role=', 1)
                    if role == 'NULL':
                        role = None
                group = attribute[1:]  # remove initial /
                voi.attributes.append((group, role, None))
            vos.append(voi)
    return vos


def getARCProxyInformation(proxy_filepath):
    # will raise ImportError if the arc python bindings are not available
    import arc
    return getVOMSesFromProxy(proxy_filepath)


def getVOMSProxyInformation(proxy_filepath):
    voms_info_output = executeVOMSProxyInfo(proxy_filepath)
    vo_info = parseVOMSProxyInfoOutput(voms_info_output)
    return vo_info


def parseGridVOMapUrl(vo_url):

    voi = VOInformation()
    voi.type = 'grid-vo-map/' + vo_url[:vo_url.index(':')]
    voi.issuer = vo_url

    # heuristic for creating vo names from voms urls
    if vo_url.startswith('voms://') or vo_url.startswith('vomss://'):
        # urlparse does not understand voms urls (which are really just http urls anyway)
        vo_url2 = vo_url.replace('voms://', 'http://').replace('vomss://', 'https://')
        url = urlparse.urlparse(vo_url2)
        # url[2] -> path
        path_tokens = [part for part in url[2].split('/') if part]
        if len(path_tokens) == 2 and path_tokens[0] == 'voms':
            # path structure: vomss://voms.cern.ch:8443/voms/atlas
            voi.name = path_tokens[1]
            if url[4]:
                # url has a query argument, typically means a group is specified
                group_tokens = [g for g in url[4].split('/') if g]
                # 1 token   -> top group (same as vo) - not useful
                # 2 tokens  -> group specified
                # 3+ tokens -> unknown
                if len(group_tokens) == 2:
                    group = group_tokens[1]
                    voi.attributes.append((group, None, None))
                elif len(group_tokens) > 2:
                    logging.warning('Could not understand voms url query param. Url: %s' % vo_url)
        else:
            # unknown path structure, set vo name to url path
            voi.name = url[2]
    else:
        # the reverse map is a file or http/https url, and does not provide
        # any valuable information, so no information is returned
        return []

    return [voi]


def getVOMapfileInformation(usersn):

    if not os.path.exists(GRID_VO_MAPFILE):
        return []

    line_re = '"(.*)" "(.*)"'
    regex = re.compile(line_re)

    try:
        vomapf = file(GRID_VO_MAPFILE)
    except IOError:
        return []

    for line in vomapf.readlines():
        m = regex.match(line)
        if not m:
            continue
        l_user, l_url = m.groups()
        if l_user == usersn:
            return parseGridVOMapUrl(l_url)

    return []


def parseARCURL(url):
    # arc likes to add information to its urls (groan), so we have to extract
    # this information and reconstruct the original url, e.g.:
    # srm://srm.ndgf.org;cache=yes;spacetoken=ATLASNOLOCALGROUPDISK/atlas/disk/atlaslocalgroupdisk/file1
    # lfc://lfc1.ndgf.org:5010;cache=yes//grid/atlas/dq2/user/user.andrejfilipcic.production/pilot3-SULU44i.tgz

    base_url = ''
    bypass_cache = None
    spacetoken = None

    # split metadata out of the url
    url_prefix, url_remainder = url.split('://', 1)
    host_metadata, url_path = url_remainder.split('/', 1)
    if ';' in host_metadata:
        host, metadata = host_metadata.split(';', 1)
    else:
        host, metadata = host_metadata, None

    base_url = url_prefix + '://' + host + '/' + url_path

    # extract metadata
    if metadata is not None:
        md_tokens = metadata.split(';')
        for t in md_tokens:
            if t.startswith('cache='):
                if t == 'cache=yes':
                    bypass_cache = 'false'
                elif t == 'cache=no':
                    bypass_cache = 'true'
                else:
                    logging.warning('Invalid value for cache bypass: %s' % t)
            elif t.startswith('spacetoken'):
                _, spacetoken = t.split('=')
            else:
                logging.warning('Unrecognized URL option: %s' % url)

    return base_url, bypass_cache, spacetoken


def isoZTime2ISOTime(isoz_time):
    gmt = time.strptime(isoz_time, ISOZ_TIME_FORMAT)
    return time.strftime(ISO_TIME_FORMAT, gmt) + "Z"


def getTransferStats(statistics_file):
    # structure of the statistics file:
    # inputfile:url=srm://srm.ndgf.org;cache=yes;spacetoken=ATLASNOLOCALGROUPDISK/atlas/disk/atlaslocalgroupdisk/no/user10/DmytroKarpenko/testfile,size=5,starttime=2010-10-18 11:46:43,endtime=2010-10-18 11:46:45,fromcache=no
    # outputfile:url=srm://srm.ndgf.org;spacetoken=ATLASNOLOCALGROUPDISK/atlas/disk/atlaslocalgroupdisk/no/user10/DmytroKarpenko/testfile26,size=10,starttime=2010-10-18 11:47:10,endtime=2010-10-18 11:47:11

    downloads = []
    uploads = []

    if os.path.exists(statistics_file):
        for line in file(statistics_file):
            if line.startswith('inputfile:') or line.startswith('outputfile:'):
                url = None
                size = None
                start_time = None
                end_time = None
                bypass_cache = None
                retrieved_from_cache = None

                transfer_type, info_block = line.split(':', 1)
                tokens = info_block.split(',')
                for i_key, i_value in [token.split('=', 1) for token in tokens if '=' in token]:
                    i_value = i_value.strip()
                    if i_key == 'url':
                        url, bypass_cache, _ = parseARCURL(i_value)
                    if i_key == 'size':
                        size = i_value
                    if i_key == 'starttime':
                        start_time = isoZTime2ISOTime(i_value)
                    if i_key == 'endtime':
                        end_time = isoZTime2ISOTime(i_value)
                    if i_key == 'fromcache':
                        if i_value == 'yes':
                            retrieved_from_cache = 'true'
                        elif i_value == 'no':
                            retrieved_from_cache = 'false'
                        else:
                            logging.warning('Invalid fromcache value in statistics file: %s' % i_value)

                if transfer_type == 'inputfile':
                    dl = Download(url, size, start_time, end_time, bypass_cache, retrieved_from_cache)
                    downloads.append(dl)
                elif transfer_type == 'outputfile':
                    ul = Upload(url, size, start_time, end_time)
                    uploads.append(ul)
                # no else clause needed; the startswith check above ensures
                # transfer_type is one of the two

    return downloads, uploads


def generateUsageRecord(arc_cfg, control_dir, job_id, job_status, local_user=None):

    diag_filepath = os.path.join(control_dir, 'job.%s.diag' % job_id)
    local_filepath = os.path.join(control_dir, 'job.%s.local' % job_id)
    proxy_filepath = os.path.join(control_dir, 'job.%s.proxy' % job_id)
    statistics_filepath = os.path.join(control_dir, 'job.%s.statistics' % job_id)

    if not os.path.exists(diag_filepath):
        raise ConfigurationError("No diag file, skipping record generation")
    if not os.path.exists(local_filepath):
        raise ConfigurationError("No local file, skipping record generation")

    diag_data = parseFile(diag_filepath)
    local_data = parseFile(local_filepath)

    global_job_id = assembleGlobalJobID(job_id, arc_cfg)
    host_name = getLoggerHostName(arc_cfg)

    # start constructing the usage record
    ur = UsageRecord()

    if host_name and '.' in host_name and 'localid' in local_data:
        ur.record_id = host_name + ':' + local_data['localid'][0]
    else:
        ur.record_id = global_job_id

    ur.global_job_id = global_job_id
    ur.global_user_name = local_data['subject'][0]

    if local_user is None:
        # get local username from the owner of the diag file
        uid = os.stat(diag_filepath).st_uid
        pi = pwd.getpwuid(uid)
        local_user = pi.pw_name
    if local_user is not None:
        ur.local_user_id = local_user

    ur.machine_name = host_name

    # exit state
    status = 'completed'
    if 'failedstate' in local_data:
        status = 'failed'
    ur.status = status

    ur.queue = local_data['queue'][0]

    # the following data might not always be available (failures, etc.)
    if 'jobname' in local_data:
        ur.job_name = local_data['jobname'][0]
    if 'localid' in local_data:
        ur.local_job_id = local_data['localid'][0]

    cpu_count = None
    if 'ExecutionUnits' in diag_data:
        try:
            cpu_count = int(diag_data['ExecutionUnits'][0])
            ur.processors = cpu_count
        except ValueError:
            logging.error('Error reading ExecutionUnits. Value: "%s".' % diag_data['ExecutionUnits'][0])

    if 'nodename' in diag_data:
        # some backends repeat the starting node as the last nodename entry;
        # if there are more nodename entries than execution units, only take
        # the number of entries corresponding to the execution units
        if cpu_count is not None:
            hosts = diag_data['nodename'][:cpu_count]
        else:
            hosts = diag_data['nodename']
        ur.host = ",".join(hosts)
        # nodes are typically repeated, only count unique entries for the node count
        ur.node_count = len(set(hosts))

    if 'starttime' in local_data:
        # starttime in .local specifies the time the job was submitted
        ur.submit_time = arc2isoTime(local_data['starttime'][0])

    if 'WallTime' in diag_data:
        wall_time = parseDuration(diag_data['WallTime'])
        ur.wall_duration = wall_time

    if 'UserTime' in diag_data and 'KernelTime' in diag_data:
        # cputime = user time + kernel time
        user_time = parseDuration(diag_data['UserTime'])
        kernel_time = parseDuration(diag_data['KernelTime'])
        ur.cpu_duration = user_time + kernel_time

    if 'UserTime' in diag_data:
        ur.user_time = parseDuration(diag_data['UserTime'])
    if 'KernelTime' in diag_data:
        ur.kernel_time = parseDuration(diag_data['KernelTime'])
    if 'exitcode' in diag_data:
        ur.exit_code = int(diag_data['exitcode'][0])
    if 'MajorPageFaults' in diag_data:
        ur.major_page_faults = int(diag_data['MajorPageFaults'][0])

    if 'runtimeenvironments' in diag_data:
        re_str = diag_data['runtimeenvironments'][0]
        ur.runtime_environments = [s.strip() for s in re_str.split(';') if s.strip()]

    # start/end time
    if 'LRMSStartTime' in diag_data and 'LRMSEndTime' in diag_data:
        ur.start_time = arc2isoTime(diag_data['LRMSStartTime'][0])
        ur.end_time = arc2isoTime(diag_data['LRMSEndTime'][0])
    else:
        # start/end time not available, guesstimate it by using the current time.
        # this works fine as we are invoked from the middleware immediately after
        # the job has finished - however if any staging out is performed these
        # times will not be accurate
        now = time.time()
        if 'WallTime' in diag_data:
            est_start_time = now - wall_time
        else:
            est_start_time = now  # wrong, but we can't really do better in this case
        ur.start_time = gm2isoTime(time.gmtime(est_start_time))
        ur.end_time = gm2isoTime(time.gmtime(now))

    # get VO information
    # first try to get voms info from the proxy
    vo_info = []
    if os.path.exists(proxy_filepath):
        try:
            vo_info = getARCProxyInformation(proxy_filepath)
        except ImportError, e:
            logging.warning("Warning getting VOMS info. Possibly missing nordugrid-arc-python package (%s),"
                            " trying voms-proxy-info" % str(e))
            try:
                vo_info = getVOMSProxyInformation(proxy_filepath)
            except OSError, e:
                logging.error("Error getting VOMS info. Possibly missing voms-proxy-info command (%s)" % str(e))
                logging.exception(e)
    # still no vo info, try the grid-vo-mapfile
    if vo_info == []:
        vo_info = getVOMapfileInformation(local_data['subject'][0])
    if vo_info == []:
        logging.info("Failed to create any VO information (voms proxy or reverse map) for job %s." % job_id)
    ur.vo_info = vo_info

    # gather transfer statistics
    downloads, uploads = getTransferStats(statistics_filepath)
    ur.downloads += downloads
    ur.uploads += uploads

    # finished constructing usage record
    # write usage record to disk
    log_dir = getLogDirectory(arc_cfg)
    ur_dir = os.path.join(log_dir, UR_DIRECTORY)
    ur_file = os.path.join(ur_dir, job_id)
    logging.debug("UR File : %s" % ur_file)

    if not os.path.exists(ur_dir):
        os.makedirs(ur_dir)

    ur.writeXML(ur_file)
    logging.info("Wrote usage record to disk for job %s" % job_id)


def main():

    arc_cfg = getARCConfig()

    try:
        logfile = arc_cfg.get(CONFIG_LOGGER_SECTION, CONFIG_LOGFILE)
        logfile = cleanString(logfile)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        logfile = DEFAULT_LOGFILE

    try:
        loglevel_cfg = arc_cfg.get(CONFIG_LOGGER_SECTION, CONFIG_LOGLEVEL)
        loglevel_cfg = cleanString(loglevel_cfg)
        loglevel = LOGLEVELS[loglevel_cfg]
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError, KeyError):
        loglevel = DEFAULT_LOGLEVEL

    logging.basicConfig(filename=logfile, format=LOG_FORMAT, level=loglevel)

    logging.debug("--------------------------------")
    logging.debug("ARC UR Logger invoked. Time %s" % str(datetime.datetime.now()))
    logging.debug("Arguments: %s" % str(sys.argv[1:]))

    try:
        assert len(sys.argv) in (4, 5), "Wrong number of arguments for plugin"
        control_dir = sys.argv[1]
        job_id = sys.argv[2]
        job_status = sys.argv[3]
        local_user = None
        if len(sys.argv) == 5:
            local_user = sys.argv[4]
            # arc will provide the string value 'undefined' if no value is present
            if local_user == 'undefined':
                local_user = None
        generateUsageRecord(arc_cfg, control_dir, job_id, job_status, local_user)
    except Exception, e:
        logging.error("Error generating usage record: %s" % str(e))
        logging.exception(e)


if __name__ == '__main__':
    main()
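
# Invocation sketch (the script name, control directory and job id below are
# illustrative, not prescriptive): ARC calls this plugin with the job control
# directory, the job id and the job status, optionally followed by the local
# user name ('undefined' when not set), matching the sys.argv handling in
# main() above, e.g.:
#
#   arc-ur-logger.py /var/spool/arc/jobstatus 1234567890 FINISHED undefined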