怠惰系エンジニアのメモ帳

勉強した内容をメモしていきます。解説ブログではないので悪しからず。

Log4j2からElasticsearchにログを流し込む

ログをファイルに出力するのではなく、Elasticsearchに流し込む方法が知りたかったので調べてみました。
「 Spring(SpringBoot)アプリケーションから出力されるログをElasticsearchに流し込む」のがゴールになります。
ログ実装には Log4j2 を用います。(Logbackでもいいのですが、お仕事では Log4j2 が採用されているので。。。)

この方法が正しいわけではないと思うので、ご指摘等いただけると幸いです。

結論

データ収集用のミドルウェア Logstash を経由することで Elasticsearch にログを流し込めました。

www.elastic.co www.elastic.co

なお、ログ可視化のために Kibana を使いました。

www.elastic.co

環境

今回 Elasticsearch, Logstash, Kibana の環境(この3つで ELK Stack と呼ばれてるっぽい)はDockerを利用して構築しています。

  • Docker for Mac(version 18.03.0-ce, build 0520e24)
    • Elasticsearch : docker.elastic.co/elasticsearch/elasticsearch:6.2.4
    • Logstash : docker.elastic.co/logstash/logstash:6.2.2
    • Kibana : docker.elastic.co/kibana/kibana:6.2.4
  • Spring Boot 1.5.12
    • Log4j2は依存関係に spring-boot-starter-log4j2 を追加して使用。
    • なお、log4j2 を使用する場合 spring-boot-starter-logging を依存関係から除外してください。

準備

1. ELK StackをDockerで用意する

docker-compose.yml と、各種設定ファイルを用意する。(全体的なフォルダ構成は最後に記載)

docker-compose.yml

version: '2'
services:

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.2.4
    container_name: elasticsearch
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - discovery.type=single-node

  logstash:
    image: docker.elastic.co/logstash/logstash:6.2.2
    container_name: logstash
    ports:
      - "5044:5044"
    volumes:
      - ./logstash/config/:/usr/share/logstash/config/
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/
    links:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:6.2.4
    container_name: kibana
    ports:
      - "5601:5601"
    volumes:
      - ./kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml
    links:
      - elasticsearch
  • イメージはDockerHubではなく、Elastic社から取得しています。
    • ※なんかDockerHubのは非推奨らしい

elasticsearch.yml

用意はしているんですが、ボリュームをマウントしてないので意味ないですね。 参考程度に。

network.host: 0.0.0.0
cluster.name: elasticsearch
discovery.type: single-node

logstash.yml

http.host: "0.0.0.0"
path.config: /usr/share/logstash/pipeline
xpack.monitoring.elasticsearch.url: http://elasticsearch:9200
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.elasticsearch.password: changeme

logstash.conf

input {
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 5044
  }
}
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    index => "logstash-%{+YYYY.MM.dd}"
    document_type => "logs"
  }
}
  • サーバーモードで入力を受け付け、Elasticsearchに出力します。

2. Log4j2の設定

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF">

    <Properties>
        <Property name="loglayout">[%d{yyyy-MM-dd HH:mm:ss.SSS}],%-5p,%t,%c:%m%n</Property>
    </Properties>

    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${loglayout}"/>
        </Console>

        <Socket name="LOG_STASH" host="localhost" port="5044" >
            <JsonLayout compact="true" eventEol="true" />
        </Socket>
    </Appenders>

    <Loggers>
        <Root level="info">
            <AppenderRef ref="CONSOLE" />
            <AppenderRef ref="LOG_STASH" />
        </Root>
    </Loggers>

</Configuration>

ELKを立ち上げ、ログを流し込む

docker-compose up で、ELKを立ち上げます。

$ docker-compose up
Creating network "docker_default" with the default driver
Creating elasticsearch ... done
Creating kibana        ... done
Creating logstash      ... done
Attaching to elasticsearch, kibana, logstash
...

起動するまで少し時間がかかるのでしばらく待って、localhost:5601 にアクセスします。 ※ Kitematic を立ち上げておくと、各コンテナの状態が分かりやすいのでオススメです。

