mPyPl.core
index
d:\winapp\anaconda3\lib\site-packages\mpypl\core.py

# mPyPl - Monadic Pipeline Library for Python
http://github.com/shwars/mPyPl

 
Modules
       
builtins
cv2.cv2
enum
functools
itertools
json
math
numpy
os
pickle
random
sys
types

 
Functions
       
infinite()
Produce an infinite sequence of empty `mdict`s
isdir = _isdir(path, /)
Return true if the pathname refers to an existing directory.

 
Pipe functions
       
add(x)
aggregate(iterable, function, **kwargs)
all(iterable, pred)
Returns True if ALL elements in the given iterable are true for the
given pred function
any(iterable, pred)
Returns True if ANY element in the given iterable is True for the
given pred function
apply(datastream, src_field, dst_field, func, eval_strategy=None)
Applies a function to the specified field of the stream and stores the result in the specified field. Sample usage:
`[1,2,3] | as_field('f1') | apply('f1','f2',lambda x: x*x) | select_field('f2') | as_list`
If `dst_field` is `None`, function is just executed on the source field(s), and result is not stored.
This is useful when there are side effects.
apply_batch(datastream, src_field, dst_field, func, batch_size=32)
Apply function to the field in batches. `batch_size` elements are accumulated into the list, and `func` is called
with this parameter.
apply_npy(datastream, src_field, dst_field, func, file_ext=None)
A caching apply that computes some function returning numpy array, and stores the result on disk
:param datastream: datastream
:param src_field: source field to use as argument. Can be one field or list of fields
:param dst_field: destination field name
:param func: function to apply, accepts either one argument or list of arguments
:param file_ext: file extension to use (dst_field+'.npy') by default
:return: processed file stream
apply_nx(datastream, src_field, dst_field, func, eval_strategy=None, print_exceptions=False)
Same as `apply`, but ignores exceptions by just skipping elements with errors.
as_dict(iterable)
as_field(datastream, field_name)
Convert stream of any objects into proper datastream of `mdict`'s, with one named field
as_list(iterable)
as_npy(l)
Convert the sequence into numpy array. Use as `seq | as_npy`
:param l: input pipe generator (finite)
:return: numpy array created from the generator
as_set(iterable)
as_tuple(iterable)
average(iterable)
Build the average for the given iterable, starting with 0.0 as seed
Will try a division by 0 if the iterable is empty...
batch(datastream, k, n)
Separate only part of the stream for parallel batch processing. If you have `n` nodes, pass number of current node
as `k` (from 0 to n-1), and it will pass only part of the stream to be processed by that node. Namely, for i-th
element of the stream, it is passed through if i%n==k
:param datastream: datastream
:param k: number of current node in cluster
:param n: total number of nodes
:return: resulting datastream which is subset of the original one
chain(iterable)

 
chain_with = class chain(builtins.object)
    chain(*iterables) --> chain object
 
Return a chain object whose .__next__() method returns elements from the
first iterable until it is exhausted, then elements from the next
iterable, until all of the iterables are exhausted.
 
  Methods defined here:
__getattribute__(self, name, /)
Return getattr(self, name).
__iter__(self, /)
Implement iter(self).
__new__(*args, **kwargs) from builtins.type
Create and return a new object.  See help(type) for accurate signature.
__next__(self, /)
Implement next(self).
__reduce__(...)
Return state information for pickling.
__setstate__(...)
Set state information for unpickling.
from_iterable(...) from builtins.type
chain.from_iterable(iterable) --> chain object
 
Alternate chain() constructor taking a single iterable argument
that evaluates lazily.

