APIs let developers connect systems, share data, and automate processes. In this blog post, we’ll explore how developers can build a tool that transcribes audio files sent to an email address.
Imagine a scenario where an agent wants to transcribe an exceptional customer service conversation. Rather than requiring agents to log into ElevateAI, upload audio files, and download transcriptions, developers can construct an internal service to streamline the process by ingesting audio files, transcribing them, and delivering the transcripts directly.
You can download sample code with an implementation from its GitHub repository. If you want to send ElevateAI files in bulk, consider importing multiple audio files using the command line.
The GitHub repository references a submodule, the ElevateAI Python SDK. We’ll use ElevateAI.py from the SDK to interface with the ElevateAI API.
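At a high level, the tool reads its mail server settings and API token from a configuration file, finds the email that carries the audio attachment, saves the attachment, sends the audio to ElevateAI, formats the returned transcript, and emails it back to the sender.
For the transcription part of the code, the steps are to declare an audio interaction, upload the file, wait until ElevateAI has finished processing it, and retrieve the transcript.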
The functions in ElevateAI.py, DeclareAudioInteraction, UploadInteraction, GetPunctuatedTranscript (or GetWordByWordTranscription), and GetAIResults will do the heavy lifting.
Let’s dive in!
First, read the configuration file. Essentially, we want to pull out the IMAP and SMTP hostnames, usernames, and passwords, along with the ElevateAI API token.
import json
import sys

def read_config(filename):
    """ Read and parse the configuration file. """
    try:
        with open(filename, 'r') as f:
            config = json.load(f)
        # Every one of these fields must be present in the file
        required_fields = ['imap_server', 'imap_username', 'imap_password',
                           'smtp_server', 'smtp_username', 'smtp_password',
                           'api_token']
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Config file is missing required field: {field}")
        return config
    except FileNotFoundError:
        print(f'Error: Config file "{filename}" not found.')
        sys.exit(1)
    except json.JSONDecodeError:
        print(f'Error: Config file "{filename}" is not valid JSON.')
        sys.exit(1)
    except ValueError as e:
        print(f'Error: {e}')
        sys.exit(1)
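For reference, a configuration file that passes the check in read_config might be generated as follows. This is only a sketch: the field names come from the required_fields list, but every value is a placeholder, and both the helper name write_sample_config and the file name config.json are assumptions made for illustration.

```python
import json
import os
import tempfile

# Field names match the required_fields list checked by read_config.
# Every value below is a placeholder, not a real host or credential.
SAMPLE_CONFIG = {
    "imap_server": "imap.example.com",
    "imap_username": "transcribe@example.com",
    "imap_password": "imap-password-here",
    "smtp_server": "smtp.example.com",
    "smtp_username": "transcribe@example.com",
    "smtp_password": "smtp-password-here",
    "api_token": "elevateai-api-token-here",
}


def write_sample_config(path):
    """Write the placeholder configuration to `path` as JSON (hypothetical helper)."""
    with open(path, "w") as f:
        json.dump(SAMPLE_CONFIG, f, indent=2)
    return path
```

Calling write_sample_config("config.json") and then replacing the placeholders with real values should give read_config a file it accepts. Keep the resulting file out of version control, since it holds passwords and an API token.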
For the sake of this exercise, we will only retrieve a specific email (the newest message whose subject contains "Transcribe"), but a POC will require a more robust implementation.
# Search for the newest email message with an attachment
search_criteria = 'DATE'
result, data = imap.sort(search_criteria, 'UTF-8', 'SUBJECT "Transcribe"')
latest_email_id = data[0].split()[-1]

# Fetch the email message and extract the attachment
result, data = imap.fetch(latest_email_id, "(RFC822)")
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)

attachment_path = None
sender_address = None
for part in email_message.walk():
    if part.get_content_maintype() == 'multipart':
        continue
    if part.get('Content-Disposition') is None:
        continue
    filename = part.get_filename()
    if not filename:
        continue
    # Save the attachment to a temporary file
    file_name = filename
    attachment_path = os.path.join(tmp_folder, filename)
    with open(attachment_path, 'wb') as f:
        f.write(part.get_payload(decode=True))
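One way to make the lookup a little more robust is sketched below. The imap.sort call relies on the IMAP SORT extension, which not every mail server offers, and the original code assumes at least one message matches. The helper name find_latest_matching_email is hypothetical; it falls back to a plain SEARCH and returns None when nothing matches.

```python
import imaplib


def find_latest_matching_email(imap, subject="Transcribe"):
    """Return the id of the newest message whose subject contains `subject`.

    Tries the SORT extension first and falls back to SEARCH.
    Returns None when no message matches. (Hypothetical helper.)
    """
    try:
        result, data = imap.sort("DATE", "UTF-8", "SUBJECT", f'"{subject}"')
    except imaplib.IMAP4.error:
        # Servers without SORT typically reject the command outright.
        result, data = "NO", [b""]

    if result != "OK":
        # SEARCH lists matches by ascending sequence number, which usually
        # follows arrival order rather than the Date header.
        result, data = imap.search(None, "SUBJECT", f'"{subject}"')

    if result != "OK" or not data or not data[0]:
        return None
    return data[0].split()[-1]
```

The caller can then skip the fetch step when the function returns None instead of failing with an IndexError on an empty result.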
Use Python’s built-in email handling functionality to download the email attachment and store it.
for part in email_message.walk():
    if part.get_content_maintype() == 'multipart':
        continue
    if part.get('Content-Disposition') is None:
        continue
    filename = part.get_filename()
    if not filename:
        continue
    # Save the attachment to a temporary file
    file_name = filename
    attachment_path = os.path.join(tmp_folder, filename)
    with open(attachment_path, 'wb') as f:
        f.write(part.get_payload(decode=True))
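The snippet above leaves sender_address unset and trusts the attachment's file name as sent. A minimal sketch of two small helpers that could fill those gaps follows; get_sender_address and safe_attachment_path are hypothetical names, not part of the sample repository.

```python
import os
from email.utils import parseaddr


def get_sender_address(email_message):
    """Return the bare address from the From header, or None if it is absent."""
    _display_name, address = parseaddr(email_message.get("From", ""))
    return address or None


def safe_attachment_path(tmp_folder, filename):
    """Join the folder with only the base name of the attachment.

    Dropping directory components keeps a name such as "../../call.wav"
    from being written outside tmp_folder.
    """
    return os.path.join(tmp_folder, os.path.basename(filename))
```

The address returned by get_sender_address is what the reply step later needs as its recipient.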
Send the audio to ElevateAI for transcription. Block and wait till the file is processed.
declareResp = ElevateAI.DeclareAudioInteraction(langaugeTag, vert, None, token, transcriptionMode, True)
declareJson = declareResp.json()
interactionId = declareJson["interactionIdentifier"]

if (localFilePath is None):
    raise Exception('Something wrong with attachment')

uploadInteractionResponse = ElevateAI.UploadInteraction(interactionId, token, localFilePath, fileName)

# Loop over status until processed
while True:
    getInteractionStatusResponse = ElevateAI.GetInteractionStatus(interactionId, token)
    getInteractionStatusResponseJson = getInteractionStatusResponse.json()
    if getInteractionStatusResponseJson["status"] in ("processed", "fileUploadFailed",
                                                      "fileDownloadFailed", "processingFailed"):
        break
    time.sleep(15)
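The loop above waits indefinitely if the status never reaches one of the four values it checks. A hedged sketch of the same polling idea with an upper time limit is shown here. The helper wait_for_terminal_status, its parameters, and the 30-minute default are assumptions for illustration; only the status strings and the 15-second interval come from the code above.

```python
import time

# Status values taken from the polling loop above; any of them ends the wait.
TERMINAL_STATUSES = {"processed", "fileUploadFailed", "fileDownloadFailed", "processingFailed"}


def wait_for_terminal_status(get_status, poll_seconds=15, timeout_seconds=1800,
                             sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it returns a terminal status or time runs out.

    get_status is any zero-argument callable that returns the status string.
    sleep and clock are injectable so the function can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Status still '{status}' after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Possible use with the SDK call from the loop above:
# status = wait_for_terminal_status(
#     lambda: ElevateAI.GetInteractionStatus(interactionId, token).json()["status"])
```

Because three of the four terminal values signal a failure, the caller should still check that the returned status is "processed" before requesting the transcript.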
Once we have the JSON, parse it so that it reads like a conversation, and store it.
import json
import os
import tempfile

def print_conversation(json_str):
    data = json.loads(json_str)
    segments = data['sentenceSegments']
    filename = 'transcript.txt'

    # Initialize variables to store the accumulated phrases for each participant
    participantOne_phrases = ""
    participantTwo_phrases = ""

    tmp_folder = tempfile.mkdtemp()
    attachment_path = os.path.join(tmp_folder, filename)

    print("=== Begin Transcription Output ===\n\n")
    with open(attachment_path, 'w') as f:
        # Loop through the sentenceSegments list and accumulate phrases for each participant
        for i, segment in enumerate(segments):
            if segment['participant'] == 'participantOne':
                participantOne_phrases += segment['phrase'] + " "
            elif segment['participant'] == 'participantTwo':
                participantTwo_phrases += segment['phrase'] + " "

            # If the next segment has a different participant, print the accumulated
            # phrases and reset the variables. Using the loop index (rather than
            # list.index) stays correct when two segments are identical.
            if i != len(segments) - 1 and segment['participant'] != segments[i + 1]['participant']:
                p1 = participantOne_phrases.strip()
                p2 = participantTwo_phrases.strip()
                if p1:
                    print("participantOne:\n" + p1 + "\n")
                    f.write("participantOne:\n" + p1 + "\n\n")
                if p2:
                    print("participantTwo:\n" + p2 + "\n")
                    f.write("participantTwo:\n" + p2 + "\n\n")
                participantOne_phrases = ""
                participantTwo_phrases = ""

        # Print the accumulated phrases for the last participant
        p1 = participantOne_phrases.strip()
        p2 = participantTwo_phrases.strip()
        if p1:
            print("participantOne:\n" + p1 + "\n")
            f.write("participantOne:\n" + p1 + "\n\n")
        if p2:
            print("participantTwo:\n" + p2 + "\n")
            f.write("participantTwo:\n" + p2 + "\n\n")

    print("=== End Transcription Output ===\n\n")
    # The with block has already closed the file
    return attachment_path
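The same turn-by-turn grouping can be written more compactly with itertools.groupby, which batches consecutive items that share a key. The sketch below is an alternative, not the repository's implementation: format_conversation is a hypothetical name, it returns the text instead of writing a file, and it assumes only the sentenceSegments, participant, and phrase fields used above.

```python
import json
from itertools import groupby


def format_conversation(json_str):
    """Return the transcript as text, with one block per speaker turn."""
    data = json.loads(json_str)
    blocks = []
    # groupby batches consecutive segments that share the same participant,
    # so each batch corresponds to one uninterrupted turn.
    for participant, turn in groupby(data["sentenceSegments"], key=lambda s: s["participant"]):
        phrases = " ".join(segment["phrase"] for segment in turn)
        blocks.append(f"{participant}:\n{phrases}\n")
    return "\n".join(blocks)


if __name__ == "__main__":
    sample = json.dumps({"sentenceSegments": [
        {"participant": "participantOne", "phrase": "Hello,"},
        {"participant": "participantOne", "phrase": "how can I help?"},
        {"participant": "participantTwo", "phrase": "My order is late."},
    ]})
    print(format_conversation(sample))
```

Unlike the function above, this version does not hard-code the two participant labels, so any label that appears in the JSON gets its own block.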
Create a new email, attach the transcription, and send it back to the original sender.
import os
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart

def send_email_with_attachment(attachment_path, recipient_address, config):
    smtp_server = config["smtp_server"]
    smtp_username = config["smtp_username"]
    smtp_password = config["smtp_password"]

    # Log in to the SMTP server
    smtp = smtplib.SMTP_SSL(smtp_server)
    smtp.ehlo()
    smtp.login(smtp_username, smtp_password)
    print("SMTP logged in.")

    # Create a message object
    message = MIMEMultipart()
    message['From'] = smtp_username
    message['To'] = recipient_address
    message['Subject'] = "Completed Transcription"

    # Add the attachment to the message; read it as bytes so that
    # non-ASCII characters in the transcript survive encoding
    with open(attachment_path, 'rb') as f:
        attachment = MIMEApplication(f.read(), _subtype='txt')
    attachment.add_header('Content-Disposition', 'attachment',
                          filename=os.path.basename(attachment_path))
    message.attach(attachment)

    # Send the message
    smtp.send_message(message)

    # Log out of the SMTP server
    smtp.quit()
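As an alternative, the reply can be assembled with the newer email.message.EmailMessage class from the standard library, which keeps message construction separate from the SMTP session and makes it easy to inspect before sending. This is a sketch under assumptions: build_transcript_message is a hypothetical helper, and the one-line body text is added for illustration (the function above sends no body).

```python
import os
from email.message import EmailMessage


def build_transcript_message(attachment_path, sender_address, recipient_address):
    """Build (but do not send) a reply with the transcript attached as text."""
    message = EmailMessage()
    message["From"] = sender_address
    message["To"] = recipient_address
    message["Subject"] = "Completed Transcription"
    # Body text is an assumption added for this sketch.
    message.set_content("Your transcription is attached.")

    with open(attachment_path, "r", encoding="utf-8") as f:
        transcript_text = f.read()

    # Passing a str makes add_attachment create a text/plain part.
    message.add_attachment(transcript_text, subtype="plain",
                           filename=os.path.basename(attachment_path))
    return message
```

The returned object can be passed to smtp.send_message(message) in place of the MIMEMultipart message built above.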
Sample code can be found on GitHub.