sh1’s diary

プログラミング、読んだ本、資格試験、ゲームとか私を記録するところ

現実的なマルチスレッド (ロック) 対策

f:id:shikaku_sh:20210823173353p:plain:w300

2021 年末現在、C# におけるマルチスレッド対策はいくつかの方法があります。ニーズに合わせて使い分けることができるように、このような状態になっているのだと思いますが、パフォーマンスなどを意識した結果、誤った実装をしてしまう恐れがあります。

そんなわけで、マルチスレッド対策について勉強した内容がとても参考になったので、個人的にまとめておく内容になります。

参考元「【C#】マルチスレッド関連操作の詳説」は、とても勉強になる記事だと思います。

マルチスレッドとロックとは?

複数のスレッドを利用して動作するプログラムで、スレッド A とスレッド B から同じ変数 x に対して書き込みが発生したとします。

f:id:shikaku_sh:20211224174309p:plain:w500

図の例だと、2つのスレッド A, B で変数 x の値をインクリメントしています。それぞれ1回ずつ変数をインクリメントしているので、変数 x の値は2になってほしいのですが、結果は1です。例の場合だと、スレッド A の書き込みが上書きされてしまったため、意味のない処理になってしまっています。

そこで、スレッド A の処理の終了を待ってから、スレッド B の処理を開始するというようにして、下図のようにスレッド間の協調を取ります。

f:id:shikaku_sh:20211224174353p:plain:w500

処理の終了を待つという仕組みに利用されるのが、ロックです。

マルチスレッド処理の準備

例のようなテスト値を複数のスレッドからインクリメントするサンプルプログラムを作成してみます。

まず、実行コードがこちら。

public class Program
{
    const int _ThreadCount = 4;
    const int _TestValue = 1000000; // _ThreadCount で割れる数でテストすること
    const int _TestCount = 100;      // 実行回数(実行時間と AverageLog に影響)

    public static void Main(string[] args)
    {
        Console.WriteLine($"Run test {_TestCount:#,0} times. Info(thread count:{_ThreadCount:#,0} value:{_TestValue:#,0})");

        ThreadPool.SetMinThreads(_ThreadCount, _ThreadCount);
        ThreadPool.SetMaxThreads(_ThreadCount, _ThreadCount);

        var processes = new List<IProcess>
        {
            new NoLockedProcess(),
        };

        foreach (var process in processes)
        {
            for (int i = 0; i < _TestCount; i++)
            {
                process.Run(_ThreadCount, _TestValue, false);
            }
        }

        Console.WriteLine("--");

        foreach (var process in processes)
        {
            process.WriteAverageLog();
        }
    }
}

定義した NoLockedProcess クラスのインターフェースがこちら。Main から呼び出されているのは IProcess インターフェースのメソッドだけです。

Run メソッドは、マルチスレッド処理を実行し、実効速度を計測します。 WriteAverageLog メソッドは、Run メソッドを実行した際の計測結果を出力します。

public interface IProcess
{
    public void Run(int threadCount, int testValue, bool needLog);
    public void WriteAverageLog();
}

定義した NoLockedProcess クラスの基底(抽象)クラスがこちら。注意して読むポイントは、Tasks の部分でマルチスレッド処理を実行しているのがわかると思います。

スレッドのワーク(処理)をする部分は Increments メソッドで _Value の値をインクリメントするのですが、実装は抽象化していますので、NoLockedProcess で実装します。

public abstract class ProcessBase : IProcess
{
    protected int _Value = 0;

    private List<(int, long)> _ExecutedInfos = new ();

    public abstract string Name { get; }
    public abstract void Increment(int counts);

