zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱
互联网故障一般表现为丢包和时延增大。持续性故障不难排查,难的是间歇性故障或凌晨发生的故障:后者往往等不到我们测试就已经恢复正常,而拿不到故障发生时的mtr结果,就无法判断故障点在哪里。
故此有了根据丢包率和时延变化自动联动mtr的需求。
前段时间使用MySQL实现了这个功能,缺点是占用系统资源过多,且脚本繁重;优点是数据可复用,可做多种形式的展示。
后续改用socket+deque实现,低能耗且轻量,也可以通过开放互联网API来做分布式监控;缺点是历史数据不留存,用完即丢。
系统环境
Ubuntu 18.04.5 LTS+Python 3.6.9
python库
仅使用Python自带的标准库,考虑到系统权限问题没有使用第三方库
ip查询
http://ip-api.com(免费版,限制频率45次/分钟):国外IP归属地准确率较高,国内IP的查询结果很不准确,国内推荐使用ipip
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Line-quality probe for Zabbix: ICMP/TCP/UDP response time and packet loss.

The script has two roles:

* ``-S server``: a daemon listening on 127.0.0.1:6590.  For every distinct
  line (source ip, target ip, probe type, port, interval) it starts a
  background thread that keeps measuring average response time and packet
  loss and buffers the results.
* client (default): asks the daemon for one value (``restime`` or
  ``pkloss``) of one line and prints it.  The daemon is started in the
  background if it is not running yet.

When a response time is more than 20 ms above the previous result, or
packet loss is above 20 % but below 100 %, ``mtr.py`` from the script's
directory is launched with the target ip and a per-day log file path.
"""
from collections import deque
import itertools, time
import queue, json
import argparse, sys, re, os, subprocess
import time, socket, random, string
import threading
from functools import reduce
import logging

filename = os.path.realpath(sys.argv[0])

# Address the server listens on and the client connects to.
HOST = "127.0.0.1"
PORT = 6590
# Results buffered per line.  A line whose two buffers both fill up without
# being read is considered abandoned and its probe thread stops.
BUFFER_LEN = 30
# Longest time (seconds) a request waits for a line to have results.
WAIT_TIMEOUT = 30
# mtr is triggered when the response time rises by more than this (ms)...
RESTIME_JUMP_MS = 20
# ...or when packet loss (%) is above this value but below 100.
PKLOSS_MTR_MIN = 20

USAGE = """---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80-------
"""


def logger():
    """Attach a DEBUG-level handler for ``<script dir>/log`` to the root logger.

    Falls back to a NullHandler when the log file cannot be opened, so an
    unwritable directory does not stop the script from answering.
    """
    log_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    root = logging.getLogger()
    try:
        handler = logging.FileHandler(os.path.join(log_dir, "log"))
    except OSError:
        handler = logging.NullHandler()
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
    return root


log = logger()


def remaining_interval(start_time, interval):
    """Return how long to sleep after a probe that started at *start_time*.

    Normally the rest of *interval*; a probe that overran the interval is
    followed by one full interval.
    """
    used = time.time() - start_time
    return interval - used if used < interval else interval


class LineState:
    """Result buffers shared between a line's probe thread and request handlers."""

    def __init__(self, maxlen=BUFFER_LEN):
        # Entries are (value, seconds the measurement round took).
        self.restime = deque(maxlen=maxlen)
        self.pkloss = deque(maxlen=maxlen)
        # Guards both deques; notified every time the probe thread appends.
        self.cond = threading.Condition()

    def ready(self):
        """True when both buffers hold at least 2 results."""
        return len(self.restime) >= 2 and len(self.pkloss) >= 2

    def full(self):
        """True when both buffers are at capacity (nobody is reading them)."""
        return (len(self.restime) >= self.restime.maxlen
                and len(self.pkloss) >= self.pkloss.maxlen)


