蛮子哥 蛮子哥
首页
  • linux
  • windows
  • 中间件
  • 监控
  • 网络
  • 存储
  • 安全
  • 防火墙
  • 数据库
  • 系统
  • docker
  • 运维工具
  • other
  • elk
  • K8S
  • ansible
  • Jenkins
  • GitLabCI_CD
  • 随笔
  • 面试
  • 工具
  • 收藏夹
  • Shell
  • python
  • golang
友链
  • 索引

    • 分类
    • 标签
    • 归档
    • 首页 (opens new window)
    • 关于我 (opens new window)
    • 图床 (opens new window)
    • 评论 (opens new window)
    • 导航栏 (opens new window)
周刊
GitHub (opens new window)

蛮子哥

业精于勤,荒于嬉
首页
  • linux
  • windows
  • 中间件
  • 监控
  • 网络
  • 存储
  • 安全
  • 防火墙
  • 数据库
  • 系统
  • docker
  • 运维工具
  • other
  • elk
  • K8S
  • ansible
  • Jenkins
  • GitLabCI_CD
  • 随笔
  • 面试
  • 工具
  • 收藏夹
  • Shell
  • python
  • golang
友链
  • 索引

    • 分类
    • 标签
    • 归档
    • 首页 (opens new window)
    • 关于我 (opens new window)
    • 图床 (opens new window)
    • 评论 (opens new window)
    • 导航栏 (opens new window)
周刊
GitHub (opens new window)
  • linux

  • windows

  • 中间件

  • 网络

  • 安全

  • 存储

  • 防火墙

  • 数据库

  • 系统

  • docker

  • other

  • 监控

    • zabbix

    • prometheus

    • 自定义

      • 可视化ping监控:基于Python+InfluxDB+Grafana的实践
        • 前言
          • 一、痛点:传统网络监控的不足
          • 二、解决方案架构设计
          • 三、安装influxdb
          • 四、完整代码
          • 3.1 目录结果
          • 3.2 dst_ip.json文件内容格式参考
          • 3.3 监控脚本
          • 五、安装grafana并添加influxdb数据源
          • 5.1 安装grafana
          • 5.2 添加influxdb数据源
  • 运维
  • 监控
  • 自定义
蛮子哥
2026-01-28
目录

可视化ping监控:基于Python+InfluxDB+Grafana的实践

# 前言

作为运维工程师或网络管理员,你是否经常需要监控公司网络中各节点的连通性和延迟情况?

今天分享一个python+influxdb+grafana的轻量级网络监控方案,能够自动Ping你想要监控的目标IP并将丢包和延时结果可视化展示。

# 一、痛点:传统网络监控的不足

在日常网络运维中,我们经常需要: 监控各办公点、机房、云服务器的网络质量 实时了解网络延迟和丢包情况 历史数据追溯,分析网络趋势 传统的手动Ping或简单脚本存在以下问题: 数据无法持久化存储 缺乏可视化展示 无法多目标并发监控 没有历史趋势分析

# 二、解决方案架构设计

Python监控脚本 → InfluxDB时序数据库 → Grafana可视化展示

工作流程:

  • Python脚本并发Ping所有目标IP

  • 结果写入InfluxDB时序数据库

  • Grafana读取数据并生成监控图表

我下面的案例中 python脚本、influxdb、grafana 都部署在一台机器上

# 三、安装influxdb

以centos为例:

# 下载RPM包  
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.10.x86_64.rpm  
# yum安装influxdb  
yum install influxdb-1.8.10.x86_64.rpm  
# 启动influxdb  
systemctl start influxdb.service  
systemctl enable influxdb.service
1
2
3
4
5
6
7

# 四、完整代码

# 3.1 目录结果

├── dst_ip.json  #需要监控的目标IP  
└── ping.py      #ping脚本
1
2

# 3.2 dst_ip.json文件内容格式参考

