diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock index eb6aaa8..c34fcc1 100644 --- a/docs/Gemfile.lock +++ b/docs/Gemfile.lock @@ -251,6 +251,7 @@ GEM PLATFORMS arm64-darwin-21 + arm64-darwin-23 DEPENDENCIES github-pages (~> 228) diff --git a/docs/transactions.md b/docs/transactions.md new file mode 100644 index 0000000..df1c715 --- /dev/null +++ b/docs/transactions.md @@ -0,0 +1,84 @@ +--- +layout: default +title: Managing transactions +nav_order: 4 +--- + +The [/v1/transactions endpoint](https://docs.marklogic.com/REST/client/transaction-management) +in the MarkLogic REST API supports managing a transaction that can be referenced in +multiple separate calls to other REST API endpoints, with all calls being committed or +rolled back together. The MarkLogic Python client simplifies usage of these endpoints +via a `Transaction` class that is also a +[Python context manager](https://docs.python.org/3/reference/datamodel.html#context-managers), +thereby allowing it to handle committing or rolling back the transaction without any user +involvement. + +The following example demonstrates writing documents via multiple calls to MarkLogic, +all within the same REST API transaction; the example depends on first following the +instructions in the [setup guide](example-setup.md): + +``` +from marklogic import Client +from marklogic.documents import Document +client = Client('http://localhost:8000', digest=('python-user', 'pyth0n')) + +default_perms = {"rest-reader": ["read", "update"]} +doc1 = Document("/tx/doc1.json", {"doc": 1}, permissions=default_perms) +doc2 = Document("/tx/doc2.json", {"doc": 2}, permissions=default_perms) + +with client.transactions.create() as tx: + client.documents.write(doc1, tx=tx).raise_for_status() + client.documents.write(doc2, tx=tx).raise_for_status() +``` + +The `client.transactions.create()` function returns a `Transaction` instance that acts +as the context manager. 
When the `with` block completes, the `Transaction` instance +calls the REST API to commit the transaction. + +As of 1.1.0, each of the functions in the `client.documents` object can include a +reference to the transaction to ensure that the `read` or `write` or `search` operation +occurs within the REST API transaction. + +## Ensuring a transaction is rolled back + +The `requests` function [`raise_for_status()`](https://requests.readthedocs.io/en/latest/user/quickstart/#errors-and-exceptions) +is used in the example above to ensure that if a request fails, an error is thrown, +causing the transaction to be rolled back. The following example demonstrates a rolled +back transaction due to an invalid JSON object that causes a `write` operation to fail: + +``` +doc1 = Document("/tx/doc1.json", {"doc": 1}, permissions=default_perms) +doc2 = Document("/tx/doc2.json", "invalid json", permissions=default_perms) + +with client.transactions.create() as tx: + client.documents.write(doc1, tx=tx).raise_for_status() + client.documents.write(doc2, tx=tx).raise_for_status() +``` + +The above will cause a `requests` `HTTPError` instance to be thrown, and the first +document will not be written due to the transaction being rolled back. + +You are free to check the status code of the response object returned +by each call as well; `raise_for_status()` is simply a commonly used convenience in the +`requests` library. + +## Using the transaction request parameter + +You can reference the transaction when calling any REST API endpoint that supports the +optional `txid` request parameter. 
The following example demonstrates this, reusing the +same `client` instance from the first example: + +``` +with client.transactions.create() as tx: + client.post("/v1/resources/my-resource", params={"txid": tx.id}) + client.delete("/v1/resources/other-resource", params={"txid": tx.id}) +``` + +## Getting transaction status + +You can get the status of the transaction via the `get_status()` function: + +``` +with client.transactions.create() as tx: + print(f"Transaction status: {tx.get_status()}") +``` \ No newline at end of file diff --git a/marklogic/client.py b/marklogic/client.py index 23a87e1..6c6f912 100644 --- a/marklogic/client.py +++ b/marklogic/client.py @@ -2,6 +2,7 @@ from marklogic.cloud_auth import MarkLogicCloudAuth from marklogic.documents import DocumentManager from marklogic.rows import RowManager +from marklogic.transactions import TransactionManager from requests.auth import HTTPDigestAuth from urllib.parse import urljoin @@ -77,3 +78,9 @@ def rows(self): if not hasattr(self, "_rows"): self._rows = RowManager(self) return self._rows + + @property + def transactions(self): + if not hasattr(self, "_transactions"): + self._transactions = TransactionManager(self) + return self._transactions diff --git a/marklogic/documents.py b/marklogic/documents.py index d3da95f..877f5af 100644 --- a/marklogic/documents.py +++ b/marklogic/documents.py @@ -2,6 +2,7 @@ from collections import OrderedDict from typing import Union +from marklogic.transactions import Transaction from requests import Response, Session from requests_toolbelt.multipart.decoder import MultipartDecoder from urllib3.fields import RequestField @@ -9,7 +10,7 @@ """ Defines classes to simplify usage of the documents REST endpoint defined at -https://docs.marklogic.com/REST/client/management. +https://docs.marklogic.com/REST/client/management. 
""" @@ -147,7 +148,7 @@ def __init__( @property def metadata(self): """ - Returns a dict containing the 5 attributes that comprise the metadata of a + Returns a dict containing the 5 attributes that comprise the metadata of a document in MarkLogic. """ return { @@ -344,7 +345,10 @@ def __init__(self, session: Session): self._session = session def write( - self, parts: Union[Document, list[Union[DefaultMetadata, Document]]], **kwargs + self, + parts: Union[Document, list[Union[DefaultMetadata, Document]]], + tx: Transaction = None, + **kwargs, ) -> Response: """ Write one or many documents at a time via a POST to the endpoint defined at @@ -355,6 +359,7 @@ def write( after it that does not define its own metadata. See https://docs.marklogic.com/guide/rest-dev/bulk#id_16015 for more information on how the REST endpoint uses metadata. + :param tx: if set, the request will be associated with the given transaction. """ fields = [] @@ -374,6 +379,10 @@ def write( data, content_type = encode_multipart_formdata(fields) + params = kwargs.pop("params", {}) + if tx: + params["txid"] = tx.id + headers = kwargs.pop("headers", {}) headers["Content-Type"] = "".join( ("multipart/mixed",) + content_type.partition(";")[1:] @@ -381,10 +390,16 @@ def write( if not headers.get("Accept"): headers["Accept"] = "application/json" - return self._session.post("/v1/documents", data=data, headers=headers, **kwargs) + return self._session.post( + "/v1/documents", data=data, headers=headers, params=params, **kwargs + ) def read( - self, uris: Union[str, list[str]], categories: list[str] = None, **kwargs + self, + uris: Union[str, list[str]], + categories: list[str] = None, + tx: Transaction = None, + **kwargs, ) -> Union[list[Document], Response]: """ Read one or many documents via a GET to the endpoint defined at @@ -395,12 +410,15 @@ def read( :param categories: optional list of the categories of data to return for each URI. By default, only content will be returned for each URI. 
logger = logging.getLogger(__name__)


class Transaction:
    """
    Represents a transaction created via
    https://docs.marklogic.com/REST/POST/v1/transactions .

    An instance of this class is a Python context manager and is intended to be
    used with the "with" keyword, so that one or more calls to MarkLogic can be
    made inside the "with" block, each referencing the ID of this transaction.
    When the block ends, the transaction is committed if nothing was raised and
    rolled back otherwise.

    :param id: the ID of the transaction, used for all subsequent operations
    involving the transaction.
    :param session: a requests Session used to commit or roll back the
    transaction and to obtain its status.
    """

    def __init__(self, id: str, session: Session):
        self.id = id
        self._session = session

    def __enter__(self):
        return self

    def get_status(self) -> dict:
        """
        Retrieve transaction status via
        https://docs.marklogic.com/REST/GET/v1/transactions/[txid].

        :return: the JSON body of the response, parsed into a dict.
        """
        return self._session.get(
            f"/v1/transactions/{self.id}", headers={"Accept": "application/json"}
        ).json()

    def commit(self) -> Response:
        """
        Commits the transaction via
        https://docs.marklogic.com/REST/POST/v1/transactions/[txid]. This is expected
        to be invoked automatically via a Python context manager.

        :return: the response to the commit request; its status is not checked here.
        """
        logger.debug("Committing transaction with ID: %s", self.id)
        return self._session.post(
            f"/v1/transactions/{self.id}", params={"result": "commit"}
        )

    def rollback(self) -> Response:
        """
        Rolls back the transaction via
        https://docs.marklogic.com/REST/POST/v1/transactions/[txid]. This is expected
        to be invoked automatically via a Python context manager.

        :return: the response to the rollback request; its status is not checked
        here.
        """
        logger.debug("Rolling back transaction with ID: %s", self.id)
        return self._session.post(
            f"/v1/transactions/{self.id}", params={"result": "rollback"}
        )

    def __exit__(self, *args):
        """
        Ends the transaction when the "with" block finishes: rolls back if the
        block raised anything, commits otherwise. Returns None, so an exception
        raised inside the block keeps propagating.

        :raises AssertionError: if the commit or rollback request does not
        return a 204 status.
        """
        raised = args[1] if len(args) > 1 else None
        # BaseException rather than Exception: a KeyboardInterrupt, SystemExit or
        # GeneratorExit inside the block must not commit a half-finished
        # transaction.
        response = (
            self.rollback() if isinstance(raised, BaseException) else self.commit()
        )
        # Raised explicitly instead of via an "assert" statement so the check is
        # not stripped when Python runs with -O; the exception type and message
        # are the same as before.
        if response.status_code != 204:
            raise AssertionError(
                f"Could not end transaction; cause: {response.text}"
            )


class TransactionManager:
    """
    Creates Transaction instances via the REST endpoints defined at
    https://docs.marklogic.com/REST/client/transaction-management.

    :param session: a requests Session used to create transactions; it is also
    handed to every Transaction that is created.
    """

    def __init__(self, session: Session):
        self._session = session

    def create(self, name=None, time_limit=None, database=None) -> Transaction:
        """
        Creates a new transaction via https://docs.marklogic.com/REST/POST/v1/transactions.
        Contrary to the docs, a Location header is not returned, but the transaction data
        is. And the Accept header can be used to control the format of the transaction data.

        The returned Transaction is a Python context manager and is intended to be used
        via the Python "with" keyword.

        :param name: optional name for the transaction.
        :param time_limit: optional time limit, in seconds, until the server cancels the
        transaction.
        :param database: optional database to associate with the transaction.
        :raises requests.exceptions.HTTPError: if the server answers the create
        request with a 4xx or 5xx status.
        """
        params = {}
        if name:
            params["name"] = name
        if time_limit:
            params["timeLimit"] = time_limit
        if database:
            params["database"] = database

        response = self._session.post(
            "/v1/transactions", params=params, headers={"Accept": "application/json"}
        )
        # Fail with the HTTP error itself rather than an opaque KeyError or JSON
        # decoding error when the transaction could not be created.
        response.raise_for_status()
        tx_id = response.json()["transaction-status"]["transaction-id"]
        return Transaction(tx_id, self._session)
def test_time_limit(client: Client):
    """
    Verifies that when a time limit is set and the transaction is committed after that
    time limit has elapsed, the transaction fails to be committed.
    """
    try:
        with client.transactions.create(time_limit=1) as tx:
            # Sleep past the 1 second limit so the transaction no longer exists
            # when the "with" block tries to commit it.
            time.sleep(1.1)
            doc1 = Document("/t1.json", {}, permissions=PERMS)
            client.documents.write(doc1, tx=tx)

    except AssertionError as error:
        assert error.args[0].startswith("Could not end transaction")
        assert "No transaction with identifier" in error.args[0]
        assert "XDMP-NOTXN" in error.args[0]
    else:
        # Without this branch the test passes vacuously whenever the commit
        # unexpectedly succeeds.
        raise AssertionError(
            "Expected the commit to fail because the time limit had elapsed"
        )


def test_database_and_name_args(client: Client):
    """
    Verifies that setting the optional 'name' and 'database' args doesn't cause any
    problems.
    """
    tx_name = "python-tx"
    db_name = "python-client-test-content"
    with client.transactions.create(name=tx_name, database=db_name) as tx:
        doc1 = Document("/t1.json", {}, permissions=PERMS)
        client.documents.write(doc1, tx=tx)
        status = tx.get_status()
        assert tx_name == status["transaction-status"]["transaction-name"]

    # Read outside the transaction: the document is visible once the "with"
    # block has committed.
    assert 1 == len(client.documents.read("/t1.json"))