using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ParallelWriteFiles
{
class Program
{
/// <summary>
/// Benchmark driver. Writes the integers 1..10,000,000 to CSV files in the current
/// directory using several buffering strategies and prints how long each one took.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    var numbers = Enumerable.Range(1, 10000000);
    var rowsPerChunk = 5000000;
    var workerCount = 2;

    // Baseline: one sequential export of every row through a single writer.
    var baselineTimer = System.Diagnostics.Stopwatch.StartNew();
    var baselinePath = Path.Combine(Directory.GetCurrentDirectory(), "Export.csv");
    Export(numbers, baselinePath);
    Console.WriteLine($"Export finish : {baselineTimer.ElapsedMilliseconds}ms");

    long elapsed = TimeChunkedRun(numbers, rowsPerChunk, workerCount,
        chunk => ObjectCacheExport(chunk, Thread.CurrentThread.ManagedThreadId));
    Console.WriteLine($"ObjectCacheExport finish : {elapsed}ms");

    elapsed = TimeChunkedRun(numbers, rowsPerChunk, workerCount,
        chunk => ObjectAndStreamCacheExport(chunk, Thread.CurrentThread.ManagedThreadId));
    Console.WriteLine($"ObjectAndStreamCacheExport finish : {elapsed}ms");

    elapsed = TimeChunkedRun(numbers, rowsPerChunk, workerCount,
        chunk => StreamCacheExport(chunk, Thread.CurrentThread.ManagedThreadId));
    Console.WriteLine($"StreamCacheExport finish : {elapsed}ms");

    // NOTE: NoCacheExport is disabled because it is unusably slow
    // (about 9 minutes for 1,000,000 rows).
    //elapsed = TimeChunkedRun(numbers, rowsPerChunk, workerCount,
    //    chunk => NoCacheExport(chunk, Thread.CurrentThread.ManagedThreadId));
    //Console.WriteLine($"NoCacheExport finish : {elapsed}ms");

    // The asynchronous variants are awaited with a blocking Wait(), so each worker
    // handles one chunk at a time.
    elapsed = TimeChunkedRun(numbers, rowsPerChunk, workerCount,
        chunk => ObjectAndStreamCacheExportAsync(chunk, Thread.CurrentThread.ManagedThreadId).Wait());
    Console.WriteLine($"ObjectAndStreamCacheExportAsync finish : {elapsed}ms");

    elapsed = TimeChunkedRun(numbers, rowsPerChunk, workerCount,
        chunk => StreamCacheExportAsync(chunk, Thread.CurrentThread.ManagedThreadId).Wait());
    Console.WriteLine($"StreamCacheExportAsync finish : {elapsed}ms");

    // Keep the console window open until a key is pressed.
    Console.ReadKey();
}

/// <summary>
/// Splits <paramref name="numbers"/> into chunks, runs <paramref name="exportChunk"/> on
/// the chunks in parallel, and returns the elapsed wall-clock time in milliseconds.
/// </summary>
/// <param name="numbers">Values to export.</param>
/// <param name="rowsPerChunk">Maximum number of values handed to one invocation.</param>
/// <param name="workerCount">Maximum degree of parallelism.</param>
/// <param name="exportChunk">Export routine invoked once per chunk on a worker thread.</param>
/// <returns>Elapsed milliseconds, including the time spent chunking.</returns>
private static long TimeChunkedRun(
    IEnumerable<int> numbers,
    int rowsPerChunk,
    int workerCount,
    Action<IEnumerable<int>> exportChunk)
{
    var timer = System.Diagnostics.Stopwatch.StartNew();
    var options = new ParallelOptions { MaxDegreeOfParallelism = workerCount };
    Parallel.ForEach(numbers.Chunk(rowsPerChunk), options, exportChunk);
    timer.Stop();
    return timer.ElapsedMilliseconds;
}
/// <summary>
/// Writes <paramref name="values"/> to "{threadid}_ObjectCacheExport.csv" in the current
/// directory. Values are buffered in memory and each full batch is appended through the
/// path-based <c>Export</c> overload, which opens and closes the file on every call.
/// </summary>
/// <param name="values">Integers to write, one per line.</param>
/// <param name="threadid">Identifier used as the output file-name prefix.</param>
static void ObjectCacheExport(IEnumerable<int> values, int threadid)
{
    var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectCacheExport.csv");
    const int cacheSize = 500;

    // Pre-sized so the buffer never has to grow.
    var cache = new List<int>(cacheSize);
    foreach (var value in values)
    {
        cache.Add(value);

        // Flush as soon as the buffer holds cacheSize items so it never exceeds
        // its declared size. List<T>.Count is O(1); no LINQ call is needed.
        if (cache.Count >= cacheSize)
        {
            Export(cache, path);
            cache.Clear();
        }
    }

    // Flush whatever is left. This also runs when the buffer is empty, which
    // still creates the output file.
    Export(cache, path);
}
/// <summary>
/// Writes <paramref name="values"/> to "{threadid}_ObjectAndStreamCacheExport.csv" in the
/// current directory. The file is opened once; values are buffered in memory and each
/// full batch is written through the shared writer.
/// </summary>
/// <param name="values">Integers to write, one per line.</param>
/// <param name="threadid">Identifier used as the output file-name prefix.</param>
static void ObjectAndStreamCacheExport(IEnumerable<int> values, int threadid)
{
    var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectAndStreamCacheExport.csv");
    const int cacheSize = 500;

    using (var writer = File.AppendText(path))
    {
        // Pre-sized so the buffer never has to grow.
        var cache = new List<int>(cacheSize);
        foreach (var value in values)
        {
            cache.Add(value);

            // Flush as soon as the buffer holds cacheSize items so it never exceeds
            // its declared size. List<T>.Count is O(1); no LINQ call is needed.
            if (cache.Count >= cacheSize)
            {
                Export(cache, path, writer);
                cache.Clear();
            }
        }

        // Write the remaining partial batch (a no-op when the buffer is empty).
        Export(cache, path, writer);
    }
}
/// <summary>
/// Asynchronous counterpart of <c>ObjectAndStreamCacheExport</c>. Writes
/// <paramref name="values"/> to "{threadid}_ObjectAndStreamCacheExportAsync.csv" in the
/// current directory. The file is opened once; values are buffered in memory and each
/// full batch is written through the shared writer with asynchronous writes.
/// </summary>
/// <param name="values">Integers to write, one per line.</param>
/// <param name="threadid">Identifier used as the output file-name prefix.</param>
/// <returns>A task that completes when every value has been written and the file is closed.</returns>
static async Task ObjectAndStreamCacheExportAsync(IEnumerable<int> values, int threadid)
{
    // The file is named after this method so its output is not appended to the
    // file produced by the synchronous variant.
    var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectAndStreamCacheExportAsync.csv");
    const int cacheSize = 500;

    using (var writer = File.AppendText(path))
    {
        // Pre-sized so the buffer never has to grow.
        var cache = new List<int>(cacheSize);
        foreach (var value in values)
        {
            cache.Add(value);

            // Flush as soon as the buffer holds cacheSize items so it never exceeds
            // its declared size. List<T>.Count is O(1); no LINQ call is needed.
            if (cache.Count >= cacheSize)
            {
                await ExportAsync(cache, path, writer);
                cache.Clear();
            }
        }

        // Write the remaining partial batch (a no-op when the buffer is empty).
        await ExportAsync(cache, path, writer);
    }
}
/// <summary>
/// Writes <paramref name="values"/> to "{threadid}_NoCacheExport.csv" in the current
/// directory, opening and closing the file once for every single value.
/// </summary>
/// <param name="values">Integers to write, one per line.</param>
/// <param name="threadid">Identifier used as the output file-name prefix.</param>
static void NoCacheExport(IEnumerable<int> values, int threadid)
{
    string target = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_NoCacheExport.csv");

    foreach (int number in values)
    {
        // A new writer per row: nothing is kept open or buffered between rows.
        using (StreamWriter appender = File.AppendText(target))
        {
            appender.WriteLine(number);
        }
    }
}
/// <summary>
/// Writes <paramref name="values"/> to "{threadid}_StreamCacheExport.csv" in the current
/// directory through the path-based <c>Export</c> overload (one writer for the whole call).
/// </summary>
/// <param name="values">Integers to write, one per line.</param>
/// <param name="threadid">Identifier used as the output file-name prefix.</param>
static void StreamCacheExport(IEnumerable<int> values, int threadid)
{
    string directory = Directory.GetCurrentDirectory();
    string target = Path.Combine(directory, $"{threadid}_StreamCacheExport.csv");
    Export(values, target);
}
/// <summary>
/// Asynchronous counterpart of <c>StreamCacheExport</c>. Writes <paramref name="values"/>
/// to "{threadid}_StreamCacheExportAsync.csv" in the current directory through the
/// path-based <c>ExportAsync</c> overload (one writer for the whole call).
/// </summary>
/// <param name="values">Integers to write, one per line.</param>
/// <param name="threadid">Identifier used as the output file-name prefix.</param>
/// <returns>A task that completes when every value has been written and the file is closed.</returns>
private static async Task StreamCacheExportAsync(IEnumerable<int> values, int threadid)
{
    // The file is named after this method so its output is not appended to the
    // file produced by the synchronous variant.
    var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_StreamCacheExportAsync.csv");
    await ExportAsync(values, path);
}
/// <summary>
/// Opens the file at <paramref name="path"/> for appending, asynchronously writes every
/// value on its own line, and closes the file before the returned task completes.
/// </summary>
/// <param name="values">Integers to write.</param>
/// <param name="path">File to append to; created if it does not exist.</param>
/// <returns>A task that completes once all values are written and the writer is disposed.</returns>
private static async Task ExportAsync(IEnumerable<int> values, string path)
{
    using (StreamWriter appender = File.AppendText(path))
    {
        // Delegate the per-row writes to the writer-based overload.
        await ExportAsync(values, path, appender);
    }
}
/// <summary>
/// Asynchronously writes every value in <paramref name="values"/> to
/// <paramref name="writer"/>, one value per line, awaiting each write in turn.
/// </summary>
/// <param name="values">Integers to write.</param>
/// <param name="path">Not used by this overload.</param>
/// <param name="writer">Destination writer; it is neither flushed nor disposed here.</param>
/// <returns>A task that completes when the last line has been handed to the writer.</returns>
private static async Task ExportAsync(IEnumerable<int> values, string path, StreamWriter writer)
{
    foreach (int number in values)
    {
        string line = number.ToString();
        await writer.WriteLineAsync(line);
    }
}
/// <summary>
/// Opens the file at <paramref name="path"/> for appending, writes every value on its
/// own line, and closes the file before returning.
/// </summary>
/// <param name="values">Integers to write.</param>
/// <param name="path">File to append to; created if it does not exist.</param>
static void Export(IEnumerable<int> values, string path)
{
    using (StreamWriter appender = File.AppendText(path))
    {
        // Delegate the per-row writes to the writer-based overload.
        Export(values, path, appender);
    }
}
/// <summary>
/// Writes every value in <paramref name="values"/> to <paramref name="writer"/>,
/// one value per line.
/// </summary>
/// <param name="values">Integers to write.</param>
/// <param name="path">Not used by this overload.</param>
/// <param name="writer">Destination writer; it is neither flushed nor disposed here.</param>
static void Export(IEnumerable<int> values, string path, StreamWriter writer)
{
    foreach (int number in values)
    {
        writer.WriteLine(number);
    }
}
}
/// <summary>
/// Sequence helpers used by the export benchmark.
/// </summary>
internal static class EnumerableExtension
{
    /// <summary>
    /// Splits <paramref name="values"/> into consecutive chunks of at most
    /// <paramref name="size"/> elements; only the last chunk may be shorter.
    /// </summary>
    /// <remarks>
    /// Every chunk is a deferred <c>Skip</c>/<c>Take</c> view over <paramref name="values"/>,
    /// so chunks can be enumerated independently of each other. The source is counted
    /// once and then enumerated again for each chunk that is consumed, so it should be
    /// cheap and repeatable to enumerate.
    /// </remarks>
    /// <typeparam name="T">Element type.</typeparam>
    /// <param name="values">Sequence to split.</param>
    /// <param name="size">Maximum number of elements per chunk; must be positive.</param>
    /// <returns>A lazily produced sequence of chunks; empty when the source is empty.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="values"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
    public static IEnumerable<IEnumerable<T>> Chunk<T>(this IEnumerable<T> values, int size)
    {
        // Validate here rather than in the iterator so bad arguments fail at the
        // call site instead of on first enumeration.
        if (values == null)
        {
            throw new ArgumentNullException(nameof(values));
        }

        if (size <= 0)
        {
            // A non-positive size would never advance the index and loop forever.
            throw new ArgumentOutOfRangeException(nameof(size), size, "Chunk size must be positive.");
        }

        return ChunkIterator(values, size);
    }

