Skip to content

Commit

Permalink
Drop user context when closing client interceptor pipeline (#1279)
Browse files Browse the repository at this point in the history
Motivation:

Interceptor pipelines hold arrays of interceptor contexts. Each context
maintains a reference back to the pipeline. However the client
interceptor pipeline never broke this reference cycle when closing the
pipeline which means that using client interceptors introduces a memory
leak.

Modifications:

- Explicitly remove all user contexts when the client interceptor pipeline is
  torn down (the server pipeline already does this so is unaffected)
- Add allocation tests so we can see the overhead of using interceptors
  (and, more importantly, whether allocations are freed)

Result:

Using client interceptors no longer leaks.
  • Loading branch information
glbrntt authored Sep 29, 2021
1 parent 13b0ce9 commit 5ddbb61
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ jobs:
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 204000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 211000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 211000
- image: swift:5.3-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 504000
Expand All @@ -65,6 +67,8 @@ jobs:
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 205000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 212000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 212000
- image: swift:5.2-bionic
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 515000
Expand All @@ -73,6 +77,8 @@ jobs:
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
MAX_ALLOCS_ALLOWED_embedded_server_unary_1k_rpcs_1_small_request: 63000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong: 206000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_client: 213000
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 213000
name: Performance Tests on ${{ matrix.image }}
runs-on: ubuntu-latest
container:
Expand Down
90 changes: 88 additions & 2 deletions Performance/allocations/tests/shared/Common.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import NIO
func makeEchoServer(
group: EventLoopGroup,
host: String = "127.0.0.1",
port: Int = 0
port: Int = 0,
interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil
) -> EventLoopFuture<Server> {
return Server.insecure(group: group)
.withServiceProviders([MinimalEchoProvider()])
.withServiceProviders([MinimalEchoProvider(interceptors: interceptors)])
.bind(host: host, port: port)
}

Expand All @@ -34,3 +35,88 @@ func makeClientConnection(
return ClientConnection.insecure(group: group)
.connect(host: host, port: port)
}

func makeEchoClientInterceptors(count: Int) -> Echo_EchoClientInterceptorFactoryProtocol? {
let factory = EchoClientInterceptors()
for _ in 0 ..< count {
factory.register { NoOpEchoClientInterceptor() }
}
return factory
}

func makeEchoServerInterceptors(count: Int) -> Echo_EchoServerInterceptorFactoryProtocol? {
let factory = EchoServerInterceptors()
for _ in 0 ..< count {
factory.register { NoOpEchoServerInterceptor() }
}
return factory
}

final class EchoClientInterceptors: Echo_EchoClientInterceptorFactoryProtocol {
internal typealias Factory = () -> ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>
private var factories: [Factory] = []

internal init(_ factories: Factory...) {
self.factories = factories
}

internal func register(_ factory: @escaping Factory) {
self.factories.append(factory)
}

private func makeInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.factories.map { $0() }
}

func makeGetInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}

func makeExpandInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}

func makeCollectInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}

func makeUpdateInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}
}

internal final class EchoServerInterceptors: Echo_EchoServerInterceptorFactoryProtocol {
internal typealias Factory = () -> ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>
private var factories: [Factory] = []

internal init(_ factories: Factory...) {
self.factories = factories
}

internal func register(_ factory: @escaping Factory) {
self.factories.append(factory)
}

private func makeInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.factories.map { $0() }
}

func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}

func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}

func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}

func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
return self.makeInterceptors()
}
}

final class NoOpEchoClientInterceptor: ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse> {}
final class NoOpEchoServerInterceptor: ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse> {}
32 changes: 29 additions & 3 deletions Performance/allocations/tests/test_unary_1k_ping_pong.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,31 @@ class UnaryPingPongBenchmark: Benchmark {
private var group: EventLoopGroup!
private var server: Server!
private var client: ClientConnection!
private let clientInterceptors: Echo_EchoClientInterceptorFactoryProtocol?
private let serverInterceptors: Echo_EchoServerInterceptorFactoryProtocol?

init(rpcs: Int, request: String) {
init(
rpcs: Int,
request: String,
clientInterceptors: Int = 0,
serverInterceptors: Int = 0
) {
self.rpcs = rpcs
self.request = .with { $0.text = request }
self.clientInterceptors = clientInterceptors > 0
? makeEchoClientInterceptors(count: clientInterceptors)
: nil
self.serverInterceptors = serverInterceptors > 0
? makeEchoServerInterceptors(count: serverInterceptors)
: nil
}

func setUp() throws {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.server = try makeEchoServer(group: self.group).wait()
self.server = try makeEchoServer(
group: self.group,
interceptors: self.serverInterceptors
).wait()
self.client = makeClientConnection(
group: self.group,
port: self.server.channel.localAddress!.port!
Expand All @@ -45,7 +61,7 @@ class UnaryPingPongBenchmark: Benchmark {
}

func run() throws -> Int {
let echo = Echo_EchoClient(channel: self.client)
let echo = Echo_EchoClient(channel: self.client, interceptors: self.clientInterceptors)
var responseLength = 0

for _ in 0 ..< self.rpcs {
Expand All @@ -63,4 +79,14 @@ func run(identifier: String) {
let benchmark = UnaryPingPongBenchmark(rpcs: 1000, request: "")
return try! benchmark.runOnce()
}

measure(identifier: identifier + "_interceptors_server") {
let benchmark = UnaryPingPongBenchmark(rpcs: 1000, request: "", serverInterceptors: 5)
return try! benchmark.runOnce()
}

measure(identifier: identifier + "_interceptors_client") {
let benchmark = UnaryPingPongBenchmark(rpcs: 1000, request: "", clientInterceptors: 5)
return try! benchmark.runOnce()
}
}
3 changes: 3 additions & 0 deletions Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ extension ClientInterceptorPipeline {
self._scheduledClose?.cancel()
self._scheduledClose = nil

// Drop the contexts since they reference us.
self._userContexts.removeAll()

// Cancel the transport.
self._onCancel?(nil)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import NIO
/// The echo provider that comes with the example does some string processing, we'll avoid some of
/// that here so we're looking at the right things.
public class MinimalEchoProvider: Echo_EchoProvider {
public let interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil
public let interceptors: Echo_EchoServerInterceptorFactoryProtocol?

public init(interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil) {
self.interceptors = interceptors
}

public func get(
request: Echo_EchoRequest,
Expand Down

0 comments on commit 5ddbb61

Please sign in to comment.