Add mass copy functionality.
This commit is contained in:
@@ -73,7 +73,7 @@ def _get_default(default_object: object, type_overload: Dict[Optional[type], str
|
||||
return ""
|
||||
|
||||
|
||||
def _create_table(class_: type, cursor: sql.Cursor, type_overload: Dict[Optional[type], str]) -> None:
|
||||
def _create_table(class_: type, cursor: sql.Cursor, type_overload: Dict[Optional[type], str] = type_table) -> None:
|
||||
"""
|
||||
Create the table for a specific dataclass given
|
||||
:param class_: A dataclass.
|
||||
@@ -89,6 +89,3 @@ def _create_table(class_: type, cursor: sql.Cursor, type_overload: Dict[Optional
|
||||
f"{_get_default(field.default, type_overload)}" for field in fields)
|
||||
sql_fields = "obj_id INTEGER PRIMARY KEY AUTOINCREMENT, " + sql_fields
|
||||
cursor.execute(f"CREATE TABLE IF NOT EXISTS {class_.__name__.lower()} ({sql_fields});")
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -5,18 +5,24 @@ This module includes functions to insert multiple records
|
||||
"""
|
||||
from typing import TypeVar, Union, List, Tuple
|
||||
from dataclasses import asdict
|
||||
from warnings import warn
|
||||
from .constraints import ConstraintFailedError
|
||||
from .commons import _convert_sql_format
|
||||
from .commons import _convert_sql_format, _create_table
|
||||
import sqlite3 as sql
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
class MisformedCollectionError(Exception):
|
||||
class HeterogeneousCollectionError(Exception):
|
||||
"""
|
||||
:raises if the passed collection is not homogeneous.
|
||||
ie: If a List or Tuple has elements of multiple
|
||||
types.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def is_homogeneous(objects: Union[List[T], Tuple[T]]) -> bool:
|
||||
def _check_homogeneity(objects: Union[List[T], Tuple[T]]) -> None:
|
||||
"""
|
||||
Check if all of the members a Tuple or a List
|
||||
is of the same type.
|
||||
@@ -25,10 +31,62 @@ def is_homogeneous(objects: Union[List[T], Tuple[T]]) -> bool:
|
||||
:return: If all of the members of the same type.
|
||||
"""
|
||||
class_ = objects[0].__class__
|
||||
return all([isinstance(obj, class_) for obj in objects])
|
||||
if not all([isinstance(obj, class_) or isinstance(objects[0], obj.__class__) for obj in objects]):
|
||||
raise HeterogeneousCollectionError("Tuple or List is not homogeneous.")
|
||||
|
||||
|
||||
def create_many_entries(objects: Union[List[T], Tuple[T]], protect_memory: bool = True) -> None:
|
||||
def _toggle_memory_protection(cur: sql.Cursor, protect_memory: bool) -> None:
|
||||
"""
|
||||
Given a cursor to an sqlite3 connection, if memory protection is false,
|
||||
toggle memory protections off.
|
||||
|
||||
:param cur: Cursor to an open SQLite3 connection.
|
||||
:param protect_memory: Whether or not should memory be protected.
|
||||
:return: Memory protections off.
|
||||
"""
|
||||
if not protect_memory:
|
||||
warn("Memory protections are turned off, "
|
||||
"if operations are interrupted, file may get corrupt.", RuntimeWarning)
|
||||
cur.execute("PRAGMA synchronous = OFF")
|
||||
cur.execute("PRAGMA journal_mode = MEMORY")
|
||||
|
||||
|
||||
def _mass_insert(objects: Union[List[T], Tuple[T]], db_name: str, protect_memory: bool = True) -> None:
|
||||
"""
|
||||
Insert multiple records into an SQLite3 database.
|
||||
|
||||
:param objects: Objects to insert.
|
||||
:param db_name: Name of the database to insert.
|
||||
:param protect_memory: Whether or not memory
|
||||
protections are on or off.
|
||||
:return: None
|
||||
"""
|
||||
_check_homogeneity(objects)
|
||||
sql_queries = []
|
||||
first_index: int = 0
|
||||
table_name = objects[0].__class__.__name__.lower()
|
||||
for obj in objects:
|
||||
kv_pairs = asdict(obj).items()
|
||||
print(kv_pairs)
|
||||
sql_queries.append(f"INSERT INTO {table_name}(" +
|
||||
f"{', '.join(item[0] for item in kv_pairs)})" +
|
||||
f" VALUES ({', '.join(_convert_sql_format(item[1]) for item in kv_pairs)});")
|
||||
with sql.connect(db_name) as con:
|
||||
cur: sql.Cursor = con.cursor()
|
||||
try:
|
||||
_toggle_memory_protection(cur, protect_memory)
|
||||
cur.execute(f"SELECT obj_id FROM {table_name} ORDER BY obj_id DESC LIMIT 1")
|
||||
index_tuple = cur.fetchone()
|
||||
if index_tuple:
|
||||
first_index = index_tuple[0]
|
||||
cur.executescript("BEGIN TRANSACTION;\n" + '\n'.join(sql_queries) + '\nEND TRANSACTION;')
|
||||
except sql.IntegrityError:
|
||||
raise ConstraintFailedError
|
||||
for i, obj in enumerate(objects):
|
||||
setattr(obj, "obj_id", first_index + i + 1)
|
||||
|
||||
|
||||
def create_many(objects: Union[List[T], Tuple[T]], protect_memory: bool = True) -> None:
|
||||
"""
|
||||
Insert many records corresponding to objects
|
||||
in a tuple or a list.
|
||||
@@ -39,29 +97,30 @@ def create_many_entries(objects: Union[List[T], Tuple[T]], protect_memory: bool
|
||||
with datalite.
|
||||
:return: None.
|
||||
"""
|
||||
if not objects or not is_homogeneous(objects):
|
||||
raise MisformedCollectionError("Tuple or List is empty or homogeneous.")
|
||||
sql_queries = []
|
||||
first_index: int = 0
|
||||
table_name = objects[0].__class__.__name__.lower()
|
||||
for obj in objects:
|
||||
kv_pairs = asdict(obj).items()
|
||||
sql_queries.append(f"INSERT INTO {table_name}(" +
|
||||
f"{', '.join(item[0] for item in kv_pairs)})" +
|
||||
f" VALUES ({', '.join(_convert_sql_format(item[1]) for item in kv_pairs)});")
|
||||
with sql.connect(getattr(objects[0], "db_path")) as con:
|
||||
cur: sql.Cursor = con.cursor()
|
||||
try:
|
||||
if not protect_memory:
|
||||
cur.execute("PRAGMA synchronous = OFF")
|
||||
cur.execute("PRAGMA journal_mode = MEMORY")
|
||||
cur.execute(f"SELECT obj_id FROM {table_name} ORDER BY obj_id DESC LIMIT 1")
|
||||
index_tuple = cur.fetchone()
|
||||
if index_tuple:
|
||||
first_index = index_tuple[0]
|
||||
cur.executescript("BEGIN TRANSACTION;\n" + '\n'.join(sql_queries) + '\nEND TRANSACTION;')
|
||||
except sql.IntegrityError:
|
||||
raise ConstraintFailedError
|
||||
con.commit()
|
||||
for i, obj in enumerate(objects):
|
||||
setattr(obj, "obj_id", first_index + i)
|
||||
if objects:
|
||||
_mass_insert(objects, getattr(objects[0], "db_path"), protect_memory)
|
||||
else:
|
||||
raise ValueError("Collection is empty.")
|
||||
|
||||
|
||||
def copy_many(objects: Union[List[T], Tuple[T]], db_name: str, protect_memory: bool = True) -> None:
|
||||
"""
|
||||
Copy many records to another database, from
|
||||
their original database to new database,
|
||||
do not delete old records.
|
||||
|
||||
:param objects: Objects to copy.
|
||||
:param db_name: Name of the new database.
|
||||
:param protect_memory: Wheter to protect memory during operation,
|
||||
Setting this to False will quicken the operation, but if the
|
||||
operation is cut short, database file will corrupt.
|
||||
:return: None
|
||||
"""
|
||||
if objects:
|
||||
with sql.connect(db_name) as con:
|
||||
cur = con.cursor()
|
||||
_create_table(objects[0].__class__, cur)
|
||||
con.commit()
|
||||
_mass_insert(objects, db_name, protect_memory)
|
||||
else:
|
||||
raise ValueError("Collection is empty.")
|
||||
|
||||
@@ -23,7 +23,7 @@ copyright = '2020, Ege Ozkan'
|
||||
author = 'Ege Ozkan'
|
||||
|
||||
# The full version, including alpha/beta/rc tags
|
||||
release = 'v0.6.0'
|
||||
release = 'v0.7.0'
|
||||
|
||||
|
||||
# -- General configuration ---------------------------------------------------
|
||||
|
||||
2
setup.py
2
setup.py
@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
|
||||
|
||||
setuptools.setup(
|
||||
name="datalite", # Replace with your own username
|
||||
version="0.6.0",
|
||||
version="0.7.0",
|
||||
author="Ege Ozkan",
|
||||
author_email="egeemirozkan24@gmail.com",
|
||||
description="A small package that binds dataclasses to an sqlite database",
|
||||
|
||||
@@ -2,7 +2,7 @@ import unittest
|
||||
from datalite import datalite
|
||||
from datalite.constraints import Unique, ConstraintFailedError
|
||||
from datalite.fetch import fetch_if, fetch_all, fetch_range, fetch_from, fetch_equals, fetch_where
|
||||
from datalite.mass_actions import create_many_entries
|
||||
from datalite.mass_actions import create_many, copy_many
|
||||
from sqlite3 import connect
|
||||
from dataclasses import dataclass, asdict
|
||||
from math import floor
|
||||
@@ -197,10 +197,18 @@ class DatabaseConstraints(unittest.TestCase):
|
||||
|
||||
class DatabaseMassInsert(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.objs = [MassCommit('cat') for _ in range(30)]
|
||||
self.objs = [MassCommit(f'cat + {i}') for i in range(30)]
|
||||
|
||||
def testMassCreate(self):
|
||||
create_many_entries(self.objs, protect_memory=False)
|
||||
create_many(self.objs, protect_memory=False)
|
||||
_objs = fetch_all(MassCommit)
|
||||
self.assertEqual(_objs, tuple(self.objs))
|
||||
|
||||
def testMassCopy(self):
|
||||
copy_many(self.objs, 'other.db', False)
|
||||
setattr(MassCommit, 'db_path', 'other.db')
|
||||
tup = fetch_all(MassCommit)
|
||||
self.assertEqual(tup, tuple(self.objs))
|
||||
|
||||
def tearDown(self) -> None:
|
||||
[obj.remove_entry() for obj in self.objs]
|
||||
|
||||
Reference in New Issue
Block a user