Audio Pipelines
minispeaker.processor.pipes
stream_async_as_generator
def stream_async_as_generator(
iterable: AsyncIterable[T]) -> AsyncGenerator[T, None]
Convenience function that returns a wrapped async generator from an iterable
.
Arguments:
iterable
AsyncIterable[T] - Any asynchronous iterator audio stream.
Returns:
AsyncGenerator[T, None]: An identical iterable
audio stream as a asynchronous generator.
stream_as_generator
def stream_as_generator(iterable: Iterable[T]) -> Generator[T, None, None]
Convenience function that returns a wrapped generator from an iterable
.
Arguments:
iterable
Iterable[T] - Any iterator audio stream.
Returns:
Generator[T, None, None]: An identical iterable
audio stream as a generator.
Yields:
T
- Any value fromiterable
memory_stream
def memory_stream(arr: ndarray) -> PlaybackCallbackGeneratorType
Converts a numpy array into a stream.
Arguments:
arr
ndarray - An numpy array of shape(-1, channels).
Returns:
PlaybackCallbackGeneratorType
- Generator that supports miniaudio's playback callback
Yields:
Iterator[ndarray]
- A audio chunk represented as a numpy subarray.
stream_numpy_pcm_memory
def stream_numpy_pcm_memory(
filename: str,
output_format: SampleFormat = SampleFormat.SIGNED16,
nchannels: int = 2,
sample_rate: int = 44100,
dither: DitherMode = DitherMode.NONE) -> PlaybackCallbackGeneratorType
Convenience function that returns a generator to decode and stream any source of encoded audio data.
Stream result is chunks of raw PCM samples as a numpy array. If you send() a number into the generator rather than just using next() on it, you'll get that given number of frames, instead of the default configured amount. This is particularly useful to plug this stream into an audio device callback that wants a variable number of frames per call.
Arguments:
filename
str - descriptionoutput_format
SampleFormat, optional - description. Defaults to SampleFormat.SIGNED16.nchannels
int, optional - description. Defaults to 2.sample_rate
int, optional - description. Defaults to 44100.dither
DitherMode, optional - description. Defaults to DitherMode.NONE.
Returns:
PlaybackCallbackGeneratorType
- description
stream_async_with_callbacks
def stream_async_with_callbacks(
sample_stream: AsyncGenerator[bytes | ArrayLike, int],
progress_callback: Union[Callable[[int], None], None] = None,
frame_process_method: Union[Callable[[FramesType], FramesType],
None] = None,
end_callback: Union[Callable,
None] = None) -> PlaybackCallbackGeneratorType
Convenience synchronous generator function to add callback and processing functionality, allowing synchronous playback from a asynchronous stream.
You can specify: A callback function that gets called during play and takes an int for the number of frames played. A function that can be used to process raw data frames before they are yielded back (takes an array.array or bytes, returns an array.array or bytes) *Note: if the processing method is slow it will result in audio glitchiness.
Arguments:
sample_stream
AsyncGenerator[bytes | ArrayLike, int] - descriptionprogress_callback
Union[Callable[[int], None], None], optional - description. Defaults to None.frame_process_method
Union[Callable[[FramesType], FramesType], None], optional - description. Defaults to None.end_callback
Union[Callable, None], optional - description. Defaults to None.
Returns:
PlaybackCallbackGeneratorType
- description
stream_num_frames
def stream_num_frames(
sample_stream: Generator[ArrayLike, Any, None]
) -> PlaybackCallbackGeneratorType
Convenience generator function with dynamic audio buffer management to guarantee a certain audio chunk size per iteration.
If you send() a number into the generator rather than just using next() on it, you'll get that given number of frames, instead of the default configured amount. This is particularly useful to plug this stream into an audio device callback that wants a variable number of frames per call.
Arguments:
sample_stream
Generator[ArrayLike, Any, None] - Any ArrayLike generator.
Returns:
PlaybackCallbackGeneratorType:
Yields:
ArrayLike
- An audio chunk
stream_match_audio_channels
def stream_match_audio_channels(
sample_stream: Generator[ArrayLike, int, None],
channels: int) -> Generator[ndarray, int, None]
Convenience generator function to automatically reformat any sample_stream
data as a numpy channeled array.
Arguments:
sample_stream
Generator[ArrayLike, int, None] - Any ArrayLike generator.channels
int - description
Returns:
Generator[ndarray, int, None]: Audio data formatted with the correct channels
.
stream_async_buffer
async def stream_async_buffer(
sample_stream: AsyncGenerator[ArrayLike, None],
max_buffer_chunks: int) -> AsyncGenerator[ArrayLike, None]
Asynchronous convenience generator function to prefetch audio for continuous playback.
Arguments:
sample_stream
AsyncGenerator[ArrayLike, None] - Any asynchronous audio generator.max_buffer_chunks
int - The prefetched audio size will not exceed by this amount.
Returns:
AsyncGenerator[ArrayLike, None]: Identical audio stream with buffer cache.
Yields:
ArrayLike
- Identical audio data.
stream_bytes_to_array
def stream_bytes_to_array(byte_stream: Generator[Buffer, int, None],
dtype: DTypeLike) -> Generator[ndarray, int, None]
Convenience generator function to automatically convert any byte_stream
into a numpy compatible sample stream.
Arguments:
byte_stream
Generator[Buffer, int, None] - Any Buffer generator.dtype
DTypeLike - The underlying audio sample format as a numpy dtype.
Returns:
Generator[ndarray, int, None]: Audio data formatted as a numpy array.
stream_sentinel
def stream_sentinel() -> Generator[ArrayLike, int, None]
Convenience generator function to simply yield nothing. Typically used against race conditions when track audio data is requested before the complete audio stream generator is initialized.
Returns:
Generator[ArrayLike, int, None]: Empty audio data
stream_pad
def stream_pad(ndarray_stream: Generator[ndarray, int, None],
channels: int) -> Generator[ndarray, int, None]
When calculating np.average to mix multiple audio streams, the function assumes all audio streams are identical in shape.
This is the case until an audio stream is nearing its end, whereby it returns an trimmed audio stream. pad() ensures that the trimmed audio stream is padded.
Arguments:
ndarray_stream
Generator[ndarray, int, None] - Any synchronous audio generator whose data is formatted as a numpy array.channels
int - Number of audio channels.
Returns:
Generator[ndarray, int, None]:formatted as a numpy array.
stream_handle_mute
def stream_handle_mute(sample_stream: Generator[ArrayLike, int, None],
track: Track) -> Generator[ArrayLike, int, None]
Convenience generator function to purposely throw out audio data if track
is muted, creating the effect of played but unheard audio.
Arguments:
sample_stream
Generator[ArrayLike, int, None] - Any synchronous audio generatortrack
Track - AnyTrack
class.
Returns:
Generator[ArrayLike, int, None]: Audio data
AudioPipeline
class AudioPipeline()
Immutable pipeline that preserves order and chains audio processing transformations.
Creates a composable pipeline of generator transformations that can be applied to any source. Each transformation in the pipeline receives the output of the previous transformation, creating a lazy evaluation chain that processes audio data efficiently.
Each generator must contain an initialization yield as each of them will be started in the pipeline.
Arguments:
*transforms
- Variable number of (function, args, kwargs) tuples representing transformations to apply. Each function should return an AudioGenerator with an initialization yield. Typically not called directly - use >> operator instead.
Attributes:
transforms
tuple - Immutable tuple of (function, args, kwargs) transformations that define the processing pipeline order.
Examples:
```python
Create an audio generator with initialization and send capability:
>>> def amplify(source, factor=2):
... # Initialization
... current_factor = factor
... yield # Initialization yield
...
... # Main processing loop
... for sample in source:
... # Can receive new amplification factor via send()
... new_factor = yield sample * current_factor
... if new_factor is not None:
... current_factor = new_factor
Create a pipeline using the >> operator:
>>> pipeline = (AudioPipeline()
... >> (amplify, 2)
... >> bandpass_filter
... >> compressor)
Apply to audio stream:
>>> audio_gen = pipeline(audio_samples)
Compile for reuse:
>>> process = pipeline.compile()
>>> output = list(process(another_stream))
```
AudioPipeline.__rshift__
def __rshift__(
transform: Union[GeneratorFactory, Transform]) -> "AudioPipeline"
Add a transformation to the pipeline using the >> operator.
Creates a new Pipeline instance with the additional transformation appended, preserving immutability. The transformation should be a generator function that yields once for initialization and can accept integer values via send().
Arguments:
transform
Union[GeneratorFactory,Transform] - Either a generator function or a tuple of (generator_function, *args, **kwargs).If callable: Applied with no additional arguments If tuple: First element is the generator function, remaining elements are arguments If last tuple element is a dict, it's treated as kwargs
Generator functions in the pipeline first positional argument must be either:
- The initial
source
passed to the pipeline (for the first transform) - The output from the previous transform in the chain (for subsequent transforms)
- The initial
Returns:
AudioPipeline
- New Pipeline instance with the transformation added.
Raises:
TypeError
- If transform is neither callable nor tuple.
AudioPipeline.__call__
def __call__(source: In) -> AudioGenerator
Apply the pipeline to a source.
Executes all transformations in order, passing the output of each transformation as input to the next. Each generator will be GEN_STARTED
by having their first yield consumed before processing begins.
Arguments:
source
In - The input to process through the pipeline. Can be any type that the first generator in the chain accepts (iterable, generator, etc).
Returns:
AudioGenerator
- CurriedAudioGenerator
oftransforms
AudioPipeline.compile
def compile() -> GeneratorFactory
Compile the pipeline into a reusable generator factory.
Returns a function that can be called multiple times with different sources. Each call will create a new chain of initialized generators that process the input and support send() communication.
Returns:
GeneratorFactory
- Function that returns a curriedAudioGenerator
oftransforms
AudioPipeline.__repr__
def __repr__() -> str
Return a string representation of the pipeline for debugging.
Shows the sequence of transformations in a readable format using >> notation. Functions with arguments are displayed with ellipsis to indicate parameters.
Returns:
str
- Human-readable representation of the pipeline structure.