ホーム 主筆 その他ソフト その他情報 Syuhitu.org English

Windows関連

スクリーンセーバー作成法

半透明ウインドウの性能

bootfont.bin

キャビネット形式

ウインドウスタイルをいじる

Java製ソフトをServiceに登録する

イベントログにメッセージを出力する

コントロールパネルにアイコンを追加する

スクリプトによる拡張1

スクリプトによる拡張2

ガジェットの作成

大容量メモリ

メモリ搭載量の下限に挑む

スパースファイルにする

表示されるアイコンの種類を調べてみた

メモリマップIOとエラー処理

ファイルを作る順番と速度の関係

Cryptography API: Next Generationを使う

Windows 10のアクセントカラー

iSCSIディスクにバックアップを取る

サーバプロセスを分離して実装する

サーバプロセスを分離して実装する - F#

レジストリに大量に書き込む

Solaris関連

OpenGL

Solaris設定

ディレクトリの読み込み

主筆プラグイン開発

マルチスレッドでの開発

door

音を出す

Blade100の正しい虐め方

パッケージの作成

画像入出力

BMPファイル

ICOファイル

ANIファイル

JPEGファイル

減色アルゴリズム

減色アルゴリズムの並列化

その他アルゴリズムなど

自由軸回転

Base64

文字列操作

CPU利用率の取得

正規表現ライブラリ

メタボールを作る

メタボールを作る2

正規表現とNFA・DFA

C言語の構文解析

液晶ディスプレイを解体してみた

iSCSIの理論と実装

単一フォルダにファイルを沢山作る

USB-HUBのカスケード接続

SafeIntの性能

VHDファイルのフォーマット

USBメモリに書き込み続けてみた

マルチスレッドでの開発

2005年12月16日公開

最近、CPUのトレンドとしてマルチコアだの何だのといったものが取り沙汰されている。

こういったものは、ハードを作る側はどう思っているのかは知らないが、ソフトを作る側としては決して歓迎できるような潮流ではない。 理由は言うまでもなく、ソフトウェア開発が大幅に難しくなるからだ。結局、なんだかんだ言って、マルチスレッドのプログラムを作るのは大変なのだ。

と、まぁ、ここで愚痴っても仕方がない。今回はスレッド周辺の処理速度とか何とか、いろいろと考えてみた。

スレッドの生成

スレッドの歴史は非常に長く、古くは1960年代に汎用機上で行われた実装の研究にまで遡るそうだ。 そういうだけあって、スレッドの生成や同期に関する記述はかなり随所に見られる。

だからここでは、あえてスレッドの基礎知識の解説は行わないで、下記のURLを紹介するにとどめる。

それと、あとやっぱり、これも

で、とりあえず俺はこんなクラスライブラリを作っておいた。

CThreadクラスは、まぁ、JavaのThreadクラスとほぼ同じように使えるように作っておいた。だいたい、こんな感じだ。

#include "Thread.h"

class CFoo : public CThread
{
public:
  // 作成されたスレッドのエントリポイント
  void run()
  {
    // ここは新しいスレッドで実行される。
  }
};

void main()
{
  CFoo f;
  f.start();  // スレッドを構築する
}

CSemaphoreの方はもっと簡単で、単にCSemaphoreのインスタンスを作って、PとVのメソッドを呼んでやるだけだ。

#include "Semaphore.h"

void main()
{
  CSemaphore sema( 1 );

  sema.P();
  // クリティカルセクション
  sema.V();
}

とりあえず実装にはPOSIXのスレッドを使用している。だから大抵のUNIXチックな環境であれば動作すると思う。

一応、このページのプログラムの実行には下記の環境を用いた。

CPUPentium III Xeon 1GHz x 2
メモリ512MB
OS確かRedhatLinux バージョンは知らね
コンパイラgcc 2.96


スレッドの生成速度

ここからが本題。