Kibanalocalhost:5601) にアクセスして、以下のような画面が表示されれば準備OKです。

f:id:tk5_21:20180421235025p:plain

大丈夫だとは思いますが、念のため、docker-compose ps で全てのコンテナが稼働しているか確認しておいてください。 ※もしくは Kitematic で確認

$ docker-compose ps
    Name                   Command               State                       Ports                     
-------------------------------------------------------------------------------------------------------
elasticsearch   /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
kibana          /bin/bash /usr/local/bin/k ...   Up      0.0.0.0:5601->5601/tcp                        
logstash        /usr/local/bin/docker-entr ...   Up      0.0.0.0:5044->5044/tcp, 9600/tcp    

ELKが立ち上がったら、アプリケーションを実行してログを出力します。
SpringBootの起動時、エラーが出ていなければ Logstash にログが転送されているはずです。

SpringBootが起動できたら Kibana を確認してみましょう。

f:id:tk5_21:20180422000730p:plain

Elasticsearch にログが転送されていれば、上記のような画面が表示されます。
「index pattern」には logstash.conf で指定したindexと同じパターンになるような文字列(今回は logstash-2018.04.21)を入力し、「Next step」を押下します。

f:id:tk5_21:20180422000735p:plain

「Time Filter field name」は @timestamp でいいです。 「Create index pattern」を押下後、サイドメニューの「Discover」をクリックします。

f:id:tk5_21:20180422000744p:plain

SpringBootの起動ログが表示されていればOKです。 Elasticsearch にログを流し込めてます。

まとめ

  • Log4j2SocketAppender を使用して、Logstash にログを送信する。
  • Logstash から Elasticsearch にデータ転送できる。

フォルダ構成(一部省略)

./
├── docker
│   ├── docker-compose.yml
│   ├── elasticsearch
│   │   └── config
│   │       └── elasticsearch.yml
│   ├── kibana
│   │   └── config
│   │       └── kibana.yml
│   └── logstash
│       ├── config
│       │   └── logstash.yml
│       └── pipeline
│           └── logstash.conf
├── pom.xml
├── src
   └── main
       ├── java
       └── resources
           ├── application.yml
           └── log4j2.xml

所感

とりあえず、Elasticsearch にログを流し込む、という目的は達成されましたが ログをそのまま送信しているので、セキュリティはガバガバ。。。

あと、Elasticsearch 以外に Logstash とか Kibana とか知れたのは大きいですね。 (使えるわけではないけど)

余談

ELKのDockerイメージは既にあって、イメージサイズも小さい

これも試したみたんですが、Logstash にログを転送するにはSSL(TLS)接続が必要だったので一旦諦めました。(ネットワークよく分からん。。。)
この辺詳しい場合は、こちらを使用するほうが楽かも。

Logback用のlogback-elasticsearch-appenderなるものがGitHubにある

Logbackの場合はコレ使えば直接流し込めるのかな?

【SpringWebFlux + Doma2】T型をFlux<T>に変換するCollector

SpringWebFlux + Doma2 で開発していて、Daoの戻り値を Flux<T> にしたかったので Flux<T> に変換する Collector 作った話。
当たり前だが、DomaのDaoメソッドは戻り値を Flux<T> で返せない。

@Dao
public interface PersonDao {
    @Select
    Flux<Person> findAll(); // これダメ
}

かと言って、List<Person>Stream<Person> で受け取りたくない。
となると、Domaのコレクト検索機能(もしくはストリーム検索)を利用して Flux<T> に変換する。

@Dao
public interface PersonDao {
    @Select(strategy = COLLECT)
    <R> R findAll(final Collector<Person, ?, R> collector);
}

Domaはこのコレクト検索が強い。良い。
で、作った Collector が以下。

public class PublisherCollectors {
    public static <T> Collector<T, ?, Flux<T>> toFlux() {
        return Collector.<T, List<T>, Flux<T>>of(
                ArrayList::new, 
                List::add,
                (right, left) -> { right.addAll(left); return right; },
                Flux::fromIterable
        );
    }
}
PersonDao personDao = // ...
Flux<Person> fluxPerson = personDao.findAll(PublisherCollectors.toFlux());

