おっさんエンジニアの備忘録

とりあえず、忘れてもいいようにやってみたことを書いています。

並列処理でいろいろ試してみた

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ParallelWriteFiles
{
    class Program
    {
        static void Main(string[] args)
        {
            var numbers = Enumerable.Range(1, 10000000);
            var chukSize = 5000000;
            var maxDegreeOfParallelism = 2;
            System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();

            var path = Path.Combine(Directory.GetCurrentDirectory(), $"Export.csv");
            Export(numbers, path);
            Console.WriteLine($"Export finish : {sw.ElapsedMilliseconds}ms");

            sw.Restart();

            Parallel.ForEach(numbers.Chunk(chukSize), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism } ,(values) =>
            {
                ObjectCacheExport(values, Thread.CurrentThread.ManagedThreadId);
            });
            sw.Stop();
            Console.WriteLine($"ObjectCacheExport finish : {sw.ElapsedMilliseconds}ms");

            sw.Restart();

            Parallel.ForEach(numbers.Chunk(chukSize), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, (values) =>
            {
                ObjectAndStreamCacheExport(values, Thread.CurrentThread.ManagedThreadId);
            });
            sw.Stop();
            Console.WriteLine($"ObjectAndStreamCacheExport finish : {sw.ElapsedMilliseconds}ms");

            sw.Restart();

            Parallel.ForEach(numbers.Chunk(chukSize), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, (values) =>
            {
                StreamCacheExport(values, Thread.CurrentThread.ManagedThreadId);
            });
            sw.Stop();
            Console.WriteLine($"StreamCacheExport finish : {sw.ElapsedMilliseconds}ms");

            sw.Restart();

            // ※ 1,000,000行で9分という話にならない遅さ
            //Parallel.ForEach(numbers.Chunk(chukSize), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, (values) =>
            //{
            //    NoCacheExport(values, Thread.CurrentThread.ManagedThreadId);
            //});
            //sw.Stop();

            //Console.WriteLine($"NoCacheExport finish : {sw.ElapsedMilliseconds}ms");

            sw.Restart();

            Parallel.ForEach(numbers.Chunk(chukSize), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, (values) =>
            {
                ObjectAndStreamCacheExportAsync(values, Thread.CurrentThread.ManagedThreadId).Wait();
            });
            sw.Stop();
            Console.WriteLine($"ObjectAndStreamCacheExportAsync finish : {sw.ElapsedMilliseconds}ms");


            sw.Restart();

            Parallel.ForEach(numbers.Chunk(chukSize), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, (values) =>
            {
                StreamCacheExportAsync(values, Thread.CurrentThread.ManagedThreadId).Wait();
            });
            sw.Stop();
            Console.WriteLine($"StreamCacheExportAsync finish : {sw.ElapsedMilliseconds}ms");

            Console.ReadKey();
        }

        static void ObjectCacheExport(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectCacheExport.csv");
            var cacheSize = 500;
            var cache = new List<int>();
            foreach (var value in values)
            {
                cache.Add(value);
                if (cache.Count() > cacheSize)
                {
                    Export(cache, path);
                    cache.Clear();
                }
            }
            Export(cache, path);
        }

        static void ObjectAndStreamCacheExport(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectAndStreamCacheExport.csv");
            var cacheSize = 500;
            using (var writer = File.AppendText(path))
            {
                var cache = new List<int>();
                foreach(var value in values){ 

                    cache.Add(value);
                    if(cache.Count() > cacheSize)
                    {
                        Export(cache, path, writer);
                        cache.Clear();
                    }
                }
                Export(cache, path, writer);
            }
        }

        static async Task ObjectAndStreamCacheExportAsync(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_ObjectAndStreamCacheExport.csv");
            var cacheSize = 500;
            using (var writer = File.AppendText(path))
            {
                var cache = new List<int>();
                foreach (var value in values)
                {

                    cache.Add(value);
                    if (cache.Count() > cacheSize)
                    {
                        await ExportAsync(cache, path, writer);
                        cache.Clear();
                    }
                }
                await ExportAsync(cache, path, writer);
            }
        }

        static void NoCacheExport(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_NoCacheExport.csv");
            foreach(var value in values)
            {
                using (var writer = File.AppendText(path))
                {
                    writer.WriteLine(value);
                }
            }
        }

        static void StreamCacheExport(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_StreamCacheExport.csv");
            Export(values, path);

        }

        private static async Task StreamCacheExportAsync(IEnumerable<int> values, int threadid)
        {
            var path = Path.Combine(Directory.GetCurrentDirectory(), $"{threadid}_StreamCacheExport.csv");
            await ExportAsync(values, path);
        }

        private static async Task ExportAsync(IEnumerable<int> values, string path)
        {
            using (var writer = File.AppendText(path))
            {
                await ExportAsync(values, path, writer);
            }
        }

        private static async Task ExportAsync(IEnumerable<int> values, string path, StreamWriter writer)
        {
            foreach (var value in values)
            {
               await writer.WriteLineAsync(value.ToString());
            }
        }

        static void Export(IEnumerable<int> values, string path)
        {
            using (var writer = File.AppendText(path))
            {
                Export(values, path, writer);
            }
        }

        static void Export(IEnumerable<int> values, string path, StreamWriter writer)
        {
            foreach (var value in values)
            {
                writer.WriteLine(value);
            }
        }

    }

    internal static class EnumerableExtension
    {
        public static IEnumerable<IEnumerable<T>> Chunk<T>(this IEnumerable<T> values , int size)
        {
            var count = values.Count();
            var index = 0;
            while(count > index)
            {
                yield return values.Skip(index).Take(size);
                index += size;
            }
            
        }
    }

}

