sqlalchemy - could not map parent and child table with a seperate class - python

Note : I know that this is a lengthy post and I have included too much of code, But I am not aware of what is important and what is not, as I am fairly new to SQLAlchemy. Really sorry for the inconvenience
I am trying to map a many to many relationship between 2 tables, BUT at the same time I also have to add another field that can hold a null value in the beginning.
So far, I have tried following code:
this example from github
this already provided solution
and have ended up with nothing working. Here is my code, please help me find what is my mistake. And if possible add a description to how it works as I am new to Python and SQLAlchemy
Test.py:
from app.Blueprints import db
class Test(db.Model):
__tablename__ = "test"
id = db.Column("id", db.Integer, primary_key=True)
name = db.Column("name", db.Unicode)
def __init__(self, name):
self.name = name
Student.py:
from app.Blueprints import db, connection
class Student(db.Model):
__tablename__ = "student_master"
id = db.Column("id", db.Integer, primary_key = True)
fname = db.Column("first_name", db.Unicode)
lname = db.Column("last_name", db.Unicode)
email = db.Column("email", db.Unicode, unique = True)
passwd = db.Column("passwd", db.Unicode)
def __init__(self, fname, lname, email):
passwd = fname+" "+lname
picture = "default.jpg"
state = 1
reg_date = strftime("%Y-%m-%d")
self.fname = fname
self.lname = lname
self.email = email
self.passwd = passwd
Test_Student.py [The model that maps the test and student, with an extra field of grade]:
from app.models.Test import Test
from app.models.Student import Student
from app.Blueprints import db
class Test_Students(db.Model):
__tablename__ = "test_student"
test = db.Column("test_id", db.Integer, db.ForeignKey('test.id'), primary_key = True)
student = db.Column("student_id", db.Integer, db.ForeignKey("student.id"), primary_key = True)
grade = db.Column("grade", db.Float, nullable = True)
tests = db.relationship(Test, backref="students")
students = db.relationship(Student, backref="students")
And this is how I am creating new test:
if request.method == "POST" :
from app.Blueprints import db
from app.models.Test import Test
from app.models.Test_students import Test_Students
from app.models.Student import Student
#register test
newTest = Test(request.form.get("name"), request.form.get("date"), request.form.get("type"))
db.session.add(newTest)
db.session.commit()
#map test with students
StudentList = request.form.getlist("StudentList[]")
for student in StudentList:
stud = Student.query.get(student)
newTest.students.append(stud)
When I try to execute I get the following error:
sqlalchemy.exc.NoForeignKeysError: Could not determine join condition between parent/child tables on relationship Test_Students.students - there are no foreign keys linking these tables. Ensure that referencing columns are associated with a ForeignKey or ForeignKeyConstraint, or specify a 'primaryjoin' expression.

Related

delete one-to-one relationship in flask

I'm currently developing an application using flask and I'm having a big problem to delete items in an one-to-one relationship. I have the following structure in my models:
class User(db.Model):
__tablename__ = 'user'
user_id = db.Column(db.String(8), primary_key = True)
password = db.Column(db.String(26))
class Student(db.Model):
__tablename__ = 'student'
user_id = Column(db.String(8), ForeignKey('user.user_id'), primary_key = True)
user = db.relationship('User')
class Professor(db.Model):
__tablename__ = 'professor'
user_id = Column(db.String(8), ForeignKey('user.user_id'), primary_key = True)
user = db.relationship('User')
What I want to do is delete the Student or the Professor if I delete the user. I have tried to test it using the code below, however, when I check my database, the student and professor are still there and my tests don't pass. I tried to include the cascade parameter when I set the relationship but it doesn't work. I have found in the internet to use this parameter: single_parent=True, but it also doesn't work.
user1 = User(user_id='user1234',password='alsdjwe1')
user2 = User(user_id='user2345',password='asfr5421')
student1 = Student(user = user1)
professor1 = Professor(user = user2)
db.session.delete(user1)
db.session.delete(user2)
I'd be glad if somebody can help me with this.
Thank you very much,
Thiago.
Use the cascade argument in your relationships.
class Student(db.Model):
__tablename__ = 'student'
user_id = Column(db.String(8), ForeignKey('user.user_id'), primary_key = True)
user = db.relationship('User', cascade='delete')
class Professor(db.Model):
__tablename__ = 'professor'
user_id = Column(db.String(8), ForeignKey('user.user_id'), primary_key = True)
user = db.relationship('User', cascade='delete')
You might want to look into delete-orphan if your use case needs it.

