メッセージキュー

共有メモリがすでにあるのに、なぜメッセージキューが必要なのですか?それは複数の理由によるでしょう、単純化するためにこれを複数のポイントに分割してみましょう-

  • 理解されているように、メッセージがプロセスによって受信されると、他のプロセスでは使用できなくなります。一方、共有メモリでは、データは複数のプロセスがアクセスできるようになっています。

  • 小さなメッセージ形式で通信したい場合。

  • 複数のプロセスが同時に通信する場合は、共有メモリデータを同期で保護する必要があります。

  • 共有メモリを使用した書き込みと読み取りの頻度が高いため、機能の実装は非常に複雑になります。このような場合の利用に関しては価値がありません。

  • すべてのプロセスが共有メモリにアクセスする必要はないが、共有メモリのみを必要とするプロセスが非常に少ない場合は、メッセージキューを使用して実装する方がよいでしょう。

  • 異なるデータパケットと通信する場合、たとえば、プロセスAがメッセージタイプ1をプロセスBに、メッセージタイプ10をプロセスCに、メッセージタイプ20をプロセスDに送信しているとします。この場合、メッセージキューを使用して実装する方が簡単です。指定されたメッセージタイプを1、10、20として単純化するために、以下で説明するように、0、+ ve、または–veのいずれかになります。

  • もちろん、メッセージキューの順序はFIFO(先入れ先出し)です。キューに挿入された最初のメッセージは、最初に取得されるメッセージです。

共有メモリまたはメッセージキューの使用は、アプリケーションの必要性とそれをどれだけ効果的に利用できるかによって異なります。

メッセージキューを使用した通信は、次の方法で発生する可能性があります-

  • あるプロセスによって共有メモリに書き込み、別のプロセスによって共有メモリから読み取る。私たちが知っているように、読み取りは複数のプロセスでも実行できます。

  • 異なるデータパケットを使用する1つのプロセスによって共有メモリに書き込み、複数のプロセスによって、つまりメッセージタイプごとに読み取ります。

メッセージキューに関する特定の情報を確認したら、メッセージキューをサポートするシステムコール(System V)を確認します。

メッセージキューを使用して通信を実行するには、次の手順に従います。

Step 1 −メッセージキューを作成するか、既存のメッセージキューに接続します(msgget())

Step 2 −メッセージキューへの書き込み(msgsnd())

Step 3 −メッセージキューからの読み取り(msgrcv())

Step 4 −メッセージキューで制御操作を実行します(msgctl())

それでは、上記の呼び出しの構文と特定の情報を確認しましょう。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)

このシステムコールは、SystemVメッセージキューを作成または割り当てます。次の引数を渡す必要があります-

  • 最初の引数keyは、メッセージキューを認識します。キーは、任意の値、またはライブラリ関数ftok()から派生できる値のいずれかです。

  • 2番目の引数shmflgは、IPC_CREAT(存在しない場合はメッセージキューを作成する)やIPC_EXCL(メッセージキューを作成するためにIPC_CREATとともに使用され、メッセージキューがすでに存在する場合は呼び出しが失敗する)などの必要なメッセージキューフラグを指定します。権限も渡す必要があります。

Note −権限の詳細については、前のセクションを参照してください。

この呼び出しは、成功した場合は有効なメッセージキュー識別子(メッセージキューの以降の呼び出しに使用)を返し、失敗した場合は-1を返します。失敗の原因を知るには、errno変数またはperror()関数で確認してください。

この呼び出しに関するさまざまなエラーには、EACCESS(許可が拒否されました)、EEXIST(キューが既に存在しますが作成できません)、ENOENT(キューが存在しません)、ENOMEM(キューを作成するのに十分なメモリがありません)などがあります。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

このシステムコールは、メッセージをメッセージキューに送信/追加します(SystemV)。次の引数を渡す必要があります-

  • 最初の引数msgidは、メッセージキュー、つまりメッセージキュー識別子を認識します。msgget()が成功すると、識別子の値が受信されます。

  • 2番目の引数msgpは、呼び出し元に送信されるメッセージへのポインターであり、次の形式の構造で定義されます。

struct msgbuf {
   long mtype;
   char mtext[1];
};

変数mtypeは、さまざまなメッセージタイプとの通信に使用されます。これについては、msgrcv()呼び出しで詳しく説明されています。変数mtextは、サイズがmsgsz(正の値)で指定される配列またはその他の構造体です。mtextフィールドが言及されていない場合、それはゼロサイズのメッセージと見なされ、許可されます。

  • 3番目の引数msgszは、メッセージのサイズです(メッセージはヌル文字で終了する必要があります)

  • 4番目の引数msgflgは、IPC_NOWAIT(キューにメッセージが見つからない場合はすぐに返されます)やMSG_NOERROR(msgszバイトを超える場合はメッセージテキストを切り捨てます)などの特定のフラグを示します。

この呼び出しは、成功した場合は0を返し、失敗した場合は-1を返します。失敗の原因を知るには、errno変数またはperror()関数で確認してください。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

