Broker Role

Perform installation and configuration of Kafka brokers cluster.

This role requires to have a zookeepers group (identified by the amq_streams_zookeeper_inventory_group variable) in the inventory file to identify each node of he Zookeeper cluster. This group is used to set up the right configuration files in order to establish the communication between Zookeeper cluster members.

The brokers group (identified by the amq_streams_broker_inventory_group variable) is used in the same way identifying the members of the Kafka cluster. The order of this group will be used to set up the right configuration files and establish the right cluster configuration:

Example of inventory:

[zookeepers]
zknode1
zknode2
zknode3

[brokers]
broker1
broker2
broker3
broker4

Role Defaults

Variable

Description

Default

amq_streams_broker_create_topic_script

{{ amq_streams_common_home }}/bin/kafka-topics.sh

amq_streams_broker_enabled

true

amq_streams_broker_server_start

{{ amq_streams_common_home }}/bin/kafka-server-start.sh

amq_streams_broker_config

/etc/amq_streams_broker.properties

amq_streams_broker_service_name

amq_streams_broker

amq_streams_broker_user

amq_streams_broker

amq_streams_broker_listener_port

Port defined for external communications

9092

amq_streams_broker_listener_internal_port

Port defined for inter-broker communications

9091

amq_streams_broker_group

amq_streams

amq_streams_broker_service_config_template

templates/service.conf.j2

amq_streams_broker_service_config_file

/etc/broker.conf

amq_streams_broker_data_dir

Folders to store the commit logs (Comma-value property)

/var/lib/{{ amq_streams_broker_service_name }}/

amq_streams_broker_logs_dir

Folder to store the logs of broker service

/var/log/{{ amq_streams_broker_service_name }}/

amq_streams_broker_java_opts

Default values to apply to KAFKA_OPTS env variable

amq_streams_broker_java_heap_opts

Default values to apply to KAFKA_HEAP_OPTS env variable

-Xmx1G -Xms1G

amq_streams_broker_java_performance_opts

Default values to apply to KAFKA_JVM_PERFORMANCE_OPTS env variable

amq_streams_broker_java_java_gc_log_opts

Default values to apply to KAFKA_GC_LOG_OPTS env variable

amq_streams_broker_java_jmx_opts

Default values to apply to KAFKA_JMX_OPTS env variable

amq_streams_broker_num_network_threads

3

amq_streams_broker_num_io_threads

8

amq_streams_broker_buffer_send_bytes

102400

amq_streams_broker_buffer_bytes

102400

amq_streams_broker_socket_request_max_bytes

104857600

amq_streams_broker_num_partitions

1

amq_streams_broker_num_recovery_threads_per_data_dir

1

amq_streams_broker_offsets_topic_replication_factor

1

amq_streams_broker_transaction_state_log_replication_factor

1

amq_streams_broker_transaction_state_log_min_isr

1

amq_streams_broker_log_retention_hours

168

amq_streams_broker_log_retention_check_interval_ms

300000

amq_streams_broker_log_message_format_version

Log message format version. Required for Kafka upgrade.

amq_streams_broker_inter_broker_protocol_version

Inter broker protocol version. Required for Kafka upgrade.

amq_streams_broker_zookeeper_connection_timeout_ms

18000

amq_streams_broker_group_initial_rebalance_delay_ms

0

amq_streams_broker_properties_template

templates/server.properties.j2

amq_streams_broker_bootstrap_server_host

localhost

amq_streams_broker_bootstrap_server_port

9092

amq_streams_firewalld_package_name:

firewalld

amq_streams_firewalld_enabled

false

amq_streams_broker_instance_count_enabled

true

amq_streams_broker_instance_count

0

amq_streams_deployment_balance_check_enabled

true

amq_streams_zookeeper_auth_enabled

Enable Zookeeper authentication. Zookeeper must be deployed with the authentication enabled.

false

amq_streams_broker_zookeeper_auth_config

JAAS file for brokers

/etc/broker-jaas.conf

amq_streams_broker_zookeeper_auth_config_template

JAAS template for brokers

templates/broker-jaas.conf.j2

amq_streams_broker_listeners

Default list of broker listeners

PLAINTEXT://:{{ amq_streams_broker_listener_port }}

amq_streams_broker_advertised_listeners

Default list of advertised listener per broker (hosts variable). Name must match those listed in the amq_streams_broker_listeners property

amq_streams_broker_auth_enabled

Enable Broker authentication

false

amq_streams_broker_auth_scram_enabled

