Monitor Akka Application with Datadog

最近の仕事で、本格的Akkaを取り込んでいってエラーした処理だけリトライ出来るバッチ機構を実現しました。そのバッチ機構の実行効果を確認するので、KamonとDatadogを使ってAkkaプログラムの監視をやってみました。
ここでそのやり方と問題を簡単にまとめたいと思います。

Kamon とは

JVM上で走るアプリケーションを監視するためのオープンソースツールです。

Kamonで出来る事

アプリケーションを監視するため、Kamonはいろんなをツールが用意されています。今回はAkkaを監視するため使うものを紹介します。

メトリックツールとトレーシング

  • kamon-core

    基本のメトリックツールとトレーシングAPIとその基盤。他のモジュールが全部kamon-core依存しています。内部はAkkaを使ってSubscriberとメッセージ通信で実装しているらしいです。

  • kamon-scala

    Scala、ScalazなどのFuturesに対して、バイトコードレベルのトレーシングが出来る。 kamon-akka が依存しています。

  • kamon-akka

    Akkaのactors、routersと dispatchersに対してもメトリック集計、actorのmessageに対して、バイトコードレベルのトレーシングが出来る。

  • kamon-system-metrics

    システム上とJVMのメトリックが出来ます。

その他にも kamon-akka-remotekamon-jdbckamon-elasticsearchkamon-playkamon-spray などがありますが、今回のAkka監視に関係ないので、割愛します。

結果の出力

Kamonが kamon-log-reporter を使って監視結果をそのままログに出すことが出来ますが、SPM、Datadog、New Relicなどの監視用バックエンドもサポートしています。今回はdatadogを使用するので、 kamon-datadog を使います。

DataDog とは

  • システムモニタリング(監視)クラウドサービスである
  • Kamon導入に伴い、Kamonの結果を可視化できるサービスとしてよく使われている

導入

こちら を参考に、
サインインして、サーバの種類選んで、表示されたコマンド叩けば導入出来ます。

※今回はMacOS上で行いましたが、AWSやDockerにもちゃんと対応しています。

実装

  1. build.sbt にライブラリ依存を追加します。
1
2
3
4
5
"io.kamon" %% "kamon-system-metrics" % kamonVersion,
"io.kamon" %% "kamon-core" % kamonVersion,
"io.kamon" %% "kamon-akka" % kamonVersion,
"io.kamon" %% "kamon-datadog" % kamonVersion,
"io.kamon" %% "kamon-autoweave" % kamonVersion
  1. application.conf に出力したい項目を定義する。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Kamonで監視するメトリックを定義
kamon {
metric {
filters {
akka-actor {
includes = [ "StressTestBatchActorSystem/user/**" ]
}
akka-dispatcher {
includes = [ "StressTestBatchActorSystem/**" ]
}
akka-router {
includes = [ "StressTestBatchActorSystem/**" ]
}
}
}
}
# kamonの監視データがDataDog上の表示名
kamon.datadog.application-name = "application"
# デフォルトで全部Datadogへ出力するので、必須ではない
kamon.datadog {
subscriptions {
akka-actor = [ "**" ]
akka-dispatcher = [ "**" ]
akka-router = [ "**" ]
system-metric = [ "**" ]
}
}
  1. AkkaプログラムにKamonを起動する。
1
Kamon.start()

監視できない

上記の設定でAkkaプログラムを実行してみたら。いろんなエラーがで出来ました。

問題1: sbtから実行する際に、aspectjが見つからない

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2016-10-04 18:55:04,519 - [error] kamon.ModuleLoaderExtension -

