Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyndl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
__author__ = ('Konstantin Sering, Marc Weitz, '
'David-Elias Künstle, Lennard Schneider')
__author_email__ = '[email protected]'
__version__ = '0.7.2'
__version__ = '0.8.0'
__license__ = 'MIT'
__description__ = ('Naive discriminative learning implements learning and '
'classification models based on the Rescorla-Wagner '
Expand Down
14 changes: 8 additions & 6 deletions pyndl/error_codes.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ cdef enum ErrorCode:
MAGIC_NUMBER_DOES_NOT_MATCH = 1
VERSION_NUMBER_DOES_NOT_MATCH = 2
INITIAL_ERROR_CODE = 3
START_LARGER_END = 4


ERROR_CODES = """
NO_ERROR = 0
MAGIC_NUMBER_DOES_NOT_MATCH = 1
VERSION_NUMBER_DOES_NOT_MATCH = 2
INITIAL_ERROR_CODE = 3
"""
#ERROR_CODES = """
# NO_ERROR = 0
# MAGIC_NUMBER_DOES_NOT_MATCH = 1
# VERSION_NUMBER_DOES_NOT_MATCH = 2
# INITIAL_ERROR_CODE = 3
# START_LARGER_END = 4
# """
123 changes: 107 additions & 16 deletions pyndl/ndl.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,43 @@ def events_from_file(event_path):
return io.events_from_file(event_path)


def _create_cue_outcome_map(cues, outcomes, cues_to_mask, old_cues=frozenset(), old_outcomes=frozenset()):
"""This function returns a cue_map and an outcome_map with the cues_to_mask
placed in the first mask_up_to_excluding index."""

cues = set(cues) | set(old_cues)
outcomes = set(outcomes) | set(old_outcomes)

# cues to mask have to come in the beginning of the cue_map and we need the
# same amount of indices in the outcome_map
if cues_to_mask == 'all':
cues_to_mask = set(cues)
elif cues_to_mask is None:
cues_to_mask = set()
mask_up_to_excluding = len(cues_to_mask) # the highest index that should be masked (excluding)
cues_not_to_mask = set(cues) - cues_to_mask
outcomes_not_to_mask = set(outcomes) - cues_to_mask

# fix order of sets
cues_to_mask = list(cues_to_mask)
cues_not_to_mask = list(cues_not_to_mask)
outcomes_not_to_mask = list(outcomes_not_to_mask)

# reassamble cues and outcomes with specific ordering now
cues = cues_to_mask + cues_not_to_mask
outcomes = cues_to_mask + outcomes_not_to_mask

cue_map = OrderedDict(((cue, ii) for ii, cue in enumerate(cues)))
outcome_map = OrderedDict(((outcome, ii) for ii, outcome in enumerate(outcomes)))

return mask_up_to_excluding, cue_map, outcome_map


def ndl(events, alpha, betas, lambda_=1.0, *,
method='openmp', weights=None,
number_of_threads=8, len_sublists=10, remove_duplicates=None,
verbose=False, temporary_directory=None,
events_per_temporary_file=10000000):
events_per_temporary_file=10000000, cues_to_mask=None):
"""
Calculate the weights for all_outcomes over all events in event_file
given by the files path.
Expand Down Expand Up @@ -85,7 +117,11 @@ def ndl(events, alpha, betas, lambda_=1.0, *,
if none is provided, the operating system's default will
be used (/tmp on unix)
events_per_temporary_file: int
Number of events in each temporary binary file. Has to be larger than 1
number of events in each temporary binary file. Has to be larger than 1
cues_to_mask: set of cues or None or 'all'
if None no masking is applied, otherwise the given cues are masked from
themselves if they also appear as outcomes in the learning events,
'all' indicates that all cues should be masked

Returns
-------
Expand All @@ -111,30 +147,33 @@ def ndl(events, alpha, betas, lambda_=1.0, *,
verbose=verbose)
cues = list(cues.keys())
outcomes = list(outcomes.keys())
cue_map = OrderedDict(((cue, ii) for ii, cue in enumerate(cues)))
outcome_map = OrderedDict(((outcome, ii) for ii, outcome in enumerate(outcomes)))

all_outcome_indices = [outcome_map[outcome] for outcome in outcomes]

shape = (len(outcome_map), len(cue_map))

# initialize weights
if weights is None:
mask_up_to_excluding, cue_map, outcome_map = _create_cue_outcome_map(cues, outcomes, cues_to_mask)
shape = (len(outcome_map), len(cue_map))
weights = np.ascontiguousarray(np.zeros(shape, dtype=np.float64, order='C'))
elif isinstance(weights, xr.DataArray):
old_cues = weights.coords["cues"].values.tolist()
new_cues = list(set(cues) - set(old_cues))
old_outcomes = weights.coords["outcomes"].values.tolist()
new_outcomes = list(set(outcomes) - set(old_outcomes))

if cues_to_mask is None:
mask_up_to_excluding = 0
else:
mask_up_to_excluding, cue_map, outcome_map = _create_cue_outcome_map(cues, outcomes,
cues_to_mask, old_cues, old_outcomes)
# TODO: allocate weights and copy them cell wise from the old
# weights to the new weights
raise NotImplementedError('continue learning is not implemented for masking right now')

new_cues = list(set(cues) - set(old_cues))
new_outcomes = list(set(outcomes) - set(old_outcomes))
cues = old_cues + new_cues
outcomes = old_outcomes + new_outcomes

cue_map = OrderedDict(((cue, ii) for ii, cue in enumerate(cues)))
outcome_map = OrderedDict(((outcome, ii) for ii, outcome in enumerate(outcomes)))

all_outcome_indices = [outcome_map[outcome] for outcome in outcomes]

weights_tmp = np.concatenate((weights.values,
np.zeros((len(new_outcomes), len(old_cues)),
dtype=np.float64, order='C')),
Expand Down Expand Up @@ -168,16 +207,25 @@ def ndl(events, alpha, betas, lambda_=1.0, *,
if verbose:
print('start learning...')
# learning
all_outcome_indices_masked = list(range(mask_up_to_excluding))
all_outcome_indices_normal = list(range(mask_up_to_excluding, len(outcome_map)))
if method == 'openmp':
if sys.platform.startswith('darwin'):
raise NotImplementedError("OpenMP does not work under MacOs yet."
"Use method='threading' instead.")
# 1. learn masked indices
ndl_openmp.learn_inplace_masked(binary_files, weights, alpha,
beta1, beta2, lambda_,
np.array(all_outcome_indices_masked, dtype=np.uint32),
len_sublists, number_of_threads)
# 2. learn normal
ndl_openmp.learn_inplace(binary_files, weights, alpha,
beta1, beta2, lambda_,
np.array(all_outcome_indices, dtype=np.uint32),
np.array(all_outcome_indices_normal, dtype=np.uint32),
len_sublists, number_of_threads)
elif method == 'threading':
part_lists = slice_list(all_outcome_indices, len_sublists)
# 1. learn all masked indices
part_lists = slice_list(all_outcome_indices_masked, len_sublists)

working_queue = Queue(len(part_lists))
threads = []
Expand All @@ -189,7 +237,7 @@ def worker():
if working_queue.empty():
break
data = working_queue.get()
ndl_parallel.learn_inplace(binary_files, weights, alpha,
ndl_parallel.learn_inplace_masked(binary_files, weights, alpha,
beta1, beta2, lambda_, data)

with queue_lock:
Expand All @@ -203,6 +251,36 @@ def worker():

for thread in threads:
thread.join()

# 2. learn all normal
part_lists = slice_list(all_outcome_indices_normal, len_sublists)

working_queue = Queue(len(part_lists))
threads = []
queue_lock = threading.Lock()

def worker():
while True:
with queue_lock:
if working_queue.empty():
break
data = working_queue.get()
ndl_parallel.learn_inplace(binary_files, weights,
alpha, beta1, beta2,
lambda_, data)

with queue_lock:
for partlist in part_lists:
working_queue.put(np.array(partlist, dtype=np.uint32))

for _ in range(number_of_threads):
thread = threading.Thread(target=worker)
thread.start()
threads.append(thread)

for thread in threads:
thread.join()

else:
raise ValueError('method needs to be either "threading" or "openmp"')

Expand All @@ -220,6 +298,9 @@ def worker():
__name__ + "." + ndl.__name__, method=method, attrs=attrs_to_be_updated)

# post-processing
# we have to extract the right ordering
cues = list(cue_map.keys())
outcomes = list(outcome_map.keys())
weights = xr.DataArray(weights, [('outcomes', outcomes), ('cues', cues)],
attrs=attrs)
return weights
Expand Down Expand Up @@ -312,7 +393,7 @@ def attrs(self, attrs):

def dict_ndl(events, alphas, betas, lambda_=1.0, *,
weights=None, inplace=False, remove_duplicates=None,
make_data_array=False, verbose=False):
make_data_array=False, verbose=False, cues_to_mask=None):
"""
Calculate the weights for all_outcomes over all events in event_file.

