监控网络连接并保存到日志文件的Python脚本
在本文中,我们将了解如何使用 Python 监控网络连接,并将结果保存到日志文件中。
此脚本的基本思想是:实时提供运行该脚本的系统是否已连接到互联网的信息,并同时将这些信息保存到日志文件中,记录系统何时连接到互联网、何时断开连接,以及断开连接的持续时间。
这个脚本是使用Python的 socket 库制作的,在这个程序中,它用于在网络上发送或接收数据包。
日志文件:
只需在当前工作目录中创建一个日志文件来存储 Internet 连接状态。
Python
# Path of the log file: "networkinfo.log" inside the current working directory.
FILE = os.path.join(os.getcwd(), "networkinfo.log")
Python
def ping(host="8.8.8.8", port=53, timeout=3):
    """Report whether a TCP connection to ``host:port`` can be opened.

    Called without arguments it probes port 53 on 8.8.8.8 with a
    3-second timeout.

    Args:
        host: IPv4 address of the server to probe.
        port: TCP port to connect to.
        timeout: Seconds to wait before the attempt counts as failed.

    Returns:
        True if the connection was established, False if any step of the
        attempt raised OSError (for example a timeout or a refused
        connection).
    """
    try:
        # Sets the process-wide default timeout for sockets created from
        # now on, including the probe socket below.
        socket.setdefaulttimeout(timeout)
        # AF_INET: IPv4 address family; SOCK_STREAM: TCP.
        # The with-block closes the socket on every path, including the
        # one where connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.connect((host, port))
    except OSError:
        return False
    return True
Python
def calculate_time(start, stop):
    """Return the time elapsed between two datetimes as text.

    Args:
        start: datetime at which the connection was lost.
        stop: datetime at which the connection came back.

    Returns:
        ``str(stop - start)`` with any fractional-second part removed,
        e.g. "0:00:01".
    """
    # str(timedelta) contains a "." only when microseconds are present,
    # so cutting at the first "." drops exactly the sub-second part.
    return str(stop - start).split(".")[0]
Python
def first_check():
    """Probe the connection once, print the outcome and append it to FILE.

    Returns:
        True when ping() succeeds (the banner and the acquisition time
        are printed and logged), False otherwise (only the failure banner
        is printed and logged).
    """
    if not ping():
        # No connection: report it and stop here.
        banner = "\nCONNECTION NOT AQUIRED\n"
        print(banner)
        with open(FILE, "a") as log:
            log.write(banner)
        return False

    banner = "\nCONNECTION AQUIRED\n"
    print(banner)
    # Timestamp without the fractional-second part.
    acquired_at = str(datetime.datetime.now()).split(".")[0]
    detail = "connection aquired at: " + acquired_at
    print(detail)
    with open(FILE, "a") as log:
        log.write(banner)
        log.write(detail)
    return True
Python
def main():
    """Monitor the internet connection forever and log every outage.

    Flow:
      1. Run first_check(); if it fails, poll ping() once a second until
         it succeeds, then run first_check() again to record the result.
      2. Print the "monitering started" line and append it to FILE.
      3. Loop: while ping() succeeds, re-check every 5 seconds. When it
         fails, record the disconnect time, poll once a second until
         ping() succeeds again, then print and log the reconnect time and
         the outage length computed by calculate_time().

    The function never returns; terminate the script to stop monitoring.
    """
    # NOTE(review): this timestamp is taken before any waiting for a
    # connection, so it is the script start time.
    started_at = datetime.datetime.now()
    start_line = "monitering started at: " + str(started_at).split(".")[0]

    if not first_check():
        # Not connected yet: poll once a second until a probe succeeds.
        while not ping():
            time.sleep(1)
        first_check()
    print(start_line)

    with open(FILE, "a") as log:
        log.write("\n")
        log.write(start_line + "\n")

    while True:
        if ping():
            # Still connected: check again in 5 seconds.
            time.sleep(5)
            continue

        lost_at = datetime.datetime.now()
        lost_line = "disconnected at: " + str(lost_at).split(".")[0]
        print(lost_line)
        with open(FILE, "a") as log:
            log.write(lost_line + "\n")

        # Poll once a second until the connection is back.
        while not ping():
            time.sleep(1)

        restored_at = datetime.datetime.now()
        restored_line = "connected again: " + str(restored_at).split(".")[0]
        outage = calculate_time(lost_at, restored_at)
        outage_line = "connection was unavailable for: " + outage
        print(restored_line)
        print(outage_line)
        with open(FILE, "a") as log:
            log.write(restored_line + "\n")
            log.write(outage_line + "\n")
