おっさんエンジニアの備忘録

とりあえず、忘れてもいいようにやってみたことを書いています。

並列処理でいろいろ試してみた

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ParallelWriteFiles
{
    class Program
    {
        /// <summary>
        /// Benchmarks several strategies for writing a large sequence of integers to CSV files
        /// in the current directory and prints the elapsed time of each strategy to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var numbers = Enumerable.Range(1, 10000000);
            var chunkSize = 5000000;
            var maxDegreeOfParallelism = 2;
            var sw = System.Diagnostics.Stopwatch.StartNew();

            // Baseline: a single sequential writer for the whole sequence.
            var path = Path.Combine(Directory.GetCurrentDirectory(), "Export.csv");
            Export(numbers, path);
            Console.WriteLine($"Export finish : {sw.ElapsedMilliseconds}ms");

            RunParallel("ObjectCacheExport", numbers, chunkSize, maxDegreeOfParallelism, sw, ObjectCacheExport);
            RunParallel("ObjectAndStreamCacheExport", numbers, chunkSize, maxDegreeOfParallelism, sw, ObjectAndStreamCacheExport);
            RunParallel("StreamCacheExport", numbers, chunkSize, maxDegreeOfParallelism, sw, StreamCacheExport);

            // NoCacheExport is deliberately not run: the original measurement took about
            // 9 minutes for 1,000,000 rows, which is too slow to be worth comparing.
            // RunParallel("NoCacheExport", numbers, chunkSize, maxDegreeOfParallelism, sw, NoCacheExport);

            // The async variants are blocked on because Parallel.ForEach takes a synchronous body.
            // GetAwaiter().GetResult() rethrows the original exception instead of wrapping it
            // in an extra AggregateException the way Wait() does.
            RunParallel("ObjectAndStreamCacheExportAsync", numbers, chunkSize, maxDegreeOfParallelism, sw,
                (values, threadId) => ObjectAndStreamCacheExportAsync(values, threadId).GetAwaiter().GetResult());
            RunParallel("StreamCacheExportAsync", numbers, chunkSize, maxDegreeOfParallelism, sw,
                (values, threadId) => StreamCacheExportAsync(values, threadId).GetAwaiter().GetResult());

            Console.ReadKey();
        }

        /// <summary>
        /// Splits <paramref name="numbers"/> into chunks, runs <paramref name="export"/> on the chunks
        /// in parallel, and prints "<paramref name="label"/> finish : Nms" with the elapsed time.
        /// </summary>
        /// <param name="label">Name of the strategy, used in the console output.</param>
        /// <param name="numbers">The full sequence to export.</param>
        /// <param name="chunkSize">Number of values handed to each invocation of <paramref name="export"/>.</param>
        /// <param name="maxDegreeOfParallelism">Upper bound on concurrently running exports.</param>
        /// <param name="sw">Stopwatch that is restarted before and stopped after the run.</param>
        /// <param name="export">Export routine; receives one chunk and the managed id of the executing thread.</param>
        private static void RunParallel(
            string label,
            IEnumerable<int> numbers,
            int chunkSize,
            int maxDegreeOfParallelism,
            System.Diagnostics.Stopwatch sw,
            Action<IEnumerable<int>, int> export)
        {
            sw.Restart();

            var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
            Parallel.ForEach(numbers.Chunk(chunkSize), options, values =>
            {
                // The thread id must be read inside the body so it is the worker thread's id.
                export(values, Thread.CurrentThread.ManagedThreadId);
            });

            sw.Stop();
            Console.WriteLine($"{label} finish : {sw.ElapsedMilliseconds}ms");
        }

        /// <summary>
        /// Appends <paramref name="values"/> to "{threadid}_ObjectCacheExport.csv" in the current
        /// directory, buffering values in a list and reopening the file for every full batch.
        /// </summary>
        /// <param name="values">The values to write, one per line.</param>
        /// <param name="threadid">Identifier used as the file-name prefix.</param>
        static void ObjectCacheExport(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectCacheExport.csv");
            const int cacheSize = 500;
            var cache = new List<int>(cacheSize);
            foreach (var value in values)
            {
                cache.Add(value);
                // Flush as soon as the batch holds exactly cacheSize items
                // (the previous "> cacheSize" check let it grow to cacheSize + 1).
                if (cache.Count >= cacheSize)
                {
                    Export(cache, path);
                    cache.Clear();
                }
            }

            // Write whatever is left over from the last partial batch.
            Export(cache, path);
        }

        /// <summary>
        /// Appends <paramref name="values"/> to "{threadid}_ObjectAndStreamCacheExport.csv" in the
        /// current directory, keeping one writer open and handing it the values in batches.
        /// </summary>
        /// <param name="values">The values to write, one per line.</param>
        /// <param name="threadid">Identifier used as the file-name prefix.</param>
        static void ObjectAndStreamCacheExport(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectAndStreamCacheExport.csv");
            const int cacheSize = 500;
            using (var writer = File.AppendText(path))
            {
                var cache = new List<int>(cacheSize);
                foreach (var value in values)
                {
                    cache.Add(value);
                    // Flush as soon as the batch holds exactly cacheSize items
                    // (the previous "> cacheSize" check let it grow to cacheSize + 1).
                    if (cache.Count >= cacheSize)
                    {
                        Export(cache, path, writer);
                        cache.Clear();
                    }
                }

                // Write whatever is left over from the last partial batch.
                Export(cache, path, writer);
            }
        }

        /// <summary>
        /// Asynchronous counterpart of the batched single-writer export: appends
        /// <paramref name="values"/> to "{threadid}_ObjectAndStreamCacheExportAsync.csv" in the
        /// current directory, keeping one writer open and handing it the values in batches.
        /// </summary>
        /// <param name="values">The values to write, one per line.</param>
        /// <param name="threadid">Identifier used as the file-name prefix.</param>
        /// <returns>A task that completes when all values have been written.</returns>
        static async Task ObjectAndStreamCacheExportAsync(IEnumerable<int> values, int threadid)
        {
            // Use a file name of its own; the previous name was copied from the synchronous
            // variant, so both benchmarks appended to the same file.
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectAndStreamCacheExportAsync.csv");
            const int cacheSize = 500;
            using (var writer = File.AppendText(path))
            {
                var cache = new List<int>(cacheSize);
                foreach (var value in values)
                {
                    cache.Add(value);
                    // Flush as soon as the batch holds exactly cacheSize items
                    // (the previous "> cacheSize" check let it grow to cacheSize + 1).
                    if (cache.Count >= cacheSize)
                    {
                        await ExportAsync(cache, path, writer);
                        cache.Clear();
                    }
                }

                // Write whatever is left over from the last partial batch.
                await ExportAsync(cache, path, writer);
            }
        }

        /// <summary>
        /// Appends <paramref name="values"/> to "{threadid}_NoCacheExport.csv" in the current
        /// directory, opening and closing the file once per value.
        /// </summary>
        /// <param name="values">The values to write, one per line.</param>
        /// <param name="threadid">Identifier used as the file-name prefix.</param>
        static void NoCacheExport(IEnumerable<int> values, int threadid)
        {
            var fileName = $"{threadid}_NoCacheExport.csv";
            var target = Path.Combine(Directory.GetCurrentDirectory(), fileName);

            foreach (var number in values)
            {
                // Reopening the file for every single value is the point of this variant.
                using (var appender = File.AppendText(target))
                {
                    appender.WriteLine(number);
                }
            }
        }

        /// <summary>
        /// Appends <paramref name="values"/> to "{threadid}_StreamCacheExport.csv" in the current
        /// directory by delegating to <c>Export</c>.
        /// </summary>
        /// <param name="values">The values to write, one per line.</param>
        /// <param name="threadid">Identifier used as the file-name prefix.</param>
        static void StreamCacheExport(IEnumerable<int> values, int threadid)
        {
            var directory = Directory.GetCurrentDirectory();
            var target = Path.Combine(directory, $"{threadid}_StreamCacheExport.csv");
            Export(values, target);
        }

        /// <summary>
        /// Asynchronously appends <paramref name="values"/> to
        /// "{threadid}_StreamCacheExportAsync.csv" in the current directory by delegating to
        /// <c>ExportAsync</c>.
        /// </summary>
        /// <param name="values">The values to write, one per line.</param>
        /// <param name="threadid">Identifier used as the file-name prefix.</param>
        /// <returns>A task that completes when all values have been written.</returns>
        private static async Task StreamCacheExportAsync(IEnumerable<int> values, int threadid)
        {
            // Use a file name of its own; the previous name was copied from the synchronous
            // variant, so both benchmarks appended to the same file.
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_StreamCacheExportAsync.csv");
            await ExportAsync(values, path);
        }

        /// <summary>
        /// Opens <paramref name="path"/> for appending and asynchronously writes every value
        /// on its own line; the file is closed when the write completes.
        /// </summary>
        /// <param name="values">The values to write.</param>
        /// <param name="path">Path of the file to append to.</param>
        /// <returns>A task that completes when all values have been written.</returns>
        private static async Task ExportAsync(IEnumerable<int> values, string path)
        {
            using (var appender = File.AppendText(path))
            {
                await ExportAsync(values, path, appender);
            }
        }

        /// <summary>
        /// Asynchronously writes every value to <paramref name="writer"/>, one per line.
        /// </summary>
        /// <param name="values">The values to write.</param>
        /// <param name="path">Not used by this overload.</param>
        /// <param name="writer">An already opened writer; it is neither flushed nor closed here.</param>
        /// <returns>A task that completes when all values have been handed to the writer.</returns>
        private static async Task ExportAsync(IEnumerable<int> values, string path, StreamWriter writer)
        {
            foreach (var number in values)
            {
                var line = number.ToString();
                await writer.WriteLineAsync(line);
            }
        }

        /// <summary>
        /// Opens <paramref name="path"/> for appending and writes every value on its own line;
        /// the file is closed before the method returns.
        /// </summary>
        /// <param name="values">The values to write.</param>
        /// <param name="path">Path of the file to append to.</param>
        static void Export(IEnumerable<int> values, string path)
        {
            using (var appender = File.AppendText(path))
            {
                Export(values, path, appender);
            }
        }

        /// <summary>
        /// Writes every value to <paramref name="writer"/>, one per line.
        /// </summary>
        /// <param name="values">The values to write.</param>
        /// <param name="path">Not used by this overload.</param>
        /// <param name="writer">An already opened writer; it is neither flushed nor closed here.</param>
        static void Export(IEnumerable<int> values, string path, StreamWriter writer)
        {
            foreach (var number in values)
            {
                writer.WriteLine(number);
            }
        }

    }

    /// <summary>
    /// Extension methods for <see cref="IEnumerable{T}"/>.
    /// </summary>
    internal static class EnumerableExtension
    {
        /// <summary>
        /// Splits <paramref name="values"/> into consecutive chunks of at most
        /// <paramref name="size"/> elements; only the last chunk may be shorter.
        /// </summary>
        /// <remarks>
        /// The source is enumerated exactly once. Each chunk is buffered in memory before it is
        /// yielded, so a chunk can be enumerated independently of the others (for example on
        /// different threads).
        /// </remarks>
        /// <typeparam name="T">Element type of the sequence.</typeparam>
        /// <param name="values">The sequence to split.</param>
        /// <param name="size">Maximum number of elements per chunk; must be greater than zero.</param>
        /// <returns>The chunks, in source order. Empty when <paramref name="values"/> is empty.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="values"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
        public static IEnumerable<IEnumerable<T>> Chunk<T>(this IEnumerable<T> values , int size)
        {
            if (values == null)
            {
                throw new ArgumentNullException(nameof(values));
            }

            // A non-positive size previously never advanced past the end and looped forever.
            if (size <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(size), size, "Chunk size must be greater than zero.");
            }

            // Validation lives outside the iterator so it runs at call time, not on first MoveNext.
            return ChunkIterator(values, size);
        }

        /// <summary>
        /// Single-pass iterator behind <see cref="Chunk{T}"/>; arguments are already validated.
        /// </summary>
        private static IEnumerable<IEnumerable<T>> ChunkIterator<T>(IEnumerable<T> values, int size)
        {
            var buffer = new List<T>();
            foreach (var value in values)
            {
                buffer.Add(value);
                if (buffer.Count == size)
                {
                    yield return buffer;
                    // Start a fresh list; the yielded one now belongs to the consumer.
                    buffer = new List<T>();
                }
            }

            if (buffer.Count > 0)
            {
                yield return buffer;
            }
        }
    }

}

