Vectorization / SIMD kullanarak C# ile Power Iteration algoritmasını hızlandıralım

Aynı anda birden fazla iş yapmak istediğimizde akla ilk gelen birden fazla işlemci çekirdeği veya GPU kullanmak olabilir. Fakat, bunu daha az maliyet ile çözmenin yolları bulunuyor. Çok basitleştirirsek bir işlemci çekirdeği kendisine gelen komutu ve komutun parametresini sırasıyla işlemektedir. Arka arkaya 4 sayıyı toplamak istersek 3 adet toplama komutunu iletmemiz gerekecektir. Fakat, işlemcimiz toplama için 2 değil 4 adet parametre sunsaydı, bu işi tek bir komutla halletmiş olacaktık. Bir de bunu iç çarpım (dot product) gibi bir çok alt işlem gerektiren hesaplamalar için düşünün. Bu zaten epey eskiden düşünülmüş ve işlemcilerin komut setlerine eklenmiş. Bunlara kısaca SIMD (Single Instruction, Multiple Data) adı verilmiş. Bizim açımızdan en popüler olanları SSE ve AVX olsa gerek. Bu arkadaşları .NET ile de kullanabiliyorduk aslında ama bir şeyleri dışarıdan temin etmek veya unsafe kodlama yapmak gerekiyordu. .NET Core ile beraber "Hardware Intrinsics" adıyla framework'e dahil oldular. Ama ilk versiyonlarının kullanımı o kadar da anlaşılır ve kolay değildi ve işlemciye özel kod yazmak gerekiyordu. Günümüzde işler biraz daha generic ilerliyor. Bu konuyla ilgili Umut Özel'in bahsettiğim ilk dönemden kalan harika yazısı https://www.umutozel.com/simd-netcore size bilmeniz gereken tüm temel bilgiyi sağlayacaktır.

Bu yazıda daha önce örneklediğim Power Iteration algoritmasını vektörleştireceğiz. Fakat, o yazıda genel olarak double veri tipini kullanmıştım. Bu yazıda bunu float (single) olarak değiştireceğiz. floatile hesaplarımızın hassaslığı düşecek ama tek bir işleme sığdırabildiğimiz vektör sayısı da artacak. Kod ve performans karşılaştırmalarında önceki kodların float a dönüştürülmüş hallerini kullanacağım.

Parçalardan başlayarak bütüne doğru ilerleyelim. Bir vektörün uzunluğunu bulduğumuz şu kod ile başlayalım:

private static float VectorNorm(float[] vector)
{
    return MathF.Sqrt(vector.Sum(static t => t * t));
}

Optimize edilmiş hali,

public static float VectorNorm(float[] vector)
{
    var vectors = MemoryMarshal.Cast<float, Vector<float>>(vector);
    var simdLength = Vector<float>.Count;
    var sumVector = Vector<float>.Zero;

    foreach (var v in vectors)
    {
        sumVector += Vector.Multiply(v, v);
    }

    var sum = Vector.Sum(sumVector);

    for (var i = vectors.Length * simdLength; i < vector.Length; i++)
    {
        sum += vector[i] * vector[i];
    }

    return MathF.Sqrt(sum);
}

İlk satırda göreceğiniz MemoryMarshal.Cast görece yeni bir metot. Daha önce bir diziyi vektöre çevirirken donanımdaki boyutuna göre parçalayarak küçük vektörler oluşturmak gerekiyordu. Bu arkadaş oldukça hızlı bir şekilde vektörlerden oluşan bir Span dönüyor. Span'ları kabaca dizilere benzetebilirsiniz. Yine de cihazın sağladığı vektör boyutunu bilmemiz gerekiyor. Bu sayıyı Vector<T>.Count ile öğrenebiliyoruz. Bu sayı bizim girdi vektörümüz (array) için tam bölünmüyor olabilir. Bu durumda artıklar oluşacaktır. Bu artıkları eski tarz halletmek gerekiyor (probleme göre sağdan etkisiz elemanlar doldurabilirsiniz). Çok az oldukları için bir önemi yok. Yine burada System.Numericsaltındaki Vector<T>yapısını kullandık, imkanları bir tık az olsa da zira benzer performansı çok daha az kod ile alabiliyoruz.

Bu ikisini yarıştıracak olursak sonuçlarımız şöyle oluyor:

|         Method |      N |         Mean |     Error |    StdDev | Allocated |
|--------------- |------- |-------------:|----------:|----------:|----------:|
|     VectorNorm |   1000 |   6,062.6 ns |   2.56 ns |   2.39 ns |      32 B |
| VectorNormSIMD |   1000 |     113.7 ns |   0.08 ns |   0.07 ns |         - |
|     VectorNorm |  10000 |  60,368.9 ns |  24.40 ns |  21.63 ns |      32 B |
| VectorNormSIMD |  10000 |   1,265.8 ns |   3.27 ns |   2.89 ns |         - |
|     VectorNorm | 100000 | 606,348.0 ns | 266.55 ns | 236.29 ns |      32 B |
| VectorNormSIMD | 100000 |  14,307.9 ns |  58.53 ns |  51.88 ns |         - |