スレッドは軽量プロセスといわれる通り、プロセスよりも短い時間で生成することができる。 (なお、俺の敬愛するSolarisでは、軽量プロセス(LWP : Light Wait Process)というものはカーネルスレッドとして実装され、ユーザスレッドである普通のスレッドとは違うものであるらしい。)

でも、いくら早いとは言っても、スレッドを生成するにはコンテキストやスタックを用意しなくてはならず、いくらかの時間がかかるのは確かだ。 だからスレッドを作るなら、作ったスレッドにある程度まとまった量の仕事をさせなければならない。

と、言うことで、まずはスレッド生成速度を測定してみることにした。

まずはこんなプログラム。

#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>

#include "Thread.h"
#include "Semaphore.h"

class CFoo : public CThread
{
public:
  CFoo( CSemaphore *pS, int *pI ) :
    pSema( pS ),
    pInt( pI )
  {};
  ~CFoo(){};

  void run()
  {
    pSema->P();
    (*pInt)++;
    pSema->V();
  };

  CSemaphore *pSema;
  int *pInt;
};

int main()
{
  time_t t1;
  int cnt;
  int i;
  CSemaphore sema( 1 );
  CFoo **vFoo;

  vFoo = (CFoo**)malloc( sizeof( CFoo* ) * 100000 );
  for ( i = 0; i < 100000; i++ )
    vFoo[i] = new CFoo( &sema, &cnt );


  t1 = time( NULL );
  cnt = 0;
  for ( i = 0; i < 100000; i++ ) {
    vFoo[i]->start();
  }
  printf( "time1 = %d, cnt = %d\n", time( NULL ) - t1, cnt );


  for ( i = 0; i < 100000; i++ )
    vFoo[i]->Wait();


  t1 = time( NULL );
  cnt = 0;
  for ( i = 0; i < 100000; i++ ) {
    vFoo[i]->run();
  }
  printf( "time2 = %d, cnt = %d\n", time( NULL ) - t1, cnt );


  for ( i = 0; i < 100000; i++ )
    delete vFoo[i];
  free( vFoo );

  return 0;
}

このプログラムは、スレッドを10万個生成してcntに1を10万回加算する時間(time1)と、 スレッドを生成しないでcntに1を10万回加算する時間(time2)を測定する。

で、実行結果は

time1 = 6, cnt = 99999
time2 = 0, cnt = 100000

となった。

スレッドを生成する方は、結果を表示するまでに全部のスレッドが終了してない様だが、まぁ、細かいことは気にしない。一つだけだし。

で、つまり、少なくともこの環境ではスレッドを一つ作るのに0.6ms以上の時間がかかると言うことだ。 それと、このプログラムを実行すると同時に、一応CPU利用率の変化も測定していたのだが、その結果、スレッド生成処理を行っている間は、 二つあるCPUが両方とも80%から90%を推移していることが判った。

以上の事から判ることとしては、マルチスレッドで高速化を図るのであれば、生成したスレッドにはある程度まとまった処理をやらせて、 少なくともスレッド生成に要する時間以上は、並列処理を行わせなければならないと言うことだ。

ってまぁ、そんなことは今更言うまでも無いことなんだろうが。

スレッドを使い回す

とにかく、スレッドの生成にはある程度時間がかかることが判った。

ならばということで、今度は一度生成したスレッドを使い回すことを考えてみる。 これは基本的には、一度確保したメモリ領域を使い回す、というのと同じだ。

そういうことで、今度はこんなプログラムを作ってみた。

ここで、ちょっと解説。

まずCThreadPoolクラスは、必要に応じてスレッドを生成し、セマフォにより動きを止め、プールしておく。

そうして、CThreadPoolのrunメソッドにCDmyThreadのオブジェクトが渡されたら、CThreadPoolはセマフォを解放して、スレッドを一つだけ取り出す。

取り出されたスレッドはCDmyThreadのrunメソッド内を実行する。そして最終的にはCThreadPool内に戻ってきて、次に取り出されるまで待機する。

で、使い方は上記のCThreadとさして違わず、こんな感じ

