mPyPl (version 0.0.3.6)
index
d:\winapp\anaconda3\lib\site-packages\mpypl\__init__.py

# mPyPl - Monadic Pipeline Library for Python
http://github.com/shwars/mPyPl

 
Package Contents
       
core
funcs
jsonstream
keras
mdict
multiclass_datastream
sink
utils (package)
video
xmlstream

 
Functions
       
isdir = _isdir(path, /)
Return true if the pathname refers to an existing directory.

 
Pipe functions
       
add(x)
aggregate(iterable, function, **kwargs)
all(iterable, pred)
Returns True if ALL elements in the given iterable are true for the
given pred function
any(iterable, pred)
Returns True if ANY element in the given iterable is True for the
given pred function
apply(datastream, src_field, dst_field, func, eval_strategy=None)
Applies a function to the specified field of the stream and stores the result in the specified field. Sample usage:
`[1,2,3] | as_field('f1') | apply('f1','f2',lambda x: x*x) | select_field('f2') | as_list`
If `dst_field` is `None`, function is just executed on the source field(s), and result is not stored.
This is useful when there are side effects.
apply_batch(datastream, src_field, dst_field, func, batch_size=32)
Apply function to the field in batches. `batch_size` elements are accumulated into the list, and `func` is called
with this parameter.
apply_npy(datastream, src_field, dst_field, func, file_ext=None)
A caching apply that computes some function returning numpy array, and stores the result on disk
:param datastream: datastream
:param src_field: source field to use as argument. Can be one field or list of fields
:param dst_field: destination field name
:param func: function to apply, accepts either one argument or list of arguments
:param file_ext: file extension to use (dst_field+'.npy') by default
:return: processed file stream
apply_nx(datastream, src_field, dst_field, func, eval_strategy=None, print_exceptions=False)
Same as `apply`, but ignores exceptions by just skipping elements with errors.
as_batch(flow, feature_field_name='features', label_field_name='label', batchsize=16)
Split input datastream into a sequence of batches suitable for keras training.
:param flow: input datastream
:param feature_field_name: feature field name to use. can be string or list of strings (for multiple arguments). Defaults to `features`
:param label_field_name: Label field name. Defaults to `label`
:param batchsize: batch size. Defaults to 16.
:return: sequence of batches that can be passed to `flow_generator` or similar function in keras
as_dict(iterable)
as_field(datastream, field_name)
Convert stream of any objects into proper datastream of `mdict`'s, with one named field
as_list(iterable)
as_npy(l)
Convert the sequence into numpy array. Use as `seq | as_npy`
:param l: input pipe generator (finite)
:return: numpy array created from the generator
as_set(iterable)
as_tuple(iterable)
average(iterable)
Build the average for the given iterable, starting with 0.0 as seed
Will try a division by 0 if the iterable is empty...
batch(datastream, k, n)
Separate only part of the stream for parallel batch processing. If you have `n` nodes, pass number of current node
as `k` (from 0 to n-1), and it will pass only part of the stream to be processed by that node. Namely, for i-th
element of the stream, it is passed through if i%n==k
:param datastream: datastream
:param k: number of current node in cluster
:param n: total number of nodes
:return: resulting datastream which is subset of the original one
chain(iterable)

 
chain_with = class chain(builtins.object)
    chain(*iterables) --> chain object
 
Return a chain object whose .__next__() method returns elements from the
first iterable until it is exhausted, then elements from the next
iterable, until all of the iterables are exhausted.
 
  Methods defined here:
__getattribute__(self, name, /)
Return getattr(self, name).
__iter__(self, /)
Implement iter(self).
__new__(*args, **kwargs) from builtins.type
Create and return a new object.  See help(type) for accurate signature.
__next__(self, /)
Implement next(self).
__reduce__(...)
Return state information for pickling.
__setstate__(...)
Set state information for unpickling.
from_iterable(...) from builtins.type
chain.from_iterable(iterable) --> chain object
 
Alternate chain() constructor taking a single iterable argument
that evaluates lazily.

