# -*- coding: utf-8 -*-
import io
import os
import pickle
from copy import deepcopy as copy
from datetime import date, timedelta
from os import path
import maya
from apiclient import http as http_, discovery
from progressbar import ProgressBar, UnknownLength
from apis.google import GoogleAPI
from const import DOWNLOAD_DIRECTORY, MIME, PAGE_SIZE
from util import print_json, convert_mime_type_and_extension, CalDict, DateRange
#: Location of the pickled activity-data cache (written by
#: ``DriveAPI.activity_data``), one directory above this module.
DRIVE_BACKUP_FILE = path.join(path.abspath(path.dirname(__file__)), '..', 'drive_data_backup.pkl')
#: Number of hours in a segment. Must be equally divisible by 24 to avoid issues.
SEGMENT_SIZE = 4
class DriveAPI(GoogleAPI):
    """Class to interact with Google Drive APIs.

    Documentation for the Python API:
    - https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/index.html

    Quick start guide:
    - https://developers.google.com/drive/v3/web/quickstart/python
    """
    # Service name and version handed to the Google discovery client
    # (consumed by the GoogleAPI base class).
    _service_name = 'drive'
    _version = 'v3'
[docs] def activity(self, level, what=('files', 'revisions'), use_cached=False, **kwargs):
"""Compile the user's activity.
Note about revision history: One of the metadata fields for file
revisions is called "keepForever". This indicates whether to keep the
revision forever, even if it is no longer the head revision. If not
set, the revision will be automatically purged 30 days after newer
content is uploaded. This can be set on a maximum of 200 revisions for
a file.
:param str level: Level of detail on the activity. Accepted values:
- ``'dy'``: Activity is summarized by day
- ``'hr'``: Activity is summarized by hour, X:00:00 to X:59:59
- ``'sg'``: Activity throughout the day is divided into a number
of segments (defined to be :data:`SEGMENT_SIZE` divided by
``24``).
:param what: Indicates what kind of content to scan for activity.
Accepted values:
- ``'created'``
- ``'revisions'``
- ``'comments'``
:type what: tuple or list
:param bool use_cached: Whether or not to use cached data. When set,
this avoids downloading all the file metadata from Google if a
cached version of the data is available on disk.
:return: A dictionary containing three keys: ``x``, ``y``, and ``z``.
Each of these stores a list suitable for passing as the data set
for a plot.
:rtype: dict(str, list)
:raises ValueError: When the ``level`` or ``what`` parameters have an
unsupported format or value.
"""
cr, rev, com = self.activity_data(level, what, use_cached)
return self.activity_plot(created_data=cr, revision_data=rev, comment_data=com, level=level, what=what)
def activity_data(self, level, what=('files', 'revisions'), use_cached=False):
# Validate parameter values
if level not in ('dy', 'sg', 'hr'):
raise ValueError('Unsupported activity level: {}'.format(level))
if not isinstance(what, (tuple, list)):
raise ValueError('Unsupported format of activity content type.')
for w in what:
if w not in ('created', 'revisions', 'comments'):
raise ValueError('Unsupported activity content type: {}'.format(w))
cache_ok = True
if use_cached:
# Unpickle the cached data
try:
with open(DRIVE_BACKUP_FILE, 'rb') as f:
created_data, modified_data, revision_data, comment_data = pickle.load(f)
except (pickle.UnpicklingError, FileNotFoundError, EOFError):
use_cached = False
cache_ok = False
print('No valid cache found. Downloading fresh data.')
else:
print('Successfully loaded cached data.')
if not use_cached: # Don't use elif so we can change the value of use_cached if the cache is bad
# Prompt before overwriting cache file (unless we already tried using it)
if cache_ok and path.exists(DRIVE_BACKUP_FILE):
res = input('Cache file exists. Okay to overwrite? [y/N] ')
if not len(res) or res[0].lower() != 'y':
print('Exiting')
return
# One or more of the following will be crunched, stored to data, and become the z-axis in the figure
created_data = CalDict()
modified_data = CalDict() # TODO: Probably will remove this
revision_data = CalDict()
comment_data = CalDict()
# bar = Counter(format='Downloaded metadata for %(value)d files')
bar = ProgressBar(max_value=UnknownLength)
fields = 'id, ownedByMe, createdTime, modifiedByMe, modifiedByMeTime, ' \
'viewedByMe, viewedByMeTime, trashed, trashedTime'
cnt = 0
for f in self.gen_file_data(fields):
# Putting the progress bar here shows it to the user much sooner, indicates the program isn't hanging
if not cnt % 10:
bar.update(cnt)
# If the user created the file (i.e. is the owner), get the creation time
if f.get('ownedByMe', False):
t = maya.parse(f['createdTime']).datetime(to_timezone=self.tz)
# import pdb; pdb.set_trace()
created_data[t.year][t.month][t.day][t.hour] += 1
# If the user has modified the file, also get the modification time
if f.get('modifiedByMe', False):
t = maya.parse(f['modifiedByMeTime']).datetime(to_timezone=self.tz)
modified_data[t.year][t.month][t.day][t.hour] += 1
# Get file revisions, if requested
if 'revisions' in what:
self._file_revisions(f['id'], revision_data)
# Get file comments, if requested
if 'comments' in what:
self._file_comments(f['id'], comment_data)
cnt += 1
bar.finish()
print('Done downloading file metadata')
# Cache the downloaded data
with open(DRIVE_BACKUP_FILE, 'wb') as f:
pickle.dump((created_data, modified_data, revision_data, comment_data), f, protocol=-1)
# END OF FRESH DOWNLOAD CODE
return created_data, revision_data, comment_data
    def activity_plot(self, created_data, revision_data, comment_data, level, what):
        """Assemble the x/y/z data sets for an activity heat-map figure.

        :param CalDict created_data: Per-hour counts of file creations.
        :param CalDict revision_data: Per-hour counts of file revisions.
        :param CalDict comment_data: Per-hour counts of comments/replies.
        :param str level: ``'dy'``, ``'sg'``, or ``'hr'`` — see :meth:`activity`.
        :param what: Which of ``'created'``/``'revisions'``/``'comments'`` to plot.
        :type what: tuple or list
        :return: Dict with keys ``'x'`` (ascending dates), ``'y'`` (row labels),
            and ``'z'`` (one list of values per row).
        :rtype: dict(str, list)
        """
        data = []  # Will become the z-axis in the figure. list(list(int))
        data_labels = []  # Will become the labels of the y-axis in the figure
        date_range = DateRange(None, None)  # Will become the labels of the x-axis in the figure
        # Prep the labels, e.g. '0400 to 0759' (segments) and '0400 to 0459' (hours)
        segment_labels = ['{:02}00 to {:02}59'.format(x, x+SEGMENT_SIZE-1) for x in range(24) if not x % SEGMENT_SIZE]
        hour_labels = ['{0:02}00 to {0:02}59'.format(x) for x in range(24)]
        # Reverse the label times so later hours appear first (matches the row order built below)
        segment_labels.reverse()
        hour_labels.reverse()
        for method, lvl, data_set, label in (  # These are in the reverse order they'll appear on the y-axis
                ('comments', 'hr', comment_data, 'Drive Files - Comments '),
                ('comments', 'sg', comment_data, 'Drive Files - Comments from '),
                ('comments', 'dy', comment_data, 'Drive Files - Comments Daily'),
                ('revisions', 'hr', revision_data, 'Drive Files - Revisions '),
                ('revisions', 'sg', revision_data, 'Drive Files - Revisions from '),
                ('revisions', 'dy', revision_data, 'Drive Files - Revisions Daily'),
                ('created', 'hr', created_data, 'Drive Files - Created '),
                ('created', 'sg', created_data, 'Drive Files - Created from '),
                ('created', 'dy', created_data, 'Drive Files - Created Daily'),
        ):
            # Only use the specified types of data
            if method not in what:
                continue
            # Only use those methods for the specified level
            if lvl != level:
                continue
            # Add the labels to the label set according to what level we're using
            if lvl == 'dy':
                data_labels.append(label)
            elif lvl == 'sg':
                for l in segment_labels:
                    data_labels.append(label + l)
            elif lvl == 'hr':
                for l in hour_labels:
                    data_labels.append(label + l)
            # Crunch the data down to the requested granularity
            dates, data_set = crunch(data=data_set, level=level, start=self.start, end=self.end)
            # Align the date ranges
            if None in date_range:
                # No date range has been recorded yet. There are two possible reasons for this. First, no data sets
                # have been collected, so this will be the first possible date range. Second, other collected data sets
                # didn't have any data, in which case, if the data set we're currently processing *does* have data, we
                # need to make two adjustments. First, copy the dates from the current data set (we can actually do
                # this even if the dates are None without it having an effect). Second, we need to add zeros to the
                # data set now that we know how many of them we should add.
                date_range = copy(dates)
                if None in dates:
                    data.append([])
                else:
                    n = (dates.end - dates.start).days + 1
                    for i, d in enumerate(data):
                        if len(d):
                            raise RuntimeError('Non-empty data set with no date range detected.')
                        data[i] = [0] * n
            elif None in dates:
                # The current data set is empty: pad it with zeros spanning the established range
                if len(data_set[0]):
                    raise RuntimeError('Non-empty data set returned with no date range.')
                n = (date_range.end - date_range.start).days + 1
                data_set = [[0] * n]
                dates = DateRange(date_range.start, date_range.start)
            # If the date range is still None, that means there was no data
            if None in date_range:
                # TODO: Verify this is the correct thing to do here
                continue
            # We want to do the following two checks independently (i.e. without using elif statements) because it
            # may be that one range is not a subset of the other. In other words, range A may start before range B
            # and at the same time range B may end after range A. Keeping the condition checks separate handles both
            # adjustments. Also, this isn't a problem because of how Python handles multiplying lists by a negative
            # value, as seen in the following example:
            #
            #     >>> [0] * -3
            #     []
            renew_range = False
            if dates.start < date_range.start or date_range.end < dates.end:
                # Append/prepend values to the *other* lists of data
                renew_range = True
                pre = (date_range.start - dates.start).days
                post = (dates.end - date_range.end).days
                for i, d in enumerate(data):
                    data[i] = [0] * pre + data[i] + [0] * post
            if date_range.start < dates.start or dates.end < date_range.end:
                # Append/prepend values to the *current* list(s) of data
                renew_range = True
                pre = (dates.start - date_range.start).days
                post = (date_range.end - dates.end).days
                for i, d in enumerate(data_set):
                    data_set[i] = [0] * pre + data_set[i] + [0] * post
            if renew_range:
                # Widen the recorded range to the union of both ranges
                date_range.start = date_range.start if date_range.start < dates.start else dates.start
                date_range.end = date_range.end if date_range.end > dates.end else dates.end
            # Add the new data to the data set
            data += data_set
        # NOTE(review): if no data was found at all, date_range is still
        # (None, None) and the subtraction below raises TypeError — confirm
        # callers guarantee at least one non-empty data set.
        return {'x': [date_range.end - timedelta(days=x)
                      for x in range((date_range.end - date_range.start).days, -1, -1)],  # Reversed so dates ascend
                'y': data_labels,
                'z': data}
