Dプログラミング-並行性

並行性とは、プログラムを一度に複数のスレッドで実行することです。並行プログラムの例は、同時に多くのクライアントに応答するWebサーバーです。並行性はメッセージパッシングで簡単ですが、データ共有に基づいている場合は書くのが非常に困難です。

スレッド間で受け渡されるデータはメッセージと呼ばれます。メッセージは、任意のタイプおよび任意の数の変数で構成できます。すべてのスレッドには、メッセージの受信者を指定するために使用されるIDがあります。別のスレッドを開始するスレッドは、新しいスレッドの所有者と呼ばれます。

Dでスレッドを開始する

関数spawn()は、パラメーターとしてポインターを受け取り、その関数から新しいスレッドを開始します。その関数によって実行されるすべての操作は、それが呼び出す可能性のある他の関数を含めて、新しいスレッドで実行されます。所有者と作業者はどちらも、独立したプログラムであるかのように別々に実行を開始します。

import std.stdio; 
import std.stdio; 
import std.concurrency; 
import core.thread;
  
void worker(int a) { 
   foreach (i; 0 .. 4) { 
      Thread.sleep(1); 
      writeln("Worker Thread ",a + i); 
   } 
}

void main() { 
   foreach (i; 1 .. 4) { 
      Thread.sleep(2); 
      writeln("Main Thread ",i); 
      spawn(≈worker, i * 5); 
   }
   
   writeln("main is done.");  
}

上記のコードをコンパイルして実行すると、前のセクションで作成したファイルが読み取られ、次の結果が生成されます。

Main Thread 1 
Worker Thread 5 
Main Thread 2 
Worker Thread 6 
Worker Thread 10 
Main Thread 3 
main is done. 
Worker Thread 7 
Worker Thread 11 
Worker Thread 15 
Worker Thread 8 
Worker Thread 12 
Worker Thread 16 
Worker Thread 13
Worker Thread 17 
Worker Thread 18

Dのスレッド識別子

thisTidモジュールレベルで使用可能な変数グローバルは、常に現在のスレッドのIDです。また、spawnが呼び出されたときにthreadIdを受け取ることができます。以下に例を示します。

import std.stdio; 
import std.concurrency;  

void printTid(string tag) { 
   writefln("%s: %s, address: %s", tag, thisTid, &thisTid); 
} 
 
void worker() { 
   printTid("Worker"); 
}
  
void main() { 
   Tid myWorker = spawn(&worker); 
   
   printTid("Owner "); 
   
   writeln(myWorker); 
}

上記のコードをコンパイルして実行すると、前のセクションで作成したファイルが読み取られ、次の結果が生成されます。

Owner : Tid(std.concurrency.MessageBox), address: 10C71A59C 
Worker: Tid(std.concurrency.MessageBox), address: 10C71A59C 
Tid(std.concurrency.MessageBox)

Dでのメッセージパッシング

関数send()はメッセージを送信し、関数receiveOnly()は特定のタイプのメッセージを待ちます。後で説明するprioritySend()、receive()、およびreceiveTimeout()という名前の関数が他にもあります。

次のプログラムの所有者は、そのワーカーにint型のメッセージを送信し、double型のワーカーからのメッセージを待ちます。スレッドは、所有者が負の整数を送信するまでメッセージを送受信し続けます。以下に例を示します。

import std.stdio; 
import std.concurrency; 
import core.thread; 
import std.conv;  

void workerFunc(Tid tid) { 
   int value = 0;  
   while (value >= 0) { 
      value = receiveOnly!int(); 
      auto result = to!double(value) * 5; tid.send(result);
   }
} 
 
void main() { 
   Tid worker = spawn(&workerFunc,thisTid); 
    
   foreach (value; 5 .. 10) { 
      worker.send(value); 
      auto result = receiveOnly!double(); 
      writefln("sent: %s, received: %s", value, result); 
   }
   
   worker.send(-1); 
}

上記のコードをコンパイルして実行すると、前のセクションで作成したファイルが読み取られ、次の結果が生成されます。

sent: 5, received: 25 
sent: 6, received: 30 
sent: 7, received: 35 
sent: 8, received: 40 
sent: 9, received: 45

Dで待機するメッセージパッシング

メッセージがwaitで渡される簡単な例を以下に示します。

import std.stdio; 
import std.concurrency; 
import core.thread; 
import std.conv; 
 
void workerFunc(Tid tid) { 
   Thread.sleep(dur!("msecs")( 500 ),); 
   tid.send("hello"); 
}
  
void main() { 
   spawn(&workerFunc,thisTid);  
   writeln("Waiting for a message");  
   bool received = false;
   
   while (!received) { 
      received = receiveTimeout(dur!("msecs")( 100 ), (string message) { 
         writeln("received: ", message); 
      });

      if (!received) { 
         writeln("... no message yet"); 
      }
   } 
}

上記のコードをコンパイルして実行すると、前のセクションで作成したファイルが読み取られ、次の結果が生成されます。

Waiting for a message 
... no message yet 
... no message yet 
... no message yet 
... no message yet 
received: hello