#include "ThreadPool.h"
#include "DmyThread.h"

class CFoo : public CDmyThread
{
public:
  void run()
  { /* ここは別スレッドで実行される */ }
};

void func()
{
  CthreadPool pool;
  CFoo foo;

  pool.run( &foo );  // プールされているスレッドから一つ割り当てる
}

で、これを使って、またスレッドを10万個生成して・・・という処理をやるプログラムを作ってみた。

#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>

#include "DmyThread.h"
#include "ThreadPool.h"
#include "Semaphore.h"

class CFoo : public CDmyThread
{
public:
  CFoo( CSemaphore *pS, int *pI ) :
    pSema( pS ),
    pInt( pI )
  {};
  ~CFoo(){};

  void run()
  {
    pSema->P();
    (*pInt)++;
    pSema->V();
  };

  CSemaphore *pSema;
  int *pInt;
};

int main()
{
  time_t t1;
  int cnt;
  int i;
  CSemaphore sema( 1 );
  CThreadPool ThPool;
  CFoo **vFoo;

  // 10万個のスレッドオブジェクトを構築する
  vFoo = (CFoo**)malloc( sizeof( CFoo* ) * 100000 );
  for ( i = 0; i < 100000; i++ )
    vFoo[i] = new CFoo( &sema, &cnt );

  // スレッドを生成し、処理を行う
  t1 = time( NULL );
  cnt = 0;
  for ( i = 0; i < 100000; i++ ) {
    ThPool.run( vFoo[i] );
  }

  // 終了を待ち合わせる
  for ( i = 0; i < 100000; i++ )
    vFoo[i]->Wait();

  // 結果を表示する
  printf( "time1 = %d, cnt = %d\n", time( NULL ) - t1, cnt );

  // スレッドオブジェクトを破棄する
  for ( i = 0; i < 100000; i++ )
    delete vFoo[i];
  free( vFoo );

  return 0;
}

で、その実行結果がこれ

time1 = 1, cnt = 100000

やったぞ。処理時間が6分の1に短縮されたぞ。

・・・って、やっぱりそれでもスレッドをこんなに大量に生成する理由がわからないし、意味ないよなぁ・・・。

仕事の生産と消費

気を取り直して、今度はマルチスレッドで高速化を図るための、仕事の分配方法について考えてみる。

一番単純なのは、ループを分割する方法。例えば二つに分けるのであれば、ループの繰り返しの前半分を一つのスレッドで処理して、 後ろ半分をもう一つのスレッドで処理する様にする。

この方法はOpenMPでのマルチスレッド化において想定されている方法だ。

void foo()
{
  int i;
#pragma omp parallel for
  for ( i = 0; i < 10; i++ ) {
    v[i] = 0;
  }
}

上記のようなOpenMPのコードを、CPUが二つある環境で動作させた場合、配列vの0から4までと、配列vの5から9までをそれぞれ別のスレッドで処理することになる。

この方法は、科学技術計算を行うアプリケーションにおいて、実際の数値計算を行っているような部分に対しては、結構有効に使うことができる。

ところが、なかなかこのOpenMPがうまく適用できない場合もある。例えば、このfor文が配列インデックスの整数値で回るのではなく、リストのイテレータで回るような場合にはOpenMPは使えない。

そういった場合どうするか。

俺が好きなのは、生産者がリストに「仕事」を放り込み、消費者がリストから「仕事」を取り出して処理する、という方法だ。

当然、リストに対しては良い感じで排他制御をかけてやらねばならないし、仕事がない時には「消費者」のワーキングスレッドには止まっていてもらいたい。 だから、ここでは、このリストに対して「生産者と消費者の問題」のアルゴリズムを使用する。

「生産者と消費者の問題」の詳しい説明は省略するものとして、とりあえず下記に簡単なプログラム例を示す。

#include <stdio.h>
#include <list>
#include "Semaphore.h"
#include "Thread.h"

using namespace std;

