Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http client websocket support #1895

Merged
merged 26 commits into from
Jul 27, 2023
Merged

Http client websocket support #1895

merged 26 commits into from
Jul 27, 2023

Conversation

benzwreck
Copy link
Contributor

@benzwreck benzwreck commented Jul 12, 2023

Before submitting pull request:

  • Check if the project compiles by running sbt compile
  • Verify docs compilation by running sbt compileDocs
  • Check if tests pass by running sbt test
  • Format code by running sbt scalafmt

@benzwreck benzwreck marked this pull request as ready for review July 17, 2023 09:47

object DefaultSyncBackend {

/** Creates a default synchronous backend with the given `options`, which is currently based on
* [[HttpClientSyncBackend]].
*/
def apply(options: BackendOptions = BackendOptions.Default): SyncBackend =
def apply(options: BackendOptions = BackendOptions.Default): WebSocketBackend[Identity] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return a dedicated subtype, WebSocketSyncBackend; that's what #1766 is about

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(the goal is to expose to the user "clean" signatures, without the Identity wrapper)

docs/testing.md Outdated
@@ -230,7 +230,7 @@ It is also possible to create a stub backend which delegates calls to another (p

```scala mdoc:compile-only
val testingBackend =
SyncBackendStub.withFallback(DefaultSyncBackend())
WebSocketBackendStub.withFallback(DefaultSyncBackend())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep SyncBackendStub here

}

private[client4] object WebSocketImpl {
def sync[F[_]](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need a sequencer for the sync version - a sync websocket can still be used from multiple threads. I think using a semaphore as in FutureSequencer should work here (but in a sync version, of course)

@adamw
Copy link
Member

adamw commented Jul 18, 2023

This is going in the right direction :)

@benzwreck benzwreck requested a review from adamw July 21, 2023 07:23
semaphore.acquire()
}
val result = t
semaphore.release()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the release should be in a finally block

@@ -52,6 +52,11 @@ trait SyncBackend extends Backend[Identity] {
override def monad: MonadError[Identity] = IdMonad
}

/** A [[GenericBackend]] which is synchronous (side effects are run directly), and supports web sockets. */
trait WebSocketSyncBackend extends SyncBackend with WebSocketBackend[Identity] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have an override for send in WebSocketRequest with the simplified signature:

def send(backend: WebSocketSyncBackend): Response[T] = backend.send(this)

@@ -13,7 +13,7 @@ object GetRawResponseBodySynchronous extends App {
.get(uri"https://httpbin.org/get")
.response(asBoth(asJson[HttpBinResponse], asStringAlways))

val backend: SyncBackend = HttpClientSyncBackend()
val backend: WebSocketSyncBackend = HttpClientSyncBackend()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the non-websocket examples we could leave SyncBackend, since it's a super-trait? We don't need the ws capability here, so maybe it's simpler when it's typed this way?

new WebSocketImpl[F](ws, queue, _isOpen, sequencer) {
override implicit def monad: MonadError[F] = _monad

override protected[client4] def fromCompletableFuture(cf: CompletableFuture[JWebSocket]): F[Unit] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] maybe we should simply make fromCompletableFuture a parameter (of type CF => F[Unit]), instead of a method?

sequencer: Sequencer[Identity]
): Identity[Response[T]] = {
val isOpen: AtomicBoolean = new AtomicBoolean(false)
val responseCell = new ArrayBlockingQueue[Either[Throwable, Future[Response[T]]]](5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this is 5 ... wouldn't 1 work as well?

@benzwreck benzwreck requested a review from adamw July 24, 2023 07:54
@@ -36,8 +36,8 @@ You can disable the stripping of all sensitive headers using the following code:
import sttp.client4._
import sttp.client4.wrappers.{FollowRedirectsBackend, FollowRedirectsConfig}

val myBackend: SyncBackend = DefaultSyncBackend()
val backend: SyncBackend = FollowRedirectsBackend(
val myBackend: WebSocketSyncBackend = DefaultSyncBackend()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use SyncBackend here, since we don't need web socket functionality

docs/json.md Outdated
@@ -46,7 +46,7 @@ Response can be parsed into json using `asJson[T]`, provided there's an implicit
import sttp.client4._
import sttp.client4.circe._

val backend: SyncBackend = DefaultSyncBackend()
val backend: WebSocketSyncBackend = DefaultSyncBackend()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

@@ -32,7 +32,7 @@ In order to add digest authentication support just wrap other backend as follows
```scala mdoc:compile-only
import sttp.client4.wrappers.DigestAuthenticationBackend

val myBackend: SyncBackend = DefaultSyncBackend()
val myBackend: WebSocketSyncBackend = DefaultSyncBackend()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here :)

blocking {
semaphore.acquire()
}
val result = t
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be simplified: the result is not needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, but t is a by-name parameter so I guess we need to run it before returning. Otherwise how would that method look like?

blocking {
    semaphore.acquire()
}
semaphore.release()
t

? Cause we need to return T here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t will be evaluted each time it's referenced in code

so:

blocking {
    semaphore.acquire()
}

try t
finally semaphore.release()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I get it :) Thank you!

private[client4] class IdSequencer extends Sequencer[Identity] {
private val semaphore = new Semaphore(1)

def apply[T](t: => Identity[T]): Identity[T] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need the Identity in types, wouldn't def apply[T](t: => T): T work?

@@ -103,8 +127,46 @@ abstract class HttpClientBackend[F[_], S, P, B](
monad.map(body)(Response(_, code, "", headers, Nil, request.onlyMetadata))
}

protected def createSimpleQueue[T]: F[SimpleQueue[F, T]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do these methods need to be in the super-class? are they used here at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed in WebSocket implementation and used by all classes in httpclient package

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but does it need to be declared in HttpClientBackend? Is there any common code that uses this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was declared in HttpClientAsyncBackend before because it was only needed in async implementation (HttpClientFutureBackend). I thought it would be a good idea to move it to HttpClientBackend since it is used in HttpClientSyncBackend which implements HttpClientBackend and HttpClientFutureBackend which implements HttpClientAsyncBackend which implements HttpClientBackend.
So from my point of view it is not a bad idea to have it here. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the point of view of the HttpCLientBackend class, this method is not used, and hence an implement detail of child classes - so it shouldn't be declared there

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see your point and it makes sense to me right now. Thanks for the explanation :)

protected def standardEncoding: (B, String) => B

private[client4] def prepareWebSocketBuilder[T](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this can be protected, as it's only used from subclasses?

f match {
case WebSocketFrame.Text(payload, finalFragment, _) =>
fromCompletableFuture(ws.sendText(payload, finalFragment))
handleWS(ws.sendText(payload, finalFragment))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the name is right here, this method doesn't "handle the websocket" - it just converts the future to the effect, ignoring the result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the name to fromCompletableFutureToEffect. Couldn't figured out a better one :/

}
)
val baseResponse = Response((), StatusCode.SwitchingProtocols, "", Nil, Nil, request.onlyMetadata)
val body = Future(blocking(bodyFromHttpClient(Right(webSocket), request.response, baseResponse)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we're in a blocking world I think we can simply do Future.successfull(bodyFrom...)

@adamw adamw merged commit d5a42e8 into master Jul 27, 2023
12 of 15 checks passed
@mergify mergify bot deleted the http-client-websocket-support branch July 27, 2023 10:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add WebSocket support to HttpClientSyncBackend Add a WebSocketSyncBackend type
2 participants