logstash过滤器-json插件解析多层json嵌套

版本:logstash 6.6.2
方法:用logstash过滤器中的json插件配置选项实现

首先介绍用到的参数
参数:add_field
作用:添加字段

参数:%{field}
作用:调用事件中的指定字段

参数:remove_field
作用:删除字段

官方插件资料:https://www.elastic.co/guide/en/logstash/6.6/plugins-filters-json.html
通过文档描述,json插件可以将事件中的json格式日志提取到事件的根中,根中的字段传递到elasticsearch后,可以做过滤条件,而且展示起来比较直观。

比如json日志格式为:

{
    "name":"root",
    "info":{
        "from":"host1",
        "path":"var/log/log.log"
    }
}

info字段的值还是一个json,现在需要将info中的数据提取到事件的根中,也就是变为如下格式。

{
    "name":"root",
    "from":"host1",
    "path":"var/log/log.log"
}

# logstash配置文件

[root@localhost logstash]# cat kafka.conf 
input {

	kafka {
		bootstrap_servers => ["192.168.10.74:9092"]
		client_id => "test1"
		group_id => "test1"
		auto_offset_reset => "latest"
		consumer_threads => 1
		decorate_events => false
		topics => ["test"]
		type => "fromk"
	}
 
}

# 将message字段的数据(json格式) "导入"
filter {
	json {
		# 输入的日志,都会进入message字段,使用source解析json。
		source => "message"
		# 添加列,便于下面再执行source
		add_field => { "javalog" => "%{info}" }
	}

	# 第二次解析json串
	json {
		source => "javalog"
		# 移除列,info字段和javalog字段不需转存到elasticsearch。
		remove_field => [ "info","javalog" ]
	}
}


output {

	elasticsearch {
		hosts => "192.168.10.74"
		index => "jar-log-%{+YYYY.MM.dd}"
	}

	stdout {
		codec => rubydebug
	}
}

效果:

kibana中展示效果

kafka单机版配置

官方这个kafka版本集成了zookeeper,不需要单独安装。

[root@localhost ~]# wget -c http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
[root@localhost ~]# mv kafka_2.12-2.3.0.tgz /usr/local
[root@localhost ~]# cd /usr/local/
[root@localhost local]# tar zxvf kafka_2.12-2.3.0.tgz
[root@localhost local]# ln -s kafka_2.12-2.3.0 kafka
[root@localhost local]# cd kafka/bin/

# 启动zookeeper,默认端口2181

