New

The executive guide to generative AI

Read more

Kafka integration

edit

Kafka integration

edit

Version

1.16.0 (View all)

Compatible Kibana version(s)

8.13.0 or higher

Supported Serverless project types
What’s this?

Security
Observability

Subscription level
What’s this?

Basic

Level of support
What’s this?

Elastic

This integration collects logs and metrics from Kafka servers.

Compatibility

edit

The log dataset is tested with logs from Kafka 0.9, 1.1.0 and 2.0.0.

The broker, consumergroup, partition and producer metricsets are tested with Kafka 0.10.2.1, 1.1.0, 2.1.1, and 2.2.2.

The broker metricset requires Jolokia to fetch JMX metrics. Refer to the Metricbeat documentation about Jolokia for more information.

Logs

edit

log

edit

The log dataset collects and parses logs from Kafka servers.

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields
Field Description Type

@timestamp

Event timestamp.

date

cloud.image.id

Image ID for the cloud instance.

keyword

data_stream.dataset

Data stream dataset.

constant_keyword

data_stream.namespace

Data stream namespace.

constant_keyword

data_stream.type

Data stream type.

constant_keyword

event.dataset

Event dataset

constant_keyword

event.module

Event module

constant_keyword

host.containerized

If the host is a container.

boolean

host.os.build

OS build information.

keyword

host.os.codename

OS codename, if any.

keyword

kafka.log.class

Java class the log is coming from.

keyword

kafka.log.component

Component the log is coming from.

keyword

kafka.log.thread

Thread name the log is coming from.

keyword

kafka.log.trace.class

Java class the trace is coming from.

keyword

kafka.log.trace.message

Message part of the trace.

text

Metrics

edit

broker

edit

The broker dataset collects JMX metrics from Kafka brokers using Jolokia.

Example

An example event for broker looks as following:

{
    "@timestamp": "2020-05-15T15:12:12.270Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.broker",
        "duration": 4572918,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "mbean": "kafka.server:name=BytesOutPerSec,topic=messages,type=BrokerTopicMetrics",
            "topic": {
                "net": {
                    "out": {
                        "bytes_per_sec": 0.6089809926927563
                    }
                }
            }
        }
    },
    "metricset": {
        "name": "broker",
        "period": 10000
    },
    "service": {
        "address": "localhost:8778",
        "type": "kafka"
    }
}

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields
Field Description Type Metric Type

@timestamp

Event timestamp.

date

agent.id

keyword

cloud.account.id

The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier.

keyword

cloud.availability_zone

Availability zone in which this host is running.

keyword

cloud.image.id

Image ID for the cloud instance.

keyword

cloud.instance.id

Instance ID of the host machine.

keyword

cloud.provider

Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean.

keyword

cloud.region

Region in which this host is running.

keyword

container.id

Unique container id.

keyword

data_stream.dataset

Data stream dataset.

constant_keyword

data_stream.namespace

Data stream namespace.

constant_keyword

data_stream.type

Data stream type.

constant_keyword

event.dataset

Event dataset

constant_keyword

event.module

Event module

constant_keyword

host.containerized

If the host is a container.

boolean

host.name

Name of the host. It can contain what hostname returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use.

keyword

host.os.build

OS build information.

keyword

host.os.codename

OS codename, if any.

keyword

kafka.broker.address

Broker advertised address

keyword

kafka.broker.id

Broker id

long

kafka.broker.log.flush_rate

The log flush rate

float

gauge

kafka.broker.mbean

Mbean that this event is related to

keyword

kafka.broker.messages_in

The incoming message rate

float

gauge

kafka.broker.net.in.bytes_per_sec

The incoming byte rate

float

gauge

kafka.broker.net.out.bytes_per_sec

The outgoing byte rate

float

gauge

kafka.broker.net.rejected.bytes_per_sec

The rejected byte rate

float

gauge

kafka.broker.replication.leader_elections

The leader election rate

float

gauge

kafka.broker.replication.unclean_leader_elections

The unclean leader election rate

float

gauge

kafka.broker.request.channel.queue.size

The size of the request queue

long

gauge

kafka.broker.request.fetch.failed

