From de4b5f69f944ab9fa186c4b007d84c9e34695e79 Mon Sep 17 00:00:00 2001 From: Christian Lefebvre Date: Fri, 3 Nov 2023 23:20:01 +0100 Subject: [PATCH] handle combined nodes --- roles/variables/defaults/main.yml | 21 ++++++++++++++++--- roles/variables/vars/main.yml | 34 +++++++++++++++++++++++-------- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/roles/variables/defaults/main.yml b/roles/variables/defaults/main.yml index a25a0993cf..9aa854e777 100644 --- a/roles/variables/defaults/main.yml +++ b/roles/variables/defaults/main.yml @@ -416,8 +416,22 @@ zookeeper_skip_restarts: "{{ skip_restarts }}" #### kafka Controller variables #### -### Default controller quorum voters -kafka_controller_quorum_voters: "{% for controller_hostname in groups.kafka_controller|default([]) %}{% if loop.index > 1%},{% endif %}{{groups.kafka_controller.index(controller_hostname)|int + 9991}}@{{controller_hostname}}:{{ kafka_controller_listeners['controller']['port'] }}{%endfor%}" +### set to true to install controller and broker on same nodes +kraft_combined: false + +### Default controller quorum voters. Dynamically assigned later if not user provided +kafka_controller_quorum_voters: >- + {%- if kraft_combined -%} + {%- for broker_hostname in groups.kafka_broker|default([]) %} + {%- if loop.index > 1%},{% endif -%} + {{ groups.kafka_broker.index(broker_hostname)|int + 1 }}@{{ broker_hostname }}:{{ kafka_broker_listeners['controller']['port'] }} + {%- endfor -%} + {%- else -%} + {%- for controller_hostname in groups.kafka_controller|default([]) -%} + {%- if loop.index > 1%},{% endif -%} + {{ groups.kafka_controller.index(controller_hostname)|int + 9991 }}@{{ controller_hostname }}:{{ kafka_controller_listeners['controller']['port'] }} + {%- endfor -%} + {%- endif -%} ### Default Kafka config prefix. Only valid to customize when installation_method: archive kafka_controller_config_prefix: "{{ config_prefix }}/controller" @@ -569,7 +583,8 @@ kafka_broker_default_listeners: "{ 'ssl_enabled': {{ssl_enabled|string|lower}}, 'ssl_mutual_auth_enabled': {{ssl_mutual_auth_enabled|string|lower}}, 'sasl_protocol': '{{sasl_protocol}}' - }{% endif %}{% endif %} + }{% endif %}{% if kraft_enabled|bool and kraft_combined|bool %}, + 'controller': {{ kafka_controller_listeners['controller'] }}{% endif %}{% endif %} }" ### Dictionary to put additional listeners to be configured within Kafka. Each listener must include a 'name' and 'port' key. Optionally they can include the keys 'ssl_enabled', 'ssl_mutual_auth_enabled', and 'sasl_protocol' diff --git a/roles/variables/vars/main.yml b/roles/variables/vars/main.yml index d09e4cd952..38d7127fde 100644 --- a/roles/variables/vars/main.yml +++ b/roles/variables/vars/main.yml @@ -14,7 +14,10 @@ base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_ binary_base_path: "{{ ((config_base_path,('confluent-',archive_version) | join) | path_join) if installation_method == 'archive' else '/usr' }}" ### Runs kafka in Kraft mode if controller is present -kraft_enabled: "{{ true if 'kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0 else false }}" +kraft_enabled: "{{ true if kraft_combined or ('kafka_controller' in groups.keys() and groups['kafka_controller'] | length > 0) else false }}" + +### One controller node to delegate actions to +kafka_controller_default_host: "{{ ( groups.kafka_broker[0] if kraft_combined else groups.kafka_controller[0] ) if kraft_enabled else 'no controller' }}" #### Config prefix paths #### zookeeper_config_prefix_path: "{{ zookeeper_config_prefix.strip('/') }}" @@ -149,7 +152,7 @@ kafka_controller_properties: confluent.security.event.logger.exporter.kafka.topic.replicas: "{{audit_logs_destination_bootstrap_servers.split(',')|length if audit_logs_destination_enabled and rbac_enabled else kafka_controller_default_internal_replication_factor}}" confluent.support.metrics.enable: "true" confluent.support.customer.id: anonymous - log.dirs: "/var/lib/controller/data" + log.dirs: "{{ '/var/lib/kafka/data' if kraft_combined else '/var/lib/controller/data' }}" kafka.rest.enable: "{{kafka_controller_rest_proxy_enabled|string|lower}}" process.roles: controller controller.quorum.voters: "{{ kafka_controller_quorum_voters }}" @@ -198,9 +201,9 @@ kafka_controller_properties: properties: sasl.kerberos.service.name: "{{kerberos_kafka_controller_primary}}" inter_broker_sasl: - enabled: "{{ kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}" + enabled: "{{ kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol != 'none' }}" properties: - sasl.mechanism.inter.broker.protocol: "{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(sasl_protocol) | confluent.platform.normalize_sasl_protocol}}" + sasl.mechanism.inter.broker.protocol: "{{kafka_controller_listeners[kafka_broker_inter_broker_listener_name]['sasl_protocol'] | default(kafka_controller_sasl_protocol) | confluent.platform.normalize_sasl_protocol}}" sr: enabled: "{{ kafka_broker_schema_validation_enabled and 'schema_registry' in groups }}" properties: @@ -340,7 +343,7 @@ kafka_broker_properties: socket.send.buffer.bytes: 102400 transaction.state.log.min.isr: "{{ [ 2, kafka_broker_default_internal_replication_factor|int ] | min }}" transaction.state.log.replication.factor: "{{kafka_broker_default_internal_replication_factor}}" - advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}" + advertised.listeners: "{% for listener in kafka_broker_listeners|dict2items|rejectattr('key', 'equalto', 'controller') %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['hostname'] | default(hostvars[inventory_hostname]|confluent.platform.resolve_hostname) }}:{{ listener['value']['port'] }}{% endfor %}" confluent.ansible.managed: 'true' confluent.license.topic: _confluent-command confluent.license.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}" @@ -355,11 +358,24 @@ kafka_broker_properties: broker_on_controller: enabled: "{{kraft_enabled|bool}}" properties: - process.roles: broker + process.roles: "broker{% if kraft_combined %},controller{% endif %}" controller.quorum.voters: "{{ kafka_controller_quorum_voters }}" - controller.listener.names: "{{kafka_controller_listeners['controller']['name']}}" - listener.security.protocol.map: "{% for listener in kafka_controller_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}}{% endfor %},{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}}{% endfor %}" - listeners: "{% for listener in kafka_broker_listeners|dict2items %}{% if loop.index > 1%},{% endif %}{{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }}{% endfor %}" + controller.listener.names: "{{ kafka_controller_listeners['controller']['name'] }}" + listener.security.protocol.map: >- + {%- for listener in kafka_controller_listeners|dict2items -%} + {%- if loop.index > 1%},{% endif -%} + {{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(kafka_controller_ssl_enabled, kafka_controller_sasl_protocol)}} + {%- endfor -%} + , + {%- for listener in kafka_broker_listeners|dict2items -%} + {%- if loop.index > 1%},{% endif -%} + {{ listener['value']['name'] }}:{{ listener['value'] | confluent.platform.kafka_protocol_defaults(ssl_enabled, sasl_protocol)}} + {%- endfor -%} + listeners: >- + {%- for listener in kafka_broker_listeners|dict2items -%} + {%- if loop.index > 1 %},{% endif -%} + {{ listener['value']['name'] }}://{{ listener['value']['ip'] | default('') }}:{{ listener['value']['port'] }} + {%- endfor -%} confluent.cluster.link.metadata.topic.replication.factor: "{{kafka_broker_default_internal_replication_factor}}" broker_on_zookeeper: enabled: "{{not kraft_enabled|bool}}"