    public void Run(int threadCount, int testValue, bool needLog)
    {
        if (needLog)
        {
            Console.WriteLine($"Start {Name} - thread no:{Thread.CurrentThread.ManagedThreadId:D3}");
        }

        var tasks = new List<Task>();
        var loops = testValue / threadCount; // test 値を指定数のスレッドで割って、加算プロセスを実行する
        var stopWatch = new Stopwatch();

        _Value = 0;

        stopWatch.Start();

        for (int i = 0; i < threadCount; ++i)
        {
            var task = Task.Run(() => 
            {
                Increment(loops);
            });

            tasks.Add(task);
        }

        Task.WhenAll(tasks).Wait();
        stopWatch.Stop();

        // 結果を保存
        _ExecutedInfos.Add((_Value, stopWatch.ElapsedMilliseconds));

        if (needLog)
        {
            Console.WriteLine($"End {Name} - thread no:{Thread.CurrentThread.ManagedThreadId:00} value: {_Value:#,0} time: {stopWatch.ElapsedMilliseconds:#,0} ms");
        }
    }

    public void WriteAverageLog()
    {
        var averagedValue = _ExecutedInfos.Average(p => p.Item1);
        var maxValue = _ExecutedInfos.Max(p => p.Item1);
        var minValue = _ExecutedInfos.Min(p => p.Item1);
        var averagedTime = _ExecutedInfos.Average(p => p.Item2);
        var maxTime = _ExecutedInfos.Max(p => p.Item2);
        var minTime = _ExecutedInfos.Min(p => p.Item2);

        Console.WriteLine($"Average {Name} - value: {averagedValue:#,0}(Max:{maxValue:#,0} Min:{minValue:#,0}), time: {averagedTime:#,0}(Max:{maxTime:#,0} Min:{minTime:#,0})");
    }
}

このコードのやりたいことは、テスト値の回数だけ変数 _Value の値をインクリメントするけど、スレッド数で分担して処理します。テスト値が100、スレッド数が2の場合は、変数 _Value の値を各スレッドで50回ずつインクリメントします。

ロックしないマルチスレッド処理

単純に _Value の値を引数の数だけ繰り返しています。

public class NoLockedProcess : ProcessBase
{
    public override string Name => nameof(NoLockedProcess);

    public override void Increment(int counts)
    {
        for (int i = 0; i < counts; i++)
        {
            _Value += 1;
        }
    }
}

実行するとどうなるでしょうか。 試行回数 100 回 (インクリメント回数: 1,000,000 スレッド数:4)で WriteAverageLog の結果を見てみます。

Average NoLockedProcess - value: 386,931(Max:798,288 Min:250,000), time: 1(Max:8 Min:1)

一番大切なことは、 value の値が 1,000,000 になりません。なので、一番最初の図で示したように値の書き込みが正しく行われていないことがわかります。

タスク処理の気を付けないといけない怖い部分が詰まっているコードですね。

lock

一番代表的なロック制御の機構 lock を利用したコードを確認します。

マルチスレッド処理をする場合、最初に検討していいやり方になります。

public class LockProcess : ProcessBase
{
    private object _Locker = new object();
    public override string Name => nameof(LockProcess);

    public override void Increment(int counts)
    {
        for (int i = 0; i < counts; i++)
        {
            lock (_Locker)
            {
                _Value += 1;
            }
        }
    }
}

実行結果がこちら。

Average LockProcess - value: 1,000,000(Max:1,000,000 Min:1,000,000), time: 41(Max:48 Min:38)

一番大切なことは、 value の値が 1,000,000 になりました。

スレッド A の処理の終了を待ってから、スレッド B の処理を開始する、という仕組みをロックで正しく実現できていますね。

lock はとても優れた仕組みでマルチスレッド処理における書きやすさと安全面で最も優れたプランになります。

パフォーマンス面でも実行結果のとおり、それほど致命的な遅延に繋がりません。

注意すること

ロック内の箇所で操作するオブジェクトを、別スレッドからロックしていない箇所から操作してしまった場合、値の保証ができなくなるため注意が必要です。

下のコードのように、_Data は Test2 でロックせずに操作していますが、これはよくないです。

class Data
{
    public void Run();
}