def _file_revisions(self, file_id, data):
"""Retrieve revisions of a Google Document.
This includes Google Docs, Google Sheets, etc.
https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/drive_v3.revisions.html
:param str file_id: The ID of the file.
:param CalDict data: The data object to which the revision history will
be added. This is modified directly, making it unnecessary to
return the data.
:rtype: None
"""
args = {'fileId': file_id,
'pageSize': PAGE_SIZE,
'fields': 'nextPageToken, revisions(modifiedTime, lastModifyingUser)'}
page_token = None
while True:
try:
rev_set = self.service.revisions().list(pageToken=page_token, **args).execute()
except discovery.HttpError:
# The file does not support revisions.
return
page_token = rev_set.get('nextPageToken')
for r in rev_set['revisions']:
# TODO: Filter revisions that don't correspond to the target user
# if r['lastModifyingUser']['emailAddress'] != self.target_email:
# continue
# For now, we'll just filter by if the user is "me"
try:
if not r['lastModifyingUser']['me']:
continue
except KeyError:
# Sometimes there's a revision entry that doesn't include the last modifying user for whatever
# reason. Just skip that revision and get the rest.
continue
t = maya.parse(r['modifiedTime']).datetime(to_timezone=self.tz)
data[t.year][t.month][t.day][t.hour] += 1
# page_token will be None when there are no more pages of results
if page_token is None:
break
    def _file_comments(self, file_id, data):
        """Retrieve comments from a Google Document.

        This includes Google Docs, Google Sheets, etc.

        API:
        https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/drive_v3.comments.html

        Reference:
        https://developers.google.com/drive/v3/reference/comments

        :param str file_id: The ID of the file.
        :param CalDict data: The data object to which the comment history will
            be added. This is modified directly, making it unnecessary to
            return the data.
        :rtype: None
        """
        args = {
            'fileId': file_id,
            'includeDeleted': True,
            'pageSize': PAGE_SIZE,
            'fields': 'nextPageToken, comments(createdTime, author, replies)',
        }
        page_token = None
        while True:
            comment_set = self.service.comments().list(pageToken=page_token, **args).execute()
            page_token = comment_set.get('nextPageToken')
            for c in comment_set['comments']:
                # TODO: Filter comments that don't correspond to the target user
                # if c['author']['emailAddress'] != self.target_email:
                #     continue
                # For now, we'll just filter by if the user is "me"
                if not c['author']['me']:
                    continue
                # Unlike revision history, the modified time of comments is the last time the comment or any of its
                # replies was modified. Since this is too broad, we just look at the time the comment was created.
                t = maya.parse(c['createdTime']).datetime(to_timezone=self.tz)
                data[t.year][t.month][t.day][t.hour] += 1
                # Log replies as well
                for repl in c['replies']:
                    # TODO: Filter replies that don't correspond to the target user
                    # if repl['author']['emailAddress'] != self.target_email:
                    #     continue
                    # For now, we'll just filter by if the user is "me"
                    if not repl['author']['me']:
                        continue
                    # Log the creation time
                    t = maya.parse(repl['createdTime']).datetime(to_timezone=self.tz)
                    data[t.year][t.month][t.day][t.hour] += 1
                    # If the modification time is different from the creation time, log it too.
                    # NOTE(review): 'modifiedTime' is read from replies even though the 'fields'
                    # filter above only names createdTime/author/replies — presumably replies
                    # come back with all subfields; confirm against the API.
                    if repl['createdTime'] != repl['modifiedTime']:
                        t = maya.parse(repl['modifiedTime']).datetime(to_timezone=self.tz)
                        data[t.year][t.month][t.day][t.hour] += 1
            # page_token will be None when there are no more pages of results
            if page_token is None:
                break