The number of client fetch request failures

float

counter

kafka.broker.request.fetch.failed_per_second

The rate of client fetch request failures per second

float

gauge

kafka.broker.request.produce.failed

The number of failed produce requests

float

counter

kafka.broker.request.produce.failed_per_second

The rate of failed produce requests per second

float

gauge

kafka.broker.session.zookeeper.disconnect

The ZooKeeper closed sessions per second

float

gauge

kafka.broker.session.zookeeper.expire

The ZooKeeper expired sessions per second

float

gauge

kafka.broker.session.zookeeper.readonly

The ZooKeeper readonly sessions per second

float

gauge

kafka.broker.session.zookeeper.sync

The ZooKeeper client connections per second

float

gauge

kafka.broker.topic.messages_in

The incoming message rate per topic

float

gauge

kafka.broker.topic.net.in.bytes_per_sec

The incoming byte rate per topic

float

gauge

kafka.broker.topic.net.out.bytes_per_sec

The outgoing byte rate per topic

float

gauge

kafka.broker.topic.net.rejected.bytes_per_sec

The rejected byte rate per topic

float

gauge

kafka.partition.id

Partition id.

long

kafka.partition.topic_broker_id

Unique id of the partition in the topic and the broker.

keyword

kafka.partition.topic_id

Unique id of the partition in the topic.

keyword

kafka.topic.error.code

Topic error code.

long

kafka.topic.name

Topic name

keyword

service.address

Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets).

keyword

consumergroup

edit
Example

An example event for consumergroup looks as following:

{
    "@timestamp": "2020-05-15T15:18:13.919Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.consumergroup",
        "duration": 8821045,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "address": "kafka-01:9092",
            "id": 0
        },
        "consumergroup": {
            "client": {
                "host": "127.0.0.1",
                "id": "consumer-console-consumer-99447-1",
                "member_id": "consumer-console-consumer-99447-1-208fdf91-2f28-4336-a2ff-5e5f4b8b71e4"
            },
            "consumer_lag": 112,
            "error": {
                "code": 0
            },
            "id": "console-consumer-99447",
            "meta": "",
            "offset": -1
        },
        "partition": {
            "id": 0,
            "topic_id": "0-messages"
        },
        "topic": {
            "name": "messages"
        }
    },
    "metricset": {
        "name": "consumergroup",
        "period": 10000
    },
    "service": {
        "address": "localhost:9092",
        "type": "kafka"
    }
}

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields
Field Description Type Metric Type

@timestamp

Event timestamp.

date

agent.id

keyword

cloud.account.id

The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier.

keyword

cloud.availability_zone

Availability zone in which this host is running.

keyword

cloud.image.id

Image ID for the cloud instance.

keyword

cloud.instance.id

Instance ID of the host machine.

keyword

cloud.provider

Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean.

keyword

cloud.region

Region in which this host is running.

keyword

container.id

Unique container id.

keyword

data_stream.dataset

Data stream dataset.

constant_keyword

data_stream.namespace

Data stream namespace.

constant_keyword

data_stream.type

Data stream type.

constant_keyword

event.dataset

Event dataset

constant_keyword

event.module

Event module

constant_keyword

host.containerized

If the host is a container.

boolean

host.name

Name of the host. It can contain what hostname returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use.

keyword

host.os.build

OS build information.

keyword

host.os.codename

OS codename, if any.

keyword

kafka.broker.address

Broker advertised address

keyword

kafka.broker.id

Broker id

long

kafka.consumergroup.client.host

Client host

keyword

kafka.consumergroup.client.id

Client ID (kafka setting client.id)

keyword

kafka.consumergroup.client.member_id

internal consumer group member ID

keyword

kafka.consumergroup.consumer_lag

consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset

long

gauge

kafka.consumergroup.error.code

kafka consumer/partition error code.

long

kafka.consumergroup.id

Consumer Group ID

keyword

kafka.consumergroup.meta

custom consumer meta data string

keyword

kafka.consumergroup.offset

consumer offset into partition being read

long

gauge

kafka.partition.id

Partition id.

long

kafka.partition.topic_broker_id

Unique id of the partition in the topic and the broker.

