phone-number-migration/main.py

156 lines
4.9 KiB
Python
Raw Permalink Normal View History

2022-06-05 14:18:40 +03:00
import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
############################################
## Параметры приложения
OLD_NUMBER_PREFIXES = ['+38071', '071']
NEW_NUMBER_PREFIXES = ['+7949']
# Функция для получения нового номера из основы старого
#
# Вернет номер в формате +7 949 123-45-67
def new_phone_number(base: str):
num = '7949' + base
return '+{} {} {}-{}-{}'.format(
num[0],
num[1:4],
num[4:7],
num[7:9],
num[9:11]
)
PHONE_TYPE = 'mobile'
###################################################
# Параметры Google API
CREDENTIALS_FILE = 'credentials.json'
TOKEN_FILE = 'token.json'
REDIRECT_PORT = 42047
SCOPES = ['https://www.googleapis.com/auth/contacts']
########
def google_creds():
2022-06-05 14:18:40 +03:00
creds = None
if os.path.exists(TOKEN_FILE):
creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_FILE, SCOPES)
2022-06-10 21:45:24 +03:00
creds = flow.run_console()
2022-06-05 14:18:40 +03:00
with open(TOKEN_FILE, 'w') as token:
token.write(creds.to_json())
return creds
def main():
2022-06-05 14:18:40 +03:00
try:
creds = google_creds()
2022-06-05 14:18:40 +03:00
service = build('people', 'v1', credentials=creds)
print('Получаем список контактов...')
results = service.people().connections().list(resourceName='people/me',
personFields='names,phoneNumbers').execute()
connections = results.get('connections', [])
print('Количество контактов: {}'.format(len(connections)))
batch_request = {
"contacts": {},
"updateMask": "phoneNumbers"
}
for person in connections:
phoneNumbers = person.get('phoneNumbers', [])
has_old = False
old_numbers = set()
new_numbers = set()
# Получаем номера нового и старого формата
for phoneNumber in phoneNumbers:
num = phoneNumber.get('canonicalForm')
if not num:
continue
base = get_number_without_prefix(num, OLD_NUMBER_PREFIXES)
if (base):
has_old = True
old_numbers.add(base)
base = get_number_without_prefix(num, NEW_NUMBER_PREFIXES)
if (base):
new_numbers.add(base)
# Если нет старого - пропускаем
if not has_old:
continue
# Получаем старые номера, у которых нету парного нового
numbers_not_updated = list(
old_numbers.difference(new_numbers)
)
# Если нет старых номеров, которые нужно обновить - пропускаем
if len(numbers_not_updated) == 0:
continue
# Получаем новые номера
new_phone_numbers = [
new_phone_number(x) for x in numbers_not_updated
]
# batchUpdateContacts: https://developers.google.com/people/api/rest/v1/people/batchUpdateContacts
# PhoneNumber: https://developers.google.com/people/api/rest/v1/people#phonenumber
batch_request['contacts'][person['resourceName']] = {
"phoneNumbers": [
*person['phoneNumbers'],
[
{
"value": number,
"type": PHONE_TYPE
} for number in new_phone_numbers
]
],
"etag": person['etag']
}
contacts_len = len(batch_request['contacts'])
if (contacts_len > 0):
print('Количество контактов для обновления: {}'.format(contacts_len))
2022-06-05 14:18:40 +03:00
print('Обновляем контакты...')
service.people().batchUpdateContacts(body = batch_request).execute()
print('Обновление завершено!')
else:
print('Нечего обновлять!')
except HttpError as err:
print(err)
def get_number_without_prefix(number: str, prefixes: list):
for prefix in prefixes:
if number.startswith(prefix):
return number[len(prefix):]
return False
if __name__ == '__main__':
main()