Enable SASL SCRAM authentication

false

amq_streams_broker_auth_listeners

Default list of authenticated listeners

PLAINTEXT:PLAINTEXT

amq_streams_broker_auth_sasl_mechanisms

Default list of authenticated SASL mechanism

PLAIN

amq_streams_broker_inventory_group

Identify the group of broker nodes

groups['brokers']

amq_streams_broker_broker_id

Identify the broker with specific id in the inventory.

amq_streams_broker_topics

List of topics to create. Each topics requires the name property, and optionally the partitions and replication_factor.

Role Variables

List of required variables for the role:

Variable

Description

Required

amq_streams_zookeeper_auth_user

Zookeeper user to authenticate. Mandatory if amq_streams_zookeeper_auth_enabled: true.

‘true’

amq_streams_zookeeper_auth_pass

Zookeeper user password to authenticate. Mandatory if amq_streams_zookeeper_auth_enabled: true.

true

Enabling the amq_streams_broker_auth_enabled requires to define the following variables to execute the role successfully:

Variable

Description

Required

Sample

amq_streams_broker_auth_listeners

List of authenticated listeners

true

` - PLAINTEXT:PLAINTEXT

  • AUTHENTICATED://:9093 | |amq_streams_broker_auth_sasl_mechanisms| Default list of authenticated SASL mechanism |true|PLAIN | |amq_streams_broker_auth_plain_users | List of users (username, password) to add into the Kafka cluster | true | `` | |amq_streams_broker_admin_username| Default admin user to manage topics |false |  | |amq_streams_broker_admin_password| Default password of the admin user to manage topics |false` | |

TLS Encryption

Kafka supports TLS for encrypting communication with Kafka clients. The role allows to enable TLS encryption but it is required to have a keystore containing private and public keys.

The following variables are involved to enable TLS encryption:

Variable

Description

Required

Default

amq_streams_broker_tls_enabled

Enable TLS encryption for listeners

false

false

amq_streams_broker_tls_keystore_dir

Local folder with the keystore

true

/tmp

amq_streams_broker_tls_keystore

Filename of the keystore

true

server.keystore.jks

amq_streams_broker_tls_keystore_location

Location on Kafka server for the keystore

true

/opt

amq_streams_broker_tls_keystore_password

Password of the keystore

true

PLEASE_CHANGEME_IAMNOTGOOD_FOR_PRODUCTION

amq_streams_broker_tls_truststore_dir

Local folder with the truststore

true

/tmp

amq_streams_broker_tls_truststore

Filename of the truststore

true

server.truststore.jks

amq_streams_broker_tls_truststore_location

Location on Kafka server for the truststore

true

/opt

amq_streams_broker_tls_truststore_password

Password of the truststore

true

PLEASE_CHANGEME_IAMNOTGOOD_FOR_PRODUCTION

amq_streams_broker_tls_truststore_client_dir

Local folder with the keystore file used by clients

false

/tmp

amq_streams_broker_tls_truststore_client

Filename of the client truststore

false

client.truststore.jks

amq_streams_broker_tls_truststore_client_location

Location on Kafka server for the client truststore

false

/opt

amq_streams_broker_tls_truststore_client_password

Password of the client truststore

false

PLEASE_CHANGEME_IAMNOTGOOD_FOR_PRODUCTION

Here a sample to enable SSL listeners at port 9093 for client connections and using plain connections for inter-broker communication:

  vars:
    # Enabling SSL listeners
    amq_streams_broker_tls_enabled: true
    amq_streams_broker_tls_keystore_dir: ./certs
    amq_streams_broker_tls_truststore_dir: ./certs
    amq_streams_broker_tls_truststore_client_dir: ./certs
    amq_streams_broker_tls_keystore_password: password
    amq_streams_broker_tls_truststore_password: password
    amq_streams_broker_tls_keystore_client_password: password
    amq_streams_broker_tls_truststore_client_password: password

    # Broker Listeners
    amq_streams_broker_listeners:
      - PLAINTEXT://:{{ amq_streams_broker_listener_port }} # Insecure for inter-broker connections
      - SSL://:{{ amq_streams_broker_listener_tls_port }} # Secure for client connections

    # Using SSL bootstrap server port for management operations
    amq_streams_broker_bootstrap_server_host: localhost
    amq_streams_broker_bootstrap_server_port: 9093

The client truststore identified by the amq_streams_broker_tls_truststore_client is only required for topic or user management operations done by the role. The properties amq_streams_broker_bootstrap_server_host and amq_streams_broker_bootstrap_server_port are used to establish the admin connections to operate with topics or users.