一旦 List に全ての要素を突っ込んで、最後に Flux に変換する方法をとった。
※本当は頭からケツまで Flux で処理したかったが、上手くいかなかった。

ちなみにストリーム検索を利用して Flux<T> に変換する場合は、

@Dao
public interface PersonDao {
    @Select(strategy = STREAM)
    <R> R findAll(final Function<Stream<Person>, R> function);
}
Flux<Person> fluxPerson = personDao.findAll(Flux::fromStream);

とする。

最後に

頭から Flux で処理できる方法や、良いライブラリがあったら教えていただけると嬉しいです。

【SpringWebFlux】エラーハンドラ(WebExceptionHandler)のテストを行う

SpringWebFluxでのエラーハンドリングは、WebExceptionHandler を継承してハンドラを作成する。

以下のようなエラーハンドラが定義されていると仮定。

@Component
public class GlobalErrorHandler implements WebExceptionHandler {

    @Override
    public Mono<Void> handle(final ServerWebExchange serverWebExchange, final Throwable throwable) {
        return ErrorHandleProvider.of(throwable) // ①
                .createResponse() // ②
                .flatMap(sr -> sr.writeTo(serverWebExchange, HandleContext.of(HandlerStrategies.withDefaults()))) // ③
                .flatMap(v -> Mono.empty()); // ④
    }
    ...
}

ざっと何をやっているか説明しておくと、

  • ①:ErrorHandleProvider は受け取った Throwable でハンドリング処理を提供するためのクラス。
    • ちなみに ErrorHandleProvider は自前で作成したクラス。(実装は割愛)
  • ②:クライアントに返却する Mono<ServerResponse> を作成。
    • この処理では、Httpステータスの設定と Throwable に設定されたメッセージをレスポンスボディに設定。
  • ③:ServerWebExchange に対して、Mono<ServerResponse> に設定されている情報を書き込む。
  • ④:handleメソッドの戻り値は Mono<Void> なので、Mono.empty() で空の Mono を作成。

となっています。

今回はこのエラーハンドラをテストする。
テストしたいのは

  1. あるエンドポイントに対して処理を依頼
  2. 依頼された処理でエラーが発生
  3. クライアントに対して、エラーに応じた情報が返る

というケースになります。
3ついては、例えば業務エラー(ApplicationException)がスローされた場合に

  • Httpステータス:500(Internal Server Error)
  • ボディ:ApplicationException に設定されている message

をレスポンスとして返却するといった具合。

準備

まず、エラーを発生させるエンドポイントの作成を行う。

続きを読む

【SpringWebFlux】ServerRequest のボディをジェネリックな型に変換する

ServerRequest#bodyToMono メソッドを使用する。(Flux の場合はbodyToFlux

public Mono<ServerResponse> handle(final ServerRequest request){
    // プレーンな型に変換する場合
    Mono<Person> person = request.bodyToMono(Person.class);

    // ジェネリックな型に変換する場合
    // ParameterizedTypeReferenceを渡す
    Mono<List<Person>> persons = request.bodyToMono(new ParameterizedTypeReference<List<Person>>() {};)

    ...
}

また ParameterizedTypeReferenceインスタンス化にはファクトリメソッドを使用する方法もある。 ParameterizedTypeReference#forType がファクトリメソッドで、java.lang.reflact.Type を渡す。

// Typeの作成
final Type type = new ArrayList<String>().getClass().getGenericSuperclass();

// ParameterizedTypeReferenceを作成
final ParameterizedTypeReference<List<String>> ptr =  ParameterizedTypeReference.forType(type);

ParameterizedTypeReference は抽象クラスなので、匿名クラスとしてインスタンス化した方が楽そう。
あと、IDEが補完してくれるとは言え new ParameterizedTypeReference<T>(){} って長いよね。

AWS Lambdaメモ -ハンドラの引数eventの中身-