[docs] def get_about(self, fields='*'):
"""
Retrieves information about the user's Drive. and system capabilities.
https://developers.google.com/drive/v3/reference/about
:param fields: fields to be returned
:type fields: string
:return: JSON
"""
# So I can do this.. perfect
about = self.service.about()
results = about.get(fields=fields).execute()
# why do I have this?
if not results:
return None
else:
return results
# Needs to loop over all changes using pageToken from start_token and nextPageToken token
@property
def team_drives(self):
"""A list of team drives associated with the user.
:rtype: list(str)
"""
if isinstance(self._team_drives, list):
return self._team_drives
# Populate list of team drives
self._team_drives = []
page_token = None
team_page_size = 100 # Range must be [1, 100]
while True:
t = self.service.teamdrives().list(pageToken=page_token, pageSize=team_page_size).execute()
page_token = t.get('nextPageToken')
self._team_drives += [x['id'] for x in t['teamDrives']]
# page_token will be None when there are no more pages of results
if page_token is None:
break
return self._team_drives
[docs] def get_changes(self, spaces='drive', include_team_drives=True, restrict_to_my_drive=False,
include_corpus_removals=None, include_removed=None):
"""Return the changes for a Google Drive account.
The set of changes as returned by this method are more suited for a
file syncing application.
In the returned :class:`dict`, the key for changes in the user's
regular Drive is an empty string (``''``). The data for each Team Drive
(assuming ``include_team_drives`` is `True`) is stored using a key in
the format ``'team_drive_X'``, where ``X`` is the ID of the Team Drive.
For the form of the JSON data, go to
https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/drive_v3.teamdrives.html#list
https://developers.google.com/drive/v3/reference/changes
:param str spaces: A comma-separated list of spaces to query within the
user corpus. Supported values are 'drive', 'appDataFolder' and
'photos'.
:param bool include_team_drives: Whether or not to include data from
Team Drives as well as the user's Drive.
:param bool restrict_to_my_drive: Whether to restrict the results to
changes inside the My Drive hierarchy. This omits changes to files
such as those in the Application Data folder or shared files which
have not been added to My Drive.
:param bool include_corpus_removals: Whether changes should include the
file resource if the file is still accessible by the user at the
time of the request, even when a file was removed from the list of
changes and there will be no further change entries for this file.
:param bool include_removed: Whether to include changes indicating that
items have been removed from the list of changes, for example by
deletion or loss of access.
:return: All data on changes by the user in JSON format and stored in
a :class:`dict`.
:rtype: dict(str, dict)
"""
args = {
'spaces': spaces,
'restrict_to_my_drive': restrict_to_my_drive,
'include_corpus_removals': include_corpus_removals,
'include_removed': include_removed,
}
# Get changes for regular Drive stuff
changes = {'': self._get_changes(**args)}
# Cycle through the Team Drives and get those too
if include_team_drives:
for t in self.team_drives():
changes['team_drive_{}'.format(t)] = self._get_changes(team_drive_id=t, **args)
return changes
    def _get_changes(self, spaces, team_drive_id=None, restrict_to_my_drive=False, include_corpus_removals=None,
                     include_removed=None):
        """Fetch all pages of changes for one Drive (or one Team Drive).

        .. warning:: Response processing is NOT implemented yet — this method
           fetches the first page and then deliberately raises
           :exc:`NotImplementedError`.

        :param str spaces: A comma-separated list of spaces to query within the
            user corpus. Supported values are 'drive', 'appDataFolder' and
            'photos'.
        :param str team_drive_id: ID of the Team Drive to query, or ``None``
            for the user's own Drive.
        :param bool restrict_to_my_drive: Whether to restrict the results to
            changes inside the My Drive hierarchy. This omits changes to files
            such as those in the Application Data folder or shared files which
            have not been added to My Drive.
        :param bool include_corpus_removals: Whether changes should include the
            file resource if the file is still accessible by the user at the
            time of the request, even when a file was removed from the list of
            changes and there will be no further change entries for this file.
        :param bool include_removed: Whether to include changes indicating that
            items have been removed from the list of changes, for example by
            deletion or loss of access.
        :return: The list of changes combined from all pages.
        :rtype: dict
        :raises NotImplementedError: Always, until response processing is written.
        """
        chg = self.service.changes()
        args = {'supportsTeamDrives': True,  # "Whether the requesting application supports Team Drives."
                'teamDriveId': team_drive_id}
        # Get the first page token
        start = chg.getStartPageToken(**args).execute()['startPageToken']
        args.update({
            'pageToken': start,
            'pageSize': PAGE_SIZE,
            'includeTeamDriveItems': True if team_drive_id is not None else False,
            # supportsTeamDrives already defined above
            'restrictToMyDrive': restrict_to_my_drive,
            'spaces': spaces,
            # teamDriveId already defined above
            'includeCorpusRemovals': include_corpus_removals,
            'includeRemoved': include_removed,
        })
        # Send the first request (first page)
        req = chg.list(**args)
        resp = req.execute()
        # Process the response — placeholder: intentionally unimplemented.
        if True:
            raise NotImplementedError
        # The pagination loop below is currently unreachable (see the raise above).
        while True:
            req = chg.list_next(previous_request=req, previous_response=resp)
            resp = req.execute()  # Returns None when there are no more items in the collection
            if resp is None:
                break
            # Process the response
            pass
        # Return all the responses
        pass
[docs] def gen_file_data(self, fields='*', spaces='drive', include_team_drives=True, corpora=None):
"""Generate the metadata for the user's Drive files.
This function is a generator, so it yields the metadata for one file at
a time. For the format of the :class:`dict` generated, see
https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/drive_v3.files.html#list
:param str fields: The metadata fields to retrieve.
:param str spaces: A comma-separated list of spaces to query within the
user corpus. Supported values are 'drive', 'appDataFolder' and
'photos'.
:param bool include_team_drives: Whether or not to include data from
Team Drives as well as the user's Drive.
:param str corpora: Comma-separated list of bodies of items
(files/documents) to which the query applies. Supported bodies are
'user', 'domain', 'teamDrive' and 'allTeamDrives'. 'allTeamDrives'
must be combined with 'user'; all other values must be used in
isolation. Prefer 'user' or 'teamDrive' to 'allTeamDrives' for
efficiency.
:return: The file metadata.
:rtype: dict
"""
args = {'spaces': spaces,
'corpora': corpora,
'fields': fields}
# Get files from regular Drive
yield from self._gen_file_data(**args)
# Cycle through the Team Drives and get those too
if include_team_drives:
for t in self.team_drives:
yield from self._gen_file_data(team_drive_id=t, **args)
def _gen_file_data(self, fields, spaces, team_drive_id=None, corpora=None):
"""Helper method for :meth:`gen_file_data`.
For descriptions of the parameters, see the signature for
:meth:`gen_file_data`.
"""
args = {
'includeTeamDriveItems': True if team_drive_id is not None else False,
'pageSize': PAGE_SIZE,
'corpora': corpora, # Not sure how this affects the results...
'supportsTeamDrives': True,
'spaces': spaces,
'teamDriveId': team_drive_id,
'fields': 'nextPageToken, files({})'.format(fields),
}
page_token = None
while True:
file_set = self.service.files().list(pageToken=page_token, **args).execute()
page_token = file_set.get('nextPageToken')
for f in file_set['files']:
yield f
# page_token will be None when there are no more pages of results
if page_token is None:
break
[docs] def export_drive_file(self, file_data, download_path):
"""
Exports and converts .g* files to real files and then downloads them
https://developers.google.com/drive/v3/reference/files/export
:param file_data: List of file(s) to be downloaded
:type file_data: JSON
:param download_path: Path where the file will be downloaded
:return: boolean True if downloads succeeded, False if Downloads failed.
"""
mime_type, extension = convert_mime_type_and_extension(file_data['mimeType'])
if not mime_type or not extension:
print('mime type is not found, dumping file\'s information')
print_json(file_data)
return False
os.chdir(download_path)
request = self.service.files().export(fileId=file_data['id'], mimeType=mime_type)
fh = io.FileIO(file_data['name'] + extension, 'wb')
downloader = http_.MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(file_data['name'])
print('Download %d%%.' % int(status.progress() * 100))
return True
[docs] def export_real_file(self, file_data, download_path):
"""
Downloads real files. AKA not .g*
https://developers.google.com/drive/v3/reference/files/export
:param file_data: List of file(s) to be downloaded
:type file_data: JSON
:param download_path: Path where the file will be downloaded
:return: Nothing
"""
os.chdir(download_path)
request = self.service.files().get_media(fileId=file_data['id'])
fh = io.FileIO(file_data['name'], 'wb')
downloader = http_.MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(file_data['name'])
print('Download %d%%.' % int(status.progress() * 100))
# I am sorry for the recursion Mike...
# TODO Sort the file list so that there is no need to check from the beginning every time
# For some reason the orderby in the get_file_data() call works and puts folders first but
# at some point it can put a file before the folders in the list which required this block of
# code to be written horribly.
# TODO fix this bug
def handle_folder_helper(self, file_data_list, download_path, curr_folder_id):
# Loop over all file data
for file_data in file_data_list:
# If file belong in current folder
if str(file_data['parents'][0]) == str(curr_folder_id):
# make new folder if it is a folder
if str(file_data['mimeType']) == str(MIME['g_folder']):
os.mkdir(download_path + '/' + file_data['name'])
# every time we make a new folder we recursively call
self.handle_folder_helper(file_data_list, download_path + '/' + file_data['name'], file_data['id'])
# it must be a file so download
else:
if str(file_data['parents'][0]) == curr_folder_id:
if 'google-apps' not in str(file_data['mimeType']):
self.export_real_file(file_data, download_path)
elif 'folder' not in str(file_data['mimeType']):
download_succeeded = self.export_drive_file(file_data, download_path)
# In the event of a Mime Type conversion error the download process will stop
if download_succeeded is False:
print('Error has occurred, process was aborted.')
# Always make sure you orderBy folder
def handle_folders(self, file_list_array, download_path):
# Get root id
root_id = self.get_root_file_id()
# recursively download everything in the drive
self.handle_folder_helper(file_list_array, download_path, root_id)
def get_root_file_id(self):
root_id = self.service.files().get(fileId='root').execute()
print_json(root_id)
return root_id['id']
# TODO HANDLE TRASH....
[docs] def download_files(self, file_list_array=False):
"""
Downloads files from the user's drive
https://developers.google.com/drive/v3/web/manage-downloads
:param file_list_array: list of file(s) to be downloaded
:type file_list_array: array
:return: Nothing
"""
if not file_list_array:
file_list_array = self.list_file_data()
# If download directory is set us it for the download folder.
# Otherwise use the directory of this project
if DOWNLOAD_DIRECTORY is None:
download_path = os.getcwd()
else:
download_path = path.expanduser(DOWNLOAD_DIRECTORY)
if not path.exists(download_path + '/trash'):
os.mkdir(download_path + '/trash')
# Download trashed files first
for file_data in file_list_array:
if file_data['trashed']:
if 'google-apps' not in str(file_data['mimeType']):
self.export_real_file(file_data, download_path + '/trash')
elif 'folder' not in str(file_data['mimeType']):
download_succeeded = self.export_drive_file(file_data, download_path + '/trash')
# In the event of a Mime Type conversion error the download process will stop
if download_succeeded is False:
print('Error has occurred, process was aborted.')
# Now download the rest of them
self.handle_folders(file_list_array, download_path)
[docs] def get_app_folder(self, fields='nextPageToken, files(id, name)'):
"""
Returns the data in the users app data folder
https://developers.google.com/drive/v3/reference/files/list
:param fields: fields to be returned
:type fields: string
:return: JSON
"""
response = self.service.files().list(spaces='appDataFolder', fields=fields, pageSize=10).execute()
items = response.get('files', [])
while 'nextPageToken' in response:
response = self.service.files().list(spaces='appDataFolder', pageSize=1000,
pageToken=str(response['nextPageToken'])).execute()
items.append(response.get('files', []))
if not items:
return None
else:
return items
[docs] def get_photo_data(self, fields='nextPageToken, files(id,name)'):
"""
Returns the data about the user's photos
https://developers.google.com/drive/v3/reference/files/list
:param fields: fields to be returned
:type fields: string
:return: JSON
"""
response = self.service.files().list(spaces='photos', fields=fields, pageSize=10).execute()
items = response.get('files', [])
while 'nextPageToken' in response:
response = self.service.files().list(spaces='photos', pageSize=1000,
pageToken=str(response['nextPageToken'])).execute()
items.append(response.get('files', []))
if not items:
return None
else:
return items
def crunch(level, **kwargs):
    """Consolidate the data to the specified level.

    :param CalDict data: The data from parsing the Drive metadata.
    :param str level: Must be one of ``dy``, ``sg``, or ``hr``. For an
        explanation of these options, see the docstring for
        :meth:`DriveAPI.activity`.
    :param datetime.date start: The earliest data to collect.
    :param datetime.date end: The latest data to collect.
    :return: Tuple of two elements. The first is a :class:`DateRange` holding
        the first and last days with activity (:attr:`~DateRange.start` and
        :attr:`~DateRange.end`, both :class:`~datetime.date` objects). The
        second is a :class:`list` of per-day data whose shape depends on
        ``level``:

        - ``dy``: a single :class:`list` of :class:`int` s, one per day.
        - ``sg``: one :class:`list` of :class:`int` s per segment, one `int`
          per day, in reverse order (first list = last segment of the day).
        - ``hr``: one :class:`list` of :class:`int` s per hour, one `int`
          per day, in reverse order (first list = last hour of the day).
    :rtype: tuple(DateRange, list(list(int)))
    :raises ValueError: If ``level`` is not one of the accepted values.
    """
    # Map each level onto the number of hours folded into a single bucket
    hours_per_bucket = {'dy': 24, 'sg': SEGMENT_SIZE, 'hr': 1}
    if level not in hours_per_bucket:
        raise ValueError('Unsupported data crunching level: {}'.format(level))
    return _do_crunch(num_hours=hours_per_bucket[level], **kwargs)
