import os.path from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError ############################################ ## Параметры приложения OLD_NUMBER_PREFIXES = ['+38071', '071'] NEW_NUMBER_PREFIXES = ['+7949'] # Функция для получения нового номера из основы старого # # Вернет номер в формате +7 949 123-45-67 def new_phone_number(base: str): num = '7949' + base return '+{} {} {}-{}-{}'.format( num[0], num[1:4], num[4:7], num[7:9], num[9:11] ) PHONE_TYPE = 'mobile' ################################################### # Параметры Google API CREDENTIALS_FILE = 'credentials.json' TOKEN_FILE = 'token.json' REDIRECT_PORT = 42047 SCOPES = ['https://www.googleapis.com/auth/contacts'] ######## def google_creds(): creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( CREDENTIALS_FILE, SCOPES) creds = flow.run_console() with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) return creds def main(): try: creds = google_creds() service = build('people', 'v1', credentials=creds) print('Получаем список контактов...') results = service.people().connections().list(resourceName='people/me', personFields='names,phoneNumbers').execute() connections = results.get('connections', []) print('Количество контактов: {}'.format(len(connections))) batch_request = { "contacts": {}, "updateMask": "phoneNumbers" } for person in connections: phoneNumbers = person.get('phoneNumbers', []) has_old = False old_numbers = set() new_numbers = set() # Получаем номера нового и старого формата for phoneNumber in phoneNumbers: num = phoneNumber.get('canonicalForm') if not num: continue base = get_number_without_prefix(num, OLD_NUMBER_PREFIXES) if (base): has_old = True old_numbers.add(base) base = get_number_without_prefix(num, NEW_NUMBER_PREFIXES) if (base): new_numbers.add(base) # Если нет старого - пропускаем if not has_old: continue # Получаем старые номера, у которых нету парного нового numbers_not_updated = list( old_numbers.difference(new_numbers) ) # Если нет старых номеров, которые нужно обновить - пропускаем if len(numbers_not_updated) == 0: continue # Получаем новые номера new_phone_numbers = [ new_phone_number(x) for x in numbers_not_updated ] # batchUpdateContacts: https://developers.google.com/people/api/rest/v1/people/batchUpdateContacts # PhoneNumber: https://developers.google.com/people/api/rest/v1/people#phonenumber batch_request['contacts'][person['resourceName']] = { "phoneNumbers": [ *person['phoneNumbers'], [ { "value": number, "type": PHONE_TYPE } for number in new_phone_numbers ] ], "etag": person['etag'] } contacts_len = len(batch_request['contacts']) if (contacts_len > 0): print('Количество контактов для обновления: {}'.format(contacts_len)) print('Обновляем контакты...') service.people().batchUpdateContacts(body = batch_request).execute() print('Обновление завершено!') else: print('Нечего обновлять!') except HttpError as err: print(err) def get_number_without_prefix(number: str, prefixes: list): for prefix in prefixes: if number.startswith(prefix): return number[len(prefix):] return False if __name__ == '__main__': main()