// 「仕事」一つ分の表す
class CJob
{
public:
  int JobData;
};

// 「仕事」のリストを保持・入出力を行う
class CJobList
{
public:
  CJobList() :
    semaList( 1 ),
    semaWait( 0 )
  {};

  // リストに仕事を追加する
  void push( const CJob &r )
  {
    // リストに対して排他制御を掛ける
    semaList.P();

    // リストに追加する
    listJob.push_back( r );

    // listJobに追加された分、semaWaitを加算する
    semaWait.V();

    // 排他制御を解除する
    semaList.V();
  };

  // リストから仕事を取り出す
  void pop( CJob *pJob )
  {
    // listJobにデータが追加されるまで待ち合わせる
    semaWait.P();

    // リストに対して排他制御を掛ける
    semaList.P();

    // リストから仕事を取り出す
    (*pJob) = *( listJob.begin() );
    listJob.erase( listJob.begin() );

    // 排他制御を解除する
    semaList.V();
  };

protected:

  // 「仕事」のリスト
  list< CJob > listJob;

  // listJobにアクセスを行うための排他制御用
  CSemaphore semaList;

  // listJobにデータが来るまでの待ち合わせ用
  CSemaphore semaWait;
};

// 生産者
class CProducer : public CThread
{
public:
  CProducer( CJobList *p ) :
    pJobList( p )
  {}

  void run()
  {
    CJob job;
    int i;

    for ( i = 0; i < 100; i++ ) {
      // 仕事を生産する
      job.JobData = i;

      // リストに追加する
      pJobList->push( job );
    }
  };
protected:
  CJobList *pJobList;
};

// 消費者
class CCustomer : public CThread
{
public:
  CCustomer( CJobList *pJ, CSemaphore *pS ) :
    pJobList( pJ ),
    pSemaStdout( pS )
  {}

  void run()
  {
    CJob job;
    int i;

    for ( i = 0; i < 10; i++ ) {
      // 仕事を取り出す
      pJobList->pop( &job );

      // 仕事をする
      pSemaStdout->P();
      printf(
        "Customer = 0x%08X : JobData = %d\n",
        this, job.JobData
      );
      pSemaStdout->V();
    }
  };
protected:
  CJobList *pJobList;
  CSemaphore *pSemaStdout;  // 標準出力の排他制御用
};


int main()
{
  CProducer *pProducer;       // 生産者
  CCustomer *pvCustomer[10];  // 消費者
  CJobList JobList;           // 仕事のリスト
  CSemaphore semaStdout( 1 ); // 標準出力の排他制御用
  int i;

  // 生産者および消費者を生成
  pProducer = new CProducer( &JobList );
  for ( i = 0; i < 10; i++ ) {
    pvCustomer[i] = new CCustomer( &JobList, &semaStdout );
  }

  // 生産者と消費者のスレッドを生成する
  pProducer->start();
  for ( i = 0; i < 10; i++ ) {
    pvCustomer[i]->start();
  }

  // 生産者と消費者のスレッドが終了するのを待ち合わせる
  pProducer->Wait();
  for ( i = 0; i < 10; i++ ) {
    pvCustomer[i]->Wait();
  }

  // 生産者と消費者のオブジェクトを破棄する
  delete pProducer;
  pProducer = NULL;
  for ( i = 0; i < 10; i++ ) {
    delete pvCustomer[i];
    pvCustomer[i] = NULL;
  }

  return 0;
}

このプログラムは、生産者(CProducer)が仕事のリスト(CJobList)に仕事(CJob)を100個追加し、10個の消費者(CCustomer)がそれぞれ10個ずつ仕事を消費する、というものだ。

他にもうまいやり方はいくらでもあるんだろうが、とにかく俺はこれが好きだ。

ついでに言っておくと、この方法には結構悩ましい問題がある。それは、生産や消費においてエラーが発生した場合だ。

無論、エラーが発生した場合は処理を停止しなければならないのだが、それが結構ややこしい。