def _do_crunch(data, num_hours, start=None, end=None):
    """Do the actual data crunching as requested of :func:`crunch`.

    :param CalDict data: The data to crunch.
    :param int num_hours: The number of hours that should be crunched together.
        For example, to consolidate the activity for an entire day,
        ``num_hours`` should be 24. To show each hour's activity separately,
        ``num_hours`` should be 1.
    :param datetime.date start: The earliest data to collect.
    :param datetime.date end: The latest data to collect.
    :return: See the docs for :func:`crunch`.
    """
    dates = DateRange(None, None)
    years = sorted(data.keys())
    try:
        years = list(range(years[0], years[-1] + 1))  # Ensures we don't skip any years
    except IndexError:
        # Means there are no years at all; leave the empty list as-is
        pass
    months = list(range(1, 13))
    days = list(range(1, 32))
    # One bucket list per division of the day: 1 for 'dy', 24 // SEGMENT_SIZE
    # for 'sg', 24 for 'hr'
    new_data = [[] for _ in range(24 // num_hours)]
    zeros = 0
    for y in years:
        if (start is not None and y < start.year) or \
                (end is not None and y > end.year):
            continue
        for m in months:
            for d in days:
                try:
                    day = date(y, m, d)
                except ValueError:
                    # Means we tried to make an invalid date, like February 30th. Just go to the next day.
                    continue
                # Stay within the specified range of dates
                if (start is not None and day < start) or \
                        (end is not None and day > end):
                    continue
                # Add up the hourly activity for the day.
                # NOTE(review): indexing data[y][m][d] may implicitly create
                # empty entries if CalDict is defaultdict-like — confirm.
                i = sum(data[y][m][d])
                if not dates.start:  # Have we stored the first day with data yet?
                    if not i:  # If this day doesn't have any data, go to the next day
                        continue
                    dates.start = day
                if i:
                    for s in range(len(new_data)):
                        # Backfill the quiet days seen since the last active day
                        new_data[-1 - s] += [None] * zeros
                        # Bug fix: bucket ``s`` covers the hours
                        # [s*num_hours, (s+1)*num_hours). The original sliced
                        # [s:s+num_hours], which made 'sg' buckets overlap
                        # (0-3, 1-4, 2-5, ...) and never counted late hours.
                        # The corrected slice reduces to the old behavior for
                        # 'hr' (num_hours == 1) and 'dy' (s == 0).
                        val = sum(data[y][m][d][s * num_hours:(s + 1) * num_hours])
                        new_data[-1 - s].append(val if val > 0 else None)
                    dates.end = day
                    zeros = 0
                else:
                    # Count zeros separately from non-zeros. Allows the end date to be the last day with data.
                    zeros += 1
    return dates, new_data