concat(iterable, separator=', ')
count(iterable)
Count the size of the given iterable, walking thrue it.
dedup(iterable)
Only yield unique items. Use a set to keep track of duplicate data.
delay(seq, field_name, delayed_field_name)
Create another field `delayed_field_name` from `field_name` that is one step delayed
:param seq: Sequence
:param field_name: Original existing field name
:param delayed_field_name: New field name to hold the delayed value
:return: New sequence
delfield(datastream, field_name)
Delete specified field `field_name` from the stream. This is typically done in order to save memory.
dict_group_by(datasteam, field_name)
Group all the records by the given field name. Returns dictionary that for each value of the field contains lists
of corresponding `mdict`-s. **Important**: This operation loads whole dataset into memory, so for big data fields
it is better to use lazy evaluation.
:param datasteam: input datastream
:param field_name: field name to use
:return: dictionary of the form `{ 'value-1' : [ ... ], ...}`
ensure_field(datastream, field_name)
Ensure that the field with the given name exists. All records non containing that field are skipped.
:param datastream: input datastream
:param field_name: field name
:return: output datastream
execute(l)
Runs all elements of the pipeline, ignoring the result
The same as _ = pipe | as_list
:param l: Pipeline to execute
fapply(datastream, dst_field, func, eval_strategy=None)
Applies a function to the whole dictionary and stores the result in the specified field.
This function should rarely be used externaly, choice should be made in favour of `apply`, because it does not involve
operating on internals of `dict`
fenumerate(l, field_name, start=0)
Add extra field to datastream which contains number of record
:param l:
:param field_name:
:return:
filter(datastream, src_field, pred)
Filters out fields that yield a given criteria.
:param datastream: input datastream
:param src_field: field of list of fields to consider
:param pred: predicate function. If `src_field` is one field, than `pred` is a function of one argument returning boolean.
If `src_field` is a list, `pred` takes tuple/list as an argument.
:return: datastream with fields that yield predicate
first(iterable)
fold(l, field_name, func, init_state)
Perform fold of the datastream, using given fold function `func` with initial state `init_state`
:param l: datastream
:param field_name: field name (or list of names) to use
:param func: fold function that takes field(s) value and state and returns state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: final state of the fold
groupby(iterable, keyfunc)
index(iterable, value, start=0, stop=None)
infshuffle(l)
Function that turns sequence into infinite shuffled sequence. It loads it into memory for processing.
:param l: input pipe generator
:return: result sequence
inspect(seq, func=None, message='Inspecting mdict')
Print out the info about the fields in a given stream
:return: Original sequence

 
class islice(builtins.object)
    islice(iterable, stop) --> islice object
islice(iterable, start, stop[, step]) --> islice object
 
Return an iterator whose next() method returns selected values from an
iterable.  If start is specified, will skip all preceding elements;
otherwise, start defaults to zero.  Step defaults to one.  If
specified as another value, step determines how many values are 
skipped between successive calls.  Works like a slice() on a list
but returns an iterator.
 
  Methods defined here:
__getattribute__(self, name, /)
Return getattr(self, name).
__iter__(self, /)
Implement iter(self).
__new__(*args, **kwargs) from builtins.type
Create and return a new object.  See help(type) for accurate signature.
__next__(self, /)
Implement next(self).
__reduce__(...)
Return state information for pickling.
__setstate__(...)
Set state information for unpickling.

iter(datastream, field_name=None, func=None)
Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
iteri(datastream, field_name=None, func=None)
Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
Function receives number of frame as the first argument

 
izip = class zip(object)
    zip(iter1 [,iter2 [...]]) --> zip object
 
Return a zip object whose .__next__() method returns a tuple where
the i-th element comes from the i-th iterable argument.  The .__next__()
method continues until the shortest iterable in the argument sequence
is exhausted and then it raises StopIteration.
 
  Methods defined here:
__getattribute__(self, name, /)
Return getattr(self, name).
__iter__(self, /)
Implement iter(self).
__new__(*args, **kwargs) from builtins.type
Create and return a new object.  See help(type) for accurate signature.
__next__(self, /)
Implement next(self).
__reduce__(...)
Return state information for pickling.