SQLAlchemy create temp clone of table inheritance schema for dataset comparison

I'm creating an application to assist with auditing of user accounts in various directories (AD, OpenLDAP, etc.) used by our business applications. The goal of the audits are to verify account ownership and their respective rights.
With my initial prototype I created an abstract class (BaseUser) defining all the data fields, and 2 child classes inheriting this (User, UserTemp). I used the Versioned extension from examples/versioning in the SQLAlchemy source with the User model. After populating the User and UserTemp tables with data from different days I queried for all adds/changes/deletes with a SELECT/UNION/EXCEPT/INTERSECT between the two. I added/updated/deleted User records based on the query results and a history of all changes were stored to the version table.
I've now created a set of table inheritance models for the actual data. A subset of the models:
DirectoryEntry ---- DirectoryUser ---- LdapUser ---- ActiveDirectoryUser
| |
| |-- OpenLdapUser
|
|
---- DirectoryGroup ---- LdapGroup ---- ActiveDirectoryGroup
|
|-- OpenLdapGroup
Code (I tried to remove anything not needed for an example):
import sqlalchemy as sa
import sqlalchemy_utils as utils
import sqlalchemy.orm as orm
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm.collections import attribute_mapped_collection
from uuid import UUID, uuid4
from datetime import datetime
from sqlalchemy.dialects import postgresql as pgsql
from sqlalchemy.testing.entities import ComparableEntity
from audit_tools.behaviors import Versioned, PolymorphicVerticalProperty, ProxiedDictMixin
from audit_tools.models import Base
class Directory(Base, ComparableEntity, Versioned):
name = sa.Column(sa.String, primary_key=True)
url = sa.Column(utils.URLType)
base_dn = sa.Column(sa.String)
entries = orm.relationship('DirectoryEntry', backref='directory')
_type = sa.Column(sa.String, nullable=False)
__tablename__ = 'directory'
__mapper_args__ = {
'polymorphic_identity': 'directory',
'polymorphic_on': _type,
}
class DirectoryEntry(Base, ComparableEntity, Versioned, ProxiedDictMixin):
guid: UUID = sa.Column(utils.UUIDType, primary_key=True)
_directory: str = sa.Column('directory', sa.String, sa.ForeignKey('directory.name'), nullable=False)
_type: str = sa.Column(sa.String, nullable=False)
entry_class = sa.Column(pgsql.ARRAY(sa.String))
distinguished_name = sa.Column(sa.String)
common_name = sa.Column(sa.String)
properties = orm.relationship('DirectoryEntryProperty', collection_class=attribute_mapped_collection('key'))
_proxied = association_proxy('properties', 'value', creator=lambda key, value: DirectoryEntryProperty(key=key, value=value))
def __init__(self, **kwargs):
for attribute in kwargs:
if hasattr(self, attribute):
setattr(self, attribute, kwargs[attribute])
else:
self[attribute] = kwargs[attribute]
#classmethod
def with_property(cls, key, value):
return cls.properties.any(key=key, value=value)
__tablename__ = 'directory_entry'
__mapper_args__ = {
'polymorphic_on': _type,
}
class DirectoryEntryProperty(Base, PolymorphicVerticalProperty, Versioned, ComparableEntity):
guid = sa.Column(utils.UUIDType, sa.ForeignKey('directory_entry.guid'), primary_key=True)
key = sa.Column(sa.String(64), primary_key=True)
_type = sa.Column(sa.String(16))
int_value = sa.Column(sa.Integer, info={'type': (int, 'integer')})
char_value = sa.Column(sa.String, info={'type': (str, 'string')})
boolean_value = sa.Column(sa.Boolean, info={'type': (bool, 'boolean')})
list_value = sa.Column(pgsql.ARRAY(sa.String), info={'type': (list, 'list')})
datetime_value = sa.Column(sa.DateTime, info={'type': (datetime, 'datetime')})
bytes_value = sa.Column(sa.LargeBinary, info={'type': (bytes, 'bytes')})
__tablename__ = 'directory_entry_property'
class DirectoryGroup(DirectoryEntry):
guid = sa.Column(utils.UUIDType, sa.ForeignKey('directory_entry.guid'), primary_key=True)
type = sa.Column(sa.String)
__tablename__ = 'directory_group'
class DirectoryUser(DirectoryEntry):
guid = sa.Column(utils.UUIDType, sa.ForeignKey('directory_entry.guid'), primary_key=True)
given_name = sa.Column(sa.String)
surname = sa.Column(sa.String)
email = sa.Column(sa.String)
username = sa.Column(sa.String)
employee_id = sa.Column(sa.String)
title = sa.Column(sa.String)
__tablename__ = 'directory_user'
class DirectoryGroupMember(Base, Versioned):
guid = sa.Column(utils.UUIDType, primary_key=True, default=uuid4)
group_guid = sa.Column(utils.UUIDType, sa.ForeignKey('directory_group.guid'))
distinguished_name = sa.Column(sa.String) # OpenLDAP allows orphaned users in groups, can't define foreign key :(
user_guid = sa.Column(utils.UUIDType) # ServiceNow allows duplicate users in groups, in addition to non-existent users :(
__tablename__ = 'directory_group_member'
__table_args__ = (
sa.UniqueConstraint('group_guid', 'distinguished_name'),
)
class LdapEntry(DirectoryEntry):
objectClass = orm.synonym('entry_class')
cn = orm.synonym('common_name')
class LdapGroupMember(DirectoryGroupMember):
pass
class LdapUser(DirectoryUser, LdapEntry):
givenName = orm.synonym('given_name')
sn = orm.synonym('surname')
_groups = orm.relationship("LdapGroupMember", primaryjoin="and_(foreign(LdapGroupMember.distinguished_name)==DirectoryUser.distinguished_name)", backref='member')
class LdapGroup(DirectoryGroup, LdapEntry):
_members = orm.relationship('LdapGroupMember', backref='group')
class ActiveDirectoryEntry(LdapEntry):
displayName = orm.synonym('display_name')
distinguishedName = orm.synonym('distinguished_name')
objectGUID = orm.synonym('guid')
class ActiveDirectoryUser(LdapUser, ActiveDirectoryEntry):
mail = orm.synonym('email')
employeeID = orm.synonym('employee_id')
sAMAccountName = orm.synonym('username')
__mapper_args__ = {
'polymorphic_identity': 'active_directory_user'
}
class ActiveDirectoryGroup(LdapGroup, ActiveDirectoryEntry):
__mapper_args__ = {
'polymorphic_identity': 'active_directory_group'
}
When I put this into production the current state of all the directories will be loaded. To update the database afterwards I'd like to create an empty temporary copy of the tables and populate them with a new directory snapshot. I'd then query for changes between the 2 sets of tables and update the primary tables as I did with the prototype.
Everything is working as expected getting the inital data into the database. I've been spinning my wheels though trying to create temporary mapped classes and tables to hold the directory snapshots for comparison. I'm hoping I can do this dynamically, and not restort to coding duplicate models for everything. I've tried creating a function based on the code in the example versioning extension to do this, but I honestly haven't had the time to dig deep into the bowels of the SQLAlchemy internals to determine where I'm going wrong.
I'm hoping someone might have a pointer or two to get me moving again. Or maybe there's a better way to accomplish this? I've done quite a bit of googling but it's only turned up information related to copying schema and data between two different databases.
Thanks!