class Bug
{
    private readonly object _Locker = new ();
    private Data _Data;

    public void Test1()
    {
        lock (_Locker)
        {
            _Data = new ();
        }
    }

    public void Test2()
    {
        var data = _Data;
        data.Run();
    }
}

ロックしてから処理しないとダメ。

public void Test2()
{
    Data data;

    lock (_Locker)
    {
        data = _Data;
    }

    data.Run();
}

Interlocked

Interlocked は、int, uint, long, ulong, double, float など変数の値(変数単体)を変更するときにだけロックをかけるやり方です。

public class InterlockedProcess : ProcessBase
{
    public override string Name => nameof(InterlockedProcess);

    public override void Increment(int counts)
    {
        for (int i = 0; i < counts; i++)
        {
            Interlocked.Increment(ref _Value);
        }
    }
}

lock よりもずっと負荷が小さい。注意しないといけないのは2つ以上の処理をロックすることができません。今回のようなシンプルなケースでの利用になります。

Average InterlockedProcess - value: 1,000,000(Max:1,000,000 Min:1,000,000), time: 20(Max:23 Min:17)

ReaderWriterLockSlim

書き込み処理だけをロックして、読み取り処理をロックを介さず高速化する仕組みを作るときに利用するクラスです。

lock よりも高速に実行したいニーズがあり、書き込み処理よりも読み取り処理が多数である場合に有効な仕組みです。

public class ReaderWriterLockSlimProcess : ProcessBase
{
    public ReaderWriterLockSlim _Locker = new ();
    public override string Name => nameof(ReaderWriterLockSlimProcess);

    public override void Increment(int counts)
    {
        for (int i = 0; i < counts; i++)
        {
            _Locker.EnterWriteLock();

            try
            {
                _Value += 1;
            }
            finally
            { 
                _Locker.ExitWriteLock(); 
            }
        }
    }
}

実行結果もほとんど lock と変わりません。

Average ReaderWriterLockSlimProcess - value: 1,000,000(Max:1,000,000 Min:1,000,000), time: 39(Max:71 Min:24)

テストのやり方は読み取りが多数ではないため、ReaderWriterLockSlim の高速性をチェックするテストコードにはなっていないと思います。

SemaphoreSlim

まず、Semaphoreセマフォ)という概念を把握しないとちょっとわかりづらいです。

並列処理を実行するときの並列処理数を制御する仕組みですが、並列処理する数を「1」個に設定すると、ロックをしているのと実質的には同じです。

ユースケースをよく考えて利用したいクラスです。

このあたりが転じて、ロックの中に async/await が存在していても利用可能なロック制御、つまり、ロック開始スレッドとロック終了スレッドが異なっていても動作する、といった応用的な使い方もあります。

public class SemaphoreSlimProcess : ProcessBase
{
    private SemaphoreSlim _Locker = new (1, 1);
    public override string Name => nameof(SemaphoreSlimProcess);

    public override void Increment(int counts)
    {
        for (int i = 0; i < counts; i++)
        {
            _Locker.Wait();

            try
            {
                _Value += 1;
                // Volatile.Write(ref _Value, Volatile.Read(ref _Value) + 1);
            }
            finally
            {
                _Locker.Release();
            }
        }
    }
}

実行結果は、それほど変わりません。

Average SemaphoreSlimProcess - value: 1,000,000(Max:1,000,000 Min:1,000,000), time: 53(Max:58 Min:51)

Mutex

プロセス間でのロック制御に利用するクラスです。非常に低速なので、利用には注意が必要です。

public class MutexProcess : ProcessBase
{
    private Mutex _Locker = new ();
    public override string Name => nameof(MutexProcess);

    public override void Increment(int counts)
    {
        for (int i = 0; i < counts; i++)
        {
            _Locker.WaitOne();

            try
            {
                _Value += 1;
            }
            finally
            {
                _Locker.ReleaseMutex();
            }
        }
    }
}
Average ReaderWriterLockSlimProcess - value: 1,000,000(Max:1,000,000 Min:1,000,000), time: 3,440(Max:3,703 Min:3,213)

