-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbase_snmp.py
140 lines (116 loc) · 4.29 KB
/
base_snmp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
try:
from pysnmp.entity.rfc3413.oneliner import cmdgen
except ImportError:
raise ImportError('pysnmp library is not installed, install it with "pip install pysnmp"')
from exceptions import *
from base import BaseBackend
from interfaces import interfaces
__all__ = ['SNMP']
class SNMP(BaseBackend):
    """
    SNMP base backend

    Thin wrapper around pysnmp's one-liner command generator: it stores the
    community and transport objects for one host and offers helpers to
    normalise OIDs, issue GET / GETNEXT requests and read a single value.
    """

    # OID passed to next() by _value_to_retrieve(). It is None here, and
    # _value_to_retrieve() raises NetEngineError while it stays None.
    # NOTE(review): presumably overridden by the concrete backends named in
    # that error message (OpenWRT, AirOS, Cisco) - confirm.
    _oid_to_retrieve = None

    def __init__(self, host, community='public', agent='my-agent', port=161):
        """
        :host string: required
        :community string: defaults to public
        :agent string: defaults to my-agent
        :port integer: defaults to 161
        """
        self.host = host
        # NOTE(review): the positional 0 is presumably pysnmp's mpModel
        # (0 = SNMPv1) - confirm against the installed pysnmp version.
        self.community = cmdgen.CommunityData(agent, community, 0)
        self.transport = cmdgen.UdpTransportTarget((host, port))
        self.port = port

    def __str__(self):
        """ prints a human readable object description """
        return "<SNMP: %s>" % self.host

    def __repr__(self):
        """ returns the same string representation as __str__() """
        return self.__str__()

    def __unicode__(self):
        """ unicode __str__() for python2.7 """
        return unicode(self.__str__())

    @property
    def _command(self):
        """
        alias to cmdgen.CommandGenerator()

        A new CommandGenerator instance is created on every access.
        """
        return cmdgen.CommandGenerator()

    def _oid(self, oid):
        """
        returns valid oid value to be passed to getCmd() or nextCmd()

        :oid string|tuple|list: OID in any of the accepted notations

        Strings have spaces removed and commas replaced by dots; tuples and
        lists are joined with dots after converting each element to a string.
        Raises AttributeError for any other type.
        """
        if isinstance(oid, basestring):
            # allow string representations of oids with commas, ignore spaces
            oid = oid.replace(' ', '').replace(',', '.')
        elif isinstance(oid, (tuple, list)):
            # convert each item to string, then join into dotted notation
            oid = '.'.join([str(element) for element in oid])
        else:
            # exception type and message kept unchanged for existing callers
            raise AttributeError('get accepts only strings, tuples or lists')
        # ensure is string (could be unicode)
        return str(oid)

    def get(self, oid):
        """
        alias to cmdgen.CommandGenerator().getCmd

        :oid string|tuple|list: string,
            tuple or list representing the OID to get

        example of valid oid parameters:
            * "1,3,6,1,2,1,1,5,0"
            * "1, 3, 6, 1, 2, 1, 1, 5, 0"
            * "1.3.6.1.2.1.1.5.0"
            * [1, 3, 6, 1, 2, 1, 1, 5, 0]
            * (1, 3, 6, 1, 2, 1, 1, 5, 0)

        Returns whatever getCmd() returns; also prints a DEBUG line with the
        normalised OID to standard output.
        """
        # normalise once and reuse for both the debug line and the request
        oid = self._oid(oid)
        # parenthesised form prints the same text as the print statement
        print('DEBUG: SNMP GET %s' % oid)
        return self._command.getCmd(self.community, self.transport, oid)

    def next(self, oid):
        """
        alias to cmdgen.CommandGenerator().nextCmd

        :oid string|tuple|list:
            string, tuple or list representing the OID to get

        example of valid oid parameters:
            * "1,3,6,1,2,1,1,5,0"
            * "1, 3, 6, 1, 2, 1, 1, 5, 0"
            * "1.3.6.1.2.1.1.5.0"
            * [1, 3, 6, 1, 2, 1, 1, 5, 0]
            * (1, 3, 6, 1, 2, 1, 1, 5, 0)

        Returns whatever nextCmd() returns; also prints a DEBUG line with the
        normalised OID to standard output.
        """
        # normalise once and reuse for both the debug line and the request
        oid = self._oid(oid)
        # parenthesised form prints the same text as the print statement
        print('DEBUG: SNMP NEXT %s' % oid)
        return self._command.nextCmd(self.community, self.transport, oid)

    def get_value(self, oid):
        """
        returns value of oid as a string,
        or raises NetEngineError Exception if anything is wrong

        :oid string|tuple|list:
            string, tuple or list representing the OID to get
        """
        result = self.get(oid)
        try:
            # snmp stores results in several arrays: element 3 of the result,
            # first entry, second item of that entry
            return str(result[3][0][1])
        except IndexError:
            # no value at that position: report element 0 of the result
            raise NetEngineError(str(result[0]))

    def _value_to_retrieve(self):
        """
        return the final SNMP indexes for the
        interfaces to be used in the other methods and properties

        Sends next() for _oid_to_retrieve and returns a list of integers, one
        per row of element 3 of the response.
        Raises NetEngineError when _oid_to_retrieve has not been set.
        """
        if self._oid_to_retrieve is None:
            raise NetEngineError('Please fix properly the _oid_to_retrieve string in OpenWRT or AirOS or Cisco SNMP backend')
        indexes = self.next(self._oid_to_retrieve)[3]
        # each row's first entry carries the value at position 1
        return [int(row[0][1]) for row in indexes]

    def interfaces(self, key):
        """
        Lookup to the interfaces' dictionary

        :key: key to look up in the module-level ``interfaces`` mapping

        Raises NetEngineError when the key is not present.
        """
        try:
            # inside the method body this name resolves to the module-level
            # ``interfaces`` object, not to this method
            return interfaces[key]
        except KeyError:
            raise NetEngineError("No valid interfaces found for key: %s" % key)