生産側はただひたすら仕事をリストに押し込んでいるだけだから、止めるのはそう難しくもないのだが、困るのは消費者側だ。 消費者は上記のプログラムで言うところの「semaWait」で待ち合わせている可能性があるんだが、それをどうやって停止させるか、難しい。

つまり、スレッドを停止させるために、一旦「semaWait」で待ち合わせているのを走らせてやらなくてはいけないのだ。 かといって、不用意にsemaWait.V()としたら、間違いなく空のリストを参照して、宇宙の果てまで飛んでいくことになる。

だから仕方がない、消費者はsemaWait.P()をした後、それがリストにデータが追加されたためなのか、スレッドを停止する為だったのか、を判断するようにしなくてはならない。

それをふまえて、上記のプログラムのCJobListに停止機能を付けたバージョンを下記に示す。

// 「仕事」のリストを保持・入出力を行う
class CJobList
{
public:
  CJobList() :
    semaList( 1 ),
    semaWait( 0 ),
    semaExitFlg( 1 ),
    ExitFlg( false )
  {};

  // リストに仕事を追加する
  void push( const CJob &r )
  {
    // リストに対して排他制御を掛ける
    semaList.P();

    // リストに追加する
    listJob.push_back( r );

    // listJobに追加された分、semaWaitを加算する
    semaWait.V();

    // 排他制御を解除する
    semaList.V();
  };

  // リストから仕事を取り出す
  bool pop( CJob *pJob )
  {
    int flg;

    // listJobにデータが追加されるまで待ち合わせる
    semaWait.P();

    // ExitFlgに対する排他制御を掛ける
    semaExitFlg.P();

    // 終了フラグの値を取得する
    // 変数flgはスレッドローカルであることに注意
    flg = ExitFlg;

    // ExitFlgに対する排他制御を解除する
    semaExitFlg.V();

    if ( flg ) {
      // 割り込みが掛けられている

      // semaWaitで待っている他のスレッドが動けるようにしてやる
      semaWait.V();
      return false;
    }

    // リストに対して排他制御を掛ける
    semaList.P();

    // リストから仕事を取り出す
    (*pJob) = *( listJob.begin() );
    listJob.erase( listJob.begin() );

    // 排他制御を解除する
    semaList.V();

    return true;
  };

  // 割り込み
  void Interrupt()
  {
    // ExitFlgに対する排他制御を掛ける
    semaExitFlg.P();

    // 終了フラグを設定する
    ExitFlg = true;

    // semaWaitで待ち合わせているスレッドを走らせてやる
    semaWait.V();

    // ExitFlgに対する排他制御を解除する
    semaExitFlg.V();
  };

protected:

  // 「仕事」のリスト
  list< CJob > listJob;

  // listJobにアクセスを行うための排他制御用
  CSemaphore semaList;

  // listJobにデータが来るまでの待ち合わせ用
  CSemaphore semaWait;

  // 終了フラグ
  bool ExitFlg;

  // ExitFlgに対する排他制御用
  CSemaphore semaExitFlg;

};

で、消費者では「CJobList::pop」メソッドを呼び出した時に偽が返ってきたら、処理を終了するようにしてやればいい。

一応参考までに、生産者と消費者を無限ループにして、二秒後に停止するように修正したプログラムを公開しておく。

  • pc_test.cc

順序制御

正式な用語では何というのか俺は知らないのだが、順序制御とでも言えばいいのだろうか。

ある処理を複数に分割して、それぞれを別のスレッドに処理させたものとする。そうしたら、当然の事ながらその処理結果を使って、次の処理をやらなければならい。だがそのとき、よくあることだが、複数に分けたそれぞれの処理には順序性があり、処理結果を順序通りに受け取りたいと言うことが起こりうる。

例えば、テキストファイルを一行ずつ読み込んで、何らかの処理を行って、それぞれの処理結果を読み込んだ順番に出力するものとする。さらに、各行の変換処理にはかなりの負荷がかかるから、この部分をマルチスレッド化するものとする。