Python
import os
import sys
import socket
import datetime
import time
FILE = os.path.join(os.getcwd(), "networkinfo.log")
# Path of the log file in the current directory:
# os.getcwd() returns the current working directory,
# os.path.join() builds the full path to "networkinfo.log".
def ping(host="8.8.8.8", port=53, timeout=3):
    """Report whether a TCP connection to ``host:port`` can be opened.

    Called without arguments it probes port 53 on 8.8.8.8 with a
    3-second timeout.

    Args:
        host: IPv4 address of the server to probe.
        port: TCP port to connect to.
        timeout: Seconds to wait before the attempt counts as failed.

    Returns:
        True if the connection was established, False if any step of the
        attempt raised OSError (for example a timeout or a refused
        connection).
    """
    try:
        # Sets the process-wide default timeout for sockets created from
        # now on, including the probe socket below.
        socket.setdefaulttimeout(timeout)
        # AF_INET: IPv4 address family; SOCK_STREAM: TCP.
        # The with-block closes the socket on every path, including the
        # one where connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.connect((host, port))
    except OSError:
        return False
    return True
def calculate_time(start, stop):
    """Return the time elapsed between two datetimes as text.

    Args:
        start: datetime at which the connection was lost.
        stop: datetime at which the connection came back.

    Returns:
        ``str(stop - start)`` with any fractional-second part removed,
        e.g. "0:00:01".
    """
    # str(timedelta) contains a "." only when microseconds are present,
    # so cutting at the first "." drops exactly the sub-second part.
    return str(stop - start).split(".")[0]
def first_check():
    """Probe the connection once, print the outcome and append it to FILE.

    Returns:
        True when ping() succeeds (the banner and the acquisition time
        are printed and logged), False otherwise (only the failure banner
        is printed and logged).
    """
    if not ping():
        # No connection: report it and stop here.
        banner = "\nCONNECTION NOT AQUIRED\n"
        print(banner)
        with open(FILE, "a") as log:
            log.write(banner)
        return False

    banner = "\nCONNECTION AQUIRED\n"
    print(banner)
    # Timestamp without the fractional-second part.
    acquired_at = str(datetime.datetime.now()).split(".")[0]
    detail = "connection aquired at: " + acquired_at
    print(detail)
    with open(FILE, "a") as log:
        log.write(banner)
        log.write(detail)
    return True
def main():
    """Monitor the internet connection forever and log every outage.

    Flow:
      1. Run first_check(); if it fails, poll ping() once a second until
         it succeeds, then run first_check() again to record the result.
      2. Print the "monitering started" line and append it to FILE.
      3. Loop: while ping() succeeds, re-check every 5 seconds. When it
         fails, record the disconnect time, poll once a second until
         ping() succeeds again, then print and log the reconnect time and
         the outage length computed by calculate_time().

    The function never returns; terminate the script to stop monitoring.
    """
    # NOTE(review): this timestamp is taken before any waiting for a
    # connection, so it is the script start time.
    started_at = datetime.datetime.now()
    start_line = "monitering started at: " + str(started_at).split(".")[0]

    if not first_check():
        # Not connected yet: poll once a second until a probe succeeds.
        while not ping():
            time.sleep(1)
        first_check()
    print(start_line)

    # Mode "a" appends to the log file.
    with open(FILE, "a") as log:
        log.write("\n")
        log.write(start_line + "\n")

    while True:
        if ping():
            # Still connected: check again in 5 seconds.
            time.sleep(5)
            continue

        lost_at = datetime.datetime.now()
        lost_line = "disconnected at: " + str(lost_at).split(".")[0]
        print(lost_line)
        with open(FILE, "a") as log:
            log.write(lost_line + "\n")

        # Poll once a second until the connection is back.
        while not ping():
            time.sleep(1)

        restored_at = datetime.datetime.now()
        restored_line = "connected again: " + str(restored_at).split(".")[0]
        outage = calculate_time(lost_at, restored_at)
        outage_line = "connection was unavailable for: " + outage
        print(restored_line)
        print(outage_line)
        with open(FILE, "a") as log:
            log.write(restored_line + "\n")
            log.write(outage_line + "\n")
