Skip to content

Commit

Permalink
Merge pull request #40 from ARGOeu/devel
Browse files Browse the repository at this point in the history
Version 0.3.1
  • Loading branch information
themiszamani authored Jun 19, 2018
2 parents 1f75b18 + 7b41d61 commit b5724c0
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 50 deletions.
8 changes: 6 additions & 2 deletions argo-nagios-ams-publisher.spec
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
%endif

Name: argo-nagios-ams-publisher
Version: 0.3.0
Version: 0.3.1
Release: 1%{mydist}
Summary: Bridge from Nagios to the ARGO Messaging system

Expand Down Expand Up @@ -61,7 +61,7 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/
%{python_sitelib}/%{underscore %{name}}/*.py[co]
%defattr(-,nagios,nagios,-)
%dir %{_localstatedir}/log/%{name}/
%dir %{_localstatedir}/run/%{name}/
%attr(0755,nagios,nagios) %dir %{_localstatedir}/run/%{name}/
%dir %{_localstatedir}/spool/%{name}/metrics/
%dir %{_localstatedir}/spool/%{name}/alarms/

Expand Down Expand Up @@ -109,6 +109,10 @@ if ! /usr/bin/getent group nagiocmd &>/dev/null; then
fi

%changelog
* Tue Jun 19 2018 Daniel Vrcic <dvrcic@srce.hr> - 0.3.1-1%{?dist}
- ARGO-1250 Inspection local socket is left with root permissions
- ARGO-1147 AMS publisher to add optional field
- ARGO-986 Purger should not try to remove non-existing cache msg
* Tue Mar 27 2018 Daniel Vrcic <dvrcic@srce.hr> - 0.3.0-1%{?dist}
- ARGO-1084 Connection settings per topic publisher
- ARGO-1023 Send messages to prod and devel AMS instance in parallel
Expand Down
5 changes: 3 additions & 2 deletions bin/ams-publisherd
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ shared = None
logger = None


def setup_statssocket(path):
def setup_statssocket(path, uid, gid):
global shared

if os.path.exists(path):
os.unlink(path)
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(path)
os.chown(path, uid, gid)
except socket.error as e:
shared.log.error('Error setting up socket: %s - %s' % (path, str(e)))
raise SystemExit(1)
Expand Down Expand Up @@ -81,7 +82,7 @@ def daemon_start(context_daemon, restart=False):
context_daemon.uid = uid
context_daemon.gid = gid
os.chown(shared.log.fileloghandle.name, uid, gid)
sock = setup_statssocket(shared.general['statsocket'])
sock = setup_statssocket(shared.general['statsocket'], uid, gid)
context_daemon.files_preserve = [shared.log.fileloghandle, sock.fileno()]

if not restart:
Expand Down
1 change: 1 addition & 0 deletions config/metric_data.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{"name": "metric", "type": "string"},
{"name": "status", "type": "string"},
{"name": "monitoring_host", "type": ["null", "string"]},
{"name": "actual_data", "type": ["null", "string"], "default": null},
{"name": "summary", "type": ["null", "string"]},
{"name": "message", "type": ["null", "string"]},
{"name": "tags", "type" : ["null", {"name" : "Tags",
Expand Down
1 change: 1 addition & 0 deletions helpers/ams-msg-generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def construct_msg(session, bodysize, timezone):
msg.body += 'summary: %s\n' % generator.rndb64(20)
msg.body += 'message: %s\n' % generator.rndb64(bodysize)
msg.body += 'vofqan: %s\n' % generator.rndb64(10)
msg.body += 'actual_data: %s\n' % generator.rndb64(10)
msg.body += 'voname: %s\n' % generator.rndb64(3)
msg.body += 'roc: %s\n' % generator.rndb64(3)

Expand Down
18 changes: 9 additions & 9 deletions pymod/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def parse_config(logger=None):
dirqopts = dict()
qname = section.split('_', 1)[1].lower()
dirqopts['directory'] = config.get(section, 'Directory')
dirqopts['rate'] = int(config.get(section, 'Rate'))
dirqopts['rate'] = config.getint(section, 'Rate')
dirqopts['purge'] = eval(config.get(section, 'Purge').strip())
dirqopts['purgeeverysec'] = int(config.get(section, 'PurgeEverySec'))
dirqopts['maxtemp'] = int(config.get(section, 'MaxTemp'))
dirqopts['maxlock'] = int(config.get(section, 'MaxLock'))
dirqopts['granularity'] = int(config.get(section, 'Granularity'))
dirqopts['purgeeverysec'] = config.getint(section, 'PurgeEverySec')
dirqopts['maxtemp'] = config.getint(section, 'MaxTemp')
dirqopts['maxlock'] = config.getint(section, 'MaxLock')
dirqopts['granularity'] = config.getint(section, 'Granularity')
queues[qname] = dirqopts
if section.startswith('Topic_'):
topts = dict()
Expand All @@ -74,13 +74,13 @@ def parse_config(logger=None):
topts['key'] = config.get(section, 'Key')
topts['project'] = config.get(section, 'Project')
topts['topic'] = config.get(section, 'Topic')
topts['bulk'] = int(config.get(section, 'BulkSize'))
topts['bulk'] = config.getint(section, 'BulkSize')
topts['avro'] = eval(config.get(section, 'Avro').strip())
if topts['avro']:
topts['avroschema'] = config.get(section, 'AvroSchema')
topts['retry'] = int(config.get(section, 'Retry'))
topts['timeout'] = int(config.get(section, 'Timeout'))
topts['sleepretry'] = int(config.get(section, 'SleepRetry'))
topts['retry'] = config.getint(section, 'Retry')
topts['timeout'] = config.getint(section, 'Timeout')
topts['sleepretry'] = config.getint(section, 'SleepRetry')
topics[tname] = topts

for k, v in queues.iteritems():
Expand Down
22 changes: 12 additions & 10 deletions pymod/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ def cleanup(self):
self.unlock_dirq_msgs(self.seenmsgs)

def run(self):
self.prevstattime = int(time.time())
termev = self.events['term-'+self.name]
usr1ev = self.events['usr1-'+self.name]
periodev = self.events['period-'+self.name]
lck = self.events['lck-'+self.name]
evgup = self.events['giveup-'+self.name]

Expand All @@ -68,6 +68,11 @@ def run(self):
lck.release()
usr1ev.clear()

if periodev.is_set():
self.stat_reset()
self.publisher.stat_reset()
periodev.clear()

if self.consume_dirq_msgs(max(self.shared.topic['bulk'],
self.shared.queue['rate'])):
ret, published = self.publisher.write()
Expand All @@ -89,11 +94,6 @@ def run(self):
evgup.set()
raise SystemExit(0)

if int(time.time()) - self.prevstattime >= self.shared.general['statseveryhour'] * 3600:
self.stat_reset()
self.publisher.stat_reset()
self.prevstattime = int(time.time())

time.sleep(decimal.Decimal(1) / decimal.Decimal(self.shared.queue['rate']))

except KeyboardInterrupt:
Expand All @@ -107,7 +107,6 @@ def _increm_intervalcounters(self, num):
def consume_dirq_msgs(self, num=0):
def _inmemq_append(elem):
self.inmemq.append(elem)
self.shared.stats['consumed'] += 1
self._increm_intervalcounters(1)
self.sess_consumed += 1
if num and self.sess_consumed == num:
Expand Down Expand Up @@ -138,7 +137,9 @@ def unlock_dirq_msgs(self, msgs=None):
try:
msgl = msgs if msgs else self.inmemq
for m in msgl:
self.dirq.unlock(m[0] if not isinstance(m, str) else m)
msg = m[0] if not isinstance(m, str) else m
if os.path.exists('{0}/{1}'.format(self.dirq.path, msg)):
self.dirq.unlock(msg)
self.inmemq.clear()
except (OSError, IOError) as e:
self.shared.log.error(e)
Expand All @@ -147,8 +148,9 @@ def remove_dirq_msgs(self, msgs=None):
try:
msgl = msgs if msgs else self.inmemq
for m in msgl:
self.dirq.remove(m[0] if not isinstance(m, str) else m)
msg = m[0] if not isinstance(m, str) else m
if os.path.exists('{0}/{1}'.format(self.dirq.path, msg)):
self.dirq.remove(msg)
self.inmemq.clear()
except (OSError, IOError) as e:
self.shared.log.error(e)

7 changes: 4 additions & 3 deletions pymod/metrictoqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def build_msg(args, *headers):
msg.header.update({'status': status.encode('utf-8')})
msg.header.update({'monitoring_host': nagioshost.encode('utf-8')})

for bs in ['summary', 'message', 'vofqan', 'voname', 'roc']:
for bs in ['summary', 'message', 'vofqan', 'voname', 'roc', 'actual_data']:
code = "msg.body += '%s: ' + args.%s.encode(\'utf-8\') + '\\n' if args.%s else ''" % (bs, bs, bs)
exec code

Expand All @@ -61,11 +61,12 @@ def main():
parser.add_argument('--status', required=True, type=str)

# msg body
parser.add_argument('--summary', required=False, type=str)
parser.add_argument('--actual_data', required=False, type=str)
parser.add_argument('--message', required=False, type=str)
parser.add_argument('--roc', required=False, type=str)
parser.add_argument('--summary', required=False, type=str)
parser.add_argument('--vofqan', required=False, type=str)
parser.add_argument('--voname', required=False, type=str)
parser.add_argument('--roc', required=False, type=str)

args = parser.parse_args()

Expand Down
1 change: 0 additions & 1 deletion pymod/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ def _write(self, msgs):
lck.acquire(False)
self.ams.publish(self.shared.topic['topic'], msgs, timeout=self.shared.topic['timeout'])
published.update([self.inmemq[e][0] for e in range(self.shared.topic['bulk'])])
self.shared.stats['published'] += self.shared.topic['bulk']
self._increm_intervalcounters(self.shared.topic['bulk'])
self.inmemq.rotate(-self.shared.topic['bulk'])
break
Expand Down
16 changes: 14 additions & 2 deletions pymod/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ def init_dirq_consume(workers, daemonized, sockstat):

for w in workers:
shared = Shared(worker=w)
shared.statint[w]['published'] = Array('i', 7)
shared.statint[w]['consumed'] = Array('i', 7)
# Create arrays of integers that will be shared across spawned processes
# and that will keep track of number of published and consumed messages
# in 15, 30, 60, 180, 360, 720 and 1440 minutes. Last integer will be
# used for periodic reports.
shared.statint[w]['published'] = Array('i', 8)
shared.statint[w]['consumed'] = Array('i', 8)
if not getattr(shared, 'runtime', False):
shared.runtime = dict()
shared.runtime['started'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
Expand All @@ -47,6 +51,7 @@ def init_dirq_consume(workers, daemonized, sockstat):

localevents.update({'lck-'+w: Lock()})
localevents.update({'usr1-'+w: Event()})
localevents.update({'period-'+w: Event()})
localevents.update({'term-'+w: Event()})
localevents.update({'termth-'+w: ThreadEvent()})
localevents.update({'giveup-'+w: Event()})
Expand All @@ -68,7 +73,14 @@ def init_dirq_consume(workers, daemonized, sockstat):
statsp.daemon = False
statsp.start()

prevstattime = int(time.time())
while True:
if int(time.time()) - prevstattime >= shared.general['statseveryhour'] * 3600:
shared.log.info('Periodic report (every %sh)' % shared.general['statseveryhour'])
for c in consumers:
localevents['period-'+c.name].set()
prevstattime = int(time.time())

for c in consumers:
if localevents['giveup-'+c.name].is_set():
c.terminate()
Expand Down
10 changes: 2 additions & 8 deletions pymod/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@ def __init__(self, confopts=None, worker=None):
self._topics = confopts['topics']
if not getattr(self, 'general', False):
self.general = confopts['general']
if not getattr(self, '_stats', False):
self._stats = dict()
if not getattr(self, 'statint', False):
self.statint = dict()
self.workers = self._queues.keys()
if worker:
self.worker = worker
self.queue = self._queues[worker]
self.topic = self._topics[worker]
if worker not in self._stats:
self._stats[worker] = dict(published=0)
self._stats[worker].update(dict(consumed=0))
if worker not in self.statint:
self.statint[worker] = dict(published=None, consumed=None)
self.stats = self._stats[worker]


def add_event(self, name, ev):
if not getattr(self, 'events', False):
Expand All @@ -44,7 +38,7 @@ def add_log(self, logger):
self.log= None
self.log = logger

def get_nmsg_interval(self, worker, what, interval):
def get_nmsg(self, worker, what, interval):
n = None

try:
Expand Down
29 changes: 16 additions & 13 deletions pymod/stats.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import socket
import select
import os
import time
import re
import copy
import decimal
import errno
import os
import re
import select
import socket
import time

from threading import Thread
from multiprocessing import Process
from argo_nagios_ams_publisher.shared import Shared

maxcmdlength = 128


class StatSig(object):
"""
Class is meant to be subclassed by ConsumerQueue and Publish classes for
Expand All @@ -25,7 +27,8 @@ def __init__(self, worker):
self._reset()

def _stat_msg(self, hours):
nmsg = self.shared.stats['published'] if self._iam_publisher() else self.shared.stats['consumed']
what = 'published' if self._iam_publisher() else 'consumed'
nmsg = self.shared.get_nmsg(self.name, what, 7)
self.shared.log.info('{0} {1}: {2} {3} msgs in {4:0.2f} hours'.format(self.__class__.__name__,
self.name,
self.msgdo,
Expand All @@ -34,9 +37,9 @@ def _stat_msg(self, hours):

def _reset(self):
if self._iam_publisher():
self.shared.stats['published'] = 0
self.shared.statint[self.name]['published'][7] = 0
else:
self.shared.stats['consumed'] = 0
self.shared.statint[self.name]['consumed'][7] = 0

def _iam_publisher(self):
return bool('Publish' in self.__class__.__name__)
Expand All @@ -47,8 +50,8 @@ def stat_reset(self):
self.laststattime = int(time.time())

def stats(self):
sincelaststat = int(time.time()) - self.laststattime
self._stat_msg(sincelaststat/3600)
sincelaststat = decimal.Decimal(int(time.time()) - self.laststattime)
self._stat_msg(sincelaststat/decimal.Decimal(3600))


class Reset(Thread):
Expand Down Expand Up @@ -78,7 +81,7 @@ def run(self):
break
now = int(time.time())
for k, v in self.last_reset.iteritems():
if now - self.last_reset[k] >= int(k) * 60:
if int(k) != 0 and now - self.last_reset[k] >= int(k) * 60:
for what in ['consumed', 'published']:
for w in self.shared.workers:
idx = self.map[k]
Expand Down Expand Up @@ -108,7 +111,7 @@ def __init__(self, events, sock):
self.shared = Shared()
self.sock = sock
self._int2idx = {'15': 0, '30': 1, '60': 2, '180': 3, '360': 4,
'720': 5, '1440': 6}
'720': 5, '1440': 6, '0': 7}
self.resetth = Reset(events=events, map=self._int2idx)

try:
Expand Down Expand Up @@ -149,7 +152,7 @@ def answer(self, query):
a = ''
for q in query:
if q[1] != 'error':
r = self.shared.get_nmsg_interval(q[0], q[1], q[2])
r = self.shared.get_nmsg(q[0], q[1], q[2])
a += 'w:%s+r:%s ' % (str(q[0]), str(r))
else:
a += 'w:%s+r:error ' % str(q[0])
Expand Down

0 comments on commit b5724c0

Please sign in to comment.