Skip to content
Snippets Groups Projects

Progressbar

Merged rgrana requested to merge progressbar into parallel
1 file
+ 126
82
Compare changes
  • Side-by-side
  • Inline
+ 126
82
@@ -22,7 +22,7 @@ BulkRequests cli client
import concurrent.futures
from queue import Queue
import logging
import time
import progressbar
from bulkrequests import parser
from bulkrequests import constants
from bulkrequests import dcquery
@@ -32,125 +32,172 @@ logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def process_request(paths, params):
def process_get_qos(connections, paths, params):
'''
manage bulkrequests calls according to user requeriments
manage get_qos request. Takes a queue of bulkrequests connections,
a list of paths and the parser params.
'''
result = dict()
result[constants.DISK_QOS] = 0
result[constants.TAPE_QOS] = 0
result[constants.DISKPLUSTAPE_QOS] = 0
future = list() # list to save future of threads
connections = Queue() # thread safe queue for bulkrequests connections
# create a queue with many connections as threads
for thread in range(params['threads']):
connections.put(dcquery.BulkRequests(params))
def get_qos(path):
'''
get_qos function for threading purposes
'''
if params['get_qos']:
connection = connections.get() # get a bulkrequest connection
qos = connection.get_qos(path) # get the the qos of the path
connections.put(connection)
result = dict()
result[constants.DISK_QOS] = 0
result[constants.TAPE_QOS] = 0
result[constants.DISKPLUSTAPE_QOS] = 0
return (path, qos)
def get_qos(path):
'''
get_qos function for threading purposes
'''
with concurrent.futures.ThreadPoolExecutor(max_workers=params['threads']) as executor:
if params['quiet']: # process request showing a progress bar
with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
for count, path in enumerate(paths):
executor.submit(bar.update, count + 1)
future.append(executor.submit(get_qos, path))
connection = connections.get() # get a bulkrequest connection
qos = connection.get_qos(path) # get the the qos
connections.put(connection)
for qos in concurrent.futures.as_completed(future):
result[qos.result()[1]] += 1
return (path, qos)
print('{0} {1}'.format(constants.DISK_QOS, result[constants.DISK_QOS]))
print('{0} {1}'.format(constants.TAPE_QOS, result[constants.TAPE_QOS]))
print('{0} {1}'.format(constants.DISKPLUSTAPE_QOS, result[constants.DISKPLUSTAPE_QOS]))
with concurrent.futures.ThreadPoolExecutor(max_workers=params['threads']) as executor:
for path in paths:
else: # process request showing the processed paths
for count, path in enumerate(paths):
future.append(executor.submit(get_qos, path))
for qos in concurrent.futures.as_completed(future):
result[qos.result()[1]] += 1
if not params['quiet']:
for qos in concurrent.futures.as_completed(future):
result[qos.result()[1]] += 1
print("{0} {1}".format(qos.result()[0], qos.result()[1]))
if params['quiet']:
print('{0} {1}'.format(constants.DISK_QOS, result[constants.DISK_QOS]))
print('{0} {1}'.format(constants.TAPE_QOS, result[constants.TAPE_QOS]))
print('{0} {1}'.format(constants.DISKPLUSTAPE_QOS, result[constants.DISKPLUSTAPE_QOS]))
elif params['get_locality']:
def process_get_locality(connections, paths, params):
'''
manage get_locality request. Takes a queue of bulkrequests connections,
a list of paths and the parser params.
'''
result = dict()
result[constants.ONLINE_LOC] = 0
result[constants.NEARLINE_LOC] = 0
result[constants.ONLINE_AND_NEARLINE_LOC] = 0
result = dict()
result[constants.ONLINE_LOC] = 0
result[constants.NEARLINE_LOC] = 0
result[constants.ONLINE_AND_NEARLINE_LOC] = 0
def get_locality(path):
'''
get_qos function for threading purposes
'''
future = list() # list to save future of threads
connection = connections.get()
locality = connection.get_locality(path)
connections.put(connection)
def get_locality(path):
'''
get_qos function for threading purposes
'''
return (path, locality)
connection = connections.get()
locality = connection.get_locality(path)
connections.put(connection)
with concurrent.futures.ThreadPoolExecutor(max_workers=params['threads']) as executor:
for path in paths:
future.append(executor.submit(get_locality, path))
return (path, locality)
for locality in concurrent.futures.as_completed(future):
result[locality.result()[1]] += 1
with concurrent.futures.ThreadPoolExecutor(max_workers=params['threads']) as executor:
if params['quiet']: # process request showing a progress bar
with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
for count, path in enumerate(paths):
executor.submit(bar.update, count + 1)
future.append(executor.submit(get_locality, path))
if not params['quiet']:
print("{0} {1}".format(locality.result()[0], locality.result()[1]))
for locality in concurrent.futures.as_completed(future):
result[locality.result()[1]] += 1
if params['quiet']:
print('{0} {1}'.format(constants.ONLINE_LOC, result[constants.ONLINE_LOC]))
print('{0} {1}'.format(constants.NEARLINE_LOC, result[constants.NEARLINE_LOC]))
print('{0} {1}'.format(constants.ONLINE_AND_NEARLINE_LOC, result[constants.ONLINE_AND_NEARLINE_LOC]))
elif params['set_qos'] == constants.DISKPLUSTAPE_QOS:
else: # process request showing the processed paths
for count, path in enumerate(paths):
future.append(executor.submit(get_locality, path))
for locality in concurrent.futures.as_completed(future):
result[locality.result()[1]] += 1
print("{0} {1}".format(locality.result()[0], locality.result()[1]))
def set_files_online(path):
'''
set_files_online function for threading purposes
'''
def process_set_qos(connections, paths, params):
'''
manage set_qos request. Takes a queue of bulkrequests connections,
a list of paths and the parser params.
'''
connection = connections.get()
connection.set_files_online(path)
connections.put(connection)
future = list() # list to save future of threads
return path
def set_files_online(path):
'''
set_files_online function for threading purposes
'''
with concurrent.futures.ThreadPoolExecutor(max_workers=params['threads']) as executor:
for path in paths:
future.append(executor.submit(set_files_online, path))
connection = connections.get()
connection.set_files_online(path)
connections.put(connection)
if not params['quiet']:
for path in concurrent.futures.as_completed(future):
print("{0} {1}".format(path.result(), constants.DISKPLUSTAPE_QOS))
return path
elif params['set_qos'] == constants.TAPE_QOS:
def set_files_offline(path):
'''
set_files_offline function for threading purposes
'''
connection = connections.get()
connection.set_files_offline(path)
connections.put(connection)
def set_files_offline(path):
'''
set_files_offline function for threading purposes
'''
return path
connection = connections.get()
connection.set_files_offline(path)
connections.put(connection)
if params['set_qos'] == constants.TAPE_QOS:
new_qos = set_files_offline
else:
new_qos = set_files_online
return path
with concurrent.futures.ThreadPoolExecutor(max_workers=params['threads']) as executor:
if params['quiet']: # process request showing a progress bar
with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
for count, path in enumerate(paths):
executor.submit(bar.update, count + 1)
future.append(executor.submit(new_qos, path))
with concurrent.futures.ThreadPoolExecutor(max_workers=params['threads']) as executor:
else: # process request showing the processed paths
for path in paths:
future.append(executor.submit(set_files_offline, path))
future.append(executor.submit(new_qos, path))
if not params['quiet']:
for path in concurrent.futures.as_completed(future):
print("{0} {1}".format(path.result(), constants.TAPE_QOS))
print("{0} {1}".format(path.result(), params['set_qos']))
def process_request(paths, params):
'''
manage bulkrequests calls according to user requeriments
'''
connections = Queue() # thread safe queue for bulkrequests connections
# create a queue with many connections as threads
for thread in range(params['threads']):
connections.put(dcquery.BulkRequests(params))
if params['get_qos']:
process_get_qos(connections, paths, params)
elif params['get_locality']:
process_get_locality(connections, paths, params)
elif params['set_qos'] == constants.DISKPLUSTAPE_QOS:
process_set_qos(connections, paths, params)
elif params['set_qos'] == constants.TAPE_QOS:
process_set_qos(connections, paths, params)
elif params['set_qos'] == constants.DISK_QOS:
logging.error('This option is not implemented yet')
@@ -164,7 +211,6 @@ def main():
main function
'''
t_inicial = time.time()
try:
bkparser = parser.BulkRequestsParser()
parse_ok = bkparser.get_parameters()
@@ -181,9 +227,7 @@ def main():
paths = bulkrequest.remove_dirs(bkparser.filenames)
process_request(paths, params)
t_final = time.time()
print('main function took: {0}'.format(t_final - t_inicial))
except KeyboardInterrupt:
pass