# Start monitoring only when the file is executed as a script, so that
# importing it does not enter the endless monitoring loop.
if __name__ == "__main__":
    main()
ping():
使用此函数,脚本将尝试连接到定义的服务器,以检查系统是否具有实时互联网连接。此任务将使用Python的异常处理(try、except、else)来完成。
- 系统将尝试 ping 特定服务器(IP 上的端口)
- 如果机器连接失败,会执行EXCEPT语句
- 否则在系统成功连接到服务器后将关闭连接
代码:
Python
def ping(host="8.8.8.8", port=53, timeout=3):
    """Report whether a TCP connection to ``host:port`` can be opened.

    Called without arguments it probes port 53 on 8.8.8.8 with a
    3-second timeout.

    Args:
        host: IPv4 address of the server to probe.
        port: TCP port to connect to.
        timeout: Seconds to wait before the attempt counts as failed.

    Returns:
        True if the connection was established, False if any step of the
        attempt raised OSError (for example a timeout or a refused
        connection).
    """
    try:
        # Sets the process-wide default timeout for sockets created from
        # now on, including the probe socket below.
        socket.setdefaulttimeout(timeout)
        # AF_INET: IPv4 address family; SOCK_STREAM: TCP.
        # The with-block closes the socket on every path, including the
        # one where connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.connect((host, port))
    except OSError:
        return False
    return True
上面程序中用到的函数
- socket.setdefaulttimeout() :它是Python的内置套接字库函数。通过将默认超时设置为 3 秒,我们指定如果超过 3 秒没有从服务器收到任何响应,则不建立连接。
- socket.socket(socket.AF_INET, socket.SOCK_STREAM): socket.socket() 用于定义连接网络中的两个节点的参数,即您的系统到特定 IP 的特定端口,以便它们可以相互通信.
- AF_INET 是一个地址族,用于接受地址类型为 v4 的 IP 作为已定义套接字将与之通信的参数
- SOCK_STREAM 表示基于连接的套接字类型,在这个程序中即使用 TCP(传输控制协议),连接时需要提供一个端口号作为参数。
calculate_time():
不可用时间是互联网连接不可用的持续时间。它由互联网连接丢失的时刻(start,开始)和互联网连接恢复的时刻(stop,结束)计算得出。
Python
def calculate_time(start, stop):
    """Return the time elapsed between two datetimes as text.

    Args:
        start: datetime at which the connection was lost.
        stop: datetime at which the connection came back.

    Returns:
        ``str(stop - start)`` with any fractional-second part removed,
        e.g. "0:00:01".
    """
    # str(timedelta) contains a "." only when microseconds are present,
    # so cutting at the first "." drops exactly the sub-second part.
    return str(stop - start).split(".")[0]
