Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4154: adding a callback for stream consumption #5119

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

shawkins
Copy link
Contributor

Description

Here's a rough draft of the handling for places where outputstreams are passed in.

Instead the user may provide a simple StreamConsumer modeled after the handling that is done for the jdk WebSocket.Listener.onMessage method:

logWatch(bytes -> // do something, true)

The next parameter is whether the handler is blocking or not - this merits some discussion.

We can do several things:

  1. Allow the user to explicitly to declare if the consumer if blocking as shown in this initial draft. The downside is that this adds some complexity with the gain of cutting down on context switching and keeping the work on the io thread pool. Here are three examples:
logWatch(StreamConsumer.newStreamConsumer(socketOutputStream), true) -- effectively what logWatch(outputStream) does

logWatch(StreamConsumer.newBlockingStreamConsumer(writableByteChannel), true) 

logWatch(StreamConsumer.newStreamConsumer(byteArrayOutputStream), false) -- write to the consumer as non-blocking

Any other non-blocking logic would be up to the user, but there is probably something we could build in for non-blocking writable byte channels.

  1. Rather than a parameter, change StreamConsumer to an abstract class or add default methods that nominally default to blocking.

In this scenario we could do some instanceof or other checks to try to automatically determine the appropriate mode as well.

The user could ultimately override though if needed.

  1. Just assume everything is blocking, similar to what we're doing now with watches. If the eventual goal is to fully eliminate the kubernetes client thread pool, this won't get us there.

  2. Or rely upon an existing reactive library for stream handling.

Beyond this the other cases we need to handle are:

  • where the api provides an outputstream (exec in stream), we can additionally provide a non-blocking write method
  • where the api provides an reader/inputstream (getLogReader/getLogInputStream) would also provide something like a consumeLog(StreamConsumer ...) - and we'd need to consider if this should directly provide both a binary and text option.
  • where the user provides an inputstream (load, upload, submit build) is a sticking point. The logic currently won't handle that very well as load / parsing is blocking. There isn't good support all the way to the httpclient layer for put/post options - and even at that layer it appears jetty has no support for this. upload however can be implemented as non-blocking, to avoid polling we'll probably need a StreamProducer read method that provides a CompletionStage.

@rohanKanojia @manusa @vietj what are your thoughts?

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • Feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change
  • Chore (non-breaking change which doesn't affect codebase;
    test, version modification, documentation, etc.)

Checklist

  • Code contributed by me aligns with current project license: Apache 2.0
  • I Added CHANGELOG entry regarding this change
  • I have implemented unit tests to cover my changes
  • I have added/updated the javadocs and other documentation accordingly
  • No new bugs, code smells, etc. in SonarCloud report
  • I tested my code in Kubernetes
  • I tested my code in OpenShift

and implementing exec stream available
@sonarcloud
Copy link

sonarcloud bot commented May 18, 2023

SonarCloud Quality Gate failed.    Quality Gate failed

Bug E 3 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 5 Code Smells

21.1% 21.1% Coverage
1.4% 1.4% Duplication

@shawkins
Copy link
Contributor Author

where the api provides an outputstream (exec in stream), we can additionally provide a non-blocking write method

The options I see here are:

  1. Provide a blocking / non-blocking WritableByteChannel, that is also used as the basis for the getInput outputstream. Creating a subclass of AbstractSelectableChannel is possible for this, but it may be more confusing that we'll have both getInput and getInputChannel given that the user should only use one of them. This requires quite a bit of new logic.
  2. Provide a readingInput method variant that takes a ReadableByteChannel and a blocking mode parameter. We'd need to bifurcate the InputStreamPumper related logic to account for polling from non-blocking channel. This also provides a little less control over how writes are processed as there's no ability to flush.
  3. Punt for now and just provide a method that would detect if a given write of bytes would block, users would be on the hook for writing there own non-blocking logic on top of that. The disadvantage of this approach is that the method may not age well.

For this draft I've added the third option, but we can certainly change that if desired.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant