Skip to content

Commit

Permalink
Fix Transactions in sql-libsql (#3720)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris Wilkinson <[email protected]>
Co-authored-by: Tim <[email protected]>
  • Loading branch information
3 people authored Oct 8, 2024
1 parent 44583d4 commit e0a5dad
Show file tree
Hide file tree
Showing 12 changed files with 549 additions and 458 deletions.
5 changes: 5 additions & 0 deletions .changeset/curvy-cherries-dress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/sql-libsql": minor
---

Fix transactions
7 changes: 7 additions & 0 deletions .changeset/strange-toys-grow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@effect/sql-libsql": patch
"@effect/sql-mssql": patch
"@effect/sql": patch
---

add SqlClient.makeWithTransaction api
3 changes: 2 additions & 1 deletion packages/sql-libsql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
"devDependencies": {
"@effect/platform": "workspace:^",
"@effect/sql": "workspace:^",
"effect": "workspace:^"
"effect": "workspace:^",
"testcontainers": "^10.11.0"
},
"peerDependencies": {
"@effect/platform": "workspace:^",
Expand Down
180 changes: 120 additions & 60 deletions packages/sql-libsql/src/LibsqlClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import * as Config from "effect/Config"
import type { ConfigError } from "effect/ConfigError"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import { identity } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Scope from "effect/Scope"

/**
Expand Down Expand Up @@ -42,6 +42,10 @@ export interface LibsqlClient extends Client.SqlClient {
*/
export const LibsqlClient = Context.GenericTag<LibsqlClient>("@effect/sql-libsql/LibsqlClient")

const LibsqlTransaction = Context.GenericTag<readonly [LibsqlConnection, counter: number]>(
"@effect/sql-libsql/LibsqlClient/LibsqlTransaction"
)

/**
* @category models
* @since 1.0.0
Expand Down Expand Up @@ -91,7 +95,11 @@ export interface LibsqlClientConfig {
readonly transformQueryNames?: ((str: string) => string) | undefined
}

interface LibsqlConnection extends Connection {}
interface LibsqlConnection extends Connection {
readonly beginTransaction: Effect.Effect<LibsqlConnection, SqlError>
readonly commit: Effect.Effect<void, SqlError>
readonly rollback: Effect.Effect<void, SqlError>
}

/**
* @category constructor
Expand All @@ -106,84 +114,136 @@ export const make = (
options.transformResultNames!
).array

const makeConnection = Effect.gen(function*() {
const db = Libsql.createClient(options as Libsql.Config)
yield* Effect.addFinalizer(() => Effect.sync(() => db.close()))
const spanAttributes: Array<[string, unknown]> = [
...(options.spanAttributes ? Object.entries(options.spanAttributes) : []),
[Otel.SEMATTRS_DB_SYSTEM, Otel.DBSYSTEMVALUES_SQLITE]
]

class LibsqlConnectionImpl implements LibsqlConnection {
constructor(readonly sdk: Libsql.Client | Libsql.Transaction) {}

run(
sql: string,
params: ReadonlyArray<Statement.Primitive> = []
) {
return Effect.map(
Effect.tryPromise({
try: () => this.sdk.execute({ sql, args: params as Array<any> }),
catch: (cause) => new SqlError({ cause, message: "Failed to execute statement" })
}),
(results) => results.rows
)
}

const run = (
runRaw(
sql: string,
params: ReadonlyArray<Statement.Primitive> = []
) =>
Effect.tryPromise({
try: () => db.execute({ sql, args: params as Array<any> }).then((results) => results.rows),
) {
return Effect.tryPromise({
try: () => this.sdk.execute({ sql, args: params as Array<any> }),
catch: (cause) => new SqlError({ cause, message: "Failed to execute statement" })
})
}

const runRaw = (
runTransform(
sql: string,
params: ReadonlyArray<Statement.Primitive> = []
) =>
Effect.tryPromise({
try: () => db.execute({ sql, args: params as Array<any> }),
catch: (cause) => new SqlError({ cause, message: "Failed to execute statement" })
) {
return options.transformResultNames
? Effect.map(this.run(sql, params), transformRows)
: this.run(sql, params)
}

execute(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return this.runTransform(sql, params)
}
executeRaw(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return this.runRaw(sql, params)
}
executeValues(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return Effect.map(this.run(sql, params), (rows) => rows.map((row) => Array.from(row) as Array<any>))
}
executeWithoutTransform(sql: string, params: ReadonlyArray<Statement.Primitive>) {
return this.run(sql, params)
}
executeUnprepared(sql: string, params?: ReadonlyArray<Statement.Primitive>) {
return this.run(sql, params)
}
executeStream() {
return Effect.dieMessage("executeStream not implemented")
}
get beginTransaction() {
return Effect.map(
Effect.tryPromise({
try: () => (this.sdk as Libsql.Client).transaction("write"),
catch: (cause) => new SqlError({ cause, message: "Failed to begin transaction" })
}),
(tx) => new LibsqlConnectionImpl(tx)
)
}
get commit() {
return Effect.tryPromise({
try: () => (this.sdk as Libsql.Transaction).commit(),
catch: (cause) => new SqlError({ cause, message: "Failed to commit transaction" })
})
}
get rollback() {
return Effect.tryPromise({
try: () => (this.sdk as Libsql.Transaction).rollback(),
catch: (cause) => new SqlError({ cause, message: "Failed to rollback transaction" })
})
}
}

const runTransform = options.transformResultNames
? (sql: string, params?: ReadonlyArray<Statement.Primitive>) => Effect.map(run(sql, params), transformRows)
: run

return identity<LibsqlConnection>({
execute(sql, params) {
return runTransform(sql, params)
},
executeRaw(sql, params) {
return runRaw(sql, params)
},
executeValues(sql, params) {
return Effect.map(run(sql, params), (rows) => rows.map((row) => Array.from(row) as Array<any>))
},
executeWithoutTransform(sql, params) {
return run(sql, params)
},
executeUnprepared(sql, params) {
return run(sql, params)
},
executeStream(_sql, _params) {
return Effect.dieMessage("executeStream not implemented")
}
})
const connection = yield* Effect.map(
Effect.acquireRelease(
Effect.sync(() => Libsql.createClient(options as Libsql.Config)),
(sdk) => Effect.sync(() => sdk.close())
),
(sdk) => new LibsqlConnectionImpl(sdk)
)
const semaphore = yield* Effect.makeSemaphore(1)

const withTransaction = Client.makeWithTransaction({
transactionTag: LibsqlTransaction,
spanAttributes,
acquireConnection: Effect.uninterruptibleMask((restore) =>
Scope.make().pipe(
Effect.bindTo("scope"),
Effect.bind("conn", ({ scope }) =>
restore(semaphore.take(1)).pipe(
Effect.zipRight(Scope.addFinalizer(scope, semaphore.release(1))),
Effect.zipRight(connection.beginTransaction)
)),
Effect.map(({ conn, scope }) => [scope, conn] as const)
)
),
begin: () => Effect.void, // already begun in acquireConnection
savepoint: (conn, id) => conn.executeRaw(`SAVEPOINT effect_sql_${id};`, []),
commit: (conn) => conn.commit,
rollback: (conn) => conn.rollback,
rollbackSavepoint: (conn, id) => conn.executeRaw(`ROLLBACK TO SAVEPOINT effect_sql_${id};`, [])
})

const semaphore = yield* Effect.makeSemaphore(1)
const connection = yield* makeConnection

const acquirer = semaphore.withPermits(1)(Effect.succeed(connection))
const transactionAcquirer = Effect.uninterruptibleMask((restore) =>
Effect.as(
Effect.zipRight(
restore(semaphore.take(1)),
Effect.tap(
Effect.scope,
(scope) => Scope.addFinalizer(scope, semaphore.release(1))
)
),
connection
)
const acquirer = Effect.flatMap(
Effect.serviceOption(LibsqlTransaction),
Option.match({
onNone: () => semaphore.withPermits(1)(Effect.succeed(connection as LibsqlConnection)),
onSome: ([conn]) => Effect.succeed(conn)
})
)

return Object.assign(
Client.make({
acquirer,
compiler,
transactionAcquirer,
spanAttributes: [
...(options.spanAttributes ? Object.entries(options.spanAttributes) : []),
[Otel.SEMATTRS_DB_SYSTEM, Otel.DBSYSTEMVALUES_SQLITE]
]
}) as LibsqlClient,
spanAttributes
}),
{
[TypeId]: TypeId as TypeId,
config: options
config: options,
withTransaction,
sdk: connection.sdk
}
)
})
Expand Down
Loading

0 comments on commit e0a5dad

Please sign in to comment.