Merge pull request #33 from marklogic/feature/with-transaction
DEVEXP-561 Now supporting REST API transactions
rjrudin authored Oct 5, 2023
2 parents 216ea24 + e4df27d commit 9698e3c
Showing 6 changed files with 315 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/Gemfile.lock
@@ -251,6 +251,7 @@ GEM

 PLATFORMS
   arm64-darwin-21
+  arm64-darwin-23

 DEPENDENCIES
   github-pages (~> 228)
84 changes: 84 additions & 0 deletions docs/transactions.md
@@ -0,0 +1,84 @@
---
layout: default
title: Managing transactions
nav_order: 4
---

The [/v1/transactions endpoint](https://docs.marklogic.com/REST/client/transaction-management)
in the MarkLogic REST API supports managing a transaction that can be referenced in
multiple separate calls to other REST API endpoints, with all calls being committed or
rolled back together. The MarkLogic Python client simplifies usage of these endpoints
via a `Transaction` class that is also a
[Python context manager](https://docs.python.org/3/reference/datamodel.html#context-managers),
thereby allowing it to handle committing or rolling back the transaction without any user
involvement.

The following example demonstrates writing documents via multiple calls to MarkLogic,
all within the same REST API transaction; the example depends on first following the
instructions in the [setup guide](example-setup.md):

```
from marklogic import Client
from marklogic.documents import Document
client = Client('http://localhost:8000', digest=('python-user', 'pyth0n'))
default_perms = {"rest-reader": ["read", "update"]}
doc1 = Document("/tx/doc1.json", {"doc": 1}, permissions=default_perms)
doc2 = Document("/tx/doc2.json", {"doc": 2}, permissions=default_perms)
with client.transactions.create() as tx:
    client.documents.write(doc1, tx=tx).raise_for_status()
    client.documents.write(doc2, tx=tx).raise_for_status()
```

The `client.transactions.create()` function returns a `Transaction` instance that acts
as the context manager. When the `with` block completes without an exception, the
`Transaction` instance calls the REST API to commit the transaction; if an exception is
raised inside the block, it rolls the transaction back instead.
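
The commit-or-rollback decision can be illustrated without a MarkLogic server. The following is a minimal sketch, not the library's implementation: `RecordingTransaction` and `run_block` are hypothetical names, and the class only records which action would be taken instead of calling the REST API.

```python
class RecordingTransaction:
    """Hypothetical stand-in for Transaction; records actions instead of calling MarkLogic."""

    def __init__(self):
        self.actions = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Roll back if the "with" block raised an exception, otherwise commit.
        if isinstance(exc_value, Exception):
            self.actions.append("rollback")
        else:
            self.actions.append("commit")
        # Returning None (falsy) lets any exception propagate to the caller.


def run_block(fail: bool) -> list:
    """Run a "with" block that optionally fails and report what the transaction did."""
    tx = RecordingTransaction()
    try:
        with tx:
            if fail:
                raise ValueError("simulated failed write")
    except ValueError:
        pass
    return tx.actions
```

Under these assumptions, `run_block(False)` records a commit and `run_block(True)` records a rollback, which matches the behavior described above.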

As of 1.1.0, each of the functions in the `client.documents` object accepts a `tx`
argument referencing the transaction, which ensures that the `read`, `write`, or `search`
operation occurs within the REST API transaction.

## Ensuring a transaction is rolled back

The `requests` method [`raise_for_status()`](https://requests.readthedocs.io/en/latest/user/quickstart/#errors-and-exceptions)
is used in the example above so that a failed request raises an exception, which causes
the transaction to be rolled back. The following example demonstrates a transaction that
is rolled back because invalid JSON content causes a `write` operation to fail:

```
doc1 = Document("/tx/doc1.json", {"doc": 1}, permissions=default_perms)
doc2 = Document("/tx/doc2.json", "invalid json", permissions=default_perms)
with client.transactions.create() as tx:
    client.documents.write(doc1, tx=tx).raise_for_status()
    client.documents.write(doc2, tx=tx).raise_for_status()
```

The above will raise a `requests` `HTTPError`, and the first document will not be
written because the transaction is rolled back.

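You are free to check the status code of the response object returned
by each call as well; `raise_for_status()` is simply a commonly used convenience in the
`requests` library. Note that the transaction is rolled back only if an exception is
raised inside the `with` block, so a manual status check must raise one on failure.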
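
A manual status check might look like the sketch below. It is illustrative only: `FakeResponse` and `ensure_success` are hypothetical names introduced here, and `FakeResponse` stands in for a `requests` response so the example runs without a server.

```python
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in for a requests Response; not part of any library."""

    status_code: int
    text: str = ""


def ensure_success(response):
    """Raise if the response carries an HTTP error status; otherwise return it."""
    if response.status_code >= 400:
        raise RuntimeError(
            f"Request failed with status {response.status_code}: {response.text}"
        )
    return response
```

Inside a `with client.transactions.create() as tx:` block, a call such as `ensure_success(client.documents.write(doc1, tx=tx))` would then raise on failure and, as a result, trigger the rollback.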

## Using the transaction request parameter

You can reference the transaction when calling any REST API endpoint that supports the
optional `txid` request parameter. The following example demonstrates this, reusing the
same `client` instance from the first example:

```
with client.transactions.create() as tx:
    client.post("/v1/resources/my-resource", params={"txid": tx.id})
    client.delete("/v1/resources/other-resource", params={"txid": tx.id})
```
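
When a call already carries its own request parameters, the `txid` has to be merged into them. The helper below is a small sketch of that merge, similar in spirit to how this commit's changes to `marklogic/documents.py` add `txid` to `params`; `with_txid` is a hypothetical name, not a function provided by the client.

```python
from typing import Optional


def with_txid(params: Optional[dict], tx_id: Optional[str]) -> dict:
    """Return a copy of params that includes txid when a transaction ID is given."""
    merged = dict(params or {})  # copy so the caller's dict is not modified
    if tx_id:
        merged["txid"] = tx_id
    return merged
```

For example, `client.post("/v1/resources/my-resource", params=with_txid({"rs:flag": "true"}, tx.id))` would send both the resource parameter and the transaction ID; the `rs:flag` parameter here is made up for illustration.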

## Getting transaction status

You can get the status of the transaction via the `get_status()` function:

```
with client.transactions.create() as tx:
    print(f"Transaction status: {tx.get_status()}")
```
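
`get_status()` returns the parsed JSON response as a dict. The sketch below shows one way to read it, assuming the status payload uses the same `transaction-status` / `transaction-id` keys that `TransactionManager.create()` reads from the create response; that shape is an assumption for the status call, and both `describe_status` and the sample payload are made up for illustration.

```python
def describe_status(status: dict) -> str:
    """Summarize a transaction status dict, tolerating missing keys."""
    details = status.get("transaction-status", {})
    tx_id = details.get("transaction-id", "unknown")
    return f"Transaction {tx_id}"


# Made-up payload for illustration; a real response would come from tx.get_status().
sample_status = {"transaction-status": {"transaction-id": "example-id"}}
print(describe_status(sample_status))
```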
7 changes: 7 additions & 0 deletions marklogic/client.py
@@ -2,6 +2,7 @@
 from marklogic.cloud_auth import MarkLogicCloudAuth
 from marklogic.documents import DocumentManager
 from marklogic.rows import RowManager
+from marklogic.transactions import TransactionManager
 from requests.auth import HTTPDigestAuth
 from urllib.parse import urljoin

@@ -77,3 +78,9 @@ def rows(self):
         if not hasattr(self, "_rows"):
             self._rows = RowManager(self)
         return self._rows
+
+    @property
+    def transactions(self):
+        if not hasattr(self, "_transactions"):
+            self._transactions = TransactionManager(self)
+        return self._transactions
32 changes: 27 additions & 5 deletions marklogic/documents.py
@@ -2,14 +2,15 @@
 from collections import OrderedDict
 from typing import Union

+from marklogic.transactions import Transaction
 from requests import Response, Session
 from requests_toolbelt.multipart.decoder import MultipartDecoder
 from urllib3.fields import RequestField
 from urllib3.filepost import encode_multipart_formdata

 """
 Defines classes to simplify usage of the documents REST endpoint defined at
-https://docs.marklogic.com/REST/client/management.
+https://docs.marklogic.com/REST/client/management.
 """


@@ -147,7 +148,7 @@ def __init__(
     @property
     def metadata(self):
         """
-        Returns a dict containing the 5 attributes that comprise the metadata of a
+        Returns a dict containing the 5 attributes that comprise the metadata of a
         document in MarkLogic.
         """
         return {
@@ -344,7 +345,10 @@ def __init__(self, session: Session):
         self._session = session

     def write(
-        self, parts: Union[Document, list[Union[DefaultMetadata, Document]]], **kwargs
+        self,
+        parts: Union[Document, list[Union[DefaultMetadata, Document]]],
+        tx: Transaction = None,
+        **kwargs,
     ) -> Response:
         """
         Write one or many documents at a time via a POST to the endpoint defined at
@@ -355,6 +359,7 @@ def write(
         after it that does not define its own metadata. See
         https://docs.marklogic.com/guide/rest-dev/bulk#id_16015 for more information on
         how the REST endpoint uses metadata.
+        :param tx: if set, the request will be associated with the given transaction.
         """
         fields = []

@@ -374,17 +379,27 @@ def write(

         data, content_type = encode_multipart_formdata(fields)

+        params = kwargs.pop("params", {})
+        if tx:
+            params["txid"] = tx.id
+
         headers = kwargs.pop("headers", {})
         headers["Content-Type"] = "".join(
             ("multipart/mixed",) + content_type.partition(";")[1:]
         )
         if not headers.get("Accept"):
             headers["Accept"] = "application/json"

-        return self._session.post("/v1/documents", data=data, headers=headers, **kwargs)
+        return self._session.post(
+            "/v1/documents", data=data, headers=headers, params=params, **kwargs
+        )

     def read(
-        self, uris: Union[str, list[str]], categories: list[str] = None, **kwargs
+        self,
+        uris: Union[str, list[str]],
+        categories: list[str] = None,
+        tx: Transaction = None,
+        **kwargs,
     ) -> Union[list[Document], Response]:
         """
         Read one or many documents via a GET to the endpoint defined at
@@ -395,12 +410,15 @@ def read(
         :param categories: optional list of the categories of data to return for each
         URI. By default, only content will be returned for each URI. See the endpoint
         documentation for further information.
+        :param tx: if set, the request will be associated with the given transaction.
         """
         params = kwargs.pop("params", {})
         params["uri"] = uris if isinstance(uris, list) else [uris]
         params["format"] = "json"  # This refers to the metadata format.
         if categories:
             params["category"] = categories
+        if tx:
+            params["txid"] = tx.id

         headers = kwargs.pop("headers", {})
         headers["Accept"] = "multipart/mixed"
@@ -423,6 +441,7 @@ def search(
         page_length: int = None,
         options: str = None,
         collections: list[str] = None,
+        tx: Transaction = None,
         **kwargs,
     ) -> Union[list[Document], Response]:
         """
@@ -442,6 +461,7 @@ def search(
         :param page_length: maximum number of documents to return.
         :param options: name of a query options instance to use.
         :param collections: restrict results to documents in these collections.
+        :param tx: if set, the request will be associated with the given transaction.
         """
         params = kwargs.pop("params", {})
         params["format"] = "json"  # This refers to the metadata format.
@@ -457,6 +477,8 @@ def search(
             params["pageLength"] = page_length
         if options:
             params["options"] = options
+        if tx:
+            params["txid"] = tx.id

         headers = kwargs.pop("headers", {})
         headers["Accept"] = "multipart/mixed"
109 changes: 109 additions & 0 deletions marklogic/transactions.py
@@ -0,0 +1,109 @@
import logging
from requests import Response, Session

logger = logging.getLogger(__name__)

"""
Defines classes to simplify usage of the REST endpoints defined at
https://docs.marklogic.com/REST/client/transaction-management for managing transactions.
"""


class Transaction:
    """
    Represents a transaction created via
    https://docs.marklogic.com/REST/POST/v1/transactions .
    An instance of this class can act as a Python context manager and can thus be used
    with the Python "with" keyword. This is the intended use case, allowing a user to
    perform one to many calls to MarkLogic within the "with" block, each referencing the
    ID associated with this transaction. When the "with" block concludes, the
    transaction will be automatically committed if no error was thrown, and rolled back
    otherwise.
    :param id: the ID of the new transaction, which is used for all subsequent
    operations involving the transaction.
    :param session: a requests Session object that is required for either committing or
    rolling back the transaction, as well as for obtaining status of the transaction.
    """

    def __init__(self, id: str, session: Session):
        self.id = id
        self._session = session

    def __enter__(self):
        return self

    def get_status(self) -> dict:
        """
        Retrieve transaction status via
        https://docs.marklogic.com/REST/GET/v1/transactions/[txid].
        """
        return self._session.get(
            f"/v1/transactions/{self.id}", headers={"Accept": "application/json"}
        ).json()

    def commit(self) -> Response:
        """
        Commits the transaction via
        https://docs.marklogic.com/REST/POST/v1/transactions/[txid]. This is expected to be
        invoked automatically via a Python context manager.
        """
        logger.debug(f"Committing transaction with ID: {self.id}")
        return self._session.post(
            f"/v1/transactions/{self.id}", params={"result": "commit"}
        )

    def rollback(self) -> Response:
        """
        Rolls back the transaction via
        https://docs.marklogic.com/REST/POST/v1/transactions/[txid]. This is expected to be
        invoked automatically via a Python context manager.
        """
        logger.debug(f"Rolling back transaction with ID: {self.id}")
        return self._session.post(
            f"/v1/transactions/{self.id}", params={"result": "rollback"}
        )

    def __exit__(self, *args):
        response = (
            self.rollback()
            if len(args) > 1 and isinstance(args[1], Exception)
            else self.commit()
        )
        assert (
            204 == response.status_code
        ), f"Could not end transaction; cause: {response.text}"


class TransactionManager:
    def __init__(self, session: Session):
        self._session = session

    def create(self, name=None, time_limit=None, database=None) -> Transaction:
        """
        Creates a new transaction via https://docs.marklogic.com/REST/POST/v1/transactions.
        Contrary to the docs, a Location header is not returned, but the transaction data
        is. And the Accept header can be used to control the format of the transaction data.
        The returned Transaction is a Python context manager and is intended to be used
        via the Python "with" keyword.
        :param name: optional name for the transaction.
        :param time_limit: optional time limit, in seconds, until the server cancels the
        transaction.
        :param database: optional database to associate with the transaction.
        """
        params = {}
        if name:
            params["name"] = name
        if time_limit:
            params["timeLimit"] = time_limit
        if database:
            params["database"] = database

        response = self._session.post(
            "/v1/transactions", params=params, headers={"Accept": "application/json"}
        )
        id = response.json()["transaction-status"]["transaction-id"]
        return Transaction(id, self._session)