keyword

kafka.partition.topic_id

Unique id of the partition in the topic.

keyword

kafka.topic.error.code

Topic error code.

long

kafka.topic.name

Topic name

keyword

service.address

Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets).

keyword

partition

edit
Example

An example event for partition looks as following:

{
    "@timestamp": "2020-05-15T15:19:44.240Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.partition",
        "duration": 11263377,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "address": "kafka-01:9092",
            "id": 0
        },
        "partition": {
            "id": 0,
            "offset": {
                "newest": 111,
                "oldest": 0
            },
            "partition": {
                "insync_replica": true,
                "is_leader": true,
                "leader": 0,
                "replica": 0
            },
            "topic_broker_id": "0-messages-0",
            "topic_id": "0-messages"
        },
        "topic": {
            "name": "messages"
        }
    },
    "metricset": {
        "name": "partition",
        "period": 10000
    },
    "service": {
        "address": "localhost:9092",
        "type": "kafka"
    }
}

ECS Field Reference

Please refer to the following document for detailed information on ECS fields.

Exported fields
Field Description Type Metric Type

@timestamp

Event timestamp.

date

agent.id

keyword

cloud.account.id

The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier.

keyword

cloud.availability_zone

Availability zone in which this host is running.

keyword

cloud.image.id

Image ID for the cloud instance.

keyword

cloud.instance.id

Instance ID of the host machine.

keyword

cloud.provider

Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean.

keyword

cloud.region

Region in which this host is running.

keyword

container.id

Unique container id.

keyword

data_stream.dataset

Data stream dataset.

constant_keyword

data_stream.namespace

Data stream namespace.

constant_keyword

data_stream.type

Data stream type.

constant_keyword

event.dataset

Event dataset

constant_keyword

event.module

Event module

constant_keyword

host.containerized

If the host is a container.

boolean

host.name

Name of the host. It can contain what hostname returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use.

keyword

host.os.build

OS build information.

keyword

host.os.codename

OS codename, if any.

keyword

kafka.broker.address

Broker advertised address

keyword

kafka.broker.id

Broker id

long

kafka.partition.id

Partition id.

long

kafka.partition.offset.newest

Newest offset of the partition.

long

gauge

kafka.partition.offset.oldest

Oldest offset of the partition.

long

gauge

kafka.partition.partition.error.code

Error code from fetching partition.

long

kafka.partition.partition.insync_replica

Indicates if replica is included in the in-sync replicate set (ISR).

boolean

kafka.partition.partition.is_leader

Indicates if replica is the leader

boolean

kafka.partition.partition.leader

Leader id (broker).

long

kafka.partition.partition.replica

Replica id (broker).

long

kafka.partition.topic_broker_id

Unique id of the partition in the topic and the broker.

keyword

kafka.partition.topic_id

Unique id of the partition in the topic.

keyword

kafka.topic.error.code

Topic error code.

long

kafka.topic.name

Topic name

keyword

service.address

Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets).

keyword

Changelog

edit
Changelog
Version Details Kibana version(s)

1.16.0

Enhancement (View pull request)
Add processor support for broker, consumergroup and partition data streams.

8.13.0 or higher

1.15.0

Enhancement (View pull request)
ECS version updated to 8.11.0. Update the kibana constraint to ^8.13.0. Modified the field definitions to remove ECS fields made redundant by the ecs@mappings component template.

8.13.0 or higher

1.14.0

Enhancement (View pull request)
Add global filter on data_stream.dataset to improve performance.

8.12.0 or higher

1.13.0

Enhancement (View pull request)
Enable secrets for sensitive fields. For more details, refer https://www.elastic.co/guide/en/fleet/current/agent-policy.html#agent-policy-secret-values

8.12.0 or higher

1.12.1

Bug fix (View pull request)
Disable secrets for older stack versions due to errors.

8.8.0 or higher

1.12.0

Enhancement (View pull request)
Enable secret for the sensitive fields, supported from 8.12.

8.8.0 or higher

1.11.0

Enhancement (View pull request)
Add missing SSL fields to agent config.

8.8.0 or higher

1.10.0