テストコードでも、他コードが 50ms 程度で動作するのに対して 3000 ms、つまり3秒以上必要としました。単純なマルチスレッドのロック対策として採用しない必要があります。

ある意味厄介なことは、どのコードもほとんど差がありません。 なんとなく前に書いたコードの感覚からどのロック制御を採用するのか検討しようと思っても、同じようなコードなんでわかりづらいです。

それぞれのロックの違いを整理しておいたほうがいいです。

実行結果のまとめ

1M(100万)回インクリメントを4つのスレッドで分担して処理した場合の結果をまとめます。平均処理時間は前述の処理を 100 回実行した平均です。

ロックの種類 インクリメント結果の値 平均処理時間 (ms)
ロックなし 461,603 1 ms
lock 1,000,000 37 ms
Interlocked 1,000,000 20 ms
ReaderWriterLockSlim 1,000,000 40 ms
SemaphoreSlim 1,000,000 53 ms
Mutex 1,000,000 3,703 ms

ロックなしは、値を正しくインクリメントできませんでした。

f:id:shikaku_sh:20211224174851p:plain:w500

実行環境:

f:id:shikaku_sh:20211224174907p:plain:w500

補足

Volatile 修飾子

Volatile は、コンパイラの最適化によるマルチスレッド関連のバグから守るための仕組みで利用されることがあります。たとえば:

var sample = new Sample();

Task.Run(async () =>
{
    await Task.Deley(TimeSpan.FromSeconds(60)) // heavy work
    sample.IsFinished = true;
})

while (!sample.IsFinished)
{
    // do something
}
class Sample
{
    public bool IsFinished; // volatile
}

コンパイラがシングルスレッドで動作することを前提にした最適化をしてしまうため、while がいつまで経っても終了しない(終了条件無しに最適化してしまう)場合があります。こうした際に Volatile は適切です。

注意しないといけないことは、Volatile(キーワード and class)は acquire/release なメモリバリアに対応しますが、sequential consistency ではありません。Interlocked は sequential consistency です。

  • acquire メモリバリアは、acquire よりもに書かれたメモリー操作命令が、コンパイラの最適化などによってコード順序が変化されることはない
  • release メモリバリアは、release よりもに書かれたメモリー操作命令が、コンパイラの最適化などによってコード順序が変化されることはない
  • sequential consistency は、並列で動いているコードが直列で動いているように見えること(通常はこっちを使うことになってる)

コード順序を保証してくれる仕組みによって、そのスレッドのコードだけで見ると一見無駄に見えても、他のスレッドで値が更新されている可能性のある変数の最適化を回避するために利用されることになります。

ただし、acquire/release なメモリバリアであるため、単純なロジックにのみ利用することが望ましく、できるだけ Volatile キーワードではなく Volatile クラスを利用しましょう。(そして、値の書き込みは Interlocked を使うこと)

ちなみに、lock は acquire/release や sequential consistency を保証するものではなくて、スレッドがロックした部分を実行中の間、他のスレッドからアクセスできないことを保証するものです。

原子性

原子性は、それ以上に処理の分割が無いことです。

uint a;

a = 0xffffffff;

C# では、uint など 32 bit 以下のプリミティブ型+参照型は書き込み時の原子性が保証されているため、普段意識しないと思います。

GUID (128 bit) のように、ひとつのデータ型が大きい場合、0x0000ffff や 0xffff0000 のように中途半端な書き込み状態のデータを読み込んでしまう恐れがあります。これは、原子性が保証されない=処理が分割されている、ということになります。

C++ はプリミティブ型でも原子性を保証しないので、注意が必要。

なので、マルチスレッドの場合で値を操作するときは sequential consistency と原子性を保証してくれる Interlocked を使う。

サンプル

GitHub に今回のサンプルコードを公開しています。

参考