From f9d4817bea63eac057f8cbe05a0670869bc6f7bf Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 27 Mar 2018 16:04:23 +0200 Subject: [PATCH 01/11] correct permission for directory cache --- argo-nagios-ams-publisher.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index 67d43e7..9802803 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -61,7 +61,7 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/ %{python_sitelib}/%{underscore %{name}}/*.py[co] %defattr(-,nagios,nagios,-) %dir %{_localstatedir}/log/%{name}/ -%dir %{_localstatedir}/run/%{name}/ +%attr(0755,nagios,nagios) %dir %{_localstatedir}/run/%{name}/ %dir %{_localstatedir}/spool/%{name}/metrics/ %dir %{_localstatedir}/spool/%{name}/alarms/ From 08deed9ed42b75750f33ea412b49086b479eb6b7 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 5 Jun 2018 00:23:11 +0200 Subject: [PATCH 02/11] pass actual perf. data as part of metric result --- config/metric_data.avsc | 1 + helpers/ams-msg-generator.py | 1 + pymod/metrictoqueue.py | 7 ++++--- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/config/metric_data.avsc b/config/metric_data.avsc index c35bbe3..a1cf20c 100644 --- a/config/metric_data.avsc +++ b/config/metric_data.avsc @@ -8,6 +8,7 @@ {"name": "metric", "type": "string"}, {"name": "status", "type": "string"}, {"name": "monitoring_host", "type": ["null", "string"]}, + {"name": "actual_data", "type": ["null", "string"]}, {"name": "summary", "type": ["null", "string"]}, {"name": "message", "type": ["null", "string"]}, {"name": "tags", "type" : ["null", {"name" : "Tags", diff --git a/helpers/ams-msg-generator.py b/helpers/ams-msg-generator.py index 34fd053..ebccccb 100755 --- a/helpers/ams-msg-generator.py +++ b/helpers/ams-msg-generator.py @@ -38,6 +38,7 @@ def construct_msg(session, bodysize, timezone): msg.body += 'summary: %s\n' % generator.rndb64(20) msg.body += 'message: %s\n' % generator.rndb64(bodysize) msg.body += 'vofqan: %s\n' % generator.rndb64(10) + msg.body += 'actual_data: %s\n' % generator.rndb64(10) msg.body += 'voname: %s\n' % generator.rndb64(3) msg.body += 'roc: %s\n' % generator.rndb64(3) diff --git a/pymod/metrictoqueue.py b/pymod/metrictoqueue.py index 1a3cd22..44d40b3 100644 --- a/pymod/metrictoqueue.py +++ b/pymod/metrictoqueue.py @@ -35,7 +35,7 @@ def build_msg(args, *headers): msg.header.update({'status': status.encode('utf-8')}) msg.header.update({'monitoring_host': nagioshost.encode('utf-8')}) - for bs in ['summary', 'message', 'vofqan', 'voname', 'roc']: + for bs in ['summary', 'message', 'vofqan', 'voname', 'roc', 'actual_data']: code = "msg.body += '%s: ' + args.%s.encode(\'utf-8\') + '\\n' if args.%s else ''" % (bs, bs, bs) exec code @@ -61,11 +61,12 @@ def main(): parser.add_argument('--status', required=True, type=str) # msg body - parser.add_argument('--summary', required=False, type=str) + parser.add_argument('--actual_data', required=False, type=str) parser.add_argument('--message', required=False, type=str) + parser.add_argument('--roc', required=False, type=str) + parser.add_argument('--summary', required=False, type=str) parser.add_argument('--vofqan', required=False, type=str) parser.add_argument('--voname', required=False, type=str) - parser.add_argument('--roc', required=False, type=str) args = parser.parse_args() From 853085fb1b33fef639132ce1c81e5adf7e20b680 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 5 Jun 2018 23:00:21 +0200 Subject: [PATCH 03/11] do not try to clear already removed msg file --- pymod/consume.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pymod/consume.py b/pymod/consume.py index e8e7926..4cf8a0d 100644 --- a/pymod/consume.py +++ b/pymod/consume.py @@ -37,12 +37,12 @@ def __init__(self, events, worker=None): filepublisher=False) self.publisher = self.shared.runtime['publisher'](events, worker=worker) self.purger = Purger(events, worker=worker) + self.prevstattime = int(time.time()) def cleanup(self): self.unlock_dirq_msgs(self.seenmsgs) def run(self): - self.prevstattime = int(time.time()) termev = self.events['term-'+self.name] usr1ev = self.events['usr1-'+self.name] lck = self.events['lck-'+self.name] @@ -138,7 +138,9 @@ def unlock_dirq_msgs(self, msgs=None): try: msgl = msgs if msgs else self.inmemq for m in msgl: - self.dirq.unlock(m[0] if not isinstance(m, str) else m) + msg = m[0] if not isinstance(m, str) else m + if os.path.exists('{0}/{1}'.format(self.dirq.path, msg)): + self.dirq.unlock(msg) self.inmemq.clear() except (OSError, IOError) as e: self.shared.log.error(e) @@ -147,8 +149,9 @@ def remove_dirq_msgs(self, msgs=None): try: msgl = msgs if msgs else self.inmemq for m in msgl: - self.dirq.remove(m[0] if not isinstance(m, str) else m) + msg = m[0] if not isinstance(m, str) else m + if os.path.exists('{0}/{1}'.format(self.dirq.path, msg)): + self.dirq.remove(msg) self.inmemq.clear() except (OSError, IOError) as e: self.shared.log.error(e) - From 299e4ab117ad949cf02806d3050c6dd0418d1bd8 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 6 Jun 2018 17:59:07 +0200 Subject: [PATCH 04/11] use argparse methods for fetching type values --- config/metric_data.avsc | 1 + helpers/ams-msg-generator.py | 1 + pymod/config.py | 18 +++++++++--------- pymod/consume.py | 11 +++++++---- pymod/metrictoqueue.py | 7 ++++--- 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/config/metric_data.avsc b/config/metric_data.avsc index c35bbe3..a1cf20c 100644 --- a/config/metric_data.avsc +++ b/config/metric_data.avsc @@ -8,6 +8,7 @@ {"name": "metric", "type": "string"}, {"name": "status", "type": "string"}, {"name": "monitoring_host", "type": ["null", "string"]}, + {"name": "actual_data", "type": ["null", "string"]}, {"name": "summary", "type": ["null", "string"]}, {"name": "message", "type": ["null", "string"]}, {"name": "tags", "type" : ["null", {"name" : "Tags", diff --git a/helpers/ams-msg-generator.py b/helpers/ams-msg-generator.py index 34fd053..ebccccb 100755 --- a/helpers/ams-msg-generator.py +++ b/helpers/ams-msg-generator.py @@ -38,6 +38,7 @@ def construct_msg(session, bodysize, timezone): msg.body += 'summary: %s\n' % generator.rndb64(20) msg.body += 'message: %s\n' % generator.rndb64(bodysize) msg.body += 'vofqan: %s\n' % generator.rndb64(10) + msg.body += 'actual_data: %s\n' % generator.rndb64(10) msg.body += 'voname: %s\n' % generator.rndb64(3) msg.body += 'roc: %s\n' % generator.rndb64(3) diff --git a/pymod/config.py b/pymod/config.py index 62dfdf9..506b895 100644 --- a/pymod/config.py +++ b/pymod/config.py @@ -59,12 +59,12 @@ def parse_config(logger=None): dirqopts = dict() qname = section.split('_', 1)[1].lower() dirqopts['directory'] = config.get(section, 'Directory') - dirqopts['rate'] = int(config.get(section, 'Rate')) + dirqopts['rate'] = config.getint(section, 'Rate') dirqopts['purge'] = eval(config.get(section, 'Purge').strip()) - dirqopts['purgeeverysec'] = int(config.get(section, 'PurgeEverySec')) - dirqopts['maxtemp'] = int(config.get(section, 'MaxTemp')) - dirqopts['maxlock'] = int(config.get(section, 'MaxLock')) - dirqopts['granularity'] = int(config.get(section, 'Granularity')) + dirqopts['purgeeverysec'] = config.getint(section, 'PurgeEverySec') + dirqopts['maxtemp'] = config.getint(section, 'MaxTemp') + dirqopts['maxlock'] = config.getint(section, 'MaxLock') + dirqopts['granularity'] = config.getint(section, 'Granularity') queues[qname] = dirqopts if section.startswith('Topic_'): topts = dict() @@ -74,13 +74,13 @@ def parse_config(logger=None): topts['key'] = config.get(section, 'Key') topts['project'] = config.get(section, 'Project') topts['topic'] = config.get(section, 'Topic') - topts['bulk'] = int(config.get(section, 'BulkSize')) + topts['bulk'] = config.getint(section, 'BulkSize') topts['avro'] = eval(config.get(section, 'Avro').strip()) if topts['avro']: topts['avroschema'] = config.get(section, 'AvroSchema') - topts['retry'] = int(config.get(section, 'Retry')) - topts['timeout'] = int(config.get(section, 'Timeout')) - topts['sleepretry'] = int(config.get(section, 'SleepRetry')) + topts['retry'] = config.getint(section, 'Retry') + topts['timeout'] = config.getint(section, 'Timeout') + topts['sleepretry'] = config.getint(section, 'SleepRetry') topics[tname] = topts for k, v in queues.iteritems(): diff --git a/pymod/consume.py b/pymod/consume.py index e8e7926..4cf8a0d 100644 --- a/pymod/consume.py +++ b/pymod/consume.py @@ -37,12 +37,12 @@ def __init__(self, events, worker=None): filepublisher=False) self.publisher = self.shared.runtime['publisher'](events, worker=worker) self.purger = Purger(events, worker=worker) + self.prevstattime = int(time.time()) def cleanup(self): self.unlock_dirq_msgs(self.seenmsgs) def run(self): - self.prevstattime = int(time.time()) termev = self.events['term-'+self.name] usr1ev = self.events['usr1-'+self.name] lck = self.events['lck-'+self.name] @@ -138,7 +138,9 @@ def unlock_dirq_msgs(self, msgs=None): try: msgl = msgs if msgs else self.inmemq for m in msgl: - self.dirq.unlock(m[0] if not isinstance(m, str) else m) + msg = m[0] if not isinstance(m, str) else m + if os.path.exists('{0}/{1}'.format(self.dirq.path, msg)): + self.dirq.unlock(msg) self.inmemq.clear() except (OSError, IOError) as e: self.shared.log.error(e) @@ -147,8 +149,9 @@ def remove_dirq_msgs(self, msgs=None): try: msgl = msgs if msgs else self.inmemq for m in msgl: - self.dirq.remove(m[0] if not isinstance(m, str) else m) + msg = m[0] if not isinstance(m, str) else m + if os.path.exists('{0}/{1}'.format(self.dirq.path, msg)): + self.dirq.remove(msg) self.inmemq.clear() except (OSError, IOError) as e: self.shared.log.error(e) - diff --git a/pymod/metrictoqueue.py b/pymod/metrictoqueue.py index 1a3cd22..44d40b3 100644 --- a/pymod/metrictoqueue.py +++ b/pymod/metrictoqueue.py @@ -35,7 +35,7 @@ def build_msg(args, *headers): msg.header.update({'status': status.encode('utf-8')}) msg.header.update({'monitoring_host': nagioshost.encode('utf-8')}) - for bs in ['summary', 'message', 'vofqan', 'voname', 'roc']: + for bs in ['summary', 'message', 'vofqan', 'voname', 'roc', 'actual_data']: code = "msg.body += '%s: ' + args.%s.encode(\'utf-8\') + '\\n' if args.%s else ''" % (bs, bs, bs) exec code @@ -61,11 +61,12 @@ def main(): parser.add_argument('--status', required=True, type=str) # msg body - parser.add_argument('--summary', required=False, type=str) + parser.add_argument('--actual_data', required=False, type=str) parser.add_argument('--message', required=False, type=str) + parser.add_argument('--roc', required=False, type=str) + parser.add_argument('--summary', required=False, type=str) parser.add_argument('--vofqan', required=False, type=str) parser.add_argument('--voname', required=False, type=str) - parser.add_argument('--roc', required=False, type=str) args = parser.parse_args() From 186d1da4dcce602af79a3d7c7446b984582be80c Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 7 Jun 2018 13:21:58 +0200 Subject: [PATCH 05/11] give fraction of hour in log reports --- pymod/stats.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pymod/stats.py b/pymod/stats.py index 1fc5407..562bcac 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -1,10 +1,11 @@ -import socket -import select -import os -import time -import re import copy +import decimal import errno +import os +import re +import select +import socket +import time from threading import Thread from multiprocessing import Process @@ -47,8 +48,8 @@ def stat_reset(self): self.laststattime = int(time.time()) def stats(self): - sincelaststat = int(time.time()) - self.laststattime - self._stat_msg(sincelaststat/3600) + sincelaststat = decimal.Decimal(int(time.time()) - self.laststattime) + self._stat_msg(sincelaststat/decimal.Decimal(3600)) class Reset(Thread): From c5248d9d8aff46c87f8d4acbfa7d8831b20a0ef4 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 7 Jun 2018 20:03:55 +0200 Subject: [PATCH 06/11] move counters for periodic report to shared arrays --- pymod/consume.py | 7 ------- pymod/publish.py | 1 - pymod/run.py | 16 ++++++++++++++-- pymod/shared.py | 10 ++-------- pymod/stats.py | 11 ++++++----- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/pymod/consume.py b/pymod/consume.py index 4cf8a0d..5760656 100644 --- a/pymod/consume.py +++ b/pymod/consume.py @@ -37,7 +37,6 @@ def __init__(self, events, worker=None): filepublisher=False) self.publisher = self.shared.runtime['publisher'](events, worker=worker) self.purger = Purger(events, worker=worker) - self.prevstattime = int(time.time()) def cleanup(self): self.unlock_dirq_msgs(self.seenmsgs) @@ -89,11 +88,6 @@ def run(self): evgup.set() raise SystemExit(0) - if int(time.time()) - self.prevstattime >= self.shared.general['statseveryhour'] * 3600: - self.stat_reset() - self.publisher.stat_reset() - self.prevstattime = int(time.time()) - time.sleep(decimal.Decimal(1) / decimal.Decimal(self.shared.queue['rate'])) except KeyboardInterrupt: @@ -107,7 +101,6 @@ def _increm_intervalcounters(self, num): def consume_dirq_msgs(self, num=0): def _inmemq_append(elem): self.inmemq.append(elem) - self.shared.stats['consumed'] += 1 self._increm_intervalcounters(1) self.sess_consumed += 1 if num and self.sess_consumed == num: diff --git a/pymod/publish.py b/pymod/publish.py index 94dd1f8..6020cbf 100644 --- a/pymod/publish.py +++ b/pymod/publish.py @@ -128,7 +128,6 @@ def _write(self, msgs): lck.acquire(False) self.ams.publish(self.shared.topic['topic'], msgs, timeout=self.shared.topic['timeout']) published.update([self.inmemq[e][0] for e in range(self.shared.topic['bulk'])]) - self.shared.stats['published'] += self.shared.topic['bulk'] self._increm_intervalcounters(self.shared.topic['bulk']) self.inmemq.rotate(-self.shared.topic['bulk']) break diff --git a/pymod/run.py b/pymod/run.py index 64d97e9..0615002 100644 --- a/pymod/run.py +++ b/pymod/run.py @@ -25,8 +25,12 @@ def init_dirq_consume(workers, daemonized, sockstat): for w in workers: shared = Shared(worker=w) - shared.statint[w]['published'] = Array('i', 7) - shared.statint[w]['consumed'] = Array('i', 7) + # Create arrays of integers that will be shared across spawned processes + # and that will keep track of number of published and consumed messages + # in 15, 30, 60, 180, 360, 720 and 1440 minutes. Last integer will be + # used for periodic reports. + shared.statint[w]['published'] = Array('i', 8) + shared.statint[w]['consumed'] = Array('i', 8) if not getattr(shared, 'runtime', False): shared.runtime = dict() shared.runtime['started'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') @@ -68,7 +72,15 @@ def init_dirq_consume(workers, daemonized, sockstat): statsp.daemon = False statsp.start() + prevstattime = int(time.time()) while True: + if int(time.time()) - prevstattime >= shared.general['statseveryhour']: + shared.log.info('Periodic report (every %sh)' % shared.general['statseveryhour']) + for c in consumers: + c.stat_reset() + c.publisher.stat_reset() + prevstattime = int(time.time()) + for c in consumers: if localevents['giveup-'+c.name].is_set(): c.terminate() diff --git a/pymod/shared.py b/pymod/shared.py index 4315ea4..d970a9b 100644 --- a/pymod/shared.py +++ b/pymod/shared.py @@ -18,21 +18,15 @@ def __init__(self, confopts=None, worker=None): self._topics = confopts['topics'] if not getattr(self, 'general', False): self.general = confopts['general'] - if not getattr(self, '_stats', False): - self._stats = dict() + if not getattr(self, 'statint', False): self.statint = dict() self.workers = self._queues.keys() if worker: self.worker = worker self.queue = self._queues[worker] self.topic = self._topics[worker] - if worker not in self._stats: - self._stats[worker] = dict(published=0) - self._stats[worker].update(dict(consumed=0)) if worker not in self.statint: self.statint[worker] = dict(published=None, consumed=None) - self.stats = self._stats[worker] - def add_event(self, name, ev): if not getattr(self, 'events', False): @@ -44,7 +38,7 @@ def add_log(self, logger): self.log= None self.log = logger - def get_nmsg_interval(self, worker, what, interval): + def get_nmsg(self, worker, what, interval): n = None try: diff --git a/pymod/stats.py b/pymod/stats.py index 562bcac..40f818e 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -26,7 +26,8 @@ def __init__(self, worker): self._reset() def _stat_msg(self, hours): - nmsg = self.shared.stats['published'] if self._iam_publisher() else self.shared.stats['consumed'] + what = 'published' if self._iam_publisher() else 'consumed' + nmsg = self.shared.get_nmsg(self.name, what, 7) self.shared.log.info('{0} {1}: {2} {3} msgs in {4:0.2f} hours'.format(self.__class__.__name__, self.name, self.msgdo, @@ -35,9 +36,9 @@ def _stat_msg(self, hours): def _reset(self): if self._iam_publisher(): - self.shared.stats['published'] = 0 + self.shared.statint[self.name]['published'][7] = 0 else: - self.shared.stats['consumed'] = 0 + self.shared.statint[self.name]['consumed'][7] = 0 def _iam_publisher(self): return bool('Publish' in self.__class__.__name__) @@ -109,7 +110,7 @@ def __init__(self, events, sock): self.shared = Shared() self.sock = sock self._int2idx = {'15': 0, '30': 1, '60': 2, '180': 3, '360': 4, - '720': 5, '1440': 6} + '720': 5, '1440': 6, 'gen': 7} self.resetth = Reset(events=events, map=self._int2idx) try: @@ -150,7 +151,7 @@ def answer(self, query): a = '' for q in query: if q[1] != 'error': - r = self.shared.get_nmsg_interval(q[0], q[1], q[2]) + r = self.shared.get_nmsg(q[0], q[1], q[2]) a += 'w:%s+r:%s ' % (str(q[0]), str(r)) else: a += 'w:%s+r:error ' % str(q[0]) From 0d0d0b692ed0a5859ea35aada8a72206fb266697 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 12 Jun 2018 12:06:20 +0200 Subject: [PATCH 07/11] added slipped factor --- pymod/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymod/run.py b/pymod/run.py index 0615002..3ac53a3 100644 --- a/pymod/run.py +++ b/pymod/run.py @@ -74,7 +74,7 @@ def init_dirq_consume(workers, daemonized, sockstat): prevstattime = int(time.time()) while True: - if int(time.time()) - prevstattime >= shared.general['statseveryhour']: + if int(time.time()) - prevstattime >= shared.general['statseveryhour'] * 3600: shared.log.info('Periodic report (every %sh)' % shared.general['statseveryhour']) for c in consumers: c.stat_reset() From c22d0c4d153a2cb2c9a56b04c017ddaa75685be8 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 15 Jun 2018 11:35:11 +0200 Subject: [PATCH 08/11] fix the permissions on socket --- bin/ams-publisherd | 5 +++-- pymod/stats.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index b71fa8a..654d7a0 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -34,7 +34,7 @@ shared = None logger = None -def setup_statssocket(path): +def setup_statssocket(path, uid, gid): global shared if os.path.exists(path): @@ -42,6 +42,7 @@ def setup_statssocket(path): try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(path) + os.chown(path, uid, gid) except socket.error as e: shared.log.error('Error setting up socket: %s - %s' % (path, str(e))) raise SystemExit(1) @@ -81,7 +82,7 @@ def daemon_start(context_daemon, restart=False): context_daemon.uid = uid context_daemon.gid = gid os.chown(shared.log.fileloghandle.name, uid, gid) - sock = setup_statssocket(shared.general['statsocket']) + sock = setup_statssocket(shared.general['statsocket'], uid, gid) context_daemon.files_preserve = [shared.log.fileloghandle, sock.fileno()] if not restart: diff --git a/pymod/stats.py b/pymod/stats.py index 40f818e..ef68e57 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -13,6 +13,7 @@ maxcmdlength = 128 + class StatSig(object): """ Class is meant to be subclassed by ConsumerQueue and Publish classes for @@ -80,7 +81,7 @@ def run(self): break now = int(time.time()) for k, v in self.last_reset.iteritems(): - if now - self.last_reset[k] >= int(k) * 60: + if int(k) != 0 and now - self.last_reset[k] >= int(k) * 60: for what in ['consumed', 'published']: for w in self.shared.workers: idx = self.map[k] @@ -110,7 +111,7 @@ def __init__(self, events, sock): self.shared = Shared() self.sock = sock self._int2idx = {'15': 0, '30': 1, '60': 2, '180': 3, '360': 4, - '720': 5, '1440': 6, 'gen': 7} + '720': 5, '1440': 6, '0': 7} self.resetth = Reset(events=events, map=self._int2idx) try: From d2bbdc0d6361aad31ebbc6f86f9ffdadfe32bbae Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 15 Jun 2018 12:32:47 +0200 Subject: [PATCH 09/11] added default value for actual_data field --- config/metric_data.avsc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/metric_data.avsc b/config/metric_data.avsc index a1cf20c..606f152 100644 --- a/config/metric_data.avsc +++ b/config/metric_data.avsc @@ -8,7 +8,7 @@ {"name": "metric", "type": "string"}, {"name": "status", "type": "string"}, {"name": "monitoring_host", "type": ["null", "string"]}, - {"name": "actual_data", "type": ["null", "string"]}, + {"name": "actual_data", "type": ["null", "string"], "default": null}, {"name": "summary", "type": ["null", "string"]}, {"name": "message", "type": ["null", "string"]}, {"name": "tags", "type" : ["null", {"name" : "Tags", From 23b5bd3f981014816ebbd7b3c1463f1d0b7ad480 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 16 Jun 2018 22:17:09 +0200 Subject: [PATCH 10/11] call stats methods from childs processes --- pymod/consume.py | 6 ++++++ pymod/run.py | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pymod/consume.py b/pymod/consume.py index 5760656..9b993e5 100644 --- a/pymod/consume.py +++ b/pymod/consume.py @@ -44,6 +44,7 @@ def cleanup(self): def run(self): termev = self.events['term-'+self.name] usr1ev = self.events['usr1-'+self.name] + periodev = self.events['period-'+self.name] lck = self.events['lck-'+self.name] evgup = self.events['giveup-'+self.name] @@ -67,6 +68,11 @@ def run(self): lck.release() usr1ev.clear() + if periodev.is_set(): + self.stat_reset() + self.publisher.stat_reset() + periodev.clear() + if self.consume_dirq_msgs(max(self.shared.topic['bulk'], self.shared.queue['rate'])): ret, published = self.publisher.write() diff --git a/pymod/run.py b/pymod/run.py index 3ac53a3..ff66c23 100644 --- a/pymod/run.py +++ b/pymod/run.py @@ -51,6 +51,7 @@ def init_dirq_consume(workers, daemonized, sockstat): localevents.update({'lck-'+w: Lock()}) localevents.update({'usr1-'+w: Event()}) + localevents.update({'period-'+w: Event()}) localevents.update({'term-'+w: Event()}) localevents.update({'termth-'+w: ThreadEvent()}) localevents.update({'giveup-'+w: Event()}) @@ -77,8 +78,7 @@ def init_dirq_consume(workers, daemonized, sockstat): if int(time.time()) - prevstattime >= shared.general['statseveryhour'] * 3600: shared.log.info('Periodic report (every %sh)' % shared.general['statseveryhour']) for c in consumers: - c.stat_reset() - c.publisher.stat_reset() + localevents['period-'+c.name].set() prevstattime = int(time.time()) for c in consumers: From 7f343a0d62ea520fb92b16e1ea5636a8958bf6a4 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 19 Jun 2018 08:31:03 +0200 Subject: [PATCH 11/11] spec update for release --- argo-nagios-ams-publisher.spec | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index 9802803..901480b 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -10,7 +10,7 @@ %endif Name: argo-nagios-ams-publisher -Version: 0.3.0 +Version: 0.3.1 Release: 1%{mydist} Summary: Bridge from Nagios to the ARGO Messaging system @@ -109,6 +109,10 @@ if ! /usr/bin/getent group nagiocmd &>/dev/null; then fi %changelog +* Tue Jun 19 2018 Daniel Vrcic - 0.3.1-1%{?dist} +- ARGO-1250 Inspection local socket is left with root permissions +- ARGO-1147 AMS publisher to add optional field +- ARGO-986 Purger should not try to remove non-existing cache msg * Tue Mar 27 2018 Daniel Vrcic - 0.3.0-1%{?dist} - ARGO-1084 Connection settings per topic publisher - ARGO-1023 Send messages to prod and devel AMS instance in parallel