___ _ ___ _ _ ___ ___ _ _
/ _ \ | | |_ | | | | | | \/ |(_) (_)
/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _
| _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` |
| | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| |
\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, |
| | __/ |
|_| |___/

It seems like your application was not started with the -javaagent:/path-to-aspectj-weaver.jar option but Kamon detected
the following modules which require AspectJ to work properly:

kamon-akka, kamon-scala
  • 原因

Kamon-akkaの監視はAspectJを使ってバイトコードレベルの監視を実現しています。そのため起動する際にaspectj-weaverを指定しないといけないです。

  • 解決

公式によると3つの方法がありますが、sbtから起動する場合は sbt-aspectj を使います。

  1. plugin.sbt でSBTプラグインを導入
1
addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.10.6")
  1. build.sbt でプラグインを設定する。
1
2
3
4
5
lazy val sbtAspectjSettings = aspectjSettings ++ Seq(
fork in run := true,
javaOptions in run <++= AspectjKeys.weaverOptions in Aspectj,
AspectjKeys.aspectjVersion in Aspectj := "1.8.9"
)

これで、sbtからプログラムを実行する際には正常に動くようになりました。

問題2: assemblyする際に、apo.xmlのコンフリクト

今のプロジェクトはassemblyしたプログラムをサーバーに上げていますが、コマンドを実行する際に、こんなエラーが出ました。

1
2
3
4
5
[error] 1 error was encountered during merge
java.lang.RuntimeException: deduplicate: different file contents found in the following:
/Users/y_xiao/.ivy2/cache/io.kamon/kamon-core_2.11/jars/kamon-core_2.11-0.6.2.jar:META-INF/aop.xml
/Users/y_xiao/.ivy2/cache/io.kamon/kamon-akka_2.11/jars/kamon-akka_2.11-0.6.2.jar:META-INF/aop.xml
/Users/y_xiao/.ivy2/cache/io.kamon/kamon-scala_2.11/jars/kamon-scala_2.11-0.6.2.jar:META-INF/aop.xml
  • 原因

aspectjを使う以上、 aop.xml で設定しないといけないですが、kamonではライブラリ別々の設定となっています。

  • 解決

色々調べた結果、全部のaop設定ファイルを結合する方法を見つかりました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Create a new MergeStrategy for aop.xml files
lazy val aopMerge = new sbtassembly.MergeStrategy {
val name = "aopMerge"
import scala.xml._
import scala.xml.dtd._

def apply(tempDir: File, path: String, files: Seq[File]): Either[String, Seq[(File, String)]] = {
val dt = DocType("aspectj", PublicID("-//AspectJ//DTD//EN", "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"), Nil) // xmlファイルのヘッダ−
val file = MergeStrategy.createMergeTarget(tempDir, path) // 保存するファイルを取得
val xmls: Seq[Elem] = files.map(XML.loadFile) // マージ対象ファイルをXML Elem対象へ変換
val aspectsChildren: Seq[Node] = xmls.flatMap(_ \\ "aspectj" \ "aspects" \ "_") // aspectj/aspects配下のすべての項目を取得
val weaverChildren: Seq[Node] = xmls.flatMap(_ \\ "aspectj" \ "weaver" \ "_") // aspectj/weaver配下のすべての項目を取得
val options: String = xmls.map(x => (x \\ "aspectj" \ "weaver" \ "@options").text).mkString(" ").trim // aspectj/weaverにoptionがある場合、optionを全部取得
val weaverAttr = if (options.isEmpty) Null else new UnprefixedAttribute("options", options, Null) // optionをUnprefixedAttributeへ変換
val aspects = new Elem(null, "aspects", Null, TopScope, false, aspectsChildren: _*) // aspectsを使って、新しいaspectj/aspectsを作成
val weaver = new Elem(null, "weaver", weaverAttr, TopScope, false, weaverChildren: _*) // weaverを使って、新しいaspectj/weaverを作成
val aspectj = new Elem(null, "aspectj", Null, TopScope, false, aspects, weaver) // 新しいaspects、weaverを使って、新しいaspectjを作成
XML.save(file.toString, aspectj, "UTF-8", xmlDecl = false, dt) // 新しいaspectsをファイルへ保存
IO.append(file, IO.Newline.getBytes(IO.defaultCharset))
Right(Seq(file -> path))
}
}

assemblyMergeStrategy in assembly := {
case "application.conf" =>
MergeStrategy.first
case PathList("META-INF", "aop.xml") =>
aopMerge
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
},

……

これて、assemblyも無事に終わって、サーバー上のAkkaプログラムを監視できるようになりました。

※ ちなみに、普通にjarから実行する際に -javaagent をつける必要ですが、kamon-autoweave を使えば、JVM起動後自動的にAspectJ loadtime weaving agentをつける、-javaagentの指定が必要無くなります。

まとめ

後は、Datadog上で簡単に設定すれば、Akkaプログラムの実行効果を見ながら、パフォーマンスチューニングができるようになりました。