Skip to content

Commit

Permalink
Implementation of ROLIE feeds (#108)
Browse files Browse the repository at this point in the history
* Implementation of ROLIE feeds

* Fetching from ROLIE feed

* Added more documentation

* Simplify document streaming logic and update tests

---------

Co-authored-by: milux <[email protected]>
  • Loading branch information
oxisto and milux authored Nov 13, 2024
1 parent 11dba0c commit 14a0e40
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.github.csaf.sbom.retrieval
import io.github.csaf.sbom.schema.generated.Aggregator
import io.github.csaf.sbom.schema.generated.Csaf
import io.github.csaf.sbom.schema.generated.Provider
import io.github.csaf.sbom.schema.generated.ROLIEFeed
import io.ktor.client.*
import io.ktor.client.call.*
import io.ktor.client.engine.*
Expand Down Expand Up @@ -96,6 +97,19 @@ class CsafLoader(engine: HttpClientEngine = Java.create()) {
suspend fun fetchDocument(url: String, ctx: RetrievalContext): Result<Csaf> =
Result.of { get<Csaf>(url, ctx.responseCallback()).also(ctx.jsonCallback()) }

/**
* Fetch and parse a ROLE feed from a given URL.
*
* @param url the URL where the ROLIE feed is found
* @param responseCallback An optional callback to further evaluate the [HttpResponse].
* @return The resulting [ROLIEFeed], wrapped in a [Result] monad, if successful. A failed
* [Result] wrapping the thrown [Throwable] in case of an error.
*/
suspend fun fetchROLIEFeed(
url: String,
responseCallback: ((HttpResponse) -> Unit)? = null
): Result<ROLIEFeed> = Result.of { get(url, responseCallback ?: {}) }

/**
* Fetch an arbitrary URL's content as plain text [String], falling back to UTF-8 if no charset
* is provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ fun main(args: Array<String>) {
RetrievedProvider.from(args[0])
.onSuccess { provider ->
println("Discovered provider-metadata.json @ ${provider.json.canonical_url}")
println("Expected documents: ${provider.countExpectedDocuments()}")
// Retrieve all documents from all feeds. Note: we currently only support index.txt
for (result in provider.fetchDocuments()) {
result.onSuccess { doc ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@ import io.github.csaf.sbom.retrieval.roles.CSAFProviderRole
import io.github.csaf.sbom.retrieval.roles.CSAFPublisherRole
import io.github.csaf.sbom.retrieval.roles.CSAFTrustedProviderRole
import io.github.csaf.sbom.schema.generated.Provider
import io.github.csaf.sbom.schema.generated.Provider.Feed
import io.github.csaf.sbom.schema.generated.ROLIEFeed
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import java.util.stream.StreamSupport
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.future.future

/**
* This class represents a "retrieved" provider (i.e., the roles "publisher", "provider" and
* "trusted provider"), including its metadata (in the form of [Provider]) as well as functionality
* to retrieve its documents.
*/
@OptIn(ExperimentalCoroutinesApi::class)
class RetrievedProvider(val json: Provider) : Validatable {

/**
Expand All @@ -51,21 +52,22 @@ class RetrievedProvider(val json: Provider) : Validatable {
}

/**
* This function fetches all directory indices referenced by this provider.
* This function fetches all directory indices referenced by this provider and sends them to a
* [ReceiveChannel].
*
* @param loader The instance of [CsafLoader] used for fetching of online resources.
* @param channelCapacity The capacity of the channels used to buffer parallel fetches. Defaults
* to [DEFAULT_CHANNEL_CAPACITY].
* @return The fetched [Result]s, representing index contents or fetch errors.
* @return A [ReceiveChannel] containing the fetched [Result]s, representing index contents or
* fetch errors.
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun fetchDocumentIndices(
loader: CsafLoader = lazyLoader,
channelCapacity: Int = DEFAULT_CHANNEL_CAPACITY
): ReceiveChannel<Pair<String, Result<String>>> {
@Suppress("SimpleRedundantLet")
val directoryUrls =
(json.distributions ?: emptySet()).mapNotNull { distribution ->
@Suppress("SimpleRedundantLet")
distribution.directory_url?.let { it.toString().trimEnd('/') }
}
// This channel collects up to `channelCapacity` directory indices concurrently.
Expand All @@ -83,6 +85,37 @@ class RetrievedProvider(val json: Provider) : Validatable {
}
}

/**
* This function fetches all ROLIE feeds referenced by this provider and sends them to a
* [ReceiveChannel].
*
* @param loader The instance of [CsafLoader] used for fetching of online resources.
* @param channelCapacity The capacity of the channels used to buffer parallel fetches. Defaults
* to [DEFAULT_CHANNEL_CAPACITY].
* @return A [ReceiveChannel] containing the fetched [Result]s, representing ROLIE feeds'
* contents (as [ROLIEFeed]) or fetch errors.
*/
fun fetchRolieFeeds(
loader: CsafLoader = lazyLoader,
channelCapacity: Int = DEFAULT_CHANNEL_CAPACITY
): ReceiveChannel<Pair<Feed, Result<ROLIEFeed>>> {
val feeds = json.distributions?.mapNotNull { it.rolie }?.flatMap { it.feeds } ?: listOf()

// This channel collects up to `channelCapacity` feeds concurrently.
val rolieChannel =
ioScope.produce(capacity = channelCapacity) {
for (feed in feeds) {
send(feed to async { loader.fetchROLIEFeed(feed.url.toString()) })
}
}
// This terminal channel is a simple "rendezvous channel" for awaiting the Results.
return ioScope.produce {
for ((feed, feedDeferred) in rolieChannel) {
send(feed to feedDeferred.await())
}
}
}

/**
* This function sums up the expected number of [RetrievedDocument]s that will be fetched from
* this Provider.
Expand All @@ -92,22 +125,11 @@ class RetrievedProvider(val json: Provider) : Validatable {
* to [DEFAULT_CHANNEL_CAPACITY].
* @return The expected number of [RetrievedDocument]s provided.
*/
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun countExpectedDocuments(
loader: CsafLoader = lazyLoader,
channelCapacity: Int = DEFAULT_CHANNEL_CAPACITY
): Int {
val indexChannel = fetchDocumentIndices(loader, channelCapacity)
// This second channel collects up to `channelCapacity` Results concurrently, which
// represent CSAF Documents or errors from fetching or validation.
val documentCountChannel =
ioScope.produce(capacity = channelCapacity) {
for ((_, indexResult) in indexChannel) {
indexResult.onSuccess { send(it.lines().size) }
}
}
// This terminal channel is a simple "rendezvous channel" for awaiting the Results.
return documentCountChannel.toList().sum()
return fetchAllDocumentUrls(loader, channelCapacity).toList().filter { it.isSuccess }.size
}

/**
Expand All @@ -123,35 +145,15 @@ class RetrievedProvider(val json: Provider) : Validatable {
loader: CsafLoader = lazyLoader,
channelCapacity: Int = DEFAULT_CHANNEL_CAPACITY
): ReceiveChannel<Result<RetrievedDocument>> {
val indexChannel = fetchDocumentIndices(loader, channelCapacity)
val documentUrlChannel = fetchAllDocumentUrls(loader, channelCapacity)
// This second channel collects up to `channelCapacity` Results concurrently, which
// represent CSAF Documents or errors from fetching or validation.
val documentJobChannel =
ioScope.produce<Deferred<Result<RetrievedDocument>>>(capacity = channelCapacity) {
for ((directoryUrl, indexResult) in indexChannel) {
indexResult.fold(
{ index ->
index.lines().map { line ->
send(
async {
val csafUrl = "$directoryUrl/$line"
RetrievedDocument.from(csafUrl, loader, role)
}
)
}
},
{ e ->
send(
async {
Result.failure(
Exception(
"Failed to fetch index.txt from directory at $directoryUrl",
e
)
)
}
)
}
for (result in documentUrlChannel) {
result.fold(
{ send(async { RetrievedDocument.from(it, loader, role) }) },
{ send(async { Result.failure(it) }) }
)
}
}
Expand All @@ -163,6 +165,93 @@ class RetrievedProvider(val json: Provider) : Validatable {
}
}

/**
* Returns a channel that produces all URLs from ROLIE feeds and directory indices without
* duplicates.
*
* @param loader The instance of [CsafLoader] used for fetching of online resources.
* @param channelCapacity The capacity of the channels used to buffer parallel fetches. Defaults
* to [DEFAULT_CHANNEL_CAPACITY].
* @return
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun fetchAllDocumentUrls(
loader: CsafLoader = lazyLoader,
channelCapacity: Int = DEFAULT_CHANNEL_CAPACITY
): ReceiveChannel<Result<String>> {
val urlResultChannel =
ioScope.produce(capacity = channelCapacity) {
fetchDocumentUrlsFromIndices(fetchDocumentIndices(loader, channelCapacity))
fetchDocumentUrlsFromRolieFeeds(fetchRolieFeeds(loader, channelCapacity))
}
return ioScope.produce {
val seenUrls = mutableSetOf<String>()
for (urlResult in urlResultChannel) {
urlResult.fold(
{
if (seenUrls.add(it)) {
send(Result.success(it))
}
},
{ send(Result.failure(it)) }
)
}
}
}

/**
* Sends the URLs obtained from the [indexChannel] to the given [ProducerScope].
*
* @param indexChannel The source channel providing directory index data.
* @receiver urlChannel The target channel where URLs are sent to.
*/
private suspend fun SendChannel<Result<String>>.fetchDocumentUrlsFromIndices(
indexChannel: ReceiveChannel<Pair<String, Result<String>>>
) {
for ((directoryUrl, indexResult) in indexChannel) {
indexResult.fold(
{ index ->
index.lines().map { line -> send(Result.success("$directoryUrl/$line")) }
},
{ e ->
send(
Result.failure(
Exception(
"Failed to fetch index.txt from directory at $directoryUrl",
e
)
)
)
}
)
}
}

/**
* Sends the URLs obtained from the [rolieChannel] feeds to the given [ProducerScope].
*
* @param rolieChannel The source channel providing ROLIE feed data.
* @receiver urlChannel The target channel where URLs are sent to.
*/
private suspend fun SendChannel<Result<String>>.fetchDocumentUrlsFromRolieFeeds(
rolieChannel: ReceiveChannel<Pair<Feed, Result<ROLIEFeed>>>
) {
for ((feed, rolieResult) in rolieChannel) {
rolieResult.fold(
{ rolie ->
rolie.feed.entry.map { entry ->
send(Result.success(entry.content.src.toString()))
}
},
{ e ->
send(
Result.failure(Exception("Failed to fetch ROLIE feed from ${feed.url}", e))
)
}
)
}
}

/**
* This method provides the [Result]s of [fetchDocuments] as a Java [Stream] for usage in
* non-Kotlin environments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,13 @@ class CsafLoaderTest {
}
assertFalse { result.isSuccess }
}

@Test
fun testFetchROLIEFeed() = runTest {
val result =
loader.fetchROLIEFeed("does-not-really-exist.json") {
assertSame(HttpStatusCode.NotFound, it.status)
}
assertFalse { result.isSuccess }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import io.github.csaf.sbom.validation.ValidationException
import kotlin.test.*
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.assertThrows

class RetrievedProviderTest {
init {
CsafLoader.defaultLoaderFactory = { CsafLoader(mockEngine()) }
}

@Test fun testRetrievedProviderFrom() = runTest { providerTest("example.com") }
@Test fun testRetrievedProviderFrom() = runTest { providerTest("example.com", 4) }

@Test
fun testRetrievedProviderFromSecurityTxt() = runTest {
Expand Down Expand Up @@ -70,16 +71,20 @@ class RetrievedProviderTest {
assertFalse(documentIndexResults[1].second.isSuccess)
}

private suspend fun providerTest(domain: String) {
@Test
fun testFetchRolieFeeds() = runTest {
val provider = RetrievedProvider.from("example.com").getOrThrow()
val rolieFeedsResults = provider.fetchRolieFeeds().toList()
assertEquals(1, rolieFeedsResults.size, "Expected exactly 1 result: One parsed ROLIE feed")
assertTrue(rolieFeedsResults[0].second.isSuccess)
}

private suspend fun providerTest(domain: String, numResults: Int = 5) {
val provider = RetrievedProvider.from(domain).getOrThrow()
val expectedDocumentCount = provider.countExpectedDocuments()
assertEquals(3, expectedDocumentCount, "Expected 3 documents")
val documentResults = provider.fetchDocuments().toList()
assertEquals(
4,
documentResults.size,
"Expected exactly 4 results: One document, two document errors, one index error"
)
assertEquals(numResults, documentResults.size, "Expected exactly $numResults results")
// Check some random property on successful document
assertEquals(
"Bundesamt für Sicherheit in der Informationstechnik",
Expand All @@ -106,4 +111,28 @@ class RetrievedProviderTest {
documentResults[3].exceptionOrNull()?.message
)
}

@Test
fun testFetchAllDocumentUrls() = runTest {
val provider = RetrievedProvider.from("example.com").getOrThrow()
val urlResults = provider.fetchAllDocumentUrls().toList()

assertEquals(4, urlResults.size, "Expected exactly 4 results")
assertEquals(
"https://example.com/directory/2022/bsi-2022-0001.json",
urlResults[0].getOrThrow()
)
assertEquals(
"https://example.com/directory/2022/bsi-2022_2-01.json",
urlResults[1].getOrThrow()
)
assertEquals(
"https://example.com/directory/2024/does-not-exist.json",
urlResults[2].getOrThrow()
)
assertEquals(
"Failed to fetch index.txt from directory at https://example.com/invalid-directory",
(assertThrows<Exception> { urlResults[3].getOrThrow() }).message
)
}
}
Loading

0 comments on commit 14a0e40

Please sign in to comment.