Flask Many to Many relationship sqlalchemy.exc.InvalidRequestError: failed to locate a name

When trying to get a many to many relationship working I keep getting the following error:
sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper|User|Users'. Original exception was: When initializing mapper Mapper|User|Users, expression 'Device' failed to locate a name ("name 'Device' is not defined"). If this is a class name, consider adding this relationship() to the class after both dependent classes have been defined.
I have looked over all the sqlalchemy documents and reviewed multiple links on many to many but no luck. I am sure its a naming or importing issue, but have not found a solution yet
I removed some of the code that I don't feel is related
Users.py
from random import SystemRandom
from backports.pbkdf2 import pbkdf2_hmac, compare_digest
from flask_login import UserMixin
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from devices.models import Device
user_device = db.Table('UserDevice', db.Model.metadata,
db.Column('userID', db.Integer, db.ForeignKey('Users.userID')),
db.Column('deviceID', db.Integer, db.ForeignKey('Device.deviceID')))
class User(UserMixin, db.Model):
__tablename__ = 'Users'
__table_args__ = {'mysql_engine': 'InnoDB',
'extend_existing': True}
id = db.Column('userID', db.Integer, primary_key=True)
# Relationship to UserDevice association table
user_device = relationship('Device',
secondary=user_device,
backref=db.backref('users', lazy='dynamic'))
Device.py
class Device(db.Model):
__tablename__ = 'Device'
__table_args__ = {'mysql_engine': 'InnoDB',
'extend_existing': True}
id = db.Column('deviceID', db.Integer, primary_key=True)
date_created = db.Column('deviceDateCreated', db.DateTime, default=db.func.current_timestamp())
date_modified = db.Column('deviceDateModified', db.DateTime, default=db.func.current_timestamp(), onupdate=db.func.current_timestamp())
device_created_user = db.Column('deviceCreatedUser', db.String, default='App Server')
device_last_updated_user = db.Column('deviceLastUpdatedUser', db.String, default='App Server', onupdate=current_user)
#Serial Number
serial_number = db.Column('deviceSerialNumber', db.Integer, nullable=False, unique=True)
#Sampling Interval
sampling_interval = db.Column('deviceSamplingInterval', db.Integer, default=60, nullable=False)
# Relationship to Device Status Table
device_status_id = db.Column('deviceStatusID', db.Integer, db.ForeignKey('DeviceStatus.deviceStatusID'))
# New instance instantiation procedure
def __init__(self, serial_number):
self.serial_number = serial_number
self.device_status_id = 1
def __repr__(self):
return '<Device %r>' % self.serial_number
Image of Database Model:
Turns out I didn't provide enough information to solve this problem. The problem turned out to be using the db variable created by calling SQLAlchemy. I created a python file just for the database called database.py. The mistake I made was in User\models.py I called the following import from database import db and in Device\models.py I called from app import db. This caused the db.Model to not function properly and also wouldn't create the user tables when calling create_all(). Hope this helps someone in the future.
Database.py
from flask_influxdb import InfluxDB
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
influx_db = InfluxDB()
influx_db_client = None
def init_db():
# import all modules here that might define models so that
# they will be registered properly on the metadata. Otherwise
# you will have to import them first before calling init_db()
from users.models import User, UserStatus, UserDevice
from devices.models import Device, DeviceStatus
db.Model.metadata.drop_all(bind=db.engine)
db.Model.metadata.create_all(bind=db.engine)
Devices\models.py
from app import db
from flask_login import current_user
from sqlalchemy.orm import relationship
import enum
class DeviceStatusType(enum.Enum):
INACTIVE = "Inactive"
ACTIVE = "Active"
# Define a Device model
class Device(db.Model):
__tablename__ = 'Device'
__table_args__ = {'extend_existing': True}
id = db.Column('deviceID', db.Integer, primary_key=True)
date_created = db.Column('deviceDateCreated', db.DateTime, default=db.func.current_timestamp())
date_modified = db.Column('deviceDateModified', db.DateTime, default=db.func.current_timestamp(), onupdate=db.func.current_timestamp())
device_created_user = db.Column('deviceCreatedUser', db.String(128), default='App Server')
device_last_updated_user = db.Column('deviceLastUpdatedUser', db.String(128), default='App Server', onupdate=current_user)
#Serial Number
serial_number = db.Column('deviceSerialNumber', db.Integer, nullable=False, unique=True)
#Sampling Interval
sampling_interval = db.Column('deviceSamplingInterval', db.Integer, default=60, nullable=False)
# Relationship to Device Status Table
device_status_id = db.Column('deviceStatusID', db.Integer, db.ForeignKey('DeviceStatus.deviceStatusID'))
users = relationship("User", secondary="userDevice")
# New instance instantiation procedure
def __init__(self, serial_number):
self.serial_number = serial_number
self.device_status_id = 1
def __repr__(self):
return '<Device %r>' % self.serial_number
users\models.py
from random import SystemRandom
from backports.pbkdf2 import pbkdf2_hmac, compare_digest
from flask_login import UserMixin, current_user
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, backref
from devices.models import Device
import enum
# Import the database object (db) from the main application module
# We will define this inside /app/__init__.py in the next sections.
from app import db
class UserStatusType(enum.Enum):
INACTIVE = "Inactive"
ACTIVE = "Active"
# Define a User model
class User(UserMixin, db.Model):
__tablename__ = 'User'
__table_args__ = {'extend_existing': True}
id = db.Column('userID', db.Integer, primary_key=True)
date_created = db.Column('userDateCreated', db.DateTime, default=db.func.current_timestamp())
date_modified = db.Column('userDateModified', db.DateTime, default=db.func.current_timestamp(), onupdate=db.func.current_timestamp())
user_created_user = db.Column('userCreatedUser', db.String(128), default=current_user)
user_last_updated_user = db.Column('userLastUpdatedUser', db.String(128), default=current_user, onupdate=current_user)
# First Name
first_name = db.Column('userFirstName', db.String(128), nullable=False)
# Last Name
last_name = db.Column('userLastName', db.String(128), nullable=False)
# User Name
user_name = db.Column('userUserName', db.String(128), nullable=False, unique=True)
# Email
email = db.Column('userEmailAddress', db.String(128), nullable=False, unique=True)
# Password
_password = db.Column('userPassword', db.LargeBinary(128))
_salt = db.Column('userSalt', db.LargeBinary(128))
# Relationship to User Status table
user_status_id = db.Column('userStatusID', db.Integer, db.ForeignKey('UserStatus.userStatusID'))
# Relationship to UserDevice association table
devices = relationship("Device", secondary="userDevice")
#hybrid_property
def password(self):
return self._password
# In order to ensure that passwords are always stored
# hashed and salted in our database we use a descriptor
# here which will automatically hash our password
# when we provide it (i. e. user.password = "12345")
#password.setter
def password(self, value):
# When a user is first created, give them a salt
if self._salt is None:
self._salt = bytes(SystemRandom().getrandbits(8))
self._password = self._hash_password(value)
def is_valid_password(self, password):
"""Ensure that the provided password is valid.
We are using this instead of a ``sqlalchemy.types.TypeDecorator``
(which would let us write ``User.password == password`` and have the incoming
``password`` be automatically hashed in a SQLAlchemy query)
because ``compare_digest`` properly compares **all***
the characters of the hash even when they do not match in order to
avoid timing oracle side-channel attacks."""
new_hash = self._hash_password(password)
return compare_digest(new_hash, self._password)
def _hash_password(self, password):
pwd = password.encode("utf-8")
salt = bytes(self._salt)
buff = pbkdf2_hmac("sha512", pwd, salt, iterations=100000)
return bytes(buff)
# New instance instantiation procedure
def __init__(self, first_name, last_name, user_name, email, password):
self.first_name = first_name
self.last_name = last_name
self.user_name = user_name
self.email = email
self.password = password
self.user_status_id = 2
def __repr__(self):
return "<User #{:d}>".format(self.id)