Değerlerden de anlaşılacağı üzere, her iki yöntem de O(n) olarak çalışıyor. Ama SIMD kullandığımızda ciddi bir performans artışı görüyoruz.

Sıra diğer yardımcı metodumuz olan NormalizeVector de.

private static float[] NormalizeVector(float[] vector)
{
    var norm = VectorNorm(vector);
    return vector.Select(c => c / norm).ToArray();
}
    public static float[] NormalizeVector(float[] vector)
    {
        var norm = VectorNorm(vector);
        var simdLength =  Vector<float>.Count;
        var divider = new Vector<float>(norm);
        var vectors = MemoryMarshal.Cast<float, Vector<float>>(vector);
        var result = new float[vector.Length];
        var resultsVecSpan = MemoryMarshal.Cast<float, Vector<float>>(result.AsSpan());

        for (var i = 0; i < vectors.Length; i++)
        {
            resultsVecSpan[i] = Vector.Divide(vectors[i], divider);
        }

        for (var i = vectors.Length * simdLength; i < vector.Length; i++)
        {
            result[i] += vector[i] / norm;
        }

        return result;
    }

Oldukça benzer bir işlem yapıyoruz. Şu an için Vektör / Scaler bir işlem yapamadığımız için, divider adında bir vektör oluşturup tüm elemanlarını norm olacak şekilde değiştiriyorum. Bölme işlemi geriye yeni vektör döndüğü için sonucu oluştururken bir sıkıntımız oluyor. Bunu çözmek için resultsVecSpan'ı oluşturuyoruz kendisi bir float dizisini sanki bir Vector<float>gibi ele almamızı ve ekstra bir kopyalama işlemi olmaksızın doğrudan ilgili bellek bölgesine sonucu yazmamızı sağlıyor.

Benzer adımları vektör ile matris çarpımı için uyguluyoruz.

private static float[] MultiplyMatrixByVector(float[,] matrix, float[] vector)
    {
        var noRows = matrix.GetLength(0);
        var noCols = matrix.GetLength(1);

        var simdLength = Vector<float>.Count;
        var result = new float[vector.Length];

        Parallel.For(0, noRows, rowIndex =>
        {
            var matrixSpan = MemoryMarshal.CreateReadOnlySpan(ref Unsafe.As<byte, float>(ref MemoryMarshal.GetArrayDataReference(matrix)), matrix.Length);

            var resultsVecSpan = MemoryMarshal.Cast<float, Vector<float>>(result.AsSpan());
            var rowSpan = matrixSpan.Slice(rowIndex * noRows, noRows);
            var rowVectorSpan = MemoryMarshal.Cast<float, Vector<float>>(rowSpan);
            var vectorSpan = MemoryMarshal.Cast<float, Vector<float>>(vector.AsSpan());

            var j = 0;
            for (; j < noCols / simdLength - 1; j++)
            {
                result[rowIndex] += Vector.Dot(rowVectorSpan[j], vectorSpan[j]);
            }

            for (j *= simdLength; j < noCols; j++)
            {
                result[rowIndex] += matrix[rowIndex, j] * vector[j];
            }
        });
        return result;
    }

Burada iki değişik olay var. İlki Parallel.For kullandık. SIMD'in multithread ile ilgisi olmadığını unutmamak gerekiyor. Şayet kodu threadlere yayabiliyorsak bunu neden yapmayalım? Burada bir örneğini görebilirsiniz. Tek dikkat etmemiz gereken, farklı threadlerin aynı yere yazmaya çalışmamaları olacaktır. Diğer konu ise MemoryMarshal.CreateReadOnlySpan(ref Unsafe.As<byte, float>(ref MemoryMarshal.GetArrayDataReference(matrix)), matrix.Length) satırında. 2 boyutlu bir dizi doğrudan span olarak ele alacak bir şey henüz .NET'e eklenmedi. Bunu doğrudan fixed kullanarak çözebiliyoruz ama ben unsafe örnek vermek istemedim. Tabii buradaki Unsafe.As da unsafe ama benim kastettiğimi anladınız. Bu sayede matrisin satırlarına sanki birer vektörmüş gibi erişebileceğiz. Aksi durumda satır * sutün bir işlem yapmamız gerekecekti.