[root@localhost bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

# kafka配置文件需要注意 listeners 参数,直接写服务器ip

[root@localhost config]# cat server.properties | grep -vE "^#|^$"
broker.id=0
# listeners 这个值会返给客户端(因为这里卡了2天没解决),具体信息看配置文件里的解释。
listeners=PLAINTEXT://192.168.10.74:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# 指定zookeeper地址,因为在本机,直接默认localhost
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

# 启动kafka

[root@localhost bin]# ./kafka-server-start.sh -daemon ../config/server.properties 

# 创建topic,类似创建一个数据库。

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 查看已创建的topic

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

# 查看指定的topic信息

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

# 启动一个生产者

[root@localhost ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.74:9092 --topic test

# 启动一个消费者

[root@localhost ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

在生产者里输入内容并回车,在消费者里就可以显示出生产者输入的内容。

fluentd发送日志到kafka失败 Failed to connect to localhost:9092: Connection refused

# td-agent配置的broker地址

[root@abc td-agent]# cat /etc/td-agent/td-agent.conf
略
# 指定kafka地址及端口
brokers 192.168.10.74:9092
略

原本以为这样配置后,fluentd接收到的日志会直接发送到kafka。
(手动启动生产者,消费者,插入内容发现是可以消费的。)

# 但是看fluentd日志发现问题:

[root@abc td-agent]# tail -f /var/log/td-agent/td-agent.log
2019-08-21 14:30:05 +0800 [info]: #0 Fetching cluster metadata from kafka://192.168.10.74:9092
2019-08-21 14:30:05 +0800 [info]: #0 Discovered cluster metadata; nodes: localhost:9092 (node_id=0)

第一步从指定的地址获取集群metadata信息,第二步查找节点,发现地址是localhost:9092,一直没找到这个localhost地址是从哪里配置的。
然后fluentd发送日志到这个地址总是失败。
# 看下昨天排查问题时的日志:

2019-08-20 19:35:31 +0000 [info]: #0 New topics added to target list: test
2019-08-20 19:35:31 +0000 [info]: #0 Fetching cluster metadata from kafka://192.168.10.74:9092
2019-08-20 19:35:31 +0000 [info]: #0 Discovered cluster metadata; nodes: localhost:9092 (node_id=0)
2019-08-20 19:35:31 +0000 [info]: #0 Sending 7 messages to localhost:9092 (node_id=0)
2019-08-20 19:35:31 +0000 [error]: #0 [produce] Failed to connect to localhost:9092: Connection refused - connect(2) for [::1]:9092
2019-08-20 19:35:31 +0000 [error]: #0 Could not connect to broker localhost:9092 (node_id=0): Connection refused - connect(2) for [::1]:9092
2019-08-20 19:35:31 +0000 [warn]: #0 Failed to send all messages; attempting retry 1 of 2 after 1s
2019-08-20 19:35:31.833125548 +0000 fluent.info: {"message":"New topics added to target list: test"}
2019-08-20 19:35:31.833243919 +0000 fluent.info: {"message":"Fetching cluster metadata from kafka://192.168.10.74:9092"}
2019-08-20 19:35:31.836307791 +0000 fluent.info: {"message":"Discovered cluster metadata; nodes: localhost:9092 (node_id=0)"}
2019-08-20 19:35:31.836867642 +0000 fluent.info: {"message":"Sending 7 messages to localhost:9092 (node_id=0)"}

# 觉得这个地址还是需要在kafka配置文件配置,终于找到一个参数。

[root@abc config]# vi /usr/local/kafka/config/server.properties
listeners=PLAINTEXT://192.168.10.74:9092

保存退出。

去zookeeper命令行里删除一个key。

[root@abc bin]# ./zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
get /brokers/ids/0 # 查看 
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.10.74:9092"],"jmx_port":-1,"host":"192.168.10.74","timestamp":"1566369132247","port":9092,"version":4}
cZxid = 0x2c7
ctime = Wed Aug 21 14:32:12 CST 2019
mZxid = 0x2c7
mtime = Wed Aug 21 14:32:12 CST 2019
pZxid = 0x2c7
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100055b01b60002
dataLength = 196
numChildren = 0
rmr /brokers/ids/0 # 删除

(我之前endpoints和host的值都是localhost,所以要删除,让kafka重新写入)

# 获取键值
get /brokers/ids/0
# 删除键
rmr /brokers/ids/0

重启zookeeper,重启kafka。

再查看fluentd日志,这次获取的nodes地址对了。

2019-08-21 14:32:38.014038626 +0800 fluent.info: {"message":"Fetching cluster metadata from kafka://192.168.10.74:9092"}
2019-08-21 14:32:38.018237952 +0800 fluent.info: {"message":"Discovered cluster metadata; nodes: 192.168.10.74:9092 (node_id=0)"}

消息终于发送到kafka了,而且开启一个消费者也能看到内容了。

docker swarm 清理非Up状态容器

[root@m ~]# docker service update ··· # 更新服务

更新服务镜像后,发现旧的容器没有删除,变为Exited(Exited、Shutdown等)状态,查资料也没有说多久会删除。
(放了两个周也没自动清理),可能旧容器会一直存在,有其它用处。
有文章称保留是为了服务回滚操作。反正看着没啥用,就找删除方法。
在docker版本1.13之后可以用命令清理。

# 查看相关清理命令

[root@m ~]# docker system --help

Usage:	docker system COMMAND

Manage Docker

Commands:
  df          Show docker disk usage
  events      Get real time events from the server
  info        Display system-wide information
  prune       Remove unused data

# 清理命令相关参数

[root@m ~]# docker system prune --help

Usage:	docker system prune [OPTIONS]

Remove unused data

Options:
  -a, --all             Remove all unused images not just dangling ones
      --filter filter   Provide filter values (e.g. 'label=<key>=<value>')
  -f, --force           Do not prompt for confirmation
      --volumes         Prune volumes

通过 -a 参数可以清理未使用的镜像,-f 强制删除,不提示确认。

# 清理旧容器及不用的镜像

[root@m ~]# docker system prune -a -f

可以添加到任务计划里定时清理,或者是更新完服务测试后没问题通过脚本批量清理。

jenkins删除历史构建

原文:https://www.cnblogs.com/shaobin0604/p/9680621.html
找到 系统管理 – 脚本命令行

def jobName = "Some_Job_Name"
def maxNumber = 64

Jenkins.instance.getItemByFullName(jobName).builds.findAll {
  it.number <= maxNumber
}.each {
  it.delete()
}

jobName 是项目名称
maxNumber 是删除多少个