first_check():
这个函数在脚本启动阶段执行(如果启动时没有连接,则在连接建立后还会再执行一次),用于检查系统是否已经连接到互联网,并将结果写入日志文件。函数内部会调用 ping() 函数:
- 如果 ping 返回 true(机器已连接到互联网),脚本将打印“CONNECTION ACQUIRED”并将其写入日志文件。
- 如果 ping 返回 false(系统未连接到互联网),脚本将打印“CONNECTION NOT ACQUIRED”并将其写入日志文件。
Python
def first_check():
    """Probe the connection once, print the outcome and append it to FILE.

    Returns:
        True when ping() succeeds (the banner and the acquisition time
        are printed and logged), False otherwise (only the failure banner
        is printed and logged).
    """
    if not ping():
        # No connection: report it and stop here.
        banner = "\nCONNECTION NOT AQUIRED\n"
        print(banner)
        with open(FILE, "a") as log:
            log.write(banner)
        return False

    banner = "\nCONNECTION AQUIRED\n"
    print(banner)
    # Timestamp without the fractional-second part.
    acquired_at = str(datetime.datetime.now()).split(".")[0]
    detail = "connection aquired at: " + acquired_at
    print(detail)
    with open(FILE, "a") as log:
        log.write(banner)
        log.write(detail)
    return True
main():
main 函数是脚本的入口,所有用户定义的函数都在其中被调用,实时的互联网连接状态也在其中写入日志文件。
Python
def main():
    """Monitor the internet connection forever and log every outage.

    Flow:
      1. Run first_check(); if it fails, poll ping() once a second until
         it succeeds, then run first_check() again to record the result.
      2. Print the "monitering started" line and append it to FILE.
      3. Loop: while ping() succeeds, re-check every 5 seconds. When it
         fails, record the disconnect time, poll once a second until
         ping() succeeds again, then print and log the reconnect time and
         the outage length computed by calculate_time().

    The function never returns; terminate the script to stop monitoring.
    """
    # NOTE(review): this timestamp is taken before any waiting for a
    # connection, so it is the script start time.
    started_at = datetime.datetime.now()
    start_line = "monitering started at: " + str(started_at).split(".")[0]

    if not first_check():
        # Not connected yet: poll once a second until a probe succeeds.
        while not ping():
            time.sleep(1)
        first_check()
    print(start_line)

    with open(FILE, "a") as log:
        log.write("\n")
        log.write(start_line + "\n")

    while True:
        if ping():
            # Still connected: check again in 5 seconds.
            time.sleep(5)
            continue

        lost_at = datetime.datetime.now()
        lost_line = "disconnected at: " + str(lost_at).split(".")[0]
        print(lost_line)
        with open(FILE, "a") as log:
            log.write(lost_line + "\n")

        # Poll once a second until the connection is back.
        while not ping():
            time.sleep(1)

        restored_at = datetime.datetime.now()
        restored_line = "connected again: " + str(restored_at).split(".")[0]
        outage = calculate_time(lost_at, restored_at)
        outage_line = "connection was unavailable for: " + outage
        print(restored_line)
        print(outage_line)
        with open(FILE, "a") as log:
            log.write(restored_line + "\n")
            log.write(outage_line + "\n")
解释:
- 首先会执行first_check()(这个函数只会执行一次,即在脚本的开头)
- 如果为 true(已获得连接):监控将开始
- 如果为 false(未获得连接):else 语句将执行无限 while 循环以使用 ping()函数检查互联网连接
- 如果 ping 将返回 false,则循环将在每秒执行一次,直到 ping()函数返回 true
- 如果 ping 返回 true(已获得连接),则循环中断,并开始监控
- 其次,开始监控网络连接:执行第一个 while 语句(无限循环)
- 如果 ping 返回 true,则每 5 秒执行一次循环,直到返回 false
- 如果 ping 返回 false,则会打印停机时间,并且循环将在每秒执行一次,直到互联网连接恢复
- 互联网连接恢复后,将打印正常运行时间和不可用时间,并且迭代将返回到循环的开始
流程图:
下面是实现:
Python
import os
import sys
import socket
import datetime
import time
FILE = os.path.join(os.getcwd(), "networkinfo.log")
# Path of the log file in the current directory:
# os.getcwd() returns the current working directory,
# os.path.join() builds the full path to "networkinfo.log".
def ping(host="8.8.8.8", port=53, timeout=3):
    """Report whether a TCP connection to ``host:port`` can be opened.

    Called without arguments it probes port 53 on 8.8.8.8 with a
    3-second timeout.

    Args:
        host: IPv4 address of the server to probe.
        port: TCP port to connect to.
        timeout: Seconds to wait before the attempt counts as failed.

    Returns:
        True if the connection was established, False if any step of the
        attempt raised OSError (for example a timeout or a refused
        connection).
    """
    try:
        # Sets the process-wide default timeout for sockets created from
        # now on, including the probe socket below.
        socket.setdefaulttimeout(timeout)
        # AF_INET: IPv4 address family; SOCK_STREAM: TCP.
        # The with-block closes the socket on every path, including the
        # one where connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.connect((host, port))
    except OSError:
        return False
    return True
