Setup: Create Sample Data¶
In [1]:
import pandas as pd
import numpy as np

# Create realistic sample data with missing values
np.random.seed(42)
n_samples = 1000
data_dict = {
    "age": np.random.randint(18, 80, n_samples),
    "weight": np.random.normal(70, 15, n_samples),
    "state": np.random.choice(["CA", "NY", "TX", "FL"], n_samples),
    "gender": np.random.choice(["M", "F"], n_samples),
}
data = pd.DataFrame(data_dict)

# Introduce missing values into disjoint rows: 50 in "age", 50 in "weight"
missing_indices = np.random.choice(n_samples, size=int(0.1 * n_samples), replace=False)
data.loc[missing_indices[:50], "age"] = np.nan
data.loc[missing_indices[50:], "weight"] = np.nan

print("Sample data with missing values:")
print(data.head())
print(f"\nData shape: {data.shape}")
print(f"Missing values: age={data['age'].isna().sum()}, weight={data['weight'].isna().sum()}")
Sample data with missing values:
    age     weight state gender
0  56.0  90.585667    NY      F
1  69.0  59.833209    NY      M
2  46.0  87.302978    FL      F
3  32.0  64.374841    FL      M
4  60.0  59.587811    FL      M

Data shape: (1000, 4)
Missing values: age=50, weight=50
Traditional scikit-learn Pipeline¶
In [2]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer

numeric_preprocessor = Pipeline(
    steps=[
        ("imputation_mean", SimpleImputer(missing_values=np.nan, strategy="mean")),
        ("scaler", StandardScaler()),
    ]
)

categorical_preprocessor = Pipeline(
    steps=[
        (
            "imputation_constant",
            SimpleImputer(fill_value="missing", strategy="constant"),
        ),
        ("onehot", OneHotEncoder(handle_unknown="ignore")),
    ]
)

preprocessor = ColumnTransformer(
    [
        ("categorical", categorical_preprocessor, ["state", "gender"]),
        ("numerical", numeric_preprocessor, ["age", "weight"]),
    ]
)

# Fit and transform
preprocessor.fit(data)  # Learn imputation values, encoder categories, and scaler parameters
X_transformed = preprocessor.transform(data)  # Apply the learned transformations

onehot_feature_names = (
    preprocessor.named_transformers_["categorical"].named_steps["onehot"].get_feature_names_out(["state", "gender"])
)
numeric_feature_names = ["age", "weight"]
all_feature_names = np.concatenate([onehot_feature_names, numeric_feature_names])

df_transformed = pd.DataFrame(
    X_transformed.toarray() if hasattr(X_transformed, "toarray") else X_transformed,  # type: ignore
    columns=all_feature_names,
)

print("✅ Transformed dataset with split fit/transform:")
print(df_transformed.head(2))
print(f"Traditional pipeline result shape: {X_transformed.shape}")
print(f"Result: {len(df_transformed.columns)} columns total")
✅ Transformed dataset with split fit/transform:
   state_CA  state_FL  state_NY  state_TX  gender_F  gender_M       age  \
0       0.0       0.0       1.0       0.0       1.0       0.0  0.351358
1       0.0       0.0       1.0       0.0       0.0       1.0  1.086080

     weight
0  1.287800
1 -0.705904
Traditional pipeline result shape: (1000, 8)
Result: 8 columns total
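The point of splitting fit from transform shows up once new data arrives: transform reuses the means, categories, and scaling parameters learned during fit instead of recomputing them. Below is a minimal sketch (not part of the original notebook) that reuses the fitted preprocessor on a small hypothetical batch; the unseen state "WA" is encoded as all zeros because of handle_unknown="ignore".

# Hypothetical new batch: same columns, some missing values, one unseen category ("WA")
new_batch = pd.DataFrame(
    {
        "age": [25, np.nan],
        "weight": [np.nan, 82.5],
        "state": ["CA", "WA"],  # "WA" was never seen during fit
        "gender": ["F", "M"],
    }
)

# transform() applies only what fit() learned: the training means for imputation,
# the training mean/std for scaling, and the training categories for one-hot encoding.
X_new = preprocessor.transform(new_batch)
print(pd.DataFrame(
    X_new.toarray() if hasattr(X_new, "toarray") else X_new,
    columns=all_feature_names,
))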
mloda Approach¶
In [3]:
from typing import Optional, Any

from mloda_core.abstract_plugins.abstract_feature_group import AbstractFeatureGroup
from mloda_core.abstract_plugins.components.input_data.base_input_data import BaseInputData
from mloda_core.abstract_plugins.components.input_data.creator.data_creator import DataCreator
from mloda_core.abstract_plugins.components.feature_set import FeatureSet
from mloda_core.abstract_plugins.plugin_loader.plugin_loader import PluginLoader
from mloda_core.api.request import mlodaAPI
from mloda_plugins.compute_framework.base_implementations.pandas.dataframe import PandasDataframe


# In mloda, we have the concept of feature groups.
# A feature group is an abstraction between a data framework and the processes of a data transformation.
# In this example, the data framework is pandas.
# The processes are typically meta information such as names, but also lifecycle definitions, dependencies, and relations to other data.
# Based on the data_dict defined earlier and its column names, we use a DataCreator to inject the data_dict into the feature group abstraction.
# Put simply: we load the data.
class SklearnDataCreator(AbstractFeatureGroup):
    # This function is core to mloda. Here, the data framework with the actual data representation
    # meets the defined and resolved processes, giving us access to the before and after state of a feature.
    @classmethod
    def calculate_feature(cls, data: Any, features: FeatureSet) -> Any:
        # If this feature group did not load data itself, we would use the data passed in via the "data" parameter.
        data = pd.DataFrame(data_dict)
        return data

    # One way to provide this data is via defined input_data. There are many more, but we won't go too deep into this topic for now.
    @classmethod
    def input_data(cls) -> Optional[BaseInputData]:
        return DataCreator({"age", "weight", "state", "gender"})


# Next, we use one way of defining which features we want the mloda framework to produce.
features = [
    "standard_scaled__mean_imputed__age",  # Impute age, then scale it
    "standard_scaled__mean_imputed__weight",  # Impute weight, then scale it
    "onehot_encoded__state",  # One-hot encode state
    "onehot_encoded__gender",  # One-hot encode gender
]

# We now use a trick to register all known feature groups. mloda will only use those loaded into the namespace.
PluginLoader().all()

# Then we execute mloda, which resolves the dependencies of its feature groups and dataframe technologies automatically.
result = mlodaAPI.run_all(features, compute_frameworks={PandasDataframe})

_result, _result2 = result[0], result[1]
print("✅ Transformed dataset with split fit/transform:")
print(_result.head(2))
print(_result2.head(2))
print(f"Result: {list(_result.columns)} \n {list(_result2.columns)} columns total")

# Remark 1: We have not yet added the functionality to map values back to column strings. It is planned: https://github.com/TomKaltofen/mloda/issues/46
# Remark 2: If you see the error "ValueError: Multiple feature groups", please restart the notebook. This happens if the class SklearnDataCreator is loaded twice into notebook memory.
# I have yet to find a solution for this.
✅ Transformed dataset with split fit/transform:
   onehot_encoded__state~1  onehot_encoded__gender~0  onehot_encoded__state~0  \
0                      0.0                       1.0                      0.0
1                      0.0                       0.0                      0.0

   onehot_encoded__gender~1  onehot_encoded__state~2  onehot_encoded__state~3
0                       0.0                      1.0                      0.0
1                       1.0                      1.0                      0.0

   standard_scaled__mean_imputed__age  standard_scaled__mean_imputed__weight
0                            0.339295                               1.243621
1                            1.057320                              -0.677472
Result: ['onehot_encoded__state~1', 'onehot_encoded__gender~0', 'onehot_encoded__state~0', 'onehot_encoded__gender~1', 'onehot_encoded__state~2', 'onehot_encoded__state~3']
 ['standard_scaled__mean_imputed__age', 'standard_scaled__mean_imputed__weight'] columns total
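The feature strings encode their transformation chain directly in the name: the rightmost segment is the source column, and each prefix to its left is a step applied on top, so "standard_scaled__mean_imputed__age" means impute first, then scale. A small illustrative parse of that convention (plain string handling, not mloda's internal resolver):

# Illustrative only: how the double-underscore naming convention reads
feature_name = "standard_scaled__mean_imputed__age"
parts = feature_name.split("__")
source_column = parts[-1]                   # "age"
applied_steps = list(reversed(parts[:-1]))  # ["mean_imputed", "standard_scaled"]
print(f"column={source_column}, steps in order={applied_steps}")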
In [4]:
# The beauty and strength of mloda is that we can combine feature groups in very creative ways.
chained_features = [
    "max_aggr__standard_scaled__mean_imputed__age",  # Chain a full feature pipeline
    "robust_scaled__mean_imputed__weight",  # Use a different scaler for weight
    "onehot_encoded__state~0",  # Access a specific one-hot column
]

result = mlodaAPI.run_all(chained_features, compute_frameworks={PandasDataframe})
print(
    result[0].head(2),
    result[1].head(2),
    result[2].head(2),
)
   onehot_encoded__state~0
0                      0.0
1                      0.0

   robust_scaled__mean_imputed__weight
0                             0.938858
1                            -0.535025

   max_aggr__standard_scaled__mean_imputed__age
0                                      1.609647
1                                      1.609647
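For comparison, here is a rough sketch (not from the original notebook) of what just the chained age feature would take in plain scikit-learn, assuming the data frame from the setup cell: a hand-built pipeline plus a separate aggregation step on its output.

from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

# mean_imputed -> standard_scaled, followed by a max aggregation on the result
age_pipeline = Pipeline(
    steps=[
        ("impute", SimpleImputer(strategy="mean")),
        ("scale", StandardScaler()),
    ]
)
scaled_age = age_pipeline.fit_transform(data[["age"]])
print("manual max aggregation:", scaled_age.max())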
In [ ]:
# We can easily replace feature groups and dataframe plugins.
from mloda_core.abstract_plugins.components.plugin_option.plugin_collector import PlugInCollector


class SecondSklearnDataCreator(AbstractFeatureGroup):
    @classmethod
    def input_data(cls) -> Optional[BaseInputData]:
        return DataCreator({"age", "weight", "state", "gender"})

    @classmethod
    def calculate_feature(cls, data: Any, features: FeatureSet) -> Any:
        print(f"I, {cls.get_class_name()} AM NOW USED.")
        return pd.DataFrame(
            {
                "age": np.random.randint(25, 65, 500),
                "weight": np.random.normal(80, 20, 500),  # Different distribution
                "state": np.random.choice(["WA", "OR"], 500),  # Different states!
                "gender": np.random.choice(["M", "F", "Other"], 500),  # New category!
            }
        )


chained_features = [
    "max_aggr__standard_scaled__mean_imputed__age",  # Chain a full feature pipeline
    "robust_scaled__mean_imputed__weight",  # Use a different scaler for weight
    "onehot_encoded__state~0",  # Access a specific one-hot column
]

# We now disable the original feature group, so that SecondSklearnDataCreator is used instead.
result = mlodaAPI.run_all(
    chained_features,
    compute_frameworks={PandasDataframe},
    plugin_collector=PlugInCollector.disabled_feature_groups(SklearnDataCreator),
)
print(
    result[0].head(2),
    result[1].head(2),
    result[2].head(2),
)
I, SecondSklearnDataCreator AM NOW USED.
   onehot_encoded__state~0
0                      1.0
1                      1.0

   robust_scaled__mean_imputed__weight
0                             0.677809
1                            -0.293511

   max_aggr__standard_scaled__mean_imputed__age
0                                      1.631997
1                                      1.631997
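For contrast, swapping the data source in the traditional setup means refitting the pipeline by hand, and because the new source carries unseen categories, the output width and the feature names change with it. A minimal sketch of that manual counterpart, reusing the preprocessor fitted earlier on a hypothetical replacement frame:

# Hypothetical replacement source with different states and a new gender category
new_source = pd.DataFrame(
    {
        "age": np.random.randint(25, 65, 500),
        "weight": np.random.normal(80, 20, 500),
        "state": np.random.choice(["WA", "OR"], 500),
        "gender": np.random.choice(["M", "F", "Other"], 500),
    }
)

# Refit required: the new categories change the one-hot output width,
# so downstream feature names must be rebuilt as well.
preprocessor.fit(new_source)
print(preprocessor.transform(new_source).shape)  # no longer (1000, 8)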