Expand Down Expand Up @@ -347,6 +428,10 @@ def dict_ndl(events, alphas, betas, lambda_=1.0, *,
if True makes a xarray.DataArray out of the dict of dicts.
verbose : bool
print some output if True.
cues_to_mask: set of cues or None or 'all'
if None no masking is applied, otherwise the given cues are masked from
themselves if they also appear as outcomes in the learning events,
'all' indicates that all cues should be masked

Returns
-------
Expand All @@ -370,6 +455,9 @@ def dict_ndl(events, alphas, betas, lambda_=1.0, *,
if not (remove_duplicates is None or isinstance(remove_duplicates, bool)):
raise ValueError("remove_duplicates must be None, True or False")

if cues_to_mask is None:
cues_to_mask = set()

wall_time_start = time.perf_counter()
cpu_time_start = time.process_time()
if isinstance(events, str):
Expand Down Expand Up @@ -434,6 +522,9 @@ def dict_ndl(events, alphas, betas, lambda_=1.0, *,
else:
update = beta2 * (0 - association_strength)
for cue in cues:
if cues_to_mask == 'all' or cue in cues_to_mask:
if cue == outcome:
continue
weights[outcome][cue] += alphas[cue] * update

cpu_time_stop = time.process_time()
Expand Down
56 changes: 53 additions & 3 deletions pyndl/ndl_openmp.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ ctypedef np.float64_t dtype_t
cimport cython
from cython.parallel cimport parallel, prange

from ndl_parallel cimport learn_inplace_ptr
from error_codes cimport ErrorCode, NO_ERROR, INITIAL_ERROR_CODE, ERROR_CODES
from ndl_parallel cimport learn_inplace_ptr, learn_inplace_masked_ptr
from error_codes cimport ErrorCode, NO_ERROR, INITIAL_ERROR_CODE


def learn_inplace(binary_file_paths, np.ndarray[dtype_t, ndim=2] weights,
Expand All @@ -23,6 +23,8 @@ def learn_inplace(binary_file_paths, np.ndarray[dtype_t, ndim=2] weights,
cdef unsigned int start_val, end_val, ii, number_parts
cdef ErrorCode error = INITIAL_ERROR_CODE

if length_all_outcomes == 0:
return

# cdef String
# weights muss contigousarray sein und mode=c, siehe:
Expand All @@ -48,4 +50,52 @@ def learn_inplace(binary_file_paths, np.ndarray[dtype_t, ndim=2] weights,
break

if (error != NO_ERROR):
raise IOError(f'binary files does not have proper format, error code {error}\n{ERROR_CODES}')
raise IOError(f'binary files does not have proper format, error code {error}')


# The masked versions skip learning whenever cue and outcome have the same
# index. The code is duplicated on purpose, so that the unmasked version does
# not pay the penalty of an extra if statement in its innermost loop.

def learn_inplace_masked(binary_file_paths, np.ndarray[dtype_t, ndim=2] weights,
                         dtype_t alpha, dtype_t beta1,
                         dtype_t beta2, dtype_t lambda_,
                         np.ndarray[unsigned int, ndim=1] all_outcomes,
                         unsigned int chunksize,
                         unsigned int number_of_threads):
    # Run the masked learning kernel (learn_inplace_masked_ptr) over every
    # binary event file for the outcome indices listed in all_outcomes.
    #
    # all_outcomes is split into parts of at most `chunksize` indices; the
    # parts are handed to learn_inplace_masked_ptr inside an OpenMP prange
    # that uses `number_of_threads` threads. The kernel receives a raw pointer
    # to the data of `weights`, so the array is presumably updated in place.
    #
    # Returns None immediately when all_outcomes is empty.
    # Raises IOError when the last recorded error code is not NO_ERROR.
    # NOTE(review): `error` starts as INITIAL_ERROR_CODE, so an empty
    # binary_file_paths also seems to end in the IOError below -- confirm
    # that this is intended.

    cdef unsigned int mm = weights.shape[1]  # number of cues == columns
    cdef unsigned int* all_outcomes_ptr = <unsigned int *> all_outcomes.data
    cdef unsigned int length_all_outcomes = all_outcomes.shape[0]
    cdef char* fname
    cdef unsigned int start_val, end_val, ii, number_parts
    cdef ErrorCode error = INITIAL_ERROR_CODE

    # nothing to learn for an empty list of outcome indices
    if length_all_outcomes == 0:
        return

    # cdef String
    # weights must be a contiguous array with mode=c, see:
    #cdef np.ndarray[np.uint32_t, ndim=3, mode = 'c'] np_buff = np.ascontiguousarray(im, dtype = np.uint32)
    cdef dtype_t* weights_ptr = <dtype_t *> weights.data # consider whether [][] or ** or [] or *

    for binary_file_path in binary_file_paths: #
        # keep the bytes object referenced while fname points into its buffer
        filename_byte_string = binary_file_path.encode("UTF-8")
        fname = filename_byte_string

        # number of chunks needed to cover all outcome indices (rounded up)
        number_parts = math.ceil(<double> length_all_outcomes / chunksize)

        with nogil, parallel(num_threads=number_of_threads):
            for ii in prange(number_parts, schedule="dynamic", chunksize=1):
                # this part covers all_outcomes[start_val:end_val]; the last
                # part may be shorter than chunksize
                start_val = ii * chunksize
                end_val = min(start_val + chunksize, length_all_outcomes)
                # guard against an empty part
                if start_val == length_all_outcomes:
                    break
                error = learn_inplace_masked_ptr(fname, weights_ptr, mm, alpha, beta1,
                                                 beta2, lambda_, all_outcomes_ptr, start_val,
                                                 end_val)
                # stop handing out further parts once a part reported an error
                if error != NO_ERROR:
                    break

    if (error != NO_ERROR):
        raise IOError(f'binary files does not have proper format, error code {error}')
8 changes: 6 additions & 2 deletions pyndl/ndl_parallel.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ ctypedef np.float64_t dtype_t
from error_codes cimport ErrorCode


cdef ErrorCode learn_inplace_ptr(char*, dtype_t*, unsigned int, dtype_t, dtype_t,
dtype_t, dtype_t, unsigned int*, unsigned int,
cdef ErrorCode learn_inplace_ptr(char*, dtype_t*, unsigned int, dtype_t,
dtype_t, dtype_t, dtype_t, unsigned int*, unsigned int,
unsigned int) nogil

cdef ErrorCode learn_inplace_masked_ptr(char*, dtype_t*, unsigned int, dtype_t,
dtype_t, dtype_t, dtype_t, unsigned int*, unsigned int,
unsigned int) nogil
Loading