对接 Logstash 和 Kafka
Logstash 是一个开源数据收集引擎,支持从多个数据源采集数据、转换数据、并将数据写入到指定存储中。本节介绍如何对接 Logstash 和 Kafka。
前提条件
-
已获取 KubeSphere 企业版平台登录账号和密码,且已获取平台管理权限。
-
已经创建好 Kafka 和 OpenSearch 集群,且集群状态处于运行中。
Kafka 集群操作步骤
-
以 platform-admin 角色登录 KubeSphere 企业版 Web 控制台并进入数据库管理平台。
-
在左侧导航栏选择 Kafka。
-
在 Kafka 集群列表中,点击一个集群名称打开其详情页面。
-
在页面右侧点击主题。
-
在创建主题对话框,输入主题名称,并设置分区数量和副本数量,点击确定。
-
点击 Kafka 用户,然后点击创建。
-
在创建用户对话框,设置用户名为 logstash,然后点击确定。用户创建完成后将显示在用户列表中。
-
在页面左侧的传输加密区域,下载 CA 证书并保存密码。
-
在页面右侧点击 Kafka 用户页签,下载用户证书并保存密码。
-
上传证书到服务器,根据 keystore 和 truststore 创建 secret 或 configmap。
# 创建 secret kubectl -n p1 create secret generic test-kafka-ssl --from-file ./cluster.keystore.p12 --from-file ./cluster.truststore.p12 # 创建 configmap kubectl -n p1 create cm test-cm-kafka --from-file=/tmp/cluster.keystore.p12 --from-file=/tmp/cluster.truststore.p12
OpenSearch 集群操作步骤
-
配置 Logstash CR 文件 logstash-kafka-ssl.yml。
apiVersion: opensearch.opster.io/v1 kind: Logstash metadata: name: radondb-os spec: replicas: 1 config: jvm: "-Xms512m -Xmx512m" openSearchInfo: openSearchCluster: name: radondb-os # opensearch 集群名称 namespace: p1 # opensearch 所在的 ns ports: [8080] # bootstrap_servers : kafka :读写地址 # security_protocol : 认证类型 :固定值 SSL # ssl_keystore_location : 用户证书的全路径 :固定值 /usr/share/ssl/cluster.keystore.p12 # ssl_keystore_password : 用户密码 :上述 logstash 用户证书密码 # ssl_keystore_type : 用户证书格式 :PKCS12 (kse中的 kafka 用此格式,根据实际情况调整) # ssl_truststore_location :ca 证书的全路径 :固定值 /usr/share/ssl/cluster.truststore.p12 # ssl_truststore_password : ca 证书的密码 :上述 ca 证书密码 # ssl_truststore_type : ca 证书的格式 :PKCS12 (kse中的 kafka 用此格式,根据实际情况调整) # ssl_endpoint_identification_algorithm :固定值 "" pipelineConfig: inputs: |- kafka { topics => ["test-topic","test-topic-1"] bootstrap_servers => "radondb-85z8fi-kafka-external-bootstrap.p1:9092" security_protocol => "SSL" ssl_keystore_location => "/usr/share/ssl/cluster.keystore.p12" ssl_keystore_password => "HZznFT_xv1BXDA6NkqBC7PbT9_vzI1t2" ssl_keystore_type => "PKCS12" ssl_truststore_location => "/usr/share/ssl/cluster.truststore.p12" ssl_truststore_password => "HZznFT_xv1BXDA6NkqBC7PbT9_vzI1t2" ssl_truststore_type => "PKCS12" ssl_endpoint_identification_algorithm => "" } filters: |- mutate { add_field => { "LogstashHost" => "${HOSTNAME}" } } outputs: openSearchIndex: |- # 保存到 opensearch 的索引设置 index => "logstash-%{+YYYY.MM.dd}" podTemplate: spec: containers: - name: logstash resources: requests: memory: "1Gi" cpu: "500m" limits: memory: "1Gi" cpu: "500m" volumeMounts: - name: certification-volume mountPath: /usr/share/ssl volumes: - name: certification-volume # test-secret-kafka Secret 需要提前创建,命令如下 # kubectl -n p1 create secret generic test-kafka-ssl --from-file ./cluster.keystore.p12 --from-file ./cluster.truststore.p12 secret: secretName: test-secret-kafka # 或者使用 configmap 挂载,也需要提前创建,命令如下 # kubectl -n p1 create cm test-cm-kafka --from-file=/tmp/cluster.keystore.p12 --from-file=/tmp/cluster.truststore.p12 # configMap: # name: test-cm-kafka
-
执行以下命令创建 Logstash。
kubectl -n p1 apply -f ./logstash-kafka-ssl.yml
验证 Input 是否生效
-
在客户端节点执行以下命令,向主题发送消息。
./kafka-console-producer.sh --broker-list {连接地址} --topic {Topic 名称}
-
连接地址:所连接的 Kafka 集群的地址,格式为 host_ip1:port,host_ip2:port,host_ip3:port。host_ip 为 Kafka 节点的 IP 地址,port 为客户端节点的访问端口 9092。
-
Topic 名称:创建的主题名称。
-
-
输入需要发送的消息内容,按 Enter 发送消息,每一行的内容都将作为一条消息发送到 Kafka。
示例如下:
./kafka-console-producer.sh --broker-list 172.22.2.124:9092,172.22.2.125:9092,192.22.2.126:9092 --topic test-topic >hello world >test msg1
-
在 OpenSearch Dashboards 查看数据写入情况,如果能够找到索引 logstash-yyyy.MM.dd,则证明消息写入成功。
GET _cat/indices
-
执行以下命令查看写入到主题的信息是否完整。
GET /logstash-2022.08.16/_search