📜  消息队列

📅  最后修改于: 2021-01-18 06:41:59             🧑  作者: Mango


当我们已经有共享内存时,为什么需要消息队列?出于多种原因,为简化起见,让我们尝试将其分解为多个点-

  • 可以理解,一旦消息被某个进程接收,它将不再可用于任何其他进程。而在共享内存中,数据可供多个进程访问。

  • 如果我们想以小消息格式进行通信。

  • 当多个进程同时通信时,共享内存数据需要通过同步进行保护。

  • 使用共享内存进行写入和读取的频率很高,因此实现功能将非常复杂。在这种情况下,利用不值得。

  • 如果所有进程都不需要访问共享内存,但是很少有进程只需要共享内存,那么最好使用消息队列来实现。

  • 如果我们想与不同的数据包进行通信,则说进程A将消息类型1发送到进程B,将消息类型10发送到进程C,将消息类型20发送到进程D。在这种情况下,使用消息队列来实现比较简单。为了将给定的消息类型简化为1、10、20,可以是0或+ ve或–ve,如下所述。

  • 当然,消息队列的顺序是FIFO(先进先出)。插入队列中的第一条消息是要检索的第一条消息。

使用共享内存或消息队列取决于应用程序的需求以及如何有效地利用它。

使用消息队列的通信可以通过以下方式进行:

  • 通过一个进程写入共享内存,并通过另一个进程从共享内存中读取。我们知道,阅读也可以通过多个过程完成。

消息队列

  • 通过具有不同数据包的一个进程写入共享内存,并通过多个进程(即根据消息类型)从共享内存中读取。

多消息队列

看完消息队列上的某些信息之后,现在该检查支持消息队列的系统调用(System V)。

要使用消息队列执行通信,请执行以下步骤-

步骤1-创建消息队列或连接到已经存在的消息队列(msgget())

第2步-写入消息队列(msgsnd())

第3步-从消息队列中读取(msgrcv())

第4步-对消息队列执行控制操作(msgctl())

现在,让我们检查上述调用的语法和某些信息。

#include 
#include 
#include 

int msgget(key_t key, int msgflg)

此系统调用创建或分配系统V消息队列。以下参数需要传递-

  • 第一个参数key是识别消息队列。键可以是任意值,也可以是从库函数ftok()派生的值。

  • 第二个参数shmflg指定所需的消息队列标记,例如IPC_CREAT(如果不存在则创建消息队列)或IPC_EXCL(与IPC_CREAT一起使用来创建消息队列,如果消息队列已经存在,则调用失败)。还需要传递权限。

注意-有关权限的详细信息,请参阅前面的部分。

如果成功,此调用将返回有效的消息队列标识符(用于消息队列的进一步调用),如果失败,则返回-1。要了解失败的原因,请使用errno变量或perror()函数。

与该调用有关的各种错误包括EACCESS(拒绝权限),EEXIST(无法创建已经存在的队列),ENOENT(没有队列),ENOMEM(没有足够的内存来创建队列)等。

#include 
#include 
#include 

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

此系统调用将消息发送/追加到消息队列(系统V)。以下参数需要传递-

  • 第一个参数msgid识别消息队列,即消息队列标识符。 msgget()成功时接收到标识符值

  • 第二个参数msgp是指向发送给调用方的消息的指针,其格式如下:

struct msgbuf {
   long mtype;
   char mtext[1];
};

