Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 79 additions & 9 deletions pytidb/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,20 @@ def save(self, data: Union[T, dict]) -> T:
db_session.refresh(merged_data)
return merged_data

def bulk_insert(self, data: List[Union[T, dict]]) -> List[T]:
if not isinstance(data, list):
def bulk_insert(self, data: Union[List[Union[T, dict]], "pd.DataFrame"]) -> List[T]:
# Handle pandas DataFrame input
if hasattr(data, 'to_dict') and hasattr(data, 'iterrows'):
data = self._dataframe_to_models(data)
elif not isinstance(data, list):
raise ValueError(
f"Invalid data type: {type(data)}, expected list[dict], list[{self._table_model}]"
f"Invalid data type: {type(data)}, expected list[dict], list[{self._table_model}], or pandas DataFrame"
)

# Convert dict items to table model instances.
data = [
self._table_model(**item) if isinstance(item, dict) else item
for item in data
]
else:
# Convert dict items to table model instances.
data = [
self._table_model(**item) if isinstance(item, dict) else item
for item in data
]

# Auto embedding.
for field_name, config in self._auto_embedding_configs.items():
Expand Down Expand Up @@ -382,6 +385,73 @@ def bulk_insert(self, data: List[Union[T, dict]]) -> List[T]:
db_session.refresh(item)
return data

def _dataframe_to_models(self, df: "pd.DataFrame") -> List[T]:
"""
Convert pandas DataFrame to list of TableModel instances.

Args:
df: pandas DataFrame to convert

Returns:
List of TableModel instances

Raises:
ImportError: If pandas is not installed
ValueError: If DataFrame columns don't match table schema or conversion fails
"""
try:
import pandas as pd
except ImportError:
raise ImportError(
"pandas is required for DataFrame support. Install it with: pip install pandas"
)

if not isinstance(df, pd.DataFrame):
raise ValueError("Input must be a pandas DataFrame")

# Get table model fields
table_fields = set()
if hasattr(self._table_model, '__pydantic_fields__'):
table_fields = set(self._table_model.__pydantic_fields__.keys())
elif hasattr(self._table_model, '__table__'):
table_fields = set(col.name for col in self._table_model.__table__.columns)

# Validate DataFrame columns against table schema
df_columns = set(df.columns)

# Check for required fields (non-nullable, non-auto fields)
required_fields = set()
if hasattr(self._table_model, '__table__'):
for col in self._table_model.__table__.columns:
if not col.nullable and not col.autoincrement and col.default is None and col.server_default is None:
required_fields.add(col.name)

missing_required = required_fields - df_columns
if missing_required:
raise ValueError(f"Missing required columns: {missing_required}")

# Convert DataFrame rows to TableModel instances
models = []
for row_idx, row in df.iterrows():
# Create dict with only valid fields
model_data = {}
for col_name, value in row.items():
if col_name in table_fields:
# Handle NaN values
if pd.isna(value):
model_data[col_name] = None
else:
model_data[col_name] = value

# Create model instance
try:
model_instance = self._table_model(**model_data)
models.append(model_instance)
except Exception as e:
raise ValueError(f"Failed to create model instance from row {row_idx}: {e}")

return models

def update(self, values: dict, filters: Optional[Filters] = None) -> object:
# Auto embedding.
for field_name, config in self._auto_embedding_configs.items():
Expand Down
Loading