Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP (do not merge): Support topic creation #1408

Open
wants to merge 4 commits into
base: 7.3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions roles/kafka_broker/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,6 @@ kafka_broker_jmxexporter_startup_delay: 60

### Whether to cache bean name expressions to rule computation (match and mismatch). Not recommended for rules matching on bean value, as only the value from the first scrape will be cached and re-used. This can increase performance when collecting a lot of mbeans.
kafka_broker_jmxexporter_bean_name_expressions_cache: false

### Topics to create
kafka_broker_topics: []
5 changes: 5 additions & 0 deletions roles/kafka_broker/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -514,3 +514,8 @@
- "{{kafka_broker_cert_path}}"
- "{{kafka_broker_key_path}}"
when: (ssl_provided_keystore_and_truststore | bool)

- name: Create Topics
include_tasks: topics.yml
tags:
- topics
145 changes: 145 additions & 0 deletions roles/kafka_broker/tasks/topics.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
---
- name: Get Cluster ID
uri:
url: "{{ kafka_broker_erp_clusters_url }}"
validate_certs: false
return_content: true
status_code: 200
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
register: cluster_id_query
run_once: true
tags:
- topics

- name: Parse Kafka Cluster ID from json query
set_fact:
kafka_cluster_id: "{{ cluster_id_query.json.data[0].cluster_id }}"
run_once: true
tags:
- topics

# Topic API (.../topics/<topicname>) has information about partition count
- name: Get topics
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.topic_name }}"
status_code: [200, 404]
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
return_content: true
body_format: json
loop: "{{ kafka_broker_topics }}"
register: get_topics
run_once: true

# Topic config API (.../topics/<topicname>/configs) has configurations
# TODO: Only run on existing topics
- name: Get topic configs
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.topic_name }}/configs"
status_code: [200, 404]
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json

loop: "{{ kafka_broker_topics }}"
register: topic_configs
run_once: true
tags:
- topics

# TODO: base on topics instead of topic configs?
- name: Identify existing topics
set_fact:
new_topics: "{{ topic_configs.results | rejectattr('status', 'eq', 200) | map(attribute = 'item') }}"
existing_topics: "{{ topic_configs.results | selectattr('status', 'eq', 200) }}"
run_once: true
tags:
- topics

- name: Create new topics
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics"
method: POST
status_code: [201]
body: "{{ item }}"
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
loop: "{{ new_topics }}"
run_once: true
tags:
- topics

- name: Update configs for existing topics
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.item.topic_name }}/configs:alter"
method: POST
status_code: [204]
body: |
{
"data": [
{% for config in item.item['configs']|default([]) %}
{% if loop.index > 1%},{% endif %}
{{ config }}
{% endfor %}
{% if item.item['configs']|default([]) | length > 0 %},{% endif %}
{% for config_remove in (item.json.data | selectattr('is_default', 'eq', false) | map(attribute = 'name')) | difference(item.item.configs|default({}) | map(attribute = 'name')) %}
{% if loop.index > 1%},{% endif %}
{"name": "{{ config_remove }}", "operation": "DELETE"}
{% endfor %}
]
}
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
loop: "{{ existing_topics }}"
run_once: true
tags:
- topics

- name: Update partition counts
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.item.topic_name }}"
method: PATCH
status_code: [200]
body: |
{
"partitions_count": "{{ item.item.partitions_count }}"
}
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
return_content: true
body_format: json
loop: "{{ get_topics.results }}"
when: item.status == 200 and item.item.partitions_count > item.json.partitions_count
run_once: true
tags:
- topics