Checking db-models on uniqueness. INSERT OR IGNORE by means of Flask-SQLAlchemy

I have a question on how to add new but unique items to the database.
app = Flask(__name__, template_folder='templates')
app.config.from_object('config')
db = SQLAlchemy(app)
Classes look like this many-to-many implementation:
connections = db.Table('connections', db.metadata,
db.Column('book_id', db.Integer, db.ForeignKey('books.id')),
db.Column('author_id', db.Integer, db.ForeignKey('authors.id'))
)
class Author(db.Model):
__tablename__ = 'authors'
__searchable__ = ['a_name']
__table_args__ = {'sqlite_autoincrement': True,}
id = db.Column(db.Integer, primary_key=True)
a_name = db.Column(db.String(80), unique=True)
def __repr__(self):
return unicode(self.a_name)
class Book(db.Model):
__tablename__ = 'books'
__searchable__ = ['b_name']
__table_args__ = {'sqlite_autoincrement': True,}
id = db.Column(db.Integer, primary_key=True)
b_name = db.Column(db.String(80), unique=True)
authors = db.relationship('Author', secondary=lambda: connections,
backref=db.backref('books'))
def __repr__(self):
return unicode(self.b_name)
I want to add only unique db items. If I will write this code:
author = Author(a_name = "Author1")
book = Book(b_name = "Book1")
author.books.append(book)
db.session.add(author)
db.session.add(book)
db.session.commit()
And we already had author with a_name "Author1" in our database there will be the error exeption.
IntegrityError: (IntegrityError) column a_name is not unique u'INSERT INTO authors (a_name) VALUES (?)' (u'\u0410\u0432\u0442\u043e\u04402',)
Do I need to check the uniqueness of this insershions and how? Or there is another best solution?
the ORM doesn't have support for the "MERGE" approach (e.g. "INSERT OR REPLACE" and all that) and there's a lot of complications with those as well. I use the unique object recipe or some variant thereof. When I'm doing a large data merge, I'll frequently load up the entire set of objects to be dealt with for some particular chunk into a dictionary ahead of time, so there's just one SELECT statement to load them, then just check into that dictionary as I proceed.
Actually I found not easy going solution and ugly one :), but it works.
Of course, when I will use a big database I would use this unique object recipe which #zzzeek introduced.
new_author = Author(a_name = request.form['author'])
new_book = Book(b_name = request.form['book'])
author_in_db = Author.query.filter_by(a_name=unicode(new_author)).first()
book_in_db = Book.query.filter_by(b_name=unicode(new_book)).first()
# both author and book are new and unique
if unicode(new_author) != unicode(author_in_db) and \
unicode(new_book) != unicode(book_in_db):
new_author.books.append(new_book)
db.session.add(new_author)
db.session.add(new_book)
db.session.commit()
# just book is not unique
elif unicode(new_author) != unicode(author_in_db):
new_author.books.append(book_in_db)
db.session.add(new_author)
db.session.commit()
# just author is not unique
elif unicode(new_book) != unicode(book_in_db):
author_in_db.books.append(new_book)
db.session.add(new_book)
db.session.commit()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
fullname = Column(String)
password = Column(String)
insert_command = User.__table__.insert(
prefixes=['OR IGNORE'],
values=dict(name='MyName', fullname='Slim Shady', password='******')
)
session.execute(insert_command)
session.commit()
Replace works as well
prefixes=['OR REPLACE']