実行結果

1,000,000行、1000行ずつ分割、2並列
Export finish : 628ms
ObjectCacheExport finish : 4364ms
ObjectAndStreamCacheExport finish : 3207ms
StreamCacheExport finish : 3146ms
ObjectAndStreamCacheExportAsync finish : 7508ms
StreamCacheExportAsync finish : 7238ms

1,000,000行、1000行ずつ分割、4並列
Export finish : 500ms
ObjectCacheExport finish : 3680ms
ObjectAndStreamCacheExport finish : 2549ms
StreamCacheExport finish : 2600ms
ObjectAndStreamCacheExportAsync finish : 7415ms
StreamCacheExportAsync finish : 7304ms

1,000,000行、100,000行ずつ分割、2並列
Export finish : 1031ms
ObjectCacheExport finish : 2204ms
ObjectAndStreamCacheExport finish : 617ms
StreamCacheExport finish : 836ms
ObjectAndStreamCacheExportAsync finish : 5976ms
StreamCacheExportAsync finish : 4978ms

1,000,000行、100,000行ずつ分割、4並列
Export finish : 679ms
ObjectCacheExport finish : 1367ms
ObjectAndStreamCacheExport finish : 491ms
StreamCacheExport finish : 275ms
ObjectAndStreamCacheExportAsync finish : 4164ms
StreamCacheExportAsync finish : 3040ms

1,000,000行、100,000行ずつ分割、4並列
Export finish : 679ms
ObjectCacheExport finish : 1367ms
ObjectAndStreamCacheExport finish : 491ms
StreamCacheExport finish : 275ms
ObjectAndStreamCacheExportAsync finish : 4164ms
StreamCacheExportAsync finish : 3040ms

1,000,000行、500,000行ずつ分割、4並列
Export finish : 482ms
ObjectCacheExport finish : 1718ms
ObjectAndStreamCacheExport finish : 400ms
StreamCacheExport finish : 301ms
ObjectAndStreamCacheExportAsync finish : 3613ms
StreamCacheExportAsync finish : 2849ms

10,000,000行、100,000行ずつ分割、2並列
Export(並列なし、Streamキャッシュあり) finish : 3966ms
ObjectCacheExport finish : 18095ms
ObjectAndStreamCacheExport finish : 7080ms
StreamCacheExport finish : 7076ms
ObjectAndStreamCacheExportAsync finish : 60079ms
StreamCacheExportAsync finish : 60633ms
遅い原因:メモリが枯渇してGCが発生して遅い

10,000,000行、100,000行ずつ分割、2並列
Export(並列なし、Streamキャッシュあり) finish : 3966ms
ObjectCacheExport finish : 18095ms
ObjectAndStreamCacheExport finish : 7080ms
StreamCacheExport finish : 7076ms
ObjectAndStreamCacheExportAsync finish : 60079ms
StreamCacheExportAsync finish : 60633ms
遅い原因:メモリが枯渇してGCが発生して遅い

