Skip to content

Commit

Permalink
Register_peers support for receptor_addresses
Browse files Browse the repository at this point in the history
register_peers has inputs:

source: source instance
peers: list of instances the source should peer to

InstanceLink "target" is now expected to be a ReceptorAddress

For each peer, we can just use the first receptor address. If
multiple receptor addresses exist, throw a command error.

Currently this command is only used on VM-deployments, where
there is only a single receptor address per instance, so this
should work fine.

Other changes:
drop listener_port field from Instance. Listener port is now just
"port" on ReceptorAddress

Signed-off-by: Seth Foster <[email protected]>
  • Loading branch information
fosterseth committed Nov 2, 2023
1 parent 3272b28 commit 3206672
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 39 deletions.
35 changes: 18 additions & 17 deletions awx/main/management/commands/add_receptor_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,27 @@
# All Rights Reserved

from django.core.management.base import BaseCommand
from django.db import transaction

from awx.main.models import Instance, ReceptorAddress


def add_address(**kwargs):
try:
instance = Instance.objects.get(hostname=kwargs.pop('hostname'))
kwargs['instance'] = instance
# address and protocol are unique together for ReceptorAddress
# If an address has (address, protocol), it will update the rest of the values suppled in defaults dict
# if no address exists with (address, protocol), then a new address will be created
# these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model
addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs)
print(f"Successfully added receptor address {addr.get_full_address()}")
changed = True
except Exception as e:
changed = False
print(f"Error adding receptor address: {e}")
return changed