[  
  {"ISP":"阿里云","IP":"223.5.5.5"},  
  {"ISP":"腾讯云","IP":"119.29.29.29"},  
  {"ISP":"百度云","IP":"180.76.76.76"},  
  {"ISP":"上海电信","IP":"202.96.209.5"},  
  {"ISP":"北京电信","IP":"202.96.199.133"}  
]
1
2
3
4
5
6
7

优势:增删监控目标无需修改代码,只需更新配置文件

# 3.3 监控脚本

import time  
import os  
import threading  
import json  
from ping3 import ping  
from influxdb import InfluxDBClient  
from influxdb.exceptions import InfluxDBClientError  
  
# =============== 配置变量(可修改) ===============  
MONITORING_SOURCE = "office"      #监控源名称  
INFLUXDB_HOST = '127.0.0.1'   #InfluxDB服务器地址  
INFLUXDB_PORT = 8086              #InfluxDB服务器端口  
INFLUXDB_TIMEOUT = 3              #InfluxDB连接超时时间  
DATABASE_NAME = 'icmp_Monitor'    #InfluxDB数据库名称  
IP_CONFIG_FILE = 'dst_ip.json'    # IP列表配置文件  
PING_TIMEOUT = 1  # 秒            # Ping超时时间  
MAIN_LOOP_INTERVAL = 5  # 主循环间隔(秒),即每隔5秒执行一次ping  
THREAD_JOIN_TIMEOUT = 2  # 线程等待超时(秒)  
# ==============================================  
  
  
# 创建InfluxDB连接  
def create_influx_client():  
    try:  
        client = InfluxDBClient(  
            host=INFLUXDB_HOST,  
            port=INFLUXDB_PORT,  
            timeout=INFLUXDB_TIMEOUT  
        )  
  
        # 检查数据库是否存在,不存在则创建  
        databases = client.get_list_database()  
        database_exists = False  
  
        for db in databases:  
            if db['name'] == DATABASE_NAME:  
                database_exists = True  
                break  
  
        if not database_exists:  
            print(f"数据库 '{DATABASE_NAME}' 不存在,正在创建...")  
            client.create_database(DATABASE_NAME)  
            print(f"数据库 '{DATABASE_NAME}' 创建成功")  
  
        client.switch_database(DATABASE_NAME)  
        return client  
    except Exception as e:  
        print(f"创建InfluxDB连接失败: {e}")  
        return None  
  
  
def get_iplist():  
    current_dir = os.getcwd()  
    ip_file = os.path.join(current_dir, IP_CONFIG_FILE)  
    try:  
        with open(ip_file, encoding='utf-8') as f:  
            iplist_data = json.load(f)  
            return iplist_data  
    except FileNotFoundError:  
        print(f"配置文件不存在: {ip_file}")  
        return []  
    except json.JSONDecodeError as e:  
        print(f"JSON解析错误: {e}")  
        return []  
    except Exception as error:  
        print(f"读取配置文件错误: {error}")  
        return []  
  
  
def write_to_influx(client, dst_isp, influx_stamp, ping_rtt, ping_loss):  
    """写入数据到InfluxDB"""  
    if client is None:  
        return False  
  
    json_body = [  
        {  
            "measurement": MONITORING_SOURCE,  
            "tags": {  
                "dst_isp": dst_isp  
            },  
            "time": influx_stamp,  
            "fields": {  
                "rrt": ping_rtt,  
                "loss": ping_loss  
            }  
        }  
    ]  
  
    try:  
        result = client.write_points(json_body)  
        if result:  
            print(f"数据成功写入InfluxDB: {dst_isp}, RTT: {ping_rtt}ms, Loss: {ping_loss}%")  
        return result  
    except InfluxDBClientError as e:  
        print(f"写入InfluxDB失败: {e}")  
        return False  
    except Exception as e:  
        print(f"写入数据异常: {e}")  
        return False  
  
  
