From bc74ef9674f99b87ad25c28b1ee7e990190c885a Mon Sep 17 00:00:00 2001 From: Willy Lulciuc Date: Tue, 15 Oct 2024 10:57:48 -0700 Subject: [PATCH] Add `--runs-per-job`, `--max-run-fails-per-job`, and more to `metadata` cmd (#2923) * Add `--run-duration` and `--event-time` Signed-off-by: Willy Lulciuc * Apply formatting Signed-off-by: Willy Lulciuc * Add `--event-start-time` and `--event-end-time` Signed-off-by: Willy Lulciuc * Add support for `X` runs + durations Signed-off-by: Willy Lulciuc * Fix last `N` runs display Signed-off-by: Willy Lulciuc * Sort events by event time Signed-off-by: Willy Lulciuc * Use `--jobs` Signed-off-by: Willy Lulciuc * Apply lint on web Signed-off-by: Willy Lulciuc * Add `--runs-active` Signed-off-by: Willy Lulciuc * Fix runs ordering for jobs Signed-off-by: Willy Lulciuc --------- Signed-off-by: Willy Lulciuc --- .circleci/api-load-test.sh | 2 +- .../java/marquez/cli/MetadataCommand.java | 316 ++++++++++++++---- api/src/main/java/marquez/db/RunDao.java | 4 +- web/src/components/jobs/JobDetailPage.tsx | 5 +- web/src/routes/dashboard/JobRunItem.tsx | 6 +- 5 files changed, 255 insertions(+), 78 deletions(-) diff --git a/.circleci/api-load-test.sh b/.circleci/api-load-test.sh index 7edae229fe..510a84ca92 100755 --- a/.circleci/api-load-test.sh +++ b/.circleci/api-load-test.sh @@ -99,7 +99,7 @@ log "http API server is ready!" # (5) Use metadata command to generate random dataset, job, and run metadata log "generate load test metadata (${METADATA_FILE}):" -java -jar "${MARQUEZ_JAR}" metadata --runs 10 --bytes-per-event 16384 --output "${METADATA_FILE}" +java -jar "${MARQUEZ_JAR}" metadata --jobs 100 --bytes-per-event 16384 --output "${METADATA_FILE}" # Display CPU/MEM cpu_and_mem_info diff --git a/api/src/main/java/marquez/cli/MetadataCommand.java b/api/src/main/java/marquez/cli/MetadataCommand.java index 268d463b07..6d817b5747 100644 --- a/api/src/main/java/marquez/cli/MetadataCommand.java +++ b/api/src/main/java/marquez/cli/MetadataCommand.java @@ -5,9 +5,13 @@ package marquez.cli; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.openlineage.client.OpenLineage.RunEvent.EventType.COMPLETE; +import static io.openlineage.client.OpenLineage.RunEvent.EventType.FAIL; import static io.openlineage.client.OpenLineage.RunEvent.EventType.START; +import static java.time.ZoneOffset.UTC; +import static java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME; import com.google.common.collect.ImmutableList; import io.dropwizard.cli.Command; @@ -20,9 +24,13 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -41,17 +49,27 @@ * *

Usage

* - * For example, the following command will generate {@code metadata.json} with {@code 10} runs - * ({@code 20} events in total), where each START event will have a size of {@code ~16384} bytes; - * events will be written to {@code metadata.json} in the {@code current} directory. You may specify - * the location of {@code metadata.json} by using the command-line argument {@code --output}. + * For example, the following command will generate {@code metadata.json} with {@code 10} jobs, + * {@code 5} runs per job and a maximum of {@code 2} run failures ({@code 100} run events in total), + * where each START event will have a size of {@code ~16384} bytes; events will be written to {@code + * metadata.json} in the {@code current} directory. You may specify the location of {@code + * metadata.json} by using the command-line argument {@code --output}. * *
{@code
- * java -jar marquez-api.jar metadata --runs 10 --bytes-per-event 16384
+ * java -jar marquez-api.jar metadata \
+ *   --jobs 10 \
+ *   --runs-per-job 5 \
+ *   --max-run-fails-per-job 2 \
+ *   --bytes-per-event 16384
  * }
*/ @Slf4j public final class MetadataCommand extends Command { + /* Used for event randomization. */ + private static final Random RANDOM = new Random(); + private static final ZoneId AMERICA_LOS_ANGELES = ZoneId.of("America/Los_Angeles"); + private static final List FIELD_TYPES = ImmutableList.of("VARCHAR", "TEXT", "INTEGER"); + /* Used to calculate (approximate) total bytes per event. */ private static final int BYTES_PER_RUN = 578; private static final int BYTES_PER_JOB = 58; @@ -62,7 +80,21 @@ public final class MetadataCommand extends Command { private static final int DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT = 16; /* Default runs. */ - private static final int DEFAULT_RUNS = 25; + private static final int DEFAULT_JOBS = 5; + private static final int DEFAULT_RUNS_ACTIVE = 0; + private static final int DEFAULT_MIN_RUN_DURATION = 300; // 5.minutes + private static final int DEFAULT_MAX_RUN_DURATION = 900; // 15.minutes + private static final int DEFAULT_RUN_DURATION = + DEFAULT_MIN_RUN_DURATION + + RANDOM.nextInt( + DEFAULT_MAX_RUN_DURATION + - DEFAULT_MIN_RUN_DURATION + + 1); // Between 5.minutes and 15.minutes + private static final int DEFAULT_RUNS_PER_JOB = 10; + private static final int DEFAULT_MAX_RUNS_FAILS_PER_JOB = 2; + private static final ZonedDateTime DEFAULT_RUNS_START_TIME = newEventTimeAsUtc(); + private static final ZonedDateTime DEFAULT_RUNS_END_TIME = + DEFAULT_RUNS_START_TIME.plusSeconds(DEFAULT_RUN_DURATION); /* Default bytes. */ private static final int DEFAULT_BYTES_PER_EVENT = @@ -75,15 +107,17 @@ public final class MetadataCommand extends Command { private static final String DEFAULT_OUTPUT = "metadata.json"; /* Args for metadata command. */ - private static final String CMD_ARG_METADATA_RUNS = "runs"; + private static final String CMD_ARG_METADATA_JOBS = "jobs"; + private static final String CMD_ARG_METADATA_RUNS_PER_JOB = "runs-per-job"; + private static final String CMD_ARG_METADATA_RUNS_ACTIVE = "runs-active"; + private static final String CMD_ARG_METADATA_MAX_RUN_FAILS_PER_JOB = "max-run-fails-per-job"; + private static final String CMD_ARG_METADATA_MIN_RUN_DURATION = "min-run-duration"; + private static final String CMD_ARG_METADATA_MAX_RUN_DURATION = "max-run-duration"; + private static final String CMD_ARG_METADATA_RUN_START_TIME = "run-start-time"; + private static final String CMD_ARG_METADATA_RUN_END_TIME = "run-end-time"; private static final String CMD_ARG_METADATA_BYTES_PER_EVENT = "bytes-per-event"; private static final String CMD_ARG_METADATA_OUTPUT = "output"; - /* Used for event randomization. */ - private static final Random RANDOM = new Random(); - private static final ZoneId AMERICA_LOS_ANGELES = ZoneId.of("America/Los_Angeles"); - private static final List FIELD_TYPES = ImmutableList.of("VARCHAR", "TEXT", "INTEGER"); - private static final String OL_NAMESPACE = newNamespaceName().getValue(); private static final OpenLineage OL = new OpenLineage( @@ -99,12 +133,67 @@ public MetadataCommand() { @Override public void configure(@NonNull Subparser subparser) { subparser - .addArgument("--runs") - .dest("runs") + .addArgument("--jobs") + .dest("jobs") + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_JOBS) + .help("limits OL jobs up to N"); + subparser + .addArgument("--runs-per-job") + .dest("runs-per-job") + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_RUNS_PER_JOB) + .help("limits OL run executions per job up to N"); + subparser + .addArgument("--runs-active") + .dest("runs-active") + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_RUNS_ACTIVE) + .help("limits OL run executions marked as active (='RUNNING') up to N"); + subparser + .addArgument("--max-run-fails-per-job") + .dest("max-run-fails-per-job") + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_MAX_RUNS_FAILS_PER_JOB) + .help("maximum OL run fails per job"); + subparser + .addArgument("--min-run-duration") + .dest("min-run-duration") + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_MIN_RUN_DURATION) + .help("minimum OL run duration (in seconds) per execution"); + subparser + .addArgument("--max-run-duration") + .dest("max-run-duration") .type(Integer.class) .required(false) - .setDefault(DEFAULT_RUNS) - .help("limits OL runs up to N"); + .setDefault(DEFAULT_MAX_RUN_DURATION) + .help("maximum OL run duration (in seconds) per execution"); + subparser + .addArgument("--run-start-time") + .dest("run-start-time") + .type(String.class) + .required(false) + .setDefault(DEFAULT_RUNS_START_TIME) + .help( + "specifies the OL run start time in UTC ISO ('YYYY-MM-DDTHH:MM:SSZ');\n" + + "used for the initial OL run, with subsequent runs starting relative to the\n" + + "initial start time."); + subparser + .addArgument("--run-end-time") + .dest("run-end-time") + .type(String.class) + .required(false) + .setDefault(DEFAULT_RUNS_END_TIME) + .help( + "specifies the OL run end time in UTC ISO ('YYYY-MM-DDTHH:MM:SSZ');\n" + + "used for the initial OL run, with subsequent runs ending relative to the\n" + + "initial end time."); subparser .addArgument("--bytes-per-event") .dest("bytes-per-event") @@ -117,40 +206,114 @@ public void configure(@NonNull Subparser subparser) { .dest("output") .type(String.class) .required(false) - .help("the output metadata file") + .help("output metadata file") .setDefault(DEFAULT_OUTPUT); } @Override public void run(@NonNull Bootstrap bootstrap, @NonNull Namespace namespace) { - final int runs = namespace.getInt(CMD_ARG_METADATA_RUNS); + final int jobs = namespace.getInt(CMD_ARG_METADATA_JOBS); + final int runsActive = namespace.getInt(CMD_ARG_METADATA_RUNS_ACTIVE); + final int runsPerJob = namespace.getInt(CMD_ARG_METADATA_RUNS_PER_JOB); + final int maxRunFailsPerJob = namespace.getInt(CMD_ARG_METADATA_MAX_RUN_FAILS_PER_JOB); + final int minRunDurationPerExecution = namespace.getInt(CMD_ARG_METADATA_MIN_RUN_DURATION); + final int maxRunDurationPerExecution = namespace.getInt(CMD_ARG_METADATA_MAX_RUN_DURATION); + final ZonedDateTime runStartTime = + ZonedDateTime.parse( + namespace.getString(CMD_ARG_METADATA_RUN_START_TIME), ISO_ZONED_DATE_TIME); + final ZonedDateTime runEndTime = + ZonedDateTime.parse( + namespace.getString(CMD_ARG_METADATA_RUN_END_TIME), ISO_ZONED_DATE_TIME); final int bytesPerEvent = namespace.getInt(CMD_ARG_METADATA_BYTES_PER_EVENT); final String output = namespace.getString(CMD_ARG_METADATA_OUTPUT); // Generate, then write events to metadata file. - writeOlEvents(newOlEvents(runs, bytesPerEvent), output); + writeOlEvents( + newOlEvents( + jobs, + runsActive, + runsPerJob, + maxRunFailsPerJob, + minRunDurationPerExecution, + maxRunDurationPerExecution, + runStartTime, + runEndTime, + bytesPerEvent), + output); } /** Returns new {@link OpenLineage.RunEvent} objects with random values. */ private static List newOlEvents( - final int numOfRuns, final int bytesPerEvent) { + final int jobs, + final int runsActive, + final int runsPerJob, + final int maxRunFailsPerJob, + final int minRunDurationPerExecution, + final int maxRunDurationPerExecution, + @NonNull final ZonedDateTime runStartTime, + @NonNull final ZonedDateTime runEndTime, + final int bytesPerEvent) { + checkArgument(maxRunFailsPerJob <= runsPerJob); + checkArgument(minRunDurationPerExecution <= maxRunDurationPerExecution); + checkArgument(runStartTime.isBefore(runEndTime)); System.out.format( - "Generating '%d' runs, each COMPLETE event will have a size of '~%d' (bytes)...\n", - numOfRuns, bytesPerEvent); - return Stream.generate(() -> newOlRunEvents(bytesPerEvent)) - .limit(numOfRuns) - .flatMap(runEvents -> Stream.of(runEvents.start(), runEvents.complete())) - .collect(toImmutableList()); + ">> generating '%d' jobs with '%d' runs per job\n", jobs, runsPerJob, bytesPerEvent); + final List runEvents = + Stream.generate( + () -> { + final RunAttemptsForJob runsForJob = + newRunAttemptsForJobWith( + runsPerJob, + maxRunFailsPerJob, + minRunDurationPerExecution, + maxRunDurationPerExecution, + runStartTime, + runEndTime); + return runsForJob.attempts().stream() + .map( + runAttempt -> + newOlRunEvents( + runsForJob.job(), + runAttempt.endState(), + runAttempt.startedAt(), + runAttempt.endedAt(), + bytesPerEvent)) + .flatMap(e -> Stream.of(e.start(), e.end())); + }) + .limit(jobs) + .flatMap(e -> e) + .sorted(Comparator.comparing(OpenLineage.RunEvent::getEventTime)) + .collect(Collectors.toList()); + + final ArrayList jobsWithRunActive = new ArrayList<>(); + final int markActive = Math.min(runsActive, jobs); + + for (int i = runEvents.size() - 1; i >= 0; i--) { + final OpenLineage.RunEvent runEvent = runEvents.get(i); + final String jobName = runEvent.getJob().getName(); + if (jobsWithRunActive.size() < markActive + && !jobsWithRunActive.contains(jobName) + && runEvent.getEventType() == COMPLETE) { + runEvents.remove(i); + jobsWithRunActive.add(jobName); // Last run marked as active for job. + } + } + + return runEvents; } /** * Returns new {@link RunEvents} objects. A {@link RunEvents} object contains the {@code START} * and {@code COMPLETE} event for a given run. */ - private static RunEvents newOlRunEvents(final int bytesPerEvent) { - // (1) Generate run with an optional parent run, then the job. - final OpenLineage.Run olRun = newRun(hasParentRunOrNot()); - final OpenLineage.Job olJob = newJob(); + private static RunEvents newOlRunEvents( + @NonNull OpenLineage.Job job, + @NonNull final OpenLineage.RunEvent.EventType runEndState, + @NonNull final ZonedDateTime runStartTime, + @NonNull final ZonedDateTime runEndTime, + final int bytesPerEvent) { + // (1) Generate run. + final OpenLineage.Run run = newRun(); // (2) Generate number of I/O for run. int numOfInputs = RANDOM.nextInt(DEFAULT_NUM_OF_IO_PER_EVENT); @@ -183,24 +346,24 @@ private static RunEvents newOlRunEvents(final int bytesPerEvent) { return new RunEvents( OL.newRunEventBuilder() .eventType(START) - .eventTime(newEventTime()) - .run(olRun) - .job(olJob) + .eventTime(runStartTime) + .run(run) + .job(job) .inputs(newInputs(numOfInputs, numOfFieldsInSchemaForInputs)) .outputs(newOutputs(numOfOutputs, numOfFieldsInSchemaForOutputs)) .build(), OL.newRunEventBuilder() - .eventType(COMPLETE) - .eventTime(newEventTime()) - .run(olRun) - .job(olJob) + .eventType(runEndState) + .eventTime(runEndTime) + .run(run) + .job(job) .build()); } /** Write {@link OpenLineage.RunEvent}s to the specified {@code output}. */ private static void writeOlEvents( @NonNull final List olEvents, @NonNull final String output) { - System.out.format("Writing '%d' events to: '%s'\n", olEvents.size(), output); + System.out.format(">> writing '%d' events to: '%s'\n", olEvents.size(), output); FileWriter fileWriter; PrintWriter printWriter = null; try { @@ -216,19 +379,11 @@ private static void writeOlEvents( } } - /** - * Returns a new {@link OpenLineage.Run} object. A {@code parent} run will be associated with - * {@code child} run if {@code hasParentRun} is {@code true}; otherwise, the {@code child} run - * will not have a {@code parent} run. - */ - private static OpenLineage.Run newRun(final boolean hasParentRun) { + /** Returns a new {@link OpenLineage.Run} object. */ + private static OpenLineage.Run newRun() { return OL.newRun( newRunId().getValue(), OL.newRunFacetsBuilder() - .parent( - hasParentRun - ? OL.newParentRunFacetBuilder().run(newParentRun()).job(newParentJob()).build() - : null) .nominalTime( OL.newNominalTimeRunFacetBuilder() .nominalStartTime(newNominalTime()) @@ -237,19 +392,6 @@ private static OpenLineage.Run newRun(final boolean hasParentRun) { .build()); } - /** Returns a new {@link OpenLineage.ParentRunFacetRun} object. */ - private static OpenLineage.ParentRunFacetRun newParentRun() { - return OL.newParentRunFacetRunBuilder().runId(newRunId().getValue()).build(); - } - - /** Returns a new {@link OpenLineage.ParentRunFacetJob} object. */ - private static OpenLineage.ParentRunFacetJob newParentJob() { - return OL.newParentRunFacetJobBuilder() - .namespace(OL_NAMESPACE) - .name(newJobName().getValue()) - .build(); - } - /** Returns a new {@link OpenLineage.Job} object. */ static OpenLineage.Job newJob() { return OL.newJobBuilder().namespace(OL_NAMESPACE).name(newJobName().getValue()).build(); @@ -343,19 +485,55 @@ private static ZonedDateTime newNominalTime() { } /** Returns a new {@code event} time. */ - private static ZonedDateTime newEventTime() { - return Instant.now().atZone(AMERICA_LOS_ANGELES); - } - - /** Returns {@code true} if parent run should be generated; {@code false} otherwise. */ - private static boolean hasParentRunOrNot() { - return RANDOM.nextBoolean(); + private static ZonedDateTime newEventTimeAsUtc() { + return ZonedDateTime.now(UTC); } private static int newId() { return RANDOM.nextInt(Integer.MAX_VALUE - 1); } + private static RunAttemptsForJob newRunAttemptsForJobWith( + final int runsPerJob, + final int maxRunFailsPerJob, + final int minRunDurationPerExecution, + final int maxRunDurationPerExecution, + @NonNull ZonedDateTime runStartTime, + @NonNull ZonedDateTime runEndTime) { + // (1) Generate COMPLETEs runs for job up to N. + final List completeOrFail = + IntStream.range(0, runsPerJob).mapToObj(run -> COMPLETE).collect(Collectors.toList()); + + // (2) Randomly assign FAIL runs for job by replacing COMPLETEs. + RANDOM + .ints(0, runsPerJob) + .distinct() + .limit(maxRunFailsPerJob) // Limit FAILs allowed up to N + .forEach(run -> completeOrFail.set(run, FAIL)); + + // (3) Adjust run start and end times relative to the initial start and end time. + final ImmutableList.Builder runAttempts = ImmutableList.builder(); + for (OpenLineage.RunEvent.EventType runEndState : completeOrFail) { + runAttempts.add(new RunAttempt(runEndState, runStartTime, runEndTime)); + int nextRunAttemptDurationInSeconds = + minRunDurationPerExecution + + RANDOM.nextInt(maxRunDurationPerExecution - minRunDurationPerExecution + 1); + runStartTime = runEndTime; + runEndTime = runStartTime.plusSeconds(nextRunAttemptDurationInSeconds); + } + + return new RunAttemptsForJob(newJob(), runAttempts.build()); + } + + /** A container class for job run attempt info. */ + record RunAttemptsForJob(@NonNull OpenLineage.Job job, @NonNull List attempts) {} + + /** A container class for run attempt info. */ + record RunAttempt( + OpenLineage.RunEvent.EventType endState, + @NonNull ZonedDateTime startedAt, + @NonNull ZonedDateTime endedAt) {} + /** A container class for run info. */ - record RunEvents(@NonNull OpenLineage.RunEvent start, @NonNull OpenLineage.RunEvent complete) {} + record RunEvents(@NonNull OpenLineage.RunEvent start, @NonNull OpenLineage.RunEvent end) {} } diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index ac9b65331f..56cfe8f853 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -514,9 +514,9 @@ WHERE r.uuid IN ( SELECT r.uuid FROM runs_view r INNER JOIN jobs_view j ON j.namespace_name=r.namespace_name AND j.name=r.job_name WHERE j.namespace_name=:namespace AND (j.name=:jobName OR j.name=ANY(j.aliases)) - ORDER BY transitioned_at DESC - LIMIT :limit OFFSET :offset ) + ORDER BY transitioned_at DESC + LIMIT :limit OFFSET :offset """) List findByLatestJob(String namespace, String jobName, int limit, int offset); diff --git a/web/src/components/jobs/JobDetailPage.tsx b/web/src/components/jobs/JobDetailPage.tsx index 9aa89c6b07..56126ad504 100644 --- a/web/src/components/jobs/JobDetailPage.tsx +++ b/web/src/components/jobs/JobDetailPage.tsx @@ -34,7 +34,6 @@ import { } from '../../store/actionCreators' import { faCog } from '@fortawesome/free-solid-svg-icons/faCog' import { formatUpdatedAt } from '../../helpers' -import { jobRunsStatus } from '../../helpers/nodes' import { stopWatchDuration } from '../../helpers/time' import { truncateText } from '../../helpers/text' import { useNavigate, useSearchParams } from 'react-router-dom' @@ -49,6 +48,7 @@ import MqStatus from '../core/status/MqStatus' import MqText from '../core/text/MqText' import RunInfo from './RunInfo' import Runs from './Runs' +import {runStateColor} from "../../helpers/nodes"; interface DispatchProps { fetchLatestRuns: typeof fetchLatestRuns @@ -79,7 +79,6 @@ const JobDetailPage: FunctionComponent = (props) => { resetRuns, deleteJob, dialogToggle, - latestRuns, display, tabIndex, setTabIndex, @@ -260,7 +259,7 @@ const JobDetailPage: FunctionComponent = (props) => { } label={'Running Status'.toUpperCase()} - value={} + value={} /> diff --git a/web/src/routes/dashboard/JobRunItem.tsx b/web/src/routes/dashboard/JobRunItem.tsx index 454068d22c..a1ef4f2d3c 100644 --- a/web/src/routes/dashboard/JobRunItem.tsx +++ b/web/src/routes/dashboard/JobRunItem.tsx @@ -60,7 +60,7 @@ const JobRunItem: React.FC = ({ job }) => { LAST 10 RUNS {/*pad 10 - latestRuns length with a small grey bar*/} - {Array.from({ length: 10 - job.latestRuns?.length || 10 }, (_, i) => ( + {Array.from({ length: 10 - job.latestRuns?.length || 0 }, (_, i) => ( = ({ job }) => { }} /> ))} - {job.latestRuns?.map((run) => ( + {job.latestRuns?.reverse().map((run) => ( - {stopWatchDuration(Math.random() * 10000)} + {run ? stopWatchDuration(run.durationMs) : 'N/A'} }