Broker Authentication

This section includes a set of example to enable the broker authentication for the following scenarios:

  • SASL PLAIN authentication

  • SASL SCRAM authentication

SASL Plain Authentication

This is a sample configuration to enable that configuration using two different listeners, one for the clients (AUTHENTICATED) and other for the inter-broker connections (REPLICATION). Both listeners require a SASL plain authentication.

The amq_streams_broker_auth_plain_users variable defines the list of different users to create to authenticate to the brokers.

The amq_streams_broker_inter_broker_auth_sasl_mechanisms: PLAIN property defines the protocol to authenticate the inter-broker connections. That requires to identify which user will be used to establish these connections, that user is defined by the amq_streams_broker_inter_broker_auth_broker_username and amq_streams_broker_inter_broker_auth_broker_password variables.

    # Broker Listeners
    amq_streams_broker_listeners:
      - AUTHENTICATED://:{{ amq_streams_broker_listener_port }}
      - REPLICATION://:{{ amq_streams_broker_listener_internal_port }}

    amq_streams_broker_inter_broker_listener: REPLICATION

    # Broker Authentication
    amq_streams_broker_auth_enabled: 'true'
    amq_streams_broker_auth_listeners:
      - AUTHENTICATED:SASL_PLAINTEXT
      - REPLICATION:SASL_PLAINTEXT

    amq_streams_broker_auth_sasl_mechanisms:
      - PLAIN

    # Kafka Plain Users
    amq_streams_broker_auth_plain_users:
      - username: admin
        password: password
      - username: usr
        password: password

    amq_streams_broker_inter_broker_auth_sasl_mechanisms: PLAIN
    amq_streams_broker_inter_broker_auth_broker_username: interbroker
    amq_streams_broker_inter_broker_auth_broker_password: password

SASL SCRAM Authentication

This is a sample configuration to enable that configuration using two different listeners, one for the clients (AUTHENTICATED) and other for the inter-broker connections (REPLICATION). Both listeners require a SASL plain authentication. SCRAM authentication is enabled by the amq_streams_broker_auth_scram_enabled variable.

Users must be created using the kafka-configs.sh script from Kafka.

The amq_streams_broker_inter_broker_auth_sasl_mechanisms: SCRAM-SHA-512 property defines the protocol to authenticate the inter-broker connections. That requires to identify which user will be used to establish these connections, that user is defined by the amq_streams_broker_inter_broker_listener_auth variable.

    # Broker Listeners
    amq_streams_broker_listeners:
      - AUTHENTICATED://:{{ amq_streams_broker_listener_port }}
      - REPLICATION://:{{ amq_streams_broker_listener_internal_port }}

    amq_streams_broker_inter_broker_listener: REPLICATION

    # Broker Authentication
    amq_streams_broker_auth_enabled: 'true'
    amq_streams_broker_auth_scram_enabled: 'true'
    amq_streams_broker_auth_listeners:
      - AUTHENTICATED:SASL_PLAINTEXT
      - REPLICATION:SASL_PLAINTEXT

    amq_streams_broker_auth_sasl_mechanisms:
      - PLAIN
      - SCRAM-SHA-512

    amq_streams_broker_inter_broker_auth_sasl_mechanisms: SCRAM-SHA-512
    amq_streams_broker_inter_broker_listener_auth: |
      listener.name.replication.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USER>" password="<PASSWORD>";

To manage SCRAM users, the role includes the following tasks:

  • Create

  • Describe

  • Delete

The role uses the amq_streams_broker_topics variable to identify the list of topics to be managed by the role for each stage of the life cycle.

The amq_streams_broker_auth_scram_users variable defines the list of different SCRAM users to manage in the Kafka cluster.

Example of definition of SCRAM users:

  vars:
    # Kafka SCRAM Users
    amq_streams_broker_auth_scram_users:
      - username: kafkauser01
        password: p@ssw0rd
      - username: kafkauser02
        password: p@ssw0rd

Creating SCRAM users can be done with this task:

    - name: "Create SCRAM users"
      ansible.builtin.include_role:
        name: amq_streams_broker
        tasks_from: user-scram/create.yml
      loop: "{{ amq_streams_broker_auth_scram_users }}"
      loop_control:
        loop_var: user
      vars:
        user_username: "{{ user.username }}"
        user_password: "{{ user.password }}"

