
Jshearer/materialize slack #722

Merged
merged 20 commits into main on Oct 9, 2024

Conversation

jshearer
Contributor

@jshearer jshearer commented May 19, 2023

Description:

This fleshes out the Slack materialization connector that @willdonnelly originally wrote for one of our hackathons.

It automatically deals with joining channels when needed, and lets you specify a username and profile picture to communicate as. It also supports rich messages via Slack's blocks API.
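For context on the blocks support, here's a minimal, hand-written sketch of the kind of chat.postMessage body such a connector could send. Field names are Slack's; the channel, username, icon URL, and block content are invented, and this is not the connector's actual request code:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative chat.postMessage body: "username" and "icon_url" override
	// the sender identity, "blocks" carries the rich message, and "text" is
	// the fallback for clients that don't render blocks.
	payload := map[string]any{
		"channel":  "C0123456789",
		"username": "Flow Materialization",
		"icon_url": "https://example.com/avatar.png",
		"text":     "fallback text",
		"blocks": json.RawMessage(
			`[{"type":"section","text":{"type":"mrkdwn","text":"*Hello* from Flow"}}]`),
	}
	body, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}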

Note:
One thing I don't love is the config schema, which results in the following UI:
[Screenshot: connector configuration UI, 2023-05-19]
By default, Sender Config is collapsed. This comes from

type config struct {
	SenderConfig SlackSenderConfig `json:"sender_config" jsonschema:"title=Slack Config"`
	Credentials  CredentialConfig  `json:"credentials" jsonschema:"title=Authentication" jsonschema_extras:"x-oauth2-provider=slack"`
}

I had to extract SlackSenderConfig because it was used in a few different places. What I really want is the equivalent to Serde's flatten attribute, but I can't figure out a way to get Go to do the same thing. Any ideas?
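One possible answer, for what it's worth: Go's encoding/json promotes the fields of an anonymous embedded struct, which is the closest analogue to Serde's flatten. Whether the jsonschema generator used in this repo inlines embedded structs the same way would need to be verified, so treat this as a sketch:

type config struct {
	SlackSenderConfig                 // embedded: its fields are promoted to the top level of the JSON
	Credentials CredentialConfig `json:"credentials" jsonschema:"title=Authentication" jsonschema_extras:"x-oauth2-provider=slack"`
}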



Member

@williamhbaker williamhbaker left a comment


Left some comments as an initial review. You will need to add this to CI in the GitHub workflow so it actually gets built - there might be some changes needed to actually get it building (I would suggest rebasing on the latest from the main branch).

I'll plan on doing another detailed review after that is done.


# Build Stage
################################################################################
FROM golang:1.17-bullseye as builder
Member


This Dockerfile will need to be updated to match the current ones on the main branch, which use Go 1.20, and the directory structure of the imports ("Build the connector projects we depend on.") has changed. See:

ARG BASE_IMAGE=ghcr.io/estuary/base-image:v1
# Build Stage
################################################################################
FROM --platform=linux/amd64 golang:1.20-bullseye as builder
WORKDIR /builder
# Download & compile dependencies early. Doing this separately allows for layer
# caching opportunities when no dependencies are updated.
COPY go.* ./
RUN go mod download
COPY go ./go
COPY materialize-boilerplate ./materialize-boilerplate
COPY materialize-pinecone ./materialize-pinecone
.

Contributor Author

@jshearer jshearer Jun 13, 2023


I think I had already updated this. The change you're talking about is moving things from e.g. go-materialize-boilerplate to go/materialize-boilerplate, right? I did push one change to just include the whole go/ subdir, which seems to be what the other Dockerfiles do.

materialize-slack/main.go (8 outdated review threads, resolved)
Member

@williamhbaker williamhbaker left a comment


Thanks for updating the transactor! I left some more comments. You will also need to update .github/workflows/ci.yaml in order for the connector to be built automatically.

materialize-slack/slack.go (3 outdated review threads, resolved)
materialize-slack/main.go (5 outdated review threads, resolved)
if marshal_err == nil {
	log.Warn(fmt.Sprintf("Parse Blocks: %+v", string(serializedBlocks)))
}
log.Warn(fmt.Errorf("error sending message: %w", err))
Member


This may be something where we'd want to retry with rate limiting, but it doesn't look like that is implemented here, although I see the partial comment below and the sleep.

Contributor Author


Yeah, retries would be nice. Do we have any common way of implementing that with e.g. exponential backoff and all that?

Member


You could probably leverage something like this, from the Google Sheets materialization:

for attempt := 1; true; attempt++ {
	var _, err = client.Spreadsheets.BatchUpdate(spreadsheetID,
		&sheets.BatchUpdateSpreadsheetRequest{Requests: requests}).
		Context(ctx).
		Do()

	if err == nil {
		return nil
	}

	// If we encounter a rate limit error, retry after exponential back-off.
	if e, ok := err.(*googleapi.Error); ok && retryableErrorCodes[e.Code] {
		var dur = time.Duration(1<<attempt) * time.Second
		log.Println("received error ", e.Code, http.StatusText(e.Code), ", retrying after ", dur)
		time.Sleep(dur)
		continue
	}

	// All other errors bail out.
	return err
}
panic("not reached")
}
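Translated to this connector, the same pattern might look roughly like the sketch below. t.api.PostMessage is the wrapper from this PR; isRateLimited and maxAttempts are invented here, and honoring Slack's Retry-After header would arguably be better than a fixed exponential schedule:

const maxAttempts = 5

var sendErr error
for attempt := 1; attempt <= maxAttempts; attempt++ {
	sendErr = t.api.PostMessage(b.resource.Channel, text, blocksParsed.BlockSet, b.resource.SenderConfig)
	if sendErr == nil {
		break
	}
	// isRateLimited is hypothetical: it would detect Slack's rate-limit response.
	if isRateLimited(sendErr) {
		dur := time.Duration(1<<attempt) * time.Second
		log.Warn(fmt.Sprintf("rate limited by Slack, retrying after %s", dur))
		time.Sleep(dur)
		continue
	}
	break // non-retryable error
}
if sendErr != nil {
	return nil, fmt.Errorf("error sending message: %w", sendErr)
}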

}

// Accept messages from at most 10 minutes in the past
if time.Since(ts).Minutes() < 10 {
Member


I'm not really following this. What happens if a message is outside of this time window? Does it just get dropped? I would expect a connector like this to provide at-least-once materialization, but if it is possibly dropping messages it would not do that. It feels like there is some kind of filtering happening here that may not generally be desirable.

Contributor Author

@jshearer jshearer Jun 28, 2023


This should probably be tunable, but the idea is that Slack is different from a regular database in that you can't send a message with a timestamp other than right now. Given that, the concept of "backfilling" Slack messages doesn't make sense (it would look like sending a message to Slack and having it get slotted in somewhere in the conversation's history, which you can't do), and we really do only want to deliver messages that happen in roughly real time. 10 minutes was picked as a reasonable window for "real time" Slack messages to show up.
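If the window does become tunable later, a minimal sketch of what that could look like (the notBeforeMinutes field and its default are invented here and are not part of this PR):

type resource struct {
	Channel          string `json:"channel"`
	NotBeforeMinutes int    `json:"notBeforeMinutes,omitempty" jsonschema:"default=10"`
}

// deliverable reports whether a message timestamp is recent enough to send,
// since Slack can't backfill messages into a conversation's history.
func (r resource) deliverable(ts time.Time) bool {
	window := time.Duration(r.NotBeforeMinutes) * time.Minute
	if window <= 0 {
		window = 10 * time.Minute
	}
	return time.Since(ts) < window
}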

var parsed = buildDocument(b, it.Key, it.Values)
var tsStr, tsOk = parsed["ts"].(string)
var text, textOk = parsed["text"].(string)
var blocks = parsed["blocks"].(json.RawMessage)
Member

@williamhbaker williamhbaker Jun 29, 2023


I think this is ok if blocks isn't present in the output from buildDocument? nil can be coerced into a json.RawMessage (which is a []byte)...and later string([]byte(nil)) I assume is ""? I'm not 100% sure on this so if you haven't tried it, it might not be a bad idea to test it out.

Contributor Author


narrator: it was not okay. fixed it
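For posterity: the panic comes from the single-value type assertion. When "blocks" is absent, parsed["blocks"] is a nil interface, and asserting it to json.RawMessage panics rather than yielding nil. A two-value assertion avoids that, roughly (exact names in the fixed code may differ):

var blocks, blocksOk = parsed["blocks"].(json.RawMessage)
if !blocksOk || len(blocks) == 0 {
	// No blocks provided: fall back to sending plain text only.
}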

Member

@williamhbaker williamhbaker left a comment


Overall LGTM. Left a few minor comments.

My main uncertainty is with the filtering that the materialization is doing. I understand the reasoning and it sounds like it has been hashed out elsewhere already, so my uncertainty is not blocking. Perhaps the ideal solution would be for the kind of notBefore / notAfter type of general feature we have discussed elsewhere, and the behavior here will suffice until we have that.

if err := t.api.PostMessage(b.resource.Channel, text, blocksParsed.BlockSet, b.resource.SenderConfig); err != nil {
	return nil, fmt.Errorf("error sending message: %w", err)
}
time.Sleep(time.Second * 10)
Member


I'm not sure about this 10 second sleep? It seems like if there are messages to be posted they should be posted without delay.

Contributor Author


This is to avoid slamming Slack and hitting the rate-limiter.
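If the flat sleep ever becomes a bottleneck, a token-bucket limiter is one alternative, sketched below with golang.org/x/time/rate. The one-message-per-second figure roughly matches Slack's documented chat.postMessage limit (which is per channel); the postWithLimit helper is invented for illustration:

import (
	"context"
	"time"

	"golang.org/x/time/rate"
)

// Allow roughly one message per second overall, with no extra delay when
// traffic is already below the limit.
var slackLimiter = rate.NewLimiter(rate.Every(time.Second), 1)

// postWithLimit is a hypothetical helper; the transactor would call it
// around t.api.PostMessage.
func postWithLimit(ctx context.Context, post func() error) error {
	if err := slackLimiter.Wait(ctx); err != nil {
		return err
	}
	return post()
}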

materialize-slack/main.go (outdated review thread, resolved)
materialize-slack/slack.go (outdated review thread, resolved)

@jgraettinger
Member

👋 @jshearer We need to land this PR, because it's currently live in our UI but won't pick up updates until it's merged.

@williamhbaker
Member

👀

willdonnelly and others added 15 commits October 8, 2024 14:39
This actually works pretty well, the main limitation is that
the config requires a concrete bearer token. I think it ought
to be possible to make it work via OAuth, so that might be a
route for future improvement if we want to actually mainline
this materialization.

The channel ID is part of the binding resource config, and can
be either a name or an explicit ID. Channel names will be looked
up using the `conversations.list` API method, while if you only
use explicit IDs that permission won't be needed. Explicit IDs
are specified with a name like `id:C12345` for Slack channel ID
`C12345`.
* Escape slack-specific strings
* Rate-limit API calls
* Fix `blocks` formatting
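A rough sketch of how the channel resolution described in the channel-ID commit above could work; resolveChannel, the SlackAPI type, and api.LookupChannelByName are hypothetical names for illustration, not the connector's actual code:

// "id:C12345" is used verbatim; anything else is treated as a channel name
// and looked up via the conversations.list API.
func resolveChannel(ctx context.Context, api *SlackAPI, spec string) (string, error) {
	if id, ok := strings.CutPrefix(spec, "id:"); ok {
		return id, nil
	}
	return api.LookupChannelByName(ctx, spec)
}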
@jshearer
Contributor Author

jshearer commented Oct 9, 2024

Ok, I patched this up and it's now working for the task-failures channel again. Gonna merge

@jshearer jshearer merged commit ec7f2c3 into main Oct 9, 2024
50 of 53 checks passed