# flake8: noqa """ Writing module to load stems into numpy tensors. """ from stempeg.write import FilesWriter import numpy as np import warnings import ffmpeg import pprint from multiprocessing import Pool import atexit from functools import partial import datetime as dt class Reader(object): """Base class for reader Holds reader options """ def __init__(self): pass class StreamsReader(Reader): """Holding configuration for streams This is the default reader. Nothing to be hold """ def __init__(self): pass class ChannelsReader(Reader): """Using multichannels to multiplex to stems stems will be extracted from multichannel-pairs e.g. 8 channels will be converted to 4 stereo pairs Args: from_channels: int number of channels, defaults to `2`. """ def __init__(self, nb_channels=2): self.nb_channels = nb_channels def _read_ffmpeg( filename, sample_rate, channels, start, duration, dtype, ffmpeg_format, stem_idx ): """Loading data using ffmpeg and numpy Args: filename (str): filename path sample_rate (int): sample rate channels (int): metadata info object needed to know the channel configuration in advance start (float): start position in seconds duration (float): duration in seconds dtype (numpy.dtype): Type of audio array to be casted into stem_idx (int): stream id ffmpeg_format (str): ffmpeg intermediate format encoding. Choose "f32le" for best compatibility Returns: (array_like): numpy audio array """ output_kwargs = {'format': ffmpeg_format, 'ar': sample_rate} if duration is not None: output_kwargs['t'] = str(dt.timedelta(seconds=duration)) if start is not None: output_kwargs['ss'] = str(dt.timedelta(seconds=start)) output_kwargs['map'] = '0:' + str(stem_idx) process = ( ffmpeg .input(filename) .output('pipe:', **output_kwargs) .run_async(pipe_stdout=True, pipe_stderr=True)) buffer, _ = process.communicate() # decode to raw pcm format if ffmpeg_format == "f64le": # PCM 64 bit float numpy_dtype = '<f8' elif ffmpeg_format == "f32le": # PCM 32 bit float numpy_dtype = '<f4' elif ffmpeg_format == "s16le": # PCM 16 bit signed int numpy_dtype = '<i2' else: raise NotImplementedError("ffmpeg format is not supported") waveform = np.frombuffer(buffer, dtype=numpy_dtype).reshape(-1, channels) if not waveform.dtype == np.dtype(dtype): # cast to target/output dtype waveform = waveform.astype(dtype, order='C') # when coming from integer, apply normalization t0 [-1.0, 1.0] if np.issubdtype(numpy_dtype, np.integer): waveform = waveform / (np.iinfo(numpy_dtype).max + 1.0) return waveform def read_stems( filename, start=None, duration=None, stem_id=None, always_3d=False, dtype=np.float64, ffmpeg_format="f32le", info=None, sample_rate=None, reader=StreamsReader(), multiprocess=False ): """Read stems into numpy tensor This function can read both, multi-stream and single stream audio files. If used for reading normal audio, the output is a 1d or 2d (mono/stereo) array. When multiple streams are read, the output is a 3d array. An option stems_from_multichannel was added to load stems that are aggregated into multichannel audio (concatenation of pairs of stereo channels), see more info on audio `stempeg.write.write_stems`. By default `read_stems` assumes that multiple substreams were used to save the stem file (`reader=stempeg.StreamsReader()`). To support multistream files on audio formats that do not support multiple streams (e.g. WAV), streams can be mapped to multiple pairs of channels. In that case, `stempeg.ChannelsReader()`, can be passed. Also see: `stempeg.write.ChannelsWriter`. Args: filename (str): filename of the audio file to load data from. start (float): Start offset to load from in seconds. duration (float): Duration to load in seconds. stem_id (int, optional): substream id, defauls to `None` (all substreams are loaded). always_3d (bool, optional): By default, reading a single-stream audio file will return a two-dimensional array. With ``always_3d=True``, audio data is always returned as a three-dimensional array, even if the audio file has only one stream. dtype (np.dtype, optional): Numpy data type to use, default to `np.float32`. info (Info, Optional): Pass ffmpeg `Info` object to reduce number of os calls on file. This can be used e.g. the sample rate and length of a track is already known in advance. Useful for ML training where the info objects can be pre-processed, thus audio loading can be speed up. sample_rate (float, optional): Sample rate of returned audio. Defaults to `None` which results in the sample rate returned from the mixture. reader (Reader): Holds parameters for the reading method. One of the following: `StreamsReader(...)` Read from a single multistream audio (default). `ChannelsReader(...)` Read/demultiplexed from multiple channels. multiprocess (bool): Applys multi-processing for reading substreams in parallel to speed up reading. Defaults to `True` Returns: stems (array_like): stems tensor of `shape=(stem x samples x channels)` rate (float): sample rate Shape: - Output: `[S, T, C']`, with `S`, if the file has multiple streams and, `C` is the audio has multiple channels. >>> audio, sample_rate = stempeg.read_stems("test.stem.mp4") >>> audio.shape [5, 220500, 2] >>> sample_rate 44100 """ if multiprocess: _pool = Pool() atexit.register(_pool.close) else: _pool = None if not isinstance(filename, str): filename = filename.decode() # use ffprobe to get info object (samplerate, lengths) try: if info is None: metadata = Info(filename) else: metadata = info ffmpeg.probe(filename) except ffmpeg._run.Error as e: raise Warning( 'An error occurs with ffprobe (see ffprobe output below)\n\n{}' .format(e.stderr.decode())) # check number of audio streams in file if 'streams' not in metadata.info or metadata.nb_audio_streams == 0: raise Warning('No audio stream found.') # using ChannelReader would ignore substreams if isinstance(reader, ChannelsReader): if metadata.nb_audio_streams != 1: raise Warning( 'stempeg.ChannelsReader() only processes the first substream.' ) else: if metadata.audio_streams[0][ 'channels' ] % reader.nb_channels != 0: raise Warning('Stems should be encoded as multi-channel.') else: substreams = 0 else: if stem_id is not None: substreams = stem_id else: substreams = metadata.audio_stream_idx() if not isinstance(substreams, list): substreams = [substreams] # if not, get sample rate from mixture if sample_rate is None: sample_rate = metadata.sample_rate(0) _chans = metadata.channels_streams # check if all substreams have the same number of channels if len(set(_chans)) == 1: channels = min(_chans) else: raise RuntimeError("Stems do not have the same number of channels per substream") # set channels to minimum channel per stream stems = [] if _pool: results = _pool.map_async( partial( _read_ffmpeg, filename, sample_rate, channels, start, duration, dtype, ffmpeg_format ), substreams, callback=stems.extend ) results.wait() _pool.terminate() else: stems = [ _read_ffmpeg( filename, sample_rate, channels, start, duration, dtype, ffmpeg_format, stem_idx ) for stem_idx in substreams ] stem_durations = np.array([t.shape[0] for t in stems]) if not (stem_durations == stem_durations[0]).all(): warnings.warning("Stems differ in length and were shortend") min_length = np.min(stem_durations) stems = [t[:min_length, :] for t in stems] # aggregate list of stems to numpy tensor stems = np.array(stems) # If ChannelsReader is used, demultiplex from channels if isinstance(reader, (ChannelsReader)) and stems.shape[-1] > 1: stems = stems.transpose(1, 0, 2) stems = stems.reshape( stems.shape[0], stems.shape[1], -1, reader.nb_channels ) stems = stems.transpose(2, 0, 3, 1)[..., 0] if not always_3d: stems = np.squeeze(stems) return stems, sample_rate class Info(object): """Audio properties that hold a number of metadata. The object is created when can be used when `read_stems` is called. This is can be passed, to `read_stems` to reduce loading time. """ def __init__(self, filename): super(Info, self).__init__() self.info = ffmpeg.probe(filename) self.audio_streams = [ stream for stream in self.info['streams'] if stream['codec_type'] == 'audio' ] @property def nb_audio_streams(self): """Returns the number of audio substreams""" return len(self.audio_streams) @property def nb_samples_streams(self): """Returns a list of number of samples for each substream""" return [self.samples(k) for k, stream in enumerate(self.audio_streams)] @property def channels_streams(self): """Returns the number of channels per substream""" return [ self.channels(k) for k, stream in enumerate(self.audio_streams) ] @property def duration_streams(self): """Returns a list of durations (in s) for all substreams""" return [ self.duration(k) for k, stream in enumerate(self.audio_streams) ] @property def title_streams(self): """Returns stream titles for all substreams""" return [ stream['tags'].get('handler_name') for stream in self.audio_streams ] def audio_stream_idx(self): """Returns audio substream indices""" return [s['index'] for s in self.audio_streams] def samples(self, idx): """Returns the number of samples for a stream index""" return int(self.audio_streams[idx]['duration_ts']) def duration(self, idx): """Returns the duration (in seconds) for a stream index""" return float(self.audio_streams[idx]['duration']) def title(self, idx): """Return the `handler_name` metadata for a given stream index""" return self.audio_streams[idx]['tags']['handler_name'] def rate(self, idx): # deprecated from older stempeg version return self.sample_rate(idx) def sample_rate(self, idx): """Return sample rate for a given substream""" return int(self.audio_streams[idx]['sample_rate']) def channels(self, idx): """Returns the number of channels for a gvien substream""" return int(self.audio_streams[idx]['channels']) def __repr__(self): """Print stream information""" return pprint.pformat(self.audio_streams)
Memory