Describing SCRAM users can be done with this task:

    - name: "Describe SCRAM users"
      ansible.builtin.include_role:
        name: amq_streams_broker
        tasks_from: user-scram/describe.yml
      loop: "{{ amq_streams_broker_auth_scram_users }}"
      loop_control:
        loop_var: user
      vars:
        user_username: "{{ user.username }}"

Deleting SCRAM users can be done with this task:

    - name: "Delete SCRAM users"
      ansible.builtin.include_role:
        name: amq_streams_broker
        tasks_from: user-scram/delete.yml
      loop: "{{ amq_streams_broker_auth_scram_users }}"
      loop_control:
        loop_var: user
      vars:
        user_username: "{{ user.username }}"

SASL Authentication over SSL listeners

SASL authentication mechanism using SSL listeners follows the same configuration described above, however the listener must be declared as SASL_SSL and the authentication for that listener must be declared as SASL_SSL:SASL_SSL.

Here an example of vars to authenticate over a SSL listener at 9093 port:

  vars:
    # Enabling SSL
    amq_streams_broker_tls_enabled: true

    # Broker Listeners
    amq_streams_broker_listeners:
      - PLAINTEXT://:{{ amq_streams_broker_listener_port }} # Insecure for inter-broker connections
      - SASL_SSL://:{{ amq_streams_broker_listener_tls_port }} # Secured connections

    # Enabling Kafka Broker Authentication
    amq_streams_broker_auth_enabled: true
    amq_streams_broker_auth_scram_enabled: true
    amq_streams_broker_auth_listeners:
      - PLAINTEXT:PLAINTEXT
      - SASL_SSL:SASL_SSL

    amq_streams_broker_auth_sasl_mechanisms:
      - PLAIN
      - SCRAM-SHA-512

    # Using SSL bootstrap server port
    amq_streams_broker_bootstrap_server_host: localhost
    amq_streams_broker_bootstrap_server_port: 9093

Topic Management

The role allows to manage the following stages of the life cycle of topics:

  • Create

  • Describe

  • Delete

The role uses the amq_streams_broker_topics variable to identify the list of topics to be managed by the role for each stage of the life cycle.

Example of definition of topics:

  vars:
    # Topic management
    amq_streams_broker_topics:
      - name: sampleTopic
        partitions: 2
        replication_factor: 1
      - name: otherTopic
        partitions: 4
        replication_factor: 1

NOTE: To manage topics in a distributed Kafka broker, the operation should be done in one single broker instance. Create or delete actions are replicated across the Kafka cluster.

Create Topics

This task will create new topics in a Kafka broker:

    - name: "Create topics"
      ansible.builtin.include_role:
        name: amq_streams_broker
        tasks_from: topic/create.yml
      loop: "{{ amq_streams_broker_topics }}"
      loop_control:
        loop_var: topic
      vars:
        topic_name: "{{ topic.name }}"
        topic_partitions: "{{ topic.partitions }}"
        topic_replication_factor: "{{ topic.replication_factor }}"

Describe Topics

This task will describe the information and details of each topic from a Kafka broker:

    - name: "Describe topics"
      ansible.builtin.include_role:
        name: amq_streams_broker
        tasks_from: topic/describe.yml
      loop: "{{ amq_streams_broker_topics }}"
      loop_control:
        loop_var: topic
      vars:
        topic_name: "{{ topic.name }}"

Delete Topics

This task will delete topics from a Kafka broker:

    - name: "Delete topics"
      ansible.builtin.include_role:
        name: amq_streams_broker
        tasks_from: topic/delete.yml
      loop: "{{ amq_streams_broker_topics }}"
      loop_control:
        loop_var: topic
      vars:
        topic_name: "{{ topic.name }}"

Topic Management with Broker Authentication

To operate topics with this role in Broker with authentication enabled, it is required to identify the mechanism of authentication and admin user credentials. The admin user must be identified to connect to the Broker and execute the topic action.

The amq_streams_broker_admin_mechanism variable identifies the authentication mechanism from one of the following options: PLAIN, SCRAM-SHA-512.

The amq_streams_admin_username, and amq_streams_broker_admin_password variables identify the admin user.

All these variables are required to execute the actions.

Example of definition:

  vars:
    # Enabling Kafka Broker Authentication
    amq_streams_broker_auth_enabled: 'true'
    amq_streams_broker_admin_mechanism: PLAIN
    amq_streams_broker_admin_username: admin
    amq_streams_broker_admin_password: password

License

Apache License v2.0 or later

Author Information