実行結果

1,000,000行、1000行ずつ分割、2並列
Export finish : 628ms
ObjectCacheExport finish : 4364ms
ObjectAndStreamCacheExport finish : 3207ms
StreamCacheExport finish : 3146ms
ObjectAndStreamCacheExportAsync finish : 7508ms
StreamCacheExportAsync finish : 7238ms

1,000,000行、1000行ずつ分割、4並列
Export finish : 500ms
ObjectCacheExport finish : 3680ms
ObjectAndStreamCacheExport finish : 2549ms
StreamCacheExport finish : 2600ms
ObjectAndStreamCacheExportAsync finish : 7415ms
StreamCacheExportAsync finish : 7304ms

1,000,000行、100,000行ずつ分割、2並列
Export finish : 1031ms
ObjectCacheExport finish : 2204ms
ObjectAndStreamCacheExport finish : 617ms
StreamCacheExport finish : 836ms
ObjectAndStreamCacheExportAsync finish : 5976ms
StreamCacheExportAsync finish : 4978ms

1,000,000行、100,000行ずつ分割、4並列
Export finish : 679ms
ObjectCacheExport finish : 1367ms
ObjectAndStreamCacheExport finish : 491ms
StreamCacheExport finish : 275ms
ObjectAndStreamCacheExportAsync finish : 4164ms
StreamCacheExportAsync finish : 3040ms