class Command(BaseCommand):
"""
Internal tower command.
Expand All @@ -24,24 +40,9 @@ def add_arguments(self, parser):
parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster")
parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address")

def _add_address(self, **kwargs):
try:
instance = Instance.objects.get(hostname=kwargs.pop('hostname'))
kwargs['instance'] = instance
# address and protocol are unique together for ReceptorAddress
# If an address has (address, protocol), it will update the rest of the values suppled in defaults dict
# if no address exists with (address, protocol), then a new address will be created
# these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model
addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs)
print(f"Successfully added receptor address {addr.get_full_address()}")
self.changed = True
except Exception as e:
self.changed = False
print(f"Error adding receptor address: {e}")

def handle(self, **options):
self.changed = False
address_options = {k: options[k] for k in ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'peers_from_control_nodes')}
self._add_address(**address_options)
self.changed = add_address(**address_options)
if self.changed:
print("(changed: True)")
2 changes: 1 addition & 1 deletion awx/main/management/commands/list_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def handle(self, *args, **options):

capacity = f' capacity={x.capacity}' if x.node_type != 'hop' else ''
version = f" version={x.version or '?'}" if x.node_type != 'hop' else ''
heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.capacity or x.node_type == 'hop' else ''
heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.last_seen and x.capacity or x.node_type == 'hop' else ''
print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}{end_color}')

print()
16 changes: 11 additions & 5 deletions awx/main/management/commands/register_peers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def add_arguments(self, parser):

def handle(self, **options):
# provides a mapping of hostname to Instance objects
nodes = Instance.objects.in_bulk(field_name='hostname')
nodes = Instance.objects.all().prefetch_related('receptor_addresses').in_bulk(field_name='hostname')

if options['source'] not in nodes:
raise CommandError(f"Host {options['source']} is not a registered instance.")
Expand All @@ -39,6 +39,12 @@ def handle(self, **options):
if options['exact'] is not None and options['disconnect']:
raise CommandError("The option --disconnect may not be used with --exact.")

# make sure peers and source instances only have one receptor address
for hostname, node in nodes.items():
if hostname in options.get('peers', []) or hostname == options['source']:
if node.receptor_addresses.count() > 1:
raise CommandError(f"Instance {hostname} has more than one receptor address.")

# No 1-cycles
for collection in ('peers', 'disconnect', 'exact'):
if options[collection] is not None and options['source'] in options[collection]:
Expand All @@ -60,7 +66,7 @@ def handle(self, **options):
results = 0
for target in options['peers']:
_, created = InstanceLink.objects.update_or_create(
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
source=nodes[options['source']], target=nodes[target].receptor_addresses.get(), defaults={'link_state': InstanceLink.States.ESTABLISHED}
)
if created:
results += 1
Expand All @@ -72,7 +78,7 @@ def handle(self, **options):
for target in options['disconnect']:
if target not in nodes: # Be permissive, the node might have already been de-registered.
continue
n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target]).delete()
n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target].receptor_addresses.get()).delete()
results += n

print(f"{results} peer links removed from the database.")
Expand All @@ -82,10 +88,10 @@ def handle(self, **options):
with transaction.atomic():
peers = set(options['exact'])
links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True))
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete()
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__instance__hostname__in=links - peers).delete()
for target in peers - links:
_, created = InstanceLink.objects.update_or_create(
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
source=nodes[options['source']], target=nodes[target].receptor_addresses.get(), defaults={'link_state': InstanceLink.States.ESTABLISHED}
)
if created:
additions += 1
Expand Down
8 changes: 4 additions & 4 deletions awx/main/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,6 @@ def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=N
if instance.node_type != node_type:
instance.node_type = node_type
update_fields.append('node_type')
if instance.listener_port != listener_port:
instance.listener_port = listener_port
update_fields.append('listener_port')
if update_fields:
instance.save(update_fields=update_fields)
return (True, instance)
Expand All @@ -183,10 +180,13 @@ def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=N
instance = self.create(
hostname=hostname,
ip_address=ip_address,
listener_port=listener_port,
node_type=node_type,
peers_from_control_nodes=peers_from_control_nodes,
**create_defaults,
**uuid_option
)
from awx.main.management.commands.add_receptor_address import add_address

if listener_port:
add_address(address=hostname, hostname=hostname, port=listener_port, protocol='tcp')
return (True, instance)
15 changes: 12 additions & 3 deletions awx/main/migrations/0188_inbound_hop_nodes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Generated by Django 4.2.5 on 2023-10-10 00:26
# Generated by Django 4.2.6 on 2023-11-02 18:07

from django.db import migrations, models
import django.db.models.deletion
Expand All @@ -15,8 +15,8 @@ class Migration(migrations.Migration):
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('address', models.CharField(max_length=255)),
('port', models.IntegerField()),
('protocol', models.CharField(max_length=10)),
('port', models.IntegerField(default=27199)),
('protocol', models.CharField(default='tcp', max_length=10)),
('websocket_path', models.CharField(blank=True, default='', max_length=255)),
('is_internal', models.BooleanField(default=False)),
('peers_from_control_nodes', models.BooleanField(default=False)),
Expand All @@ -30,6 +30,10 @@ class Migration(migrations.Migration):
name='instancelink',
unique_together=set(),
),
migrations.RemoveField(
model_name='instance',
name='listener_port',
),
migrations.AlterField(
model_name='instancelink',
name='source',
Expand All @@ -40,6 +44,11 @@ class Migration(migrations.Migration):
name='instance',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='receptor_addresses', to='main.instance'),
),
migrations.AddField(
model_name='activitystream',
name='receptor_address',
field=models.ManyToManyField(blank=True, to='main.receptoraddress'),
),
migrations.AlterField(
model_name='instance',
name='peers',
Expand Down
7 changes: 0 additions & 7 deletions awx/main/models/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,6 @@ class States(models.TextChoices):
node_state = models.CharField(
choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.")
)
listener_port = models.PositiveIntegerField(
blank=True,
null=True,
default=None,
validators=[MinValueValidator(1024), MaxValueValidator(65535)],
help_text=_("Port that Receptor will listen for incoming connections on."),
)

peers = models.ManyToManyField('ReceptorAddress', through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from')
peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it."))
Expand Down
4 changes: 2 additions & 2 deletions awx/main/models/receptor_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class Meta:
]

address = models.CharField(max_length=255)
port = models.IntegerField(blank=False)
protocol = models.CharField(max_length=10)
port = models.IntegerField(default=27199)
protocol = models.CharField(max_length=10, default="tcp")
websocket_path = models.CharField(max_length=255, default="", blank=True)
is_internal = models.BooleanField(default=False)
peers_from_control_nodes = models.BooleanField(default=False)
Expand Down

0 comments on commit 3206672

Please sign in to comment.