そうした場合、出力処理の部分では、マルチスレッドで行った処理の結果を、出力順に受け取りたくなるものだ。だが、各行の処理結果は、必ずしも処理を開始した順に得られるという保証はない。つまり、与えられたデータやCPU割り当ての都合などで、途中で後から来た奴に追い越されるということも、十分に起こりうる。

よって、何らかの方法で処理結果の順序性を保証してやらなければならない。

だから俺は、上の生産者と消費者の問題のプログラムを元に、処理結果の順序性を保証するためのプログラムを作った。

#include <map>
#include "Semaphore.h"

template< typename T_MSG >
class CSequencer
{
public:
  CSequencer() :
    semaJob( 1 ),
    semaWait( 0 ),
    semaExitFlg( 1 ),
    ExitFlg( false ),
    SeqNumber( 0 )
  {};

  ~CSequencer()
  {};

  // 仕事を投入する
  void push( int idx, const T_MSG &r )
  {
    std::map< int, T_MSG >::iterator itr;

    assert( NULL != this && SeqNumber <= idx );

    // mapに対して排他制御を掛ける
    semaJob.P();

    // マップに追加する
    // ただし、同一インデックスのものがすでに存在した場合は、
    // 既存のものを削除するしてから追加する
    itr = mapJob.find( idx );
    if ( itr != mapJob.end() )
      mapJob.erase( itr );
    mapJob.insert( std::pair< int, T_MSG >( idx, r ) );

    // 次に取り出すべき仕事が投入されたら、semaWaitを加算する
    if ( SeqNumber == idx )
      semaWait.V();

    // 排他制御を解除する
    semaJob.V();
  };

  // 仕事を取り出す
  bool pop( T_MSG *pJob, int *pIdx = NULL )
  {
    int flg;

    assert( NULL != this && NULL != pJob );

    // mapJobにデータが追加されるまで待ち合わせる
    semaWait.P();

    // ExitFlgに対する排他制御を掛ける
    semaExitFlg.P();

    // 終了フラグの値を取得する
    // 変数flgはスレッドローカルであることに注意
    flg = ExitFlg;

    // ExitFlgに対する排他制御を解除する
    semaExitFlg.V();

    if ( flg ) {
      // 割り込みが掛けられている

      // semaWaitで待っている他のスレッドが動けるようにしてやる
      semaWait.V();
      return false;
    }

    // マップに対して排他制御を掛ける
    semaJob.P();

    // マップから仕事を取り出す
    (*pJob) = mapJob.begin()->second;
    if ( pIdx ) (*pIdx) = mapJob.begin()->first;
    mapJob.erase( mapJob.begin() );

    // 次に期待する仕事のインデックスを加算する
    SeqNumber++;

    // マップの中に、すでに次に期待する仕事が存在する場合は、
    // 待ち合わせ用のセマフォを加算する
    if ( mapJob.size() > 0 ) {
      if ( mapJob.begin()->first == SeqNumber )
        semaWait.V();
    }

    // 排他制御を解除する
    semaJob.V();

    return true;
  };

  // 割り込み
  void Interrupt()
  {
    // ExitFlgに対する排他制御を掛ける
    semaExitFlg.P();

    // 終了フラグを設定する
    ExitFlg = true;

    // semaWaitで待ち合わせているスレッドを走らせてやる
    semaWait.V();

    // ExitFlgに対する排他制御を解除する
    semaExitFlg.V();
  };

protected:
  // 「仕事」のマップ
  std::map< int, T_MSG > mapJob;

  // 次に取り出すべき仕事
  int SeqNumber;

  // mapjobにアクセスを行うための排他制御用
  CSemaphore semaJob;

  // mapJobにデータが来るまでの待ち合わせ用
  CSemaphore semaWait;

  // 終了フラグ
  volatile bool ExitFlg;

  // ExitFlgに対する排他制御用
  CSemaphore semaExitFlg;

};