10,000,000行、500,000行ずつ分割、2並列
Export finish : 3834ms
ObjectCacheExport finish : 21722ms
ObjectAndStreamCacheExport finish : 4005ms
StreamCacheExport finish : 4035ms
ObjectAndStreamCacheExportAsync finish : 51959ms
StreamCacheExportAsync finish : 53595ms
遅い原因:メモリが枯渇してGCが発生して遅い

postgresコマンドの基礎

サクッと基本的なところをまとめてみる

なんか最近よく使うところを中心に備忘録のためにまとめているので、役に立たない人には全く立たないかも。

接続する

psql -U postgres -d dbname

psql

メタコマンド

-- ヘルプ
¥h

-- データベース一覧
\l

-- オブジェクトを表示
\d

--終了
\q

主なディレクトリ構成

65.1. データベースファイルのレイアウト

設定ファイルを確認する

select name, setting, unit, short_desc from pg_settings;
select name, setting, unit, short_desc from pg_settings where name like '%shared%';

サイズを測る

-- database
SELECT datname, pg_size_pretty(pg_database_size(datname)) FROM pg_database;
-- block size
SHOW block_size;
-- table size (toastは除く)
SELECT relname, relfilenode, pg_size_pretty(relpages::bigint * 8 * 1024) as size, reltuples as rows from pg_class where relname='table_name' and relkind = 'r';
-- index size
SELECT relname, relfilenode, pg_size_pretty(relpages::bigint * 8 * 1024) as size, reltuples as rows from pg_class where relname='index_name' and relkind = 'i';

ファイルで確認