变量mtype用于与不同的消息类型进行通信,这在msgrcv()调用中进行了详细说明。变量mtext是一个数组或其他结构,其大小由msgsz(正值)指定。如果未提及多行文字字段,则将其视为零大小消息,这是允许的。

  • 第三个参数msgsz是消息的大小(消息应以null字符结尾)

  • 第四个参数msgflg表示某些标志,例如IPC_NOWAIT(在队列中未找到消息时立即返回或MSG_NOERROR(如果超过msgsz字节则截断消息文本),立即返回

成功时此调用将返回0,失败则返回-1。要了解失败的原因,请使用errno变量或perror()函数。

#include 
#include 
#include 

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

该系统调用从消息队列(系统V)中检索消息。以下参数需要传递-

  • 第一个参数msgid识别消息队列,即消息队列标识符。 msgget()成功时接收到标识符值

  • 第二个参数msgp是从调用方收到的消息的指针。它以以下形式的结构定义-

struct msgbuf {
   long mtype;
   char mtext[1];
};

变量mtype用于与不同的消息类型进行通信。变量mtext是一个数组或其他结构,其大小由msgsz(正值)指定。如果未提及多行文字字段,则将其视为零大小消息,这是允许的。

  • 第三个参数msgsz是接收到的消息的大小(消息应以null字符结尾)

  • fouth参数msgtype指示消息的类型-

    • 如果msgtype为0-读取队列中的第一个收到的消息

    • 如果msgtype为+ ve-读取类型为msgtype的队列中的第一条消息(如果msgtype为10,则仅读取类型为10的第一条消息,即使开头的队列中可能还有其他类型)

    • 如果msgtype为–ve-读取类型小于或等于消息类型的绝对值的最低类型的第一条消息(例如,如果msgtype为-5,则它读取类型小于5的第一条消息,即消息类型从1到5)

  • 第五个参数msgflg表示某些标志,例如IPC_NOWAIT(当在队列中未找到消息时立即返回或MSG_NOERROR(如果超过msgsz字节则截断消息文本)立即返回)

此调用将在成功时返回mtext数组中实际接收的字节数,在失败情况下返回-1。要了解失败的原因,请使用errno变量或perror()函数。

#include 
#include 
#include 

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

该系统调用执行消息队列(系统V)的控制操作。以下参数需要传递-

  • 第一个参数msgid识别消息队列,即消息队列标识符。 msgget()成功时接收到标识符值

  • 第二个参数cmd是用于在消息队列上执行所需控制操作的命令。 cmd的有效值为-

IPC_STAT-将结构msqid_ds的每个成员的当前值的信息复制到buf指向的传递结构。此命令需要对消息队列具有读取权限。

IPC_SET-设置用户ID,所有者的组ID,结构buf指向的权限等。

IPC_RMID-立即删除消息队列。

IPC_INFO-返回有关由buf指向的结构类型为msginfo的消息队列限制和参数的信息

MSG_INFO-返回一个msginfo结构,该结构包含有关消息队列消耗的系统资源的信息。

  • 第三个参数buf是指向名为struct msqid_ds的消息队列结构的指针。此结构的值将用于按cmd设置或获取。

该调用将根据传递的命令返回值。 IPC_INFO和MSG_INFO或MSG_STAT成功返回消息队列的索引或标识符,对于其他操作返回0,在失败的情况下返回-1。要了解失败的原因,请使用errno变量或perror()函数。

看完有关消息队列的基本信息和系统调用之后,现在该检查程序了。

让我们在看程序之前先看一下描述-

步骤1-创建两个进程,一个用于发送到消息队列(msgq_send.c),另一个用于从消息队列中检索(msgq_recv.c)

第2步-使用ftok()函数创建密钥。为此,最初将创建文件msgq.txt以获取唯一密钥。

步骤3-发送过程执行以下操作。

  • 读取用户输入的字符串

  • 删除新行(如果存在)

  • 发送到消息队列

  • 重复该过程,直到输入结束(CTRL + D)

  • 收到输入结束后,发送消息“结束”以表示过程结束

步骤4-在接收过程中,执行以下操作。

  • 从队列中读取消息
  • 显示输出
  • 如果收到的消息是“结束”,请完成该过程并退出

为简化起见,我们不在此样本中使用消息类型。同样,一个进程正在写入队列,而另一个进程正在从队列读取。可以根据需要进行扩展,即理想情况下,一个进程将写入队列,而多个进程将从队列中读取。

现在,让我们检查过程(消息发送到队列中)–文件:msgq_send.c

/* Filename: msgq_send.c */
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */
   
   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");
   
   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

编译和执行步骤

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下是消息接收过程中的代码(从队列中检索消息)–文件:msgq_recv.c

/* Filename: msgq_recv.c */
#include 
#include 
#include 
#include 
#include 
#include 

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");
   
   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

编译和执行步骤

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.