まず、各仕事には0から始まるインデックスが付与され、pushするときには、自分が行った仕事のインデックスを指定しなければならない。そして、popする際には、pushするときに指定されたインデックスの順に取り出される。

なお、ここでは簡略化のために、インデックスは0から始まるint型の数値に限定させてもらった。

内部の処理は、生産者と消費者の問題の時にやっていたこととさして違いはない。だが、このプログラムでは「次にpopするべき仕事」のインデックスを管理しており、その「次にpopするべき仕事」がpushされたときに初めて、待ち合わせ用のセマフォを解放する様にしている。

生産者と消費者の問題の時には、とにかく仕事がpushされたらその都度セマフォを解放し、すぐにpopできるようにしていたのだが、その辺りが大きく変更されている。

ごく短いプログラムなのだが、こういったロジックがないと、意外とやりたいことができなかったりする。

一定時間だけ待ち合わせ

セマフォを一定時間だけ待ち合わせることを考える。つまり、待ち合わせに制限時間を設けることを考える。

WindowsAPIであれば、WaitForSingleObject関数がタイムアウトをサポートしているため、特に何も考える必要はないのだが、 POSIXのセマフォにはこの機能の代替となるようなものが見あたらない。少なくとも俺には見つけることができなかった。

だから仕方がない。こういう風に考えてみた。

一応POSIXのセマフォにはsem_trywaitという関数がある。この関数はセマフォの確保に成功しようが失敗しようが、すぐに制御を返す仕様になっている。

だからこれを使って、指定された時間まで一定時間ごとに一回sem_trywaitを試みるようにしてみた。

プログラム的にはこんな感じ。

// 占有出来るなら占有する
// 占有出来たら真を返す
bool CSemaphore::Try()
{
  return sem_trywait( &m_Semaphore ) == 0;
}

// 最大待ち時間を指定して待ち合わせる
bool CSemaphore::WaitForTime( int ms )
{
  // 100ms単位でTryを行う。
  // 最大ms時間だけ待ち合わせる

  struct timespec req;
  int cnt = ms / 100;  // Tryを行う回数
  int i;

  // 100ms単位でTryを行う
  req.tv_sec = 0;
  req.tv_nsec = 100 * 1000 * 1000;

  for ( i = 0; i < cnt; i++ ) {
    // 100ms待ち合わせる
    if ( nanosleep( &req, NULL ) ) return false;
    // 占有を試みる
    if ( Try() ) return true;
  }

  // 100msの剰余時間だけ待ち合わせる
  req.tv_nsec = ( ms % 100 ) * 1000 * 1000;
  if ( nanosleep( &req, NULL ) ) return false;

  // 最後に一回占有を試みる
  return Try();
}

まぁ、はっきり言ってあまり効率が良いとは言い難いけど、仰々しいプログラムを書かずに実装するとなると、この辺が妥当なのではないだろうか?

でも、制限時間やセマフォを確保するまでの時間に関して高い精度が必要な場合や、大量のスレッドが待ち合わせを行うような場合には、別の方法を考える必要があるだろう。その方法は俺には判らないが。

セマフォの性能

今度はセマフォの性能について考えてみる。

一つのセマフォで待ち合わせるスレッドの数を変化させた場合、どのような時間の変化が起こるのだろうか?

と、言うことで、こういうプログラムを作って走らせてみた。

このプログラムでは、一つのセマフォで待ち合わせるスレッドの数を125から1000まで順次増加させて、セマフォに対するV操作にかかる時間を測定している。

ついで言うと、俺が使った処理系では、同時に存在できるスレッドの数は最大で1023までだったようだ。だからとりあえず、125から1000までとしておいた。 (更に言うと、Debianが入ってる別のマシンでは最大数が255だった。結構タコいのな。)

で、プログラムを実行させた結果はこうなった。

