Commit

Changing bulk-procs to bulk-serializers. Not a fan of changing flags/arguments, but I *finally* figured out what the issue was that was causing bottlenecks in the Elasticsearch processes. Changed the code to isolate the issue, so now there are bulk-serializers and bulk-threads. bulk-serializers should only be changed if you're running a lot of workers. bulk-threads should probably be set to a multiple of how many Elasticsearch nodes you pass in via the -u flag (e.g., if you pass in 3 nodes, set bulk-threads to a multiple of 3). The process is considerably faster now.

Mraoul committed Apr 7, 2016
1 parent d709b7a commit e051257
Showing 1 changed file with 88 additions and 46 deletions.
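
The advice about setting bulk-threads to a multiple of the node count follows from how the new es_bulk_shipper_proc picks its Elasticsearch node: each shipper process connects to options.es_uri[position % len(options.es_uri)]. Below is a minimal Python sketch (not part of the commit) of that round-robin assignment; the node URIs and counts are hypothetical placeholders.

# Sketch of the round-robin node assignment used by the shipper processes:
# node = es_uri[position % len(es_uri)]. The URIs stand in for whatever is
# passed via the -u flag.
es_uri = ["http://es1:9200", "http://es2:9200", "http://es3:9200"]
bulk_threads = 6  # a multiple of len(es_uri), per the commit message

for position in range(bulk_threads):
    node = es_uri[position % len(es_uri)]
    print("shipper %d -> %s" % (position, node))

# With 6 shipper processes and 3 nodes, each node is assigned exactly 2
# shippers; a count that isn't a multiple leaves the extras unevenly spread.
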
134 changes: 88 additions & 46 deletions pydat/scripts/elasticsearch_populate.py
@@ -116,14 +116,36 @@ def stats_worker(stats_queue):

###### ELASTICSEARCH PROCESS ######

def es_bulk_thread(bulk_request_queue, options):
def es_bulk_shipper_proc(bulk_request_queue, position, options):
os.setpgrp()

global bulkError_event
es = connectElastic(options.es_uri)

es = connectElastic(options.es_uri[position % len(options.es_uri)])
while 1:
try:
bulk_request = bulk_request_queue.get()
#sys.stdout.write("Making bulk request\n")
resp = es.bulk(body=bulk_request)

try:
resp = es.bulk(body=bulk_request)
except Exception as e:
with open('/tmp/pydat-bulk-%s-%s.txt' % (options.identifier, uuid.uuid1()), 'wb') as f:
for request in bulk_request:
f.write('%s\n' % json.dumps(request))

if not bulkError_event.is_set():
bulkError_event.set()
sys.stdout.write("\nErrors making bulk api request!!\nBulk Requests saved to disk (/tmp/pydat-bulk-<identifier>-<random>.txt) and should be submitted manually!!\n")
sys.stdout.write("It is possible you are running too many bulk workers or the bulk size is too big!\n")
sys.stdout.write("ElasticSearch Bulk Syntax (using curl):\n\tcurl -s -XPOST <es_server:port>/_bulk --data-binary @<bulk file name>\n")
sys.stdout.write("Exception: %s\n" % str(e))

continue

# Handle any errors that arise from making the bulk requests.
# Some errors are caused by duplicate data, others by ES inconsistency when the index refresh interval is set to a high number;
# ignore those errors since they shouldn't be too detrimental.
try:
if 'errors' in resp and resp['errors']:
twoliners = ['create', 'update', 'index']
@@ -132,6 +154,12 @@ def es_bulk_thread(bulk_request_queue, options):

for item in resp['items']:
key = list(item.keys())[0]
# Only write out those requests that actually failed -- technically requests are idempotent now,
# so you could run the entire bulk request again to no ill effect.
# 404's seem to arise from updates or deletes when the record no longer exists, usually caused
# by the refresh time or duplicates.
# 409's are caused by creates of entries that already exist, either caused by duplicates
# or by refresh issues.
if not str(item[key]['status']).startswith('2') and item[key]['status'] not in [404, 409]:
bulkOut.write('%s\n' % json.dumps(bulk_request[original_request_position]))
if key in twoliners:
@@ -153,7 +181,6 @@ def es_bulk_thread(bulk_request_queue, options):
sys.stdout.write("ElasticSearch Bulk Syntax (using curl):\n\tcurl -s -XPOST <es_server:port>/_bulk --data-binary @<bulk file name>\n")

bulkOut.close()

except Exception as e:
sys.stdout.write("Unhandled Exception attempting to handle error from bulk import: %s %s\n" % (str(e), traceback.format_exc()))
#sys.stdout.write("Bulk request Complete\n")
@@ -162,7 +189,7 @@ def es_bulk_thread(bulk_request_queue, options):
finally:
bulk_request_queue.task_done()

def es_worker(insert_queue, options):
def es_serializer_proc(insert_queue, bulk_request_queue, options):
#Ignore signals that are sent to parent process
#The parent should properly shut this down
os.setpgrp()
@@ -173,14 +200,6 @@ def es_worker(insert_queue, options):
finishup = False
bulk_request = []

# Allow the queue to back up a bit
bulk_request_queue = queue.Queue(maxsize = 2 * options.bulk_threads)

for i in range(options.bulk_threads):
bulkThread = Thread(target = es_bulk_thread, args = (bulk_request_queue, options))
bulkThread.daemon = True
bulkThread.start()

while 1:
try:
request = insert_queue.get_nowait()
@@ -199,7 +218,7 @@ def es_worker(insert_queue, options):
except queue.Empty as e:
if finished_event.is_set():
break
time.sleep(.01)
time.sleep(.001)

# Send whatever is left
if bulk_counter > 0:
@@ -617,18 +636,18 @@ def main():
parser.add_argument("-i", "--identifier", action="store", dest="identifier", type=int,
default=None, help="Numerical identifier to use in update to signify version (e.g., '8' or '20140120')")
parser.add_argument("-B", "--bulk-size", action="store", dest="bulk_size", type=int,
default=5000, help="Size of Bulk Insert Requests")
default=5000, help="Size of Bulk Elasticsearch Requests")
parser.add_argument("--optimize-import", action="store_true", dest="optimize_import",
default=False, help="If enabled, will change ES index settings to speed up bulk imports, but if the cluster has a failure, data might be lost permanently!")

parser.add_argument("-t", "--threads", action="store", dest="threads", type=int,
default=2, help="Number of workers, defaults to 2. Note that each worker will increase the load on your ES cluster")
parser.add_argument("--bulk-procs", action="store", dest="bulk_procs", type=int,
default=1, help="How many processes to spawn to handle ES messages. Increase this if your messaging rate is maxing out your per-core cpu utilization")
default=2, help="Number of workers, defaults to 2. Note that each worker will increase the load on your ES cluster since it will try to lookup whatever record it is working on in ES")
parser.add_argument("--bulk-serializers", action="store", dest="bulk_serializers", type=int,
default=1, help="How many threads to spawn to combine messages from workers. Only increase this if you're are running a lot of workers and one cpu is unable to keep up with the load")
parser.add_argument("--bulk-threads", action="store", dest="bulk_threads", type=int,
default=1, help="How many threads per bulk process to use for making bulk requests to ES. Increase this if your cluster can handle more simultaneous requests")
default=1, help="How many threads to spawn to send bulk ES messages. The larger your cluster, the more you can increase this")
parser.add_argument("--enable-delta-indexes", action="store_true", dest="enable_delta_indexes",
default=False, help="If enabled, will put changed entries in a separate index. These indexes can be safely deleted if space is an issue")
default=False, help="If enabled, will put changed entries in a separate index. These indexes can be safely deleted if space is an issue, also provides some other improvements")

options = parser.parse_args()

@@ -639,7 +658,8 @@ def main():

threads = []
work_queue = jmpQueue(maxsize=options.bulk_size * options.threads)
insert_queue = jmpQueue(maxsize=options.bulk_size * options.bulk_procs * options.bulk_threads)
insert_queue = jmpQueue(maxsize=options.bulk_size * options.bulk_threads)
bulk_request_queue = jmpQueue(maxsize = 2 * options.bulk_threads)
stats_queue = mpQueue()

meta_index_name = '@' + options.index_prefix + "_meta"
@@ -874,13 +894,24 @@ def main():
threads.append(t)
#No need to update lastVersion or create metadata entry

#Start up the Elasticsearch Bulk Processors
es_workers = []
for i in range(options.bulk_procs):
es_worker_thread = Process(target=es_worker, args=(insert_queue, options))
es_worker_thread.daemon = True
es_worker_thread.start()
es_workers.append(es_worker_thread)
# Start up the Elasticsearch Bulk Serializers
# Their job is just to combine work into bulk-sized chunks to be sent to the bulk API
# One serializer should be enough for a lot of workers, but anyone with a super large cluster might
# be able to run a lot of workers, which can subsequently overwhelm a single serializer
es_serializers = []
for i in range(options.bulk_serializers):
es_serializer = Process(target=es_serializer_proc, args=(insert_queue, bulk_request_queue, options))
es_serializer.start()
es_serializers.append(es_serializer)

# Start up the ES Bulk Shippers, each in their own process
# As far as I can tell there's an issue (bug? feature?) that causes every request made to ES to hinder the entire process, even if it's in a separate Python thread.
# Not sure if this is GIL-related or not, but instead of debugging how the elasticsearch library or urllib does things,
# it's easier to just spawn a separate process for every connection being made to ES
for i in range(options.bulk_threads):
es_bulk_shipper = Process(target=es_bulk_shipper_proc, args=(bulk_request_queue, i, options))
es_bulk_shipper.daemon = True
es_bulk_shipper.start()

stats_worker_thread = Thread(target=stats_worker, args=(stats_queue,), name = 'Stats')
stats_worker_thread.daemon = True
@@ -922,8 +953,12 @@ def main():
for t in threads:
t.join()

for t in es_workers:
t.join()
# Wait for the es serializer(s) to package up all of the bulk requests
for es_serializer in es_serializers:
es_serializer.join()

# Wait for shippers to send all bulk requests
bulk_request_queue.join()

# Change settings back
unOptimizeIndexes(es, data_template, options)
@@ -932,17 +967,20 @@ def main():
stats_worker_thread.join()

#Update the stats
es.update(index=meta_index_name, id=options.identifier,
doc_type='meta',
body = { 'doc': {
'total' : STATS['total'],
'new' : STATS['new'],
'updated' : STATS['updated'],
'unchanged' : STATS['unchanged'],
'duplicates': STATS['duplicates'],
'changed_stats': CHANGEDCT
}}
);
try:
es.update(index=meta_index_name, id=options.identifier,
doc_type='meta',
body = { 'doc': {
'total' : STATS['total'],
'new' : STATS['new'],
'updated' : STATS['updated'],
'unchanged' : STATS['unchanged'],
'duplicates': STATS['duplicates'],
'changed_stats': CHANGEDCT
}}
);
except Exception as e:
sys.stdout.write("Error attempting to update stats: %s\n" % str(e))
except KeyboardInterrupt:
pass

@@ -979,7 +1017,7 @@ def main():
# The worker threads will exit on their own after getting the shutdown_event

# Joining on the insert queue is important to ensure ES isn't left in an inconsistent state if delta indexes are being used
# since it 'move' documents from one index to another which involves an insert and a delete
# since it 'moves' documents from one index to another which involves an insert and a delete
insert_queue.join()

# All of the workers should have seen the shutdown event and exited after finishing whatever they were last working on
@@ -992,12 +1030,16 @@ def main():
stats_worker_thread.join()

sys.stdout.write("\tWaiting for ElasticSearch bulk uploads to finish ... \n")
# The ES workers do not recognize the shutdown event only the graceful finished_event
# The ES serializer does not recognize the shutdown event, only the graceful finished_event,
# so set the event so it can gracefully shutdown
finished_event.set()
# Wait for bulk uploads to finish, otherwise ES can be left in an inconsistent state
for es_worker_thread in es_workers:
es_worker_thread.join()

# Wait for es serializer(s) to package up all bulk requests
for es_serializer in es_serializers:
es_serializer.join()

# Wait for shippers to send all bulk requests, otherwise ES might be left in an inconsistent state
bulk_request_queue.join()

#Attempt to update the stats
#XXX