Initial work on transactions for chunkstore #776

Open · wants to merge 1 commit into base: master
61 changes: 59 additions & 2 deletions arctic/chunkstore/chunkstore.py
@@ -2,6 +2,7 @@
import logging
from collections import defaultdict
from itertools import groupby
import time

import pymongo
from bson.binary import Binary
@@ -13,7 +14,7 @@
from .passthrough_chunker import PassthroughChunker
from .._util import indent, mongo_count, enable_sharding
from ..decorators import mongo_retry
from ..exceptions import NoDataFoundException
from ..exceptions import NoDataFoundException, ChunkStoreOutstandingTransaction
from ..serialization.numpy_arrays import FrametoArraySerializer, DATA, METADATA, COLUMNS

logger = logging.getLogger(__name__)
@@ -29,6 +30,7 @@
SERIALIZER = 'se'
CHUNKER = 'ch'
USERMETA = 'u'
TRANSACTION_START = 'ts'

MAX_CHUNK_SIZE = 15 * 1024 * 1024

@@ -76,6 +78,8 @@ def _ensure_index(self):
(START, pymongo.ASCENDING),
(END, pymongo.ASCENDING)],
unique=True, background=True)
self._trans.create_index([(SYMBOL, pymongo.ASCENDING)],
unique=True, background=True)

def __init__(self, arctic_lib):
self._arctic_lib = arctic_lib
@@ -92,6 +96,7 @@ def _reset(self):
self._symbols = self._collection.symbols
self._mdata = self._collection.metadata
self._audit = self._collection.audit
self._trans = self._collection.transactions

def __getstate__(self):
return {'arctic_lib': self._arctic_lib}
@@ -156,6 +161,7 @@ def delete(self, symbol, chunk_range=None, audit=None):
self._collection.delete_many(query)
self._symbols.delete_many(query)
self._mdata.delete_many(query)
self._trans.delete_many(query)

if audit is not None:
audit['symbol'] = symbol
@@ -255,6 +261,11 @@ def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
if not sym:
raise NoDataFoundException('No data found for %s' % (symbol))

for s in symbol:
transaction = self._trans.find_one({SYMBOL: s})
if transaction:
raise ChunkStoreOutstandingTransaction("Pending transaction on {}".format(s))

spec = {SYMBOL: {'$in': symbol}}
chunker = CHUNKER_MAP[sym[0][CHUNKER]]
deser = SER_MAP[sym[0][SERIALIZER]].deserialize
@@ -331,6 +342,10 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), audit=None,
if not isinstance(item, (DataFrame, Series)):
raise Exception("Can only chunk DataFrames and Series")

transaction = self._trans.find_one({SYMBOL: symbol})
if transaction:
raise ChunkStoreOutstandingTransaction("Pending transaction on {}".format(symbol))

self._arctic_lib.check_quota()

previous_shas = []
@@ -351,8 +366,12 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), audit=None,
ops = []
meta_ops = []
chunk_count = 0

trans_start = None
trans_end = None
for start, end, chunk_size, record in chunker.to_chunks(item, **kwargs):
if trans_start is None:
trans_start = start
trans_end = end
chunk_count += 1
data = self.serializer.serialize(record)
doc[CHUNK_SIZE] = chunk_size
@@ -383,6 +402,8 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), audit=None,
# already exists, dont need to update in mongo
previous_shas.remove(chunk[SHA])

# Start transaction
mongo_retry(self._trans.insert_one)({SYMBOL: symbol, START: trans_start, END: trans_end, TRANSACTION_START: time.time()})
if ops:
self._collection.bulk_write(ops, ordered=False)
if meta_ops:
@@ -402,6 +423,9 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), audit=None,
audit['action'] = 'write'
audit['chunks'] = chunk_count
self._audit.insert_one(audit)
# end transaction
mongo_retry(self._trans.delete_many)({SYMBOL: symbol})


def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=None, audit=None):
'''
@@ -428,7 +452,12 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No

appended = 0
new_chunks = 0
trans_start = None
trans_end = None
for start, end, _, record in chunker.to_chunks(item, chunk_size=sym[CHUNK_SIZE]):
trans_end = end
if trans_start is None:
trans_start = start
# read out matching chunks
df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
# assuming they exist, update them and store the original chunk
@@ -477,6 +506,9 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
START: start,
END: end},
{'$set': meta}, upsert=True))

# Start transaction
mongo_retry(self._trans.insert_one)({SYMBOL: symbol, START: trans_start, END: trans_end, TRANSACTION_START: time.time()})
if ops:
self._collection.bulk_write(ops, ordered=False)
self._mdata.bulk_write(meta_ops, ordered=False)
@@ -489,6 +521,8 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
if appended > 0:
audit['appended_rows'] = appended
self._audit.insert_one(audit)
# end transaction
mongo_retry(self._trans.delete_many)({SYMBOL: symbol})

def append(self, symbol, item, upsert=False, metadata=None, audit=None, **kwargs):
"""
@@ -754,3 +788,26 @@ def has_symbol(self, symbol):
bool
"""
return self._get_symbol_info(symbol) is not None

def transaction_recover(self, symbol):
"""
Remove partially written data in a failed transaction.

Will do nothing if transaction or symbol does not exist.

Parameters
----------
symbol: str
"""
sym = self._get_symbol_info(symbol)
trans = self._trans.find_one({SYMBOL: symbol})
if trans:
start = trans[START]
end = trans[END]

query = {SYMBOL: symbol}
date_range = CHUNKER_MAP[sym[CHUNKER]].to_range(start, end)
query.update(CHUNKER_MAP[sym[CHUNKER]].to_mongo(date_range))
self._collection.delete_many(query)
self._mdata.delete_many(query)
self._trans.delete_many({SYMBOL: symbol})
4 changes: 4 additions & 0 deletions arctic/exceptions.py
@@ -63,3 +63,7 @@ class AsyncArcticException(ArcticException):

class RequestDurationException(AsyncArcticException):
pass


class ChunkStoreOutstandingTransaction(ArcticException):
pass
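
A minimal usage sketch of the transaction flow this PR introduces; it is not part of the diff. It assumes an Arctic installation that includes this branch, and the library name, symbol, and DataFrame below are illustrative only.

```python
import pandas as pd
from arctic import Arctic, CHUNK_STORE
from arctic.exceptions import ChunkStoreOutstandingTransaction

store = Arctic('localhost')
store.initialize_library('test.chunks', lib_type=CHUNK_STORE)
lib = store['test.chunks']

df = pd.DataFrame({'data': [1.0, 2.0]},
                  index=pd.date_range('2020-01-01', periods=2, name='date'))

# A successful write inserts a transaction record before the bulk writes
# and deletes it once the symbol metadata and audit entry are committed.
lib.write('SYM', df)

try:
    # If a prior write/update died mid-way, its transaction record is still
    # present and reads/writes on the symbol raise ChunkStoreOutstandingTransaction.
    lib.read('SYM')
except ChunkStoreOutstandingTransaction:
    # Delete the partially written chunks in the recorded start/end range
    # and clear the transaction record, after which the symbol is usable again.
    lib.transaction_recover('SYM')
```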