- add(x)
- aggregate(iterable, function, **kwargs)
- all(iterable, pred)
- Returns True if ALL elements in the given iterable are true for the
given pred function
- any(iterable, pred)
- Returns True if ANY element in the given iterable is True for the
given pred function
- apply(datastream, src_field, dst_field, func, eval_strategy=None)
- Applies a function to the specified field of the stream and stores the result in the specified field. Sample usage:
`[1,2,3] | as_field('f1') | apply('f1','f2',lambda x: x*x) | select_field('f2') | as_list`
If `dst_field` is `None`, function is just executed on the source field(s), and result is not stored.
This is useful when there are side effects.
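The semantics described above can be sketched on plain dictionaries. This is an illustration only, not the library's implementation: `apply_field` is a hypothetical helper, and it copies each record instead of updating it in place.

```python
def apply_field(records, src_field, dst_field, func):
    """Call func on record[src_field]; store the result in dst_field unless it is None."""
    for record in records:
        result = func(record[src_field])
        if dst_field is not None:
            # Copy the record so the input dictionaries are left untouched (an assumption of this sketch).
            record = {**record, dst_field: result}
        yield record

records = [{"f1": v} for v in [1, 2, 3]]
squares = [r["f2"] for r in apply_field(records, "f1", "f2", lambda x: x * x)]

# With dst_field=None the function runs only for its side effect.
seen = []
unchanged = list(apply_field(records, "f1", None, seen.append))
```

Here `squares` holds the squared values, while `unchanged` carries the original records and `seen` records each value the function was called with.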
- apply_batch(datastream, src_field, dst_field, func, batch_size=32)
- Apply function to the field in batches. `batch_size` elements are accumulated into a list, and `func` is called
with that list as its argument.
- apply_npy(datastream, src_field, dst_field, func, file_ext=None)
- A caching apply that computes some function returning numpy array, and stores the result on disk
:param datastream: datastream
:param src_field: source field to use as argument. Can be one field or list of fields
:param dst_field: destination field name
:param func: function to apply, accepts either one argument or list of arguments
:param file_ext: file extension to use (`dst_field+'.npy'` by default)
:return: processed file stream
- apply_nx(datastream, src_field, dst_field, func, eval_strategy=None, print_exceptions=False)
- Same as `apply`, but ignores exceptions by just skipping elements with errors.
- as_batch(flow, feature_field_name='features', label_field_name='label', batchsize=16)
- Split input datastream into a sequence of batches suitable for keras training.
:param flow: input datastream
:param feature_field_name: feature field name to use. can be string or list of strings (for multiple arguments). Defaults to `features`
:param label_field_name: Label field name. Defaults to `label`
:param batchsize: batch size. Defaults to 16.
:return: sequence of batches that can be passed to `flow_generator` or similar function in keras
- as_dict(iterable)
- as_field(datastream, field_name)
- Convert stream of any objects into proper datastream of `mdict`'s, with one named field
- as_list(iterable)
- as_npy(l)
- Convert the sequence into numpy array. Use as `seq | as_npy`
:param l: input pipe generator (finite)
:return: numpy array created from the generator
- as_set(iterable)
- as_tuple(iterable)
- average(iterable)
- Compute the average of the given iterable, starting with 0.0 as the seed.
An empty iterable leads to a division by zero.
- batch(datastream, k, n)
- Separate only part of the stream for parallel batch processing. If you have `n` nodes, pass number of current node
as `k` (from 0 to n-1), and it will pass only part of the stream to be processed by that node. Namely, for i-th
element of the stream, it is passed through if i%n==k
:param datastream: datastream
:param k: number of current node in cluster
:param n: total number of nodes
:return: resulting datastream which is subset of the original one
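The `i%n==k` rule can be illustrated without the library. The following is a minimal sketch in plain Python, not the library's implementation; `shard` is a hypothetical helper name.

```python
from itertools import islice

def shard(stream, k, n):
    """Pass through only the elements whose position i satisfies i % n == k."""
    if not 0 <= k < n:
        raise ValueError("k must be in the range 0..n-1")
    # islice with start=k and step=n selects positions k, k+n, k+2n, ...
    return islice(stream, k, None, n)

items = list(range(10))
parts = [list(shard(items, k, 3)) for k in range(3)]
```

Each of the three nodes sees a disjoint part of the stream, and together the parts cover every element exactly once.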
- chain(iterable)
- chain_with = class chain(builtins.object)
chain(*iterables) --> chain object
Return a chain object whose .__next__() method returns elements from the
first iterable until it is exhausted, then elements from the next
iterable, until all of the iterables are exhausted.
Methods defined here:
- __getattribute__(self, name, /)
- Return getattr(self, name).
- __iter__(self, /)
- Implement iter(self).
- __new__(*args, **kwargs) from builtins.type
- Create and return a new object. See help(type) for accurate signature.
- __next__(self, /)
- Implement next(self).
- __reduce__(...)
- Return state information for pickling.
- __setstate__(...)
- Set state information for unpickling.
- from_iterable(...) from builtins.type
- chain.from_iterable(iterable) --> chain object
Alternate chain() constructor taking a single iterable argument
that evaluates lazily.
- chunk_slide(datastream, chunk_size)
- collect_video(datastream, filename, video_size=None, codec=1935959654)
- Collect a video file from a sequence of frames or video fragments.
:param datastream: sequence of images or video fragments
:param filename: output file name
:param video_size: size of the video. If `None` (which is the default) - video size is determined from the dimensions of the input `np.array`
:param codec: OpenCV codec to use. Default is `cv2.VideoWriter_fourcc(*"ffds")`
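The integer default in the signature is the packed form of the four-character code. As a sketch in plain Python (no OpenCV required), the hypothetical helper `fourcc` below packs four characters with the first character in the lowest byte, which is the layout a FOURCC integer uses:

```python
def fourcc(c1, c2, c3, c4):
    """Pack four characters into a 32-bit integer, first character in the lowest byte."""
    return (
        (ord(c1) & 255)
        | ((ord(c2) & 255) << 8)
        | ((ord(c3) & 255) << 16)
        | ((ord(c4) & 255) << 24)
    )

code = fourcc(*"ffds")
print(code)  # 1935959654, the default value of `codec`
```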
- concat(iterable, separator=', ')
- count(iterable)
- Count the size of the given iterable by walking through it.
- count_classes(datastream, class_field_name)
- Count the number of elements in different classes.
:param datastream: input data stream
:param class_field_name: name of the field to be used for counting
:return: mdict with classes and their values
- datasplit(datastream, split_param=None, split_value=0.2)
- Very flexible function for splitting the dataset into train-test or train-test-validation datasets. If the datastream
contains the field `filename`, all splitting is performed based on the filename (directories are omitted to simplify
moving data and the split file to a different path). If not, the original objects are used.
:param datastream: datastream to split
:param split_param: either filename of 'split.txt' file, or dictionary of filenames. If the file does not exist - stratified split is performed, and file is created. If `split_param` is None, temporary split is performed.
:param split_value: either one value (default is 0.2), in which case train-test split is performed, or pair of `(validation,test)` values
:return: datastream with additional field `split`
- datasplit_by_pattern(datastream, train_pattern=None, valid_pattern=None, test_pattern=None)
- Attach data split info to the stream according to some pattern in filename.
:param datastream: Datastream, which should contain the field 'filename', or be string stream
:param train_pattern: Train pattern to use. If None, all are considered Train by default
:param valid_pattern: Validation pattern to use. If None, there will be no validation.
:param test_pattern: Test pattern to use. If None, there will be no test split.
:return: Datastream augmented with `split` field
- dedup(iterable)
- Only yield unique items. Use a set to keep track of duplicate data.
- delay(seq, field_name, delayed_field_name)
- Create another field `delayed_field_name` from `field_name` that is one step delayed
:param seq: Sequence
:param field_name: Original existing field name
:param delayed_field_name: New field name to hold the delayed value
:return: New sequence
- delfield(datastream, field_name)
- Delete specified field `field_name` from the stream. This is typically done in order to save memory.
- dict_group_by(datasteam, field_name)
- Group all the records by the given field name. Returns dictionary that for each value of the field contains lists
of corresponding `mdict`-s. **Important**: This operation loads whole dataset into memory, so for big data fields
it is better to use lazy evaluation.
:param datasteam: input datastream
:param field_name: field name to use
:return: dictionary of the form `{ 'value-1' : [ ... ], ...}`
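The shape of the returned dictionary can be sketched with plain dictionaries. This is only an illustration of the described result; `group_by_field` is a hypothetical helper, not the library function.

```python
from collections import defaultdict

def group_by_field(records, field_name):
    """Collect records into lists keyed by the value of field_name (loads everything into memory)."""
    groups = defaultdict(list)
    for record in records:
        groups[record[field_name]].append(record)
    return dict(groups)

records = [
    {"class": "cat", "id": 1},
    {"class": "dog", "id": 2},
    {"class": "cat", "id": 3},
]
grouped = group_by_field(records, "class")
```

Because every record ends up inside one of the lists, the whole stream is held in memory at once, which is the reason for the warning above.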
- ensure_field(datastream, field_name)
- Ensure that the field with the given name exists. All records not containing that field are skipped.
:param datastream: input datastream
:param field_name: field name
:return: output datastream
- execute(l)
- Runs all elements of the pipeline, ignoring the result
The same as _ = pipe | as_list
:param l: Pipeline to execute
- fapply(datastream, dst_field, func, eval_strategy=None)
- Applies a function to the whole dictionary and stores the result in the specified field.
This function should rarely be used externally; `apply` should be preferred, because it does not involve
operating on internals of `dict`
- fenumerate(l, field_name, start=0)
- Add extra field to datastream which contains number of record
:param l:
:param field_name:
:return:
- filter(datastream, src_field, pred)
- Filters the stream, keeping only the records that satisfy a given criterion.
:param datastream: input datastream
:param src_field: field or list of fields to consider
:param pred: predicate function. If `src_field` is one field, then `pred` is a function of one argument returning boolean.
If `src_field` is a list, `pred` takes tuple/list as an argument.
:return: datastream with the records that satisfy the predicate
- filter_split(datastream, split_type)
- Returns a datastream of the corresponding split type
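:param datastream: Input datastream
:param split_type: split type to select (e.g. `SplitType.Train` or `SplitType.Test`)
:return: datastream containing only the records of the given split type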
- first(l)
- Returns first element of a pipe
:param l: input pipe generator
:return: first element
- fold(l, field_name, func, init_state)
- Perform fold of the datastream, using given fold function `func` with initial state `init_state`
:param l: datastream
:param field_name: field name (or list of names) to use
:param func: fold function that takes field(s) value and state and returns state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: final state of the fold
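A fold over one field can be sketched on plain dictionaries as follows. `fold_field` is a hypothetical helper; the only detail taken from the description is that `func` receives the field value first and the state second.

```python
def fold_field(records, field_name, func, init_state):
    """Thread a state through the stream: state = func(value, state) for every record."""
    state = init_state
    for record in records:
        state = func(record[field_name], state)
    return state

records = [{"x": 1}, {"x": 2}, {"x": 3}]
total = fold_field(records, "x", lambda value, state: state + value, 0)
longest = fold_field(records, "x", lambda value, state: max(value, state), float("-inf"))
```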
- groupby(iterable, keyfunc)
- index(iterable, value, start=0, stop=None)
- infshuffle(l)
- Function that turns sequence into infinite shuffled sequence. It loads it into memory for processing.
:param l: input pipe generator
:return: result sequence
- inspect(seq, func=None, message='Inspecting mdict')
- Print out the info about the fields in a given stream
:return: Original sequence
- class islice(builtins.object)
islice(iterable, stop) --> islice object
islice(iterable, start, stop[, step]) --> islice object
Return an iterator whose next() method returns selected values from an
iterable. If start is specified, will skip all preceding elements;
otherwise, start defaults to zero. Step defaults to one. If
specified as another value, step determines how many values are
skipped between successive calls. Works like a slice() on a list
but returns an iterator.
Methods defined here:
- __getattribute__(self, name, /)
- Return getattr(self, name).
- __iter__(self, /)
- Implement iter(self).
- __new__(*args, **kwargs) from builtins.type
- Create and return a new object. See help(type) for accurate signature.
- __next__(self, /)
- Implement next(self).
- __reduce__(...)
- Return state information for pickling.
- __setstate__(...)
- Set state information for unpickling.
- iter(datastream, field_name=None, func=None)
- Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
- iteri(datastream, field_name=None, func=None)
- Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
Function receives number of frame as the first argument
- izip = class zip(object)
zip(iter1 [,iter2 [...]]) --> zip object
Return a zip object whose .__next__() method returns a tuple where
the i-th element comes from the i-th iterable argument. The .__next__()
method continues until the shortest iterable in the argument sequence
is exhausted and then it raises StopIteration.
Methods defined here:
- __getattribute__(self, name, /)
- Return getattr(self, name).
- __iter__(self, /)
- Implement iter(self).
- __new__(*args, **kwargs) from builtins.type
- Create and return a new object. See help(type) for accurate signature.
- __next__(self, /)
- Implement next(self).
- __reduce__(...)
- Return state information for pickling.
- lineout(x)
- lstrip(iterable, chars=None)
- lzapply(datastream, src_field, dst_field, func, eval_strategy=None)
- Lazily applies a function to the specified field of the stream and stores the result in the specified field.
You need to make sure that `lzapply` does not create endless recursive loop - you should not use the same
`src_field` and `dst_field`, and avoid situations when x['f1'] lazily depends on x['f2'], while x['f2'] lazily
depends on x['f1'].
- make_train_test_split(datastream)
- Returns a tuple of streams with train and test dataset. It can be used in the following manner:
`train, test = get_datastream(..) | ... | make_train_test_split()`
**Important**: this causes the whole dataset to be loaded into memory. If objects are large, you are encouraged
to use lazy field evaluation, or to handle it in the following way:
```
train = get_datastream('...') | ... | filter_split(SplitType.Train)
test = get_datastream('...') | ... | filter_split(SplitType.Test)
```
:param datastream: Input datastream
:return: Tuple of the form `(train_stream,test_stream)`
- make_train_validation_test_split(datastream)
- Returns a tuple of streams with train, validation and test dataset. See `make_train_test_split` documentation for
limitations and usage suggestions.
:param datastream: Input datastream
:return: Tuple of the form `(train_stream,validation_stream,test_stream)`
- max(iterable, **kwargs)
- min(iterable, **kwargs)
- netcat(to_send, host, port)
- netwrite(to_send, host, port)
- normalize_npy_value(seq, field_name, interval=(0, 1))
- Normalize values of the field specified by `field_name` in the given `interval`
Normalization is applied individually to each sequence element
:param seq: Input datastream
:param field_name: Field name
:param interval: Interval (default to (0,1))
:return: Datastream with a field normalized
- passed(x)
- pbatch(l, n=10)
- Split input sequence into batches of `n` elements.
:param l: Input sequence
:param n: Length of output batches (lists)
:return: Sequence of lists of `n` elements
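Splitting a sequence into lists of `n` elements can be sketched in plain Python. `batches` is a hypothetical helper, and yielding a shorter final list when the input does not divide evenly is an assumption of this sketch, since the description does not say how the remainder is handled.

```python
from itertools import islice

def batches(seq, n=10):
    """Yield consecutive lists of up to n elements from seq."""
    it = iter(seq)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        # Assumption: a trailing partial batch is yielded rather than dropped.
        yield chunk

result = list(batches(range(7), n=3))
```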
- pconcat(l)
- pcycle(l)
- Infinitely cycle the input sequence
:param l: input pipe generator
:return: infinite datastream
- permutations(iterable, r=None)
- pexec(l, func=None, convert_to_list=False)
- Execute function func, passing the pipe sequence as an argument
:param func: Function to execute, must accept 1 iterator as parameter.
:param convert_to_list: Convert pipe to list before passing it to function. If `func` is `None`, iterator is converted to list anyway.
:return: result of func
- pforeach(l, func)
- Execute given function on each element in the pipe
:param l: datastream
:param func: function to be called on each element
- pprint(l)
- Print the values of a finite pipe generator and return a new copy of it. It has to convert generator into in-memory
list, so better not to use it with big data. Use `seq | tee ...` instead.
:param l: input pipe generator
:return: the same generator
- psave(datastream, filename)
- Save whole datastream into a file for later use
:param datastream: Datastream
:param filename: Filename
- pshuffle(l)
- Shuffle a given pipe.
In the current implementation, it has to store the whole datastream into memory as a list, in order to perform shuffle.
Please note that the idiom `[1,2,3] | pshuffle() | pcycle()` will return the same order of the shuffled sequence (e.g. something
like [2,1,3,2,1,3,...]); if you want a proper infinite shuffle, use `infshuffle()` instead.
:param l: input pipe generator
:return: list of elements of the datastream in a shuffled order
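The difference between cycling a shuffled list and an infinite shuffle can be sketched with the standard library. The helper `reshuffling` is hypothetical and shows one plausible reading of an infinite shuffle (reshuffle on every pass); it is not taken from the library's source.

```python
import random
from itertools import cycle, islice

rng = random.Random(0)
items = [1, 2, 3]

# Shuffle once, then cycle: every pass repeats the same order.
shuffled = items[:]
rng.shuffle(shuffled)
repeated = list(islice(cycle(shuffled), 9))

def reshuffling(items, rng):
    """Yield the items forever, reshuffling before each pass (keeps all items in memory)."""
    while True:
        batch = list(items)
        rng.shuffle(batch)
        yield from batch

fresh = list(islice(reshuffling(items, rng), 9))
```

In `repeated` the three passes are identical by construction, while in `fresh` each pass is an independently drawn permutation.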
- puniq(l)
- reverse(iterable)
- rstrip(iterable, chars=None)
- run_with(iterable, func)
- sample_classes(datastream, class_field_name, n=10, classes=None)
- Create a datastream containing at most `n` samples from each of the classes defined by `class_field_name`
**Important** If `classes` is `None`, function determines classes on the fly, in which case it is possible that it will terminate early without
giving elements of all classes.
:param datastream: input stream
:param class_field_name: name of the field in the stream specifying the class
:param n: number of elements of each class to take
:param classes: classes descriptor, either dictionary or list
:return: resulting stream
- sapply(datastream, field, func)
- Self-apply
Applies a function to the specified field of the stream and stores the result in the same field. Sample usage:
`[1,2,3] | as_field('x') | sapply('x',lambda x: x*x) | select_field('x') | as_list`
- scan(l, field_name, new_field_name, func, init_state)
- Perform scan (cumulative sum) of the datastream, using given function `func` with initial state `init_state`.
Results are placed into the `new_field_name` field.
:param l: datastream
:param field_name: field name (or list of names) to use
:param new_field_name: field name to use for storing results
:param func: fold function that takes field(s) value and state and returns state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: datastream with the `new_field_name` field added
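A scan differs from a fold in that every intermediate state is written back to the stream. The sketch below uses plain dictionaries; `scan_field` is a hypothetical helper, and storing the state after `func` has been applied to the current record is an assumption of this sketch.

```python
def scan_field(records, field_name, new_field_name, func, init_state):
    """Yield each record with the running state stored under new_field_name."""
    state = init_state
    for record in records:
        state = func(record[field_name], state)
        # Copy the record so the input dictionaries are left untouched.
        yield {**record, new_field_name: state}

records = [{"x": 1}, {"x": 2}, {"x": 3}]
running = list(scan_field(records, "x", "sum", lambda value, state: state + value, 0))
```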
- select(iterable, selector)
- select_field(datastream, field_name)
- Extract one/several fields from datastream, returning a stream of objects of corresponding type (not `mdict`).
If several fields are given, return a list/tuple.
- select_fields(datastream, field_names)
- Select multiple fields from datastream, returning a *new* stream of *mdicts* with the requested fields copied.
Because field immutability is encouraged, the best way to get rid of some fields and free up memory
is to select out a new data structure with the ones you want copied over.
- silly_progress(seq, n=None, elements=None, symbol='.', width=40)
- Print dots to indicate that something good is happening. A dot is printed every `n` items.
:param seq: original sequence
:param n: number of items to process between printing a dot
:param symbol: symbol to print
:return: original sequence
- skip(iterable, qte)
- Skip qte elements in the given iterable, then yield others.
- skip_while(iterable, predicate)
- sliding_window_npy(seq, field_names, size, cache=10)
- Create a stream of sliding windows from a given stream.
:param seq: Input sequence
:param field_names: Field names to accumulate
:param size: Size of sliding window
:param cache: Size of the caching array, in a number of `size`-chunks.
:return: mPyPl sequence containing numpy arrays for specified fields
- sort(iterable, **kwargs)
- stdout(x)
- stratify_sample(seq, n=None, shuffle=False, field_name='class_id')
- Returns stratified samples of size `n` from each class (given by `field_name`) in round robin manner.
NB: This operation caches all data in memory.
:param seq: input pipe generator
:param n: number of samples or `None` (in which case the min number of elements is used)
:param shuffle: perform random shuffling of samples
:param field_name: name of field that specifies classes. `class_id` by default.
:return: result sequence
- stratify_sample_tt(seq, n_samples=None, shuffle=False, class_field_name='class_id', split_field_name='split')
- Returns stratified training, test and validation samples of size `n_samples` from each class
(given by `class_field_name`) in round robin manner.
`n_samples` is a dict specifying number of samples for each split type (or None).
NB: This operation caches all data in memory.
:param seq: input pipe generator
:param n_samples: dict specifying number of samples for each split type or `None` (in which case the min number of elements is used)
:param shuffle: perform random shuffling of samples
:param class_field_name: name of field that specifies classes. `class_id` by default.
:param split_field_name: name of field that specifies split. `split` by default.
:return: result sequence
- strip(iterable, chars=None)
- summarize(seq, field_name, func=None, msg=None)
- Compute a summary of a given field (eg. count of different values). Resulting dictionary is either passed to `func`,
or printed on screen (if `func is None`).
:param seq: Datastream
:param field_name: Field name to summarize
:param func: Function to call after summary is obtained (which is after all stream processing). If `None`, summary is printed on screen.
:param msg: Optional message to print before summary
:return: Original stream
- summary(seq, class_field_name='class_name', split_field_name='split')
- Print a summary of a data stream
:param seq: Datastream
:param class_field_name: Field name to differentiate between classes
:param split_field_name: Field name to indicate train/test split
:return: Original stream
- t(iterable, y)
- tail(iterable, qte)
- Yield the last qte elements of the given iterable.
- take(iterable, qte)
- Yield the first qte elements of the given iterable.
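`take` and `tail` select from opposite ends of the iterable. A minimal sketch with the standard library, using the hypothetical helper names `take_first` and `take_last`:

```python
from collections import deque
from itertools import islice

def take_first(iterable, qte):
    """Yield at most the first qte elements; stops early, so it works on infinite iterables."""
    return islice(iterable, qte)

def take_last(iterable, qte):
    """Yield at most the last qte elements; must consume the whole iterable first."""
    return iter(deque(iterable, maxlen=qte))

head = list(take_first(range(10), 3))
end = list(take_last(range(10), 3))
```

Only the first variant is safe on an infinite stream, because the second has to reach the end of the input before it can yield anything.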
- take_while(iterable, predicate)
- tee(iterable)
- to_type(x, t)
- transpose(iterable)
- traverse(args)
- unfold(l, field_name, func, init_state)
- Add extra field to the datastream, which is obtained by applying state transformation function `func` to
initial state `init_state`
:param l: datastream
:param func: state transformation function
:param init_state: initial state
:return: datastream
- uniq(iterable)
- Deduplicate consecutive duplicate values.
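The difference between removing consecutive duplicates (`uniq`) and removing all duplicates (`dedup`) can be sketched in plain Python. `uniq_consecutive` and `dedup_all` are hypothetical names used for illustration only.

```python
from itertools import groupby

def uniq_consecutive(iterable):
    """Collapse runs of equal adjacent values into a single value."""
    for key, _group in groupby(iterable):
        yield key

def dedup_all(iterable):
    """Yield each value once, using a set to remember what was already seen."""
    seen = set()
    for item in iterable:
        if item not in seen:
            seen.add(item)
            yield item

data = [1, 1, 2, 2, 1, 3, 3]
consecutive = list(uniq_consecutive(data))
unique = list(dedup_all(data))
```

The value `1` appears twice in `consecutive` because its two runs are not adjacent, but only once in `unique`.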
- unroll(datastream, field)
- Field `field` is assumed to be a sequence. This function unrolls the sequence, i.e. replacing the sequence field
with the actual values inside the sequence. All other field values are duplicated.
:param datastream: Data stream
:param field: Field name or list of field names. If several fields are listed, corresponding sequences should preferably be of the same size.
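Unrolling a single sequence field can be sketched on plain dictionaries. `unroll_field` is a hypothetical helper that handles only one field name; the list-of-fields case is not covered here.

```python
def unroll_field(records, field):
    """Emit one record per element of record[field], duplicating all other fields."""
    for record in records:
        for value in record[field]:
            yield {**record, field: value}

records = [
    {"id": "a", "frames": [1, 2]},
    {"id": "b", "frames": [3]},
]
flat = list(unroll_field(records, "frames"))
```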
- where(iterable, predicate)
- write_csv(l, filename)
- write_json(l, filename)