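Berkeley's algorithm is a clock synchronization technique used in distributed systems. The algorithm assumes that no machine node in the network has an accurate time source or access to a UTC server.
Algorithm
1) One node is chosen from the pool of nodes in the network to act as the master; all remaining nodes act as slaves. The master is chosen using an election process / leader election algorithm.
2) The master node periodically pings the slave nodes and fetches the clock time at each of them using Cristian's algorithm.
The figure below illustrates how the master node sends requests to the slave nodes.
The figure below illustrates how the slave nodes send back the time given by their system clocks.
3) The master node calculates the average difference between each received clock time and the time given by its own system clock, with the master's own clock counted in the average (at a difference of zero). This average difference is added to the current time of the master's system clock, and the result is broadcast over the network.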
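Step 2 relies on Cristian's algorithm, which corrects a remote clock reading for network delay by adding half of the measured round-trip time. A minimal sketch of that correction follows. The name `estimate_remote_time` and its parameters `fetch_remote_time` and `local_clock` are illustrative and do not appear in the scripts in this article, and the sketch assumes the delay is roughly the same in both directions.

```python
import time

def estimate_remote_time(fetch_remote_time, local_clock=time.monotonic):
    """Return (estimated remote time, round-trip time), both in seconds.

    fetch_remote_time: hypothetical callable that asks the remote node
    for its clock reading and returns it as a float.
    """
    t0 = local_clock()                 # local time when the request leaves
    remote_time = fetch_remote_time()  # remote clock reading
    t1 = local_clock()                 # local time when the reply arrives
    round_trip = t1 - t0
    # Assume the reply spent half of the round trip on the wire.
    return remote_time + round_trip / 2, round_trip

# Deterministic demo with a fake local clock: the request leaves at
# 100.0 and the reply arrives at 100.5, carrying a remote reading of 500.0.
ticks = iter([100.0, 100.5])
estimate, rtt = estimate_remote_time(lambda: 500.0,
                                     local_clock=lambda: next(ticks))
print(estimate, rtt)  # 500.25 0.5
```

The shorter the round trip, the smaller the uncertainty in the estimate, which is one reason a master might discard readings whose round trip is unusually long.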
Pseudocode for the steps above:
# receiving time from all slave nodes
repeat_for_all_slaves:
    time_at_slave_node = receive_time_at_slave()
    # calculating time difference (slave clock relative to master clock)
    time_difference = time_at_slave_node - time_at_master_node

# average time difference calculation;
# the master's own clock is counted too, with a difference of zero
average_time_difference = sum(all_time_differences) / (number_of_slaves + 1)
synchronized_time = current_master_time + average_time_difference

# broadcasting synchronized time to whole network
broadcast_time_to_all_slaves(synchronized_time)
The figure below illustrates the last step of Berkeley's algorithm.
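The averaging step can be checked with a small worked example. The sketch below is not part of the scripts in this article. It takes each difference as slave clock minus master clock and counts the master itself in the average, and the function name `berkeley_synchronized_time` is an illustrative choice.

```python
from datetime import datetime, timedelta

def berkeley_synchronized_time(master_time, slave_times):
    """Average all clocks (master included) and return the agreed time."""
    # Difference of every slave clock relative to the master clock.
    differences = [slave_time - master_time for slave_time in slave_times]
    # The master contributes a difference of zero, hence the "+ 1".
    average_difference = (sum(differences, timedelta(0))
                          / (len(slave_times) + 1))
    return master_time + average_difference

# Master reads 3:00, one slave is 25 minutes ahead, one is 10 minutes behind.
master = datetime(2018, 11, 23, 3, 0)
slaves = [datetime(2018, 11, 23, 3, 25), datetime(2018, 11, 23, 2, 50)]
print(berkeley_synchronized_time(master, slaves))  # 2018-11-23 03:05:00
```

The differences are +25 and -10 minutes, so the average over three clocks is +5 minutes and every node converges on 3:05.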
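Scope of improvement
- Improve the accuracy of the clock readings obtained with Cristian's algorithm.
- Ignore significant outliers when calculating the average time difference.
- In case the master node fails or becomes corrupted, a secondary leader must be ready / pre-chosen to take its place, reducing the downtime caused by the master's unavailability.
- Instead of sending the synchronized time, the master can send each node the relative (inverse) time difference it should apply. A relative offset does not go stale while it travels through the network, so this reduces the error introduced by transit latency, and the slave only has to add the offset to its own clock.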
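Two of the improvements above, ignoring outliers and sending relative offsets instead of an absolute time, can be sketched together. This is an illustration under stated assumptions rather than the behaviour of the scripts in this article: the function name `compute_adjustments`, the fixed `max_skew` threshold, and the node labels are all hypothetical, and real systems may pick outliers differently.

```python
from datetime import timedelta

def compute_adjustments(differences, max_skew=timedelta(seconds=60)):
    """Return (master_adjustment, per_node_adjustments).

    differences: dict mapping node name -> (node clock - master clock).
    Clocks further than max_skew from the master are left out of the
    average, but they still receive an adjustment.
    """
    trusted = [d for d in differences.values() if abs(d) <= max_skew]
    # The master counts as one more trusted clock with difference zero.
    average = sum(trusted, timedelta(0)) / (len(trusted) + 1)
    # Each node adds its adjustment to its own clock; no absolute
    # timestamp has to cross the network.
    per_node = {node: average - d for node, d in differences.items()}
    return average, per_node

master_adjustment, adjustments = compute_adjustments({
    "A": timedelta(seconds=10),   # 10 s ahead of the master
    "B": timedelta(seconds=-4),   # 4 s behind the master
    "C": timedelta(hours=2),      # clearly faulty clock, ignored in the average
})
print(master_adjustment.total_seconds())  # 2.0
print({n: a.total_seconds() for n, a in adjustments.items()})
# {'A': -8.0, 'B': 6.0, 'C': -7198.0}
```

After applying the adjustments, every node, including the faulty one, ends up 2 seconds ahead of where the master's clock originally stood.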
The code below is a Python script that can be used to start the master clock server.
# Python3 program imitating a clock server
# dateutil is the third-party python-dateutil package
from dateutil import parser
import threading
import datetime
import socket
import time

# data structure used to store client address and clock data
client_data = {}

''' nested thread function used to receive
    clock time from a connected client '''
def startRecieveingClockTime(connector, address):
    while True:
        # receive clock time
        clock_time_string = connector.recv(1024).decode()
        if not clock_time_string:
            # an empty read means the client closed the connection
            break
        clock_time = parser.parse(clock_time_string)
        # difference is (slave clock - master clock), so adding the
        # average to the master clock moves it toward the slave clocks
        clock_time_diff = clock_time - datetime.datetime.now()
        client_data[address] = {
            "clock_time"      : clock_time,
            "time_difference" : clock_time_diff,
            "connector"       : connector
        }
        print("Client Data updated with: " + str(address),
              end = "\n\n")
        time.sleep(5)

''' master thread function used to open portal for
    accepting clients over given port '''
def startConnecting(master_server):
    # fetch clock time at slaves / clients
    while True:
        # accepting a client / slave clock client
        master_slave_connector, addr = master_server.accept()
        slave_address = str(addr[0]) + ":" + str(addr[1])
        print(slave_address + " got connected successfully")
        current_thread = threading.Thread(
                         target = startRecieveingClockTime,
                         args = (master_slave_connector,
                                 slave_address, ))
        current_thread.start()

# subroutine function used to fetch average clock difference
def getAverageClockDiff():
    # work on a snapshot, since receiver threads keep updating client_data
    current_client_data = client_data.copy()
    time_difference_list = list(client['time_difference']
                                for client_addr, client
                                in current_client_data.items())
    sum_of_clock_difference = sum(time_difference_list,
                                  datetime.timedelta(0, 0))
    # the master's own clock counts too (difference zero), hence the + 1
    average_clock_difference = sum_of_clock_difference \
                               / (len(current_client_data) + 1)
    return average_clock_difference

''' master sync thread function used to generate
    cycles of clock synchronization in the network '''
def synchronizeAllClocks():
    while True:
        print("New synchroniztion cycle started.")
        print("Number of clients to be synchronized: " + \
              str(len(client_data)))
        if len(client_data) > 0:
            average_clock_difference = getAverageClockDiff()
            # iterate over a snapshot: receiver threads may add
            # clients while this loop is running
            for client_addr, client in list(client_data.items()):
                try:
                    synchronized_time = \
                        datetime.datetime.now() + \
                        average_clock_difference
                    client['connector'].send(str(
                        synchronized_time).encode())
                except Exception as e:
                    print("Something went wrong while " + \
                          "sending synchronized time " + \
                          "through " + str(client_addr))
        else :
            print("No client data." + \
                  " Synchronization not applicable.")
        print("\n\n")
        time.sleep(5)

# function used to initiate the Clock Server / Master Node
def initiateClockServer(port = 8080):
    master_server = socket.socket()
    master_server.setsockopt(socket.SOL_SOCKET,
                             socket.SO_REUSEADDR, 1)
    print("Socket at master node created successfully\n")
    master_server.bind(('', port))
    # start listening to requests
    master_server.listen(10)
    print("Clock server started...\n")
    # start making connections
    print("Starting to make connections...\n")
    master_thread = threading.Thread(
                    target = startConnecting,
                    args = (master_server, ))
    master_thread.start()
    # start synchronization
    print("Starting synchronization in parallel...\n")
    sync_thread = threading.Thread(
                  target = synchronizeAllClocks,
                  args = ())
    sync_thread.start()

# Driver function
if __name__ == '__main__':
    # Trigger the Clock Server
    initiateClockServer(port = 8080)
Output:
New synchroniztion cycle started.
Number of clients to be synchronized: 3
Client Data updated with: 127.0.0.1:57284
Client Data updated with: 127.0.0.1:57274
Client Data updated with: 127.0.0.1:57272
The code below is a Python script that can be used to start a slave / client.
# Python3 program imitating a client process
# dateutil is the third-party python-dateutil package
from dateutil import parser
import threading
import datetime
import socket
import time

# client thread function used to send time at client side
def startSendingTime(slave_client):
    while True:
        # provide server with clock time at the client
        slave_client.send(str(
            datetime.datetime.now()).encode())
        print("Recent time sent successfully",
              end = "\n\n")
        time.sleep(5)

# client thread function used to receive synchronized time
def startReceivingTime(slave_client):
    while True:
        # receive data from the server
        data = slave_client.recv(1024).decode()
        if not data:
            # an empty read means the server closed the connection
            break
        synchronized_time = parser.parse(data)
        print("Synchronized time at the client is: " + \
              str(synchronized_time),
              end = "\n\n")

# function used to synchronize client process time
def initiateSlaveClient(port = 8080):
    slave_client = socket.socket()
    # connect to the clock server on local computer
    slave_client.connect(('127.0.0.1', port))
    # start sending time to server
    print("Starting to send time to server\n")
    send_time_thread = threading.Thread(
                       target = startSendingTime,
                       args = (slave_client, ))
    send_time_thread.start()
    # start receiving synchronized time from server
    print("Starting to receive " + \
          "synchronized time from server\n")
    receive_time_thread = threading.Thread(
                          target = startReceivingTime,
                          args = (slave_client, ))
    receive_time_thread.start()

# Driver function
if __name__ == '__main__':
    # initialize the Slave / Client
    initiateSlaveClient(port = 8080)
Output:
Recent time sent successfully
Synchronized time at the client is: 2018-11-23 18:49:31.166449
Below is a runtime screenshot of the Python scripts above, in which the top-left console represents the master thread and the other consoles represent the slave threads.
Note: The scripts above closely depict the working of Berkeley's algorithm but may differ from actual implementations of the algorithm in production distributed systems. The availability of port 8080 is machine dependent. If port 8080 is not free, change the port number accordingly in both the master and slave scripts.
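The client script above only prints the synchronized time it receives. One way a client could actually use that value, without touching the operating system clock, is to keep a software clock that stores the correction as an offset. The sketch below is an assumption about how this might be done, and the class name `AdjustedClock` and its methods are hypothetical.

```python
from datetime import datetime, timedelta

class AdjustedClock:
    """Software clock: local system time plus a correction offset."""

    def __init__(self):
        self.offset = timedelta(0)

    def apply_synchronized_time(self, synchronized_time, local_now=None):
        # Remember how far the local clock is from the agreed time.
        if local_now is None:
            local_now = datetime.now()
        self.offset = synchronized_time - local_now

    def now(self, local_now=None):
        # Corrected time = local reading + last known offset.
        if local_now is None:
            local_now = datetime.now()
        return local_now + self.offset

# Deterministic demo: explicit local readings stand in for datetime.now().
clock = AdjustedClock()
clock.apply_synchronized_time(
    datetime(2018, 11, 23, 18, 49, 31, 166449),
    local_now=datetime(2018, 11, 23, 18, 49, 30))
print(clock.offset)  # 0:00:01.166449
print(clock.now(local_now=datetime(2018, 11, 23, 18, 50, 0)))
# 2018-11-23 18:50:01.166449
```

In the client script, `startReceivingTime` could call `apply_synchronized_time` with each parsed value, and the rest of the program would read time through `now()`.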
References:
- https://zh.wikipedia.org/wiki/伯克利算法
- https://www.geeksforgeeks.org/socket-programming-multi-threading-python/
- https://www.geeksforgeeks.org/cristians-algorithm/