Data Producer, User, Owner in mloda¶
Roles in mloda¶
In this notebook, we describe three roles that exist in the mloda framework.
Data Producers: Provide access to raw data, business-layer data, or aggregated data.
- A data scientist or analyst might create simpler datasets or analytical outputs
- A data engineer might design access to complex data infrastructures, such as data lakes or warehouses
- Shares plug-ins and, with them, access to data
Data User: Interacts with mloda by applying plug-ins while making requests via the mlodaAPI.
- A data scientist or analyst who needs data and data transformations (features)
- Consumes features from other parts of the organization
Data Owner: Ensures lifetime value, availability and governance of data and features
Data Producer¶
Let us take a closer look at the role of the data producer.
What could a data producer create?
# We reuse the data from the first example and rerun it here, condensed into a single cell.
import os
from typing import List
from mloda_core.abstract_plugins.components.feature import Feature
from mloda_core.abstract_plugins.components.data_access_collection import DataAccessCollection
from mloda_plugins.feature_group.input_data.read_dbs.sqlite import SQLITEReader
from mloda_core.abstract_plugins.plugin_loader.plugin_loader import PluginLoader
from mloda_core.api.request import mlodaAPI
from mloda_plugins.compute_framework.base_implementations.pyarrow.table import PyarrowTable
plugin_loader = PluginLoader.all()
# Initialize a DataAccessCollection object
data_access_collection = DataAccessCollection()
# Define the folders containing the data
# Note: We use two paths to accommodate different possible root locations, as it depends on where the code is executed.
base_data_path = os.path.join(os.getcwd(), "docs", "docs", "examples", "mloda_basics", "base_data")
if not os.path.exists(base_data_path):
    base_data_path = os.path.join(os.getcwd(), "base_data")
# Add the folder to the DataAccessCollection
data_access_collection.add_folder(base_data_path)
# As a database cannot be accessed via a folder, we add an explicit connection for it.
data_access_collection.add_credential_dict(
    credential_dict={SQLITEReader.db_path(): os.path.join(base_data_path, "example.sqlite")}
)
order_features: List[str | Feature] = ["order_id", "product_id", "quantity", "item_price"]
payment_features: List[str | Feature] = ["payment_id", "payment_type", "payment_status", "valid_datetime"]
location_features: List[str | Feature] = ["user_location", "merchant_location", "update_date"]
categorical_features: List[str | Feature] = ["user_age_group", "product_category", "transaction_type"]
all_features = order_features + payment_features + location_features + categorical_features
mlodaAPI.run_all(all_features, data_access_collection=data_access_collection, compute_frameworks={PyarrowTable})
[pyarrow.Table
user_location: string
update_date: int64
merchant_location: string
----
user_location: [["East","West","North","West","West",...,"West","North","North","North","East"]]
update_date: [[1640995200000,1641632290909,1642269381818,1642906472727,1643543563636,...,1701518836363,1702155927272,1702793018181,1703430109090,1704067200000]]
merchant_location: [["North","East","East","East","North",...,"North","South","East","South","North"]],
pyarrow.Table
product_id: int64
item_price: double
quantity: int64
order_id: int64
----
product_id: [[282,355,395,319,275,...,170,328,361,192,271]]
item_price: [[74.86,154.56,67.7,186.21,118.69,...,80.51,98.91,162.98,194.98,107.73]]
quantity: [[6,2,4,9,5,...,4,3,5,5,6]]
order_id: [[1,2,3,4,5,...,96,97,98,99,100]],
pyarrow.Table
payment_type: string
valid_datetime: timestamp[ns, tz=UTC]
payment_status: string
payment_id: int64
----
payment_type: [["debit card","debit card","store credit","credit card","store credit",...,"paypal","debit card","credit card","store credit","paypal"]]
valid_datetime: [[2024-01-11 23:01:49.090909090Z,2024-01-15 09:41:49.090909090Z,2024-01-29 19:23:38.181818181Z,2024-01-04 18:10:54.545454545Z,2024-01-06 15:16:21.818181818Z,...,2024-01-13 12:36:21.818181818Z,2024-01-10 01:56:21.818181818Z,2024-01-15 02:10:54.545454545Z,2024-01-28 20:50:54.545454545Z,2024-01-06 00:14:32.727272727Z]]
payment_status: [["failed","pending","completed","pending","failed",...,"completed","pending","completed","failed","completed"]]
payment_id: [[1,2,3,4,5,...,96,97,98,99,100]],
pyarrow.Table
product_category: string
transaction_type: string
user_age_group: string
----
product_category: [["clothing","home","clothing","clothing","clothing",...,"clothing","home","books","beauty","clothing"]]
transaction_type: [["online","online","telephone","telephone","mail-order",...,"mail-order","in-store","telephone","mail-order","mail-order"]]
user_age_group: [["26-35","26-35","36-45","55+","26-35",...,"26-35","46-55","36-45","26-35","36-45"]]]
FeatureGroup API (plug-in) in short¶
In mloda, a data producer defines access by creating feature groups. Here's an example implementation:
class FeatureGroupClass(AbstractFeatureGroup):
    # Root feature definition
    @classmethod
    def input_data(...):
        ...

    # Features derived from other features
    def input_features(...):
        ...

    @classmethod
    def calculate_feature(...):
        ...
Simple example implementation of the FeatureGroup API¶
In the background, mloda loads the previously created plug-ins, such as this one.
class ReadFileFeature(AbstractFeatureGroup):
    @classmethod
    def input_data(cls) -> Optional[BaseInputData]:
        return ReadFile()

    @classmethod
    def calculate_feature(cls, data: Any, features: FeatureSet) -> Any:
        reader = cls.input_data()
        if reader is not None:
            data = reader.load(features)
            return data
        raise ValueError(f"Reading file failed for feature {features.get_name_of_one_feature()}.")
We use composition to read different data sources. A ReadFile object looks like this:
class CsvReader(ReadFile):
    @classmethod
    def suffix(cls) -> Tuple[str, ...]:
        return (
            ".csv",
            ".CSV",
        )

    @classmethod
    def load_data(cls, data_access: Any, features: FeatureSet) -> Any:
        result = pyarrow_csv.read_csv(data_access)
        return result.select(list(features.get_all_names()))

    @classmethod
    def get_column_names(cls, file_name: str) -> Any:
        read_options = pyarrow_csv.ReadOptions(skip_rows_after_names=1)
        table = pyarrow_csv.read_csv(file_name, read_options=read_options)
        return table.schema.names
As you can see, the implementation is flexible: if you need something, you can adjust it quite easily. Readers for other formats such as .json and .parquet, as well as the SQLite access, are implemented in a similar fashion.
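To see this composition in action outside of a full mloda run, here is a small, hedged sketch. It assumes that the base_data_path defined earlier contains at least one CSV file and uses only the suffix() and get_column_names() hooks shown above.
from mloda_plugins.feature_group.input_data.read_files.csv import CsvReader

# Hedged sketch: print the column names the CsvReader exposes for every CSV file
# found in the example data folder.
for file_name in sorted(os.listdir(base_data_path)):
    if file_name.endswith(CsvReader.suffix()):
        print(file_name, "->", CsvReader.get_column_names(os.path.join(base_data_path, file_name)))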
# In the following, we adjust the CsvReader slightly to handle a different delimiter.
from typing import Any, Optional
from pyarrow import csv as pyarrow_csv
from mloda_core.abstract_plugins.components.feature_set import FeatureSet
from mloda_core.abstract_plugins.components.input_data.base_input_data import BaseInputData
from mloda_plugins.feature_group.input_data.read_file_feature import ReadFileFeature
from mloda_plugins.feature_group.input_data.read_files.csv import CsvReader
class CsvReader2(CsvReader):
    # Adjusted CsvReader2 to handle the new delimiter
    _parse_options = pyarrow_csv.ParseOptions(
        delimiter=",",  # Default delimiter
        quote_char='"',  # Handles quoted strings
        ignore_empty_lines=True,  # Skips empty lines
    )

    @classmethod
    def load_data(cls, data_access: Any, features: FeatureSet) -> Any:
        result = pyarrow_csv.read_csv(data_access, parse_options=cls._parse_options)
        print("We used CsvReader2 to load the data.")
        return result.select(list(features.get_all_names()))
class ReadFileFeature2(ReadFileFeature):
    @classmethod
    def input_data(cls) -> Optional[BaseInputData]:
        return CsvReader2()

    @classmethod
    def validate_output_features(cls, data: Any, features: FeatureSet) -> Optional[bool]:
        # Fail validation if any requested column was loaded but contains only nulls.
        for column_name in features.get_all_names():
            if column_name in data.column_names:
                column = data[column_name]
                if column.null_count == len(column):
                    return False
        return True
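Before using ReadFileFeature2 in a request, the validator can be sanity-checked by hand. The following is a minimal sketch with small PyArrow tables and a mocked FeatureSet; the Mock stands in only for get_all_names().
import pyarrow as pa
from unittest.mock import Mock

# Fake FeatureSet: the validator only needs get_all_names().
features = Mock(spec=FeatureSet)
features.get_all_names.return_value = ["order_id"]

healthy = pa.table({"order_id": [1, 2, 3]})
all_null = pa.table({"order_id": pa.array([None, None, None], type=pa.int64())})

print(ReadFileFeature2.validate_output_features(healthy, features))  # True
print(ReadFileFeature2.validate_output_features(all_null, features))  # False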
from mloda_core.abstract_plugins.components.plugin_option.plugin_collector import PlugInCollector
result = mlodaAPI.run_all(
    order_features,
    data_access_collection=data_access_collection,
    compute_frameworks={PyarrowTable},
    plugin_collector=PlugInCollector.enabled_feature_groups({ReadFileFeature2}),
)
# We can see that the data was loaded using the new CsvReader2.
# However, this is a rather simple use case. In a real-world scenario, we would have more complex data and more complex operations.
We used CsvReader2 to load the data.
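To confirm what came back, the returned list of PyArrow tables (the same structure as in the earlier output) can be inspected directly:
# result is a list of PyArrow tables; print the columns of each one.
for table in result:
    print(table.column_names)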
Complex plug-ins¶
Complex plug-ins can be quite varied:
- aggregate features
- entity features
- historical features
Additionally, one can also write feature groups for:
- using feature stores
- using orchestrator steps
- lazy evaluated functions
Quality¶
The producer must ensure quality, which includes:
- defining input and output validators
- managing the storage and retrieval of artifacts
- implementing software testing
An integration test could be done by using mlodaAPI.run_all and custom data.
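As a minimal sketch, assuming the example data and the data_access_collection from above are available and that the test lives in a unittest.TestCase, such an integration test could assert that the requested order features actually come back (run_all returns a list of PyArrow tables, as seen earlier):
def test_order_features_integration(self) -> None:
    # Run the full pipeline against the example data.
    results = mlodaAPI.run_all(
        order_features,
        data_access_collection=data_access_collection,
        compute_frameworks={PyarrowTable},
    )
    # Collect every returned column and check that the requested features are present.
    returned_columns = {name for table in results for name in table.column_names}
    self.assertEqual(returned_columns, set(order_features))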
An example of a unit test could look like:
import unittest
from unittest.mock import Mock, patch

# Wrapped in a unittest.TestCase so the example is runnable.
class TestCsvReader2(unittest.TestCase):
    # Patch pyarrow's read_csv so the test does not need a real file on disk.
    @patch.object(pyarrow_csv, "read_csv")
    def test_parse_options_are_customized(self, mock_read_csv: Mock) -> None:
        # Ensure the parse options are as expected
        expected_parse_options = pyarrow_csv.ParseOptions(
            delimiter=",",
            quote_char='"',
            ignore_empty_lines=True,
        )
        # Call the method to trigger parse options usage
        features = Mock(spec=FeatureSet)
        features.get_all_names.return_value = []
        CsvReader2.load_data(Mock(), features)
        # Verify that the _parse_options in CsvReader2 are customized
        self.assertEqual(CsvReader2._parse_options, expected_parse_options)
This allows us to apply software engineering practices consistently throughout the entire data workflow.
Consequences¶
Within mloda, the data producer is empowered as the primary driver, owing to the extensive and customizable range of available plugins.
Unlike traditional data toolchains, mloda provides data producers with the flexibility to define their specific start and end points. This enables the versatile application of mloda across different parts of the machine learning lifecycle, such as prototyping, training data preparation, or real-time result monitoring.
This includes the autonomy to define the boundaries of the data producer's domain and to govern the outflow of data. Both aspects are managed through feature groups, which remain under the direct control of the data producer.
Data User Role¶
The Data User plays a pivotal role in configuring and utilizing the mloda API for machine learning and data workflows. The mloda API offers flexible configurations to cater to diverse use cases across the ML lifecycle. Below is an outline of the configurations and features that define the Data User's role:
class mlodaAPI:
    def __init__(
        self,
        requested_features: Union[Features, list[Union[Feature, str]]],
        compute_frameworks: Union[Set[Type[ComputeFrameWork]], Optional[list[str]]] = None,
        links: Optional[Set[Link]] = None,
        data_access_collection: Optional[DataAccessCollection] = None,
        global_filter: Optional[GlobalFilter] = None,
        api_input_data_collection: Optional[ApiInputDataCollection] = None,
        plugin_collector: Optional[PlugInCollector] = None,
    ) -> None:
        ...

data = mlodaAPI.run_all(requested_features, ...)
Let’s use the API to further explain the Data User role. As shown above, there are several configurations to consider. The key ones are:
- Which features to request, and whether the compute_frameworks should be limited
- How the data is linked
- Which specific access rights and permissions the user has
- How the data is refined to meet the requirements of the use case
Given these configurations, the mloda core is designed, whenever feasible, to follow a two-step process:
- First, formulate an optimized execution plan
- Second, execute the plan accordingly
What the user gains above all is a repeatable process that can run in most environments, as long as the plug-ins are available and the necessary access exists (firewalls, credentials).
The data user could run the mloda API in the following scenarios:
- POC notebooks
- Production code scenarios (model training or real-time prediction)
- Microservice endpoints (see the sketch below)
- KPI or QA test data ingestion
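As a hedged sketch of the microservice scenario, the function name below is hypothetical and it simply reuses the request shown earlier; an endpoint handler could wrap the same call:
# Hypothetical endpoint helper: a web framework handler could call this function
# to serve the order features computed by mloda.
def get_order_features() -> Any:
    return mlodaAPI.run_all(
        order_features,
        data_access_collection=data_access_collection,
        compute_frameworks={PyarrowTable},
    )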
With this, the whole ML lifecycle is covered, and plug-ins can be reused in a testable and repeatable way along this cycle.
Data Owner Role¶
Data owners typically operate at various levels within an organization.
This can be the party that produces the data, the business stakeholder responsible for the service, or, in some cases, the role may not be explicitly defined at all.
In mloda, the data owner is the one in control of governance. However, to date, this part is not included in the open-source offering; it is reserved for later development until the plugin ecosystem reaches a higher degree of maturity.
The plug-in functionality to integrate governance and operations systems is already in place. Two simple examples are:
Using organization-wide logging¶
# `trace` (OpenTelemetry) and `logger` are assumed to be set up elsewhere in the module.
class OtelExtender(WrapperFunctionExtender):
    def __init__(self) -> None:
        # Functions to be wrapped by the Extender (empty if OpenTelemetry's trace is unavailable)
        self.wrapped: Set[WrapperFunctionEnum] = set()
        if trace is None:
            return
        self.wrapped = {WrapperFunctionEnum.FEATURE_GROUP_CALCULATE_FEATURE}

    def wraps(self) -> Set[WrapperFunctionEnum]:
        return self.wrapped

    def __call__(self, func: Any, *args: Any, **kwargs: Any) -> Any:
        logger.warning("OtelExtender")
        result = func(*args, **kwargs)
        return result
Logging data size¶
import sys

class LogSizeOfData(WrapperFunctionExtender):
    def wraps(self) -> Set[WrapperFunctionEnum]:
        # Function to be wrapped by the Extender
        return {WrapperFunctionEnum.VALIDATE_INPUT_FEATURE}

    def __call__(self, func: Any, *args: Any, **kwargs: Any) -> Any:
        result = func(*args, **kwargs)
        size = sys.getsizeof(result)
        print(f"Size: {size}")
        return result
Conclusion¶
In this notebook, we explored the roles of Data Producers, Data Users, and Data Owners within mloda. We delved into the responsibilities and functionalities associated with each role, highlighting how they contribute to the overall data lifecycle.
Data Producers are responsible for implementing the plugins, ensuring the accuracy of data access processes, and defining relevant configuration options.
Data Users create the usage configuration and apply the mlodaAPI to receive data.
Data Owners, while not fully covered in this open-source offering, are critical for governance and for ensuring the ongoing availability and maintenance of essential plugins.
Understanding these roles and their interactions shows how mloda's modular and extensible design is vital in driving the shift toward efficient data management practices and processing throughout the machine learning lifecycle.
Next steps¶
While mloda holds great potential to become a unified portal, we face a key challenge: its plugin coverage is not yet comprehensive enough to integrate seamlessly with all available tools and technologies. Active community contributions are therefore essential to accelerate both its adoption and its ability to transform data management practices.