Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support publishing new log entries to Pub/Sub topics #1580

Merged
merged 1 commit into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Dockerfile.pubsub-emulator
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# gcloud sdk for pubsub emulator with netcat added for the startup health check
FROM google/cloud-sdk:443.0.0@sha256:a59335d227a98b41ecdf6ff3d69923b8aef0703694e58992ecb75c7147140d37
RUN apt-get install -y netcat
4 changes: 4 additions & 0 deletions cmd/rekor-server/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func init() {
Memory and file-based signers should only be used for testing.`)
rootCmd.PersistentFlags().String("rekor_server.signer-passwd", "", "Password to decrypt signer private key")

rootCmd.PersistentFlags().String("rekor_server.new_entry_publisher", "", "URL for pub/sub queue to send messages to when new entries are added to the log. Ignored if not set. Supported providers: [gcppubsub]")
rootCmd.PersistentFlags().Bool("rekor_server.publish_events_protobuf", false, "Whether to publish events in Protobuf wire format. Applies to all enabled event types.")
rootCmd.PersistentFlags().Bool("rekor_server.publish_events_json", false, "Whether to publish events in CloudEvents JSON format. Applies to all enabled event types.")
jalseth marked this conversation as resolved.
Show resolved Hide resolved

rootCmd.PersistentFlags().Uint16("port", 3000, "Port to bind to")

rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")
Expand Down
3 changes: 0 additions & 3 deletions cmd/rekor-server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ var serveCmd = &cobra.Command{
Short: "start http server with configured api",
Long: `Starts a http server and serves the configured api`,
Run: func(cmd *cobra.Command, args []string) {

// Setup the logger to dev/prod
log.ConfigureLogger(viper.GetString("log_type"))

Expand All @@ -83,7 +82,6 @@ var serveCmd = &cobra.Command{
log.Logger.Error(err)
}
}()

//TODO: make this a config option for server to load via viper field
//TODO: add command line option to print versions supported in binary

Expand All @@ -101,7 +99,6 @@ var serveCmd = &cobra.Command{
hashedrekord.KIND: {hashedrekord_v001.APIVERSION},
dsse.KIND: {dsse_v001.APIVERSION},
}

for k, v := range pluggableTypeMap {
log.Logger.Infof("Loading support for pluggable type '%v'", k)
log.Logger.Infof("Loading version '%v' for pluggable type '%v'", v, k)
Expand Down
25 changes: 24 additions & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ services:
context: .
target: "test"
environment:
- TMPDIR=/var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294
TMPDIR: /var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294
PUBSUB_EMULATOR_HOST: gcp-pubsub-emulator:8085
command: [
"rekor-server",
"-test.coverprofile=rekor-server.cov",
Expand All @@ -34,7 +35,29 @@ services:
"--enable_attestation_storage",
"--attestation_storage_bucket=file:///var/run/attestations",
"--max_request_body_size=32792576",
"--rekor_server.new_entry_publisher=gcppubsub://projects/test-project/topics/new-entry",
"--rekor_server.publish_events_json=true",
]
ports:
- "3000:3000"
- "2112:2112"
depends_on:
- gcp-pubsub-emulator
gcp-pubsub-emulator:
image: gcp-pubsub-emulator
ports:
- "8085:8085"
command:
- gcloud
- beta
- emulators
- pubsub
- start
- --host-port=0.0.0.0:8085
- --project=test-project
healthcheck:
test: ["CMD", "nc", "-zv", "localhost", "8085"]
interval: 10s
timeout: 3s
retries: 3
start_period: 10s
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,3 @@ services:
timeout: 3s
retries: 3
start_period: 5s

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
)

require (
cloud.google.com/go/pubsub v1.33.0
github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230618160516-e936619f9f18
github.com/cyberphone/json-canonicalization v0.0.0-20220623050100-57a0ce2678a7
github.com/go-redis/redismock/v9 v9.0.3
Expand Down Expand Up @@ -183,7 +184,7 @@ require (
golang.org/x/term v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.135.0 // indirect
google.golang.org/api v0.135.0
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g=
cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
Expand Down
24 changes: 24 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/signer"
"github.com/sigstore/rekor/pkg/storage"
Expand All @@ -39,6 +40,8 @@ import (
"github.com/sigstore/sigstore/pkg/cryptoutils"
"github.com/sigstore/sigstore/pkg/signature"
"github.com/sigstore/sigstore/pkg/signature/options"

_ "github.com/sigstore/rekor/pkg/pubsub/gcp" // Load GCP pubsub implementation
haydentherapper marked this conversation as resolved.
Show resolved Hide resolved
)

func dial(ctx context.Context, rpcServer string) (*grpc.ClientConn, error) {
Expand All @@ -63,6 +66,9 @@ type API struct {
signer signature.Signer
// stops checkpoint publishing
checkpointPublishCancel context.CancelFunc
// Publishes notifications when new entries are added to the log. May be
// nil if no publisher is configured.
newEntryPublisher pubsub.Publisher
}

func NewAPI(treeID uint) (*API, error) {
Expand Down Expand Up @@ -112,6 +118,18 @@ func NewAPI(treeID uint) (*API, error) {

pubkey := cryptoutils.PEMEncode(cryptoutils.PublicKeyPEMType, b)

var newEntryPublisher pubsub.Publisher
if p := viper.GetString("rekor_server.new_entry_publisher"); p != "" {
if !viper.GetBool("rekor_server.publish_events_protobuf") && !viper.GetBool("rekor_server.publish_events_json") {
return nil, fmt.Errorf("%q is configured but neither %q or %q are enabled", "new_entry_publisher", "publish_events_protobuf", "publish_events_json")
}
newEntryPublisher, err = pubsub.Get(ctx, p)
if err != nil {
return nil, fmt.Errorf("init event publisher: %w", err)
}
log.ContextLogger(ctx).Infof("Initialized new entry event publisher: %s", p)
}

return &API{
// Transparency Log Stuff
logClient: logClient,
Expand All @@ -121,6 +139,8 @@ func NewAPI(treeID uint) (*API, error) {
pubkey: string(pubkey),
pubkeyHash: hex.EncodeToString(pubkeyHashBytes[:]),
signer: rekorSigner,
// Utility functionality not required for operation of the core service
newEntryPublisher: newEntryPublisher,
}, nil
}

Expand Down Expand Up @@ -166,4 +186,8 @@ func ConfigureAPI(treeID uint) {

func StopAPI() {
api.checkpointPublishCancel()

if api.newEntryPublisher != nil {
api.newEntryPublisher.Close()
jalseth marked this conversation as resolved.
Show resolved Hide resolved
}
}
63 changes: 62 additions & 1 deletion pkg/api/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ import (
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"

"github.com/sigstore/rekor/pkg/events"
"github.com/sigstore/rekor/pkg/events/newentry"
"github.com/sigstore/rekor/pkg/generated/models"
"github.com/sigstore/rekor/pkg/generated/restapi/operations/entries"
"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/tle"
"github.com/sigstore/rekor/pkg/trillianclient"
"github.com/sigstore/rekor/pkg/types"
"github.com/sigstore/rekor/pkg/util"
Expand Down Expand Up @@ -290,7 +294,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
RootHash: swag.String(hex.EncodeToString(root.RootHash)),
LogIndex: swag.Int64(queuedLeaf.LeafIndex),
Hashes: hashes,
Checkpoint: stringPointer(string(scBytes)),
Checkpoint: swag.String(string(scBytes)),
}

logEntryAnon.Verification = &models.LogEntryAnonVerification{
Expand All @@ -301,9 +305,66 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
logEntry := models.LogEntry{
entryID: logEntryAnon,
}

if api.newEntryPublisher != nil {
// Publishing notifications should not block the API response.
jalseth marked this conversation as resolved.
Show resolved Hide resolved
go func() {
verifiers, err := entry.Verifiers()
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Errorf("Could not get verifiers for log entry %s: %v", entryID, err)
return
}
var subjects []string
for _, v := range verifiers {
subjects = append(subjects, v.Subjects()...)
}

pbEntry, err := tle.GenerateTransparencyLogEntry(logEntryAnon)
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Error(err)
return
}
event, err := newentry.New(entryID, pbEntry, subjects)
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Error(err)
return
}
if viper.GetBool("rekor_server.publish_events_protobuf") {
go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeProtobuf)
jalseth marked this conversation as resolved.
Show resolved Hide resolved
}
if viper.GetBool("rekor_server.publish_events_json") {
go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeJSON)
}
}()
}

return logEntry, nil
}

func publishEvent(ctx context.Context, publisher pubsub.Publisher, event *events.Event, contentType events.EventContentType) {
err := publisher.Publish(context.WithoutCancel(ctx), event, contentType)
incPublishEvent(event.Type().Name(), contentType, err == nil)
if err != nil {
log.ContextLogger(ctx).Error(err)
}
}

func incPublishEvent(event string, contentType events.EventContentType, ok bool) {
status := "SUCCESS"
if !ok {
status = "ERROR"
}
labels := map[string]string{
"event": event,
"status": status,
"content_type": string(contentType),
}
metricPublishEvents.With(labels).Inc()
}

// CreateLogEntryHandler creates new entry into log
func CreateLogEntryHandler(params entries.CreateLogEntryParams) middleware.Responder {
httpReq := params.HTTPRequest
Expand Down
5 changes: 5 additions & 0 deletions pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ var (
Help: "The total number of new log entries",
})

metricPublishEvents = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "rekor_publish_events",
Help: "The status of publishing events to Pub/Sub",
}, []string{"event", "content_type", "status"})

MetricLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rekor_api_latency",
Help: "Api Latency on calls",
Expand Down
19 changes: 19 additions & 0 deletions pkg/events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package events provides methods for working with CloudEvents.
package events

// The version of the CloudEvents specification the package adheres to.
const CloudEventsSpecVersion = "1.0"
Loading