Skip to content

Commit

Permalink
flowctl: enable self-service task logs
Browse files Browse the repository at this point in the history
* Update local-stack certificate generation to create certs that work
  with rustls, when used with:

  SSL_CERT_FILE=~/estuary/data-plane-gateway/local-tls-cert.pem

* When reading ops logs, obtain a DPG token which is authorized for the
  task name, rather than the ops collection.

* Drop stats for now to remove flowctl surface area.

* Remove deprecated include-partition / exclude-partition (began to
  panic with auth_prefixes addition to ReadArgs for unknown reasons).
  • Loading branch information
jgraettinger committed Jan 19, 2024
1 parent 7681c66 commit 2224408
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 66 deletions.
7 changes: 6 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ DPG_TLS_KEY_PATH='%s/local-tls-private-key.pem' % DPG_REPO

local_resource('dpg-tls-cert',
dir='%s/data-plane-gateway' % REPO_BASE,
# These incantations create a non-CA self-signed certificate which is
# valid for localhost and its subdomains. rustls is quite fiddly about
# accepting self-signed certificates so all of these are required.
cmd='[ -f %s ] || openssl req -x509 -nodes -days 365 \
-subj "/C=CA/ST=QC/O=Estuary/CN=localhost:28318" \
-subj "/ST=QC/O=Estuary/CN=localhost" \
-addext basicConstraints=critical,CA:FALSE,pathlen:1 \
-addext "subjectAltName=DNS:localhost,DNS:*.localhost,IP:127.0.0.1" \
-newkey rsa:2048 -keyout "%s" \
-out "%s"' % (DPG_TLS_KEY_PATH, DPG_TLS_KEY_PATH, DPG_TLS_CERT_PATH))

Expand Down
17 changes: 1 addition & 16 deletions crates/flowctl/src/collection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,8 @@ pub struct CollectionJournalSelector {
/// The selector is provided as JSON matching the same shape that's used
/// in Flow catalog specs. For example:
/// '{"include": {"myField1":["value1", "value2"]}}'
#[clap(
long,
value_parser(parse_partition_selector),
conflicts_with_all(&["include-partition", "exclude-partition"])
)]
#[clap(long, value_parser(parse_partition_selector))]
pub partitions: Option<models::PartitionSelector>,

/// Deprecated, use --partitions instead
#[clap(long = "include-partition", value_parser(parse_deprecated_selector))]
pub include_partitions: Vec<String>,
/// Deprecated, use --partitions instead
#[clap(long = "exclude-partition", value_parser(parse_deprecated_selector))]
pub exclude_partitions: Vec<String>,
}

fn parse_deprecated_selector(_: &str) -> Result<String, anyhow::Error> {
anyhow::bail!("this argument has been deprecated, and replaced by --partitions")
}

fn parse_partition_selector(arg: &str) -> Result<models::PartitionSelector, anyhow::Error> {
Expand Down
11 changes: 8 additions & 3 deletions crates/flowctl/src/collection/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ pub struct ReadArgs {
pub selector: CollectionJournalSelector,
#[clap(flatten)]
pub bounds: ReadBounds,

/// Read all journal data, including messages from transactions which were
/// rolled back or never committed. Due to the current limitations of the Rust
/// Gazette client library, this is the only mode that's currently supported,
/// and this flag must be provided. In the future, committed reads will become
/// the default.
#[clap(long)]
pub uncommitted: bool,
#[clap(skip)]
pub auth_prefixes: Vec<String>,
}

/// Common definition for arguments specifying the begin and and bounds of a read command.
Expand All @@ -50,9 +51,13 @@ pub async fn journal_reader(
ctx: &mut crate::CliContext,
args: &ReadArgs,
) -> anyhow::Result<Reader<ExponentialBackoff>> {
let auth_prefixes = if args.auth_prefixes.is_empty() {
vec![args.selector.collection.clone()]
} else {
args.auth_prefixes.clone()
};
let cp_client = ctx.controlplane_client().await?;
let mut data_plane_client =
dataplane::journal_client_for(cp_client, vec![args.selector.collection.clone()]).await?;
let mut data_plane_client = dataplane::journal_client_for(cp_client, auth_prefixes).await?;

let selector = args.selector.build_label_selector();
tracing::debug!(?selector, "build label selector");
Expand Down
17 changes: 1 addition & 16 deletions crates/flowctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,8 @@ pub enum Command {
/// They can be edited, developed, and tested while still a draft.
/// Then when you're ready, publish your draft to make your changes live.
Draft(draft::Draft),
/// This command does not (yet) work for end users
///
/// Note: We're still working on allowing users access to task logs, and this command will not work until we do.
/// Prints the runtime logs of a task (capture, derivation, or materialization).
/// Reads contents from the `ops.<data-plane>/logs` collection, selecting the partition
/// that corresponds to the selected task. This command is essentially equivalent to the much longer:
/// `flowctl collections read --collection ops.<data-plane>/logs --include-partition estuary.dev/field/name=<task> --uncommitted`
/// Read operational logs of your tasks (captures, derivations, and materializations).
Logs(ops::Logs),
/// This command does not (yet) work for end users
///
/// Note: We're still working on allowing users access to task stats, and this command will not work until we do.
/// Prints the runtime stats of a task (capture, derivation, or materialization).
/// Reads contents from the `ops.<data-plane>/stats` collection, selecting the partition
/// that corresponds to the selected task. This command is essentially equivalent to the much longer:
/// `flowctl collections read --collection ops.<data-plane>/stats --include-partition estuary.dev/field/name=<task>`
Stats(ops::Stats),
/// Advanced, low-level, and experimental commands which are less common.
Raw(raw::Advanced),
}
Expand Down Expand Up @@ -194,7 +180,6 @@ impl Cli {
Command::Preview(preview) => preview.run(&mut context).await,
Command::Draft(draft) => draft.run(&mut context).await,
Command::Logs(logs) => logs.run(&mut context).await,
Command::Stats(stats) => stats.run(&mut context).await,
Command::Raw(advanced) => advanced.run(&mut context).await,
}?;

Expand Down
31 changes: 1 addition & 30 deletions crates/flowctl/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,9 @@ impl Logs {
}
}

#[derive(clap::Args, Debug)]
pub struct Stats {
#[clap(flatten)]
pub task: TaskSelector,

#[clap(flatten)]
pub bounds: ReadBounds,

/// Read raw data from stats journals, including possibly uncommitted or rolled back transactions.
/// This flag is currently required, but will be made optional in the future as we add support for
/// committed reads, which will become the default.
#[clap(long)]
pub uncommitted: bool,
}

impl Stats {
pub async fn run(&self, ctx: &mut crate::CliContext) -> anyhow::Result<()> {
let read_args = read_args(
&self.task.task,
OpsCollection::Stats,
&self.bounds,
self.uncommitted,
);
read_collection(ctx, &read_args).await?;
Ok(())
}
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum OpsCollection {
Logs,
Stats,
}

pub fn read_args(
Expand All @@ -70,7 +41,6 @@ pub fn read_args(
) -> ReadArgs {
let logs_or_stats = match collection {
OpsCollection::Logs => "logs",
OpsCollection::Stats => "stats",
};
// Once we implement federated data planes, we'll need to update this to
// fetch the name of the data plane based on the tenant.
Expand All @@ -93,6 +63,7 @@ pub fn read_args(
selector,
uncommitted,
bounds: bounds.clone(),
auth_prefixes: vec![task_name.to_string()],
}
}

Expand Down

0 comments on commit 2224408

Please sign in to comment.