phone-number-migration/main.py

151 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
############################################
## Параметры приложения
OLD_NUMBER_PREFIXES = ['+38071', '071']
NEW_NUMBER_PREFIXES = ['+7949']
# Функция для получения нового номера из основы старого
#
# Вернет номер в формате +7 949 123-45-67
def new_phone_number(base: str):
num = '7949' + base
return '+{} {} {}-{}-{}'.format(
num[0],
num[1:4],
num[4:7],
num[7:9],
num[9:11]
)
PHONE_TYPE = 'mobile'
###################################################
# Параметры Google API
CREDENTIALS_FILE = 'credentials.json'
TOKEN_FILE = 'token.json'
REDIRECT_PORT = 42047
SCOPES = ['https://www.googleapis.com/auth/contacts']
########
def main():
creds = None
if os.path.exists(TOKEN_FILE):
creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_FILE, SCOPES)
creds = flow.run_console()
with open(TOKEN_FILE, 'w') as token:
token.write(creds.to_json())
try:
service = build('people', 'v1', credentials=creds)
print('Получаем список контактов...')
results = service.people().connections().list(resourceName='people/me',
personFields='names,phoneNumbers').execute()
connections = results.get('connections', [])
print('Количество контактов: {}'.format(len(connections)))
batch_request = {
"contacts": {},
"updateMask": "phoneNumbers"
}
for person in connections:
phoneNumbers = person.get('phoneNumbers', [])
has_old = False
old_numbers = set()
new_numbers = set()
# Получаем номера нового и старого формата
for phoneNumber in phoneNumbers:
num = phoneNumber.get('canonicalForm')
if not num:
continue
base = get_number_without_prefix(num, OLD_NUMBER_PREFIXES)
if (base):
has_old = True
old_numbers.add(base)
base = get_number_without_prefix(num, NEW_NUMBER_PREFIXES)
if (base):
new_numbers.add(base)
# Если нет старого - пропускаем
if not has_old:
continue
# Получаем старые номера, у которых нету парного нового
numbers_not_updated = list(
old_numbers.difference(new_numbers)
)
# Если нет старых номеров, которые нужно обновить - пропускаем
if len(numbers_not_updated) == 0:
continue
# Получаем новые номера
new_phone_numbers = [
new_phone_number(x) for x in numbers_not_updated
]
# batchUpdateContacts: https://developers.google.com/people/api/rest/v1/people/batchUpdateContacts
# PhoneNumber: https://developers.google.com/people/api/rest/v1/people#phonenumber
batch_request['contacts'][person['resourceName']] = {
"phoneNumbers": [
*person['phoneNumbers'],
[
{
"value": number,
"type": PHONE_TYPE
} for number in new_phone_numbers
]
],
"etag": person['etag']
}
contacts_len = len(batch_request['contacts'])
if (contacts_len > 0):
print('Количество контактов для обновления: {}'.format(contacts_len))
print('Обновляем контакты...')
service.people().batchUpdateContacts(body = batch_request).execute()
print('Обновление завершено!')
else:
print('Нечего обновлять!')
except HttpError as err:
print(err)
def get_number_without_prefix(number: str, prefixes: list):
for prefix in prefixes:
if number.startswith(prefix):
return number[len(prefix):]
return False
if __name__ == '__main__':
main()