Logstash 是一个开源数据收集引擎,支持从多个数据源采集数据、转换数据、并将数据写入到指定存储中。本节介绍如何对接 Logstash 和 Kafka。

前提条件

  • 已获取 KubeSphere 企业版平台登录账号和密码,且已获取平台管理权限。

  • 已经创建好 Kafka 和 OpenSearch 集群,且集群状态处于运行中。

Kafka 集群操作步骤

  1. 以 platform-admin 角色登录 KubeSphere 企业版 Web 控制台并进入数据库管理平台。

  2. 在左侧导航栏选择 Kafka

  3. 在 Kafka 集群列表中,点击一个集群名称打开其详情页面。

  4. 在页面右侧点击主题

  5. 创建主题对话框,输入主题名称,并设置分区数量和副本数量,点击确定

  6. 点击 Kafka 用户,然后点击创建

  7. 创建用户对话框,设置用户名为 logstash,然后点击确定。用户创建完成后将显示在用户列表中。

  8. 在页面左侧的传输加密区域,下载 CA 证书并保存密码。

  9. 在页面右侧点击 Kafka 用户页签,下载用户证书并保存密码。

    download CA user certificate
  10. 上传证书到服务器,根据 keystore 和 truststore 创建 secret 或 configmap。

    # 创建 secret
    kubectl -n p1 create secret generic test-kafka-ssl --from-file ./cluster.keystore.p12  --from-file ./cluster.truststore.p12
    # 创建 configmap
    kubectl -n p1 create cm test-cm-kafka --from-file=/tmp/cluster.keystore.p12 --from-file=/tmp/cluster.truststore.p12

OpenSearch 集群操作步骤

  1. 配置 Logstash CR 文件 logstash-kafka-ssl.yml

    apiVersion: opensearch.opster.io/v1
    kind: Logstash
    metadata:
      name: radondb-os
    spec:
      replicas: 1
      config:
        jvm: "-Xms512m -Xmx512m"
        openSearchInfo:
          openSearchCluster:
            name: radondb-os # opensearch 集群名称
            namespace: p1 # opensearch 所在的 ns
        ports: [8080]
        # bootstrap_servers       : kafka             :读写地址
        # security_protocol       : 认证类型          :固定值 SSL
        # ssl_keystore_location   : 用户证书的全路径  :固定值 /usr/share/ssl/cluster.keystore.p12
        # ssl_keystore_password   : 用户密码          :上述 logstash 用户证书密码
        # ssl_keystore_type       : 用户证书格式      :PKCS12 (kse中的 kafka 用此格式,根据实际情况调整)
        # ssl_truststore_location :ca 证书的全路径   :固定值 /usr/share/ssl/cluster.truststore.p12
       # ssl_truststore_password : ca 证书的密码     :上述 ca 证书密码
        # ssl_truststore_type     : ca 证书的格式     :PKCS12 (kse中的 kafka 用此格式,根据实际情况调整)
        # ssl_endpoint_identification_algorithm       :固定值 ""
        pipelineConfig:
          inputs: |-
            kafka {
              topics => ["test-topic","test-topic-1"]
              bootstrap_servers => "radondb-85z8fi-kafka-external-bootstrap.p1:9092"
              security_protocol => "SSL"
              ssl_keystore_location => "/usr/share/ssl/cluster.keystore.p12"
              ssl_keystore_password => "HZznFT_xv1BXDA6NkqBC7PbT9_vzI1t2"
              ssl_keystore_type => "PKCS12"
              ssl_truststore_location => "/usr/share/ssl/cluster.truststore.p12"
              ssl_truststore_password => "HZznFT_xv1BXDA6NkqBC7PbT9_vzI1t2"
              ssl_truststore_type => "PKCS12"
              ssl_endpoint_identification_algorithm => ""
            }
          filters: |-
            mutate {
              add_field => { "LogstashHost" => "${HOSTNAME}" }
            }
          outputs:
          openSearchIndex: |- # 保存到 opensearch 的索引设置
            index => "logstash-%{+YYYY.MM.dd}"
      podTemplate:
        spec:
          containers:
            - name: logstash
              resources:
                requests:
                  memory: "1Gi"
                  cpu: "500m"
                limits:
                  memory: "1Gi"
                  cpu: "500m"
              volumeMounts:
                - name: certification-volume
                  mountPath: /usr/share/ssl
          volumes:
            - name: certification-volume
              # test-secret-kafka Secret 需要提前创建,命令如下
              # kubectl -n p1 create secret generic test-kafka-ssl --from-file ./cluster.keystore.p12  --from-file ./cluster.truststore.p12
              secret:
                secretName: test-secret-kafka
              # 或者使用 configmap 挂载,也需要提前创建,命令如下
              # kubectl -n p1 create cm test-cm-kafka --from-file=/tmp/cluster.keystore.p12 --from-file=/tmp/cluster.truststore.p12
              # configMap:
              #   name: test-cm-kafka
  2. 执行以下命令创建 Logstash。

kubectl -n p1 apply -f ./logstash-kafka-ssl.yml

验证 Input 是否生效

  1. 在客户端节点执行以下命令,向主题发送消息。

    ./kafka-console-producer.sh --broker-list {连接地址} --topic {Topic 名称}
    • 连接地址:所连接的 Kafka 集群的地址,格式为 host_ip1:port,host_ip2:port,host_ip3:port。host_ip 为 Kafka 节点的 IP 地址,port 为客户端节点的访问端口 9092。

    • Topic 名称:创建的主题名称。

  2. 输入需要发送的消息内容,按 Enter 发送消息,每一行的内容都将作为一条消息发送到 Kafka。

    示例如下:

    ./kafka-console-producer.sh --broker-list 172.22.2.124:9092,172.22.2.125:9092,192.22.2.126:9092 --topic test-topic
    >hello world
    >test msg1
  3. 在 OpenSearch Dashboards 查看数据写入情况,如果能够找到索引 logstash-yyyy.MM.dd,则证明消息写入成功。

    GET _cat/indices
    kakfa index
  4. 执行以下命令查看写入到主题的信息是否完整。

    GET /logstash-2022.08.16/_search
    kafka input success