diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f079d2fe60..94dd9ee899 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -10,6 +10,7 @@ androidx-test-runner = { module = "androidx.test:runner", version = "1.5.2" } binaryCompatibilityValidator = { module = "org.jetbrains.kotlinx.binary-compatibility-validator:org.jetbrains.kotlinx.binary-compatibility-validator.gradle.plugin", version = "0.16.3" } kotlin-test = { module = "org.jetbrains.kotlin:kotlin-test" } kotlin-test-junit = { module = "org.jetbrains.kotlin:kotlin-test-junit" } +kotlin-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version = "1.7.3" } kotlin-time = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version = "0.6.1" } jmh-gradle-plugin = { module = "me.champeau.jmh:jmh-gradle-plugin", version = "0.7.2" } jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" } diff --git a/okio/build.gradle.kts b/okio/build.gradle.kts index 0db30006ce..df5377a4a3 100644 --- a/okio/build.gradle.kts +++ b/okio/build.gradle.kts @@ -158,6 +158,11 @@ kotlin { nativeTest.dependsOn(nonWasmTest) nativeTest.dependsOn(zlibTest) createSourceSet("appleTest", parent = nativeTest, children = appleTargets) + .apply { + dependencies { + implementation(libs.kotlin.coroutines) + } + } } } diff --git a/okio/src/appleMain/kotlin/okio/ApplePlatform.kt b/okio/src/appleMain/kotlin/okio/ApplePlatform.kt new file mode 100644 index 0000000000..af2b8648fd --- /dev/null +++ b/okio/src/appleMain/kotlin/okio/ApplePlatform.kt @@ -0,0 +1,16 @@ +package okio + +import kotlinx.cinterop.UnsafeNumber +import platform.Foundation.NSError +import platform.Foundation.NSLocalizedDescriptionKey +import platform.Foundation.NSUnderlyingErrorKey + +@OptIn(UnsafeNumber::class) +internal fun Exception.toNSError() = NSError( + domain = "Kotlin", + code = 0, + userInfo = mapOf( + NSLocalizedDescriptionKey to message, + NSUnderlyingErrorKey to this, + ), +) diff --git a/okio/src/appleMain/kotlin/okio/BufferedSink.kt b/okio/src/appleMain/kotlin/okio/BufferedSink.kt new file mode 100644 index 0000000000..ce5d57f54e --- /dev/null +++ b/okio/src/appleMain/kotlin/okio/BufferedSink.kt @@ -0,0 +1,172 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.WeakReference +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.convert +import platform.Foundation.NSError +import platform.Foundation.NSOutputStream +import platform.Foundation.NSRunLoop +import platform.Foundation.NSRunLoopMode +import platform.Foundation.NSStream +import platform.Foundation.NSStreamDataWrittenToMemoryStreamKey +import platform.Foundation.NSStreamDelegateProtocol +import platform.Foundation.NSStreamEvent +import platform.Foundation.NSStreamEventErrorOccurred +import platform.Foundation.NSStreamEventHasSpaceAvailable +import platform.Foundation.NSStreamEventOpenCompleted +import platform.Foundation.NSStreamPropertyKey +import platform.Foundation.NSStreamStatusClosed +import platform.Foundation.NSStreamStatusError +import platform.Foundation.NSStreamStatusNotOpen +import platform.Foundation.NSStreamStatusOpen +import platform.Foundation.NSStreamStatusOpening +import platform.Foundation.NSStreamStatusWriting +import platform.Foundation.performInModes +import platform.darwin.NSInteger +import platform.darwin.NSUInteger +import platform.posix.uint8_tVar + +/** + * Returns an output stream that writes to this sink. Closing the stream will also close this sink. + */ +fun BufferedSink.outputStream(): NSOutputStream = BufferedSinkNSOutputStream(this) + +@OptIn(UnsafeNumber::class, ExperimentalNativeApi::class) +private class BufferedSinkNSOutputStream( + private val sink: BufferedSink, +) : NSOutputStream(toMemory = Unit), NSStreamDelegateProtocol { + + private val isClosed: () -> Boolean = when (sink) { + is RealBufferedSink -> sink::closed + is Buffer -> { + { false } + } + } + + private var status = NSStreamStatusNotOpen + private var error: NSError? = null + set(value) { + status = NSStreamStatusError + field = value + postEvent(NSStreamEventErrorOccurred) + sink.close() + } + + override fun streamStatus() = if (status != NSStreamStatusError && isClosed()) NSStreamStatusClosed else status + + override fun streamError() = error + + override fun open() { + if (status == NSStreamStatusNotOpen) { + status = NSStreamStatusOpening + status = NSStreamStatusOpen + postEvent(NSStreamEventOpenCompleted) + postEvent(NSStreamEventHasSpaceAvailable) + } + } + + override fun close() { + if (status == NSStreamStatusError || status == NSStreamStatusNotOpen) return + status = NSStreamStatusClosed + runLoop = null + runLoopModes = listOf() + sink.close() + } + + override fun write(buffer: CPointer?, maxLength: NSUInteger): NSInteger { + if (streamStatus != NSStreamStatusOpen || buffer == null) return -1 + status = NSStreamStatusWriting + val toWrite = minOf(maxLength, Int.MAX_VALUE.convert()).toInt() + return try { + sink.buffer.write(buffer, toWrite) + sink.emitCompleteSegments() + status = NSStreamStatusOpen + toWrite.convert() + } catch (e: Exception) { + error = e.toNSError() + -1 + } + } + + override fun hasSpaceAvailable() = !isFinished + + private val isFinished + get() = when (streamStatus) { + NSStreamStatusClosed, NSStreamStatusError -> true + else -> false + } + + override fun propertyForKey(key: NSStreamPropertyKey): Any? = when (key) { + NSStreamDataWrittenToMemoryStreamKey -> sink.buffer.snapshotAsNSData() + else -> null + } + + override fun setProperty(property: Any?, forKey: NSStreamPropertyKey) = false + + // WeakReference as delegate should not be retained + // https://developer.apple.com/documentation/foundation/nsstream/1418423-delegate + private var _delegate: WeakReference? = null + private var runLoop: NSRunLoop? = null + private var runLoopModes = listOf() + + private fun postEvent(event: NSStreamEvent) { + val runLoop = runLoop ?: return + runLoop.performInModes(runLoopModes) { + if (runLoop == this.runLoop) { + delegateOrSelf.stream(this, event) + } + } + } + + override fun delegate() = _delegate?.value + + private val delegateOrSelf get() = delegate ?: this + + override fun setDelegate(delegate: NSStreamDelegateProtocol?) { + _delegate = delegate?.let { WeakReference(it) } + } + + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + // no-op + } + + override fun scheduleInRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (runLoop == null) { + runLoop = aRunLoop + } + if (runLoop == aRunLoop) { + runLoopModes += forMode + } + if (status == NSStreamStatusOpen) { + postEvent(NSStreamEventHasSpaceAvailable) + } + } + + override fun removeFromRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (aRunLoop == runLoop) { + runLoopModes -= forMode + if (runLoopModes.isEmpty()) { + runLoop = null + } + } + } + + override fun description() = "$sink.outputStream()" +} diff --git a/okio/src/appleMain/kotlin/okio/BufferedSource.kt b/okio/src/appleMain/kotlin/okio/BufferedSource.kt new file mode 100644 index 0000000000..cd6adbe5d0 --- /dev/null +++ b/okio/src/appleMain/kotlin/okio/BufferedSource.kt @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.WeakReference +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.CPointerVar +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.convert +import platform.Foundation.NSData +import platform.Foundation.NSError +import platform.Foundation.NSInputStream +import platform.Foundation.NSRunLoop +import platform.Foundation.NSRunLoopMode +import platform.Foundation.NSStream +import platform.Foundation.NSStreamDelegateProtocol +import platform.Foundation.NSStreamEvent +import platform.Foundation.NSStreamEventEndEncountered +import platform.Foundation.NSStreamEventErrorOccurred +import platform.Foundation.NSStreamEventHasBytesAvailable +import platform.Foundation.NSStreamEventOpenCompleted +import platform.Foundation.NSStreamPropertyKey +import platform.Foundation.NSStreamStatusAtEnd +import platform.Foundation.NSStreamStatusClosed +import platform.Foundation.NSStreamStatusError +import platform.Foundation.NSStreamStatusNotOpen +import platform.Foundation.NSStreamStatusOpen +import platform.Foundation.NSStreamStatusOpening +import platform.Foundation.NSStreamStatusReading +import platform.Foundation.performInModes +import platform.darwin.NSInteger +import platform.darwin.NSUInteger +import platform.darwin.NSUIntegerVar +import platform.posix.uint8_tVar + +/** Returns an input stream that reads from this source. */ +fun BufferedSource.inputStream(): NSInputStream = BufferedSourceInputStream(this) + +@OptIn(UnsafeNumber::class, ExperimentalNativeApi::class) +private class BufferedSourceInputStream( + private val source: BufferedSource, +) : NSInputStream(NSData()), NSStreamDelegateProtocol { + + private val isClosed: () -> Boolean = when (source) { + is RealBufferedSource -> source::closed + is Buffer -> { + { false } + } + } + + private var status = NSStreamStatusNotOpen + private var error: NSError? = null + set(value) { + status = NSStreamStatusError + field = value + source.close() + } + + override fun streamStatus() = if (status != NSStreamStatusError && isClosed()) NSStreamStatusClosed else status + + override fun streamError() = error + + override fun open() { + if (status == NSStreamStatusNotOpen) { + status = NSStreamStatusOpening + status = NSStreamStatusOpen + postEvent(NSStreamEventOpenCompleted) + checkBytes() + } + } + + override fun close() { + if (status == NSStreamStatusError || status == NSStreamStatusNotOpen) return + status = NSStreamStatusClosed + runLoop = null + runLoopModes = listOf() + source.close() + } + + override fun read(buffer: CPointer?, maxLength: NSUInteger): NSInteger { + if (streamStatus != NSStreamStatusOpen && streamStatus != NSStreamStatusAtEnd || buffer == null) return -1 + status = NSStreamStatusReading + try { + if (source.exhausted()) { + status = NSStreamStatusAtEnd + return 0 + } + val toRead = minOf(maxLength.toLong(), source.buffer.size, Int.MAX_VALUE.toLong()).toInt() + val read = source.buffer.read(buffer, toRead).convert() + status = NSStreamStatusOpen + checkBytes() + return read + } catch (e: Exception) { + error = e.toNSError() + postEvent(NSStreamEventErrorOccurred) + return -1 + } + } + + override fun getBuffer(buffer: CPointer>?, length: CPointer?) = false + + override fun hasBytesAvailable() = !isFinished + + private val isFinished + get() = when (streamStatus) { + NSStreamStatusClosed, NSStreamStatusError -> true + else -> false + } + + override fun propertyForKey(key: NSStreamPropertyKey): Any? = null + + override fun setProperty(property: Any?, forKey: NSStreamPropertyKey) = false + + // WeakReference as delegate should not be retained + // https://developer.apple.com/documentation/foundation/nsstream/1418423-delegate + private var _delegate: WeakReference? = null + private var runLoop: NSRunLoop? = null + private var runLoopModes = listOf() + + private fun postEvent(event: NSStreamEvent) { + val runLoop = runLoop ?: return + runLoop.performInModes(runLoopModes) { + if (runLoop == this.runLoop) { + delegateOrSelf.stream(this, event) + } + } + } + + private fun checkBytes() { + val runLoop = runLoop ?: return + runLoop.performInModes(runLoopModes) { + if (runLoop != this.runLoop || isFinished) return@performInModes + val event = try { + if (source.exhausted()) { + status = NSStreamStatusAtEnd + NSStreamEventEndEncountered + } else { + NSStreamEventHasBytesAvailable + } + } catch (e: Exception) { + error = e.toNSError() + NSStreamEventErrorOccurred + } + delegateOrSelf.stream(this, event) + } + } + + override fun delegate() = _delegate?.value + + private val delegateOrSelf get() = delegate ?: this + + override fun setDelegate(delegate: NSStreamDelegateProtocol?) { + _delegate = delegate?.let { WeakReference(it) } + } + + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + // no-op + } + + override fun scheduleInRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (runLoop == null) { + runLoop = aRunLoop + } + if (runLoop == aRunLoop) { + runLoopModes += forMode + } + if (status == NSStreamStatusOpen) { + checkBytes() + } + } + + override fun removeFromRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (aRunLoop == runLoop) { + runLoopModes -= forMode + if (runLoopModes.isEmpty()) { + runLoop = null + } + } + } + + override fun description(): String = "$source.inputStream()" +} diff --git a/okio/src/appleMain/kotlin/okio/BuffersApple.kt b/okio/src/appleMain/kotlin/okio/BuffersApple.kt new file mode 100644 index 0000000000..a1814cc2b0 --- /dev/null +++ b/okio/src/appleMain/kotlin/okio/BuffersApple.kt @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@file:OptIn(UnsafeNumber::class) + +package okio + +import kotlinx.cinterop.BetaInteropApi +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.convert +import kotlinx.cinterop.plus +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.toKString +import kotlinx.cinterop.usePinned +import platform.Foundation.NSData +import platform.Foundation.create +import platform.Foundation.data +import platform.darwin.NSUIntegerMax +import platform.posix.errno +import platform.posix.malloc +import platform.posix.memcpy +import platform.posix.strerror +import platform.posix.uint8_tVar + +@OptIn(ExperimentalForeignApi::class) +internal fun Buffer.write(source: CPointer, maxLength: Int) { + require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } + + var currentOffset = 0 + while (currentOffset < maxLength) { + val tail = writableSegment(1) + + val toCopy = minOf(maxLength - currentOffset, Segment.SIZE - tail.limit) + tail.data.usePinned { + memcpy(it.addressOf(tail.limit), source + currentOffset, toCopy.convert()) + } + + currentOffset += toCopy + tail.limit += toCopy + } + size += maxLength +} + +internal fun Buffer.read(sink: CPointer, maxLength: Int): Int { + require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } + + val s = head ?: return 0 + val toCopy = minOf(maxLength, s.limit - s.pos) + s.data.usePinned { + memcpy(sink, it.addressOf(s.pos), toCopy.convert()) + } + + s.pos += toCopy + size -= toCopy.toLong() + + if (s.pos == s.limit) { + head = s.pop() + SegmentPool.recycle(s) + } + + return toCopy +} + +@OptIn(BetaInteropApi::class) +internal fun Buffer.snapshotAsNSData(): NSData { + if (size == 0L) return NSData.data() + + check(size.toULong() <= NSUIntegerMax) { "Buffer is too long ($size) to be converted into NSData." } + + val bytes = malloc(size.convert())?.reinterpret() + ?: throw Error("malloc failed: ${strerror(errno)?.toKString()}") + var curr = head + var index = 0 + do { + check(curr != null) { "Current segment is null" } + val pos = curr.pos + val length = curr.limit - pos + curr.data.usePinned { + memcpy(bytes + index, it.addressOf(pos), length.convert()) + } + curr = curr.next + index += length + } while (curr !== head) + return NSData.create(bytesNoCopy = bytes, length = size.convert()) +} diff --git a/okio/src/appleMain/kotlin/okio/NSInputStreamSource.kt b/okio/src/appleMain/kotlin/okio/NSInputStreamSource.kt new file mode 100644 index 0000000000..05eb935c91 --- /dev/null +++ b/okio/src/appleMain/kotlin/okio/NSInputStreamSource.kt @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.convert +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.usePinned +import platform.Foundation.NSInputStream +import platform.Foundation.NSStreamStatusClosed +import platform.Foundation.NSStreamStatusNotOpen +import platform.posix.uint8_tVar + +/** Returns a source that reads from `in`. */ +fun NSInputStream.source(): Source = NSInputStreamSource(this) + +@OptIn(UnsafeNumber::class) +private open class NSInputStreamSource( + private val input: NSInputStream, +) : Source { + + init { + if (input.streamStatus == NSStreamStatusNotOpen) input.open() + } + + override fun read(sink: Buffer, byteCount: Long): Long { + if (input.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") + + if (byteCount == 0L) return 0L + require(byteCount >= 0L) { "byteCount < 0: $byteCount" } + + val tail = sink.writableSegment(1) + val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit) + val bytesRead = tail.data.usePinned { + val bytes = it.addressOf(tail.limit).reinterpret() + input.read(bytes, maxToCopy.convert()).toLong() + } + + if (bytesRead < 0L) throw IOException(input.streamError?.localizedDescription ?: "Unknown error") + if (bytesRead == 0L) { + if (tail.pos == tail.limit) { + // We allocated a tail segment, but didn't end up needing it. Recycle! + sink.head = tail.pop() + SegmentPool.recycle(tail) + } + return -1 + } + tail.limit += bytesRead.toInt() + sink.size += bytesRead + return bytesRead + } + + override fun close() = input.close() + + override fun timeout() = Timeout.NONE + + override fun toString() = "source($input)" +} diff --git a/okio/src/appleMain/kotlin/okio/NSOutputStreamSink.kt b/okio/src/appleMain/kotlin/okio/NSOutputStreamSink.kt new file mode 100644 index 0000000000..e7df3977f9 --- /dev/null +++ b/okio/src/appleMain/kotlin/okio/NSOutputStreamSink.kt @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.convert +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.usePinned +import platform.Foundation.NSOutputStream +import platform.Foundation.NSStreamStatusClosed +import platform.Foundation.NSStreamStatusNotOpen +import platform.posix.uint8_tVar + +/** Returns a sink that writes to `out`. */ +fun NSOutputStream.sink(): Sink = OutputStreamSink(this) + +@OptIn(UnsafeNumber::class) +private open class OutputStreamSink( + private val out: NSOutputStream, +) : Sink { + + init { + if (out.streamStatus == NSStreamStatusNotOpen) out.open() + } + + override fun write(source: Buffer, byteCount: Long) { + if (out.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") + + checkOffsetAndCount(source.size, 0, byteCount) + var remaining = byteCount + while (remaining > 0) { + val head = source.head!! + val toCopy = minOf(remaining, head.limit - head.pos).toInt() + val bytesWritten = head.data.usePinned { + val bytes = it.addressOf(head.pos).reinterpret() + out.write(bytes, toCopy.convert()).toLong() + } + + if (bytesWritten < 0L) throw IOException(out.streamError?.localizedDescription ?: "Unknown error") + if (bytesWritten == 0L) throw IOException("NSOutputStream reached capacity") + + head.pos += bytesWritten.toInt() + remaining -= bytesWritten + source.size -= bytesWritten + + if (head.pos == head.limit) { + source.head = head.pop() + SegmentPool.recycle(head) + } + } + } + + override fun flush() { + // no-op + } + + override fun close() = out.close() + + override fun timeout(): Timeout = Timeout.NONE + + override fun toString() = "RawSink($out)" +} diff --git a/okio/src/appleTest/kotlin/okio/AppleByteStringTest.kt b/okio/src/appleTest/kotlin/okio/AppleByteStringTest.kt index 0e600e061f..d788de335e 100644 --- a/okio/src/appleTest/kotlin/okio/AppleByteStringTest.kt +++ b/okio/src/appleTest/kotlin/okio/AppleByteStringTest.kt @@ -17,11 +17,13 @@ package okio import kotlin.test.Test import kotlin.test.assertEquals +import kotlinx.cinterop.UnsafeNumber import platform.Foundation.NSData import platform.Foundation.NSString import platform.Foundation.NSUTF8StringEncoding import platform.Foundation.dataUsingEncoding +@OptIn(UnsafeNumber::class) class AppleByteStringTest { @Test fun nsDataToByteString() { val data = ("Hello" as NSString).dataUsingEncoding(NSUTF8StringEncoding) as NSData diff --git a/okio/src/appleTest/kotlin/okio/BufferedSinkNSOutputStreamTest.kt b/okio/src/appleTest/kotlin/okio/BufferedSinkNSOutputStreamTest.kt new file mode 100644 index 0000000000..d8ea1e5cfc --- /dev/null +++ b/okio/src/appleTest/kotlin/okio/BufferedSinkNSOutputStreamTest.kt @@ -0,0 +1,214 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNull +import kotlin.test.assertTrue +import kotlin.test.fail +import kotlinx.atomicfu.atomic +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.convert +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.usePinned +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import platform.CoreFoundation.CFRunLoopStop +import platform.Foundation.NSData +import platform.Foundation.NSDefaultRunLoopMode +import platform.Foundation.NSOutputStream +import platform.Foundation.NSStream +import platform.Foundation.NSStreamDataWrittenToMemoryStreamKey +import platform.Foundation.NSStreamDelegateProtocol +import platform.Foundation.NSStreamEvent +import platform.Foundation.NSStreamEventHasSpaceAvailable +import platform.Foundation.NSStreamEventOpenCompleted +import platform.Foundation.NSStreamStatusClosed +import platform.Foundation.NSStreamStatusNotOpen +import platform.Foundation.NSStreamStatusOpen +import platform.Foundation.NSThread +import platform.darwin.NSObject +import platform.darwin.NSUInteger +import platform.posix.uint8_tVar + +private fun NSOutputStream.write(vararg strings: String) { + for (str in strings) { + str.encodeToByteArray().apply { + assertEquals(size, this.write(this@write)) + } + } +} + +@OptIn(UnsafeNumber::class) +class BufferedSinkNSOutputStreamTest { + @Test + fun multipleWrites() { + val buffer = Buffer() + buffer.outputStream().apply { + open() + write("hello", " ", "world") + close() + } + assertEquals("hello world", buffer.readUtf8()) + + RealBufferedSink(buffer).outputStream().apply { + open() + write("hello", " ", "real", " sink") + close() + } + assertEquals("hello real sink", buffer.readUtf8()) + } + + @Test + fun bufferOutputStream() { + testOutputStream(Buffer(), "abc") + testOutputStream(Buffer(), "a" + "b".repeat(Segment.SIZE * 2) + "c") + } + + @Test + fun realBufferedSinkOutputStream() { + testOutputStream(RealBufferedSink(Buffer()), "abc") + testOutputStream(RealBufferedSink(Buffer()), "a" + "b".repeat(Segment.SIZE * 2) + "c") + } + + private fun testOutputStream(sink: BufferedSink, input: String) { + val out = sink.outputStream() + val byteArray = input.encodeToByteArray() + val size: NSUInteger = input.length.convert() + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(NSStreamStatusNotOpen, out.streamStatus) + assertEquals(-1, out.write(cPtr, size)) + out.open() + assertEquals(NSStreamStatusOpen, out.streamStatus) + + assertEquals(size.convert(), out.write(cPtr, size)) + sink.flush() + when (sink) { + is Buffer -> { + val data = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + assertContentEquals(byteArray, data.toByteArray()) + assertContentEquals(byteArray, sink.buffer.readByteArray()) + } + is RealBufferedSink -> assertContentEquals(byteArray, (sink.sink as Buffer).readByteArray()) + } + } + } + + @Test + fun nsOutputStreamClose() { + val buffer = Buffer() + val sink = RealBufferedSink(buffer) + assertFalse(sink.closed) + + val out = sink.outputStream() + out.open() + out.close() + assertTrue(sink.closed) + assertEquals(NSStreamStatusClosed, out.streamStatus) + + val byteArray = ByteArray(4) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(-1, out.write(cPtr, 4U)) + assertNull(out.streamError) + assertTrue(sink.buffer.readByteArray().isEmpty()) + } + } + + @Test + fun delegateTest() { + val runLoop = startRunLoop() + + fun produceWithDelegate(out: NSOutputStream, data: String) { + val opened = Mutex(true) + val written = atomic(0) + val completed = Mutex(true) + + out.delegate = object : NSObject(), NSStreamDelegateProtocol { + val source = data.encodeToByteArray() + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> opened.unlock() + NSStreamEventHasSpaceAvailable -> { + if (source.isNotEmpty()) { + source.usePinned { + assertEquals( + data.length.convert(), + out.write(it.addressOf(written.value).reinterpret(), data.length.convert()), + ) + written.value += data.length + } + } + val writtenData = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + assertEquals(data, writtenData.toByteArray().decodeToString()) + out.close() + completed.unlock() + } + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + out.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + out.open() + runBlocking { + opened.lockWithTimeout() + completed.lockWithTimeout() + } + assertEquals(data.length, written.value) + } + + produceWithDelegate(Buffer().outputStream(), "custom") + produceWithDelegate(Buffer().outputStream(), "") + CFRunLoopStop(runLoop.getCFRunLoop()) + } + + @Test + fun testSubscribeAfterOpen() { + val runLoop = startRunLoop() + + fun subscribeAfterOpen(out: NSOutputStream) { + val available = Mutex(true) + + out.delegate = object : NSObject(), NSStreamDelegateProtocol { + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> fail("opened before subscribe") + NSStreamEventHasSpaceAvailable -> available.unlock() + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + out.open() + out.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + runBlocking { + available.lockWithTimeout() + } + out.close() + } + + subscribeAfterOpen(Buffer().outputStream()) + CFRunLoopStop(runLoop.getCFRunLoop()) + } +} diff --git a/okio/src/appleTest/kotlin/okio/BufferedSourceInputStreamTest.kt b/okio/src/appleTest/kotlin/okio/BufferedSourceInputStreamTest.kt new file mode 100644 index 0000000000..58e12c7319 --- /dev/null +++ b/okio/src/appleTest/kotlin/okio/BufferedSourceInputStreamTest.kt @@ -0,0 +1,300 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNotEquals +import kotlin.test.assertNull +import kotlin.test.assertTrue +import kotlin.test.fail +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.reentrantLock +import kotlinx.atomicfu.locks.withLock +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.convert +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.usePinned +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import platform.CoreFoundation.CFRunLoopStop +import platform.Foundation.NSDefaultRunLoopMode +import platform.Foundation.NSInputStream +import platform.Foundation.NSStream +import platform.Foundation.NSStreamDelegateProtocol +import platform.Foundation.NSStreamEvent +import platform.Foundation.NSStreamEventEndEncountered +import platform.Foundation.NSStreamEventHasBytesAvailable +import platform.Foundation.NSStreamEventOpenCompleted +import platform.Foundation.NSStreamStatusClosed +import platform.Foundation.NSStreamStatusNotOpen +import platform.Foundation.NSStreamStatusOpen +import platform.Foundation.NSThread +import platform.darwin.NSObject +import platform.posix.uint8_tVar + +@OptIn(UnsafeNumber::class) +class BufferedSourceInputStreamTest { + @Test + fun bufferInputStream() { + val source = Buffer() + source.writeUtf8("abc") + testInputStream(source.inputStream()) + } + + @Test + fun realBufferedSourceInputStream() { + val source = Buffer() + source.writeUtf8("abc") + testInputStream(RealBufferedSource(source).inputStream()) + } + + private fun testInputStream(input: NSInputStream) { + val byteArray = ByteArray(4) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(NSStreamStatusNotOpen, input.streamStatus) + assertEquals(-1, input.read(cPtr, 4U)) + input.open() + assertEquals(NSStreamStatusOpen, input.streamStatus) + + byteArray.fill(-5) + assertEquals(3, input.read(cPtr, 4U)) + assertEquals("[97, 98, 99, -5]", byteArray.contentToString()) + + byteArray.fill(-7) + assertEquals(0, input.read(cPtr, 4U)) + assertEquals("[-7, -7, -7, -7]", byteArray.contentToString()) + } + } + + @Test + fun bufferInputStreamLongData() { + val source = Buffer() + source.writeUtf8("a" + "b".repeat(Segment.SIZE * 2) + "c") + testInputStreamLongData(source.inputStream()) + } + + @Test + fun realBufferedSourceInputStreamLongData() { + val source = Buffer() + source.writeUtf8("a" + "b".repeat(Segment.SIZE * 2) + "c") + testInputStreamLongData(RealBufferedSource(source).inputStream()) + } + + private fun testInputStreamLongData(input: NSInputStream) { + val lengthPlusOne = Segment.SIZE * 2 + 3 + val byteArray = ByteArray(lengthPlusOne) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(NSStreamStatusNotOpen, input.streamStatus) + assertEquals(-1, input.read(cPtr, lengthPlusOne.convert())) + input.open() + assertEquals(NSStreamStatusOpen, input.streamStatus) + + byteArray.fill(-5) + assertEquals(Segment.SIZE.convert(), input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[97${", 98".repeat(Segment.SIZE - 1)}${", -5".repeat(Segment.SIZE + 3)}]", byteArray.contentToString()) + + byteArray.fill(-6) + assertEquals(Segment.SIZE.convert(), input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[98${", 98".repeat(Segment.SIZE - 1)}${", -6".repeat(Segment.SIZE + 3)}]", byteArray.contentToString()) + + byteArray.fill(-7) + assertEquals(2, input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[98, 99${", -7".repeat(Segment.SIZE * 2 + 1)}]", byteArray.contentToString()) + + byteArray.fill(-8) + assertEquals(0, input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[-8${", -8".repeat(lengthPlusOne - 1)}]", byteArray.contentToString()) + } + } + + @Test + fun nsInputStreamClose() { + val buffer = Buffer() + buffer.writeUtf8("abc") + val source = RealBufferedSource(buffer) + assertFalse(source.closed) + + val input = source.inputStream() + input.open() + input.close() + assertTrue(source.closed) + assertEquals(NSStreamStatusClosed, input.streamStatus) + + val byteArray = ByteArray(4) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + byteArray.fill(-5) + assertEquals(-1, input.read(cPtr, 4U)) + assertNull(input.streamError) + assertEquals("[-5, -5, -5, -5]", byteArray.contentToString()) + } + } + + @Test + fun delegateTest() { + val runLoop = startRunLoop() + + fun consumeWithDelegate(input: NSInputStream, data: String) { + val opened = Mutex(true) + val read = atomic(0) + val completed = Mutex(true) + + input.delegate = object : NSObject(), NSStreamDelegateProtocol { + val sink = ByteArray(data.length) + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> opened.unlock() + NSStreamEventHasBytesAvailable -> { + sink.usePinned { + assertEquals(1, input.read(it.addressOf(read.value).reinterpret(), 1U)) + read.value++ + } + } + NSStreamEventEndEncountered -> { + assertEquals(data, sink.decodeToString()) + input.close() + completed.unlock() + } + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + input.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + input.open() + runBlocking { + opened.lockWithTimeout() + completed.lockWithTimeout() + } + assertEquals(data.length, read.value) + } + + consumeWithDelegate(Buffer().apply { writeUtf8("custom") }.inputStream(), "custom") + consumeWithDelegate(Buffer().inputStream(), "") + CFRunLoopStop(runLoop.getCFRunLoop()) + } + + @Test + fun testRunLoopSwitch() { + val runLoop1 = startRunLoop("run-loop-1") + val runLoop2 = startRunLoop("run-loop-2") + + fun consumeSwitching(input: NSInputStream, data: String) { + val opened = Mutex(true) + val readLock = reentrantLock() + var read = 0 + val completed = Mutex(true) + + input.delegate = object : NSObject(), NSStreamDelegateProtocol { + val sink = ByteArray(data.length) + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + // Ensure thread safe access to `read` between scheduled run loops + readLock.withLock { + if (read == 0) { + // until first read + assertEquals("run-loop-1", NSThread.currentThread.name) + } else { + // after first read + assertEquals("run-loop-2", NSThread.currentThread.name) + } + when (handleEvent) { + NSStreamEventOpenCompleted -> opened.unlock() + NSStreamEventHasBytesAvailable -> { + if (read == 0) { + // switch to other run loop before first read + input.removeFromRunLoop(runLoop1, NSDefaultRunLoopMode) + input.scheduleInRunLoop(runLoop2, NSDefaultRunLoopMode) + } else if (read >= data.length - 3) { + // unsubscribe before last read + input.removeFromRunLoop(runLoop2, NSDefaultRunLoopMode) + } + sink.usePinned { + val readBytes = input.read(it.addressOf(read).reinterpret(), 3U) + assertNotEquals(0, readBytes) + read += readBytes.toInt() + } + if (read == data.length) { + assertEquals(data, sink.decodeToString()) + completed.unlock() + } + } + NSStreamEventEndEncountered -> fail("$data shouldn't be subscribed") + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + } + input.scheduleInRunLoop(runLoop1, NSDefaultRunLoopMode) + input.open() + runBlocking { + opened.lockWithTimeout() + completed.lockWithTimeout() + // wait a bit to be sure delegate is no longer called + delay(200) + } + input.close() + } + + consumeSwitching(Buffer().apply { writeUtf8("custom") }.inputStream(), "custom") + CFRunLoopStop(runLoop1.getCFRunLoop()) + CFRunLoopStop(runLoop2.getCFRunLoop()) + } + + @Test + fun testSubscribeAfterOpen() { + val runLoop = startRunLoop() + + fun subscribeAfterOpen(input: NSInputStream, data: String) { + val available = Mutex(true) + + input.delegate = object : NSObject(), NSStreamDelegateProtocol { + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> fail("opened before subscribe") + NSStreamEventHasBytesAvailable -> { + val sink = ByteArray(data.length) + sink.usePinned { + assertEquals(data.length.convert(), input.read(it.addressOf(0).reinterpret(), data.length.convert())) + } + assertEquals(data, sink.decodeToString()) + input.close() + available.unlock() + } + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + input.open() + input.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + runBlocking { + available.lockWithTimeout() + } + } + + subscribeAfterOpen(Buffer().apply { writeUtf8("custom") }.inputStream(), "custom") + CFRunLoopStop(runLoop.getCFRunLoop()) + } +} diff --git a/okio/src/appleTest/kotlin/okio/NSInputStreamSourceTest.kt b/okio/src/appleTest/kotlin/okio/NSInputStreamSourceTest.kt new file mode 100644 index 0000000000..072849ed6a --- /dev/null +++ b/okio/src/appleTest/kotlin/okio/NSInputStreamSourceTest.kt @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import okio.Path.Companion.toPath +import platform.Foundation.NSInputStream +import platform.Foundation.NSTemporaryDirectory +import platform.Foundation.NSURL +import platform.Foundation.NSUUID + +class NSInputStreamSourceTest { + @Test + fun nsInputStreamSource() { + val input = NSInputStream(data = byteArrayOf(0x61).toNSData()) + val source = input.source() + val buffer = Buffer() + assertEquals(1, source.read(buffer, 1L)) + assertEquals("a", buffer.readUtf8()) + } + + @OptIn(ExperimentalStdlibApi::class) + @Test + fun nsInputStreamSourceFromFile() { + val file = "${NSTemporaryDirectory()}${NSUUID().UUIDString()}" + try { + FileSystem.SYSTEM.write(file.toPath()) { + writeUtf8("example") + } + + val input = NSInputStream(uRL = NSURL.fileURLWithPath(file)) + val source = input.source() + val buffer = Buffer() + assertEquals(7, source.read(buffer, 10)) + assertEquals("example", buffer.readUtf8()) + } finally { + FileSystem.SYSTEM.delete(file.toPath(), false) + } + } + + @Test + fun sourceFromInputStream() { + val input = NSInputStream(data = ("a" + "b".repeat(Segment.SIZE * 2) + "c").encodeToByteArray().toNSData()) + + // Source: ab...bc + val source: Source = input.source() + val sink = Buffer() + + // Source: b...bc. Sink: abb. + assertEquals(3, source.read(sink, 3)) + assertEquals("abb", sink.readUtf8(3)) + + // Source: b...bc. Sink: b...b. + assertEquals(Segment.SIZE.toLong(), source.read(sink, 20000)) + assertEquals("b".repeat(Segment.SIZE), sink.readUtf8()) + + // Source: b...bc. Sink: b...bc. + assertEquals((Segment.SIZE - 1).toLong(), source.read(sink, 20000)) + assertEquals("b".repeat(Segment.SIZE - 2) + "c", sink.readUtf8()) + + // Source and sink are empty. + assertEquals(-1, source.read(sink, 1)) + } + + @Test + fun sourceFromInputStreamWithSegmentSize() { + val input = NSInputStream(data = ByteArray(Segment.SIZE).toNSData()) + val source = input.source() + val sink = Buffer() + + assertEquals(Segment.SIZE.toLong(), source.read(sink, Segment.SIZE.toLong())) + assertEquals(-1, source.read(sink, Segment.SIZE.toLong())) + + assertNoEmptySegments(sink) + } + + @Test + fun sourceFromInputStreamBounds() { + val source = NSInputStream(data = ByteArray(100).toNSData()).source() + assertFailsWith { source.read(Buffer(), -1) } + } +} diff --git a/okio/src/appleTest/kotlin/okio/NSOutputStreamSinkTest.kt b/okio/src/appleTest/kotlin/okio/NSOutputStreamSinkTest.kt new file mode 100644 index 0000000000..4519546a16 --- /dev/null +++ b/okio/src/appleTest/kotlin/okio/NSOutputStreamSinkTest.kt @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.get +import kotlinx.cinterop.reinterpret +import platform.Foundation.NSData +import platform.Foundation.NSOutputStream +import platform.Foundation.NSStreamDataWrittenToMemoryStreamKey +import platform.Foundation.outputStreamToMemory + +class NSOutputStreamSinkTest { + @Test + @OptIn(UnsafeNumber::class) + fun nsOutputStreamSink() { + val out = NSOutputStream.outputStreamToMemory() + val sink = out.sink() + val buffer = Buffer().apply { + writeUtf8("a") + } + sink.write(buffer, 1L) + val data = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + assertEquals(1U, data.length) + val bytes = data.bytes!!.reinterpret() + assertEquals(0x61, bytes[0]) + } + + @Test + fun sinkFromOutputStream() { + val data = Buffer().apply { + writeUtf8("a") + writeUtf8("b".repeat(9998)) + writeUtf8("c") + } + val out = NSOutputStream.outputStreamToMemory() + val sink = out.sink() + + sink.write(data, 3) + val outData = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + val outString = outData.toByteArray().decodeToString() + assertEquals("abb", outString) + + sink.write(data, data.size) + val outData2 = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + val outString2 = outData2.toByteArray().decodeToString() + assertEquals("a" + "b".repeat(9998) + "c", outString2) + } +} diff --git a/okio/src/appleTest/kotlin/okio/TestUtil.kt b/okio/src/appleTest/kotlin/okio/TestUtil.kt new file mode 100644 index 0000000000..be06d742c4 --- /dev/null +++ b/okio/src/appleTest/kotlin/okio/TestUtil.kt @@ -0,0 +1,99 @@ +@file:OptIn(UnsafeNumber::class, BetaInteropApi::class) + +package okio + +import kotlin.test.assertTrue +import kotlin.test.fail +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds +import kotlinx.cinterop.BetaInteropApi +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.convert +import kotlinx.cinterop.refTo +import kotlinx.cinterop.reinterpret +import kotlinx.cinterop.usePinned +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.withTimeout +import platform.Foundation.NSData +import platform.Foundation.NSDefaultRunLoopMode +import platform.Foundation.NSMachPort +import platform.Foundation.NSOutputStream +import platform.Foundation.NSRunLoop +import platform.Foundation.NSStreamEvent +import platform.Foundation.NSStreamEventEndEncountered +import platform.Foundation.NSStreamEventErrorOccurred +import platform.Foundation.NSStreamEventHasBytesAvailable +import platform.Foundation.NSStreamEventHasSpaceAvailable +import platform.Foundation.NSStreamEventNone +import platform.Foundation.NSStreamEventOpenCompleted +import platform.Foundation.NSThread +import platform.Foundation.create +import platform.Foundation.data +import platform.Foundation.run +import platform.posix.memcpy + +internal fun ByteArray.toNSData() = if (isNotEmpty()) { + usePinned { + NSData.create(bytes = it.addressOf(0), length = size.convert()) + } +} else { + NSData.data() +} + +fun NSData.toByteArray() = ByteArray(length.toInt()).apply { + if (isNotEmpty()) { + memcpy(refTo(0), bytes, length) + } +} + +fun startRunLoop(name: String = "run-loop"): NSRunLoop { + val created = Mutex(true) + lateinit var runLoop: NSRunLoop + val thread = NSThread { + runLoop = NSRunLoop.currentRunLoop + runLoop.addPort(NSMachPort.port(), NSDefaultRunLoopMode) + created.unlock() + runLoop.run() + } + thread.name = name + thread.start() + runBlocking { + created.lockWithTimeout() + } + return runLoop +} + +suspend fun Mutex.lockWithTimeout(timeout: Duration = 5.seconds) { + class MutexSource : Throwable() + val source = MutexSource() + try { + withTimeout(timeout) { lock() } + } catch (e: TimeoutCancellationException) { + fail("Mutex never unlocked", source) + } +} + +fun NSStreamEvent.asString(): String { + return when (this) { + NSStreamEventNone -> "NSStreamEventNone" + NSStreamEventOpenCompleted -> "NSStreamEventOpenCompleted" + NSStreamEventHasBytesAvailable -> "NSStreamEventHasBytesAvailable" + NSStreamEventHasSpaceAvailable -> "NSStreamEventHasSpaceAvailable" + NSStreamEventErrorOccurred -> "NSStreamEventErrorOccurred" + NSStreamEventEndEncountered -> "NSStreamEventEndEncountered" + else -> "Unknown event $this" + } +} + +fun ByteArray.write(to: NSOutputStream): Int { + this.usePinned { + return to.write(it.addressOf(0).reinterpret(), size.convert()).convert() + } +} + +fun assertNoEmptySegments(buffer: Buffer) { + assertTrue(segmentSizes(buffer).all { it != 0 }, "Expected all segments to be non-empty") +} diff --git a/okio/src/jvmMain/kotlin/okio/RealBufferedSource.kt b/okio/src/jvmMain/kotlin/okio/RealBufferedSource.kt index a313fcd1e9..2d1c7d00be 100644 --- a/okio/src/jvmMain/kotlin/okio/RealBufferedSource.kt +++ b/okio/src/jvmMain/kotlin/okio/RealBufferedSource.kt @@ -81,10 +81,7 @@ internal actual class RealBufferedSource actual constructor( commonRead(sink, offset, byteCount) override fun read(sink: ByteBuffer): Int { - if (buffer.size == 0L) { - val read = source.read(buffer, Segment.SIZE.toLong()) - if (read == -1L) return -1 - } + if (exhausted()) return -1 return buffer.read(sink) } @@ -148,10 +145,7 @@ internal actual class RealBufferedSource actual constructor( return object : InputStream() { override fun read(): Int { if (closed) throw IOException("closed") - if (buffer.size == 0L) { - val count = source.read(buffer, Segment.SIZE.toLong()) - if (count == -1L) return -1 - } + if (exhausted()) return -1 return buffer.readByte() and 0xff } @@ -159,10 +153,7 @@ internal actual class RealBufferedSource actual constructor( if (closed) throw IOException("closed") checkOffsetAndCount(data.size.toLong(), offset.toLong(), byteCount.toLong()) - if (buffer.size == 0L) { - val count = source.read(buffer, Segment.SIZE.toLong()) - if (count == -1L) return -1 - } + if (exhausted()) return -1 return buffer.read(data, offset, byteCount) }