İterasyon yaptığmız kısımda ise türler dışında bir değişiklik yok.

   private static float[] PowerIteration(float[,] matrix, int maxIterations = 5000)
    {
        var n = matrix.GetLength(0);
        var vector = Enumerable.Range(0, n).Select(static _ => (float)Random.Shared.NextDouble()).ToArray();
        for (var iteration = 0; iteration < maxIterations; iteration++)
        {
            var newVector = MultiplyMatrixByVector(matrix, vector);
            newVector = NormalizeVector(newVector);
            if (newVector.Zip(vector).Sum(v => Math.Abs(v.First - v.Second)) < 0.0000001)
            {
                vector = newVector;
                break;
            }

            vector = newVector;
        }

        return vector;
    }

Evet tüm bu işleri yaptık ve ne elde ettik?

|               Method |     N |         Mean |      Error |     StdDev |   Gen0 |  Allocated |
|--------------------- |------ |-------------:|-----------:|-----------:|-------:|-----------:|
|     B_PowerIteration |  1000 |    91.503 ms |  0.9628 ms |  0.9006 ms |      - |   273.7 KB |
| B_PowerIterationSIMD |  1000 |     3.931 ms |  0.0321 ms |  0.0300 ms | 7.8125 |  455.85 KB |
|     B_PowerIteration | 10000 | 9,100.998 ms | 80.9557 ms | 75.7261 ms |      - |  2630.7 KB |
| B_PowerIterationSIMD | 10000 |   362.975 ms |  7.1253 ms | 11.9047 ms |      - | 2812.47 KB |

10000 * 10000 bir kare matrisin öz vektörünü almak 9sn'den 362ms'e düşmüş durumda. Tabii bu optimizasyonda biraz multithread de ekledik. Ama asıl farkı ortaya koyan SIMD oldu.

Kodun tamamı ise şöyle:

using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

public static class PowerIteration
{
    private static float[] MultiplyMatrixByVector(float[,] matrix, float[] vector)
    {
        var noRows = matrix.GetLength(0);
        var noCols = matrix.GetLength(1);

        var simdLength = Vector<float>.Count;
        var result = new float[vector.Length];

        Parallel.For(0, noRows, rowIndex =>
        {
            var matrixSpan = MemoryMarshal.CreateReadOnlySpan(ref Unsafe.As<byte, float>(ref MemoryMarshal.GetArrayDataReference(matrix)), matrix.Length);

            var resultsVecSpan = MemoryMarshal.Cast<float, Vector<float>>(result.AsSpan());
            var rowSpan = matrixSpan.Slice(rowIndex * noRows, noRows);
            var rowVectorSpan = MemoryMarshal.Cast<float, Vector<float>>(rowSpan);
            var vectorSpan = MemoryMarshal.Cast<float, Vector<float>>(vector.AsSpan());

            var j = 0;
            for (; j < noCols / simdLength - 1; j++)
            {
                result[rowIndex] += Vector.Dot(rowVectorSpan[j], vectorSpan[j]);
            }

            for (j *= simdLength; j < noCols; j++)
            {
                result[rowIndex] += matrix[rowIndex, j] * vector[j];
            }
        });
        return result;
    }

    private static float VectorNorm(float[] vector)
    {
        var vectors = MemoryMarshal.Cast<float, Vector<float>>(vector);
        var simdLength = Vector<float>.Count;
        var sumVector = Vector<float>.Zero;

        foreach (var v in vectors)
        {
            sumVector += Vector.Multiply(v, v);
        }

        var sum = Vector.Sum(sumVector);

        for (var i = vectors.Length * simdLength; i < vector.Length; i++)
        {
            sum += vector[i] * vector[i];
        }

        return MathF.Sqrt(sum);
    }

    private static float[] NormalizeVector(float[] vector)
    {
        var norm = VectorNorm(vector);
        var simdLength =  Vector<float>.Count;
        var divider = new Vector<float>(norm);
        var vectors = MemoryMarshal.Cast<float, Vector<float>>(vector);
        var result = new float[vector.Length];
        var resultsVecSpan = MemoryMarshal.Cast<float, Vector<float>>(result.AsSpan());

        for (var i = 0; i < vectors.Length; i++)
        {
            resultsVecSpan[i] = Vector.Divide(vectors[i], divider);
        }

        for (var i = vectors.Length * simdLength; i < vector.Length; i++)
        {
            result[i] += vector[i] / norm;
        }

        return result;
    }

    public static float[] Run(float[,] matrix, int maxIterations = 5000)
    {
        var n = matrix.GetLength(0);
        var vector = Enumerable.Range(0, n).Select(static _ => (float)Random.Shared.NextDouble()).ToArray();
        for (var iteration = 0; iteration < maxIterations; iteration++)
        {
            var newVector = MultiplyMatrixByVector(matrix, vector);
            newVector = NormalizeVector(newVector);
            if (newVector.Zip(vector).Sum(v => Math.Abs(v.First - v.Second)) < 0.0000001)
            {
                vector = newVector;
                break;
            }

            vector = newVector;
        }

        return vector;
    }
}

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir