Source code for imessagedb.chats
from imessagedb.chat import Chat
class Chats:
    """All chats in the database.

    Every row of the ``chat`` table is loaded once at construction time and
    indexed three ways:

    * ``chat_list`` maps a chat ``rowid`` to its single ``Chat``.
    * ``chat_identifiers`` maps a chat identifier to a list of ``Chat``.
    * ``chat_names`` maps a non-empty chat name to a list of ``Chat``.

    Indexing the collection (``chats[key]``) searches names first, then
    rowids, then identifiers.
    """

    def __init__(self, database) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        """
        self._database = database
        self._chat_list = {}  # Chat by rowid
        self._chat_identifiers = {}  # list of Chat by identifier
        self._chat_names = {}  # list of Chat by name
        self._get_chats_from_database()

    def __repr__(self) -> str:
        """Return ``str()`` of every chat, one per line, ordered by rowid."""
        return '\n'.join(str(self._chat_list[rowid]) for rowid in sorted(self._chat_list))

    def __len__(self) -> int:
        """Return the number of chats loaded from the database."""
        return len(self._chat_list)

    def _get_chats_from_database(self) -> None:
        """Load all chats and fill in their last message date and participants.

        Runs three queries in total: one for the chats themselves, one for the
        most recent message date of every chat, and one for the chat/handle
        join table.
        """
        # ``connection`` must expose execute()/fetchall() (DB-API cursor style),
        # exactly as the rest of this class already relied on.
        connection = self._database.connection

        connection.execute('select rowid, chat_identifier, display_name from chat')
        for rowid, chat_identifier, display_name in connection.fetchall():
            new_chat = Chat(self._database, rowid, chat_identifier, display_name)
            self._chat_list[new_chat.rowid] = new_chat
            # Several chats may share one identifier, so each entry is a list.
            self._chat_identifiers.setdefault(new_chat.chat_identifier, []).append(new_chat)
            # Only index chats that actually have a name; a missing (None) or
            # empty name is treated the same way get_chats() treats it.
            if new_chat.chat_name:
                self._chat_names.setdefault(new_chat.chat_name, []).append(new_chat)

        # Add the last chat date to all the chats. A single grouped query
        # replaces one query per chat. The SQL treats message_date as
        # nanoseconds counted from 2001-01-01 and renders it in local time.
        connection.execute(
            "select chat_id, "
            "datetime(max(message_date)/1000000000 + strftime('%s', '2001-01-01'),'unixepoch','localtime') "
            "from chat_message_join group by chat_id"
        )
        last_dates = {chat_id: last_date for chat_id, last_date in connection.fetchall()}
        for chat in self._chat_list.values():
            # Chats without any joined message keep the previous result: None.
            chat.last_message_date = last_dates.get(chat.rowid)

        # Add the participants for all the chats.
        connection.execute('select chat_id, handle_id from chat_handle_join')
        for chat_id, handle_id in connection.fetchall():
            chat = self._chat_list.get(chat_id)
            # Skip join rows that point at a chat missing from the chat table
            # instead of aborting the whole load with a KeyError.
            if chat is not None:
                chat.add_participant(handle_id)

    def get_chats(self) -> str:
        """ Return a string with the list of chats in the database"""
        lines = []
        for chat_id in sorted(self._chat_list):
            chat = self._chat_list[chat_id]
            # Named chats show "rowid (name):", unnamed ones just "rowid:".
            if chat.chat_name:
                label = f"{chat.rowid} ({chat.chat_name}):"
            else:
                label = f"{chat.rowid}:"
            lines.append(
                f"{label} Participants: {chat.participants}, Last Message Sent: {chat.last_message_date}"
            )
        return '\n'.join(lines)

    @property
    def chat_list(self) -> dict:
        """ Return the list of chats by rowid in a dict"""
        return self._chat_list

    @property
    def chat_names(self) -> dict:
        """ Return the lists of chats by chat name in a dict"""
        return self._chat_names

    @property
    def chat_identifiers(self) -> dict:
        """ Return the lists of chats by chat identifier in a dict"""
        return self._chat_identifiers

    def __getitem__(self, item) -> "Chat | list[Chat]":
        """Look up chats by name, then by rowid, then by chat identifier.

        Returns
        -------
        A single ``Chat`` when ``item`` matches a rowid, otherwise the list of
        chats registered under that name or identifier.

        Raises
        ------
        KeyError
            If ``item`` is not a known name, rowid or identifier.
        """
        for index in (self._chat_names, self._chat_list, self._chat_identifiers):
            if item in index:
                return index[item]
        raise KeyError(item)