- add(x)
- aggregate(iterable, function, **kwargs)
- all(iterable, pred)
- Returns True if ALL elements in the given iterable are true for the
given pred function
- any(iterable, pred)
Returns True if ANY element in the given iterable is true for the
given pred function
- apply(datastream, src_field, dst_field, func, eval_strategy=None)
- Applies a function to the specified field of the stream and stores the result in the specified field. Sample usage:
`[1,2,3] | as_field('f1') | apply('f1','f2',lambda x: x*x) | select_field('f2') | as_list`
If `dst_field` is `None`, the function is just executed on the source field(s) and the result is not stored.
This is useful when the function has side effects.
- apply_batch(datastream, src_field, dst_field, func, batch_size=32)
- Apply a function to the field in batches: `batch_size` elements are accumulated into a list, and `func` is called
with that list as its argument.
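For example, a minimal sketch (assumes mPyPl names are imported via `from mPyPl import *`, and assumes `func` returns one result per accumulated value, written back into `dst_field` element-wise):
```python
from mPyPl import *

# Square values in batches of 2; `func` receives a list of up to
# `batch_size` accumulated values (assumed to return one result per value)
res = (
    [1, 2, 3, 4, 5]
    | as_field('x')
    | apply_batch('x', 'y', lambda xs: [v * v for v in xs], batch_size=2)
    | select_field('y')
    | as_list
)
print(res)  # expected: [1, 4, 9, 16, 25]
```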
- apply_npy(datastream, src_field, dst_field, func, file_ext=None)
- A caching version of `apply` that computes a function returning a numpy array and stores the result on disk
:param datastream: datastream
:param src_field: source field to use as argument. Can be one field or list of fields
:param dst_field: destination field name
:param func: function to apply, accepts either one argument or list of arguments
:param file_ext: file extension to use (`dst_field`+'.npy' by default)
:return: processed file stream
- apply_nx(datastream, src_field, dst_field, func, eval_strategy=None, print_exceptions=False)
- Same as `apply`, but ignores exceptions by just skipping elements with errors.
- as_dict(iterable)
- as_field(datastream, field_name)
- Convert a stream of arbitrary objects into a proper datastream of `mdict`s, with one named field
- as_list(iterable)
- as_npy(l)
- Convert the sequence into a numpy array. Use as `seq | as_npy`
:param l: input pipe generator (finite)
:return: numpy array created from the generator
- as_set(iterable)
- as_tuple(iterable)
- average(iterable)
- Compute the average of the given iterable, starting with 0.0 as the seed.
Note that a division by 0 occurs if the iterable is empty.
- batch(datastream, k, n)
- Separate out part of the stream for parallel batch processing. If you have `n` nodes, pass the number of the current node
as `k` (from 0 to n-1), and only the corresponding part of the stream will be passed through for processing by that node.
Namely, the i-th element of the stream is passed through if i%n==k
:param datastream: datastream
:param k: number of current node in cluster
:param n: total number of nodes
:return: resulting datastream which is subset of the original one
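A minimal sketch of splitting work across 3 nodes (node numbering is 0-based, per the description above):
```python
from mPyPl import *

# Node 0 of a 3-node cluster keeps elements with index i where i % 3 == 0
node0 = range(10) | as_field('x') | batch(0, 3) | select_field('x') | as_list
print(node0)  # expected: [0, 3, 6, 9]
```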
- chain(iterable)
- chain_with(*iterables)
  - Return a chain object whose `.__next__()` method returns elements from the first iterable until it is exhausted,
    then elements from the next iterable, until all of the iterables are exhausted.
- concat(iterable, separator=', ')
- count(iterable)
- Count the size of the given iterable, walking through it.
- dedup(iterable)
- Only yield unique items. Use a set to keep track of duplicate data.
- delay(seq, field_name, delayed_field_name)
- Create another field `delayed_field_name` from `field_name` that is one step delayed
:param seq: Sequence
:param field_name: Original existing field name
:param delayed_field_name: New field name to hold the delayed value
:return: New sequence
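A small sketch (the value stored in the delayed field of the very first record is not specified above, so it is not asserted here):
```python
from mPyPl import *

res = (
    [10, 20, 30]
    | as_field('x')
    | delay('x', 'prev_x')
    | select_field(['x', 'prev_x'])
    | as_list
)
# each record now carries the previous value of 'x' in 'prev_x',
# e.g. the second record is (20, 10) and the third is (30, 20)
print(res)
```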
- delfield(datastream, field_name)
- Delete specified field `field_name` from the stream. This is typically done in order to save memory.
- dict_group_by(datastream, field_name)
  - Group all the records by the given field name. Returns a dictionary that maps each value of the field to the list
    of corresponding `mdict`s. **Important**: this operation loads the whole dataset into memory, so for big data fields
    it is better to use lazy evaluation.
    :param datastream: input datastream
    :param field_name: field name to use
    :return: dictionary of the form `{ 'value-1' : [ ... ], ...}`
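For example (a sketch; `dict_group_by` is a terminal operation that returns a plain dictionary rather than a pipe):
```python
from mPyPl import *

groups = (
    ['cat', 'dog', 'cat']
    | as_field('label')
    | dict_group_by('label')
)
# groups maps each label to the list of matching mdicts,
# e.g. {'cat': [<mdict>, <mdict>], 'dog': [<mdict>]}
print({k: len(v) for k, v in groups.items()})  # expected: {'cat': 2, 'dog': 1}
```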
- ensure_field(datastream, field_name)
- Ensure that the field with the given name exists. All records not containing that field are skipped.
:param datastream: input datastream
:param field_name: field name
:return: output datastream
- execute(l)
- Runs all elements of the pipeline, ignoring the result.
The same as `_ = pipe | as_list`
:param l: Pipeline to execute
- fapply(datastream, dst_field, func, eval_strategy=None)
- Applies a function to the whole dictionary and stores the result in the specified field.
This function should rarely be used externally; prefer `apply`, which does not involve
operating on the internals of the dictionary.
- fenumerate(l, field_name, start=0)
- Add an extra field to the datastream containing the number of the record
:param l: datastream
:param field_name: field name to store the record number
:param start: starting number (0 by default)
:return: datastream with the new field
- filter(datastream, src_field, pred)
- Filter the stream, keeping only records that satisfy a given criterion.
:param datastream: input datastream
:param src_field: field or list of fields to consider
:param pred: predicate function. If `src_field` is one field, then `pred` is a function of one argument returning boolean.
If `src_field` is a list, `pred` takes a tuple/list as an argument.
:return: datastream with records that satisfy the predicate
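A minimal sketch (note that mPyPl's `filter` operates on named fields, unlike the builtin):
```python
from mPyPl import *

evens = (
    range(10)
    | as_field('x')
    | filter('x', lambda x: x % 2 == 0)
    | select_field('x')
    | as_list
)
print(evens)  # expected: [0, 2, 4, 6, 8]
```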
- first(iterable)
- fold(l, field_name, func, init_state)
- Perform a fold of the datastream, using the given fold function `func` with initial state `init_state`
:param l: datastream
:param field_name: field name (or list of names) to use
:param func: fold function that takes field(s) value and state and returns state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: final state of the fold
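For instance, summing a field (a sketch; the argument order `func(value, state)` follows the description above):
```python
from mPyPl import *

total = (
    [1, 2, 3, 4]
    | as_field('x')
    | fold('x', lambda x, state: state + x, 0)
)
print(total)  # expected: 10
```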
- groupby(iterable, keyfunc)
- index(iterable, value, start=0, stop=None)
- infshuffle(l)
- Turn a sequence into an infinite shuffled sequence. The whole sequence is loaded into memory for processing.
:param l: input pipe generator
:return: result sequence
- inspect(seq, func=None, message='Inspecting mdict')
- Print out the info about the fields in a given stream
:return: Original sequence
- islice(iterable, stop) / islice(iterable, start, stop[, step])
  - Return an iterator whose `next()` method returns selected values from an iterable. If `start` is specified,
    all preceding elements are skipped; otherwise `start` defaults to zero. `step` defaults to one; if specified
    as another value, it determines how many values are skipped between successive calls. Works like `slice()`
    on a list but returns an iterator.
- iter(datastream, field_name=None, func=None)
- Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
- iteri(datastream, field_name=None, func=None)
- Execute function `func` on field `field_name` (or list of fields) of every item.
If `field_name` is omitted or `None`, function is applied on the whole dictionary (this usage is not recommended).
The function receives the frame number as its first argument
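A small side-effect sketch for `iter` (assumes, per the usage of `execute` above, that `iter` passes records through and the lazy pipeline must be forced to run):
```python
from mPyPl import *

# Print each value of 'x' as a side effect; `execute` forces the
# (otherwise lazy) pipeline to run
[1, 2, 3] | as_field('x') | iter('x', print) | execute
```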
- izip(iter1 [, iter2 [...]])
  - Return a zip object whose `.__next__()` method returns a tuple where the i-th element comes from the i-th
    iterable argument. The `.__next__()` method continues until the shortest iterable in the argument sequence
    is exhausted, and then it raises StopIteration.
- lineout(x)
- lstrip(iterable, chars=None)
- lzapply(datastream, src_field, dst_field, func, eval_strategy=None)
- Lazily applies a function to the specified field of the stream and stores the result in the specified field.
You need to make sure that `lzapply` does not create an endless recursive loop: do not use the same
`src_field` and `dst_field`, and avoid situations where x['f1'] lazily depends on x['f2'] while x['f2'] lazily
depends on x['f1'].
- max(iterable, **kwargs)
- min(iterable, **kwargs)
- netcat(to_send, host, port)
- netwrite(to_send, host, port)
- passed(x)
- pbatch(l, n=10)
- Split input sequence into batches of `n` elements.
:param l: Input sequence
:param n: Length of output batches (lists)
:return: Sequence of lists of `n` elements
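A one-line sketch (the input length is chosen to be divisible by `n`, since the handling of a shorter final batch is not specified above):
```python
from mPyPl import *

batches = range(6) | pbatch(3) | as_list
print(batches)  # expected: [[0, 1, 2], [3, 4, 5]]
```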
- pconcat(l)
- pcycle(l)
- Infinitely cycle the input sequence
:param l: input pipe generator
:return: infinite datastream
- permutations(iterable, r=None)
- pexec(l, func=None, convert_to_list=False)
- Execute function func, passing the pipe sequence as an argument
:param func: Function to execute; must accept one iterator as its parameter.
:param convert_to_list: Convert the pipe to a list before passing it to the function. If `func` is `None`, the iterator is converted to a list anyway.
:return: result of func
- pforeach(l, func)
- Execute given function on each element in the pipe
:param l: datastream
:param func: function to be called on each element
- pprint(l)
- Print the values of a finite pipe generator and return a new copy of it. It has to convert the generator into an
in-memory list, so it is better not to use it with big data. Use `seq | tee ...` instead.
:param l: input pipe generator
:return: the same generator
- psave(datastream, filename)
- Save whole datastream into a file for later use
:param datastream: Datastream
:param filename: Filename
- pshuffle(l)
- Shuffle a given pipe.
In the current implementation, it has to store the whole datastream into memory as a list, in order to perform shuffle.
Please note that the idiom `[1,2,3] | pshuffle() | pcycle()` will repeatedly return the same shuffled order (e.g. something
like [2,1,3,2,1,3,...]); if you want a proper infinite shuffle, use `infshuffle()` instead.
:param l: input pipe generator
:return: list of elements of the datastream in a shuffled order
- puniq(l)
- reverse(iterable)
- rstrip(iterable, chars=None)
- run_with(iterable, func)
- sapply(datastream, field, func)
- Self-apply
Applies a function to the specified field of the stream and stores the result in the same field. Sample usage:
`[1,2,3] | as_field('x') | sapply('x',lambda x: x*x) | select_field('x') | as_list`
- scan(l, field_name, new_field_name, func, init_state)
- Perform a scan (cumulative fold) of the datastream, using the given function `func` with initial state `init_state`.
Results are placed into the `new_field_name` field.
:param l: datastream
:param field_name: field name (or list of names) to use
:param new_field_name: field name to use for storing results
:param func: fold function that takes field(s) value and state and returns the new state. If field_name is None, func
accepts the whole `mdict` as first parameter
:param init_state: initial state
:return: datastream with the scan results in `new_field_name`
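For instance, a running sum (a sketch; assumes the value stored at each step is the updated state):
```python
from mPyPl import *

sums = (
    [1, 2, 3]
    | as_field('x')
    | scan('x', 'cum', lambda x, state: state + x, 0)
    | select_field('cum')
    | as_list
)
print(sums)  # expected: [1, 3, 6]
```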
- select(iterable, selector)
- select_field(datastream, field_name)
- Extract one/several fields from datastream, returning a stream of objects of corresponding type (not `mdict`).
If several fields are given, return a list/tuple.
- select_fields(datastream, field_names)
- Select multiple fields from datastream, returning a *new* stream of *mdicts* with the requested fields copied.
Because field immutability is encouraged, the best way to get rid of some fields and free up memory
is to select out a new data structure with the ones you want copied over.
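A sketch of dropping a large helper field this way (field names here are illustrative):
```python
from mPyPl import *

slim = (
    [1, 2]
    | as_field('x')
    | apply('x', 'big', lambda v: [v] * 1000)  # a large intermediate field
    | apply('big', 'y', lambda b: sum(b))
    | select_fields(['x', 'y'])                # copy out only what we need
    | as_list
)
# each element is a new mdict with just the fields 'x' and 'y'
```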
- silly_progress(seq, n=None, elements=None, symbol='.', width=40)
- Print dots to indicate that something good is happening. A dot is printed every `n` items.
:param seq: original sequence
:param n: number of items to process between printing a dot
:param symbol: symbol to print
:return: original sequence
- skip(iterable, qte)
- Skip qte elements in the given iterable, then yield the rest.
- skip_while(iterable, predicate)
- sliding_window_npy(seq, field_names, size, cache=10)
- Create a stream of sliding windows from a given stream.
:param seq: Input sequence
:param field_names: Field names to accumulate
:param size: Size of sliding window
:param cache: Size of the caching array, in a number of `size`-chunks.
:return: mPyPl sequence containing numpy arrays for specified fields
- sort(iterable, **kwargs)
- stdout(x)
- strip(iterable, chars=None)
- summarize(seq, field_name, func=None, msg=None)
- Compute a summary of a given field (e.g. the count of different values). The resulting dictionary is either passed to `func`,
or printed on screen (if `func` is `None`).
:param seq: Datastream
:param field_name: Field name to summarize
:param func: Function to call after summary is obtained (which is after all stream processing). If `None`, summary is printed on screen.
:param msg: Optional message to print before summary
:return: Original stream
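A sketch of the printing mode (the exact output format of the summary is not specified above):
```python
from mPyPl import *

# Count the distinct values of 'label'; the summary prints once the
# stream has been fully consumed by `execute`
(
    ['cat', 'dog', 'cat']
    | as_field('label')
    | summarize('label', msg='Label counts:')
    | execute
)
```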
- t(iterable, y)
- tail(iterable, qte)
- Yield the last qte elements of the given iterable.
- take(iterable, qte)
- Yield the first qte elements of the given iterable.
- take_while(iterable, predicate)
- tee(iterable)
- to_type(x, t)
- transpose(iterable)
- traverse(args)
- unfold(l, field_name, func, init_state)
- Add an extra field to the datastream, obtained by repeatedly applying the state transformation function `func`,
starting from the initial state `init_state`
:param l: datastream
:param field_name: field name to store the generated state
:param func: state transformation function
:param init_state: initial state
:return: datastream
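A numbering sketch (whether the first stored value is the initial state or the first transformed state is not specified above, so the exact output is not asserted):
```python
from mPyPl import *

numbered = (
    ['a', 'b', 'c']
    | as_field('ch')
    | unfold('n', lambda state: state + 1, 0)
    | select_field(['ch', 'n'])
    | as_list
)
# each record gets a monotonically increasing counter in field 'n'
print(numbered)
```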
- uniq(iterable)
- Deduplicate consecutive duplicate values.
- unroll(datastream, field)
- Field `field` is assumed to be a sequence. This function unrolls the sequence, i.e. replaces the sequence field
with the individual values inside the sequence. All other field values are duplicated.
:param datastream: Data stream
:param field: Field name or list of field names. If several fields are listed, the corresponding sequences should preferably be of the same size.
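A flattening sketch:
```python
from mPyPl import *

flat = (
    [[1, 2], [3]]
    | as_field('xs')
    | unroll('xs')
    | select_field('xs')
    | as_list
)
print(flat)  # expected: [1, 2, 3]
```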
- where(iterable, predicate)