Thread Count 125 : time = 2
Thread Count 250 : time = 7
Thread Count 375 : time = 13
Thread Count 500 : time = 16
Thread Count 625 : time = 20
Thread Count 750 : time = 24
Thread Count 875 : time = 29
Thread Count 1000 : time = 31

当然といえば当然だが、性能はスレッド数の増加に伴い悪化する。しかし奇妙なのは1スレッド当たりで見たときの性能の変化だ。おそらくスレッド数が少ないときと多いときとでアルゴリズムを変化させているのだろう。

まぁ、もっとも、これの計測に使用したLinuxのバージョンは結構古いから、最近では大分向上してるのではないかと思うが。

あと、余談だが、sema_test.ccのプログラムをSolarisのマシン(俺のFire v250)で実行したところ、こういう結果になった。

Thread Count 125 : time = 0
Thread Count 250 : time = 0
Thread Count 375 : time = 0
Thread Count 500 : time = 0
Thread Count 625 : time = 0
Thread Count 750 : time = 0
Thread Count 875 : time = 0
Thread Count 1000 : time = 0

・・・早いって。をい。

と、言うことで、今度はsema_test.ccのStepを400(3.2倍)に、セマフォの解放を行う回数を4千万回(400倍)にして実行してみた。

Thread Count 400 : time = 11
Thread Count 800 : time = 12
Thread Count 1200 : time = 11
Thread Count 1600 : time = 11
Thread Count 2000 : time = 12
Thread Count 2400 : time = 11
Thread Count 2800 : time = 12
Thread Count 3200 : time = 11

むぅ。さすがは我が敬愛するSolarisだけある。純粋に処理速度が速いのみならずセマフォ解放の性能がスレッド数に依存していない。

ついでに言っておくと、Pentium III Xeon 1GHzとUltra SPARC IIIi 1GHzとでは、SPEC int 2000の結果によれば、だいたい同じかちょっとSPARCの方が早い位だ。

やはりスレッドの扱いなどは、Solarisの方が一日の長があるのだろう。



Mutex

同期オブジェクトには、セマフォ以外にもmutexという物がある。これは、機能的にはセマフォを簡略化したようなものだ。

セマフォより単純なもなのであれば、おそらくセマフォより高速なんだろう。ということで、調べてみた。

まずは、mutexを取り扱うクラスを作ってみた。

使い方はセマフォとほとんど同じだから、今更解説はしない。

テスト用のプログラムには、上記セマフォの性能を調べるのに使った奴を元に、mutexバージョンに仕立て上げたものを使った。

スレッドの増加数(Stepの値)は400、mutexの解放回数は4千万回。つまり、上記Solarisでの実験と同じ条件だ。

で、実行したら以下のようになった。

Thread Count 400 : time = 4
Thread Count 800 : time = 4
Thread Count 1200 : time = 4
Thread Count 1600 : time = 4
Thread Count 2000 : time = 4
Thread Count 2400 : time = 4
Thread Count 2800 : time = 4
Thread Count 3200 : time = 4

はやり、mutexの方が高速なようだ。どうやら、余り複雑な機能がいらない場合には、セマフォよりmutexを使った方が良いらしい。

最後に

やっぱり、複雑なマルチスレッドプログラムを作るのは大変だ。気まぐれに発生するバグ、不可思議な挙動、予測不可能な計算結果、まさにカオスだ。

プログラムをマルチスレッド化するのは、もちろん性能向上が最終目標だ。しかし、あまりにも細かく、重箱の隅をつつくような並列化をやろうとすると、どつぼにはまる。自分の身の丈に合った並列化にしておいた方が良い。

あと、ついでに言っておくが、このページで紹介しているプログラムが正しいという保証はどこにもない。とりあえず俺が実行したときには、それっぽい結果が得られた、というに過ぎない。ここに書かれた内容は、煮るなり焼くなり好きにしてもらって構わないが、使うときは十分注意して欲しい。


連絡先 - サイトマップ - 更新履歴
Copyright (C)  2000 - 2016 nabiki_t All Rights Reserved.