Kafka

Collect logs and metrics from Kafka servers with Elastic Agent.

Version: 1.16.0
Compatible Kibana version(s): 8.13.0 or higher
Supported Serverless project types: Security, Observability
Subscription level: Basic
Level of support: Elastic

This integration collects logs and metrics from Kafka servers.

Compatibility

The log dataset is tested with logs from Kafka 0.9, 1.1.0 and 2.0.0.

The broker, consumergroup, partition and producer metricsets are tested with Kafka 0.10.2.1, 1.1.0, 2.1.1, and 2.2.2.

The broker metricset requires Jolokia to fetch JMX metrics. Refer to the Metricbeat documentation about Jolokia for more information.

Logs

log

The log dataset collects and parses logs from Kafka servers.

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields

| Field | Description | Type |
|---|---|---|
| @timestamp | Event timestamp. | date |
| cloud.image.id | Image ID for the cloud instance. | keyword |
| data_stream.dataset | Data stream dataset. | constant_keyword |
| data_stream.namespace | Data stream namespace. | constant_keyword |
| data_stream.type | Data stream type. | constant_keyword |
| event.dataset | Event dataset | constant_keyword |
| event.module | Event module | constant_keyword |
| host.containerized | If the host is a container. | boolean |
| host.os.build | OS build information. | keyword |
| host.os.codename | OS codename, if any. | keyword |
| kafka.log.class | Java class the log is coming from. | keyword |
| kafka.log.component | Component the log is coming from. | keyword |
| kafka.log.thread | Thread name the log is coming from. | keyword |
| kafka.log.trace.class | Java class the trace is coming from. | keyword |
| kafka.log.trace.message | Message part of the trace. | text |

Metrics

broker

The broker dataset collects JMX metrics from Kafka brokers using Jolokia.

An example event for broker looks as follows:

{
    "@timestamp": "2020-05-15T15:12:12.270Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.broker",
        "duration": 4572918,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "mbean": "kafka.server:name=BytesOutPerSec,topic=messages,type=BrokerTopicMetrics",
            "topic": {
                "net": {
                    "out": {
                        "bytes_per_sec": 0.6089809926927563
                    }
                }
            }
        }
    },
    "metricset": {
        "name": "broker",
        "period": 10000
    },
    "service": {
        "address": "localhost:8778",
        "type": "kafka"
    }
}
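
The event above carries the name of the MBean the metric was read from. As a rough illustration of how such a value might be requested from Jolokia, the sketch below builds the body of a read request in Jolokia's JSON protocol. The helper name `jolokia_read_request` and the attribute `OneMinuteRate` are assumptions made for illustration; this document does not state which MBean attribute the integration reads.

```python
import json


def jolokia_read_request(mbean: str, attribute: str) -> str:
    """Build the JSON body of a Jolokia 'read' request for one MBean attribute."""
    payload = {"type": "read", "mbean": mbean, "attribute": attribute}
    # sort_keys keeps the output stable, which is convenient for logging and tests
    return json.dumps(payload, sort_keys=True)


# MBean name copied from the example broker event above.
# The attribute name is an assumption, not taken from this document.
body = jolokia_read_request(
    "kafka.server:name=BytesOutPerSec,topic=messages,type=BrokerTopicMetrics",
    "OneMinuteRate",
)
print(body)
```

A body like this would typically be sent with an HTTP POST to the Jolokia agent endpoint; the exact URL depends on how the agent is deployed alongside the broker.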

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields

| Field | Description | Type | Metric Type |
|---|---|---|---|
| @timestamp | Event timestamp. | date | |
| agent.id | | keyword | |
| cloud.account.id | The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier. | keyword | |
| cloud.availability_zone | Availability zone in which this host is running. | keyword | |
| cloud.image.id | Image ID for the cloud instance. | keyword | |
| cloud.instance.id | Instance ID of the host machine. | keyword | |
| cloud.provider | Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean. | keyword | |
| cloud.region | Region in which this host is running. | keyword | |
| container.id | Unique container id. | keyword | |
| data_stream.dataset | Data stream dataset. | constant_keyword | |
| data_stream.namespace | Data stream namespace. | constant_keyword | |
| data_stream.type | Data stream type. | constant_keyword | |
| event.dataset | Event dataset | constant_keyword | |
| event.module | Event module | constant_keyword | |
| host.containerized | If the host is a container. | boolean | |
| host.name | Name of the host. It can contain what hostname returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use. | keyword | |
| host.os.build | OS build information. | keyword | |
| host.os.codename | OS codename, if any. | keyword | |
| kafka.broker.address | Broker advertised address | keyword | |
| kafka.broker.id | Broker id | long | |
| kafka.broker.log.flush_rate | The log flush rate | float | gauge |
| kafka.broker.mbean | Mbean that this event is related to | keyword | |
| kafka.broker.messages_in | The incoming message rate | float | gauge |
| kafka.broker.net.in.bytes_per_sec | The incoming byte rate | float | gauge |
| kafka.broker.net.out.bytes_per_sec | The outgoing byte rate | float | gauge |
| kafka.broker.net.rejected.bytes_per_sec | The rejected byte rate | float | gauge |
| kafka.broker.replication.leader_elections | The leader election rate | float | gauge |
| kafka.broker.replication.unclean_leader_elections | The unclean leader election rate | float | gauge |
| kafka.broker.request.channel.queue.size | The size of the request queue | long | gauge |
| kafka.broker.request.fetch.failed | The number of client fetch request failures | float | counter |
| kafka.broker.request.fetch.failed_per_second | The rate of client fetch request failures per second | float | gauge |
| kafka.broker.request.produce.failed | The number of failed produce requests | float | counter |
| kafka.broker.request.produce.failed_per_second | The rate of failed produce requests per second | float | gauge |
| kafka.broker.session.zookeeper.disconnect | The ZooKeeper closed sessions per second | float | gauge |
| kafka.broker.session.zookeeper.expire | The ZooKeeper expired sessions per second | float | gauge |
| kafka.broker.session.zookeeper.readonly | The ZooKeeper readonly sessions per second | float | gauge |
| kafka.broker.session.zookeeper.sync | The ZooKeeper client connections per second | float | gauge |
| kafka.broker.topic.messages_in | The incoming message rate per topic | float | gauge |
| kafka.broker.topic.net.in.bytes_per_sec | The incoming byte rate per topic | float | gauge |
| kafka.broker.topic.net.out.bytes_per_sec | The outgoing byte rate per topic | float | gauge |
| kafka.broker.topic.net.rejected.bytes_per_sec | The rejected byte rate per topic | float | gauge |
| kafka.partition.id | Partition id. | long | |
| kafka.partition.topic_broker_id | Unique id of the partition in the topic and the broker. | keyword | |
| kafka.partition.topic_id | Unique id of the partition in the topic. | keyword | |
| kafka.topic.error.code | Topic error code. | long | |
| kafka.topic.name | Topic name | keyword | |
| service.address | Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets). | keyword | |

consumergroup

An example event for consumergroup looks as follows:

{
    "@timestamp": "2020-05-15T15:18:13.919Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.consumergroup",
        "duration": 8821045,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "address": "kafka-01:9092",
            "id": 0
        },
        "consumergroup": {
            "client": {
                "host": "127.0.0.1",
                "id": "consumer-console-consumer-99447-1",
                "member_id": "consumer-console-consumer-99447-1-208fdf91-2f28-4336-a2ff-5e5f4b8b71e4"
            },
            "consumer_lag": 112,
            "error": {
                "code": 0
            },
            "id": "console-consumer-99447",
            "meta": "",
            "offset": -1
        },
        "partition": {
            "id": 0,
            "topic_id": "0-messages"
        },
        "topic": {
            "name": "messages"
        }
    },
    "metricset": {
        "name": "consumergroup",
        "period": 10000
    },
    "service": {
        "address": "localhost:9092",
        "type": "kafka"
    }
}

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields

| Field | Description | Type | Metric Type |
|---|---|---|---|
| @timestamp | Event timestamp. | date | |
| agent.id | | keyword | |
| cloud.account.id | The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier. | keyword | |
| cloud.availability_zone | Availability zone in which this host is running. | keyword | |
| cloud.image.id | Image ID for the cloud instance. | keyword | |
| cloud.instance.id | Instance ID of the host machine. | keyword | |
| cloud.provider | Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean. | keyword | |
| cloud.region | Region in which this host is running. | keyword | |
| container.id | Unique container id. | keyword | |
| data_stream.dataset | Data stream dataset. | constant_keyword | |
| data_stream.namespace | Data stream namespace. | constant_keyword | |
| data_stream.type | Data stream type. | constant_keyword | |
| event.dataset | Event dataset | constant_keyword | |
| event.module | Event module | constant_keyword | |
| host.containerized | If the host is a container. | boolean | |
| host.name | Name of the host. It can contain what hostname returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use. | keyword | |
| host.os.build | OS build information. | keyword | |
| host.os.codename | OS codename, if any. | keyword | |
| kafka.broker.address | Broker advertised address | keyword | |
| kafka.broker.id | Broker id | long | |
| kafka.consumergroup.client.host | Client host | keyword | |
| kafka.consumergroup.client.id | Client ID (kafka setting client.id) | keyword | |
| kafka.consumergroup.client.member_id | Internal consumer group member ID | keyword | |
| kafka.consumergroup.consumer_lag | Consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset | long | gauge |
| kafka.consumergroup.error.code | Kafka consumer/partition error code. | long | |
| kafka.consumergroup.id | Consumer Group ID | keyword | |
| kafka.consumergroup.meta | Custom consumer meta data string | keyword | |
| kafka.consumergroup.offset | Consumer offset into partition being read | long | gauge |
| kafka.partition.id | Partition id. | long | |
| kafka.partition.topic_broker_id | Unique id of the partition in the topic and the broker. | keyword | |
| kafka.partition.topic_id | Unique id of the partition in the topic. | keyword | |
| kafka.topic.error.code | Topic error code. | long | |
| kafka.topic.name | Topic name | keyword | |
| service.address | Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets). | keyword | |
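
The description of kafka.consumergroup.consumer_lag can be made concrete with a small calculation. The sketch below is only an illustration of the stated formula (partition offset minus consumer offset); the function name `consumer_lag` is hypothetical, and whether the integration applies any special handling for particular offset values is not stated in this document. The inputs are taken from the example consumergroup and partition events on this page.

```python
def consumer_lag(partition_offset: int, consumer_offset: int) -> int:
    """Lag as described in the field table: partition offset minus consumer offset."""
    return partition_offset - consumer_offset


# Values from the example events in this document:
#   kafka.partition.offset.newest = 111 (partition event)
#   kafka.consumergroup.offset    = -1  (consumergroup event)
lag = consumer_lag(partition_offset=111, consumer_offset=-1)
print(lag)  # 112, the consumer_lag value shown in the example consumergroup event
```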

partition

An example event for partition looks as follows:

{
    "@timestamp": "2020-05-15T15:19:44.240Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.partition",
        "duration": 11263377,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "address": "kafka-01:9092",
            "id": 0
        },
        "partition": {
            "id": 0,
            "offset": {
                "newest": 111,
                "oldest": 0
            },
            "partition": {
                "insync_replica": true,
                "is_leader": true,
                "leader": 0,
                "replica": 0
            },
            "topic_broker_id": "0-messages-0",
            "topic_id": "0-messages"
        },
        "topic": {
            "name": "messages"
        }
    },
    "metricset": {
        "name": "partition",
        "period": 10000
    },
    "service": {
        "address": "localhost:9092",
        "type": "kafka"
    }
}

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields

| Field | Description | Type | Metric Type |
|---|---|---|---|
| @timestamp | Event timestamp. | date | |
| agent.id | | keyword | |
| cloud.account.id | The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier. | keyword | |
| cloud.availability_zone | Availability zone in which this host is running. | keyword | |
| cloud.image.id | Image ID for the cloud instance. | keyword | |
| cloud.instance.id | Instance ID of the host machine. | keyword | |
| cloud.provider | Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean. | keyword | |
| cloud.region | Region in which this host is running. | keyword | |
| container.id | Unique container id. | keyword | |
| data_stream.dataset | Data stream dataset. | constant_keyword | |
| data_stream.namespace | Data stream namespace. | constant_keyword | |
| data_stream.type | Data stream type. | constant_keyword | |
| event.dataset | Event dataset | constant_keyword | |
| event.module | Event module | constant_keyword | |
| host.containerized | If the host is a container. | boolean | |
| host.name | Name of the host. It can contain what hostname returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use. | keyword | |
| host.os.build | OS build information. | keyword | |
| host.os.codename | OS codename, if any. | keyword | |
| kafka.broker.address | Broker advertised address | keyword | |
| kafka.broker.id | Broker id | long | |
| kafka.partition.id | Partition id. | long | |
| kafka.partition.offset.newest | Newest offset of the partition. | long | gauge |
| kafka.partition.offset.oldest | Oldest offset of the partition. | long | gauge |
| kafka.partition.partition.error.code | Error code from fetching partition. | long | |
| kafka.partition.partition.insync_replica | Indicates if replica is included in the in-sync replica set (ISR). | boolean | |
| kafka.partition.partition.is_leader | Indicates if replica is the leader | boolean | |
| kafka.partition.partition.leader | Leader id (broker). | long | |
| kafka.partition.partition.replica | Replica id (broker). | long | |
| kafka.partition.topic_broker_id | Unique id of the partition in the topic and the broker. | keyword | |
| kafka.partition.topic_id | Unique id of the partition in the topic. | keyword | |
| kafka.topic.error.code | Topic error code. | long | |
| kafka.topic.name | Topic name | keyword | |
| service.address | Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets). | keyword | |
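
As one possible way to consume the partition fields listed above, the sketch below reduces a partition event to a few values that are often of interest: the span between the oldest and newest offsets, and whether the reporting replica is outside the in-sync replica set. The function name `summarize_partition` and the shape of the returned dictionary are hypothetical; the input is a trimmed copy of the example partition event in this document.

```python
from typing import Any, Dict


def summarize_partition(event: Dict[str, Any]) -> Dict[str, Any]:
    """Extract a small summary from a kafka.partition event (illustrative only)."""
    part = event["kafka"]["partition"]
    offsets = part["offset"]
    replica = part["partition"]
    return {
        "topic": event["kafka"]["topic"]["name"],
        "partition_id": part["id"],
        # Number of offsets between the oldest and newest positions
        "offset_span": offsets["newest"] - offsets["oldest"],
        # True when this replica is not part of the ISR
        "out_of_sync": not replica["insync_replica"],
    }


# Trimmed copy of the example partition event shown earlier in this section
event = {
    "kafka": {
        "partition": {
            "id": 0,
            "offset": {"newest": 111, "oldest": 0},
            "partition": {"insync_replica": True, "is_leader": True, "leader": 0, "replica": 0},
        },
        "topic": {"name": "messages"},
    }
}

summary = summarize_partition(event)
print(summary)
```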

Changelog

| Version | Details | Kibana version(s) |
|---|---|---|
| 1.16.0 | Enhancement: Add processor support for broker, consumergroup and partition data streams. | 8.13.0 or higher |
| 1.15.0 | Enhancement: ECS version updated to 8.11.0. Update the kibana constraint to ^8.13.0. Modified the field definitions to remove ECS fields made redundant by the ecs@mappings component template. | 8.13.0 or higher |
| 1.14.0 | Enhancement: Add global filter on data_stream.dataset to improve performance. | 8.12.0 or higher |
| 1.13.0 | Enhancement: Enable secrets for sensitive fields. For more details, refer to https://www.elastic.co/guide/en/fleet/current/agent-policy.html#agent-policy-secret-values | 8.12.0 or higher |
| 1.12.1 | Bug fix: Disable secrets for older stack versions due to errors. | 8.8.0 or higher |
| 1.12.0 | Enhancement: Enable 'secret' for the sensitive fields, supported from 8.12. | 8.8.0 or higher |
| 1.11.0 | Enhancement: Add missing SSL fields to agent config. | 8.8.0 or higher |
| 1.10.0 | Enhancement: Update the package format_version to 3.0.0. | 8.8.0 or higher |
| 1.9.2 | Bug fix: Add null check and ignore_missing check to the rename processor | 8.8.0 or higher |
| 1.9.1 | Enhancement: Migrate visualizations to lens. | 8.8.0 or higher |
| 1.9.0 | Enhancement: Revert changes to permissions to reroute events to logs-- for log datastream | 8.8.0 or higher |
| 1.8.0 | Enhancement: Enable time series data streams for the metrics datasets. This dramatically reduces storage for metrics and is expected to progressively improve query performance. For more details, see https://www.elastic.co/guide/en/elasticsearch/reference/current/tsds.html. | 8.8.0 or higher |
| 1.7.0 | Enhancement: Add permissions to reroute events to logs-- for log datastream | 7.14.0 or higher, 8.0.0 or higher |
| 1.6.0 | Enhancement: Rename ownership from obs-service-integrations to obs-infraobs-integrations | 7.14.0 or higher, 8.0.0 or higher |
| 1.5.6 | Enhancement: Modified the dimension field mapping to support public cloud deployment. | 7.14.0 or higher, 8.0.0 or higher |
| 1.5.5 | Enhancement: Add dimension fields for partition datastream. | 7.14.0 or higher, 8.0.0 or higher |
| 1.5.4 | Enhancement: Add dimension mapping for consumergroup datastream. | 7.14.0 or higher, 8.0.0 or higher |
| 1.5.3 | Enhancement: Added dimension fields for broker datastream. | 7.14.0 or higher, 8.0.0 or higher |
| 1.5.2 | Enhancement: Add metric_type mapping for partition datastream. | 7.14.0 or higher, 8.0.0 or higher |
| 1.5.1 | Enhancement: Add metric_type mapping for consumer_group datastream. | 7.14.0 or higher, 8.0.0 or higher |
| 1.5.0 | Enhancement: Add metric_type mapping for broker datastream. | 7.14.0 or higher, 8.0.0 or higher |
| 1.4.1 | Enhancement: Added categories and/or subcategories. | 7.14.0 or higher, 8.0.0 or higher |
| 1.4.0 | Enhancement: Update ECS version to 8.5.1 | 7.14.0 or higher, 8.0.0 or higher |
| 1.3.1 | Enhancement: Update the dashboard fields | 7.14.0 or higher, 8.0.0 or higher |
| 1.3.0 | Enhancement: Added infrastructure category. | 7.14.0 or higher, 8.0.0 or higher |
| 1.2.4 | Enhancement: Support SASL mechanism | 7.14.0 or higher, 8.0.0 or higher |
| 1.2.3 | Enhancement: Add documentation for multi-fields | 7.14.0 or higher, 8.0.0 or higher |
| 1.2.2 | Bug fix: Pass down the SSL configs. | 7.14.0 or higher, 8.0.0 or higher |
| 1.2.1 | Bug fix: Add missing event.* fields into ecs.yml | |
| 1.2.0 | Enhancement: Update to ECS 8.0 | |
| 1.1.0 | Enhancement: Support Kibana 8.0 | 7.14.0 or higher, 8.0.0 or higher |
| 1.0.0 | Enhancement: Release Kafka as GA | |
| 0.7.2 | Enhancement: Uniform with guidelines | |
| 0.7.1 | Bug fix: Fix logic that checks for the 'forwarded' tag | |
| 0.7.0 | Enhancement: Update to ECS 1.12.0 | |
| 0.6.2 | Enhancement: Convert to generated ECS fields | |
| 0.6.1 | Enhancement: Update to ECS 1.11.0 | |
| 0.6.0 | Enhancement: Update integration description | |
| 0.5.0 | Enhancement: Enable ECS dependency | |
| 0.5.0 | Enhancement: Set "event.module" and "event.dataset" | |
| 0.4.0 | Enhancement: Update to ECS 1.10.0 and adding event.original option | |
| 0.3.8 | Enhancement: Updating package owner | |
| 0.3.7 | Bug fix: Correct sample event file. | |
| 0.1.0 | Enhancement: Initial release | |