lineout(x)
lstrip(iterable, chars=None)
lzapply(datastream, src_field, dst_field, func, eval_strategy=None)
Lazily applies a function to the specified field of the stream and stores the result in the specified field.
You need to make sure that `lzapply` does not create endless recursive loop - you should not use the same
`src_field` and `dst_field`, and avoid situations when x['f1'] lazily depends on x['f2'], while x['f2'] lazily
depends on x['f1'].
max(iterable, **kwargs)
min(iterable, **kwargs)
netcat(to_send, host, port)
netwrite(to_send, host, port)
passed(x)
pbatch(l, n=10)
Split input sequence into batches of `n` elements.
:param l: Input sequence
:param n: Length of output batches (lists)
:return: Sequence of lists of `n` elements
pconcat(l)
pcycle(l)
Infinitely cycle the input sequence
:param l: input pipe generator
:return: infinite datastream
permutations(iterable, r=None)
pexec(l, func=None, convert_to_list=False)
Execute function func, passing the pipe sequence as an argument
:param func: Function to execute, must accept 1 iterator as parameter.
:param convert_to_list: Convert pipe to list before passing it to function. If `func` is `None`, iterator is converted to list anyway.
:return: result of func
pforeach(l, func)
Execute given function on each element in the pipe
:param l: datastream
:param func: function to be called on each element
pprint(l)
Print the values of a finite pipe generator and return a new copy of it. It has to convert generator into in-memory
list, so better not to use it with big data. Use `seq | tee ...` instead.
:param l: input pipe generator
:return: the same generator
psave(datastream, filename)
Save whole datastream into a file for later use
:param datastream: Datastream
:param filename: Filename
pshuffle(l)
Shuffle a given pipe.
In the current implementation, it has to store the whole datastream into memory as a list, in order to perform shuffle.
Please not, that the idiom [1,2,3] | pshuffle() | pcycle() will return the same order of the shuffled sequence (eg. something
like [2,1,3,2,1,3,...]), if you want proper infinite shuffle use `infshuffle()` instead.
:param l: input pipe generator
:return: list of elements of the datastream in a shuffled order
puniq(l)
reverse(iterable)
rstrip(iterable, chars=None)
run_with(iterable, func)
sapply(datastream, field, func)
Self-apply
Applies a function to the specified field of the stream and stores the result in the same field. Sample usage:
`[1,2,3] | as_field('x') | sapply('x',lambda x: x*x) | select_field('x') | as_list`
scan(l, field_name, new_field_name, func, init_state)
Perform scan (cumulitive sum) of the datastream, using given function `func` with initial state `init_state`.
Results are places into `new_field_name` field.
:param l: datastream
:param field_name: field name (or list of names) to use
:param new_field_name: field name to use for storing results
:param func: fold function that takes field(s) value and state and returns state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: final state of the fold
select(iterable, selector)
select_field(datastream, field_name)
Extract one/several fields from datastream, returning a stream of objects of corresponding type (not `mdict`).
If several fields are given, return a list/tuple.
select_fields(datastream, field_names)
Select multiple fields from datastream, returning a *new* stream of *mdicts* with the requested fields copied.
Because field immutability is encouraged, the best way to get rid of some fields and free up memory
is to select out a new data structure with the ones you want copied over.
silly_progress(seq, n=None, elements=None, symbol='.', width=40)
Print dots to indicate that something good is happening. A dot is printed every `n` items.
:param seq: original sequence
:param n: number of items to process between printing a dot
:param symbol: symbol to print
:return: original sequence
skip(iterable, qte)
Skip qte elements in the given iterable, then yield others.
skip_while(iterable, predicate)
sliding_window_npy(seq, field_names, size, cache=10)
Create a stream of sliding windows from a given stream.
:param seq: Input sequence
:param field_names: Field names to accumulate
:param size: Size of sliding window
:param cache: Size of the caching array, in a number of `size`-chunks.
:return: mPyPl sequence containing numpy arrays for specified fields
sort(iterable, **kwargs)
stdout(x)
strip(iterable, chars=None)
summarize(seq, field_name, func=None, msg=None)
Compute a summary of a given field (eg. count of different values). Resulting dictionary is either passed to `func`,
or printed on screen (if `func is None`).
:param seq: Datastream
:param field_name: Field name to summarize
:param func: Function to call after summary is obtained (which is after all stream processing). If `None`, summary is printed on screen.
:param msg: Optional message to print before summary
:return: Original stream
t(iterable, y)
tail(iterable, qte)
Yield qte of elements in the given iterable.
take(iterable, qte)
Yield qte of elements in the given iterable.
take_while(iterable, predicate)
tee(iterable)
to_type(x, t)
transpose(iterable)
traverse(args)
unfold(l, field_name, func, init_state)
Add extra field to the datastream, which is obtained by applying state transformation function `func` to
initial state `init_state`
:param l: datastream
:param func: state transformation function
:param init_state: initial state
:return: datastream
uniq(iterable)
Deduplicate consecutive duplicate values.
unroll(datastream, field)
Field `field` is assumed to be a sequence. This function unrolls the sequence, i.e. replacing the sequence field
with the actual values inside the sequence. All other field values are duplicated.
:param datasteam: Data stream
:param field: Field name or list of field names. If several fields are listed, corresponding sequences should preferably be of the same size.
where(iterable, predicate)

 
Data
        altsep = '/'
curdir = '.'
defpath = r'.;C:\bin'
devnull = 'nul'
extsep = '.'
pardir = '..'
pathsep = ';'
sep = r'\'
supports_unicode_filenames = True