Skip to content

Commit

Permalink
Merge pull request #86 from gabrielmougard/feat/feature-api
Browse files Browse the repository at this point in the history
Add a new 'runtime extension' system to MicroCluster
  • Loading branch information
masnax authored Apr 26, 2024
2 parents ed20c91 + b52ca98 commit f877c78
Show file tree
Hide file tree
Showing 17 changed files with 985 additions and 42 deletions.
53 changes: 53 additions & 0 deletions cluster/cluster_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/canonical/lxd/lxd/db/query"

"github.com/canonical/microcluster/internal/extensions"
internalTypes "github.com/canonical/microcluster/internal/rest/types"
"github.com/canonical/microcluster/rest/types"
)
Expand Down Expand Up @@ -45,6 +46,7 @@ type InternalClusterMember struct {
Certificate string
SchemaInternal uint64
SchemaExternal uint64
APIExtensions extensions.Extensions
Heartbeat time.Time
Role Role
}
Expand Down Expand Up @@ -79,6 +81,7 @@ func (c InternalClusterMember) ToAPI() (*internalTypes.ClusterMember, error) {
SchemaExternalVersion: c.SchemaExternal,
LastHeartbeat: c.Heartbeat,
Status: internalTypes.MemberUnreachable,
Extensions: c.APIExtensions,
}, nil
}

Expand Down Expand Up @@ -129,3 +132,53 @@ func GetClusterMemberSchemaVersions(ctx context.Context, tx *sql.Tx) (internalSc

return internalSchema, externalSchema, nil
}

// UpdateClusterMemberAPIExtensions sets the API extensions for the cluster member with the given address.
// This helper is non-generated to work before generated statements are loaded, as we update the API extensions.
func UpdateClusterMemberAPIExtensions(tx *sql.Tx, apiExtensions extensions.Extensions, address string) error {
stmt := "UPDATE internal_cluster_members SET api_extensions=? WHERE address=?"
result, err := tx.Exec(stmt, apiExtensions, address)
if err != nil {
return err
}

n, err := result.RowsAffected()
if err != nil {
return err
}
if n != 1 {
return fmt.Errorf("Updated %d rows instead of 1", n)
}

return nil
}

// GetClusterMemberAPIExtensions returns the API extensions from all cluster members that are not pending.
// This helper is non-generated to work before generated statements are loaded, as we update the API extensions.
func GetClusterMemberAPIExtensions(ctx context.Context, tx *sql.Tx) ([]extensions.Extensions, error) {
query := "SELECT api_extensions FROM internal_cluster_members WHERE NOT role='pending'"
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return nil, err
}

defer rows.Close()

var results []extensions.Extensions
for rows.Next() {
var ext extensions.Extensions
err := rows.Scan(&ext)
if err != nil {
return nil, err
}

results = append(results, ext)
}

err = rows.Err()
if err != nil {
return nil, err
}