chunk_slide(datastream, chunk_size)
collect_video(datastream, filename, video_size=None, codec=1935959654)
Collect a video file from a sequence of frames of video fragments.
:param datastream: sequence of images or video fragments
:param filename: output file name
:param video_size: size of the video. If `None` (which is the default) - video size is determined from the dimensions of the input `np.array`
:param codec: OpenCV codec to use. Default is `cv2.VideoWriter_fourcc(*"ffds")`
concat(iterable, separator=', ')
count(iterable)
Count the size of the given iterable, walking thrue it.
count_classes(datastream, class_field_name)
Count number of elements in difference classes.
:param datastream: input data stream
:param class_field_name: name of the field to be used for counting
:return: mdict with classes and their values
datasplit(datastream, split_param=None, split_value=0.2)
Very flexible function for splitting the dataset into train-test or train-test-validation dataset. If datastream
contains field `filename` - all splitting is performed based on the filename (directories are ommited to simplify
moving data and split file onto different path). If not - original objects are used.
:param datastream: datastream to split
:param split_param: either filename of 'split.txt' file, or dictionary of filenames. If the file does not exist - stratified split is performed, and file is created. If `split_param` is None, temporary split is performed.
:param split_value: either one value (default is 0.2), in which case train-test split is performed, or pair of `(validation,test)` values
:return: datastream with additional field `split`
datasplit_by_pattern(datastream, train_pattern=None, valid_pattern=None, test_pattern=None)
Attach data split info to the stream according to some pattern in filename.
:param datastream: Datastream, which should contain the field 'filename', or be string stream
:param train_pattern: Train pattern to use. If None, all are considered Train by default
:param valid_pattern: Validation pattern to use. If None, there will be no validation.
:param test_pattern: Test pattern to use. If None, there will be no validation.
:return: Datastream augmented with `split` field
dedup(iterable)
Only yield unique items. Use a set to keep track of duplicate data.
delay(seq, field_name, delayed_field_name)
Create another field `delayed_field_name` from `field_name` that is one step delayed
:param seq: Sequence
:param field_name: Original existing field name
:param delayed_field_name: New field name to hold the delayed value
:return: New sequence
delfield(datastream, field_name)
Delete specified field `field_name` from the stream. This is typically done in order to save memory.
dict_group_by(datasteam, field_name)
Group all the records by the given field name. Returns dictionary that for each value of the field contains lists
of corresponding `mdict`-s. **Important**: This operation loads whole dataset into memory, so for big data fields
it is better to use lazy evaluation.
:param datasteam: input datastream
:param field_name: field name to use
:return: dictionary of the form `{ 'value-1' : [ ... ], ...}`
ensure_field(datastream, field_name)
Ensure that the field with the given name exists. All records non containing that field are skipped.
:param datastream: input datastream
:param field_name: field name
:return: output datastream
execute(l)
Runs all elements of the pipeline, ignoring the result
The same as _ = pipe | as_list
:param l: Pipeline to execute
fapply(datastream, dst_field, func, eval_strategy=None)
Applies a function to the whole dictionary and stores the result in the specified field.
This function should rarely be used externaly, choice should be made in favour of `apply`, because it does not involve
operating on internals of `dict`
fenumerate(l, field_name, start=0)
Add extra field to datastream which contains number of record
:param l:
:param field_name:
:return:
filter(datastream, src_field, pred)
Filters out fields that yield a given criteria.
:param datastream: input datastream
:param src_field: field of list of fields to consider
:param pred: predicate function. If `src_field` is one field, than `pred` is a function of one argument returning boolean.
If `src_field` is a list, `pred` takes tuple/list as an argument.
:return: datastream with fields that yield predicate
filter_split(datastream, split_type)
Returns a datastream of the corresponding split type
:param datastream: Input datastream
:return: Tuple of the form `(train_stream,test_stream)`
first(l)
Returns first element of a pipe
:param datastream: input pipe generator
:return: first element
fold(l, field_name, func, init_state)
Perform fold of the datastream, using given fold function `func` with initial state `init_state`
:param l: datastream
:param field_name: field name (or list of names) to use
:param func: fold function that takes field(s) value and state and returns state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: final state of the fold
groupby(iterable, keyfunc)
index(iterable, value, start=0, stop=None)
infshuffle(l)
Function that turns sequence into infinite shuffled sequence. It loads it into memory for processing.
:param l: input pipe generator
:return: result sequence
inspect(seq, func=None, message='Inspecting mdict')
Print out the info about the fields in a given stream
:return: Original sequence

 
class islice(builtins.object)
    islice(iterable, stop) --> islice object
islice(iterable, start, stop[, step]) --> islice object
 
Return an iterator whose next() method returns selected values from an
iterable.  If start is specified, will skip all preceding elements;
otherwise, start defaults to zero.  Step defaults to one.  If
specified as another value, step determines how many values are 
skipped between successive calls.  Works like a slice() on a list
but returns an iterator.
 
  Methods defined here:
__getattribute__(self, name, /)
Return getattr(self, name).
__iter__(self, /)
Implement iter(self).
__new__(*args, **kwargs) from builtins.type
Create and return a new object.  See help(type) for accurate signature.
__next__(self, /)
Implement next(self).
__reduce__(...)
Return state information for pickling.
__setstate__(...)
Set state information for unpickling.