今回はLambda関数として実行されるハンドラの引数 event についてのメモ。
※Node.jsでLambda関数を作成する場合を想定しています。

間違っている場合はご指摘いただけると幸いです。

event が何者なのか分からない

Lambda関数の作成はNode.jsに落ち着いたのですが、 NodeでLambda関数を作成する場合には以下のような引数を3つ受け取るハンドラを定義する必要があります。

exports.myHandler = (event, context, callback) => {
  // 処理
}

これら3つの引数について、公式では以下のように説明されています。

  • event — AWS Lambda はこのパラメーターを使用してイベントデータをハンドラーに渡します。
  • context — AWS Lambda はこのパラメーターを使用して、実行中の Lambda 関数のランタイム情報をハンドラーに提供します。詳細については、「Context オブジェクト (Node.js)」を参照してください。
  • callback – コールバックオプションは呼び出し元に情報を返すために使用できます。使用しない場合戻り値は null です。詳細については、「コールバックパラメーターを使用する」を参照してください。

Lambda 関数ハンドラー (Node.js) - より引用

「何言ってる分からねぇ...。結局 event って何者。」

静的型付け言語大好きマンなので、正体不明の値が来るのがモヤっとするといいますか...。

結論

先に結論を述べてしまうと、Lambda関数の呼び出し元によって異なる です。
※型としては object 型になるようです。(要は何でも型)

呼び出し元によって異なる、とは

Lambda関数は何かをキッカケにして呼び出されるので、必ず呼び出し元が存在します。
呼び出し元の例で言えば API Gateway であったり、DynamoDB であったり。 (Lambdaでは、これらの呼び出し元を イベントソース と呼んでいるようです)

  • event — AWS Lambda はこのパラメーターを使用してイベントデータをハンドラーに渡します。

イベントソースのから渡されるデータ、なので イベントデータ という解釈をしています。

今回はイベントソースとして API Gateway を例に挙げます。
API Gateway は、RESTful APIを作成するサービスです。

API Gateway の場合に設定される event の中身

イベントソースを API Gateway にした場合、ハンドラの引数 event に設定されるのは API Gateway で定義した「本体マッピングテンプレート」になるようです。

f:id:tk5_21:20180112000146p:plain

上記マッピングテンプレートは

{
  "method": "$context.httpMethod", 
  "pathParams": "$input.params('userId')",
  "body": $input.json('$')
}

と定義しており、

  • 1行目はHTTPメソッド名
  • 2行目はパス・クエリ文字・ヘッダーからパラメータが userId の値を取得
  • 3行目はボディ部のJSONを取得

となっています。

仮にこのAPI/app/{userId} という形式でアクセス可能にし、以下のLambda関数を実行させるとします。

exports.handler = (event, context, callback) => {
    console.log("event type:", typeof event);
    console.log("event JSON:", JSON.stringify(event, null, 2));
    callback(null, 'Hello from Lambda');
};

クライアント側から、上記APIを呼び出します。

  • POSTメソッド。
  • URLは /app/0 とする。
  • 以下のJSONをボディ部に含める。
{
  "name": "Jane",
  "age": 20
}

呼び出した結果のログは以下の通り。

2018-01-11T15:21:14.555Z   xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx   event type: object
2018-01-11T15:21:14.559Z   xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx   event JSON:
{
    "method": "POST",
    "pathParams": "0",
    "body": {
        "name": "Jane",
        "age": 20
    }
}

ちゃんとテンプレートに定義した形式で eventマッピングされています。

イベントソースによって設定値を調べる必要がある

他にも DynamoDBSNSS3Kinesis などからLambda関数を呼び出すことが可能ですが、 呼び出し元のイベントソースによって event に設定される中身は異なります。 そのため、event にどのような値が設定されるかは都度調べる必要があります。

まとめ

  • event は、イベントソースから渡されるデータが設定される。
  • event は、object 型(何でも型)である。
  • API Gateway の場合には「本体マッピングテンプレート」で定義した内容が設定される。
    • イベントソースが異なれば、event の中身も異なる。(ので都度調べる必要がある)