Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add combined kraft mode #1511

Open
wants to merge 1 commit into
base: 7.5.1-post
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion docs/VARIABLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -924,11 +924,28 @@ Default: "{{ skip_restarts }}"

***

### kraft_combined

Boolean used to declare broker nodes as controllers (combined mode). Do not use in a production environment

Default: false

### kafka_controller_quorum_voters

Default controller quorum voters

Default: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}"
Default: "
{%- if kraft_combined -%}
{%- for broker_hostname in groups.kafka_broker|default([]) %}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }}
{%- endfor -%}
{%- else -%}
{%- for controller_hostname in groups.kafka_controller|default([]) -%}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }}
{%- endfor -%}
{%- endif -%}"

***

Expand Down
85 changes: 82 additions & 3 deletions roles/kafka_broker/tasks/get_meta_properties.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,91 @@
---
- name: Prepare SCRAM Users if needed
set_fact:
scram_users_to_create: []

# with kraft combined mode, the first install has to define the clusterid itself, instead of getting it from dedicated controllers
- name: Check meta.properties
run_once: true
when: kraft_combined
ansible.builtin.stat:
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: meta_properties

- name: Initialize ClusterId
when:
- kraft_combined
- not meta_properties.stat.exists
run_once: true
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid"
environment:
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions"
register: random_uuid

- name: Set ClusterId
when:
- kraft_combined
- not meta_properties.stat.exists
run_once: true
set_fact:
clusterid: "{{ random_uuid.stdout }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups.kafka_broker }}"

## and initialize temporary controller admin user
- name: Prepare SCRAM 512 admin user
when:
- kraft_combined
- "'SCRAM-SHA-512' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

- name: Prepare SCRAM 256 admin user
when:
- kraft_combined
- "'SCRAM-SHA-256' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

# after first install in combined mode, get clusterid from one broker node
- name: Extract ClusterId from meta.properties on KRaft Controller
when:
- kraft_combined
- meta_properties.stat.exists
run_once: true
slurp:
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: uuid_broker

- name: Set ClusterId
when:
- kraft_combined
- meta_properties.stat.exists
run_once: true
set_fact:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups.kafka_broker }}"

# with dedicated controller nodes, the clusterid is already defined on the controller nodes
- name: Extract ClusterId from meta.properties on KRaft Controller
when: not kraft_combined
run_once: true
slurp:
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
delegate_to: "{{ groups.kafka_controller[0] }}"
register: uuid_broker

- name: Set ClusterId
when: not kraft_combined
run_once: true
set_fact:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups.kafka_broker }}"

- name: Format Storage Directory
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted"
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_broker.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}"
register: format_meta
vars:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
59 changes: 54 additions & 5 deletions roles/kafka_controller/tasks/get_meta_properties.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,62 @@
---
- name: Get ClusterId
- name: Prepare SCRAM Users if needed
set_fact:
scram_users_to_create: []

- name: Check meta.properties
run_once: true
ansible.builtin.stat:
path: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: meta_properties

# if meta.properties does not exist, create a uuid
- name: Initialize ClusterId
when: not meta_properties.stat.exists
run_once: true
shell: "{{ binary_base_path }}/bin/kafka-storage random-uuid"
environment:
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions"
register: uuid_key
register: random_uuid

- name: Set ClusterId
when: not meta_properties.stat.exists
run_once: true
set_fact:
clusterid: "{{ random_uuid.stdout }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups['kafka_controller'] }}"

## and initialize temporary controller admin user
- name: Prepare SCRAM 512 admin user
when:
- "'SCRAM-SHA-512' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-512' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-512=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

- name: Prepare SCRAM 256 admin user
when:
- "'SCRAM-SHA-256' in kafka_controller_sasl_enabled_mechanisms or 'SCRAM-SHA-256' in kafka_broker_sasl_enabled_mechanisms"
set_fact:
scram_users_to_create: "{{ scram_users_to_create + [ '--add-scram SCRAM-SHA-256=[name=\"'+ sasl_scram_users_final.admin.principal + '\",password=\"' + sasl_scram_users_final.admin.password + '\"]' ] }}"

# else, extract it from meta.properties
- name: Extract ClusterId from meta.properties
when: meta_properties.stat.exists
run_once: true
slurp:
src: "{{ kafka_controller_final_properties['log.dirs'] }}/meta.properties"
register: uuid_broker

- name: Set ClusterId
when: meta_properties.stat.exists
run_once: true
set_fact:
clusterid: "{{ (uuid_broker['content'] | b64decode).partition('cluster.id=')[2].partition('\n')[0] }}"
delegate_to: "{{ item }}"
delegate_facts: true
loop: "{{ groups['kafka_controller'] }}"

- name: Format Data Directory
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted"
shell: "{{ binary_base_path }}/bin/kafka-storage format -t {{ clusterid }} -c {{ kafka_controller.config_file }} --ignore-formatted {{ scram_users_to_create|join(' ') }}"
register: format_meta
vars:
clusterid: "{{ uuid_key.stdout }}"
21 changes: 18 additions & 3 deletions roles/variables/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,22 @@ zookeeper_skip_restarts: "{{ skip_restarts }}"

#### kafka Controller variables ####

