Skip to content

Commit

Permalink
Update Kafka topics to use REST API
Browse files Browse the repository at this point in the history
  • Loading branch information
justinrlee committed May 14, 2023
1 parent 90b7595 commit 859b66c
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 30 deletions.
32 changes: 2 additions & 30 deletions roles/kafka_broker/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -516,34 +516,6 @@
when: (ssl_provided_keystore_and_truststore | bool)

- name: Create Topics
shell: |
{{ binary_base_path }}/bin/kafka-topics --bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}} \
--command-config {{kafka_broker.client_config_file}} \
--create --if-not-exists \
--topic {{ item.name }} \
{{ '' if 'partitions' not in item else ' --partitions ' + item.partitions|string }} \
{{ '' if 'replication.factor' not in item else ' --replication-factor ' + item['replication.factor']|string }}
environment:
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions {% if kerberos_client_config_file_dest != '/etc/krb5.conf' %}-Djava.security.krb5.conf={{kerberos_client_config_file_dest}}{% endif %}"
ignore_errors: true
loop: "{{ kafka_broker_topics }}"
when: not ( rbac_enabled|bool or kafka_broker_client_secrets_protection_enabled|bool )
run_once: true
tags:
- resource

- name: Configure Topics
shell: |
{{ binary_base_path }}/bin/kafka-configs --bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}} \
--command-config {{kafka_broker.client_config_file}} \
--alter \
--topic {{ item.name }} \
{% for k in item.config %} --add-config {{ k|string }}={{ item.config[k]|string }} {% endfor %} \
environment:
KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions {% if kerberos_client_config_file_dest != '/etc/krb5.conf' %}-Djava.security.krb5.conf={{kerberos_client_config_file_dest}}{% endif %}"
ignore_errors: true
loop: "{{ kafka_broker_topics|selectattr('config','defined') }}"
when: not ( rbac_enabled|bool or kafka_broker_client_secrets_protection_enabled|bool )
run_once: true
include_tasks: kafka_topics.yml
tags:
- resource
- topics
98 changes: 98 additions & 0 deletions roles/kafka_broker/tasks/topics.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
---
- name: Get Cluster ID
uri:
url: "{{ kafka_broker_erp_clusters_url }}"
validate_certs: false
return_content: true
status_code: 200
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
register: cluster_id_query
run_once: true
tags:
- topics

- name: Parse Kafka Cluster ID from json query
set_fact:
kafka_cluster_id: "{{ cluster_id_query.json.data[0].cluster_id }}"
run_once: true
tags:
- topics

- name: Get topic info
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.topic_name }}/configs"
status_code: [200, 404]
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
loop: "{{ kafka_broker_topics }}"
register: topic_info
run_once: true
tags:
- topics

- name: Identify existing topics
set_fact:
new_topics: "{{ topic_info.results | rejectattr('status', 'eq', 200) | map(attribute = 'item') }}"
existing_topics: "{{ topic_info.results | selectattr('status', 'eq', 200) }}"
run_once: true
tags:
- topics

- name: Create new topics
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics"
method: POST
status_code: [201]
body: "{{ item }}"
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
loop: "{{ new_topics }}"
run_once: true
tags:
- topics

- name: Update existing configs for existing topics
loop: "{{ existing_topics }}"
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.item.topic_name }}/configs:alter"
method: POST
status_code: [204]
body: |
{
"data": [
{% for config in item.item['configs']|default([]) %}
{% if loop.index > 1%},{% endif %}
{{ config }}
{% endfor %}
{% if item.item['configs']|default([]) | length > 0 %},{% endif %}
{% for config_remove in (item.json.data | selectattr('is_default', 'eq', false) | map(attribute = 'name')) | difference(item.item.configs|default({}) | map(attribute = 'name')) %}
{% if loop.index > 1%},{% endif %}
{"name": "{{ config_remove }}", "operation": "DELETE"}
{% endfor %}
]
}
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
tags:
- topics

0 comments on commit 859b66c

Please sign in to comment.