Enhancement (View pull request)
Update the package format_version to 3.0.0.

8.8.0 or higher

1.9.2

Bug fix (View pull request)
Add null check and ignore_missing check to the rename processor

8.8.0 or higher

1.9.1

Enhancement (View pull request)
Migrate visualizations to lens.

8.8.0 or higher

1.9.0

Enhancement (View pull request)
Revert changes to permissions to reroute events to logs-- for log datastream

8.8.0 or higher

1.8.0

Enhancement (View pull request)
Enable time series data streams for the metrics datasets. This dramatically reduces storage for metrics and is expected to progressively improve query performance. For more details, see https://www.elastic.co/guide/en/elasticsearch/reference/current/tsds.html.

8.8.0 or higher

1.7.0

Enhancement (View pull request)
Add permissions to reroute events to logs-- for log datastream

7.14.0 or higher
8.0.0 or higher

1.6.0

Enhancement (View pull request)
Rename ownership from obs-service-integrations to obs-infraobs-integrations

7.14.0 or higher
8.0.0 or higher

1.5.6

Enhancement (View pull request)
Modifed the dimension field mapping to support public cloud deployment.

7.14.0 or higher
8.0.0 or higher

1.5.5

Enhancement (View pull request)
Add dimension fields for partition datastream.

7.14.0 or higher
8.0.0 or higher

1.5.4

Enhancement (View pull request)
Add dimension mapping for consumergroup datastream.

7.14.0 or higher
8.0.0 or higher

1.5.3

Enhancement (View pull request)
Added dimension fields for broker datastream.

7.14.0 or higher
8.0.0 or higher

1.5.2

Enhancement (View pull request)
Add metric_type mapping for partition datastream.

7.14.0 or higher
8.0.0 or higher

1.5.1

Enhancement (View pull request)
Add metric_type mapping for consumer_group datastream.

7.14.0 or higher
8.0.0 or higher

1.5.0

Enhancement (View pull request)
Add metric_type mapping for broker datastream.

7.14.0 or higher
8.0.0 or higher

1.4.1

Enhancement (View pull request)
Added categories and/or subcategories.

7.14.0 or higher
8.0.0 or higher

1.4.0

Enhancement (View pull request)
Update ECS version to 8.5.1

7.14.0 or higher
8.0.0 or higher

1.3.1

Enhancement (View pull request)
Update the datashoboard fields

7.14.0 or higher
8.0.0 or higher

1.3.0

Enhancement (View pull request)
Added infrastructure category.

7.14.0 or higher
8.0.0 or higher

1.2.4

Enhancement (View pull request)
Support SASL mechanism

7.14.0 or higher
8.0.0 or higher

1.2.3

Enhancement (View pull request)
Add documentation for multi-fields

7.14.0 or higher
8.0.0 or higher

1.2.2

Bug fix (View pull request)
Pass down the SSL configs.

7.14.0 or higher
8.0.0 or higher

1.2.1

Bug fix (View pull request)
Add missing event.* fields into ecs.yml

1.2.0

Enhancement (View pull request)
Update to ECS 8.0

1.1.0

Enhancement (View pull request)
Support Kibana 8.0

7.14.0 or higher
8.0.0 or higher

1.0.0

Enhancement (View pull request)
Release Kafka as GA

0.7.2

Enhancement (View pull request)
Uniform with guidelines

0.7.1

Bug fix (View pull request)
Fix logic that checks for the forwarded tag

0.7.0

Enhancement (View pull request)
Update to ECS 1.12.0

0.6.2

Enhancement (View pull request)
Convert to generated ECS fields

0.6.1

Enhancement (View pull request)
update to ECS 1.11.0

0.6.0

Enhancement (View pull request)
Update integration description

0.5.0

Enhancement (View pull request)
Enable ECS dependency

Enhancement (View pull request)
Set "event.module" and "event.dataset"

0.4.0

Enhancement (View pull request)
Update to ECS 1.10.0 and adding event.original option

0.3.8

Enhancement (View pull request)
Updating package owner

0.3.7

Bug fix (View pull request)
Correct sample event file.

0.1.0

Enhancement (View pull request)
initial release

Was this helpful?
Feedback