zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱

zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱

互联网故障一般表现为丢包和时延增大,持续性故障不难排查,难的是间歇性故障或凌晨发生的故障,后者往往等不及我们测试就已恢复正常,拿不到异常时刻的mtr结果,就无法判断故障点在哪里

因此有了根据丢包率和时延的变化自动联动mtr的需求

前段时间曾用MySQL实现了这个功能,缺点是占用系统资源过多且脚本繁重,优点是数据可以复用,能做多种形式的展示

后来改用socket+deque实现,资源消耗低且轻量,也可以通过开放互联网API来做分布式监控,缺点是历史数据不留存,用完即丢

系统环境

  Ubuntu 18.04.5 LTS+Python 3.6.9 

python库

  仅使用Python自带的标准库,考虑到系统权限问题没有使用第三方库

ip查询

  http://ip-api.com,免费版,限制频率45次/分钟,国外归属地准确率较高,国内查询一塌糊涂,国内推荐使用ipip

  

#!/usr/bin/env python3
#-*-coding:utf-8-*-
import argparse
import itertools
import json
import logging
import os
import queue
import random
import re
import shlex
import socket
import string
import subprocess
import sys
import threading
import time
from collections import deque
from functools import reduce

 12 ipqli=deque()
 13 filename = os.path.realpath(sys.argv[0])
 14 def logger():
 15     dir = os.path.dirname(os.path.realpath(sys.argv[0]))
 16     log_name = dir+"/log"
 17     logger = logging.getLogger()
 18     fh = logging.FileHandler(log_name)
 19     formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
 20     fh.setFormatter(formater)
 21     logger.setLevel(logging.DEBUG)
 22     logger.addHandler(fh)
 23     return logger
 24 log = logger()
 25 #ping程序,避免系统权限问题未使用ping3
 26 class Ping:
 27     def __init__(self,ip,count=20,udp_length=64):
 28         ip = tuple(ip)
 29         self.sip,self.tip,self.type,self.port,self.inver=ip
 30         self.type = self.type.lower()
 31         self.port = int(self.port)
 32         self.count=count
 33         self.inver = float(self.inver)
 34         self.udp_length=udp_length
 35         restime_name = "restime_deque"+"".join(ip).replace(".","")
 36         pkloss_name = "pkloss_deque"+"".join(ip).replace(".","")
 37         ipqevent = "event"+"".join(ip).replace(".","")
 38         locals()[restime_name] = deque(maxlen=60)
 39         locals()[pkloss_name] = deque(maxlen=60)
 40         self.restime_deque = locals()[restime_name]
 41         self.pkloss_deque = locals()[pkloss_name]
 42         self.ret_restime_deque = globals()[restime_name]
 43         self.ret_pkloss_deque = globals()[pkloss_name]
 44         self.ipqevent = globals()[ipqevent]
 45         self.compile= r"(?<=time=)d+.?d+(?= ms)"
 46     def _tcp(self):
 47             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 48             s.settimeout(1)
 49             start_time = time.time()
 50             res_count=0
 51             try:
 52                 s.bind((self.sip,0))
 53                 s.connect((self.tip, self.port))
 54                 s.shutdown(socket.SHUT_RD)
 55                 value = (time.time() - start_time)*1000  
 56                 self.restime_deque.append(value)
 57                 self.pkloss_deque.append(0)
 58                 res_count=1
 59             except socket.timeout:
 60                 self.restime_deque.append(0)
 61                 self.pkloss_deque.append(1)
 62             except OSError as e:
 63                 log.debug(e)
 64                 return 0,0
 65             usetime = time.time()-start_time
 66             sleep_time = self.inver - usetime if usetime<self.inver else self.inver
 67             return sleep_time,res_count
 68     def _udp(self):
 69         res_count=0
 70         s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
 71         s.settimeout(1)
 72         start_time = time.time()
 73         data="".join(random.choice(string.ascii_letters+ string.digits) for x in range(self.udp_length))
 74         try:
 75             s.sendto(data.encode("utf-8"),(self.tip,self.port))
 76             s.recv(1024)
 77             value = (time.time() - start_time)*1000
 78             self.restime_deque.append(value)
 79             self.pkloss_deque.append(0)
 80             res_count=1
 81         except socket.timeout:
 82             self.restime_deque.append(0)
 83             self.pkloss_deque.append(1)
 84         except OSError as e:
 85             log.debug(e)
 86             return 0,0
 87         usetime = time.time()-start_time
 88         sleep_time = self.inver - usetime if usetime<self.inver else self.inver
 89         return sleep_time,res_count
 90     def _icmp(self):
 91         res_count=0
 92         start_time = time.time()
 93         cmd = "ping -i %s -c 1 -W 1 -I %s %s"%(self.inver,self.sip,self.tip)
 94         ret = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode("utf8")
 95         try:
 96             value=re.findall(self.compile, ret,re.S)[0]
 97             self.restime_deque.append(value)
 98             self.pkloss_deque.append(0)
 99             res_count=1