# ping program; ping3 is not used, to avoid system permission issues
class Ping:
    """Probe one line and summarise each round as (avg ms, loss %, seconds).

    ``ip`` is ``(sip, tip, type, port, inver)``: source address ("" or None
    for the system default), target address, "icmp"/"tcp"/"udp", port (used
    by tcp/udp) and the probe interval in seconds.

    Each round runs in one of two modes, chosen by how full the server-side
    buffers in ``state`` are:

    * fast: fewer than 2 buffered results -> ``count`` probes in parallel so
      a value becomes available quickly;
    * slow: otherwise -> up to ``count`` sequential probes paced by
      ``inver``, cut short as soon as the buffers drop below 2 results.
    """

    def __init__(self, ip, count=20, udp_length=64, state=None):
        self.sip, self.tip, self.type, self.port, self.inver = tuple(ip)
        self.sip = self.sip or ""
        self.type = self.type.lower()
        self.port = int(self.port)
        self.count = count
        self.inver = float(self.inver)
        self.udp_length = udp_length
        # One (ms, lost) tuple per probe, so parallel fast-mode probes can
        # never pair one probe's time with another probe's loss flag.
        self.samples = deque(maxlen=max(60, count))
        self.state = state if state is not None else LineState()
        # Matches the "time=12.3 ms" field of a ping reply line.
        self.time_re = re.compile(r"(?<=time=)\d+(?:\.\d+)?(?= ms)")

    def _tcp(self):
        """Time one TCP handshake (1 s timeout).

        Returns ``(sleep_time, 1 if connected else 0)``.  Any failure is
        recorded as a lost probe.
        """
        res_count = 0
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        start_time = time.time()
        try:
            s.settimeout(1)
            s.bind((self.sip, 0))
            s.connect((self.tip, self.port))
            self.samples.append(((time.time() - start_time) * 1000, 0))
            res_count = 1
        except socket.timeout:
            self.samples.append((0, 1))
        except OSError as e:
            # Refused / unreachable / bad source address: log and count as
            # lost so every probe still yields exactly one sample.
            log.debug(e)
            self.samples.append((0, 1))
        finally:
            s.close()
        return remaining_interval(start_time, self.inver), res_count

    def _udp(self):
        """Send one random UDP payload and wait up to 1 s for any reply.

        Only targets that answer on ``port`` count as reachable.
        Returns ``(sleep_time, 1 if answered else 0)``.
        """
        payload = "".join(random.choice(string.ascii_letters + string.digits)
                          for _ in range(self.udp_length)).encode("utf-8")
        res_count = 0
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        start_time = time.time()
        try:
            s.settimeout(1)
            s.bind((self.sip, 0))
            s.sendto(payload, (self.tip, self.port))
            s.recv(1024)
            self.samples.append(((time.time() - start_time) * 1000, 0))
            res_count = 1
        except socket.timeout:
            self.samples.append((0, 1))
        except OSError as e:
            log.debug(e)
            self.samples.append((0, 1))
        finally:
            s.close()
        return remaining_interval(start_time, self.inver), res_count

    def _icmp(self):
        """Send one echo request via the system ``ping`` (1 s wait).

        Returns ``(sleep_time, 1 if a reply time was parsed else 0)``.
        """
        res_count = 0
        start_time = time.time()
        cmd = ["ping", "-i", str(self.inver), "-c", "1", "-W", "1"]
        if self.sip:
            cmd += ["-I", self.sip]
        cmd.append(self.tip)
        try:
            out = subprocess.run(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.DEVNULL).stdout
            out = out.decode("utf-8", "replace")
        except OSError as e:
            log.debug(e)
            out = ""
        match = self.time_re.search(out)
        if match:
            self.samples.append((float(match.group()), 0))
            res_count = 1
        else:
            self.samples.append((0, 1))
        return remaining_interval(start_time, self.inver), res_count

    def fastping(self):
        """Run one probe of the configured type (fast-mode worker)."""
        getattr(self, "_" + self.type)()

    def slow_ping(self):
        """Probe sequentially, paced by ``inver``.

        Stops early when the server buffers drop below 2 results.
        Returns ``(probes_sent, probes_answered)``.
        """
        probe = getattr(self, "_" + self.type)
        index = res_count = 0
        while index < self.count:
            sleep_time, ok = probe()
            index += 1
            res_count += ok
            if not self.state.ready():
                break
            time.sleep(sleep_time)
        return index, res_count

    @staticmethod
    def _summarize(samples):
        """Return ``(avg ms over answered probes, loss %)`` for non-empty *samples*.

        Both values are rounded to 2 decimals; the average is 0.0 when no
        probe was answered.
        """
        lost = sum(flag for _, flag in samples)
        received = len(samples) - lost
        total_ms = sum(float(ms) for ms, _ in samples)
        restime = total_ms / received if received else 0.0
        return round(restime, 2), round(lost / len(samples) * 100, 2)

    def ping_value(self):
        """Run one measurement round.

        Returns ``(avg_ms, loss_percent, seconds_taken)``.  ``avg_ms`` is the
        mean over answered probes only.  Returns ``(0, 0, 0)`` if no probe
        produced a sample.
        """
        start_time = time.time()
        if self.state.ready():
            count, _ = self.slow_ping()
        else:
            threads = [threading.Thread(target=self.fastping)
                       for _ in range(self.count)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            count = self.count
        use_time = round(time.time() - start_time, 4)
        available = min(count, len(self.samples))
        samples = [self.samples.pop() for _ in range(available)]
        if not samples:
            log.debug("no probe samples for %s" % self.tip)
            return 0, 0, 0
        restime, pkloss = self._summarize(samples)
        return restime, pkloss, use_time


# server side
class Server:
    """Localhost daemon that probes lines in the background and serves values."""

    def __init__(self, sock):
        self.sock = sock
        self.thli = []
        # tuple(ip) -> LineState for every line currently being probed.
        self.lines = {}
        self.lock = threading.Lock()      # guards self.lines
        self.ipqli = queue.Queue()        # new lines waiting for a probe thread
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))

    @classmethod
    def start(cls):
        """Bind HOST:PORT, then run the accept loop and probe launcher forever."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        obj = cls(s)
        for target in (obj.server, obj.create):
            t = threading.Thread(target=target)
            t.start()
            obj.thli.append(t)
        for t in obj.thli:
            t.join()

    def server(self):
        """Accept connections and serve each one on its own thread.

        A request waiting for a slow line must not block other lines.
        """
        self.sock.listen(100)
        while True:
            try:
                conn, _ = self.sock.accept()
            except OSError as e:
                log.debug(e)
                time.sleep(1)
                continue
            threading.Thread(target=self.handle, args=(conn,), daemon=True).start()

    def handle(self, conn):
        """Serve one request: JSON ``[ip, item]`` in, value (or error text) out."""
        try:
            conn.settimeout(WAIT_TIMEOUT)
            ip, item = json.loads(conn.recv(1024).decode("utf-8"))
            ip = tuple(ip)
            if len(ip) != 5:
                raise ValueError("bad line spec %r" % (ip,))
            self.sendvalue(conn, ip, item)
        except (ValueError, TypeError, OSError) as e:
            log.debug("bad request: %s" % e)
        finally:
            conn.close()

    def _get_state(self, ip):
        """Return the LineState for *ip*, registering it and queueing a probe if new."""
        with self.lock:
            state = self.lines.get(ip)
            if state is None:
                state = self.lines[ip] = LineState()
                self.ipqli.put(ip)
                log.debug("create ipdeque %s" % (ip,))
        return state

    def create(self):
        """Start a probe thread for every newly registered line."""
        while True:
            ip = self.ipqli.get()
            log.debug("create %s" % (ip,))
            try:
                threading.Thread(target=self.makevalue, args=(ip,), daemon=True).start()
            except RuntimeError as e:
                log.debug(str(e))

    def makevalue(self, ip):
        """Probe *ip* until its buffers fill up unread, then retire the line."""
        with self.lock:
            state = self.lines.get(ip)
        if state is None:
            return
        try:
            obj = Ping(ip, state=state)
            while True:
                restime, pkloss, use_time = obj.ping_value()
                with state.cond:
                    state.restime.append((restime, use_time))
                    state.pkloss.append((pkloss, use_time))
                    state.cond.notify_all()
                with self.lock:
                    if state.full():
                        if self.lines.get(ip) is state:
                            del self.lines[ip]
                        log.debug("delete ipdeque %s" % (ip,))
                        return
        except Exception:
            # Top of a worker thread: log, and drop the line so the next
            # request starts a fresh probe instead of waiting on a dead one.
            log.exception("probe thread for %s failed" % (ip,))
            with self.lock:
                if self.lines.get(ip) is state:
                    del self.lines[ip]

    def _run_mtr(self, tip):
        """Launch ``mtr.py <tip> <basedir>/mtr_log/<tip>-<date>.log`` in the background."""
        mtr_log = os.path.join(
            self.basedir, "mtr_log",
            "%s-%s.log" % (tip, time.strftime("%Y-%m-%d", time.localtime())))
        try:
            # Output is discarded: an undrained PIPE could block mtr.py.
            subprocess.Popen([os.path.join(self.basedir, "mtr.py"), tip, mtr_log],
                             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as e:
            log.debug("mtr launch failed: %s" % e)

    def sendvalue(self, conn, ip, item):
        """Reply on *conn* with the newest *item* value of line *ip*.

        Waits (up to WAIT_TIMEOUT) until the line has at least 2 buffered
        results, pops the newest one and launches mtr if it crosses a
        threshold.  Any error is sent back to the client as text.
        """
        tip = ip[1]
        ret = None
        trigger = False
        try:
            if item not in ("restime", "pkloss"):
                raise ValueError("unknown item %r" % (item,))
            state = self._get_state(ip)
            with state.cond:
                if not state.cond.wait_for(state.ready, timeout=WAIT_TIMEOUT):
                    raise TimeoutError("no data for %s yet" % (tip,))
                if item == "restime":
                    ret, _ = state.restime.pop()
                    hisret, _ = state.restime[-1]
                    trigger = ret - hisret > RESTIME_JUMP_MS
                else:
                    ret, _ = state.pkloss.pop()
                    trigger = 100 > ret > PKLOSS_MTR_MIN
        except Exception as a:
            # Request boundary: the error text becomes the reply.
            ret = a
            log.debug(str(ret))
        if trigger:
            self._run_mtr(tip)
        conn.sendall(str(ret).encode())


# validates user-supplied options (IP format etc.)
class Ipcheck:
    """Validate command-line options before they are sent to the server."""

    IP_RE = re.compile(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}")

    def __init__(self, sip, tip, item, ping_type, inver, port="0"):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = ping_type.lower()
        try:
            self.inver = float(inver)
        except (TypeError, ValueError):
            self.inver = None  # rejected by check()
        self.port = port

    def check(self):
        """Return True when item, type, interval, port and addresses are all valid."""
        if self.item not in ("restime", "pkloss") or self.type not in ("icmp", "tcp", "udp"):
            return False
        # ``not >=`` also rejects NaN.
        if self.inver is None or not self.inver >= 0.2:
            return False
        return self.checkport() and self.checkipformat()

    def check_fun(self, ip):
        """True when one dotted-quad component is below 256."""
        return int(ip) < 256

    def checkport(self):
        """Port must be an integer in 0-65535; tcp/udp additionally need port > 0."""
        try:
            port = int(self.port)
        except (TypeError, ValueError):
            return False
        lowest = 0 if self.type == "icmp" else 1
        return lowest <= port <= 65535

    def checkipformat(self):
        """Check tip (and sip when given) are dotted-quad IPv4 addresses."""
        addrs = [self.tip, self.sip] if self.sip else [self.tip]
        octets = []
        for addr in addrs:
            if not isinstance(addr, str) or not self.IP_RE.fullmatch(addr):
                return False
            octets.extend(addr.split("."))
        return self.checkiplength(octets)

    def checkiplength(self, check_ipli):
        """True when every component in *check_ipli* is below 256."""
        return not list(itertools.filterfalse(self.check_fun, check_ipli))


def run():
    """Start the server daemon in the background, detached from this process."""
    subprocess.Popen([sys.executable or "python3", filename, "-S", "server"],
                     stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                     stderr=subprocess.DEVNULL, start_new_session=True)


def server_running():
    """Return True if a ``<this script> -S server`` process is listed by ps."""
    marker = "%s -S server" % filename
    try:
        out = subprocess.run(["ps", "-eo", "args"], stdout=subprocess.PIPE,
                             stderr=subprocess.DEVNULL,
                             universal_newlines=True).stdout
    except OSError as e:
        log.debug(e)
        return False
    return any(marker in line for line in out.splitlines())


# socket client: requests a value from the server and prints it for the user
def socket_client(ip, item):
    """Ask the server for *item* of line *ip* and print the reply."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(3)
            s.connect((HOST, PORT))
            s.sendall(json.dumps([ip, item]).encode())
            ret = s.recv(1024)
        print(ret.decode())
    except socket.timeout as t:
        log.debug(str(t))
        sys.exit(0)
    except OSError as e:
        print("server will start")
        log.debug(str(e))
        sys.exit(0)


