使用 C/C++ 中的套接字编程通过多线程处理服务器上的多个客户端
本教程假设读者具有套接字编程的基本知识,即熟悉基本的服务器和客户端模型。在基本模型中,服务器一次只处理一个客户端,如果要开发任何可扩展的服务器模型,这是一个很大的假设。处理多个客户端的简单方法是为每个连接到服务器的新客户端生成一个新线程。
信号量:信号量只是一个非负的变量,在线程之间共享。该变量用于解决临界区问题并在多处理环境中实现进程同步。
sem_post: sem_post() 增加(解锁)sem 指向的信号量。如果信号量的值因此变得大于零,则在 sem_wait(3) 调用中阻塞的另一个进程或线程将被唤醒并继续锁定信号量。
#include
int sem_post(sem_t *sem);
sem_wait: sem_wait() 递减(锁定)sem 指向的信号量。如果信号量的值大于零,则递减继续,函数立即返回。如果信号量当前的值为零,则调用将阻塞,直到可以执行递减(即,信号量值上升到零以上),或者信号处理程序中断调用为止。
#include
int sem_wait(sem_t *sem);
在本文中,Reader-Writers 算法是在服务器端实现的。
实现:对于服务端,创建两个不同的线程;一个读者线程和一个作者线程。首先,声明一个serverSocket ,一个整数,一个变量来保存 socket函数的返回值。
int serverSocket = socket(domain, type, protocol);
- serverSocket:套接字描述符,一个整数(如文件句柄)。
- domain:整数,通信域,例如,AF_INET(IPv4 协议)、AF_INET6(IPv6 协议)。
- 类型:通信类型。
- SOCK_STREAM : TCP(可靠,面向连接)。
- SOCK_DGRAM :UDP(不可靠,无连接)。
- protocol : Internet 协议 (IP) 的协议值,它是 0。这与出现在数据包 IP 标头中的协议字段上的数字相同。(有关详细信息,请参阅 man 协议)。
然后,在初始化所有必要的变量后绑定套接字。
bind:创建socket后,bind函数将socket绑定到addr (自定义数据结构)中指定的地址和端口号。在示例代码中,我们将服务器绑定到本地主机,因此 INADDR_ANY 用于指定 IP 地址。
int bind(int sockfd, const struct sockaddr *addr,
socklen_t addrlen);
侦听:它将服务器套接字置于被动模式,等待客户端接近服务器以建立连接。积压定义了 sockfd 的挂起连接队列可以增长到的最大长度。如果连接请求在队列已满时到达,客户端可能会收到带有 ECONNREFUSED 指示的错误。
int listen(int sockfd, int backlog);
本文中用到的更多连接函数,请参考这篇C中socket编程的文章。
方法:
- 接受到所需端口的连接后,从客户端接收一个整数,该整数定义了读取或写入的选择。选项 1 表示读者,而选项 2 表示作者。
- 成功接收数据后,调用pthread_create创建reader线程和writer线程。
- 成功连接到服务器客户端后,要求用户输入选择变量。
- 从用户那里得到选择后,客户端然后将此选择发送到服务器,通过为请求创建客户端线程来调用读取器或写入器线程。
下面是上述方法的实现:
服务器端代码:
C
// C program for the Server Side
// inet_addr
#include
// For threading, link with lpthread
#include
#include
#include
#include
#include
#include
#include
// Semaphore variables
sem_t x, y;
pthread_t tid;
pthread_t writerthreads[100];
pthread_t readerthreads[100];
int readercount = 0;
// Reader Function
void* reader(void* param)
{
// Lock the semaphore
sem_wait(&x);
readercount++;
if (readercount == 1)
sem_wait(&y);
// Unlock the semaphore
sem_post(&x);
printf("\n%d reader is inside",
readercount);
sleep(5);
// Lock the semaphore
sem_wait(&x);
readercount--;
if (readercount == 0) {
sem_post(&y);
}
// Lock the semaphore
sem_post(&x);
printf("\n%d Reader is leaving",
readercount + 1);
pthread_exit(NULL);
}
// Writer Function
void* writer(void* param)
{
printf("\nWriter is trying to enter");
// Lock the semaphore
sem_wait(&y);
printf("\nWriter has entered");
// Unlock the semaphore
sem_post(&y);
printf("\nWriter is leaving");
pthread_exit(NULL);
}
// Driver Code
int main()
{
// Initialize variables
int serverSocket, newSocket;
struct sockaddr_in serverAddr;
struct sockaddr_storage serverStorage;
socklen_t addr_size;
sem_init(&x, 0, 1);
sem_init(&y, 0, 1);
serverSocket = socket(AF_INET, SOCK_STREAM, 0);
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(8989);
// Bind the socket to the
// address and port number.
bind(serverSocket,
(struct sockaddr*)&serverAddr,
sizeof(serverAddr));
// Listen on the socket,
// with 40 max connection
// requests queued
if (listen(serverSocket, 50) == 0)
printf("Listening\n");
else
printf("Error\n");
// Array for thread
pthread_t tid[60];
int i = 0;
while (1) {
addr_size = sizeof(serverStorage);
// Extract the first
// connection in the queue
newSocket = accept(serverSocket,
(struct sockaddr*)&serverStorage,
&addr_size);
int choice = 0;
recv(newSocket,
&choice, sizeof(choice), 0);
if (choice == 1) {
// Creater readers thread
if (pthread_create(&readerthreads[i++], NULL,
reader, &newSocket)
!= 0)
// Error in creating thread
printf("Failed to create thread\n");
}
else if (choice == 2) {
// Create writers thread
if (pthread_create(&writerthreads[i++], NULL,
writer, &newSocket)
!= 0)
// Error in creating thread
printf("Failed to create thread\n");
}
if (i >= 50) {
// Update i
i = 0;
while (i < 50) {
// Suspend execution of
// the calling thread
// until the target
// thread terminates
pthread_join(writerthreads[i++],
NULL);
pthread_join(readerthreads[i++],
NULL);
}
// Update i
i = 0;
}
}
return 0;
}
C
// C program for the Client Side
#include
#include
#include
#include
// inet_addr
#include
#include
// For threading, link with lpthread
#include
#include
// Function to send data to
// server socket.
void* clienthread(void* args)
{
int client_request = *((int*)args);
int network_socket;
// Create a stream socket
network_socket = socket(AF_INET,
SOCK_STREAM, 0);
// Initialise port number and address
struct sockaddr_in server_address;
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(8989);
// Initiate a socket connection
int connection_status = connect(network_socket,
(struct sockaddr*)&server_address,
sizeof(server_address));
// Check for connection error
if (connection_status < 0) {
puts("Error\n");
return 0;
}
printf("Connection estabilished\n");
// Send data to the socket
send(network_socket, &client_request,
sizeof(client_request), 0);
// Close the connection
close(network_socket);
pthread_exit(NULL);
return 0;
}
// Driver Code
int main()
{
printf("1. Read\n");
printf("2. Write\n");
// Input
int choice;
scanf("%d", &choice);
pthread_t tid;
// Create connection
// depending on the input
switch (choice) {
case 1: {
int client_request = 1;
// Create thread
pthread_create(&tid, NULL,
clienthread,
&client_request);
sleep(20);
break;
}
case 2: {
int client_request = 2;
// Create thread
pthread_create(&tid, NULL,
clienthread,
&client_request);
sleep(20);
break;
}
default:
printf("Invalid Input\n");
break;
}
// Suspend execution of
// calling thread
pthread_join(tid, NULL);
}
客户端代码:
C
// C program for the Client Side
#include
#include
#include
#include
// inet_addr
#include
#include
// For threading, link with lpthread
#include
#include
// Function to send data to
// server socket.
void* clienthread(void* args)
{
int client_request = *((int*)args);
int network_socket;
// Create a stream socket
network_socket = socket(AF_INET,
SOCK_STREAM, 0);
// Initialise port number and address
struct sockaddr_in server_address;
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(8989);
// Initiate a socket connection
int connection_status = connect(network_socket,
(struct sockaddr*)&server_address,
sizeof(server_address));
// Check for connection error
if (connection_status < 0) {
puts("Error\n");
return 0;
}
printf("Connection estabilished\n");
// Send data to the socket
send(network_socket, &client_request,
sizeof(client_request), 0);
// Close the connection
close(network_socket);
pthread_exit(NULL);
return 0;
}
// Driver Code
int main()
{
printf("1. Read\n");
printf("2. Write\n");
// Input
int choice;
scanf("%d", &choice);
pthread_t tid;
// Create connection
// depending on the input
switch (choice) {
case 1: {
int client_request = 1;
// Create thread
pthread_create(&tid, NULL,
clienthread,
&client_request);
sleep(20);
break;
}
case 2: {
int client_request = 2;
// Create thread
pthread_create(&tid, NULL,
clienthread,
&client_request);
sleep(20);
break;
}
default:
printf("Invalid Input\n");
break;
}
// Suspend execution of
// calling thread
pthread_join(tid, NULL);
}
输出: