Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task enqueue command to cli #918

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions tools/asynq/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"strings"
"text/tabwriter"
"time"
"unicode"
"unicode/utf8"

Expand Down Expand Up @@ -369,6 +370,11 @@ func createRDB() *rdb.RDB {
return rdb.NewRDB(c)
}

// createClient creates a Client instance using flag values and returns it.
func createClient() *asynq.Client {
return asynq.NewClient(getRedisConnOpt())
}

// createInspector creates a Inspector instance using flag values and returns it.
func createInspector() *asynq.Inspector {
return asynq.NewInspector(getRedisConnOpt())
Expand Down Expand Up @@ -456,3 +462,37 @@ func isPrintable(data []byte) bool {
}
return !isAllSpace
}

// Helper to turn a command line flag into a duration
func getDuration(cmd *cobra.Command, arg string) time.Duration {
durationStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

duration, err := time.ParseDuration(durationStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

return duration
}

// Helper to turn a command line flag into a time
func getTime(cmd *cobra.Command, arg string) time.Time {
timeStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

timeVal, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

return timeVal
}
118 changes: 118 additions & 0 deletions tools/asynq/cmd/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ func init() {
taskRunCmd.MarkFlagRequired("queue")
taskRunCmd.MarkFlagRequired("id")

taskCmd.AddCommand(taskEnqueueCmd)
taskEnqueueCmd.Flags().StringP("type_name", "t", "", "type name to enqueue the task as (required)")
taskEnqueueCmd.Flags().StringP("payload", "l", "", "payload to enqueue (required)")
// The following are the various OptionTypes; if not specified we won't pass them so that composeOptions()
// can apply its own defaults
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter names are chosen to match the names at https://github.com/hibiken/asynq/blob/master/client.go#L67 . The only exception is id; the Option is named TaskID but all of the other commands here use id so I figured it was better to stick with that.

taskEnqueueCmd.Flags().Int("retry", 0, "maximum retries")
taskEnqueueCmd.Flags().String("queue", "", "queue to enqueue the task to")
taskEnqueueCmd.Flags().String("id", "", "id to enqueue the task as")
taskEnqueueCmd.Flags().String("timeout", "", "timeout for the task (how long it can run); must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("deadline", "", "deadline for the task; must be in RFC3339 format")
taskEnqueueCmd.Flags().String("unique", "", "unique period for the task (duration within which it is guaranteed to be unique); must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("process_at", "", "process at time for the task; must be in RFC3339 format")
taskEnqueueCmd.Flags().String("process_in", "", "process in window for the task; must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("retention", "", "retention window for the task; must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("group", "", "group for the task")
taskEnqueueCmd.MarkFlagRequired("type_name")
taskEnqueueCmd.MarkFlagRequired("payload")

taskCmd.AddCommand(taskArchiveAllCmd)
taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong (required)")
taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks; one of { pending | aggregating | scheduled | retry } (required)")
Expand Down Expand Up @@ -151,6 +169,16 @@ var taskRunCmd = &cobra.Command{
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}

var taskEnqueueCmd = &cobra.Command{
Use: "enqueue --type_name=footype --payload=barpayload",
Short: "Enqueue a task",
Args: cobra.NoArgs,
Run: taskEnqueue,
Example: heredoc.Doc(`
$ asynq task enqueue -t footype -l barpayload
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
}

var taskArchiveAllCmd = &cobra.Command{
Use: "archiveall --queue=<queue> --state=<state>",
Short: "Archive all tasks in the given state",
Expand Down Expand Up @@ -521,6 +549,95 @@ func taskRun(cmd *cobra.Command, args []string) {
fmt.Println("task is now pending")
}

func taskEnqueue(cmd *cobra.Command, args []string) {
typeName, err := cmd.Flags().GetString("type_name")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

payload, err := cmd.Flags().GetString("payload")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

// For all of the optional flags, we need to explicitly check whether they were set or
// not; for consistency we want to use the defaults set in composeOptions() rather than
// the ones in the flag definitions.
opts := []asynq.Option{}
if cmd.Flags().Changed("retry") {
retry, err := cmd.Flags().GetInt("retry")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.MaxRetry(retry))
}

if cmd.Flags().Changed("queue") {
queue, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Queue(queue))
}

if cmd.Flags().Changed("id") {
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.TaskID(id))
}

if cmd.Flags().Changed("timeout") {
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout")))
}

if cmd.Flags().Changed("deadline") {
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline")))
}

if cmd.Flags().Changed("unique") {
opts = append(opts, asynq.Unique(getDuration(cmd, "unique")))
}

if cmd.Flags().Changed("process_at") {
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at")))
}

if cmd.Flags().Changed("process_in") {
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in")))
}

if cmd.Flags().Changed("retention") {
opts = append(opts, asynq.Retention(getDuration(cmd, "retention")))
}

if cmd.Flags().Changed("group") {
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Group(group))
}

c := createClient()
task := asynq.NewTask(typeName, []byte(payload), opts...)

taskInfo, err := c.Enqueue(task)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}

fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
}

func taskArchiveAll(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
Expand Down Expand Up @@ -653,3 +770,4 @@ func taskRunAll(cmd *cobra.Command, args []string) {
}
fmt.Printf("%d tasks are now pending\n", n)
}

Loading