Project author: sameei

Project description:
A customized reporter to push Apache Flink metrics to ElasticSearch
Language: Scala
Project URL: git://github.com/sameei/elasticreporter.git
Created: 2018-04-20T13:31:55Z
Project community: https://github.com/sameei/elasticreporter

License: MIT License

in use!

I found ElasticSearch (+ Kibana) an easy and flexible tool for monitoring my applications.
When I started to use it for my Apache Flink jobs, I wrote a simple reporter to send all metrics to ElasticSearch.
Unfortunately, that wasn't enough!

  • Flink uses identifiers like this for its metrics; for example, consider this one:
    the_hostname.taskmanager.9893439894343_as_taskmanager_id.my_application_name.332343_as_my_app_id....
    This identifier has a lot of changing parts: the_hostname, 9893439894343_as_taskmanager_id, …
    I know that I can use the scope configuration to change the identifier, but that didn't satisfy me.

  • I needed the metric variables (<host>, <job_name>, …) in my metrics result. Why?
    I want to aggregate the reported metrics by filtering on those values: consider the need to see
    all metrics of your application (<job_name>) from a specific task manager (<host>).
    For that, those values must be present in every document.

  • I also need to separate metrics per application and per other scopes, at the source.
    This may end up requiring a different metrics configuration per application, or maybe not.

So I improved my customized reporter to make it configurable enough to fit my needs:

  • being able to group specific metrics together (as GroupedMetrics in the source)
  • being able to report every group to a different, dedicated resource (index and type in ElasticSearch)
  • being able to add and track all variables in every report document (like <host>, <tm>, or additional vars like <millis>, etc.)
  • keeping an eye on changes in metrics references

After that I generalized my reporter so that it can be used with Kamon too.

Anyway, here is how it works:

General Configuration

    metrics.reporter.{}.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.{}.name: metric.Application
    metrics.reporter.{}.elastic-url: http://localhost:9200
    metrics.reporter.{}.source-id: single-node
    metrics.reporter.{}.index-pattern: flink-metrics-forapp-<job_name>-<year>-<month>
    metrics.reporter.{}.id-pattern: <tm_id>-<task_name>-<job_name>-<operator_id>-<subtask_index>-<source_id>-<millis>
    metrics.reporter.{}.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.{}.zone: UTC

  • metrics.reporter.{}.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter; the class that Flink will instantiate as a reporter.
  • metrics.reporter.{}.name: metric.Application; the name that the library's internal logger will use for this instance.
  • metrics.reporter.{}.elastic-url: http://localhost:9200; the URL of the ElasticSearch endpoint.
  • metrics.reporter.{}.source-id: single-node; the Id of this node. It can (and should) be different for every JobManager and TaskManager instance. I use it because the hostname wasn't enough: it isn't always unique or readable.
  • metrics.reporter.{}.index-pattern: flink-metrics-<job_name>-<year>-<month>; a pattern used to build the index name for each group. It is resolved with the variables available in the metric scopes plus some additional variables from the report context (timestamp, source-id, etc.; keep reading for the full list of vars).
  • metrics.reporter.{}.id-pattern: <tm_id>-<task_name>-<job_name>-<operator_id>-<subtask_index>-<source_id>-<millis>; same as index-pattern, but used to generate the document id.
  • metrics.reporter.{}.datetime-pattern: yyyy-MM-dd HH:mm:ss; the pattern used to format the datetime field in the result documents.
  • metrics.reporter.{}.zone: UTC; the timezone used to format the datetime field in the result documents.
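
To make the pattern options above more concrete, here is a minimal sketch of how a pattern such as flink-metrics-forapp-<job_name>-<year>-<month> could be resolved, and how datetime-pattern and zone could be applied. PatternSketch, timeVars, expand and formatted are hypothetical names used only for illustration; this is not the reporter's actual code. The example document further down suggests the real reporter also normalizes the result (for instance, lower-casing the document id), which this sketch does not attempt.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical helper for illustration; not part of elasticreporter.
object PatternSketch {

  // Variables derived from the report timestamp. The names follow the
  // variables visible in the example document (<millis>, <year>, ...).
  def timeVars(millis: Long, zone: String): Map[String, String] = {
    val t = Instant.ofEpochMilli(millis).atZone(ZoneId.of(zone))
    Map(
      "millis"       -> millis.toString,
      "year"         -> t.getYear.toString,
      "month"        -> t.getMonthValue.toString,
      "day_of_month" -> t.getDayOfMonth.toString
    )
  }

  // Replace every <name> placeholder with its value.
  // Placeholders without a matching variable are left untouched.
  def expand(pattern: String, vars: Map[String, String]): String =
    vars.foldLeft(pattern) { case (acc, (k, v)) => acc.replace(s"<$k>", v) }

  // Format the timestamp as described by datetime-pattern and zone.
  def formatted(millis: Long, pattern: String, zone: String): String =
    DateTimeFormatter
      .ofPattern(pattern)
      .withZone(ZoneId.of(zone))
      .format(Instant.ofEpochMilli(millis))
}
```

With the timestamp 1534927371230 and job_name set to mysimple_testjob, expand yields flink-metrics-forapp-mysimple_testjob-2018-8, which matches the _index of the example document shown later.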

Application Specific Metrics

I use a specific group name (MetricGroup) for all of my metrics across applications.

    lazy val metrics = getRuntimeContext.getMetricGroup.addGroup("appmetric")

This lets me filter the related metrics more easily. This is the configuration that I use to report application-specific metrics:

    metrics.reporter.app.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.app.name: metric.Application
    metrics.reporter.app.elastic-url: http://localhost:9200
    metrics.reporter.app.source-id: single-node
    metrics.reporter.app.index-pattern: flink-metrics-forapp-<job_name>-<year>-<month>
    metrics.reporter.app.id-pattern: <tm_id>-<task_name>-<job_name>-<operator_id>-<subtask_index>-<source_id>-<millis>
    metrics.reporter.app.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.app.zone: UTC
    metrics.reporter.app.filter-by.select-scope: appmetric
    metrics.reporter.app.group-by: <job_name>
    metrics.reporter.app.name-by.select-scope: appmetric

  • metrics.reporter.app.filter-by.select-scope: appmetric. I select the metrics that have the mentioned scope in their scope list.
  • metrics.reporter.app.group-by: <job_name>. Then I group the filtered metrics by the job they belong to (<job_name>, a variable provided by Flink itself).
  • metrics.reporter.app.name-by.select-scope: appmetric. Then I assign a scoped name/Id to each metric in the group; this id appears in the result/report document.
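
The three steps above (filter, group, name) can be sketched in plain Scala as follows. The Metric case class and the function names are hypothetical and only model the idea. In particular, the naming rule used here (keep whatever follows the selected scope component) is an assumption inferred from field names such as even_numbs.count in the example document, not something the reporter's source is known to do.

```scala
// Hypothetical model for illustration; not part of elasticreporter.
object GroupingSketch {

  // A metric is modelled by its scope components and its variables.
  final case class Metric(scope: List[String], vars: Map[String, String], value: Double)

  // filter-by.select-scope: keep metrics that contain the given scope component.
  def selectScope(metrics: Seq[Metric], scope: String): Seq[Metric] =
    metrics.filter(_.scope.contains(scope))

  // group-by: group metrics by the value of one variable, e.g. <job_name>.
  def groupBy(metrics: Seq[Metric], variable: String): Map[String, Seq[Metric]] =
    metrics.groupBy(_.vars.getOrElse(variable, ""))

  // name-by.select-scope (assumed rule): keep the components after the selected scope.
  def nameBy(metric: Metric, scope: String): String =
    metric.scope.dropWhile(_ != scope).drop(1).mkString(".")
}
```

For a metric whose scope ends in appmetric, even_numbs, count, nameBy returns even_numbs.count under this assumption.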

An example of a document inserted into ElasticSearch:

    {
      "_index": "flink-metrics-forapp-mysimple_testjob-2018-8",
      "_type": "doc",
      "_id": "8dad13fe8dcbaabd42f1f5ced28702be-triggerwindow-processingtimesessionwindows-120000-reducingstatedescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.kryo.kryoserializer@1910163b-reducefunction=com.bisphone.calllog.writepath.mergelogevents-merge-}-com.bisphone.calllog.writepath.eventaggrigator$defaulttrigger@55787112-windowedstream.reduce-windowedstream.java-276-flat-map-map-map-calllog_aggr_rc1-operator_id-0-hadi.flink.tm.05-1534927371230",
      "_score": null,
      "_source": {
        "@meta.time.millis": 1534927371230,
        "@meta.uuid": "80a692dc-a63e-45df-831a-ccd2761a9795",
        "@meta.time.formatted": "2018-08-22 08:42:51",
        "@meta.source.id": "single-node",
        "even_numbs.count": 10362,
        "even_numbs.rate": 0.41000306771665294,
        "failures.count": 3,
        "failures.rate": 0.0001,
        "@meta.var.<job_id>": "b3f2321e76bce360734c1ffb459209c0",
        "@meta.var.<operator_name>": "Flat Map",
        "@meta.var.<task_name>": "TriggerWindow(ProcessingTimeSessionWindows(120000), ReducingStateDescriptor{...}) -> Flat Map -> Map -> Map",
        "@meta.var.<tm_id>": "8dad13fe8dcbaabd42f1f5ced28702be",
        "@meta.var.<millis>": "1534927371230",
        "@meta.var.<subtask_index>": "0",
        "@meta.var.<source_id>": "single-node",
        "@meta.var.<job_name>": "mysimple_testjob",
        "@meta.var.<host>": "localhost",
        "@meta.var.<month>": "8",
        "@meta.var.<day_of_month>": "22",
        "@meta.var.<year>": "2018",
        "@meta.var.<task_id>": "bf80c198166281d628838ffa871a5ca6",
        "@meta.var.<task_attempt_num>": "0",
        "@meta.var.<task_attempt_id>": "6baec68aebe4586978347448ba922e47"
      },
      "fields": {
        "@meta.time.millis": [
          1534927371230
        ]
      },
      "sort": [
        1534927371230
      ]
    }
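
The flat layout of _source in the document above can be illustrated with a small sketch: a few @meta.* fields, one @meta.var.<name> field per variable, and one field per metric in the group. DocumentSketch and its source function are hypothetical names; the sketch leaves out @meta.uuid and says nothing about how the reporter actually serializes or sends the document.

```scala
// Hypothetical helper for illustration; not part of elasticreporter.
object DocumentSketch {

  // Build a flat key/value view of one report document.
  // `vars` is expected to use keys such as "<job_name>", as in the example document.
  def source(
      millis: Long,
      formatted: String,
      sourceId: String,
      vars: Map[String, String],
      metrics: Map[String, Double]
  ): Map[String, Any] =
    Map[String, Any](
      "@meta.time.millis"    -> millis,
      "@meta.time.formatted" -> formatted,
      "@meta.source.id"      -> sourceId
    ) ++ vars.map { case (k, v) => s"@meta.var.$k" -> v } ++ metrics
}
```

Because every document carries its variables, a query can filter on fields like @meta.var.<job_name> or @meta.var.<host> without parsing the metric identifier.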

Metrics of JobManager & TaskManager

    # JobManager
    metrics.reporter.jm.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.jm.name: metric.JM
    metrics.reporter.jm.elastic-url: http://localhost:9200
    metrics.reporter.jm.source-id: single-node
    metrics.reporter.jm.index-pattern: flink-metrics-jobmanager-<year>-<month>
    metrics.reporter.jm.id-pattern: <host>-<millis>
    metrics.reporter.jm.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.jm.zone: UTC
    metrics.reporter.jm.filter-by.match-scope: <host>.jobmanager
    metrics.reporter.jm.filter-by.reject-vars: <job_name>.<tm_id>.<operator_name>
    metrics.reporter.jm.group-by: <host>
    metrics.reporter.jm.name-by.scope-drop-left: 2
    # TaskManager
    metrics.reporter.tm.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.tm.name: metric.TM
    metrics.reporter.tm.elastic-url: http://localhost:9200
    metrics.reporter.tm.source-id: single-node
    metrics.reporter.tm.index-pattern: flink-metrics-taskmanager-<year>-<month>-<day_of_month>
    metrics.reporter.tm.id-pattern: <host>-<millis>
    metrics.reporter.tm.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.tm.zone: UTC
    metrics.reporter.tm.filter-by.scope: <host>.taskmanager.<tm_id>
    metrics.reporter.tm.filter-by.reject-vars: <job_name>
    metrics.reporter.tm.group-by: <tm_id>
    metrics.reporter.tm.name-by.scope-drop-left: 3

  • metrics.reporter.tm.filter-by.scope: <host>.taskmanager.<tm_id>: keeps only the metrics whose scope matches this pattern.
  • metrics.reporter.jm.filter-by.reject-vars: <job_name>.<tm_id>.<operator_name>: if the mentioned variables/scopes appear in the MetricGroup, drop it! (These two filters are applied sequentially.)
  • metrics.reporter.tm.group-by: <tm_id> or metrics.reporter.jm.group-by: <host>: this means that you will have one group per Reporter instance in every JobManager/TaskManager.
  • metrics.reporter.jm.name-by.scope-drop-left: 2: a little weird, but this transforms localhost.jobmanager.Stats.Network... into Stats.Network...
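
As a rough illustration of the last two kinds of options, the sketch below drops the leftmost components of a dotted identifier and checks a metric's variables against a reject list. ScopeSketch, dropLeft and rejected are hypothetical names. Treating reject-vars as "reject if any listed variable is present" is an assumption; the text above does not say whether any or all of them must appear.

```scala
// Hypothetical helper for illustration; not part of elasticreporter.
object ScopeSketch {

  // name-by.scope-drop-left: drop the first n dot-separated components.
  def dropLeft(identifier: String, n: Int): String =
    identifier.split('.').drop(n).mkString(".")

  // filter-by.reject-vars (assumed semantics): reject a metric when it
  // carries at least one of the listed variables.
  def rejected(metricVars: Set[String], rejectVars: String): Boolean =
    rejectVars.split('.').exists(metricVars.contains)
}
```

For example, dropLeft("localhost.jobmanager.Stats.Network", 2) gives Stats.Network, matching the transformation described in the last bullet.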

Catch changes in metrics’ references

This logs all added/removed metrics to the configured index.
This way I'm able to catch changes in metrics references
and adjust the metrics/reporter configuration if needed.

    # Debugger
    metrics.reporter.debug.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Debugger
    metrics.reporter.debug.name: metric.Debugger
    metrics.reporter.debug.elastic-url: http://localhost:9200
    metrics.reporter.debug.source-id: single-node
    metrics.reporter.debug.index-pattern: flink-mterics-debug-<year>-<month>
    metrics.reporter.debug.id-pattern: <source_id>-<millis>-<uuid>
    metrics.reporter.debug.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.debug.zone: UTC
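
The idea of tracking added and removed metric references can be sketched with a small immutable log. Flink notifies reporters when a metric is added or removed, so a debugger-style reporter could record one event per notification. ReferenceLogSketch and its members are hypothetical; how the Debugger class actually stores or ships these events is not shown here.

```scala
// Hypothetical model for illustration; not part of elasticreporter.
object ReferenceLogSketch {

  sealed trait Event { def metric: String }
  final case class Added(metric: String) extends Event
  final case class Removed(metric: String) extends Event

  // Immutable log: the currently known metric names plus every event seen so far.
  final case class Log(known: Set[String] = Set.empty, events: Vector[Event] = Vector.empty) {
    def added(name: String): Log = copy(known + name, events :+ Added(name))
    def removed(name: String): Log = copy(known - name, events :+ Removed(name))
  }
}
```

Each event would correspond to one document in the debug index, so a change in the set of reported metrics becomes visible next to the metrics themselves.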

All together

    metrics.reporters: debug, jm, tm, jobstat, jobtask, joboperator
    # Debugger
    metrics.reporter.debug.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Debugger
    metrics.reporter.debug.name: metric.Debugger
    metrics.reporter.debug.elastic-url: http://localhost:9200
    metrics.reporter.debug.source-id: single-node
    metrics.reporter.debug.index-pattern: flink-mterics-debug-<year>-<month>
    metrics.reporter.debug.id-pattern: <source_id>-<millis>-<uuid>
    metrics.reporter.debug.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.debug.zone: UTC
    # Application
    metrics.reporter.app.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.app.name: metric.Application
    metrics.reporter.app.elastic-url: http://localhost:9200
    metrics.reporter.app.source-id: single-node
    metrics.reporter.app.index-pattern: flink-metrics-forapp-<job_name>-<year>-<month>
    metrics.reporter.app.id-pattern: <tm_id>-<task_name>-<job_name>-<operator_id>-<subtask_index>-<source_id>-<millis>
    metrics.reporter.app.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.app.zone: UTC
    metrics.reporter.app.filter-by.select-scope: appmetric
    metrics.reporter.app.group-by: <job_name>
    metrics.reporter.app.name-by.select-scope: appmetric
    # JobManager
    metrics.reporter.jm.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.jm.name: metric.JM
    metrics.reporter.jm.elastic-url: http://localhost:9200
    metrics.reporter.jm.source-id: single-node
    metrics.reporter.jm.index-pattern: flink-metrics-jobmanager-<year>-<month>
    metrics.reporter.jm.id-pattern: <host>-<millis>
    metrics.reporter.jm.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.jm.zone: UTC
    metrics.reporter.jm.filter-by.match-scope: <host>.jobmanager
    metrics.reporter.jm.filter-by.reject-vars: <job_name>.<tm_id>.<operator_name>
    metrics.reporter.jm.group-by: <host>
    metrics.reporter.jm.name-by.scope-drop-left: 2
    # TaskManager
    metrics.reporter.tm.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.tm.name: metric.TM
    metrics.reporter.tm.elastic-url: http://localhost:9200
    metrics.reporter.tm.source-id: single-node
    metrics.reporter.tm.index-pattern: flink-metrics-taskmanager-<year>-<month>-<day_of_month>
    metrics.reporter.tm.id-pattern: <host>-<millis>
    metrics.reporter.tm.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.tm.zone: UTC
    metrics.reporter.tm.filter-by.scope: <host>.taskmanager.<tm_id>
    metrics.reporter.tm.filter-by.reject-vars: <job_name>
    metrics.reporter.tm.group-by: <tm_id>
    metrics.reporter.tm.name-by.scope-drop-left: 3
    # ====================================================================================
    # Jobs Stats
    metrics.reporter.jobstat.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.jobstat.name: metric.JobState
    metrics.reporter.jobstat.elastic-url: http://localhost:9200
    metrics.reporter.jobstat.source-id: single-node
    metrics.reporter.jobstat.index-pattern: flink-metrics-jobstat-<job_name>-at-<year>-<month>-<day_of_month>
    metrics.reporter.jobstat.id-pattern: <task_id>-<subtask_index>-<source_id>-<millis>
    metrics.reporter.jobstat.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.jobstat.zone: UTC
    metrics.reporter.jobstat.filter-by.scope: <host>.jobmanager.<job_name>
    metrics.reporter.jobstat.filter-by.reject-vars: <tm_id>.<operator_id>
    metrics.reporter.jobstat.group-by: <job_name>
    metrics.reporter.jobstat.name-by.scope-drop-left: 3
    # Task Metrics Per Job
    metrics.reporter.jobtask.class: xyz.sigmalab.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.jobtask.name: metric.TaskOfJob
    metrics.reporter.jobtask.elastic-url: http://localhost:9200
    metrics.reporter.jobtask.source-id: single-node
    metrics.reporter.jobtask.index-pattern: flink-metrics-forjob-<job_name>-at-<year>-<month>-<day_of_month>
    metrics.reporter.jobtask.id-pattern: <task_id>-<subtask_index>-<source_id>-<millis>
    metrics.reporter.jobtask.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.jobtask.zone: UTC
    metrics.reporter.jobtask.filter-by.scope: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    metrics.reporter.jobtask.group-by: <job_name>-<task_id>-<subtask_index>
    metrics.reporter.jobtask.name-by.scope-drop-left: 6
    # Operator Metrics Per Job
    metrics.reporter.joboperator.class: com.sameei.xtool.elasticreporter.v1.flink.Reporter
    metrics.reporter.joboperator.name: metric.OpteratorOfJob
    metrics.reporter.joboperator.elastic-url: http://localhost:9200
    metrics.reporter.joboperator.source-id: single-node
    metrics.reporter.joboperator.index-pattern: flink-metrics-foroperator-<job_name>-<operator_name>-<year>-<month>-<day_of_month>
    metrics.reporter.joboperator.id-pattern: <operator_id>-<subtask_index>-<source_id>-<millis>
    metrics.reporter.joboperator.datetime-pattern: yyyy-MM-dd HH:mm:ss
    metrics.reporter.joboperator.zone: UTC
    metrics.reporter.joboperator.filter-by.scope: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    metrics.reporter.joboperator.group-by: <job_name>-<operator_id>-<subtask_index>
    metrics.reporter.joboperator.name-by.scope-drop-left: 6
    # ====================================================================================
    # metrics.reporter.{}.name-by.selected-scope = developerdefined
    # metrics.reporter.{}.filter-by.selected-scope = developerdefined

Available Variables

Use with Kamon

    val reporter = xyz.sigmalab.xtool.elasticreporter.v1.kamon.Reporter.fromConfig(typesafeConfig, "app.reporter.config")
    kamon.Kamon.addReporter(reporter)

    app.reporter.config = {
      name = "kamon.elastic-reporter"
      source-id = "SimpleAkkaApp"
      elastic-url = "http://localhost:9200"
      index-pattern = "<source_id>-<year>-<month>-<day_of_month>"
      id-pattern = "<source_id>-<millis>"
      datetime-pattern = "yyyy-MM-dd HH:mm:ss"
      datetime-zone = "UTC"
    }