def calculate_time(start, stop):
    """Return the time elapsed between two datetimes as text.

    Args:
        start: datetime at which the connection was lost.
        stop: datetime at which the connection came back.

    Returns:
        ``str(stop - start)`` with any fractional-second part removed,
        e.g. "0:00:01".
    """
    # str(timedelta) contains a "." only when microseconds are present,
    # so cutting at the first "." drops exactly the sub-second part.
    return str(stop - start).split(".")[0]
def first_check():
    """Probe the connection once, print the outcome and append it to FILE.

    Returns:
        True when ping() succeeds (the banner and the acquisition time
        are printed and logged), False otherwise (only the failure banner
        is printed and logged).
    """
    if not ping():
        # No connection: report it and stop here.
        banner = "\nCONNECTION NOT AQUIRED\n"
        print(banner)
        with open(FILE, "a") as log:
            log.write(banner)
        return False

    banner = "\nCONNECTION AQUIRED\n"
    print(banner)
    # Timestamp without the fractional-second part.
    acquired_at = str(datetime.datetime.now()).split(".")[0]
    detail = "connection aquired at: " + acquired_at
    print(detail)
    with open(FILE, "a") as log:
        log.write(banner)
        log.write(detail)
    return True
def main():
    """Monitor the internet connection forever and log every outage.

    Flow:
      1. Run first_check(); if it fails, poll ping() once a second until
         it succeeds, then run first_check() again to record the result.
      2. Print the "monitering started" line and append it to FILE.
      3. Loop: while ping() succeeds, re-check every 5 seconds. When it
         fails, record the disconnect time, poll once a second until
         ping() succeeds again, then print and log the reconnect time and
         the outage length computed by calculate_time().

    The function never returns; terminate the script to stop monitoring.
    """
    # NOTE(review): this timestamp is taken before any waiting for a
    # connection, so it is the script start time.
    started_at = datetime.datetime.now()
    start_line = "monitering started at: " + str(started_at).split(".")[0]

    if not first_check():
        # Not connected yet: poll once a second until a probe succeeds.
        while not ping():
            time.sleep(1)
        first_check()
    print(start_line)

    # Mode "a" appends to the log file.
    with open(FILE, "a") as log:
        log.write("\n")
        log.write(start_line + "\n")

    while True:
        if ping():
            # Still connected: check again in 5 seconds.
            time.sleep(5)
            continue

        lost_at = datetime.datetime.now()
        lost_line = "disconnected at: " + str(lost_at).split(".")[0]
        print(lost_line)
        with open(FILE, "a") as log:
            log.write(lost_line + "\n")

        # Poll once a second until the connection is back.
        while not ping():
            time.sleep(1)

        restored_at = datetime.datetime.now()
        restored_line = "connected again: " + str(restored_at).split(".")[0]
        outage = calculate_time(lost_at, restored_at)
        outage_line = "connection was unavailable for: " + outage
        print(restored_line)
        print(outage_line)
        with open(FILE, "a") as log:
            log.write(restored_line + "\n")
            log.write(outage_line + "\n")
# Start monitoring only when the file is executed as a script, so that
# importing it does not enter the endless monitoring loop.
if __name__ == "__main__":
    main()
输出:
CONNECTION AQUIRED
connection aquired at: 2021-09-16 15:03:18
monitering started at: 2021-09-16 15:03:18
disconnected at: 2021-09-16 15:03:49
connected again: 2021-09-16 15:03:50
connection was unavailable for: 0:00:01