100         except:
101             self.pkloss_deque.append(1)
102             self.restime_deque.append(0)
103         usetime = time.time()-start_time
104         sleep_time = self.inver - usetime if usetime<self.inver else self.inver
105         return sleep_time,res_count
106     def fastping(self):
107         getattr(self, "_"+self.type)()
108     def slow_ping(self):
109         index = 0
110         res_count=0
111         self.ipqevent.set()
112         while index<self.count:
113             sleep_time,count=getattr(self, "_"+self.type)()
114             index+=1
115             res_count+=count
116             if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2 :
117                 break
118             time.sleep(sleep_time)
119         return index,res_count
120     def ping_value(self):
121         start_time = time.time()
122         count = self.count
123         rescount = self.count
124         if len(self.ret_restime_deque)<2 or len(self.ret_pkloss_deque)<2:
125             fastli=[]
126             for x in range(self.count):
127                 t = threading.Thread(target=self.fastping)
128                 t.start()
129                 fastli.append(t)
130             for th in fastli:
131                 th.join()
132         else:
133             count,rescount = self.slow_ping()
134             rescount=count if rescount==0 else rescount
135         use_time = round(time.time()-start_time,4)
136         li = [self.restime_deque.pop() for x in range(count)]
137         pkli = [self.pkloss_deque.pop() for x in range(count)]
138         try:
139             restime = reduce(lambda x ,y :round(float(x)+float(y),2), li)/rescount if len(li) >1 else round(float(li[0]),2)
140             pkloss= reduce(lambda x ,y :int(x)+int(y), pkli)/count*100
141             return (round(restime,2),round(pkloss,2),use_time)   
142         except Exception as e:
143             log.debug(e)
144             return 0,0,0
# Server side: one probe thread per monitored tuple, answering client requests
# from the buffered results.
class Server():
    """Local result server for the probe tuples requested by socket_client().

    The parts work together as follows:
    - ``server()`` accepts requests on the bound socket.
    - ``create()`` spawns a ``makevalue()`` probe thread for every newly seen
      tuple.
    - ``sendvalue()`` replies with the newest buffered value.
    """

    # Length of each per-tuple result queue. makevalue() stops probing once
    # both queues are full, i.e. once nobody has consumed results for a while.
    QUEUE_LEN = 30
    # Seconds sendvalue() waits for fresh data. Kept below socket_client()'s
    # 3 s socket timeout so the client still receives the reply.
    WAIT_TIMEOUT = 2.5

    def __init__(self, sock):
        global ipqli
        self.ipqli = ipqli               # tuples waiting for a probe thread
        self.thli = []                   # threads started by start()
        self.ipli = []                   # tuples that currently have a probe thread
        self.sock = sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.env = threading.Event()     # set when ipqli has new work

    @staticmethod
    def _key(ip):
        """Return the suffix used to name the per-tuple module globals."""
        return "".join(map(str, ip)).replace(".", "")

    @classmethod
    def start(cls, host="127.0.0.1", port=6590):
        """Bind host:port, then run the accept loop and the thread spawner until both exit."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        obj = cls(s)
        for target in (obj.server, obj.create):
            t = threading.Thread(target=target)
            t.start()
            obj.thli.append(t)
        for t in obj.thli:
            t.join()

    def server(self):
        """Accept JSON ``[ip, item]`` requests one at a time and answer each one."""
        self.sock.listen(100)
        while True:
            conn, addr = self.sock.accept()
            try:
                ip, item = json.loads(conn.recv(1024).decode("utf-8"))
                key = self._key(ip)
                restime_ipq = "restime_deque" + key
                pkloss_ipq = "pkloss_deque" + key
                ipqevent = "event" + key
                if ip not in self.ipli:
                    globals()[restime_ipq] = deque(maxlen=self.QUEUE_LEN)
                    globals()[pkloss_ipq] = deque(maxlen=self.QUEUE_LEN)
                    globals()[ipqevent] = threading.Event()
                    self.ipqli.append(ip)
                    self.ipli.append(ip)
                    log.debug("create ipdeque %s %s" % (restime_ipq, pkloss_ipq))
                    self.env.set()
                self.sendvalue(conn, ip, item)
            except Exception as e:
                # Top-level boundary: a malformed request or a vanished client
                # must not stop the accept loop.
                log.debug("request from %s failed: %s" % (addr, e))
            finally:
                conn.close()

    def create(self):
        """Spawn a makevalue() thread for every tuple queued by server()."""
        while True:
            self.env.wait()
            # Clear before draining: a tuple queued after this point sets the
            # event again, so its wake-up cannot be lost.
            self.env.clear()
            while self.ipqli:
                try:
                    ip = self.ipqli.pop()
                    log.debug("create %s" % (ip,))
                    threading.Thread(target=self.makevalue, args=(ip,)).start()
                except Exception as a:
                    log.debug(str(a))

    def makevalue(self, ip):
        """Probe ``ip`` repeatedly until both result queues are full.

        Always forgets the tuple afterwards, even if probing failed, so the
        next request for it starts a fresh probe thread.
        """
        key = self._key(ip)
        restime_name = "restime_deque" + key
        pkloss_name = "pkloss_deque" + key
        try:
            restime_ipq = globals()[restime_name]
            pkloss_ipq = globals()[pkloss_name]
            obj = Ping(ip)
            while len(restime_ipq) < self.QUEUE_LEN or len(pkloss_ipq) < self.QUEUE_LEN:
                restime, pkloss, use_time = obj.ping_value()
                restime_ipq.append((restime, use_time))
                pkloss_ipq.append((pkloss, use_time))
        except Exception as e:
            log.debug("probe %s stopped: %s" % (key, e))
        finally:
            if ip in self.ipli:
                self.ipli.remove(ip)
            log.debug("delete ipdeque %s %s" % (restime_name, pkloss_name))

    @staticmethod
    def _run_mtr(cmd):
        """Start ``cmd`` in the background with its output discarded.

        Output goes to DEVNULL because the old PIPEs were never read.
        """
        try:
            subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as e:
            log.debug("mtr start failed: %s" % e)

    def sendvalue(self, conn, ip, item):
        """Reply on ``conn`` with the newest buffered value of ``item`` for ``ip``.

        ``item`` is "restime" or "pkloss". ``<basedir>/mtr.py <tip> <log file>``
        is started in the background in two cases:
        - the latency exceeds the previous buffered value by more than 20;
        - the loss is strictly between 20 and 100 percent.

        Errors are sent back as their message text.
        """
        key = self._key(ip)
        tip = ip[1]
        restime_ipq = globals()["restime_deque" + key]
        pkloss_ipq = globals()["pkloss_deque" + key]
        ipqevent = globals()["event" + key]
        mtr_log = os.path.join(self.basedir, "mtr_log",
                               "%s-%s.log" % (tip, time.strftime("%Y-%m-%d", time.localtime())))
        mtr_cmd = [os.path.join(self.basedir, "mtr.py"), tip, mtr_log]
        # Either queue nearly empty: wait until the probe thread signals
        # (Ping.slow_ping sets the event). The old check tested restime twice.
        if len(restime_ipq) < 2 or len(pkloss_ipq) < 2:
            ipqevent.clear()
        ret = ""
        try:
            ipqevent.wait(self.WAIT_TIMEOUT)
            if item == "restime":
                ret = restime_ipq.pop()[0]
                # Compare with the previous value only if one is left.
                if restime_ipq:
                    hisret = restime_ipq[-1][0]
                    if ret - hisret > 20:
                        self._run_mtr(mtr_cmd)
            elif item == "pkloss":
                ret = pkloss_ipq.pop()[0]
                if 100 > ret > 20:
                    self._run_mtr(mtr_cmd)
        except Exception as a:
            ret = a
            log.debug(str(ret))
        try:
            conn.sendall(str(ret).encode())
        except OSError as e:
            # The client may already have given up (3 s timeout).
            log.debug("reply failed: %s" % e)

# Validate the command-line input before it is sent to the server.
class Ipcheck():
    """Validate the probe parameters given on the command line."""

    # Dotted-quad shape; each octet's range is checked by check_fun(). ASCII
    # digits only, so values like "١.٢.٣.٤" are rejected even though int()
    # would accept them. The old pattern had lost its backslashes and matched
    # no real address.
    _IPV4_SHAPE = r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+"

    def __init__(self, sip, tip, item, ping_type, inver):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = str(ping_type or "").lower()
        try:
            self.inver = float(inver)
        except (TypeError, ValueError):
            # Let check() reject it instead of crashing the caller here.
            self.inver = None

    def check(self):
        """Return True when item, type, interval and both addresses are valid."""
        if self.item not in ("restime", "pkloss") or self.type not in ("icmp", "tcp", "udp"):
            return False
        # `not >=` also rejects NaN; 0.2 s is the minimum the usage text states.
        if self.inver is None or not self.inver >= 0.2:
            return False
        return self.checkipformat()

    def check_fun(self, ip):
        """Return True if a single octet is below 256."""
        return int(ip) < 256

    def checkipformat(self):
        """Return True if tip (and sip, when given) are well-formed IPv4 addresses."""
        try:
            if not re.fullmatch(self._IPV4_SHAPE, self.tip):
                return False
            tiplist = self.tip.split(".")
            if self.sip:
                if not re.fullmatch(self._IPV4_SHAPE, self.sip):
                    return False
                siplist = self.sip.split(".")
            else:
                # No source address given: pad with octets that always pass.
                siplist = [1, 1, 1, 1]
            return self.checkiplength(tiplist + siplist)
        except (AttributeError, TypeError, ValueError):
            return False

    def checkiplength(self, check_ipli):
        """Return True if every octet in ``check_ipli`` passes check_fun()."""
        return all(self.check_fun(octet) for octet in check_ipli)
def run():
    """Start the result server in the background as ``python3 <this script> -S server``.

    An argument list replaces the old shell string, so a script path containing
    spaces or shell metacharacters still works. The child's output goes to
    DEVNULL: the old PIPEs were never read and were closed as soon as this
    client exited.
    """
    subprocess.Popen(["python3", filename, "-S", "server"],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Client side: ask the local server for one value and print it for the caller.
def socket_client(ip, item):
    """Send ``[ip, item]`` as JSON to 127.0.0.1:6590 and print the reply.

    On a socket timeout the error is logged and the process exits with
    status 0 without printing anything. Any other failure (e.g. nothing is
    listening yet) prints "server will start", logs the error and exits with
    status 0.
    """
    server_addr = ("127.0.0.1", 6590)
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        sock.connect(server_addr)
        request = json.dumps([ip, item])
        sock.sendall(request.encode())
        reply = sock.recv(1024)
        sock.close()
        print(reply.decode())
    except socket.timeout as err:
        log.debug(str(err))
        sock.close()
        sys.exit(0)
    except Exception as err:
        print("server will start")
        log.debug(str(err))
        sys.exit(0)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="icmp for monitor")
    parser.add_argument("-S", action="store", dest="server")
    parser.add_argument("-t", action="store", dest="tip")
    parser.add_argument("-s", action="store", dest="sip")
    parser.add_argument("-I", action="store", dest="item")
    parser.add_argument("-i", action="store", dest="inver", default="1")
    parser.add_argument("-T", action="store", dest="ping_type", default="icmp")
    parser.add_argument("-p", action="store", dest="port", default="0")
    args = parser.parse_args()
    usage = """---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80-------
        """
    # Start the background server unless one is already running. The grep
    # pattern is shell-quoted as one argument; the old unescaped inner double
    # quotes made this line a syntax error.
    server_status_cmd = "ps -ef | grep %s | grep -v grep | cut -c 9-16" % shlex.quote(filename + " -S server")
    server_status = subprocess.Popen(server_status_cmd, shell=True, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE).communicate()[0]
    if not server_status:
        run()
    if args.server:
        Server.start()
        sys.exit(0)
    try:
        tip = socket.gethostbyname(args.tip)
    except (TypeError, ValueError, OSError):
        # -t missing or not resolvable. Report it and show usage instead of
        # falling through to a NameError on the unset variables.
        print("format error")
        print(usage)
        sys.exit(0)
    sip = args.sip
    item = args.item
    ping_type = args.ping_type
    port = args.port
    inver = args.inver
    ip = (sip, tip, ping_type, port, inver)
    # Validate the port here. An invalid value would otherwise only surface
    # as a failing probe thread inside the server.
    try:
        port_ok = 0 <= int(port) <= 65535
    except ValueError:
        port_ok = False
    check = Ipcheck(sip, tip, item, ping_type, inver)
    if not port_ok or not check.check():
        print(usage)
        sys.exit(0)
    socket_client(ip, item)
hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