    /// <summary>Iterator behind <see cref="Chunk{T}"/>; arguments are already validated.</summary>
    private static IEnumerable<IEnumerable<T>> ChunkIterator<T>(IEnumerable<T> values, int size)
    {
        var count = values.Count();
        var index = 0;
        while (count > index)
        {
            yield return values.Skip(index).Take(size);

            // Stop before advancing when this was the last chunk, so that
            // index + size can never overflow int.
            if (count - index <= size)
            {
                yield break;
            }

            index += size;
        }
    }
}
}
実行結果
1,000,000行、1,000行ずつ分割、2並列
Export finish : 628ms
ObjectCacheExport finish : 4364ms
ObjectAndStreamCacheExport finish : 3207ms
StreamCacheExport finish : 3146ms
ObjectAndStreamCacheExportAsync finish : 7508ms
StreamCacheExportAsync finish : 7238ms
1,000,000行、1,000行ずつ分割、4並列
Export finish : 500ms
ObjectCacheExport finish : 3680ms
ObjectAndStreamCacheExport finish : 2549ms
StreamCacheExport finish : 2600ms
ObjectAndStreamCacheExportAsync finish : 7415ms
StreamCacheExportAsync finish : 7304ms
1,000,000行、100,000行ずつ分割、2並列
Export finish : 1031ms
ObjectCacheExport finish : 2204ms
ObjectAndStreamCacheExport finish : 617ms
StreamCacheExport finish : 836ms
ObjectAndStreamCacheExportAsync finish : 5976ms
StreamCacheExportAsync finish : 4978ms
1,000,000行、100,000行ずつ分割、4並列
Export finish : 679ms
ObjectCacheExport finish : 1367ms
ObjectAndStreamCacheExport finish : 491ms
StreamCacheExport finish : 275ms
ObjectAndStreamCacheExportAsync finish : 4164ms
StreamCacheExportAsync finish : 3040ms
1,000,000行、100,000行ずつ分割、4並列
Export finish : 679ms
ObjectCacheExport finish : 1367ms
ObjectAndStreamCacheExport finish : 491ms
StreamCacheExport finish : 275ms
ObjectAndStreamCacheExportAsync finish : 4164ms
StreamCacheExportAsync finish : 3040ms
1,000,000行、500,000行ずつ分割、4並列
Export finish : 482ms
ObjectCacheExport finish : 1718ms
ObjectAndStreamCacheExport finish : 400ms
StreamCacheExport finish : 301ms
ObjectAndStreamCacheExportAsync finish : 3613ms
StreamCacheExportAsync finish : 2849ms
10,000,000行、100,000行ずつ分割、2並列
Export(並列なし、Streamキャッシュあり) finish : 3966ms
ObjectCacheExport finish : 18095ms
ObjectAndStreamCacheExport finish : 7080ms
StreamCacheExport finish : 7076ms
ObjectAndStreamCacheExportAsync finish : 60079ms
StreamCacheExportAsync finish : 60633ms
遅い原因:メモリが枯渇してGCが発生するため
10,000,000行、100,000行ずつ分割、2並列
Export(並列なし、Streamキャッシュあり) finish : 3966ms
ObjectCacheExport finish : 18095ms
ObjectAndStreamCacheExport finish : 7080ms
StreamCacheExport finish : 7076ms
ObjectAndStreamCacheExportAsync finish : 60079ms
StreamCacheExportAsync finish : 60633ms
遅い原因:メモリが枯渇してGCが発生するため
10,000,000行、500,000行ずつ分割、2並列
Export finish : 3834ms
ObjectCacheExport finish : 21722ms
ObjectAndStreamCacheExport finish : 4005ms
StreamCacheExport finish : 4035ms
ObjectAndStreamCacheExportAsync finish : 51959ms
StreamCacheExportAsync finish : 53595ms
遅い原因:メモリが枯渇してGCが発生するため