def ping_host(dst_isp_ip, dst_isp, influx_stamp, client):  
    """ping目标主机并记录结果"""  
    try:  
        ping_result = ping(dst_isp_ip, timeout=PING_TIMEOUT, unit="ms")  
  
        if ping_result is None or ping_result is False:  
            ping_loss = 100  
            ping_rtt = 0  
            print(f'无法ping通: {dst_isp_ip} ({dst_isp}) 丢包率: {ping_loss}%')  
        else:  
            ping_loss = 0  
            ping_rtt = float(ping_result)  
            print(f'ping通: {dst_isp_ip} ({dst_isp}) 延迟: {ping_rtt:.2f}ms')  
  
        # 写入数据  
        write_to_influx(client, dst_isp, influx_stamp, ping_rtt, ping_loss)  
  
    except Exception as e:  
        print(f"ping {dst_isp_ip} 时出错: {e}")  
  
  
def main_loop():  
    """主监控循环"""  
    # 创建InfluxDB客户端  
    client = create_influx_client()  
    if client is None:  
        print("无法连接到InfluxDB,程序退出")  
        return  
  
    try:  
        while True:  
            loop_start_time = time.time()  
            influx_stamp = int(loop_start_time * 1000000000)  # 纳秒时间戳  
  
            # 获取IP列表  
            iplist = get_iplist()  
            # 创建并启动线程  
            threads = []  
            for v in iplist:  
                dst_ip = v.get('IP')  
                dst_isp = v.get('ISP')  
  
                if not dst_ip or not dst_isp:  
                    print(f"跳过无效记录: {v}")  
                    continue  
  
                t = threading.Thread(  
                    target=ping_host,  
                    args=(dst_ip, dst_isp, influx_stamp, client)  
                )  
                threads.append(t)  
                t.daemon = True  # 设置为守护线程  
                t.start()  
  
            # 等待所有线程完成  
            for t in threads:  
                t.join(timeout=THREAD_JOIN_TIMEOUT)  # 设置超时防止线程挂起  
  
            # 计算并控制循环间隔  
            loop_end_time = time.time()  
            runtime = loop_end_time - loop_start_time  
            sleep_time = max(MAIN_LOOP_INTERVAL - runtime, 0.1)  # 最少休眠0.1秒  
  
            if runtime > MAIN_LOOP_INTERVAL:  
                print(f"警告: 循环执行时间({runtime:.2f}s)超过{MAIN_LOOP_INTERVAL}秒")  
  
            time.sleep(sleep_time)  
  
    except KeyboardInterrupt:  
        print("\n收到中断信号,程序退出")  
    except Exception as e:  
        print(f"主循环异常: {e}")  
    finally:  
        # 清理资源  
        try:  
            client.close()  
        except:  
            pass  
        print("连接已关闭")  
  
  
if __name__ == '__main__':  
    main_loop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

# 五、安装grafana并添加influxdb数据源

# 5.1 安装grafana

grafana下载链接:https://grafana.com/grafana/download?pg=get&plcmt=selfmanaged-box1-cta1

以centos为例:  
# yum安装  
yum install -y https://dl.grafana.com/grafana-enterprise/release/12.3.1/grafana-enterprise_12.3.1_20271043721_linux_amd64.rpm  
# 启动grafana  
systemctl start grafana-server  
systemctl enable grafana-server
1
2
3
4
5
6

# 5.2 添加influxdb数据源

企业微信截图_20260128105346.png

企业微信截图_20260128105449.png

最后在使用influxdb数据源添加图形即可,得到如下效果:

企业微信截图_20260128105533.png

由于结果是存储在influxdb数据库中,所以后期也可以在编写脚本对influxdb的结果进行过滤获取丢包率并发送微信或者钉钉告警通知,如果感兴趣的同学较多,后续这个会在单独介绍

微信 支付宝
上次更新: 2026/01/28, 11:04:59

← alertmanager-webhook与API

最近更新
01
认知
03-30
02
人生三观
03-30
03
牙齿记录
03-30
更多文章>
Theme by Vdoing | Copyright © 2019-2026 | 点击查看十年之约 | 鄂ICP备2024072800号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式