1,000,000行、100,000行ずつ分割、4並列
Export finish : 679ms
ObjectCacheExport finish : 1367ms
ObjectAndStreamCacheExport finish : 491ms
StreamCacheExport finish : 275ms
ObjectAndStreamCacheExportAsync finish : 4164ms
StreamCacheExportAsync finish : 3040ms

1,000,000行、500,000行ずつ分割、4並列
Export finish : 482ms
ObjectCacheExport finish : 1718ms
ObjectAndStreamCacheExport finish : 400ms
StreamCacheExport finish : 301ms
ObjectAndStreamCacheExportAsync finish : 3613ms
StreamCacheExportAsync finish : 2849ms

10,000,000行、100,000行ずつ分割、2並列
Export(並列なし、Streamキャッシュあり) finish : 3966ms
ObjectCacheExport finish : 18095ms
ObjectAndStreamCacheExport finish : 7080ms
StreamCacheExport finish : 7076ms
ObjectAndStreamCacheExportAsync finish : 60079ms
StreamCacheExportAsync finish : 60633ms
遅い原因:メモリが枯渇してGCが発生するため

10,000,000行、100,000行ずつ分割、2並列
Export(並列なし、Streamキャッシュあり) finish : 3966ms
ObjectCacheExport finish : 18095ms
ObjectAndStreamCacheExport finish : 7080ms
StreamCacheExport finish : 7076ms
ObjectAndStreamCacheExportAsync finish : 60079ms
StreamCacheExportAsync finish : 60633ms
遅い原因:メモリが枯渇してGCが発生するため

10,000,000行、500,000行ずつ分割、2並列
Export finish : 3834ms
ObjectCacheExport finish : 21722ms
ObjectAndStreamCacheExport finish : 4005ms
StreamCacheExport finish : 4035ms
ObjectAndStreamCacheExportAsync finish : 51959ms
StreamCacheExportAsync finish : 53595ms
遅い原因:メモリが枯渇してGCが発生するため