-- dababase
du -sh ./data/base/*

StackExchange.Redisを使ってみる

今回の目的

Redisをプログラムから利用してみます。 今回はクライアントとして、C#のStackExchange.Redisを使ってみます。

StackExchange.Redisとは

StackExchange.Redis is a high performance general purpose redis client for .NET languages (C# etc). It is the logical successor to BookSleeve, and is the client developed-by (and used-by) Stack Exchange for busy sites like Stack Overflow. For the full reasons why this library was created (i.e. “What about BookSleeve?”) please see here.( StackExchange.Redis | General purpose redis client)

StackOverflowなどでも使われている高性能なRedisクライアント

Azure Redis CacheでもClientとして公開されいます。 https://docs.microsoft.com/ja-jp/azure/redis-cache/cache-dotnet-how-to-use-azure-redis-cache

サクッと追加参照してみる

StackExchange.Redisではコネクションをアプリの起動から終了まで使いまわす前提になっています。 そのため、通常のDBのようにusingなどは使わないようです。 とりあえず、コンソールアプリケーションでこんにちはと表示して何か入力するとさよならと表示するアプリケーションを作ってみました。 その時にリソースをRedisに保存しています。特に意味のあるアプリではないです。

using System;
using StackExchange.Redis;
using System.Threading;

namespace RedisSample
{
    class Program
    {
        static void Main(string[] args)
        {
            // クライアントアプリケーションの開始から終了するまで使いまわす。だからusingは不要。
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost");

            // データベースを取得
            IDatabase cache = redis.GetDatabase();

            // 値を保存
            cache.StringSet("key:jp:hello", "こんにちは");
            cache.StringSet("key:jp:goodbye", "さようなら");

            // 保存した値を取得して利用する
            Console.WriteLine(cache.StringGet("key:jp:hello"));
            Console.ReadLine();
            Console.WriteLine(cache.StringGet("key:jp:goodbye"));
            Thread.Sleep(1000);
        }
    }
}

キーの命名は公式ドキュメントのサンプルを参考にしました。

入門 : Redis のデータ構造と概念 — Redis Documentation (Japanese Translation)

AzureのサンプルではLazyでコネクションを取るようにしていましたが、ここではそこまでしませんでした。

Azure Redis Cache を使用する方法 | Microsoft Docs

非同期で処理する

10,000件のオブジェクトを追加して、パフォーマンスも図ってみた

using System;
using StackExchange.Redis;
using System.Linq;
using System.Diagnostics;

namespace RedisSample
{
    class Program
    {
        static void Main(string[] args)
        {

            // 管理者コマンドを使用するために、接続文字列にallowAdmin=true"を追記
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost,allowAdmin=true");

            // データベースを取得
            IDatabase cache = redis.GetDatabase();

            // データベースを初期化
            var server = redis.GetServer("localhost", 6379);
            server.FlushAllDatabases();

            // キー生成のための数字を用意
            var range = Enumerable.Range(1, 10000);

            // 処理時間を測定
            Stopwatch sw = new Stopwatch();
            sw.Start();

            foreach (var index in range)
            {
                // 非同期でオブジェクトを追加
                cache.StringSetAsync($"number:{index}", $"result:{index}");
            }
            //すべての非同期処理を待つ
            cache.WaitAll();
            sw.Stop();

            //使用中のメモリを取得する
            var usedMemory = server.Info().Where(group => group.Key.Equals("Memory")).First().Where(c => c.Key.Equals("used_memory")).First();
            Console.WriteLine($"非同期処理処理終了後 / 使用メモリ {usedMemory.Key}:{usedMemory.Value} / 処理時間 {sw.ElapsedMilliseconds} ms / Keyの数 {server.Keys().Count()}");

            Console.ReadLine();
        }
    }
}

同様に同期処理でも図ってみたところ下記のような結果になりました。

同期処理:619ms 非同期処理:67ms

あくまでもローカルの参考値ですが、パイプラインの効果が出ていると感じられました。 あとやっぱりRedisって早いんだなって感じました。10,000件の追加が数十ms以内に終わりますもんね。 (Redisはシングルスレッドのため、非同期にしても追加する処理自体はかわらないはず・・・たぶん)

Pipelines and Multiplexers | StackExchange.Redis

参照

Azure Redis Cache を使用する方法 | Microsoft Docs

入門 : Redis のデータ構造と概念 — Redis Documentation (Japanese Translation)

c# - Flush/Empty db in StackExchange.Redis - Stack Overflow

とりあえず、今日はここまで。

Redisを使ってみる

今回の目的

Redisの使い方を復習してみます。

インストール

下記のURLからダウンロードしてインストールします。

Redis

この辺を見ればできそうです。 qiita.com

とりあえず、今回はローカルのwindowsにインストールしたものを使いますが、実際に使う場合はAWSやAzureなどもサポートしていますね。

Redis とは? - Amazon ElastiCache(キャッシュ管理・操作)| AWS

Azure Redis Cache - Redis キャッシュ クラウド サービス | Microsoft Azure

バージョンを確認する

とりあえず、今インストールしているバージョンを確認します。

> redis-cli.exe
> info

f:id:genpaku3:20171126165958p:plain

※ 今回はローカルですが、接続情報を入れる場合は下記のようにすれば良さそう。

redis-cliの使い方 - Qiita

キーを保存、参照する

とりあえず、キーを追加、参照、削除をしてみます。

> redis-cli.exe
> set test 11
OK
> get test
"11"
> del test
(integer) 1
> get test
(nil)

キーの一覧を参照します

> keys *

実際には型はたくさんあるので、これは別の機会に

データ型 — redis 2.0.3 documentation

※ 後日、プログラムからUTF-8で突っ込んだ場合に上のやり方だと文字化けしてしまったので、その場合は redis-cli.exe を下記のように変更するとなおりました。

> chcp 65001
> redis-cli.exe --raw

全データを削除する

> FLUSHALL
OK

全データ型対応の操作 — redis 2.0.3 documentation

使用容量調べる

コマンドは同じ

> redis-cli.exe
> info

f:id:genpaku3:20171126170118p:plain

※ INFOの詳細

https://redis.io/commands/INFO

FLUSHALLした後なのに900MBも使っているのかーとおもいましたが、 この辺が関係しているのかな。 メモリ最適化 — Redis Documentation (Japanese Translation)

Redisの使い道

Redis を LRU キャッシュとして使う
Redis を LRU キャッシュとして使う — Redis Documentation (Japanese Translation)

Redisの永続化
Redis の永続化 — Redis Documentation (Japanese Translation)

こういうのも大切そうですね。

Redis 本番障害から学んだコードレビューの勘所 - Qiita

Azure Redis Cache のトラブルシューティング方法 | Microsoft Docs

Redis3.0のredis.confまとめてみた。 - Qiita

コマンドリファレンス — redis 2.0.3 documentation

とりあえず、今回はここまで。