diff --git a/pydat/scripts/elasticsearch_populate.py b/pydat/scripts/elasticsearch_populate.py index 52066ca..1f323b0 100755 --- a/pydat/scripts/elasticsearch_populate.py +++ b/pydat/scripts/elasticsearch_populate.py @@ -116,14 +116,36 @@ def stats_worker(stats_queue): ###### ELASTICSEARCH PROCESS ###### -def es_bulk_thread(bulk_request_queue, options): +def es_bulk_shipper_proc(bulk_request_queue, position, options): + os.setpgrp() + global bulkError_event - es = connectElastic(options.es_uri) + + es = connectElastic(options.es_uri[position % len(options.es_uri)]) while 1: try: bulk_request = bulk_request_queue.get() #sys.stdout.write("Making bulk request\n") - resp = es.bulk(body=bulk_request) + + try: + resp = es.bulk(body=bulk_request) + except Exception as e: + with open('/tmp/pydat-bulk-%s-%s.txt' % (options.identifier, uuid.uuid1()), 'wb') as f: + for request in bulk_request: + f.write('%s\n' % json.dumps(request)) + + if not bulkError_event.is_set(): + bulkError_event.set() + sys.stdout.write("\nErrors making bulk api request!!\nBulk Requests saved to disk (/tmp/pydat-bulk--.txt) and should be submitted manually!!\n") + sys.stdout.write("It is possible you are running too many bulk workers or the bulk size is too big!\n") + sys.stdout.write("ElasticSearch Bulk Syntax (using curl):\n\tcurl -s -XPOST /_bulk --data-binary @\n") + sys.stdout.write("Exception: %s\n" % str(e)) + + continue + + # Handle any errors that arise from making the bulk requests + # Some errors are caused by duplicate data, others by ES inconsistency when index refresh is set to a high number + # ignore those errors since they shouldn't be too detrimental. try: if 'errors' in resp and resp['errors']: twoliners = ['create', 'update', 'index'] @@ -132,6 +154,12 @@ def es_bulk_thread(bulk_request_queue, options): for item in resp['items']: key = list(item.keys())[0] + # Only write out those requests that actually failed -- technically requests are idempotent now + # so you could run the entire bulk request again to no ill effect. + # 404's seem to arise from updates or deletes when the record no longer exists usually caused + # by the refresh time or duplicates + # 409's are caused by creates of entries that already exist, either caused by duplicates + # or by refresh issues if not str(item[key]['status']).startswith('2') and item[key]['status'] not in [404, 409]: bulkOut.write('%s\n' % json.dumps(bulk_request[original_request_position])) if key in twoliners: @@ -153,7 +181,6 @@ def es_bulk_thread(bulk_request_queue, options): sys.stdout.write("ElasticSearch Bulk Syntax (using curl):\n\tcurl -s -XPOST /_bulk --data-binary @\n") bulkOut.close() - except Exception as e: sys.stdout.write("Unhandled Exception attempting to handle error from bulk import: %s %s\n" % (str(e), traceback.format_exc())) #sys.stdout.write("Bulk request Complete\n") @@ -162,7 +189,7 @@ def es_bulk_thread(bulk_request_queue, options): finally: bulk_request_queue.task_done() -def es_worker(insert_queue, options): +def es_serializer_proc(insert_queue, bulk_request_queue, options): #Ignore signals that are sent to parent process #The parent should properly shut this down os.setpgrp() @@ -173,14 +200,6 @@ def es_worker(insert_queue, options): finishup = False bulk_request = [] - # Allow the queue to back up a bit - bulk_request_queue = queue.Queue(maxsize = 2 * options.bulk_threads) - - for i in range(options.bulk_threads): - bulkThread = Thread(target = es_bulk_thread, args = (bulk_request_queue, options)) - bulkThread.daemon = True - bulkThread.start() - while 1: try: request = insert_queue.get_nowait() @@ -199,7 +218,7 @@ def es_worker(insert_queue, options): except queue.Empty as e: if finished_event.is_set(): break - time.sleep(.01) + time.sleep(.001) # Send whatever is left if bulk_counter > 0: @@ -617,18 +636,18 @@ def main(): parser.add_argument("-i", "--identifier", action="store", dest="identifier", type=int, default=None, help="Numerical identifier to use in update to signify version (e.g., '8' or '20140120')") parser.add_argument("-B", "--bulk-size", action="store", dest="bulk_size", type=int, - default=5000, help="Size of Bulk Insert Requests") + default=5000, help="Size of Bulk Elasticsearch Requests") parser.add_argument("--optimize-import", action="store_true", dest="optimize_import", default=False, help="If enabled, will change ES index settings to speed up bulk imports, but if the cluster has a failure, data might be lost permanently!") parser.add_argument("-t", "--threads", action="store", dest="threads", type=int, - default=2, help="Number of workers, defaults to 2. Note that each worker will increase the load on your ES cluster") - parser.add_argument("--bulk-procs", action="store", dest="bulk_procs", type=int, - default=1, help="How many processes to spawn to handle ES messages. Increase this if your messaging rate is maxing out your per-core cpu utilization") + default=2, help="Number of workers, defaults to 2. Note that each worker will increase the load on your ES cluster since it will try to lookup whatever record it is working on in ES") + parser.add_argument("--bulk-serializers", action="store", dest="bulk_serializers", type=int, + default=1, help="How many threads to spawn to combine messages from workers. Only increase this if you're are running a lot of workers and one cpu is unable to keep up with the load") parser.add_argument("--bulk-threads", action="store", dest="bulk_threads", type=int, - default=1, help="How many threads per bulk process to use for making bulk requests to ES. Increase this if your cluster can handle more simultaneous requests") + default=1, help="How many threads to spawn to send bulk ES messages. The larger your cluster, the more you can increase this") parser.add_argument("--enable-delta-indexes", action="store_true", dest="enable_delta_indexes", - default=False, help="If enabled, will put changed entries in a separate index. These indexes can be safely deleted if space is an issue") + default=False, help="If enabled, will put changed entries in a separate index. These indexes can be safely deleted if space is an issue, also provides some other improvements") options = parser.parse_args() @@ -639,7 +658,8 @@ def main(): threads = [] work_queue = jmpQueue(maxsize=options.bulk_size * options.threads) - insert_queue = jmpQueue(maxsize=options.bulk_size * options.bulk_procs * options.bulk_threads) + insert_queue = jmpQueue(maxsize=options.bulk_size * options.bulk_threads) + bulk_request_queue = jmpQueue(maxsize = 2 * options.bulk_threads) stats_queue = mpQueue() meta_index_name = '@' + options.index_prefix + "_meta" @@ -874,13 +894,24 @@ def main(): threads.append(t) #No need to update lastVersion or create metadata entry - #Start up the Elasticsearch Bulk Processors - es_workers = [] - for i in range(options.bulk_procs): - es_worker_thread = Process(target=es_worker, args=(insert_queue, options)) - es_worker_thread.daemon = True - es_worker_thread.start() - es_workers.append(es_worker_thread) + # Start up the Elasticsearch Bulk Serializers + # Its job is just to combine work into bulk-sized chunks to be sent to the bulk API + # One serializer should be enough for a lot of workers, but anyone with a super large cluster might + # be able to run a lot of workers which can subsequently overwhelm a single serializer + es_serializers = [] + for i in range(options.bulk_serializers): + es_serializer = Process(target=es_serializer_proc, args=(insert_queue, bulk_request_queue, options)) + es_serializer.start() + es_serializers.append(es_serializer) + + # Start up ES Bulk Shippers, each in their own process + # As far as I can tell there's an issue (bug? feature?) that causes every request made to ES to hinder the entire process even if it's in a separate python thread + # not sure if this is GIL related or not, but instead of debugging how the elasticsearch library or urllib does things + # its easier to just spawn a separate process for every connection being made to ES + for i in range(options.bulk_threads): + es_bulk_shipper = Process(target=es_bulk_shipper_proc, args=(bulk_request_queue, i, options)) + es_bulk_shipper.daemon = True + es_bulk_shipper.start() stats_worker_thread = Thread(target=stats_worker, args=(stats_queue,), name = 'Stats') stats_worker_thread.daemon = True @@ -922,8 +953,12 @@ def main(): for t in threads: t.join() - for t in es_workers: - t.join() + # Wait for the es serializer(s) to package up all of the bulk requests + for es_serializer in es_serializers: + es_serializer.join() + + # Wait for shippers to send all bulk requests + bulk_request_queue.join() # Change settings back unOptimizeIndexes(es, data_template, options) @@ -932,17 +967,20 @@ def main(): stats_worker_thread.join() #Update the stats - es.update(index=meta_index_name, id=options.identifier, - doc_type='meta', - body = { 'doc': { - 'total' : STATS['total'], - 'new' : STATS['new'], - 'updated' : STATS['updated'], - 'unchanged' : STATS['unchanged'], - 'duplicates': STATS['duplicates'], - 'changed_stats': CHANGEDCT - }} - ); + try: + es.update(index=meta_index_name, id=options.identifier, + doc_type='meta', + body = { 'doc': { + 'total' : STATS['total'], + 'new' : STATS['new'], + 'updated' : STATS['updated'], + 'unchanged' : STATS['unchanged'], + 'duplicates': STATS['duplicates'], + 'changed_stats': CHANGEDCT + }} + ); + except Exception as e: + sys.stdout.write("Error attempting to update stats: %s\n" % str(e)) except KeyboardInterrupt: pass @@ -979,7 +1017,7 @@ def main(): # The worker threads will exit on their own after getting the shutdown_event # Joining on the insert queue is important to ensure ES isn't left in an inconsistent state if delta indexes are being used - # since it 'move' documents from one index to another which involves an insert and a delete + # since it 'moves' documents from one index to another which involves an insert and a delete insert_queue.join() # All of the workers should have seen the shutdown event and exited after finishing whatever they were last working on @@ -992,12 +1030,16 @@ def main(): stats_worker_thread.join() sys.stdout.write("\tWaiting for ElasticSearch bulk uploads to finish ... \n") - # The ES workers do not recognize the shutdown event only the graceful finished_event + # The ES serializer does not recognize the shutdown event only the graceful finished_event # so set the event so it can gracefully shutdown finished_event.set() - # Wait for bulk uploads to finish, otherwise ES can be left in an inconsistent state - for es_worker_thread in es_workers: - es_worker_thread.join() + + # Wait for es serializer(s) to package up all bulk requests + for es_serializer in es_serializers: + es_serializer.join() + + # Wait for shippers to send all bulk requests, otherwise ES might be left in an inconsistent state + bulk_request_queue.join() #Attempt to update the stats #XXX