📅  最后修改于: 2023-12-03 15:01:21.712000             🧑  作者: Mango
ICMP(Internet控制消息协议,Internet Control Message Protocol)是互联网协议族的一个子协议,用于在IP数据报网络中传递错误消息和操作控制信息。IP协议本身并没有提供可靠的错误处理机制,因此ICMP协议就是用来提供这种机制的,同时它也被用于执行诊断网络故障的操作。
ICMP主要有以下功能:
ICMP报文的格式如下:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Type | Code | Checksum |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Data |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
以下是用Python发送ICMP Ping请求的示例代码:
import os, sys
import struct
import select
import time
import socket
ICMP_ECHO_REQUEST = 8
def checksum(string):
csum = 0
countTo = (len(string) / 2) * 2
count = 0
while count < countTo:
thisVal = ord(string[count+1]) * 256 + ord(string[count])
csum = csum + thisVal
csum = csum & 0xffffffffL
count = count + 2
if countTo < len(string):
csum = csum + ord(string[len(string) - 1])
csum = csum & 0xffffffffL
csum = (csum >> 16) + (csum & 0xffff)
csum = csum + (csum >> 16)
answer = ~csum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def receiveOnePing(mySocket, ID, timeout, destAddr):
timeLeft = timeout
while 1:
startedSelect = time.time()
whatReady = select.select([mySocket], [], [], timeLeft)
howLongInSelect = (time.time() - startedSelect)
if not whatReady[0]:
return "Request timed out."
timeReceived = time.time()
recPacket, addr = mySocket.recvfrom(1024)
# Fetch the ICMP header from the IP packet
icmpHeader = recPacket[20:28]
type, code, checksum, packetID, sequence = struct.unpack(
"bbHHh", icmpHeader)
if packetID == ID:
bytesInDouble = struct.calcsize("d")
timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]
return timeReceived - timeSent
timeLeft = timeLeft - howLongInSelect
if timeLeft <= 0:
return "Request timed out."
def sendOnePing(mySocket, destAddr, ID):
myChecksum = 0
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, myChecksum, ID, 1)
data = struct.pack("d", time.time())
myChecksum = checksum(header + data)
# Put the checksum back into the packet, and create the final packet
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(myChecksum), ID, 1)
packet = header + data
mySocket.sendto(packet, (destAddr, 1))
def doOnePing(destAddr, timeout):
icmp = socket.getprotobyname("icmp")
# create a socket object
mySocket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
myID = os.getpid() & 0xFFFF
sendOnePing(mySocket, destAddr, myID)
delay = receiveOnePing(mySocket, myID, timeout, destAddr)
mySocket.close()
return delay
def ping(host, timeout=1):
destAddr = socket.gethostbyname(host)
print("Pinging " + destAddr + " using Python:")
print("")
while True:
delay = doOnePing(destAddr, timeout)
print(delay)
time.sleep(1)
if KeyboardInterrupt:
break
return delay
这是一个简单的Ping程序,它连接到目标主机,并使用ICMP协议发送数据包。可以使用此程序测试到目标主机的连接性,并获得往返延迟时间。