このシステムコールは、メッセージキュー(System V)からメッセージを取得します。次の引数を渡す必要があります-

  • 最初の引数msgidは、メッセージキュー、つまりメッセージキュー識別子を認識します。msgget()が成功すると、識別子の値が受信されます。

  • 2番目の引数msgpは、呼び出し元から受信したメッセージのポインターです。次の形式の構造で定義されます-

struct msgbuf {
   long mtype;
   char mtext[1];
};

変数mtypeは、さまざまなメッセージタイプとの通信に使用されます。変数mtextは、サイズがmsgsz(正の値)で指定される配列またはその他の構造体です。mtextフィールドが言及されていない場合、それはゼロサイズのメッセージと見なされ、許可されます。

  • 3番目の引数msgszは、受信したメッセージのサイズです(メッセージはヌル文字で終了する必要があります)

  • 4番目の引数msgtypeは、メッセージのタイプを示します-

    • If msgtype is 0 −キューで最初に受信したメッセージを読み取ります

    • If msgtype is +ve −タイプmsgtypeのキュー内の最初のメッセージを読み取ります(msgtypeが10の場合、他のタイプが最初にキューにある場合でも、タイプ10の最初のメッセージのみを読み取ります)

    • If msgtype is –ve −メッセージタイプの絶対値以下の最小タイプの最初のメッセージを読み取ります(たとえば、msgtypeが-5の場合、タイプが5未満の最初のメッセージ、つまり1から5のメッセージタイプを読み取ります)。

  • 5番目の引数msgflgは、IPC_NOWAIT(キューにメッセージが見つからない場合はすぐに返されます)やMSG_NOERROR(msgszバイトを超える場合はメッセージテキストを切り捨てます)などの特定のフラグを示します。

この呼び出しは、成功した場合はmtext配列で実際に受信したバイト数を返し、失敗した場合は-1を返します。失敗の原因を知るには、errno変数またはperror()関数で確認してください。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

このシステムコールは、メッセージキュー(System V)の制御操作を実行します。次の引数を渡す必要があります-

  • 最初の引数msgidは、メッセージキュー、つまりメッセージキュー識別子を認識します。msgget()が成功すると、識別子の値が受信されます。

  • 2番目の引数cmdは、メッセージキューで必要な制御操作を実行するためのコマンドです。cmdの有効な値は次のとおりです。

IPC_STAT− struct msqid_dsの各メンバーの現在の値の情報を、bufが指す渡された構造体にコピーします。このコマンドには、メッセージキューに対する読み取り権限が必要です。

IPC_SET −構造体bufが指すユーザーID、所有者のグループID、権限などを設定します。

IPC_RMID −メッセージキューをすぐに削除します。

IPC_INFO − structmsginfo型のbufが指す構造体のメッセージキューの制限とパラメータに関する情報を返します。

MSG_INFO −メッセージキューによって消費されたシステムリソースに関する情報を含むmsginfo構造体を返します。

  • 3番目の引数bufは、structmsqid_dsという名前のメッセージキュー構造体へのポインタです。この構造体の値は、cmdに従ってsetまたはgetのいずれかに使用されます。

この呼び出しは、渡されたコマンドに応じて値を返します。IPC_INFOとMSG_INFOまたはMSG_STATが成功すると、メッセージキューのインデックスまたは識別子が返されます。他の操作の場合は0が返され、失敗した場合は-1が返されます。失敗の原因を知るには、errno変数またはperror()関数で確認してください。

メッセージキューに関する基本的な情報とシステムコールを確認したので、今度はプログラムで確認します。

プログラムを見る前に説明を見てみましょう-

Step 1 − 2つのプロセスを作成します。1つはメッセージキューに送信するためのもの(msgq_send.c)で、もう1つはメッセージキューから取得するためのもの(msgq_recv.c)です。

Step 2− ftok()関数を使用してキーを作成します。このため、最初にファイルmsgq.txtが作成され、一意のキーが取得されます。

Step 3 −送信プロセスは以下を実行します。

  • ユーザーから入力された文字列を読み取ります

  • 新しい行が存在する場合は削除します

  • メッセージキューに送信します

  • 入力が終了するまでこのプロセスを繰り返します(CTRL + D)

  • 入力の終了を受信すると、プロセスの終了を示すメッセージ「end」を送信します

Step 4 −受信プロセスで、以下を実行します。

  • キューからメッセージを読み取ります
  • 出力を表示します
  • 受信したメッセージが「終了」の場合、プロセスを終了して終了します

簡単にするために、このサンプルではメッセージタイプを使用していません。また、1つのプロセスがキューに書き込み、別のプロセスがキューから読み取ります。これは必要に応じて拡張できます。つまり、理想的には1つのプロセスがキューに書き込み、複数のプロセスがキューから読み取ります。

それでは、プロセス(キューへのメッセージ送信)を確認しましょう–ファイル:msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */
   
   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");
   
   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

コンパイルと実行のステップ

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下は、メッセージ受信プロセス(キューからメッセージを取得)からのコードです–ファイル:msgq_recv.c

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");
   
   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

コンパイルと実行のステップ

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.