From 398e278999924d5f1d30b79d29210fe863ed352b Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 13 Sep 2023 16:36:09 +0100 Subject: [PATCH] refactor!: Cleanup CosmosClientFactory --- CHANGELOG.md | 1 + DOCUMENTATION.md | 11 +- README.md | 20 +-- docker-compose.yml | 2 +- src/Equinox.CosmosStore/CosmosStore.fs | 122 ++++++++---------- .../CosmosFixtures.fs | 4 +- tools/Equinox.Tool/Program.fs | 6 +- 7 files changed, 74 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94a9c13e2..fa6a66044 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305) - `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338) - `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish) +- `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430) - `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317) - `Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 4b4149397..6e516ac67 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -1840,7 +1840,6 @@ following key benefits: ### Example Code ```fsharp - open Equinox.CosmosStore.Core // open MyCodecs.Json // example of using specific codec which can yield UTF-8 // byte arrays from a type using `Json.toBytes` via Fleece @@ -1852,22 +1851,20 @@ type EventData with // Load connection string from your Key Vault (example here is the CosmosDB // simulator's well known key) -// see https://github.com/jet/equinox-provisioning-cosmosdb +// see https://github.com/jet/equinox#provisioning-cosmosdb let connectionString: string = "AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;" -// Forward to Log (you can use `Log.Logger` and/or `Log.ForContext` if your app -// uses Serilog already) -let outputLog = LoggerConfiguration().WriteTo.NLog().CreateLogger() +// Forward to Log (use `Log.Logger` if your app already uses Serilog) +let outputLog = LoggerConfiguration().WriteTo.Console().CreateLogger() // Serilog has a `ForContext()`, but if you are using a `module` for the // wiring, you might create a tagged logger like this: let gatewayLog = outputLog.ForContext(Serilog.Core.Constants.SourceContextPropertyName, "Equinox") -let discovery = Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION") let connector: Equinox.CosmosStore.CosmosStoreConnector = CosmosStoreConnector( - discovery, + Equinox.CosmosStore.Discovery.ConnectionString connectionString, requestTimeout = TimeSpan.FromSeconds 5., maxRetryAttemptsOnRateLimitedRequests = 1, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.) diff --git a/README.md b/README.md index bbb18d71d..2498594b8 100644 --- a/README.md +++ b/README.md @@ -665,12 +665,14 @@ For more complete instructions, follow https://developers.eventstore.com/server/ #### Using Azure Cosmos DB Service - dotnet run --project tools/Equinox.Tool -- init -ru 400 ` - cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER - # Same for a Archive Container for integration testing of the archive store fallback mechanism - $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive" - dotnet run --project tools/Equinox.Tool -- init -ru 400 ` - cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE +```bash +dotnet run --project tools/Equinox.Tool -- init -ru 400 ` + cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER +# Same for a Archive Container for integration testing of the archive store fallback mechanism +$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive" +dotnet run --project tools/Equinox.Tool -- init -ru 400 ` + cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE +``` #### Using Cosmos Emulator on an Intel Mac @@ -678,8 +680,10 @@ NOTE There's [no Apple Silicon emulator available as yet](https://github.com/Azu NOTE Have not tested with the Windows Emulator, but it should work with analogous steps. - docker compose up equinox-cosmos -d - bash docker-compose-cosmos.sh +```bash +docker compose up equinox-cosmos -d +bash docker-compose-cosmos.sh +``` ### Provisioning SqlStreamStore diff --git a/docker-compose.yml b/docker-compose.yml index 89c07cad6..90274af04 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,7 +13,7 @@ services: - AZURE_COSMOS_EMULATOR_IP_ADDRESS_OVERRIDE=127.0.0.1 ports: - "8081:8081" # so docker-cosmos-init.sh can get the cert and/or humans can use https://localhost:8081/_explorer/index.html - - "10250-10256:10250-10256" # tests connect using Direct mode + - "10250-10255:10250-10255" # tests connect using Direct mode equinox-mssql: container_name: equinox-mssql diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index ae809a2df..86164f321 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1107,6 +1107,35 @@ module ConnectionString = | true, (:? string as s) when not (String.IsNullOrEmpty s) -> s | _ -> invalidOp "Connection string does not contain an \"AccountEndpoint\"" +[] +type DiscoveryMode = + | AccountUriAndKey of accountUri: string * key: string + | ConnectionString of connectionString: string + member x.Endpoint = x |> function + | DiscoveryMode.AccountUriAndKey (u, _k) -> u + | DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e + +/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container. +type CosmosClientFactory(options) = + static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) = + CosmosClientOptions( + MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests, + MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests, + RequestTimeout = requestTimeout, + Serializer = CosmosJsonSerializer(JsonSerializerOptions())) + /// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it) + member val Options = options + /// Creates an instance of CosmosClient without actually validating or establishing the connection + /// It's recommended to use CreateAndInitializeAsync in preference to this API + /// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues. + member x.CreateUninitialized(discovery: DiscoveryMode) = discovery |> function + | DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(uri, key, x.Options) + | DiscoveryMode.ConnectionString cs -> new CosmosClient(cs, x.Options) + /// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers + member x.CreateAndInitializeAsync(discovery: DiscoveryMode, containers, ct) = discovery |> function + | DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(uri, key, containers, x.Options, ct) + | DiscoveryMode.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct) + namespace Equinox.CosmosStore open Equinox.Core @@ -1120,63 +1149,9 @@ type Discovery = | AccountUriAndKey of accountUri: Uri * key: string /// Cosmos SDK Connection String | ConnectionString of connectionString: string - member x.Endpoint: Uri = x |> function - | Discovery.AccountUriAndKey (u, _k) -> u - | Discovery.ConnectionString (ConnectionString.AccountEndpoint e) -> Uri e - -/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container. -[] -type CosmosClientFactory - ( // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m. - requestTimeout: TimeSpan, - // Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9 - maxRetryAttemptsOnRateLimitedRequests: int, - // Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s - maxRetryWaitTimeOnRateLimitedRequests: TimeSpan, - // Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default) - // NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default) - [] ?mode: ConnectionMode, - // Connection limit for Gateway Mode. CosmosDB default: 50 - [] ?gatewayModeMaxConnectionLimit, - // consistency mode (default: ConsistencyLevel.Session) - [] ?defaultConsistencyLevel: ConsistencyLevel, - // Inhibits certificate verification when set to true, i.e. for working with the CosmosDB Emulator (default false) - [] ?bypassCertificateValidation: bool) = - - /// CosmosClientOptions for this CosmosClientFactory as configured - member val Options = - let co = CosmosClientOptions( - MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests, - MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests, - RequestTimeout = requestTimeout, - Serializer = CosmosJsonSerializer(System.Text.Json.JsonSerializerOptions())) - match mode with - | None | Some ConnectionMode.Direct -> co.ConnectionMode <- ConnectionMode.Direct - | Some ConnectionMode.Gateway | Some _ (* enum total match :( *) -> co.ConnectionMode <- ConnectionMode.Gateway // only supports Https - match gatewayModeMaxConnectionLimit with - | Some _ when co.ConnectionMode = ConnectionMode.Direct -> invalidArg "gatewayModeMaxConnectionLimit" "Not admissible in Direct mode" - | x -> if co.ConnectionMode = ConnectionMode.Gateway then co.GatewayModeMaxConnectionLimit <- defaultArg x 50 - match defaultConsistencyLevel with - | Some x -> co.ConsistencyLevel <- x - | None -> () - // https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96 - if bypassCertificateValidation = Some true && co.ConnectionMode = ConnectionMode.Gateway then - let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator - let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb) - co.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch) - co - - /// Creates an instance of CosmosClient without actually validating or establishing the connection - /// It's recommended to use CreateAndInitializeAsync in preference to this API - /// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues. - member x.CreateUninitialized(discovery: Discovery) = discovery |> function - | Discovery.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(string uri, key, x.Options) - | Discovery.ConnectionString cs -> new CosmosClient(cs, x.Options) - - /// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers - member x.CreateAndInitializeAsync(discovery: Discovery, containers, ct) = discovery |> function - | Discovery.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct) - | Discovery.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct) + member x.ToDiscoveryMode() = x |> function + | Discovery.AccountUriAndKey (u, k) -> DiscoveryMode.AccountUriAndKey (string u, k) + | Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c /// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container. type CosmosStoreConnector @@ -1191,32 +1166,37 @@ type CosmosStoreConnector // Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default) // NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default) [] ?mode: ConnectionMode, - // Connection limit for Gateway Mode. CosmosDB default: 50 - [] ?gatewayModeMaxConnectionLimit, - // consistency mode (default: ConsistencyLevel.Session) + // consistency mode (default: use configuration specified for Database) [] ?defaultConsistencyLevel: ConsistencyLevel, - // Inhibits certificate verification when set to true, i.e. for working with the CosmosDB Emulator (default false) - [] ?bypassCertificateValidation: bool) = - + // Inhibits certificate verification when set to `true`. Default: false. + [] ?bypassCertificateValidation: bool, + [] ?customize: Action) = + let discoveryMode = discovery.ToDiscoveryMode() let factory = - CosmosClientFactory - ( requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode, - ?gatewayModeMaxConnectionLimit = gatewayModeMaxConnectionLimit, ?defaultConsistencyLevel = defaultConsistencyLevel, - ?bypassCertificateValidation = bypassCertificateValidation) + let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests) + mode |> Option.iter (fun x -> o.ConnectionMode <- x) + defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x) + // https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96 + if defaultArg bypassCertificateValidation false then + let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator + let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb) + o.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch) + customize |> Option.iter (fun c -> c.Invoke o) + CosmosClientFactory o /// The CosmosClientOptions used when connecting to CosmosDB member _.Options = factory.Options /// The Endpoint Uri for the target CosmosDB - member _.Endpoint = discovery.Endpoint + member val Endpoint = discoveryMode.Endpoint |> Uri /// Creates an instance of CosmosClient without actually validating or establishing the connection /// It's recommended to use Connect and/or CreateAndInitialize in preference to this API /// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues. - member _.CreateUninitialized() = factory.CreateUninitialized(discovery) + member _.CreateUninitialized() = factory.CreateUninitialized(discoveryMode) /// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers - member _.CreateAndInitializeAsync(containers, ct): Task = factory.CreateAndInitializeAsync(discovery, containers, ct) + member _.CreateAndInitializeAsync(containers, ct): Task = factory.CreateAndInitializeAsync(discoveryMode, containers, ct) /// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers member x.CreateAndInitialize(databaseAndContainerIds: struct (string * string)[]) = Async.call (fun ct -> x.CreateAndInitializeAsync(databaseAndContainerIds, ct)) @@ -1226,7 +1206,7 @@ type CosmosStoreConnector /// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers member _.ConnectAsync(containers, ct): Task = task { - let! cosmosClient = factory.CreateAndInitializeAsync(discovery, containers, ct) + let! cosmosClient = factory.CreateAndInitializeAsync(discoveryMode, containers, ct) return CosmosStoreClient(cosmosClient) } /// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers member x.Connect(databaseAndContainerIds: struct (string * string)[]) = diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs index ff0df8ed8..322da61f6 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs @@ -80,7 +80,7 @@ let private databaseId = tryRead "EQUINOX_COSMOS_DATABASE" |> Option.defaultValu let private containerId = tryRead "EQUINOX_COSMOS_CONTAINER" |> Option.defaultValue "equinox-test" let private archiveContainerId = tryRead "EQUINOX_COSMOS_CONTAINER_ARCHIVE" |> Option.defaultValue "equinox-test-archive" -// see https://github.com/jet/equinox-provisioning-cosmosdb for details of what's expected in terms of provisioned containers etc +// see https://github.com/jet/equinox#provisioning-cosmosdb for details of what's expected in terms of provisioned containers etc let discoverConnection () = match tryRead "EQUINOX_COSMOS_CONNECTION" with | None -> "localDocDbSim", Discovery.AccountUriAndKey(Uri "https://localhost:8081", "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==") @@ -90,7 +90,7 @@ let createConnector (log: Serilog.ILogger) = let name, discovery = discoverConnection () let connector = CosmosStoreConnector(discovery, requestTimeout = TimeSpan.FromSeconds 3., maxRetryAttemptsOnRateLimitedRequests = 2, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes 1.) - log.Information("CosmosStore {name} {endpoint}", name, discovery.Endpoint) + log.Information("CosmosStore {name} {endpoint}", name, connector.Endpoint) connector [] diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 41a7ed80d..c11d6abcf 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -384,13 +384,13 @@ module CosmosInit = match a.ProvisioningMode with | CosmosInit.Provisioning.Container throughput -> let modeStr = "Container" - log.Information("Provisioning `Equinox.CosmosStore` Store at {mode:l} level for {rus:n0} RU/s", modeStr, throughput) + log.Information("CosmosStore provisioning at {mode:l} level for {rus:n0} RU/s", modeStr, throughput) | CosmosInit.Provisioning.Database throughput -> let modeStr = "Database" - log.Information("Provisioning `Equinox.CosmosStore` Store at {mode:l} level for {rus:n0} RU/s", modeStr, throughput) + log.Information("CosmosStore provisioning at {mode:l} level for {rus:n0} RU/s", modeStr, throughput) | CosmosInit.Provisioning.Serverless -> let modeStr = "Serverless" - log.Information("Provisioning `Equinox.CosmosStore` Store in {mode:l} mode with automatic RU/s as configured in account", modeStr) + log.Information("CosmosStore provisioning in {mode:l} mode with automatic RU/s as configured in account", modeStr) CosmosInit.init log (connector.CreateUninitialized()) (dName, cName) a.ProvisioningMode a.SkipStoredProc | x -> Store.missingArg $"unexpected subcommand %A{x}"