#!/usr/bin/env python
"""
Script to register usage records to SGAS ur servers.

Intended to be run from cron regularly (every hour or so).

This file is a bit messy, as it contains many things that would normally be
in separate modules, but everything is kept in this single file in order to
make deployment easy (no imports, no problems setting up PYTHONPATH, etc.).

Does not parse the XML configuration; only the classic arc.conf is supported.

Author: Henrik Thostrup Jensen
Copyright: Nordic Data Grid Facility (2009)
"""

import sys
import os
import time
import urlparse
import ConfigParser

try:
    from xml.etree import cElementTree as ET
except ImportError:
    # Python 2.4 compatibility
    from elementtree import ElementTree as ET

from OpenSSL import SSL

from twisted.internet import reactor, defer
from twisted.python import log, usage, failure
from twisted.web import client


# Nasty global so we can do proper exit codes
ERROR = False

# static file locations
DEFAULT_LOGFILE = '/var/log/arc/ur-registration.log'

# config sections
CONFIG_SECTION_COMMON = 'common'
CONFIG_SECTION_LOGGER = 'logger'

CONFIG_HOSTKEY     = 'x509_user_key'
CONFIG_HOSTCERT    = 'x509_user_cert'
CONFIG_CERTDIR     = 'x509_cert_dir'
CONFIG_LOG_DIR     = 'log_dir'
CONFIG_LOG_ALL     = 'log_all'
CONFIG_LOG_VO      = 'log_vo'
CONFIG_UR_LIFETIME = 'ur_lifetime'
CONFIG_LOGFILE     = 'registrant_logfile'

ENV_ARC_CONFIG       = "ARC_CONFIG"
ENV_NORDUGRID_CONFIG = "NORDUGRID_CONFIG"

# system defaults
DEFAULT_ARC_CONFIG_FILE       = "/etc/arc.conf"
DEFAULT_NORDUGRID_CONFIG_FILE = "/etc/nordugrid.conf"

# configuration defaults
DEFAULT_HOSTKEY     = '/etc/grid-security/hostkey.pem'
DEFAULT_HOSTCERT    = '/etc/grid-security/hostcert.pem'
DEFAULT_CERTDIR     = '/etc/grid-security/certificates'
DEFAULT_LOG_DIR     = '/var/spool/arc/usagerecords/'
DEFAULT_BATCH_SIZE  = 100
DEFAULT_UR_LIFETIME = 30  # days

# subdirectories in the spool directory
UR_DIRECTORY      = 'urs'
STATE_DIRECTORY   = 'state'
ARCHIVE_DIRECTORY = 'archive'

# ur namespaces and tag names, only needed ones
OGF_UR_NAMESPACE  = "http://schema.ogf.org/urf/2003/09/urf"
SGAS_VO_NAMESPACE = "http://www.sgas.se/namespaces/2009/05/ur/vo"

USAGE_RECORDS    = ET.QName("{%s}UsageRecords"   % OGF_UR_NAMESPACE)
JOB_USAGE_RECORD = ET.QName("{%s}JobUsageRecord" % OGF_UR_NAMESPACE)
RECORD_IDENTITY  = ET.QName("{%s}RecordIdentity" % OGF_UR_NAMESPACE)
USER_IDENTITY    = ET.QName("{%s}UserIdentity"   % OGF_UR_NAMESPACE)
VO               = ET.QName("{%s}VO"             % SGAS_VO_NAMESPACE)
VO_NAME          = ET.QName("{%s}Name"           % SGAS_VO_NAMESPACE)


# -- code

class CommandLineOptions(usage.Options):

    optFlags = [
        ['stdout', 's', 'Log to stdout']
    ]
    optParameters = [
        ['config-file', 'c', None, 'Config file to use (typically /etc/arc.conf)']
    ]


class StateFile:
    """
    Abstraction for a UR state file (describes to which endpoints a UR has
    been registered).
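
    The state file lives under <logdir>/state/<ur-filename> and holds one
    registration URL per line. A purely illustrative example of its contents
    (the endpoint URLs below are made up):

        https://sgas.example.org:6143/sgas/registration
        https://sgas-backup.example.org:6143/sgas/registration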
""" def __init__(self, logdir, filename): self.logdir = logdir self.filename = filename statefile = self._filepath() if os.path.exists(statefile): self.urls = set([ line.strip() for line in open(statefile).readlines() if line.strip() ]) else: statedir = os.path.join(logdir, STATE_DIRECTORY) if not os.path.exists(statedir): os.makedirs(statedir) self.urls = set() def _filepath(self): return os.path.join(self.logdir, STATE_DIRECTORY, self.filename) def __contains__(self, ele): return ele in self.urls def add(self, ele): if not ele in self.urls: self.urls.add(ele) return self # makes it possible to do one-liners def write(self): f = open(self._filepath(), 'w') for url in self.urls: f.write(url + "\n") f.close() class ConfigurationError(Exception): pass class ContextFactory: """ SSL context factory. Which hostkey and cert files to use, and which CA to load, etc. """ # Nicked from acix (but I wrote that anyway) def __init__(self, key_path, cert_path, ca_dir=None, verify=True): self.key_path = key_path self.cert_path = cert_path self.verify = verify self.ca_dir = ca_dir if self.verify and ca_dir is None: raise ConfigurationError('Certificate directory must be specified') def getContext(self): # should probably implement caching sometime ctx = SSL.Context(SSL.SSLv23_METHOD) # this also allows tls 1.0 ctx.set_options(SSL.OP_NO_SSLv2) # ssl2 is unsafe ctx.use_privatekey_file(self.key_path) ctx.use_certificate_file(self.cert_path) ctx.check_privatekey() # sanity check def verify_callback(conn, x509, error_number, error_depth, allowed): # just return what openssl thinks is right return allowed if self.verify: ctx.set_verify(SSL.VERIFY_PEER, verify_callback) calist = [ ca for ca in os.listdir(self.ca_dir) if ca.endswith('.0') ] for ca in calist: # openssl wants absolute paths ca = os.path.join(self.ca_dir, ca) ctx.load_verify_locations(ca) return ctx class ARCConfigFileReader: # Wrapper around the stock file object with slightly altered readline # functionality (the Python ConfigParser only uses readline). # The wrapper "fixes" odd config entries allowed by ARC, but which # falls outside the allowed language defined in RFC 822. def __init__(self, fp): self._fp = fp def readline(self): line = self._fp.readline() # return empty lines and comments as-is if not line.strip() or line.strip().startswith('#'): return line # if the line is not a config block and does not have a '=' in it the # config parser will break, however ARC allows this. We just adds # '=true' to the line, so the config value can even be read in Python. 
        if not line.startswith('[') and '=' not in line:
            return line + '=true'
        # regular line, just return it
        return line


def getARCConfigurationFilePath():
    # first try to get the file path from the environment
    if ENV_ARC_CONFIG in os.environ and os.path.exists(os.environ[ENV_ARC_CONFIG]):
        return os.environ[ENV_ARC_CONFIG]
    if ENV_NORDUGRID_CONFIG in os.environ and os.path.exists(os.environ[ENV_NORDUGRID_CONFIG]):
        return os.environ[ENV_NORDUGRID_CONFIG]
    # then try the default file paths
    if os.path.exists(DEFAULT_ARC_CONFIG_FILE):
        return DEFAULT_ARC_CONFIG_FILE
    if os.path.exists(DEFAULT_NORDUGRID_CONFIG_FILE):
        return DEFAULT_NORDUGRID_CONFIG_FILE
    raise ConfigurationError("No ARC/NorduGrid configuration file found")


def getARCConfig(filepath=None):
    if filepath is None:
        filepath = getARCConfigurationFilePath()
    arc_cfg_file = ARCConfigFileReader(file(filepath))
    arc_cfg = ConfigParser.SafeConfigParser()
    arc_cfg.readfp(arc_cfg_file)
    return arc_cfg


def getConfigOption(cfg, section, option, default=None):
    # strip whitespace and quote characters from string values; non-strings pass through
    clean = lambda s : type(s) == str and s.strip().replace('"','').replace("'",'') or s
    try:
        value = cfg.get(section, option)
        return clean(value)
    except ConfigParser.NoSectionError:
        pass
    except ConfigParser.NoOptionError:
        pass
    return default


def parseLogAll(value):
    if value is None:
        return []  # no log_all option set
    return value.split(' ')


def parseLogVO(value):
    vo_regs = {}
    if value == None or len(value) == 0:
        return vo_regs
    pairs = value.split(',')
    for pair in pairs:
        vo_name, url = pair.strip().split(' ', 1)
        vo_regs[vo_name] = url
    return vo_regs


def parseURLifeTime(value):
    ur_lifetime_days = int(value)
    ur_lifetime_seconds = ur_lifetime_days * (24 * 60 * 60)
    return ur_lifetime_seconds


def getVONamesFromUsageRecord(ur):
    """
    Return the VO name element values of a usage record.
    """
    # for some reason the following fails :-/
    # >>> ur.getroot().findall(VO_NAME)
    # so we do it the silly way and iterate over the tree instead.
    vos = []
    for e in ur.getroot():
        if e.tag == USER_IDENTITY:
            for f in e:
                if f.tag == VO:
                    for g in f:
                        if g.tag == VO_NAME:
                            vos.append(g.text)
    return vos


def createRegistrationPointsMapping(logdir, logpoints_all, logpoints_vo):
    """
    Create a mapping from all the usage record filenames to the endpoints
    they should be registered to.
    """
    log.msg("Creating registration mapping (may take a little time)")
    mapping = {}

    ur_dir = os.path.join(logdir, UR_DIRECTORY)
    for filename in os.listdir(ur_dir):
        filepath = os.path.join(ur_dir, filename)
        # skip if file is not a proper file
        if not os.path.isfile(filepath):
            continue
        try:
            ur = ET.parse(filepath)
        except Exception, e:
            log.msg('Error parsing file %(filepath)s, continuing' % {'filepath' : filepath})
            continue

        vos = getVONamesFromUsageRecord(ur)

        for lp in logpoints_all:
            mapping.setdefault(lp, []).append(filename)
        for vo in vos:
            vo_lp = logpoints_vo.get(vo)
            if vo_lp:
                mapping.setdefault(vo_lp, []).append(filename)

    return mapping


def createFileEPMapping(epmap):
    # creates filename -> [endpoint] map
    # makes it easy to know when all registrations have been made for a file
    fnepmap = {}
    for ep, filenames in epmap.items():
        for fn in filenames:
            fnepmap.setdefault(fn, []).append(ep)
    return fnepmap


def httpRequest(url, method='GET', payload=None, ctxFactory=None):  # probably needs a headers option as well
    """
    Perform an HTTP request.
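
    Returns a (deferred, factory) tuple; the factory gives access to the
    response code and headers once the deferred fires. A purely illustrative
    call (the URL below is made up; cf is assumed to be a ContextFactory):

        d, f = httpRequest('https://sgas.example.org:6143/sgas', ctxFactory=cf)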
""" # copied from twisted.web.client in order to get access to the # factory (which contains response codes, headers, etc) scheme, host, port, path = client._parse(url) factory = client.HTTPClientFactory(url, method=method, postdata=payload) factory.noisy = False # stop spewing about factory start/stop # fix missing port in header (bug in twisted.web.client) if port: factory.headers['host'] = host + ':' + str(port) if scheme == 'https': reactor.connectSSL(host, port, factory, ctxFactory) else: reactor.connectTCP(host, port, factory) #factory.deferred.addBoth(f, factory) return factory.deferred, factory def createEPRegistrationMapping(endpoints, ctxFactory): def createRegistrationURL(location, endpoint): if location.startswith('http'): # location is a complete url, so we just return it return location elif location.startswith('/'): # location is a path, and must be merged with base endpoint to form a suitable url url = urlparse.urlparse(endpoint) reg_url = url[0] + '://' + url[1] + location return reg_url else: raise ValueError('Invalid registration point returned by %s (got: %s)' % (endpoint, location)) def gotReply(result, factory, endpoint): tree = ET.fromstring(result) for service in tree: if service.tag == 'service': found_service = False for ele in service: if ele.tag == 'name' and ele.text == 'Registration': found_service = True elif ele.tag == 'href' and found_service == True: location = ele.text return createRegistrationURL(location, endpoint) return None # no registration service found def mergeResults(results, endpoints): regmap = {} for (success, result), ep in zip(results, endpoints): if success and result is not None: regmap[ep] = result elif success: log.msg('Endpoint %s does not appear to have a registration service.' % ep) else: log.msg('Error contacting service %s (%s)' % (ep, result.getErrorMessage())) return regmap defs = [] for ep in endpoints: d, f = httpRequest(ep, ctxFactory=ctxFactory) d.addCallback(gotReply, f, ep) defs.append(d) dl = defer.DeferredList(defs, consumeErrors=1) # otherwise we'll get complaints dl.addCallback(mergeResults, endpoints) return dl def insertUsageRecords(url, payload, ctxFactory): """ Upload (insert) one or more usage record in a usage record service. """ def gotResponse(result, factory, url): if factory.status != '200': log.msg("Reply from %s had other response code than 200 (%s)" % (url, factory.status)) return result d, f = httpRequest(url, method='POST', payload=payload, ctxFactory=ctxFactory) d.addCallback(gotResponse, f, url) return d def joinUsageRecordFiles(logdir, filenames): urs = ET.Element(USAGE_RECORDS) for fn in filenames: ur = ET.parse(os.path.join(logdir, UR_DIRECTORY, fn)) urs.append(ur.getroot()) return ET.tostring(urs) def registerBatch(ep, url, logdir, filenames, ctxFactory): def insertDone(result): log.msg("%i records registered to %s" % (len(filenames), ep)) for fn in filenames: StateFile(logdir, fn).add(ep).write() def insertError(error): log.msg("Error during batch insertion: %s" % error.getErrorMessage()) return error ur_data = joinUsageRecordFiles(logdir, filenames) d = insertUsageRecords(url, ur_data, ctxFactory) d.addCallbacks(insertDone, insertError) return d def registerUsageRecords(mapping, logdir, ctxFactory, batch_size=DEFAULT_BATCH_SIZE): """ Register usage records, given a mapping of where to register the usage records. 
""" urmap = createFileEPMapping(mapping) if not urmap: # no registration to perform log.msg("No registrations to perform") return defer.succeed(None) log.msg("Registrations to perform: %i files" % len(urmap)) log.msg("Retrieving registration hrefs (service endpoints)") d = createEPRegistrationMapping(mapping.keys(), ctxFactory) d.addCallback(_performURRegistration, urmap, logdir, ctxFactory, batch_size) archive = lambda _, logdir, urmap : archiveUsageRecords(logdir, urmap) d.addCallback(archive, logdir, urmap) return d def _performURRegistration(regmap, urmap, logdir, ctxFactory, batch_size): if not regmap: log.msg("Failed to get any service refs, not doing any registrations") return batch_sets = {} for ep, urreg in regmap.items(): log.msg("%s -> %s" % (ep, urreg)) batch_sets[ep] = [] log.msg("Starting registration") skipped_registrations = {} # new registration logic (batching) for filename, endpoints in urmap.items(): state = StateFile(logdir, filename) for ep in endpoints: if ep in state: skipped_registrations[ep] = skipped_registrations.get(ep, 0) + 1 continue try: batch_sets[ep].append(filename) except KeyError, e: pass # deferring registration as service is not available for ep, ur_registered in skipped_registrations.items(): log.msg("Skipping %i registrations to %s, records already registered" % (ur_registered, ep)) # build up registraion batches (list of (ep, filenames) tuples) registrations = [] for ep, filenames in batch_sets.items(): registrations += [ (ep, filenames[i:i+batch_size]) for i in range(0, len(filenames), batch_size) ] registration_deferred = defer.Deferred() error_endpoints = {} def doBatch(result, used_service_endpoint): if isinstance(result, failure.Failure): # something went wrong in the registration - stop future registrations # split into to 2 lines (far easier to read in the log) log.msg("Error registration records to %s" % used_service_endpoint) log.msg("Skipping all registrations to this endpoint for now") error_endpoints[used_service_endpoint] = True try: service_endpoint, filenames = registrations.pop(0) while service_endpoint in error_endpoints: service_endpoint, filenames = registrations.pop(0) d = registerBatch(service_endpoint, regmap[service_endpoint], logdir, filenames, ctxFactory) d.addBoth(doBatch, service_endpoint) except IndexError: # no more registrations registration_deferred.callback(None) doBatch(None, None) return registration_deferred def archiveUsageRecords(logdir, urmap): log.msg("Registration done, commencing archiving process") archive_dir = os.path.join(logdir, ARCHIVE_DIRECTORY) if not os.path.exists(archive_dir): os.makedirs(archive_dir) for filename, endpoints in urmap.items(): state = StateFile(logdir, filename) for ep in endpoints: if not ep in state: break else: urfilepath = os.path.join(logdir, UR_DIRECTORY, filename) statefilepath = os.path.join(logdir, STATE_DIRECTORY, filename) archivefilepath = os.path.join(logdir, ARCHIVE_DIRECTORY, filename) os.unlink(statefilepath) os.rename(urfilepath, archivefilepath) log.msg("Archiving done") def deleteOldUsageRecords(log_dir, ttl_seconds): archive_dir = os.path.join(log_dir, ARCHIVE_DIRECTORY) log.msg("Cleaning up old records.") now = time.time() i = 0 for filename in os.listdir(archive_dir): filepath = os.path.join(archive_dir, filename) # skip if file is not a proper file if not os.path.isfile(filepath): continue # use ctime to determine file age f_ctime = os.stat(filepath).st_ctime if f_ctime + ttl_seconds < now: # file is old, will get deleted os.unlink(filepath) i += 1 
log.msg("Records deleted: %i" % i) return defer.succeed(None) def doMain(): """ "Real" main, parse command line, setup logging, start the actual logic, etc. """ # start by parsing the command line to see if we have a specific config file cmd_cfg = CommandLineOptions() try: cmd_cfg.parseOptions() except SystemExit, e: return # deal with silly sys.exit(0) in twisted.python.usage except usage.UsageError, e: print '%s: %s' % (sys.argv[0], str(e)) print '%s: Try --help for usage details.' % (sys.argv[0]) return cfg_file = cmd_cfg['config-file'] if cfg_file is not None: if (not os.path.exists(cfg_file)) or (not os.path.isfile(cfg_file)): serr = 'The path %s does not exist or is not a file' % cfg_file log.msg(serr) return else: cfg_file = getARCConfigurationFilePath() arc_cfg = getARCConfig(cfg_file) if cmd_cfg['stdout']: log.startLogging(sys.stdout) else: logfile = getConfigOption(arc_cfg, CONFIG_SECTION_LOGGER, CONFIG_LOGFILE, DEFAULT_LOGFILE) log.startLogging(open(logfile, 'a')) #log.msg('Using %s as config file' % cfg_file) # read config arc_cfg = getARCConfig(cfg_file) log_dir = getConfigOption(arc_cfg, CONFIG_SECTION_LOGGER, CONFIG_LOG_DIR, DEFAULT_LOG_DIR) las = getConfigOption(arc_cfg, CONFIG_SECTION_LOGGER, CONFIG_LOG_ALL) lvo = getConfigOption(arc_cfg, CONFIG_SECTION_LOGGER, CONFIG_LOG_VO) ult = getConfigOption(arc_cfg, CONFIG_SECTION_LOGGER, CONFIG_UR_LIFETIME, DEFAULT_UR_LIFETIME) log_all = parseLogAll(las) log_vo = parseLogVO(lvo) ur_lifetime = parseURLifeTime(ult) host_key = getConfigOption(arc_cfg, CONFIG_SECTION_COMMON, CONFIG_HOSTKEY, DEFAULT_HOSTKEY) host_cert = getConfigOption(arc_cfg, CONFIG_SECTION_COMMON, CONFIG_HOSTCERT, DEFAULT_HOSTCERT) cert_dir = getConfigOption(arc_cfg, CONFIG_SECTION_COMMON, CONFIG_CERTDIR, DEFAULT_CERTDIR) log.msg('Configuration:') log.msg(' Log dir: %s' % log_dir) log.msg(' Log all: %s' % log_all) log.msg(' Log vo : %s' % log_vo) #log.msg(' Host key : %s' % host_key) #log.msg(' Host cert : %s' % host_cert) #log.msg(' Cert dir : %s' % cert_dir) if not (log_all or log_vo): log.msg('No log points given. Cowardly refusing to do anything') return if not os.path.exists(log_dir): log.msg('Log directory %s does not exist, bailing out.' % log_dir) return mapping = createRegistrationPointsMapping(log_dir, log_all, log_vo) cf = ContextFactory(host_key, host_cert, cert_dir) d = registerUsageRecords(mapping, log_dir, cf) d.addCallback(lambda _ : deleteOldUsageRecords(log_dir, ur_lifetime)) return d def main(): """ main, mainly a wrapper over the rest of the program. """ def handleError(error): if error.type == SystemExit: log.msg('SystemExit: %s' % error.value) else: error.printTraceback() d = defer.maybeDeferred(doMain) d.addErrback(handleError) d.addBoth(lambda _ : reactor.stop()) return d if __name__ == '__main__': reactor.callWhenRunning(main) reactor.run() if ERROR: sys.exit(1)