iter(datastream, field_name=None, func=None)
Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
iteri(datastream, field_name=None, func=None)
Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
Function receives number of frame as the first argument

 
izip = class zip(object)
    zip(iter1 [,iter2 [...]]) --> zip object
 
Return a zip object whose .__next__() method returns a tuple where
the i-th element comes from the i-th iterable argument.  The .__next__()
method continues until the shortest iterable in the argument sequence
is exhausted and then it raises StopIteration.
 
  Methods defined here:
__getattribute__(self, name, /)
Return getattr(self, name).
__iter__(self, /)
Implement iter(self).
__new__(*args, **kwargs) from builtins.type
Create and return a new object.  See help(type) for accurate signature.
__next__(self, /)
Implement next(self).
__reduce__(...)
Return state information for pickling.

lineout(x)
lstrip(iterable, chars=None)
lzapply(datastream, src_field, dst_field, func, eval_strategy=None)
Lazily applies a function to the specified field of the stream and stores the result in the specified field.
You need to make sure that `lzapply` does not create endless recursive loop - you should not use the same
`src_field` and `dst_field`, and avoid situations when x['f1'] lazily depends on x['f2'], while x['f2'] lazily
depends on x['f1'].
make_train_test_split(datastream)
Returns a tuple of streams with train and test dataset. It can be used in the following manner:
`train, test = get_datastream(..) | ... | make_train_test_split()`
**Important**: this causes the whole dataset to be loaded into memory. If objects are large, you are encouraged
to use lazy field evaluation, or to handle it using the following way:
```
train = get_datastream('...') | ... | filter_split(SplitType.Train)
test = get_datastream('...') | ... | filter_split(SplitType.Test)
```
:param datastream: Input datastream
:return: Tuple of the form `(train_stream,test_stream)`
make_train_validation_test_split(datastream)
Returns a tuple of streams with train, validation and test dataset. See `make_train_test_split` documentation for
limitations and usage suggestions.
:param datastream: Input datastream
:return: Tuple of the form `(train_stream,validation_stream,test_stream)`
max(iterable, **kwargs)
min(iterable, **kwargs)
netcat(to_send, host, port)
netwrite(to_send, host, port)
normalize_npy_value(seq, field_name, interval=(0, 1))
Normalize values of the field specified by `field_name` in the given `interval`
Normalization is applied invividually to each sequence element
:param seq: Input datastream
:param field_name: Field name
:param interval: Interval (default to (0,1))
:return: Datastream with a field normalized
passed(x)
pbatch(l, n=10)
Split input sequence into batches of `n` elements.
:param l: Input sequence
:param n: Length of output batches (lists)
:return: Sequence of lists of `n` elements
pconcat(l)
pcycle(l)
Infinitely cycle the input sequence
:param l: input pipe generator
:return: infinite datastream
permutations(iterable, r=None)
pexec(l, func=None, convert_to_list=False)
Execute function func, passing the pipe sequence as an argument
:param func: Function to execute, must accept 1 iterator as parameter.
:param convert_to_list: Convert pipe to list before passing it to function. If `func` is `None`, iterator is converted to list anyway.
:return: result of func
pforeach(l, func)
Execute given function on each element in the pipe
:param l: datastream
:param func: function to be called on each element
pprint(l)
Print the values of a finite pipe generator and return a new copy of it. It has to convert generator into in-memory
list, so better not to use it with big data. Use `seq | tee ...` instead.
:param l: input pipe generator
:return: the same generator
psave(datastream, filename)
Save whole datastream into a file for later use
:param datastream: Datastream
:param filename: Filename
pshuffle(l)
Shuffle a given pipe.
In the current implementation, it has to store the whole datastream into memory as a list, in order to perform shuffle.
Please not, that the idiom [1,2,3] | pshuffle() | pcycle() will return the same order of the shuffled sequence (eg. something
like [2,1,3,2,1,3,...]), if you want proper infinite shuffle use `infshuffle()` instead.
:param l: input pipe generator
:return: list of elements of the datastream in a shuffled order
puniq(l)
reverse(iterable)
rstrip(iterable, chars=None)
run_with(iterable, func)
sample_classes(datastream, class_field_name, n=10, classes=None)
Create a datastream containing at most `n` samples from each of the classes defined by `class_field_name`
**Important** If `classes` is `None`, function determines classes on the fly, in which case it is possible that it will terminate early without
giving elements of all classes.
:param datastream: input stream
:param class_field_name: name of the field in the stream speficying the class
:param n: number of elements of each class to take
:param classes: classes descriptor, either dictionary or list
:return: resulting stream
sapply(datastream, field, func)
Self-apply
Applies a function to the specified field of the stream and stores the result in the same field. Sample usage:
`[1,2,3] | as_field('x') | sapply('x',lambda x: x*x) | select_field('x') | as_list`
scan(l, field_name, new_field_name, func, init_state)
Perform scan (cumulitive sum) of the datastream, using given function `func` with initial state `init_state`.
Results are places into `new_field_name` field.
:param l: datastream
:param field_name: field name (or list of names) to use
:param new_field_name: field name to use for storing results
:param func: fold function that takes field(s) value and state and returns state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: final state of the fold
select(iterable, selector)
select_field(datastream, field_name)
Extract one/several fields from datastream, returning a stream of objects of corresponding type (not `mdict`).
If several fields are given, return a list/tuple.
select_fields(datastream, field_names)
Select multiple fields from datastream, returning a *new* stream of *mdicts* with the requested fields copied.
Because field immutability is encouraged, the best way to get rid of some fields and free up memory
is to select out a new data structure with the ones you want copied over.
silly_progress(seq, n=None, elements=None, symbol='.', width=40)
Print dots to indicate that something good is happening. A dot is printed every `n` items.
:param seq: original sequence
:param n: number of items to process between printing a dot
:param symbol: symbol to print
:return: original sequence
skip(iterable, qte)
Skip qte elements in the given iterable, then yield others.
skip_while(iterable, predicate)
sliding_window_npy(seq, field_names, size, cache=10)
Create a stream of sliding windows from a given stream.
:param seq: Input sequence
:param field_names: Field names to accumulate
:param size: Size of sliding window
:param cache: Size of the caching array, in a number of `size`-chunks.
:return: mPyPl sequence containing numpy arrays for specified fields
sort(iterable, **kwargs)
stdout(x)
stratify_sample(seq, n=None, shuffle=False, field_name='class_id')
Returns stratified samples of size `n` from each class (given dy `field_name`) in round robin manner.
NB: This operation is cachy (caches all data in memory)
:param l: input pipe generator
:param n: number of samples or `None` (in which case the min number of elements is used)
:param shuffle: perform random shuffling of samples
:param field_name: name of field that specifies classes. `class_no` by default.
:return: result sequence
stratify_sample_tt(seq, n_samples=None, shuffle=False, class_field_name='class_id', split_field_name='split')
Returns stratified training, test and validation samples of size `n_sample` from each class
(given dy `class_field_name`) in round robin manner.
`n_samples` is a dict specifying number of samples for each split type (or None).
NB: This operation is cachy (caches all data in memory)
:param l: input pipe generator
:param n_samples: dict specifying number of samples for each split type or `None` (in which case the min number of elements is used)
:param shuffle: perform random shuffling of samples
:param class_field_name: name of field that specifies classes. `class_id` by default.
:param split_field_name: name of field that specifies split. `split` by default.
:return: result sequence
strip(iterable, chars=None)
summarize(seq, field_name, func=None, msg=None)
Compute a summary of a given field (eg. count of different values). Resulting dictionary is either passed to `func`,
or printed on screen (if `func is None`).
:param seq: Datastream
:param field_name: Field name to summarize
:param func: Function to call after summary is obtained (which is after all stream processing). If `None`, summary is printed on screen.
:param msg: Optional message to print before summary
:return: Original stream
summary(seq, class_field_name='class_name', split_field_name='split')
Print a summary of a data stream
:param seq: Datastream
:param class_field_name: Field name to differentiate between classes
:param split_field_name: Field name to indicate train/test split
:return: Original stream
t(iterable, y)
tail(iterable, qte)
Yield qte of elements in the given iterable.
take(iterable, qte)
Yield qte of elements in the given iterable.
take_while(iterable, predicate)
tee(iterable)
to_type(x, t)
transpose(iterable)
traverse(args)
unfold(l, field_name, func, init_state)
Add extra field to the datastream, which is obtained by applying state transformation function `func` to
initial state `init_state`
:param l: datastream
:param func: state transformation function
:param init_state: initial state
:return: datastream
uniq(iterable)
Deduplicate consecutive duplicate values.
unroll(datastream, field)
Field `field` is assumed to be a sequence. This function unrolls the sequence, i.e. replacing the sequence field
with the actual values inside the sequence. All other field values are duplicated.
:param datasteam: Data stream
:param field: Field name or list of field names. If several fields are listed, corresponding sequences should preferably be of the same size.
where(iterable, predicate)
write_csv(l, filename)
write_json(l, filename)

 
Data
        altsep = '/'
curdir = '.'
defpath = r'.;C:\bin'
devnull = 'nul'
extsep = '.'
pardir = '..'
pathsep = ';'
sep = r'\'
supports_unicode_filenames = True