def main():
    """Parse options; run as server (``-S``) or query one value as a client."""
    parser = argparse.ArgumentParser(description="icmp for monitor")
    parser.add_argument("-S", action="store", dest="server")
    parser.add_argument("-t", action="store", dest="tip")
    parser.add_argument("-s", action="store", dest="sip")
    parser.add_argument("-I", action="store", dest="item")
    parser.add_argument("-i", action="store", dest="inver", default="1")
    parser.add_argument("-T", action="store", dest="ping_type", default="icmp")
    parser.add_argument("-p", action="store", dest="port", default="0")
    args = parser.parse_args()
    if not server_running():
        run()
    if args.server:
        Server.start()
        sys.exit(0)
    try:
        tip = socket.gethostbyname(args.tip)
    except (TypeError, ValueError, OSError):
        # Missing -t or an unresolvable name: nothing to probe.
        print("format error")
        sys.exit(0)
    sip = args.sip or ""
    item = args.item
    ping_type = args.ping_type
    port = args.port
    inver = args.inver
    check = Ipcheck(sip, tip, item, ping_type, inver, port)
    if not check.check():
        print(USAGE)
        sys.exit(0)
    socket_client((sip, tip, ping_type, port, inver), item)


if __name__ == "__main__":
    main()
hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱
自学咖网 » zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