SQLAlchemy: retrieve all episodes from favorite_series of specific user

I have user who can have his favorite series and there are episodes which have series as foreign key and I am trying to retrieve all episodes from favorite series of user.
I am using Flask-SQLAlchemy.
Database:
db = SQLAlchemy(app)
# cross table for user-series
favorite_series = db.Table('favorite_series',
db.Column('user_id', db.Integer, db.ForeignKey('user.id')),
db.Column('series_id', db.Integer, db.ForeignKey('series.id'))
)
# user
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True)
favorite_series = db.relationship('Series', secondary=favorite_series,
backref=db.backref('users', lazy='dynamic'))
# series
class Series(db.Model):
__tablename__ = 'series'
id = db.Column(db.Integer, primary_key=True)
# episode
class Episode(db.Model):
__tablename__ = 'episode'
id = db.Column(db.Integer, primary_key=True)
series_id = db.Column(db.Integer, db.ForeignKey('series.id'))
series = db.relationship('Series',
backref=db.backref('episodes', lazy='dynamic'))
Friend helped me with SQL
select user_id,series.name,episode.name from (favorite_series left join series on favorite_series.series_id = series.id) left join episode on episode.series_id = series.id where user_id=1;
Altough, I want it in SQLAlchemy API, but can't manage to get it working.
EDIT:
My final working result:
episodes = Episode.query.filter(Episode.series_id.in_(x.id for x in g.user.favorite_series)).filter(Episode.air_time!=None).order_by(Episode.air_time)
First of all you don't seem to be declaring your table names?
Also, the whole point of bothering with orm is so you never have to write sql queries:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import orm
import sqlalchemy as db
Base = declarative_base()
favorite_series = db.Table('favorite_series', Base.metadata,
db.Column('user_id', db.Integer, db.ForeignKey('User.id')),
db.Column('series_id', db.Integer, db.ForeignKey('Series.id'))
)
class Episode(Base):
__tablename__ = 'Episode'
id = db.Column(db.Integer, primary_key=True)
season = db.Column(db.Integer)
episode_num = db.Column(db.Integer)
series_id = db.Column(db.Integer, db.ForeignKey('Series.id'))
def __init__(self, season, episode_num, series_id):
self.season = season
self.episode_num = episode_num
self.series_id = series_id
def __repr__(self):
return self.series.title + \
' S' + str(self.season) + \
'E' + str(self.episode_num)
class Series(Base):
__tablename__ = 'Series'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String)
episodes = orm.relationship('Episode', backref='series')
def __init__(self, title):
self.title = title
def __repr__(self):
return self.title
class User(Base):
__tablename__ = 'User'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String)
favorite_series = orm.relationship('Series',
secondary=favorite_series, backref='users')
def __init__(self, name):
self.name = name
def __repr__(self):
return self.name
Now you can just access the attributes of your objects and let sql alchemy deal with keeping you DB in sync and issuing queries.
engine = db.create_engine('sqlite:///:memory:')
session = orm.sessionmaker(bind=engine)()
Base.metadata.create_all(engine)
lt = User('Ludovic Tiako')
the_wire = Series('The Wire')
friends = Series('Friends')
session.add_all([lt, the_wire, friends])
session.commit() # need to commit here to generate the id fields
tw_s01e01 = Episode(1,1,the_wire.id)
tw_s01e02 = Episode(1,2,the_wire.id)
f_s01e01 = Episode(1,1,friends.id)
f_s01e02 = Episode(1,2,friends.id)
f_s01e03 = Episode(1,3,friends.id)
session.add_all([tw_s01e01, tw_s01e02,
f_s01e01, f_s01e02, f_s01e03])
session.commit()
the_wire.episodes # > [The Wire S1E1, The Wire S1E2]
friends.episodes # > [Friends S1E1, Friends S1E2, Friends S1E3]
Finally, to answer your question:
lt.favorite_series.append(the_wire)
session.commit()
lt.favorite_series # > [The Wire]
[s.episodes for s in lt.favorite_series] # >> [[The Wire S1E1, The Wire S1E2]]
I don't know about Flask, but from the docs of Flask-SQLAlchemy, it seems it uses declarative, so the ORM. And so, you should have a session. I think it is accessible to you from db.session.
Anyway, if those assumptions are true, this is how you should do it:
query = db.session.query(User.id, Series.name, Episode.name).filter((Episode.series_id == Series.id) & \
(User.id == favorite_series.c.user_id) & (Series.id == favorite_series.c.id) & \
(User.id == 1))
results = query.all();
It might not be the exact query you provided, but should do the same.
UPDATE: I just checked Flask-SQLALchemy code on github, it seems that db is an instance of SQLAlchemy, which has a session attribute, created by self.session = self.create_scoped_session(session_options) which returns a session object. So this should work.
Also, not that by doing that, you won't be using their BaseQuery, although I don't know what that would mean...
Check the documentation to know what to do exactly.

Resources