return results, nil
}
27 changes: 14 additions & 13 deletions cluster/cluster_members.mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ import (
var _ = api.ServerEnvironment{}

var internalClusterMemberObjects = RegisterStmt(`
SELECT internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.heartbeat, internal_cluster_members.role
SELECT internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.api_extensions, internal_cluster_members.heartbeat, internal_cluster_members.role
FROM internal_cluster_members
ORDER BY internal_cluster_members.name
`)

var internalClusterMemberObjectsByAddress = RegisterStmt(`
SELECT internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.heartbeat, internal_cluster_members.role
SELECT internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.api_extensions, internal_cluster_members.heartbeat, internal_cluster_members.role
FROM internal_cluster_members
WHERE ( internal_cluster_members.address = ? )
ORDER BY internal_cluster_members.name
`)

var internalClusterMemberObjectsByName = RegisterStmt(`
SELECT internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.heartbeat, internal_cluster_members.role
SELECT internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.api_extensions, internal_cluster_members.heartbeat, internal_cluster_members.role
FROM internal_cluster_members
WHERE ( internal_cluster_members.name = ? )
ORDER BY internal_cluster_members.name
Expand All @@ -42,8 +42,8 @@ SELECT internal_cluster_members.id FROM internal_cluster_members
`)

var internalClusterMemberCreate = RegisterStmt(`
INSERT INTO internal_cluster_members (name, address, certificate, schema_internal, schema_external, heartbeat, role)
VALUES (?, ?, ?, ?, ?, ?, ?)
INSERT INTO internal_cluster_members (name, address, certificate, schema_internal, schema_external, api_extensions, heartbeat, role)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`)

var internalClusterMemberDeleteByAddress = RegisterStmt(`
Expand All @@ -52,14 +52,14 @@ DELETE FROM internal_cluster_members WHERE address = ?

var internalClusterMemberUpdate = RegisterStmt(`
UPDATE internal_cluster_members
SET name = ?, address = ?, certificate = ?, schema_internal = ?, schema_external = ?, heartbeat = ?, role = ?
SET name = ?, address = ?, certificate = ?, schema_internal = ?, schema_external = ?, api_extensions = ?, heartbeat = ?, role = ?
WHERE id = ?
`)

// internalClusterMemberColumns returns a string of column names to be used with a SELECT statement for the entity.
// Use this function when building statements to retrieve database entries matching the InternalClusterMember entity.
func internalClusterMemberColumns() string {
return "internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.heartbeat, internal_cluster_members.role"
return "internal_cluster_members.id, internal_cluster_members.name, internal_cluster_members.address, internal_cluster_members.certificate, internal_cluster_members.schema_internal, internal_cluster_members.schema_external, internal_cluster_members.api_extensions, internal_cluster_members.heartbeat, internal_cluster_members.role"
}

// getInternalClusterMembers can be used to run handwritten sql.Stmts to return a slice of objects.
Expand All @@ -68,7 +68,7 @@ func getInternalClusterMembers(ctx context.Context, stmt *sql.Stmt, args ...any)

dest := func(scan func(dest ...any) error) error {
i := InternalClusterMember{}
err := scan(&i.ID, &i.Name, &i.Address, &i.Certificate, &i.SchemaInternal, &i.SchemaExternal, &i.Heartbeat, &i.Role)
err := scan(&i.ID, &i.Name, &i.Address, &i.Certificate, &i.SchemaInternal, &i.SchemaExternal, &i.APIExtensions, &i.Heartbeat, &i.Role)
if err != nil {
return err
}
Expand All @@ -92,7 +92,7 @@ func getInternalClusterMembersRaw(ctx context.Context, tx *sql.Tx, sql string, a

dest := func(scan func(dest ...any) error) error {
i := InternalClusterMember{}
err := scan(&i.ID, &i.Name, &i.Address, &i.Certificate, &i.SchemaInternal, &i.SchemaExternal, &i.Heartbeat, &i.Role)
err := scan(&i.ID, &i.Name, &i.Address, &i.Certificate, &i.SchemaInternal, &i.SchemaExternal, &i.APIExtensions, &i.Heartbeat, &i.Role)
if err != nil {
return err
}
Expand Down Expand Up @@ -272,16 +272,17 @@ func CreateInternalClusterMember(ctx context.Context, tx *sql.Tx, object Interna
return -1, api.StatusErrorf(http.StatusConflict, "This \"internal_cluster_members\" entry already exists")
}

args := make([]any, 7)
args := make([]any, 8)

// Populate the statement arguments.
args[0] = object.Name
args[1] = object.Address
args[2] = object.Certificate
args[3] = object.SchemaInternal
args[4] = object.SchemaExternal
args[5] = object.Heartbeat
args[6] = object.Role
args[5] = object.APIExtensions
args[6] = object.Heartbeat
args[7] = object.Role

// Prepared statement to use.
stmt, err := Stmt(tx, internalClusterMemberCreate)
Expand Down Expand Up @@ -343,7 +344,7 @@ func UpdateInternalClusterMember(ctx context.Context, tx *sql.Tx, name string, o
return fmt.Errorf("Failed to get \"internalClusterMemberUpdate\" prepared statement: %w", err)
}

result, err := stmt.Exec(object.Name, object.Address, object.Certificate, object.SchemaInternal, object.SchemaExternal, object.Heartbeat, object.Role, id)
result, err := stmt.Exec(object.Name, object.Address, object.Certificate, object.SchemaInternal, object.SchemaExternal, object.APIExtensions, object.Heartbeat, object.Role, id)
if err != nil {
return fmt.Errorf("Update \"internal_cluster_members\" entry failed: %w", err)
}
Expand Down
12 changes: 12 additions & 0 deletions example/api/extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package api

// These are the extensions that are present when the daemon starts.
var extensions = []string{
"custom_extension_a_0",
"custom_extension_a_1",
}

// Extensions returns the list of MicroOVN extensions.
func Extensions() []string {
return extensions
}
39 changes: 38 additions & 1 deletion example/cmd/microd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/canonical/microcluster/example/api"
"github.com/canonical/microcluster/example/database"
"github.com/canonical/microcluster/example/version"
"github.com/canonical/microcluster/internal/extensions"
"github.com/canonical/microcluster/microcluster"
"github.com/canonical/microcluster/state"
)
Expand Down Expand Up @@ -72,6 +73,42 @@ func (c *cmdDaemon) Run(cmd *cobra.Command, args []string) error {
logCtx[k] = v
}

// You can check your app extensions using the *state.State object.
hasMissingExt := s.Extensions.HasExtension("missing_extension")
if !hasMissingExt {
logger.Warn("The 'missing_extension' is not registered")
}

// You can also check the internal extensions. (starting with "internal:" prefix)
// These are read-only and defined at the MicroCluster level and cannot be added at runtime
hasInternalExt := s.Extensions.HasExtension("internal:runtime_extension_v1")
if !hasInternalExt {
logger.Warn("Every system should have the 'internal:runtime_extension_v1' extension")
}

// You can also register new extensions at runtime.
err := s.Extensions.Register([]string{"new_extension_at_runtime_1", "new_extension_at_runtime_2"})
if err != nil {
return err
}

// This shows the number of extensions that are registered (internal and external).
numberOfExtensions := s.Extensions.Version()
logger.Infof("The number of extensions is %d", numberOfExtensions)

// You can also create a new registry of extensions from a list of extensions.
// This is useful to communicate a system's extensions to other systems, for comparison purposes for example.
newExt, err := extensions.NewExtensionRegistryFromList([]string{"internal:runtime_extension_v1", "new_extension_at_runtime_1", "new_extension_at_runtime_2", "custom_extension_a_0", "custom_extension_a_1"})
if err != nil {
return err
}

// You can compare the extensions of two systems.
err = s.Extensions.IsSameVersion(newExt)
if err != nil {
return err
}

logger.Info("This is a hook that runs after the daemon is initialized and bootstrapped")
logger.Info("Here are the extra configuration keys that were passed into the init --bootstrap command", logCtx)

Expand Down Expand Up @@ -152,7 +189,7 @@ func (c *cmdDaemon) Run(cmd *cobra.Command, args []string) error {
},
}

return m.Start(cmd.Context(), api.Endpoints, database.SchemaExtensions, exampleHooks)
return m.Start(cmd.Context(), api.Endpoints, database.SchemaExtensions, api.Extensions(), exampleHooks)
}

func main() {
Expand Down
28 changes: 22 additions & 6 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/canonical/microcluster/config"
"github.com/canonical/microcluster/internal/db"
"github.com/canonical/microcluster/internal/endpoints"
"github.com/canonical/microcluster/internal/extensions"
internalREST "github.com/canonical/microcluster/internal/rest"
internalClient "github.com/canonical/microcluster/internal/rest/client"
"github.com/canonical/microcluster/internal/rest/resources"
Expand Down Expand Up @@ -63,6 +64,8 @@ type Daemon struct {
shutdownDoneCh chan error // Receives the result of state.Stop() when exit() is called and tells the daemon to end.
shutdownCancel context.CancelFunc // Cancels the shutdownCtx to indicate shutdown starting.

Extensions extensions.Extensions // Extensions supported at runtime by the daemon.

// stop is a sync.Once which wraps the daemon's stop sequence. Each call will block until the first one completes.
stop func() error
}
Expand Down Expand Up @@ -93,7 +96,7 @@ func NewDaemon(project string) *Daemon {
// - `extensionsAPI` is a list of endpoints to be served over `/1.0`.
// - `extensionsSchema` is a list of schema updates in the order that they should be applied.
// - `hooks` are a set of functions that trigger at certain points during cluster communication.
func (d *Daemon) Run(ctx context.Context, listenPort string, stateDir string, socketGroup string, extensionsAPI []rest.Endpoint, extensionsSchema []schema.Update, hooks *config.Hooks) error {
func (d *Daemon) Run(ctx context.Context, listenPort string, stateDir string, socketGroup string, extensionsAPI []rest.Endpoint, extensionsSchema []schema.Update, apiExtensions []string, hooks *config.Hooks) error {
d.shutdownCtx, d.shutdownCancel = context.WithCancel(ctx)
if stateDir == "" {
stateDir = os.Getenv(sys.StateDir)
Expand All @@ -114,7 +117,7 @@ func (d *Daemon) Run(ctx context.Context, listenPort string, stateDir string, so
return fmt.Errorf("Failed to initialize directory structure: %w", err)
}

err = d.init(listenPort, extensionsAPI, extensionsSchema, hooks)
err = d.init(listenPort, extensionsAPI, extensionsSchema, apiExtensions, hooks)
if err != nil {
return fmt.Errorf("Daemon failed to start: %w", err)
}
Expand All @@ -136,7 +139,7 @@ func (d *Daemon) Run(ctx context.Context, listenPort string, stateDir string, so
}
}

func (d *Daemon) init(listenPort string, extendedEndpoints []rest.Endpoint, schemaExtensions []schema.Update, hooks *config.Hooks) error {
func (d *Daemon) init(listenPort string, extendedEndpoints []rest.Endpoint, schemaExtensions []schema.Update, apiExtensions []string, hooks *config.Hooks) error {
d.applyHooks(hooks)

var err error
Expand All @@ -145,6 +148,18 @@ func (d *Daemon) init(listenPort string, extendedEndpoints []rest.Endpoint, sche
return fmt.Errorf("Failed to assign default system name: %w", err)
}

// Initialize the extensions registry with the internal extensions.
d.Extensions, err = extensions.NewExtensionRegistry(true)
if err != nil {
return err
}

// Register the extensions passed at initialization.
err = d.Extensions.Register(apiExtensions)
if err != nil {
return err
}

d.serverCert, err = util.LoadServerCert(d.os.StateDir)
if err != nil {
return err
Expand Down Expand Up @@ -408,7 +423,7 @@ func (d *Daemon) StartAPI(bootstrap bool, initConfig map[string]string, newConfi

clusterMember.SchemaInternal, clusterMember.SchemaExternal = d.db.Schema().Version()

err = d.db.Bootstrap(d.project, d.address, clusterMember)
err = d.db.Bootstrap(d.Extensions, d.project, d.address, clusterMember)
if err != nil {
return err
}
Expand All @@ -428,12 +443,12 @@ func (d *Daemon) StartAPI(bootstrap bool, initConfig map[string]string, newConfi
}

if len(joinAddresses) != 0 {
err = d.db.Join(d.project, d.address, joinAddresses...)
err = d.db.Join(d.Extensions, d.project, d.address, joinAddresses...)
if err != nil {
return fmt.Errorf("Failed to join cluster: %w", err)
}
} else {
err = d.db.StartWithCluster(d.project, d.address, d.trustStore.Remotes().Addresses())
err = d.db.StartWithCluster(d.Extensions, d.project, d.address, d.trustStore.Remotes().Addresses())
if err != nil {
return fmt.Errorf("Failed to re-establish cluster connection: %w", err)
}
Expand Down Expand Up @@ -638,6 +653,7 @@ func (d *Daemon) State() *state.State {

return exit, stopErr
},
Extensions: d.Extensions,
}

return state
Expand Down
Loading

0 comments on commit f877c78

Please sign in to comment.