### Default controller quorum voters
kafka_controller_quorum_voters: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}"
### set to true to install controller and broker on the same nodes
kraft_combined: false

### Default controller quorum voters. Dynamically assigned later if not user provided
kafka_controller_quorum_voters: >-
{%- if kraft_combined -%}
{%- for broker_hostname in groups.kafka_broker|default([]) %}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }}
{%- endfor -%}
{%- else -%}
{%- for controller_hostname in groups.kafka_controller|default([]) -%}
{%- if loop.index > 1%},{% endif -%}
{{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }}
{%- endfor -%}
{%- endif -%}
### Default Kafka config prefix. Only valid to customize when installation_method: archive
kafka_controller_config_prefix: "{{ config_prefix }}/controller"
Expand Down Expand Up @@ -569,7 +583,8 @@ kafka_broker_default_listeners: "{
'ssl_enabled': {{ssl_enabled|string|lower}},
'ssl_mutual_auth_enabled': {{ssl_mutual_auth_enabled|string|lower}},
'sasl_protocol': '{{sasl_protocol}}'
}{% endif %}{% endif %}
}{% endif %}{% if kraft_enabled|bool and kraft_combined|bool %},
'controller': {{ kafka_controller_listeners['controller'] }}{% endif %}{% endif %}
}"

### Dictionary to put additional listeners to be configured within Kafka. Each listener must include a 'name' and 'port' key. Optionally they can include the keys 'ssl_enabled', 'ssl_mutual_auth_enabled', and 'sasl_protocol'
Expand Down
34 changes: 25 additions & 9 deletions roles/variables/vars/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_
binary_base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_join) if installation_method == 'archive' else '/usr' }}"

### Runs kafka in Kraft mode if controller is present
kraft_enabled: "{{ true if 'kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0 else false }}"
kraft_enabled: "{{ true if kraft_combined or ('kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0) else false }}"

### One controller node to delegate actions to
kafka_controller_default_host: "{{ ( groups.kafka_broker[0] if kraft_combined else groups.kafka_controller[0] ) if kraft_enabled else 'no controller' }}"

#### Config prefix paths ####
zookeeper_config_prefix_path: "{{ zookeeper_config_prefix.strip('/') }}"
Expand Down Expand Up @@ -149,7 +152,7 @@ kafka_controller_properties:
confluent.security.event.logger.exporter.kafka.topic.replicas: "{{audit_logs_destination_bootstrap_servers.split(',')|length if audit_logs_destination_enabled and rbac_enabled else kafka_controller_default_internal_replication_factor}}"
confluent.support.metrics.enable: "true"
confluent.support.customer.id: anonymous
log.dirs: "/var/lib/controller/data"
log.dirs: "{{ '/var/lib/kafka/data' if kraft_combined else '/var/lib/controller/data' }}"
kafka.rest.enable: "{{kafka_controller_rest_proxy_enabled|string|lower}}"
process.roles: controller
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
Expand Down Expand Up @@ -198,9 +201,9 @@ kafka_controller_properties:
properties:
sasl.kerberos.service.name: "{{kerberos_kafka_controller_primary}}"
inter_broker_sasl:
enabled: "{{ kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
enabled: "{{ kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}"
properties:
sasl.mechanism.inter.broker.protocol: "{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
sasl.mechanism.inter.broker.protocol: "{{kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol}}"
sr:
enabled: "{{ kafka_broker_schema_validation_enabled and 'schema_registry' in groups }}"
properties:
Expand Down Expand Up @@ -340,7 +343,7 @@ kafka_broker_properties:
socket.send.buffer.bytes: 102400
transaction.state.log.min.isr: "{{ [ 2, kafka_broker_default_internal_replication_factor|int ] | min }}"
transaction.state.log.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items|rejectattr('key', 'equalto', 'controller') %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}"
confluent.ansible.managed: 'true'
confluent.license.topic: _confluent-command
confluent.license.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
Expand All @@ -355,11 +358,24 @@ kafka_broker_properties:
broker_on_controller:
enabled: "{{kraft_enabled|bool}}"
properties:
process.roles: broker
process.roles: "broker{% if kraft_combined %},controller{% endif %}"
controller.quorum.voters: "{{ kafka_controller_quorum_voters }}"
controller.listener.names: "{{kafka_controller_listeners['controller']['name']}}"
listener.security.protocol.map: "{% for listener in kafka_controller_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}{% endfor %},{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}{% endfor %}"
listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}{% endfor %}"
controller.listener.names: "{{ kafka_controller_listeners['controller']['name'] }}"
listener.security.protocol.map: >-
{%- for listener in kafka_controller_listeners|dict2items -%}
{%- if loop.index > 1%},{% endif -%}
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}
{%- endfor -%}
,
{%- for listener in kafka_broker_listeners|dict2items -%}
{%- if loop.index > 1%},{% endif -%}
{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}
{%- endfor -%}
listeners: >-
{%- for listener in kafka_broker_listeners|dict2items -%}
{%- if loop.index > 1 %},{% endif -%}
{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}
{%- endfor -%}
confluent.cluster.link.metadata.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}"
broker_on_zookeeper:
enabled: "{{not kraft_enabled|bool}}"
Expand Down