diff --git a/tools/asynq/cmd/root.go b/tools/asynq/cmd/root.go index c314de7e..447044f9 100644 --- a/tools/asynq/cmd/root.go +++ b/tools/asynq/cmd/root.go @@ -11,6 +11,7 @@ import ( "os" "strings" "text/tabwriter" + "time" "unicode" "unicode/utf8" @@ -369,6 +370,11 @@ func createRDB() *rdb.RDB { return rdb.NewRDB(c) } +// createClient creates a Client instance using flag values and returns it. +func createClient() *asynq.Client { + return asynq.NewClient(getRedisConnOpt()) +} + // createInspector creates a Inspector instance using flag values and returns it. func createInspector() *asynq.Inspector { return asynq.NewInspector(getRedisConnOpt()) @@ -456,3 +462,37 @@ func isPrintable(data []byte) bool { } return !isAllSpace } + +// Helper to turn a command line flag into a duration +func getDuration(cmd *cobra.Command, arg string) time.Duration { + durationStr, err := cmd.Flags().GetString(arg) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + duration, err := time.ParseDuration(durationStr) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + return duration +} + +// Helper to turn a command line flag into a time +func getTime(cmd *cobra.Command, arg string) time.Time { + timeStr, err := cmd.Flags().GetString(arg) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + timeVal, err := time.Parse(time.RFC3339, timeStr) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + return timeVal +} diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index d89ff955..e9ea792d 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -53,6 +53,24 @@ func init() { taskRunCmd.MarkFlagRequired("queue") taskRunCmd.MarkFlagRequired("id") + taskCmd.AddCommand(taskEnqueueCmd) + taskEnqueueCmd.Flags().StringP("type_name", "t", "", "type name to enqueue the task as (required)") + taskEnqueueCmd.Flags().StringP("payload", "l", "", "payload to enqueue (required)") + // The following are the various OptionTypes; if not specified we won't pass them so that composeOptions() + // can apply its own defaults + taskEnqueueCmd.Flags().Int("retry", 0, "maximum retries") + taskEnqueueCmd.Flags().String("queue", "", "queue to enqueue the task to") + taskEnqueueCmd.Flags().String("id", "", "id to enqueue the task as") + taskEnqueueCmd.Flags().String("timeout", "", "timeout for the task (how long it can run); must be parseable as a time.Duration") + taskEnqueueCmd.Flags().String("deadline", "", "deadline for the task; must be in RFC3339 format") + taskEnqueueCmd.Flags().String("unique", "", "unique period for the task (duration within which it is guaranteed to be unique); must be parseable as a time.Duration") + taskEnqueueCmd.Flags().String("process_at", "", "process at time for the task; must be in RFC3339 format") + taskEnqueueCmd.Flags().String("process_in", "", "process in window for the task; must be parseable as a time.Duration") + taskEnqueueCmd.Flags().String("retention", "", "retention window for the task; must be parseable as a time.Duration") + taskEnqueueCmd.Flags().String("group", "", "group for the task") + taskEnqueueCmd.MarkFlagRequired("type_name") + taskEnqueueCmd.MarkFlagRequired("payload") + taskCmd.AddCommand(taskArchiveAllCmd) taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong (required)") taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks; one of { pending | aggregating | scheduled | retry } (required)") @@ -151,6 +169,16 @@ var taskRunCmd = &cobra.Command{ $ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`), } +var taskEnqueueCmd = &cobra.Command{ + Use: "enqueue --type_name=footype --payload=barpayload", + Short: "Enqueue a task", + Args: cobra.NoArgs, + Run: taskEnqueue, + Example: heredoc.Doc(` + $ asynq task enqueue -t footype -l barpayload + $ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`), +} + var taskArchiveAllCmd = &cobra.Command{ Use: "archiveall --queue= --state=", Short: "Archive all tasks in the given state", @@ -521,6 +549,95 @@ func taskRun(cmd *cobra.Command, args []string) { fmt.Println("task is now pending") } +func taskEnqueue(cmd *cobra.Command, args []string) { + typeName, err := cmd.Flags().GetString("type_name") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + payload, err := cmd.Flags().GetString("payload") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + // For all of the optional flags, we need to explicitly check whether they were set or + // not; for consistency we want to use the defaults set in composeOptions() rather than + // the ones in the flag definitions. + opts := []asynq.Option{} + if cmd.Flags().Changed("retry") { + retry, err := cmd.Flags().GetInt("retry") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + opts = append(opts, asynq.MaxRetry(retry)) + } + + if cmd.Flags().Changed("queue") { + queue, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + opts = append(opts, asynq.Queue(queue)) + } + + if cmd.Flags().Changed("id") { + id, err := cmd.Flags().GetString("id") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + opts = append(opts, asynq.TaskID(id)) + } + + if cmd.Flags().Changed("timeout") { + opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout"))) + } + + if cmd.Flags().Changed("deadline") { + opts = append(opts, asynq.Deadline(getTime(cmd, "deadline"))) + } + + if cmd.Flags().Changed("unique") { + opts = append(opts, asynq.Unique(getDuration(cmd, "unique"))) + } + + if cmd.Flags().Changed("process_at") { + opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at"))) + } + + if cmd.Flags().Changed("process_in") { + opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in"))) + } + + if cmd.Flags().Changed("retention") { + opts = append(opts, asynq.Retention(getDuration(cmd, "retention"))) + } + + if cmd.Flags().Changed("group") { + group, err := cmd.Flags().GetString("group") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + opts = append(opts, asynq.Group(group)) + } + + c := createClient() + task := asynq.NewTask(typeName, []byte(payload), opts...) + + taskInfo, err := c.Enqueue(task) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue) +} + func taskArchiveAll(cmd *cobra.Command, args []string) { qname, err := cmd.Flags().GetString("queue") if err != nil { @@ -653,3 +770,4 @@ func taskRunAll(cmd *cobra.Command, args []string) { } fmt.Printf("%d tasks are now pending\n", n) } +