
Fluentd receives Docker logs and forwards them to Kafka (elasticsearch+fluentd+kafka+logstash+kibana)

OS: CentOS 7
IP: 192.168.10.74

Component versions:
logstash 6.6.2
elasticsearch 6.5.4
kibana 6.5.4
fluentd 1.3.2
kafka 2.12-2.3.0
All components are installed on this host.

Goal:
Docker containers (whose applications write logs in JSON format) send their logs directly to fluentd via the fluentd log-driver.
fluentd produces the received logs into the Kafka message queue.
Logstash consumes the logs from Kafka, processes them, and outputs them to Elasticsearch for searching.
Kibana handles visualization.

# Kafka message queue
Setup reference: https://www.rootop.org/pages/4508.html
Skipped here.
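
The fluentd and Logstash configurations below assume a topic named test already exists. A minimal sketch for creating and listing it (single partition and replica to match the single-broker setup; run from the Kafka installation directory):

[root@localhost kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.10.74:9092 --replication-factor 1 --partitions 1 --topic test
[root@localhost kafka]# bin/kafka-topics.sh --list --bootstrap-server 192.168.10.74:9092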

# Start a fluentd container

[root@localhost]# docker pull docker.io/fluent/fluentd
[root@localhost]# docker run -dit --name fluentd -p 24224:24224 -p 24224:24224/udp docker.io/fluent/fluentd
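
Before wiring up the Docker log-driver, the forward input on 24224 can be smoke-tested with fluent-cat, which ships with the fluentd gem inside the container (the tag debug.test is arbitrary, used here only for illustration):

[root@localhost]# echo '{"hello":"fluentd"}' | docker exec -i fluentd fluent-cat debug.test

If the port is reachable, the event is routed to whichever <match> block catches the debug.test tag.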

# Enter the fluentd container and configure forwarding to Kafka
Install the Kafka plugin in the fluentd Docker image; official docs: https://docs.fluentd.org/output/kafka
1. Install the plugin

# fluent-gem install fluent-plugin-kafka

2. Edit the configuration file

# vi /fluentd/etc/fluent.conf

<source>
  @type  forward
  @id    input1
  @label @mainstream
  port  24224
</source>

<filter **>
  @type stdout
</filter>

<label @mainstream>
  <match docker.**>
    @type file
    @id   output_docker1
    path         /fluentd/log/docker.*.log
    symlink_path /fluentd/log/docker.log
    append       true
    time_slice_format %Y%m%d
    time_slice_wait   1m
    time_format       %Y%m%dT%H%M%S%z
  </match>

  <match **>
    @type kafka2

    # list of seed brokers; multiple brokers can be given comma-separated, e.g. host1:9092,host2:9092
    brokers 192.168.10.74:9092
    use_event_time true

    # buffer settings
    <buffer topic>
      @type file
      # the path below may need to be created manually and made writable (I simply used chmod 777)
      path /var/log/td-agent/buffer/td
      flush_interval 3s
    </buffer>

    # data type settings
    <format>
      @type json
    </format>

    # record field that selects the target Kafka topic
    topic_key test
    # fallback topic when that field is absent (must already exist in Kafka)
    default_topic test
    get_kafka_client_log true
    # producer settings
    required_acks -1
    compression_codec gzip
  </match>
</label>

Save, exit, then restart the container.
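
To catch syntax mistakes before restarting, the configuration can be checked with fluentd's dry-run mode, then the container restarted (a sketch; container name as in the docker run command above):

[root@localhost]# docker exec fluentd fluentd --dry-run -c /fluentd/etc/fluent.conf
[root@localhost]# docker restart fluentd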

# Send Docker container logs to fluentd via log-driver

[root@localhost]# docker run -dit --name name-api-2 -v /home/dockermount/api:/mnt --publish-all --log-driver=fluentd --log-opt fluentd-address=192.168.10.74:24224 --log-opt fluentd-async-connect java8t

Official Docker docs on the fluentd logging driver: https://docs.docker.com/config/containers/logging/fluentd/
fluentd-async-connect # this option prevents the container from exiting when fluentd cannot be reached.
Docker connects to Fluentd in the background. Messages are buffered until the connection is established. Defaults to false.
If container cannot connect to the Fluentd daemon, the container stops immediately unless the fluentd-async-connect option is used.
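
For reference, a sketch of the same logging options in docker-compose form (the service name and file layout are assumptions, not part of the original setup):

version: "3"
services:
  api:
    image: java8t
    logging:
      driver: fluentd
      options:
        fluentd-address: 192.168.10.74:24224
        fluentd-async-connect: "true"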
At this point, container logs are being written to Kafka.
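
This can be confirmed with the console consumer bundled with Kafka (run from the Kafka installation directory):

[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.74:9092 --topic test --from-beginning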

# Install Elasticsearch and Kibana

[root@localhost log]# docker run -dit --name es -p 9200:9200 -p 9200:9200/udp elasticsearch:6.5.4
[root@localhost log]# docker run -dit --name kibana -e ELASTICSEARCH_HOST=http://192.168.10.74:9200 -e ELASTICSEARCH_URL=http://192.168.10.74:9200 -p 5601:5601 kibana:6.5.4
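
A quick sanity check that Elasticsearch is reachable before pointing Logstash at it (standard Elasticsearch HTTP endpoints):

[root@localhost]# curl http://192.168.10.74:9200
[root@localhost]# curl http://192.168.10.74:9200/_cluster/health?pretty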

# Configure Logstash; it was installed directly from the RPM package, so the installation steps are omitted.

[root@localhost ~]# cd /usr/share/logstash/
[root@localhost logstash]# cat kafka.conf 
input {

	kafka {
		bootstrap_servers => ["192.168.10.74:9092"]
		client_id => "test1"
		group_id => "test1"
		auto_offset_reset => "latest"
		consumer_threads => 1
		decorate_events => false
		topics => ["test"]
		type => "fromk"
	}
}

filter {

    json {
        # Parse the JSON in the message field and add its keys/values as new top-level fields in ES.
        source => "message"
        # Copy the log field into a new field so it can be parsed by a second json filter below.
        add_field => { "@javalog" => "%{log}" }
    }

    # The record contains JSON nested inside JSON:
    # {"xxx":"xxx","log":{"time":"xxx","path":"xxx"}}
    # i.e. the application's JSON sits in the log field and has to be imported as well.

    # Second pass: parse the nested JSON string.
    json {
        source => "@javalog"
        # Drop the fields that are no longer needed.
        remove_field => [ "log","@javalog" ]
    }
}

output
{

	elasticsearch {
		hosts => "192.168.10.74"
		index => "jar-log-%{+YYYY.MM.dd}"
	}

	stdout {
		codec => rubydebug
	}

}

# Start Logstash

[root@localhost logstash]# bin/logstash -f kafka.conf
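
Once events start flowing, the daily index should show up in Elasticsearch; a quick check against the standard _cat and _search endpoints:

[root@localhost ~]# curl 'http://192.168.10.74:9200/_cat/indices?v'
[root@localhost ~]# curl 'http://192.168.10.74:9200/jar-log-*/_search?size=1&pretty'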

# Log format produced by the Java application inside the container:

{"@timestamp":"2019-08-22T15:09:26.801+08:00","@version":"1","message":"运行时报错:","logger_name":"com.sailei.modules.test.controller.TestController","threa"level":"INFO","level_value":20000}

# The Docker fluentd log-driver wraps each application log line with 4 extra metadata fields before sending it to fluentd (still JSON):
container_id # The full 64-character container ID.
container_name # The container name at the time it was started. If you use docker rename to rename a container, the new name is not reflected in the journal entries.
source # stdout or stderr
log # The container log; the application's log line becomes the value of this field.

So Logstash has to handle the nested JSON and pull the data out of the log field (promoting it to the root of the document makes it easier to search).
(I originally planned to handle the JSON in Filebeat, but the log field name conflicts with an internal Filebeat keyword, so its contents could not be promoted to the root; that is why I switched to Logstash.)
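
To illustrate, a hypothetical record as consumed from Kafka and after the two json filters (all field values are made up):

# As consumed from Kafka: the application's JSON is a string inside the log field
{"container_id":"64-char-id...","container_name":"/name-api-2","source":"stdout","log":"{\"message\":\"...\",\"level\":\"INFO\",\"logger_name\":\"...\"}"}

# After the filter block: log and @javalog removed, nested keys promoted to the root
{"container_id":"64-char-id...","container_name":"/name-api-2","source":"stdout","message":"...","level":"INFO","logger_name":"..."}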

Original article; please credit the source when reposting. Permalink: https://www.rootop.org/pages/4521.html

Author: Venus
