Introduction
Generative AI has rapidly evolved from an experimental technology into a transformative force across industries. For IT professionals working with cloud platforms, understanding where the technology is heading is crucial for strategic planning and implementation. In this blog, we'll explore the most significant trends shaping generative AI's future, with practical implementation examples across AWS, GCP, Azure, and cloud-independent approaches.

Top Trends in Generative AI
1. Multimodal Models Becoming the Standard
Multimodal models that can process and generate across text, images, audio, and video are rapidly becoming the industry standard. These models can understand context across different types of data, creating more sophisticated and nuanced applications.
Multimodal Model Implementation Examples
# AWS Implementation
import boto3
import json
import base64

def aws_multimodal_processing(image_path, prompt):
    # Initialize Bedrock client
    bedrock_runtime = boto3.client(
        service_name='bedrock-runtime',
        region_name='us-east-1'
    )
    # Read and encode image
    with open(image_path, "rb") as image_file:
        image_bytes = image_file.read()
    base64_image = base64.b64encode(image_bytes).decode('utf-8')
    # Create request payload for Claude 3 Sonnet (multimodal model)
    request_body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/jpeg",
                            "data": base64_image
                        }
                    },
                    {
                        "type": "text",
                        "text": prompt
                    }
                ]
            }
        ]
    }
    # Invoke the model
    response = bedrock_runtime.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps(request_body)
    )
    # Parse and return the response
    response_body = json.loads(response['body'].read())
    return response_body['content'][0]['text']
# GCP Implementation
import vertexai
from vertexai.preview.generative_models import GenerativeModel, Image

def gcp_multimodal_processing(image_path, prompt):
    # Initialize Vertex AI
    vertexai.init(project='your-gcp-project-id', location='us-central1')
    # Load Gemini multimodal model
    multimodal_model = GenerativeModel("gemini-pro-vision")
    # Load image and send it together with the text prompt
    image = Image.load_from_file(image_path)
    response = multimodal_model.generate_content(
        [image, prompt],
        generation_config={
            "max_output_tokens": 1024,
            "temperature": 0.4,
            "top_p": 0.8,
            "top_k": 40
        }
    )
    return response.text
# Azure Implementation
import os
import base64
from openai import AzureOpenAI

def azure_multimodal_processing(image_path, prompt):
    # Initialize Azure OpenAI client
    client = AzureOpenAI(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version="2023-12-01-preview",
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
    )
    # Read image file
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
    # Create message with both image and text
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": prompt
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64.b64encode(image_data).decode('utf-8')}"
                    }
                }
            ]
        }
    ]
    # Get completion from the model
    response = client.chat.completions.create(
        model="gpt-4-vision-preview",
        messages=messages,
        max_tokens=1000
    )
    return response.choices[0].message.content
# Cloud-Independent Implementation (using Ollama)
import requests
import base64

def ollama_multimodal_processing(image_path, prompt):
    # Read and encode image
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode('utf-8')
    # Prepare the request for the local Ollama server
    url = "http://localhost:11434/api/generate"
    payload = {
        "model": "llava",
        "prompt": prompt,
        "images": [image_data],
        "stream": False  # Return a single JSON response instead of a stream
    }
    # Make the request
    response = requests.post(url, json=payload)
    result = response.json()
    return result["response"]
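The snippets above are sketches rather than drop-in utilities. A minimal way to exercise them locally is shown below, assuming a sample image file on disk (the file name is illustrative) and, for the cloud variants, provider credentials already configured.
# Example usage (illustrative): the Ollama variant runs fully locally
if __name__ == "__main__":
    description = ollama_multimodal_processing(
        "product_photo.jpg",  # placeholder image path
        "Describe this image in two sentences."
    )
    print(description)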
Cost Comparison for Multimodal Processing:

| Cloud Provider | Service | Cost Structure | Estimated Monthly Cost (10K queries) |
|---|---|---|---|
| AWS | Bedrock (Claude 3 Sonnet) | $15/1M input tokens, $60/1M output tokens | $750-1,500 |
| GCP | Vertex AI (Gemini Pro Vision) | $5/1K image queries, $0.0025/text output token | $500-800 |
| Azure | Azure OpenAI (GPT-4V) | $10/1K images, $0.03/1K output tokens | $700-1,200 |
| Self-hosted | Ollama (LLaVA) | Hardware + electricity costs | $200-500 initial + $50-100/month |
2. Fine-Tuning and Customization at Scale
Organizations are moving beyond generic models to custom-tuned AI tailored to specific domains and use cases. Fine-tuning allows businesses to optimize models for their unique data and requirements.


Fine-Tuning Implementation Examples
# AWS SageMaker Fine-Tuning Example
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFace

def aws_fine_tuning():
    # Initialize SageMaker session
    session = sagemaker.Session()
    role = sagemaker.get_execution_role()
    # Define hyperparameters
    hyperparameters = {
        'model_id': 'meta-llama/Llama-2-7b',
        'epochs': 3,
        'per_device_train_batch_size': 4,
        'per_device_eval_batch_size': 4,
        'gradient_accumulation_steps': 8,
        'learning_rate': 2e-5,
        'warmup_steps': 100
    }
    # Define training job configuration
    huggingface_estimator = HuggingFace(
        entry_point='train.py',
        source_dir='./scripts',
        instance_type='ml.g5.2xlarge',
        instance_count=1,
        role=role,
        transformers_version='4.28.1',
        pytorch_version='2.0.0',
        py_version='py310',
        hyperparameters=hyperparameters
    )
    # Start training
    huggingface_estimator.fit({
        'train': 's3://your-bucket/training-data',
        'validation': 's3://your-bucket/validation-data'
    })
    # Deploy model
    predictor = huggingface_estimator.deploy(
        initial_instance_count=1,
        instance_type='ml.g5.xlarge'
    )
    return predictor.endpoint_name
# GCP Vertex AI Fine-Tuning Example
from google.cloud import aiplatform

def gcp_fine_tuning():
    # Initialize Vertex AI
    aiplatform.init(project='your-project-id', location='us-central1')
    # Create a custom training job
    job = aiplatform.CustomTrainingJob(
        display_name="llama-tuning-job",
        script_path="train.py",
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        requirements=["transformers==4.28.1", "datasets==2.12.0", "accelerate==0.19.0"],
        model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/pytorch-gpu.1-13:latest"
    )
    # Start the training job (the data location is passed through to the training script)
    model = job.run(
        machine_type="n1-standard-8",
        accelerator_type="NVIDIA_TESLA_V100",
        accelerator_count=1,
        replica_count=1,
        args=[
            "--train_data_uri=gs://your-bucket/training-data",
            "--model_name_or_path=meta-llama/Llama-2-7b",
            "--output_dir=./model",
            "--num_train_epochs=3",
            "--per_device_train_batch_size=4"
        ]
    )
    # Deploy the model
    endpoint = model.deploy(
        machine_type="n1-standard-4",
        accelerator_type="NVIDIA_TESLA_T4",
        accelerator_count=1
    )
    return endpoint.resource_name
# Azure OpenAI Fine-Tuning Example
import os
from openai import AzureOpenAI

def azure_fine_tuning():
    # Initialize Azure OpenAI client
    client = AzureOpenAI(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version="2023-12-01-preview",
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
    )
    # Create fine-tuning job
    response = client.fine_tuning.jobs.create(
        model="gpt-35-turbo",
        training_file="file-abc123",  # File ID of uploaded training data
        hyperparameters={
            "n_epochs": 3,
            "batch_size": 4,
            "learning_rate_multiplier": 0.1
        },
        suffix="customer-support-specialized"
    )
    job_id = response.id
    # Monitor job progress
    job_status = client.fine_tuning.jobs.retrieve(job_id)
    # When complete, the fine-tuned model can be accessed using the model name
    fine_tuned_model = job_status.fine_tuned_model
    return fine_tuned_model
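Once the job finishes and the fine-tuned model has been deployed in Azure OpenAI, it can be called like any other chat deployment. A minimal sketch, assuming the deployment name matches the value returned above (the system and user messages are illustrative):
# Querying the fine-tuned model (assumes it is deployed under this name)
def query_fine_tuned_model(client, deployment_name, user_message):
    response = client.chat.completions.create(
        model=deployment_name,  # Azure OpenAI routes requests by deployment name
        messages=[
            {"role": "system", "content": "You are a customer-support assistant."},
            {"role": "user", "content": user_message}
        ],
        max_tokens=300
    )
    return response.choices[0].message.content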
# Cloud-Independent Implementation (using HuggingFace Transformers)
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling
)
from datasets import load_dataset

def huggingface_fine_tuning():
    # Load base model and tokenizer
    model_name = "meta-llama/Llama-2-7b"
    model = AutoModelForCausalLM.from_pretrained(model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Prepare training data
    dataset = load_dataset("your-dataset")

    def tokenize_function(examples):
        return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=512)

    tokenized_dataset = dataset.map(tokenize_function, batched=True)
    # Define training arguments
    training_args = TrainingArguments(
        output_dir="./results",
        per_device_train_batch_size=4,
        gradient_accumulation_steps=8,
        learning_rate=2e-5,
        num_train_epochs=3,
        weight_decay=0.01,
        save_strategy="epoch",
        evaluation_strategy="epoch",
        fp16=True,  # Mixed precision training
    )
    # Initialize trainer
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"],
        data_collator=data_collator,
    )
    # Start training
    trainer.train()
    # Save the fine-tuned model
    model.save_pretrained("./fine-tuned-model")
    tokenizer.save_pretrained("./fine-tuned-model")
    return "./fine-tuned-model"
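For completeness, here is a minimal sketch of loading the saved model back for local inference with the transformers pipeline API (the prompt and generation settings are illustrative):
# Loading the fine-tuned model for a quick local test
from transformers import pipeline

generator = pipeline("text-generation", model="./fine-tuned-model")
print(generator("Summarize our refund policy:", max_new_tokens=100)[0]["generated_text"])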
Cost Comparison for Fine-Tuning:

| Cloud Provider | Service | Hardware | Cost Structure | Est. Cost (1 week, single model) |
|---|---|---|---|---|
| AWS | SageMaker | ml.g5.2xlarge | $1.24/hour | $208-$250 |
| GCP | Vertex AI | NVIDIA V100 | $2.48/hour | $416-$500 |
| Azure | Azure OpenAI | N/A (Managed) | $80/training hr, $0.008/1K tokens | $300-$400 |
| Self-hosted | Local/Colab | A100 (cloud VM) | ~$3.00/hour | $504-$600 |
3. Retrieval-Augmented Generation (RAG) Becoming Standard
RAG is transforming how organizations leverage their private data with generative AI, combining the power of LLMs with specialized knowledge retrieval systems.


RAG Implementation Examples
# AWS RAG Implementation
import boto3
import json
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

def aws_rag_implementation(query, index_name="documents"):
    # Initialize clients
    region = "us-east-1"
    bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=region)
    # Initialize OpenSearch client
    host = "your-opensearch-endpoint.us-east-1.es.amazonaws.com"
    service = "es"
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       region, service, session_token=credentials.token)
    opensearch_client = OpenSearch(
        hosts=[{"host": host, "port": 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    # Step 1: Convert user query to embedding using the embeddings model
    embedding_response = bedrock_runtime.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({"inputText": query})
    )
    embedding_response_body = json.loads(embedding_response.get("body").read())
    embedding = embedding_response_body.get("embedding")
    # Step 2: Search OpenSearch index with the embedding
    search_query = {
        "size": 5,
        "query": {
            "knn": {
                "embedding_vector": {
                    "vector": embedding,
                    "k": 5
                }
            }
        }
    }
    search_response = opensearch_client.search(
        body=search_query,
        index=index_name
    )
    # Step 3: Extract relevant context from search results
    contexts = []
    for hit in search_response["hits"]["hits"]:
        contexts.append(hit["_source"]["text"])
    context_text = "\n\n".join(contexts)
    # Step 4: Generate response with context using Claude model
    prompt = f"""You are an AI assistant helping with information retrieval.
Use the following context to answer the user's question. If the answer is not
in the context, say that you don't know based on available information.
Context:
{context_text}
User Question: {query}
"""
    response = bedrock_runtime.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        })
    )
    response_body = json.loads(response.get("body").read())
    answer = response_body["content"][0]["text"]
    return {
        "query": query,
        "context": context_text,
        "answer": answer
    }
# GCP RAG Implementation
from google.cloud import aiplatform
from vertexai.language_models import TextEmbeddingModel
from vertexai.preview.generative_models import GenerativeModel
import vertexai

def gcp_rag_implementation(query, project_id="your-project-id", location="us-central1"):
    # Initialize Vertex AI
    vertexai.init(project=project_id, location=location)
    # Step 1: Generate embeddings for the query
    embedding_model = TextEmbeddingModel.from_pretrained("textembedding-gecko@latest")
    query_embeddings = embedding_model.get_embeddings([query])[0].values
    # Step 2: Search Vector Store (using Vertex AI Vector Search)
    # Assuming you have already created an index and populated it
    index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
        index_endpoint_name="your-index-endpoint"
    )
    matched_neighbors = index_endpoint.find_neighbors(
        deployed_index_id="your-deployed-index-id",
        queries=[query_embeddings],
        num_neighbors=5
    )
    # Step 3: Retrieve content from the matched documents
    # This assumes you've stored document IDs in the index and can retrieve content
    contexts = []
    for neighbor in matched_neighbors[0]:
        # Retrieve document content based on neighbor.id
        # This is placeholder code - in practice, you'd look up the content
        doc_content = retrieve_document_by_id(neighbor.id)
        contexts.append(doc_content)
    context_text = "\n\n".join(contexts)
    # Step 4: Generate response with Gemini
    generation_model = GenerativeModel("gemini-pro")
    prompt = f"""You are an AI assistant helping with information retrieval.
Use the following context to answer the user's question. If the answer is not
in the context, say that you don't know based on available information.
Context:
{context_text}
User Question: {query}
"""
    response = generation_model.generate_content(
        prompt,
        generation_config={
            "temperature": 0.2,
            "max_output_tokens": 1024,
        }
    )
    return {
        "query": query,
        "context": context_text,
        "answer": response.text
    }

def retrieve_document_by_id(doc_id):
    # This is a placeholder function
    # In a real application, you would retrieve the document from a database
    return f"Content for document {doc_id}"
# Azure RAG Implementation
import os
from openai import AzureOpenAI
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential

def azure_rag_implementation(query):
    # Initialize clients
    search_service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")
    search_api_key = os.getenv("AZURE_SEARCH_API_KEY")
    index_name = "documents"
    openai_client = AzureOpenAI(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version="2023-12-01-preview",
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
    )
    search_client = SearchClient(
        endpoint=search_service_endpoint,
        index_name=index_name,
        credential=AzureKeyCredential(search_api_key)
    )
    # Step 1: Generate embeddings for the query
    embedding_response = openai_client.embeddings.create(
        input=query,
        model="text-embedding-ada-002"
    )
    query_embedding = embedding_response.data[0].embedding
    # Step 2: Search Azure AI Search using vector search
    # Note: the exact vector-query parameters vary by azure-search-documents version
    search_results = search_client.search(
        search_text=None,  # Not using keyword search
        vector=query_embedding,
        vector_fields=["embedding"],
        top=5
    )
    # Step 3: Extract context from search results
    contexts = []
    for result in search_results:
        contexts.append(result["content"])
    context_text = "\n\n".join(contexts)
    # Step 4: Generate response with Azure OpenAI
    prompt = f"""You are an AI assistant helping with information retrieval.
Use the following context to answer the user's question. If the answer is not
in the context, say that you don't know based on available information.
Context:
{context_text}
User Question: {query}
"""
    response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are an AI assistant helping with information retrieval."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.3,
        max_tokens=800
    )
    return {
        "query": query,
        "context": context_text,
        "answer": response.choices[0].message.content
    }
# Cloud-Independent RAG Implementation (using open-source tools)
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA

def open_source_rag_implementation(query, db_directory="./chroma_db"):
    # Step 1: Initialize embedding model
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    # Step 2: Load vector database
    # Note: Assumes you've already populated the database
    vector_db = Chroma(
        persist_directory=db_directory,
        embedding_function=embedding_model
    )
    # Step 3: Initialize retriever
    retriever = vector_db.as_retriever(search_kwargs={"k": 5})
    # Step 4: Initialize LLM
    llm = Ollama(model="llama2")
    # Step 5: Create prompt template
    prompt_template = """You are an AI assistant helping with information retrieval.
Use the following context to answer the user's question. If the answer is not
in the context, say that you don't know based on available information.
Context:
{context}
User Question: {question}
"""
    prompt = PromptTemplate(
        template=prompt_template,
        input_variables=["context", "question"]
    )
    # Step 6: Create RAG chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        chain_type_kwargs={"prompt": prompt}
    )
    # Step 7: Run query
    result = qa_chain.invoke({"query": query})
    return {
        "query": query,
        "answer": result["result"]
    }
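The open-source example assumes the Chroma store has already been populated. A minimal ingestion sketch using the same LangChain stack follows (the document path and chunking parameters are illustrative):
# One-time ingestion into the Chroma store used above
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

def build_chroma_index(doc_path="./docs/handbook.txt", db_directory="./chroma_db"):
    # Load and chunk the source document
    documents = TextLoader(doc_path).load()
    chunks = RecursiveCharacterTextSplitter(
        chunk_size=500, chunk_overlap=50
    ).split_documents(documents)
    # Embed the chunks and persist the vector store
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    vector_db = Chroma.from_documents(
        chunks, embedding_model, persist_directory=db_directory
    )
    vector_db.persist()
    return vector_db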
Cost Comparison for RAG Solutions:

| Cloud Provider | Component | Service | Cost Structure | Est. Monthly Cost (100K queries) |
|---|---|---|---|---|
| AWS | Vector DB | OpenSearch | $0.138/hour (r6g.large.search) | $100-200 |
| AWS | LLM | Bedrock (Claude 3) | $15/1M input tokens, $60/1M output tokens | $1,500-3,000 |
| AWS | Storage | S3 | $0.023/GB | $5-50 |
| GCP | Vector DB | Vertex AI Vector Search | $0.45/1K queries | $45-90 |
| GCP | LLM | Vertex AI (Gemini Pro) | $0.0025/1K input tokens, $0.00375/1K output tokens | $1,000-2,000 |
| GCP | Storage | Cloud Storage | $0.020/GB | $5-50 |
| Azure | Vector DB | Azure AI Search | $100/search unit | $100-200 |
| Azure | LLM | Azure OpenAI (GPT-4) | $0.03/1K input tokens, $0.06/1K output tokens | $1,800-3,600 |
| Azure | Storage | Blob Storage | $0.0184/GB | $5-50 |
| Self-hosted | Vector DB | Chroma/Qdrant | Hardware costs | $50-150 |
| Self-hosted | LLM | Ollama (Llama) | Hardware costs | $100-500 |
| Self-hosted | Storage | Local Storage | Hardware costs | $5-25 |
4. Responsible AI and Governance Frameworks
As generative AI adoption increases, implementing robust governance frameworks becomes crucial for mitigating risks like hallucinations, bias, and security vulnerabilities.


AI Governance Implementation Examples
# AWS - Implementing AI Governance with SageMaker Model Monitor and Clarify
import boto3
import json
import time
import numpy as np
import pandas as pd
from sagemaker.clarify import DataConfig, BiasConfig, ModelConfig, SHAPConfig
from sagemaker.model_monitor import ModelMonitor, DataCaptureConfig
def aws_ai_governance():
# Initialize SageMaker client
sagemaker_client = boto3.client('sagemaker')
# Step 1: Configure bias detection with SageMaker Clarify
bias_analysis_config = {
"bias_config": {
"label": "target",
"facet": [{"name_or_index": "sensitive_attribute"}],
"label_values_or_threshold": [1],
"group_variable": "sensitive_attribute"
},
"methods": {
"pre_training_bias": {
"methods": ["DPPL", "DI", "DCR"]
},
"post_training_bias": {
"methods": ["DPPL", "DI", "DCR", "DCA", "DCR"]
}
},
"report": {
"name": "bias_report",
"title": "Bias Report"
}
}
# Step 2: Set up model monitoring
model_monitor = ModelMonitor(
role="arn:aws:iam::123456789012:role/SageMakerMonitoringRole",
instance_count=1,
instance_type="ml.m5.xlarge",
volume_size_in_gb=20,
max_runtime_in_seconds=1800
)
# Configure data capture for the endpoint
data_capture_config = DataCaptureConfig(
enable_capture=True,
sampling_percentage=100,
destination_s3_uri="s3://your-bucket/data-capture"
)
# Step 3: Create model explainability configuration with SHAP
explainability_config = {
"shap_config": {
"baseline": "s3://your-bucket/baseline.csv",
"num_samples": 100,
"agg_method": "mean_abs",
"use_logit": False
},
"report": {
"name": "explainability_report",
"title": "Model Explainability Report"
}
}
# Step 4: Set up CloudWatch alerting
cloudwatch_client = boto3.client('cloudwatch')
# Create alarm for model drift
response = cloudwatch_client.put_metric_alarm(
AlarmName="ModelDriftAlarm",
ComparisonOperator="GreaterThanThreshold",
EvaluationPeriods=1,
MetricName="feature_drift_score",
Namespace="AWS/SageMaker",
Period=3600,
Statistic="Maximum",
Threshold=0.7,
ActionsEnabled=True,
AlarmActions=["arn:aws:sns:us-east-1:123456789012:ModelMonitoringTopic"],
AlarmDescription="Alarm when model feature drift exceeds threshold",
Dimensions=[
{
"Name": "EndpointName",
"Value": "your-endpoint-name"
}
]
)
# Step 5: Implement model governance logging
def log_model_governance_event(event_type, event_details):
logs_client = boto3.client('logs')
logs_client.put_log_events(
logGroupName="/aws/sagemaker/model-governance",
logStreamName="model-events",
logEvents=[
{
'timestamp': int(time.time() * 1000),
'message': json.dumps({
'event_type': event_type,
'details': event_details
})
}
]
)
return {
"bias_config": bias_analysis_config,
"monitoring_config": data_capture_config,
"explainability_config": explainability_config
}
# GCP - Implementing AI Governance with Vertex AI
from google.cloud import aiplatform
from google.cloud.aiplatform.metadata import metadata_store
from google.cloud.aiplatform_v1 import ModelMonitoringServiceClient
from google.cloud.aiplatform_v1.types import (
ModelMonitoringAlertConfig,
ModelMonitoringObjectiveConfig,
ThresholdConfig
)
import datetime
def gcp_ai_governance(project_id="your-project-id", location="us-central1"):
# Initialize Vertex AI
aiplatform.init(project=project_id, location=location)
# Step 1: Set up Vertex ML Metadata tracking
metadata_store_client = metadata_store.MetadataStore()
# Create a metadata schema for model governance
schema_title = "model_governance_schema"
schema_version = "0.0.1"
schema_metadata = {
"description": "Schema for tracking model governance metrics",
"owner": "AI Governance Team",
}
schema_property_specs = {
"model_name": {"string_type": {}},
"model_version": {"string_type": {}},
"training_dataset": {"string_type": {}},
"evaluation_metrics": {"struct_type": {}},
"bias_metrics": {"struct_type": {}},
"approval_status": {"string_type": {}},
"approver": {"string_type": {}},
"approval_date": {"string_type": {}},
}
# Create the schema
schema = metadata_store_client.create_metadata_schema(
schema_title=schema_title,
schema_version=schema_version,
schema_metadata=schema_metadata,
property_specs=schema_property_specs,
)
# Step 2: Configure Model Monitoring
monitoring_client = ModelMonitoringServiceClient()
# Configure monitoring objective (for feature drift)
objective_config = ModelMonitoringObjectiveConfig(
training_dataset={"gcs_source": {"uris": ["gs://your-bucket/training-data.csv"]}},
training_prediction_skew_detection_config={
"skew_thresholds": {
"feature1": ThresholdConfig(value=0.3),
"feature2": ThresholdConfig(value=0.2)
}
}
)
# Configure alert policy
alert_config = ModelMonitoringAlertConfig(
email_alert_config={
"user_emails": ["admin@example.com", "data-scientist@example.com"]
},
notification_channels=["projects/your-project-id/notificationChannels/123456789"]
)
# Step 3: Set up explainability for the model
def create_explainable_model(model_id, instance_type="n1-standard-4"):
model = aiplatform.Model(model_id)
explanation_metadata = {
"inputs": {
"feature1": {"input_tensor_name": "feature1", "modality": "numeric"},
"feature2": {"input_tensor_name": "feature2", "modality": "numeric"}
},
"outputs": {
"prediction": {"output_tensor_name": "prediction"}
}
}
explanation_parameters = {
"sampled_shapley_attribution": {
"path_count": 10
}
}
endpoint = model.deploy(
machine_type=instance_type,
explanation_metadata=explanation_metadata,
explanation_parameters=explanation_parameters
)
return endpoint
# Step 4: Create a logging function for governance events
def log_governance_event(event_type, event_details):
# Create a metadata entry for this governance event
metadata_entry = metadata_store_client.create_metadata(
metadata_schema=schema.name,
metadata={
"model_name": event_details.get("model_name", ""),
"model_version": event_details.get("model_version", ""),
"training_dataset": event_details.get("training_dataset", ""),
"evaluation_metrics": event_details.get("evaluation_metrics", {}),
"bias_metrics": event_details.get("bias_metrics", {}),
"approval_status": event_details.get("approval_status", "PENDING"),
"approver": event_details.get("approver", ""),
"approval_date": datetime.datetime.now().isoformat()
}
)
return metadata_entry
return {
"metadata_schema": schema.name,
"objective_config": objective_config,
"alert_config": alert_config
}
# Azure - Implementing AI Governance
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Model, ModelConfiguration, ModelMonitor
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities._assets.linked_service import LinkedService
from azure.ai.ml.entities._assets.data_asset import DataAsset
from azure.ai.ml.entities._job.monitor import MonitorData, MonitorTarget, MonitorType
def azure_ai_governance():
# Initialize Azure ML client
credential = DefaultAzureCredential()
ml_client = MLClient(
credential=credential,
subscription_id="your-subscription-id",
resource_group_name="your-resource-group",
workspace_name="your-workspace"
)
# Step 1: Register model with responsible AI information
model = Model(
path="azureml://jobs/model-training-job/outputs/model",
name="responsible-ai-model",
description="Model with responsible AI documentation and monitoring",
type="custom_model",
)
registered_model = ml_client.models.create_or_update(model)
# Step 2: Create data drift monitor
# First, register baseline data
baseline_data = DataAsset(
name="model-baseline-data",
version="1",
description="Baseline data for drift detection",
path="azureml://datastores/your-datastore/paths/baseline"
)
registered_baseline = ml_client.data.create_or_update(baseline_data)
# Create model monitor for data drift
drift_monitor = ModelMonitor(
name="data-drift-monitor",
description="Monitor data drift for our model",
target=MonitorTarget(
endpoint_name="your-endpoint-name",
model_name=registered_model.name,
model_version=registered_model.version
),
dataset_monitor=MonitorData(
baseline_data=registered_baseline.id,
target_data="azureml://datastores/your-datastore/paths/target",
metrics=["jensen_shannon_distance", "wasserstein_distance"],
alert_enabled=True,
alert_threshold=0.7
),
schedule="0 0 * * *", # Daily at midnight
log_analytics={
"workspace_id": "your-log-analytics-workspace-id",
"primary_key": "your-log-analytics-primary-key"
}
)
drift_monitor_job = ml_client.model_monitors.begin_create_or_update(drift_monitor)
# Step 3: Implement explainability for the model
# Explainability is set during deployment
def deploy_model_with_explanations():
# Register the environment for the model
env = Environment(
name="interpret-env",
description="Environment with explainer packages",
conda_file="path/to/conda.yml" # Include interpret-ml package here
)
registered_env = ml_client.environments.create_or_update(env)
# Create inference config with explainer
inference_config = InferenceConfig(
entry_script="score.py",
environment=registered_env
)
# Include explainer settings in the deployment config
deployment_config = AksWebservice.deploy_configuration(
cpu_cores=1,
memory_gb=1,
enable_app_insights=True,
collect_model_data=True
)
# Deploy the model with explainability
service = Model.deploy(
ml_client,
service_name="explainable-model-service",
models=[registered_model],
inference_config=inference_config,
deployment_config=deployment_config,
overwrite=True
)
return service
# Step 4: Implement model governance auditing
def log_governance_action(action_type, details):
from applicationinsights import TelemetryClient
# Initialize Application Insights for logging
telemetry_client = TelemetryClient("your-app-insights-key")
# Log the governance action
telemetry_client.track_event(
action_type,
properties={
"model_id": registered_model.id,
"user": details.get("user", "unknown"),
"timestamp": datetime.datetime.now().isoformat(),
"action_details": json.dumps(details)
}
)
telemetry_client.flush()
return {
"registered_model": registered_model.id,
"drift_monitor": drift_monitor_job.name
}
# Cloud-Independent Implementation with MLflow and Evidently
import mlflow
import pandas as pd
from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection
from evidently.pipeline.column_mapping import ColumnMapping
import json
import logging
from datetime import datetime
def open_source_ai_governance():
# Step 1: Set up MLflow tracking
mlflow.set_tracking_uri("http://localhost:5000")
experiment_name = "responsible-ai-governance"
mlflow.set_experiment(experiment_name)
# Step 2: Register the model with governance metadata
with mlflow.start_run(run_name="model-governance-setup") as run:
# Log governance metadata
mlflow.log_params({
"governance_framework": "NIST AI Risk Management Framework",
"intended_use": "Customer churn prediction",
"model_owner": "Data Science Team",
"approval_status": "PENDING_REVIEW",
"risk_level": "MEDIUM",
"data_sensitivity": "INTERNAL_ONLY"
})
# Log model performance metrics
mlflow.log_metrics({
"accuracy": 0.85,
"precision": 0.83,
"recall": 0.79,
"f1_score": 0.81,
"auc": 0.88
})
# Log model with additional governance artifacts
# This assumes you've already trained the model
# model_path = "path/to/your/model"
# mlflow.sklearn.log_model(model, "model")
# Log model card as JSON
model_card = {
"model_details": {
"name": "Customer Churn Predictor",
"version": "1.0.0",
"description": "Predicts likelihood of customer churn based on usage patterns and demographics"
},
"intended_use": {
"primary_uses": ["Identify at-risk customers for retention campaigns"],
"out_of_scope_uses": ["Automated decision-making without human review", "Credit decisions"]
},
"factors": {
"relevant_factors": ["Demographics", "Usage patterns", "Customer history"],
"evaluation_factors": ["Performance varies by customer tenure", "Sensitive to seasonal patterns"]
},
"metrics": {
"performance_measures": ["Accuracy", "Precision", "Recall", "F1", "AUC"],
"decision_thresholds": "Default threshold is 0.5, can be adjusted based on business needs"
},
"evaluation_data": {
"datasets": ["Training: Jan-June 2023", "Testing: July-Sept 2023"],
"motivation": "Temporal split to evaluate model stability over time"
},
"training_data": {
"datasets": ["Customer data from Jan 2021 to June 2023"],
"preprocessing": ["Missing value imputation", "Feature normalization"]
},
"quantitative_analyses": {
"bias_evaluation": "Evaluated fairness across age groups and geographic regions",
"performance_results": "See metrics logged with this model version"
},
"ethical_considerations": {
"ethical_risks": ["Potential reinforcement of historical biases", "Privacy concerns"],
"mitigations": ["Regular fairness audits", "Anonymized feature engineering"]
}
}
with open("model_card.json", "w") as f:
json.dump(model_card, f, indent=2)
mlflow.log_artifact("model_card.json", "governance")
# Step 3: Set up data drift monitoring with Evidently
def monitor_data_drift(reference_data, current_data, column_mapping=None):
# Initialize data drift profile
data_drift_profile = Profile(sections=[DataDriftProfileSection()])
# Calculate drift
data_drift_profile.calculate(reference_data, current_data, column_mapping=column_mapping)
# Get drift report as JSON
drift_json = data_drift_profile.json()
# Log drift report to MLflow
with mlflow.start_run(run_name="data-drift-monitoring") as run:
# Convert drift report to dictionary
drift_report = json.loads(drift_json)
# Extract and log key metrics
drift_share = drift_report["data_drift"]["data"]["metrics"]["share_of_drifted_columns"]
mlflow.log_metric("drift_share", drift_share)
# Log full report as artifact
with open("drift_report.json", "w") as f:
json.dump(drift_report, f)
mlflow.log_artifact("drift_report.json", "monitoring/drift")
# Log alert if drift exceeds threshold
if drift_share > 0.3:
logging.warning(f"Data drift detected: {drift_share:.2f} of features have drifted")
# Record governance event
governance_event = {
"event_type": "DATA_DRIFT_ALERT",
"timestamp": datetime.now().isoformat(),
"details": {
"drift_share": drift_share,
"drifted_columns": [col for col, drift in
drift_report["data_drift"]["data"]["metrics"]["column_drift"].items()
if drift["drift_detected"]],
"severity": "HIGH" if drift_share > 0.5 else "MEDIUM"
}
}
with open("drift_alert.json", "w") as f:
json.dump(governance_event, f)
mlflow.log_artifact("drift_alert.json", "governance/alerts")
return drift_report
# Step 4: Create governance approval workflow
def record_governance_approval(model_version, approver, decision, comments):
with mlflow.start_run(run_name="governance-approval") as run:
approval_event = {
"model_version": model_version,
"approver": approver,
"decision": decision, # "APPROVED", "REJECTED", "NEEDS_CHANGES"
"timestamp": datetime.now().isoformat(),
"comments": comments,
"requirements_met": decision == "APPROVED"
}
# Log governance event
with open("approval_record.json", "w") as f:
json.dump(approval_event, f)
mlflow.log_artifact("approval_record.json", "governance/approvals")
# Update model tags with approval status
client = mlflow.tracking.MlflowClient()
client.set_model_version_tag(
name="customer-churn-model",
version=model_version,
key="approval_status",
value=decision
)
# Log approval as a metric for tracking
mlflow.log_param("approval_decision", decision)
mlflow.log_param("approver", approver)
return approval_event
return {
"experiment_name": experiment_name,
"governance_functions": {
"monitor_data_drift": monitor_data_drift,
"record_governance_approval": record_governance_approval
}
}
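A minimal sketch of wiring the returned drift check into a scheduled job, assuming two pandas DataFrames with matching columns (file paths are placeholders):
# Running the drift check returned by open_source_ai_governance()
import pandas as pd

governance = open_source_ai_governance()
reference_df = pd.read_csv("data/training_snapshot.csv")      # placeholder path
current_df = pd.read_csv("data/latest_inference_inputs.csv")  # placeholder path
drift_report = governance["governance_functions"]["monitor_data_drift"](
    reference_df, current_df
)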
Cost Comparison for AI Governance Solutions:

| Cloud Provider | Component | Service | Cost Structure | Est. Monthly Cost |
|---|---|---|---|---|
| AWS | Monitoring | SageMaker Model Monitor | $0.10/hour per schedule | $72-144 |
| AWS | Bias Detection | SageMaker Clarify | $0.138/hour instance cost | $50-100 |
| AWS | Logging | CloudWatch | $0.30/GB ingested, $0.03/million metrics | $30-100 |
| GCP | Monitoring | Vertex AI Model Monitoring | $0.10/1K predictions monitored | $50-150 |
| GCP | Explainability | Vertex Explainable AI | $0.25/1K explanations | $25-75 |
| GCP | Logging | Cloud Logging | $0.50/GB ingested beyond free tier | $25-100 |
| Azure | Monitoring | Azure AI Monitoring | Part of deployment costs + Log Analytics | $50-150 |
| Azure | Responsible AI | Responsible AI dashboard | Included with workspace | $0 |
| Azure | Logging | Azure Monitor | $2.30/GB ingested | $25-100 |
| Self-hosted | Complete Solution | MLflow + Evidently | Server costs + maintenance | $100-300 |
5. Edge AI and On-Device Generation
As models become more efficient, on-device inference is becoming increasingly viable, enabling privacy-preserving, low-latency applications even in environments with limited connectivity.
Edge AI Architecture Comparison






Edge AI Implementation Examples
AWS IoT Greengrass Implementation:
- Complete implementation of an EdgeGenerativeAI class with text generation and MQTT publishing
- Comprehensive Greengrass component recipe for deployment
- Example usage function to demonstrate the implementation
# AWS IoT Greengrass Implementation for Edge AI
import json
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
QOS,
PublishToIoTCoreRequest
)
import onnxruntime as ort
import numpy as np
from transformers import AutoTokenizer, AutoConfig
class EdgeGenerativeAI:
def __init__(self, model_path="/greengrass/v2/packages/artifacts/EdgeAI/models/"):
# Initialize tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(f"{model_path}/tokenizer")
# Load ONNX optimized model
self.onnx_session = ort.InferenceSession(
f"{model_path}/model_quantized.onnx",
providers=["CPUExecutionProvider"] # Or use "CUDAExecutionProvider" if GPU available
)
# Get model config
self.config = AutoConfig.from_pretrained(f"{model_path}/config")
# Initialize IPC client for AWS IoT Greengrass
self.ipc_client = awsiot.greengrasscoreipc.connect()
def generate_text(self, prompt, max_length=100):
# Tokenize input
inputs = self.tokenizer(prompt, return_tensors="np")
# Get input names for the ONNX model
input_names = [input.name for input in self.onnx_session.get_inputs()]
# Prepare inputs for the model
onnx_inputs = {name: inputs[name.split('.')[-1]] for name in input_names if name.split('.')[-1] in inputs}
# Run inference
outputs = self.onnx_session.run(None, onnx_inputs)
# Process outputs to generate text
generated_ids = outputs[0]
generated_text = self.tokenizer.decode(generated_ids[0], skip_special_tokens=True)
return generated_text
def publish_result(self, result, topic="edge/ai/results"):
# Create request to publish to IoT Core
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = json.dumps({"result": result}).encode()
request.qos = QOS.AT_LEAST_ONCE
# Publish the message
operation = self.ipc_client.new_publish_to_iot_core()
operation.activate(request)
future = operation.get_response()
# Wait for the response
future.result(timeout=10)
return True
# Example usage
def run_edge_inference():
# Initialize the generative AI component
edge_ai = EdgeGenerativeAI()
# Generate text from a prompt
prompt = "Summarize the benefits of edge AI in three points:"
generated_text = edge_ai.generate_text(prompt)
# Publish the result to AWS IoT Core
edge_ai.publish_result(generated_text)
return generated_text
# Main entry point
if __name__ == "__main__":
result = run_edge_inference()
print(f"Generated text: {result}")
# Example component recipe for AWS IoT Greengrass deployment
"""
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.EdgeGenerativeAI",
"ComponentVersion": "1.0.0",
"ComponentDescription": "Edge AI component for local generative AI inference",
"ComponentPublisher": "Example Corp",
"ComponentDependencies": {
"aws.greengrass.TokenExchangeService": {
"VersionRequirement": ">=2.0.0",
"DependencyType": "HARD"
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"Install": "pip3 install onnxruntime transformers numpy",
"Run": "python3 {artifacts:decompressedPath}/edge_ai.py"
},
"Artifacts": [
{
"URI": "s3://your-bucket/edge_ai.py",
"Unarchive": "NONE"
},
{
"URI": "s3://your-bucket/models/model_quantized.onnx",
"Unarchive": "NONE"
},
{
"URI": "s3://your-bucket/models/tokenizer/",
"Unarchive": "ZIP"
},
{
"URI": "s3://your-bucket/models/config/",
"Unarchive": "ZIP"
}
]
}
]
}
"""
GCP Edge TPU Implementation:
- Optimized implementation for Google’s Edge TPU hardware
- Custom tokenization and text generation for edge deployment
- Integration with Google Cloud Pub/Sub for result publishing
# GCP Edge TPU Implementation
from pycoral.utils import edgetpu
from pycoral.adapters import common
from pycoral.adapters import classify
import tflite_runtime.interpreter as tflite
import numpy as np
import json
class GCPEdgeAI:
def __init__(self, model_path="models/edge_model_quantized_edgetpu.tflite", vocab_path="models/vocab.txt"):
# Initialize TensorFlow Lite interpreter with Edge TPU
self.interpreter = edgetpu.make_interpreter(model_path)
self.interpreter.allocate_tensors()
# Get input and output details
self.input_details = self.interpreter.get_input_details()
self.output_details = self.interpreter.get_output_details()
# Load vocabulary for tokenization/detokenization
self.vocab = self._load_vocab(vocab_path)
self.id_to_token = {idx: token for idx, token in enumerate(self.vocab)}
def _load_vocab(self, vocab_path):
with open(vocab_path, 'r') as f:
return [line.strip() for line in f]
def generate_text(self, prompt, max_length=50):
# Basic tokenization (simplified for example)
tokens = prompt.lower().split()
input_ids = [self.vocab.index(token) if token in self.vocab else 0 for token in tokens]
# Pad or truncate to expected input size
expected_shape = self.input_details[0]['shape'][1]
if len(input_ids) < expected_shape:
input_ids = input_ids + [0] * (expected_shape - len(input_ids))
else:
input_ids = input_ids[:expected_shape]
# Set input tensor
input_tensor = np.array([input_ids], dtype=np.int32)
self.interpreter.set_tensor(self.input_details[0]['index'], input_tensor)
# Run inference
self.interpreter.invoke()
# Get output
output = self.interpreter.get_tensor(self.output_details[0]['index'])
# Convert output to text (simplified)
generated_ids = np.argmax(output, axis=-1)[0]
generated_text = " ".join([self.id_to_token.get(idx, "") for idx in generated_ids])
return generated_text
def publish_result(self, result, project_id="your-project-id", topic="edge-ai-results"):
from google.cloud import pubsub_v1
# Initialize publisher client
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic)
# Publish message
data = json.dumps({"result": result}).encode("utf-8")
future = publisher.publish(topic_path, data)
return future.result() # Returns the message ID
# Example usage
def run_gcp_edge_inference():
# Initialize Edge AI
edge_ai = GCPEdgeAI()
# Generate text
prompt = "Summarize the benefits of edge AI in three points:"
result = edge_ai.generate_text(prompt)
# Publish results to Pub/Sub
message_id = edge_ai.publish_result(result)
return {
"result": result,
"message_id": message_id
}
# Main entry point
if __name__ == "__main__":
result = run_gcp_edge_inference()
print(f"Generated text: {result['result']}")
print(f"Message ID: {result['message_id']}")
# Example conversion script to prepare model for Edge TPU
"""
# convert_to_edge_tpu.py
import tensorflow as tf
def convert_model_for_edge_tpu(saved_model_dir, output_tflite_file):
# Load the SavedModel
model = tf.saved_model.load(saved_model_dir)
# Convert to TensorFlow Lite model
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
# Set optimization flags
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
# Representative dataset for quantization
def representative_dataset_gen():
# Generate representative data for quantization
# This would typically be a small sample of your input data
for _ in range(100):
yield [np.random.randint(0, 1000, size=(1, 128), dtype=np.int32)]
converter.representative_dataset = representative_dataset_gen
# Convert model
tflite_model = converter.convert()
# Save model
with open(output_tflite_file, 'wb') as f:
f.write(tflite_model)
print(f"Model saved to {output_tflite_file}")
# Compile for Edge TPU
# Note: This requires the Edge TPU compiler to be installed
# https://coral.ai/docs/edgetpu/compiler/
import subprocess
result = subprocess.run(['edgetpu_compiler', output_tflite_file])
if result.returncode == 0:
print("Model successfully compiled for Edge TPU")
else:
print("Error compiling model for Edge TPU")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Convert model for Edge TPU')
parser.add_argument('--model_dir', required=True, help='Path to SavedModel directory')
parser.add_argument('--output_file', required=True, help='Path to output TFLite file')
args = parser.parse_args()
convert_model_for_edge_tpu(args.model_dir, args.output_file)
"""
Azure IoT Edge Implementation:
- Module client for Azure IoT Edge runtime
- Direct method handler for remote invocation
- Telemetry output for monitoring and logging
- Includes a complete deployment manifest
# Azure IoT Edge Implementation
import json
import time
import onnxruntime as ort
import numpy as np
from azure.iot.device import IoTHubModuleClient, Message, MethodResponse
from transformers import AutoTokenizer
class AzureEdgeAI:
def __init__(self, model_path="/app/models/"):
# Initialize IoT Hub Module Client
self.client = IoTHubModuleClient.create_from_edge_environment()
# Load ONNX model optimized for edge
self.onnx_session = ort.InferenceSession(
f"{model_path}/model_optimized.onnx",
providers=["CPUExecutionProvider"]
)
# Initialize tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(f"{model_path}/tokenizer")
# Register callback for direct method calls
self.client.on_method_request_received = self.method_request_handler
def method_request_handler(self, method_request):
# Handle direct method calls
if method_request.name == "GenerateText":
try:
# Parse payload
payload = json.loads(method_request.payload)
prompt = payload.get("prompt", "")
max_length = payload.get("max_length", 100)
# Generate text
result = self.generate_text(prompt, max_length)
# Send response
response_payload = {"result": result}
response = MethodResponse(method_request.request_id, 200, json.dumps(response_payload))
self.client.send_method_response(response)
# Also send telemetry
self.send_telemetry({"prompt": prompt, "result": result})
except Exception as e:
# Handle errors
error_payload = {"error": str(e)}
error_response = MethodResponse(method_request.request_id, 400, json.dumps(error_payload))
self.client.send_method_response(error_response)
else:
# Method not implemented
not_impl_response = MethodResponse(method_request.request_id, 404, "")
self.client.send_method_response(not_impl_response)
def generate_text(self, prompt, max_length=100):
# Tokenize input
inputs = self.tokenizer(prompt, return_tensors="np")
# Get input names
input_names = [input.name for input in self.onnx_session.get_inputs()]
# Prepare inputs
onnx_inputs = {name: inputs[name.split('.')[-1]] for name in input_names if name.split('.')[-1] in inputs}
# Run inference
outputs = self.onnx_session.run(None, onnx_inputs)
# Process output
generated_ids = outputs[0]
generated_text = self.tokenizer.decode(generated_ids[0], skip_special_tokens=True)
return generated_text
def send_telemetry(self, data):
# Create IoT Hub message
message = Message(json.dumps(data))
message.content_type = "application/json"
message.content_encoding = "utf-8"
# Send telemetry
self.client.send_message_to_output(message, "telemetryOutput")
# Main entry point for module
def main():
# Initialize the edge AI module
edge_ai = AzureEdgeAI()
# Keep the module running to handle method calls
try:
print("Edge AI module running. Press Ctrl-C to exit")
while True:
time.sleep(1000)
except KeyboardInterrupt:
print("Edge AI module stopped")
if __name__ == "__main__":
main()
# Example client code to invoke the module
"""
# client.py
import asyncio
from azure.iot.device import IoTHubDeviceClient
from azure.iot.device import MethodRequest
async def invoke_edge_module():
# Create device client
device_client = IoTHubDeviceClient.create_from_connection_string(
"your-device-connection-string"
)
# Connect to IoT Hub
await device_client.connect()
try:
# Invoke GenerateText method on the module
method_params = {
"methodName": "GenerateText",
"payload": {
"prompt": "Summarize the benefits of edge AI in three points:",
"max_length": 100
},
"responseTimeoutInSeconds": 30,
"connectTimeoutInSeconds": 5
}
response = await device_client.invoke_method(
"EdgeGenerativeAI", # Module ID
method_params
)
# Print response
print(f"Status: {response.status}")
print(f"Response: {response.payload}")
finally:
# Disconnect
await device_client.disconnect()
if __name__ == "__main__":
asyncio.run(invoke_edge_module())
"""
# Example deployment manifest for Azure IoT Edge
"""
{
"modulesContent": {
"$edgeAgent": {
"properties.desired": {
"modules": {
"EdgeGenerativeAI": {
"type": "docker",
"status": "running",
"restartPolicy": "always",
"settings": {
"image": "yourcontainerregistry.azurecr.io/edge-ai:1.0",
"createOptions": {
"HostConfig": {
"Binds": [
"/data/models:/app/models"
]
}
}
}
}
}
}
},
"$edgeHub": {
"properties.desired": {
"routes": {
"EdgeGenerativeAIToIoTHub": "FROM /messages/modules/EdgeGenerativeAI/outputs/telemetryOutput INTO $upstream"
}
}
},
"EdgeGenerativeAI": {
"properties.desired": {
"ModelSettings": {
"ModelPath": "/app/models/model_optimized.onnx",
"TokenizerPath": "/app/models/tokenizer"
}
}
}
}
}
"""
# Dockerfile for Azure IoT Edge module
"""
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt ./
RUN pip install -r requirements.txt
# Copy module code
COPY . .
# Create models directory
RUN mkdir -p /app/models
# Set the entry point
CMD ["python", "main.py"]
"""
# Example requirements.txt
"""
azure-iot-device==2.12.0
numpy==1.23.5
onnxruntime==1.14.1
transformers==4.28.1
"""
Self-Hosted Edge AI Implementations:
- Android mobile implementation using PyTorch Mobile
- Web service implementation using ONNX Runtime and Flask
- Detailed tokenization and inference processing
# Self-Hosted Edge AI Implementations
# 1. Python Web Service with ONNX Runtime
import onnxruntime as ort
import numpy as np
from transformers import AutoTokenizer
from flask import Flask, request, jsonify
class ONNXEdgeAI:
def __init__(self, model_path="models/model_quantized.onnx", tokenizer_path="models/tokenizer"):
# Load ONNX model
self.onnx_session = ort.InferenceSession(
model_path,
providers=["CPUExecutionProvider"]
)
# Load tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
def generate_text(self, prompt, max_length=100):
# Tokenize input
inputs = self.tokenizer(prompt, return_tensors="np")
# Get input names
input_names = [input.name for input in self.onnx_session.get_inputs()]
# Prepare inputs
onnx_inputs = {name: inputs[name.split('.')[-1]] for name in input_names if name.split('.')[-1] in inputs}
# Run inference
outputs = self.onnx_session.run(None, onnx_inputs)
# Process output
generated_ids = outputs[0]
generated_text = self.tokenizer.decode(generated_ids[0], skip_special_tokens=True)
return generated_text
# Create Flask app for web service
app = Flask(__name__)
edge_ai = ONNXEdgeAI()
@app.route('/generate', methods=['POST'])
def generate():
data = request.json
prompt = data.get('prompt', '')
max_length = data.get('max_length', 100)
try:
generated_text = edge_ai.generate_text(prompt, max_length)
return jsonify({'result': generated_text})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Run the web service on the edge device
app.run(host='0.0.0.0', port=5000)
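A client can then call the edge service over HTTP; a minimal sketch assuming the service is reachable on localhost:
# Example client call to the edge web service
import requests

resp = requests.post(
    "http://localhost:5000/generate",
    json={"prompt": "Summarize the benefits of edge AI in three points:", "max_length": 100},
    timeout=60
)
print(resp.json()["result"])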
# Example Docker Compose setup for the web service
"""
version: '3'
services:
edge-ai:
build:
context: .
dockerfile: Dockerfile
ports:
- "5000:5000"
volumes:
- ./models:/app/models
restart: unless-stopped
deploy:
resources:
limits:
cpus: '2'
memory: 2G
"""
# Example Dockerfile for the web service
"""
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app.py .
# Create models directory
RUN mkdir -p /app/models
# Expose port
EXPOSE 5000
# Run the application
CMD ["python", "app.py"]
"""
# 2. Android Mobile Implementation (Kotlin)
"""
// MainActivity.kt
package com.example.edgeai
import android.os.Bundle
import android.widget.Button
import android.widget.EditText
import android.widget.TextView
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.pytorch.LiteModuleLoader
import org.pytorch.Module
import org.pytorch.Tensor
import java.io.File
import java.io.FileOutputStream
import java.nio.ByteBuffer
class MainActivity : AppCompatActivity() {
private lateinit var module: Module
private lateinit var tokenizer: BertTokenizer
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// Initialize model and tokenizer
CoroutineScope(Dispatchers.IO).launch {
module = loadModel()
tokenizer = BertTokenizer(loadVocab())
withContext(Dispatchers.Main) {
findViewById
// ... (remainder of the Android example omitted)
"""
Cost Comparison for Edge AI Solutions:

| Cloud Provider | Component | Service | Cost Structure | Est. Monthly Cost per Device |
|---|---|---|---|---|
| AWS | Edge Runtime | IoT Greengrass | $0.16/device/month + data transfer | $0.16-5.00 |
| AWS | Model Compilation | SageMaker Neo | Free for compilation | $0 |
| AWS | Data Transfer | IoT Core | $1.00/million messages | $1.00-10.00 |
| GCP | Edge Hardware | Coral Dev Board | $129.99 one-time | $5.00-8.00 (amortized) |
| GCP | Edge Runtime | Edge TPU Runtime | Free | $0 |
| GCP | Data Transfer | IoT Core | $0.40/million messages | $0.40-4.00 |
| Azure | Edge Runtime | IoT Edge | Free runtime, $0.15/device/month connection | $0.15-5.00 |
| Azure | Model Optimization | Azure ML | Included with workspace | $0 |
| Azure | Data Transfer | IoT Hub | $0.40/million messages | $0.40-4.00 |
| Self-hosted | Mobile Integration | PyTorch Mobile/ONNX Runtime | Free libraries | $0 |
| Self-hosted | App Distribution | App Store fees | Variable | $0-25.00 |
6. Synthetic Data Generation and Augmentation
Generative AI is increasingly used to create synthetic datasets for model training and to augment existing data, particularly in domains where real data is scarce, sensitive, or expensive to collect.





Synthetic Data Generation Examples
# AWS Bedrock Implementation for Synthetic Data Generation
import boto3
import json
import pandas as pd
import numpy as np
import time
def aws_synthetic_data_generation(real_data_sample, num_synthetic_samples=100):
"""
Generate synthetic tabular data using AWS Bedrock
Args:
real_data_sample: A small sample of real data to guide the generation
num_synthetic_samples: Number of synthetic samples to generate
Returns:
DataFrame of synthetic data
"""
# Initialize Bedrock client
bedrock_runtime = boto3.client(
service_name="bedrock-runtime",
region_name="us-east-1"
)
# Convert sample to string representation
sample_str = real_data_sample.head(5).to_string(index=False)
# Create prompt for synthetic data generation
prompt = f"""I need to generate synthetic tabular data that resembles the following real data sample:
{sample_str}
Please generate {num_synthetic_samples} rows of synthetic data that:
1. Maintains the same column names and data types
2. Preserves the statistical distributions of the original data
3. Maintains correlations between columns
4. Ensures the data is realistic and does not contain identifiable information
Output the data in CSV format only, with no additional explanation.
"""
# Create request payload
request_body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4000,
"messages": [
{
"role": "user",
"content": prompt
}
]
}
# Invoke Claude model
response = bedrock_runtime.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps(request_body)
)
# Parse response
response_body = json.loads(response.get("body").read())
synthetic_data_text = response_body["content"][0]["text"]
# Convert text to DataFrame
# This assumes the response is in CSV format
from io import StringIO
synthetic_df = pd.read_csv(StringIO(synthetic_data_text))
# Validate and correct data types to match the original
for col in real_data_sample.columns:
if col in synthetic_df.columns:
synthetic_df[col] = synthetic_df[col].astype(real_data_sample[col].dtype)
return synthetic_df
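Before using LLM-generated rows for training, it is worth sanity-checking them against the source sample. A lightweight sketch follows (an illustration only, not a substitute for formal fidelity or privacy metrics):
# Quick comparison of real vs. synthetic numeric columns
def compare_distributions(real_df, synthetic_df):
    numeric_cols = real_df.select_dtypes(include="number").columns
    return pd.DataFrame({
        "real_mean": real_df[numeric_cols].mean(),
        "synthetic_mean": synthetic_df[numeric_cols].mean(),
        "real_std": real_df[numeric_cols].std(),
        "synthetic_std": synthetic_df[numeric_cols].std(),
    })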
# AWS SageMaker Data Wrangler for Tabular Data Generation
def aws_tabular_synthetic_data():
import sagemaker
from sagemaker import get_execution_role
from sagemaker.session import Session
# Initialize SageMaker session
role = get_execution_role()
session = sagemaker.Session()
# Create processing job for synthetic data generation
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
# Create script processor
script_processor = ScriptProcessor(
command=['python3'],
image_uri='your-container-uri', # Custom container with data generation libraries
role=role,
instance_count=1,
instance_type='ml.m5.xlarge',
sagemaker_session=session
)
# Run processing job
script_processor.run(
code='generate_synthetic_data.py',
inputs=[
ProcessingInput(
source='s3://your-bucket/real-data.csv',
destination='/opt/ml/processing/input'
)
],
outputs=[
ProcessingOutput(
source='/opt/ml/processing/output',
destination='s3://your-bucket/synthetic-data'
)
],
arguments=[
'--num-samples', '1000',
'--output-file', '/opt/ml/processing/output/synthetic_data.csv',
'--method', 'ctgan' # Conditional Tabular GAN
]
)
# Example synthetic data generation script
"""
# generate_synthetic_data.py
import argparse
import pandas as pd
import numpy as np
from sdv.tabular import CTGAN
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--num-samples', type=int, default=1000)
parser.add_argument('--output-file', type=str, required=True)
parser.add_argument('--method', type=str, default='ctgan')
args = parser.parse_args()
# Load real data
df = pd.read_csv('/opt/ml/processing/input/real-data.csv')
# Train synthetic data model
if args.method == 'ctgan':
model = CTGAN()
model.fit(df)
# Generate synthetic data
synthetic_data = model.sample(args.num_samples)
# Save to output location
synthetic_data.to_csv(args.output_file, index=False)
# Add other methods as needed
if __name__ == '__main__':
main()
"""
return {
"job_name": script_processor._current_job_name,
"output_path": "s3://your-bucket/synthetic-data"
}
# GCP Vertex AI Implementation for Synthetic Data
from google.cloud import aiplatform
from vertexai.preview.generative_models import GenerativeModel
import pandas as pd
import numpy as np
import json
def gcp_synthetic_data_generation(real_data_sample, num_synthetic_samples=100):
"""
Generate synthetic tabular data using Google Vertex AI
"""
# Initialize Vertex AI
aiplatform.init(project="your-project-id", location="us-central1")
# Load Gemini model
model = GenerativeModel("gemini-pro")
# Convert sample to string representation
sample_str = real_data_sample.head(5).to_string(index=False)
# Create prompt for synthetic data generation
prompt = f"""I need to generate synthetic tabular data that resembles the following real data sample:
{sample_str}
Please generate {num_synthetic_samples} rows of synthetic data that:
1. Maintains the same column names and data types
2. Preserves the statistical distributions of the original data
3. Maintains correlations between columns
4. Ensures the data is realistic and does not contain identifiable information
Output the data in CSV format only, with no additional explanation.
"""
# Generate synthetic data
response = model.generate_content(prompt)
synthetic_data_text = response.text
# Convert text to DataFrame
from io import StringIO
synthetic_df = pd.read_csv(StringIO(synthetic_data_text))
# Validate and correct data types to match the original
for col in real_data_sample.columns:
if col in synthetic_df.columns:
synthetic_df[col] = synthetic_df[col].astype(real_data_sample[col].dtype)
return synthetic_df
# GCP Dataflow for Large-Scale Synthetic Data Generation
def gcp_dataflow_synthetic_data():
"""
Create a Dataflow pipeline for large-scale synthetic data generation
"""
# Example Dataflow pipeline code
"""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import numpy as np
import pandas as pd
import json
import random
import datetime
from typing import Dict, List, Tuple
class GenerateSyntheticDataFn(beam.DoFn):
def __init__(self, schema, distributions):
self.schema = schema
self.distributions = distributions
def process(self, element):
# Generate one synthetic record based on schema and distributions
record = {}
for col, col_info in self.schema.items():
data_type = col_info['type']
if data_type == 'categorical':
values = self.distributions[col]['values']
probabilities = self.distributions[col]['probabilities']
record[col] = np.random.choice(values, p=probabilities)
elif data_type == 'numerical':
mean = self.distributions[col]['mean']
std = self.distributions[col]['std']
record[col] = float(np.random.normal(mean, std))
elif data_type == 'datetime':
start_date = self.distributions[col]['start']
end_date = self.distributions[col]['end']
# Generate random date between start and end
delta = end_date - start_date
random_days = random.randint(0, delta.days)
record[col] = (start_date + datetime.timedelta(days=random_days)).isoformat()
yield record
def run():
# Example schema and distributions derived from real data
schema = {
'age': {'type': 'numerical'},
'income': {'type': 'numerical'},
'category': {'type': 'categorical'},
'signup_date': {'type': 'datetime'}
}
distributions = {
'age': {'mean': 35.2, 'std': 12.5},
'income': {'mean': 68000, 'std': 25000},
'category': {
'values': ['A', 'B', 'C', 'D'],
'probabilities': [0.3, 0.4, 0.2, 0.1]
},
'signup_date': {
'start': datetime.date(2020, 1, 1),
'end': datetime.date(2023, 12, 31)
}
}
# Define pipeline options
options = PipelineOptions(
runner='DataflowRunner',
project='your-project-id',
region='us-central1',
temp_location='gs://your-bucket/temp',
job_name='synthetic-data-generation'
)
# Create the pipeline
with beam.Pipeline(options=options) as p:
# Generate synthetic records
synthetic_data = (
p
| 'Create Records' >> beam.Create(range(1000000)) # Number of records to generate
| 'Generate Data' >> beam.ParDo(GenerateSyntheticDataFn(schema, distributions))
| 'Convert to JSON' >> beam.Map(json.dumps)
| 'Write to GCS' >> beam.io.WriteToText('gs://your-bucket/synthetic-data/output')
)
if __name__ == '__main__':
run()
"""
return "Dataflow Synthetic Data Pipeline (see commented code for implementation)"
# Azure OpenAI Implementation for Synthetic Data
import os
import json
import pandas as pd
import numpy as np
from openai import AzureOpenAI
def azure_synthetic_data_generation(real_data_sample, num_synthetic_samples=100):
"""
Generate synthetic tabular data using Azure OpenAI
"""
# Initialize Azure OpenAI client
client = AzureOpenAI(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version="2023-12-01-preview",
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
)
# Convert sample to string representation
sample_str = real_data_sample.head(5).to_string(index=False)
# Create prompt for synthetic data generation
prompt = f"""I need to generate synthetic tabular data that resembles the following real data sample:
{sample_str}
Please generate {num_synthetic_samples} rows of synthetic data that:
1. Maintains the same column names and data types
2. Preserves the statistical distributions of the original data
3. Maintains correlations between columns
4. Ensures the data is realistic and does not contain identifiable information
Output the data in CSV format only, with no additional explanation.
"""
# Generate synthetic data using GPT-4
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a data generation assistant that creates realistic synthetic data based on real data samples."},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=4000
)
# Extract synthetic data text
synthetic_data_text = response.choices[0].message.content
# Convert text to DataFrame
from io import StringIO
synthetic_df = pd.read_csv(StringIO(synthetic_data_text))
# Validate and correct data types to match the original
for col in real_data_sample.columns:
if col in synthetic_df.columns:
synthetic_df[col] = synthetic_df[col].astype(real_data_sample[col].dtype)
return synthetic_df
# Azure Synapse Analytics for Large-Scale Synthetic Data
def azure_synapse_synthetic_data():
"""
Create an Azure Synapse Analytics pipeline for large-scale synthetic data generation
"""
# This would typically be implemented using the Azure SDK or REST API
# Below is a conceptual implementation
# Example Azure Synapse Notebook (PySpark) code
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import random
import datetime
# Initialize Spark session
spark = SparkSession.builder.appName("SyntheticDataGeneration").getOrCreate()
# Define schema based on real data
schema = StructType([
StructField("customer_id", StringType(), False),
StructField("age", IntegerType(), True),
StructField("income", DoubleType(), True),
StructField("category", StringType(), True),
StructField("purchase_date", DateType(), True)
])
# Helper to generate one random customer record (applied via rdd.map below)
def generate_random_customer():
# Generate random customer ID
customer_id = f"CUST-{random.randint(10000, 99999)}"
# Generate age from a truncated normal distribution
age = int(max(18, min(90, np.random.normal(35, 10))))
# Generate income from a normal distribution (floored at 20,000)
income = float(max(20000, np.random.normal(65000, 25000)))
# Generate category based on a fixed distribution
categories = ["A", "B", "C", "D"]
probabilities = [0.3, 0.4, 0.2, 0.1]
category = str(np.random.choice(categories, p=probabilities))
# Generate a purchase date within the last 3 years
days_ago = random.randint(0, 365 * 3)
purchase_date = datetime.date.today() - datetime.timedelta(days=days_ago)
return (customer_id, age, income, category, purchase_date)
# Generate 1 million synthetic records in parallel
num_partitions = 100
records_per_partition = 10000
synthetic_df = (
spark.range(0, records_per_partition * num_partitions, 1, num_partitions)
.rdd
.map(lambda x: generate_random_customer())
.toDF(schema)
)
# Write to Azure Data Lake Storage
synthetic_df.write.mode("overwrite").parquet("abfss://container@account.dfs.core.windows.net/synthetic-data/")
"""
return "Azure Synapse Synthetic Data Pipeline (see commented code for implementation)"
# Self-Hosted/Open Source Synthetic Data Generation
import pandas as pd
import numpy as np
from sdv.tabular import CTGAN, GaussianCopula
from sdv.evaluation import evaluate
def open_source_synthetic_data_generation(real_data, num_synthetic_samples=1000, method="ctgan"):
"""
Generate synthetic tabular data using open-source libraries
Args:
real_data: DataFrame containing real data
num_synthetic_samples: Number of synthetic samples to generate
method: Generation method ('ctgan' or 'copula')
Returns:
DataFrame of synthetic data and quality metrics
"""
# Select synthetic data generation model
if method == "ctgan":
# Conditional Tabular GAN
model = CTGAN()
else:
# Gaussian Copula model
model = GaussianCopula()
# Fit model to real data
model.fit(real_data)
# Generate synthetic data
synthetic_data = model.sample(num_synthetic_samples)
# Evaluate how closely the synthetic data matches the real data
# (uses SDV's older sdv.tabular API; newer SDV releases use sdv.single_table synthesizers and evaluate_quality with table metadata)
quality_report = evaluate(synthetic_data, real_data, aggregate=True)
return {
"synthetic_data": synthetic_data,
"quality_metrics": quality_report
}
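To try this locally, you could run it against a small toy dataset (column names here are illustrative; requires the sdv package):
# Illustrative local run against a toy dataset (pip install sdv)
real_df = pd.DataFrame({
    "age": np.random.randint(18, 70, 500),
    "income": np.random.normal(65000, 20000, 500).round(2),
    "plan": np.random.choice(["basic", "pro", "enterprise"], 500)
})
result = open_source_synthetic_data_generation(real_df, num_synthetic_samples=200, method="copula")
print(result["synthetic_data"].head())
print(result["quality_metrics"])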
# Example of image generation for training data augmentation
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np
import matplotlib.pyplot as plt
def image_data_augmentation(images, labels, augmentation_factor=5):
"""
Augment image training data using traditional techniques
"""
# Create data generator with augmentation
datagen = ImageDataGenerator(
rotation_range=20,
width_shift_range=0.2,
height_shift_range=0.2,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True,
fill_mode='nearest'
)
# Prepare augmented dataset containers
augmented_images = []
augmented_labels = []
# Generate augmented images
for i in range(len(images)):
image = images[i]
label = labels[i]
# Convert to numpy array with batch dimension
image = np.expand_dims(image, 0)
# Generate augmented images
aug_iter = datagen.flow(image, batch_size=1)
# Add original image
augmented_images.append(image[0])
augmented_labels.append(label)
# Add augmented versions
for j in range(augmentation_factor - 1):
aug_image = next(aug_iter)[0]
augmented_images.append(aug_image)
augmented_labels.append(label)
# Convert to numpy arrays
augmented_images = np.array(augmented_images)
augmented_labels = np.array(augmented_labels)
return augmented_images, augmented_labels
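A quick way to sanity-check the augmentation pipeline is to run it on random arrays (shapes below are illustrative):
# Illustrative check with random 32x32 RGB images
images = np.random.rand(10, 32, 32, 3)
labels = np.array([0, 1] * 5)
aug_images, aug_labels = image_data_augmentation(images, labels, augmentation_factor=3)
print(aug_images.shape, aug_labels.shape)  # 3x the original count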
# Advanced text data augmentation with NLP techniques
import nltk
from nltk.corpus import wordnet
import random
import spacy
def text_data_augmentation(texts, labels, augmentation_factor=3):
"""
Augment text training data using NLP techniques
"""
# Download necessary NLTK data
nltk.download('wordnet')
nltk.download('punkt')
# Load SpaCy model
nlp = spacy.load("en_core_web_sm")
augmented_texts = []
augmented_labels = []
# Add original data
augmented_texts.extend(texts)
augmented_labels.extend(labels)
for i, text in enumerate(texts):
label = labels[i]
# Parse with SpaCy
doc = nlp(text)
# Create augmented versions
for _ in range(augmentation_factor - 1):
# Choose augmentation technique randomly
technique = random.choice(["synonym", "deletion", "swap"])
if technique == "synonym":
# Replace some words with synonyms
new_text = synonym_replacement(text)
elif technique == "deletion":
# Randomly delete some words
new_text = random_deletion(text)
else:
# Randomly swap words
new_text = random_swap(text)
augmented_texts.append(new_text)
augmented_labels.append(label)
return augmented_texts, augmented_labels
def synonym_replacement(text, n=1):
"""Replace n words in the text with synonyms"""
words = nltk.word_tokenize(text)
new_words = words.copy()
random_word_indexes = random.sample(range(len(words)), min(n, len(words)))
for idx in random_word_indexes:
word = words[idx]
synonyms = get_synonyms(word)
if synonyms:
new_words[idx] = random.choice(synonyms)
return " ".join(new_words)
def get_synonyms(word):
"""Get synonyms of a word"""
synonyms = []
for syn in wordnet.synsets(word):
for lemma in syn.lemmas():
synonyms.append(lemma.name())
return list(set(synonyms))
def random_deletion(text, p=0.1):
"""Randomly delete words from text with probability p"""
words = nltk.word_tokenize(text)
if len(words) == 1:
return text
new_words = []
for word in words:
if random.random() > p:
new_words.append(word)
if len(new_words) == 0:
return words[0]
return " ".join(new_words)
def random_swap(text, n=1):
"""Randomly swap n pairs of words in the text"""
words = nltk.word_tokenize(text)
new_words = words.copy()
for _ in range(min(n, len(words)//2)):
idx1, idx2 = random.sample(range(len(new_words)), 2)
new_words[idx1], new_words[idx2] = new_words[idx2], new_words[idx1]
return " ".join(new_words)
Cost Comparison for Synthetic Data Solutions:
| Cloud Provider | Service | Use Case | Pricing Model | Est. Monthly Cost (Medium Scale) |
|---|---|---|---|---|
| AWS | Bedrock (Claude) | Text-based generation | $15/1M input tokens, $60/1M output tokens | $300-800 |
| AWS | SageMaker | Custom synthetic models | Instance costs: $0.92/hr (ml.m5.xlarge) | $150-700 |
| AWS | S3 | Storage | $0.023/GB | $5-50 |
| GCP | Vertex AI (Gemini) | Text-based generation | $0.0025/1K input tokens, $0.00375/1K output tokens | $250-600 |
| GCP | Dataflow | Large-scale processing | $0.06/hour per vCPU, $0.01/hour per GB memory | $200-800 |
| GCP | Cloud Storage | Storage | $0.020/GB | $5-50 |
| Azure | Azure OpenAI (GPT-4) | Text-based generation | $0.03/1K input tokens, $0.06/1K output tokens | $400-1,000 |
| Azure | Synapse Analytics | Large-scale processing | $5-500/DWU/hour | $200-1,000 |
| Azure | Blob Storage | Storage | $0.0184/GB | $5-50 |
| Self-hosted | Open Source (SDV) | Tabular data | Server costs + maintenance | $100-300 |
7. Agent-Based Architectures
AI agents that can autonomously complete complex tasks by planning, reasoning, and interacting with external systems are becoming increasingly sophisticated.
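Before diving into the managed offerings, it helps to see the core pattern they all implement: the model plans, decides whether to call a tool, and folds the tool's result back into its answer. Here is a minimal, cloud-independent sketch of that loop; call_llm and the message format are hypothetical placeholders for whatever chat-completion API you use.
# Minimal agent loop sketch (call_llm is a hypothetical placeholder for any chat-completion API)
import json

TOOLS = {
    "get_weather": lambda location: {"temperature": 22.5, "condition": "Partly Cloudy"}
}

def run_agent(user_query, call_llm, max_steps=5):
    messages = [{"role": "user", "content": user_query}]
    for _ in range(max_steps):
        reply = call_llm(messages)  # the model either answers or requests a tool call
        if reply.get("tool_call"):
            name = reply["tool_call"]["name"]
            args = reply["tool_call"]["arguments"]
            result = TOOLS[name](**args)  # execute the requested tool locally
            messages.append({"role": "tool", "name": name, "content": json.dumps(result)})
        else:
            return reply["content"]  # final answer, no further tool calls
    return "Stopped after reaching the step limit."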

AI Agent Implementation Examples
# AWS Bedrock Agents Implementation
import boto3
import json
import time
def create_aws_bedrock_agent():
# Initialize Bedrock client
bedrock = boto3.client(
service_name="bedrock-agent",
region_name="us-east-1"
)
# Step 1: Create an action group (tool) for the agent
# Note: the agent must already exist before an action group can be attached;
# "your-agent-id" below is a placeholder for the ID returned by create_agent (Step 3)
action_group_response = bedrock.create_agent_action_group(
agentId="your-agent-id",
agentVersion="DRAFT",
actionGroupName="WeatherTool",
apiSchema={"payload": json.dumps({
"openapi": "3.0.0",
"info": {
"title": "Weather API",
"version": "1.0.0"
},
"paths": {
"/getWeather": {
"get": {
"operationId": "getWeather",
"summary": "Get weather for a location",
"parameters": [
{
"name": "location",
"in": "query",
"description": "City and state or country",
"required": True,
"schema": {"type": "string"}
}
],
"responses": {
"200": {
"description": "Weather information",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"temperature": {"type": "number"},
"condition": {"type": "string"},
"humidity": {"type": "number"}
}
}
}
}
}
}
}
}
}
})},
description="Tool to get weather information for a location",
actionGroupExecutor={
"lambda": {
"lambdaArn": "arn:aws:lambda:us-east-1:123456789012:function:weather-function"
}
}
)
# Step 2: Create a knowledge base for the agent
knowledge_base_response = bedrock.create_knowledge_base(
name="TravelKnowledgeBase",
description="Knowledge base with travel information",
roleArn="arn:aws:iam::123456789012:role/BedrockKnowledgeBaseRole",
knowledgeBaseConfiguration={
"type": "VECTOR",
"vectorKnowledgeBaseConfiguration": {
"embeddingModelArn": "arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1"
}
},
storageConfiguration={
"type": "OPENSEARCH_SERVERLESS",
"opensearchServerlessConfiguration": {
"collectionArn": "arn:aws:aoss:us-east-1:123456789012:collection/my-collection"
}
}
)
# Step 3: Create the agent
agent_response = bedrock.create_agent(
agentName="TravelAssistant",
description="An assistant that helps with travel planning",
instruction="You are a travel assistant. Help users plan trips by providing weather information and travel recommendations. Use the provided knowledge base for travel information and the weather tool to get current weather data.",
foundationModel="anthropic.claude-v2",
idleSessionTTLInSeconds=1800, # 30 minutes
customerEncryptionKeyArn="arn:aws:kms:us-east-1:123456789012:key/your-key-id",
agentResourceRoleArn="arn:aws:iam::123456789012:role/BedrockAgentRole"
)
agent_id = agent_response["agent"]["agentId"]
# Step 4: Associate knowledge base with agent
kb_association_response = bedrock.associate_agent_knowledge_base(
agentId=agent_id,
agentVersion="DRAFT",
knowledgeBaseId=knowledge_base_response["knowledgeBase"]["knowledgeBaseId"],
description="Travel information knowledge base"
)
# Step 5: Prepare the agent for deployment
prepare_response = bedrock.prepare_agent(agentId=agent_id)
# Check preparation status
waiter_config = {
"delay": 5,
"maxAttempts": 60
}
# Wait for preparation to complete
while True:
status_response = bedrock.get_agent(agentId=agent_id)
agent_status = status_response["agent"]["agentStatus"]
if agent_status == "PREPARED":
break
if agent_status == "FAILED":
raise Exception("Agent preparation failed")
time.sleep(waiter_config["delay"])
waiter_config["maxAttempts"] -= 1
if waiter_config["maxAttempts"] <= 0:
raise Exception("Timed out waiting for agent preparation")
# Step 6: Create alias for the agent
alias_response = bedrock.create_agent_alias(
agentId=agent_response["agentId"],
agentAliasName="prod",
description="Production version of the travel assistant",
routingConfiguration=[
{
"agentVersion": "DRAFT"
}
]
)
return {
"agent_id": agent_response["agentId"],
"alias_id": alias_response["agentAliasId"]
}
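Once the alias exists, the agent can be invoked through the bedrock-agent-runtime client. The sketch below assumes the agent and alias IDs returned above and reuses the boto3 import from earlier:
# Sketch: invoke the prepared agent and collect the streamed response
def invoke_aws_bedrock_agent(agent_id, alias_id, session_id, user_input):
    runtime = boto3.client("bedrock-agent-runtime", region_name="us-east-1")
    response = runtime.invoke_agent(
        agentId=agent_id,
        agentAliasId=alias_id,
        sessionId=session_id,
        inputText=user_input
    )
    # The completion is returned as an event stream of text chunks
    completion = ""
    for event in response["completion"]:
        if "chunk" in event:
            completion += event["chunk"]["bytes"].decode("utf-8")
    return completion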
# AWS Lambda function for the weather tool
def lambda_handler(event, context):
"""
AWS Lambda function that handles weather requests from the Bedrock agent.
Bedrock agents pass actionGroup, apiPath, and parameters at the top level
of the event and expect a Bedrock-specific response envelope in return.
"""
action_group = event.get("actionGroup")
api_path = event.get("apiPath")
http_method = event.get("httpMethod", "GET")
parameters = event.get("parameters", [])
# Default to "not found"
status_code = 404
body = {"error": "Not found"}
if api_path == "/getWeather":
# Extract the location parameter
location = None
for param in parameters:
if param.get("name") == "location":
location = param.get("value")
break
if not location:
status_code = 400
body = {"error": "Location parameter is required"}
else:
# In a real implementation, you would call a weather API here
# For this example, we'll return mock data
status_code = 200
body = {
"temperature": 22.5,
"condition": "Partly Cloudy",
"humidity": 65
}
# Response envelope expected by Bedrock agents
return {
"messageVersion": "1.0",
"response": {
"actionGroup": action_group,
"apiPath": api_path,
"httpMethod": http_method,
"httpStatusCode": status_code,
"responseBody": {
"application/json": {
"body": json.dumps(body)
}
}
}
}
# GCP Vertex AI Agents Implementation
import vertexai
from vertexai.preview.generative_models import GenerativeModel, Tool, FunctionDeclaration
def create_gcp_vertex_ai_agent():
# Initialize Vertex AI
vertexai.init(project="your-project-id", location="us-central1")
# Step 1: Define tools (function declarations) using a JSON-schema style parameter spec
# (the tool itself returns temperature, condition, and humidity; see handle_tool_calls below)
weather_tool = Tool(
function_declarations=[
FunctionDeclaration(
name="get_weather",
description="Get the current weather for a location",
parameters={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state or country"
}
},
"required": ["location"]
}
)
]
)
# Step 2: Load Gemini model with the weather tool attached
# (automatic function calling is the default mode, so no explicit ToolConfig is needed here)
model = GenerativeModel(
"gemini-pro",
tools=[weather_tool]
)
# Function to handle tool calls
def handle_tool_calls(tool_calls):
results = []
for tool_call in tool_calls:
function_name = tool_call.name
function_args = dict(tool_call.args)
if function_name == "get_weather":
location = function_args.get("location", "")
# In a real implementation, you would call a weather API here
# For this example, we'll return mock data
weather_result = {
"temperature": 22.5,
"condition": "Partly Cloudy",
"humidity": 65
}
results.append(weather_result)
return results
# Step 3: Create a prediction function for the agent
def predict(prompt):
response = model.generate_content(
prompt,
generation_config={
"max_output_tokens": 1024,
"temperature": 0.4
}
)
if hasattr(response, "candidates") and response.candidates:
candidate = response.candidates[0]
if hasattr(candidate, "content") and candidate.content:
content = candidate.content
# Check if there are any tool calls to process
if hasattr(content, "parts"):
for part in content.parts:
if hasattr(part, "function_call"):
# Handle tool calls
tool_results = handle_tool_calls([part.function_call])
# Continue the conversation with tool results
follow_up_response = model.generate_content(
[
{"role": "user", "parts": [prompt]},
{"role": "model", "parts": [part]},
{"role": "tool", "parts": [{"function_response": tool_results[0]}]}
]
)
return follow_up_response.text
return response.text
return "I couldn't process your request."
# Step 4: Deploy the agent (in a real implementation, this would be a more complex setup)
# For example, creating a Cloud Function or Cloud Run service that uses this prediction function
return {
"agent_type": "vertex_ai_gemini",
"prediction_function": predict,
"model": model
}
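Locally, the returned prediction function can be exercised directly (assuming the project ID and location above are valid):
# Illustrative usage of the Vertex AI agent sketch
agent = create_gcp_vertex_ai_agent()
print(agent["prediction_function"]("What's the weather like in Paris right now?"))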
# GCP Cloud Function to serve the agent
"""
from flask import Flask, request, jsonify
import vertexai
from vertexai.preview.generative_models import GenerativeModel
import json
import functions_framework
app = Flask(__name__)
# Initialize Vertex AI
vertexai.init(project="your-project-id", location="us-central1")
# Create tools and model as defined above
# ... (tool and model definitions) ...
# Function to handle tool calls
def handle_tool_calls(tool_calls):
# ... (as defined above) ...
pass
@functions_framework.http
def agent_endpoint(request):
# Parse request
request_json = request.get_json(silent=True)
if not request_json or 'prompt' not in request_json:
return jsonify({'error': 'No prompt provided'}), 400
prompt = request_json['prompt']
# Generate response using the agent
try:
response = model.generate_content(prompt)
# Process any tool calls in the response
# ... (tool call processing logic) ...
return jsonify({'response': response.text})
except Exception as e:
return jsonify({'error': str(e)}), 500
"""
# Azure OpenAI Assistants Implementation
import os
import json
import time
from openai import AzureOpenAI
def create_azure_openai_assistant():
# Initialize Azure OpenAI client
client = AzureOpenAI(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version="2023-12-01-preview",
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
)
# Step 1: Define functions/tools
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get the current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state or country"
}
},
"required": ["location"]
}
}
}
]
# Step 2: Create a file with travel information for the assistant
with open("travel_info.txt", "w") as f:
f.write("""
# Travel Guide Information
## Popular Destinations
### Paris, France
- Best time to visit: April to June, September to October
- Famous for: Eiffel Tower, Louvre Museum, Notre Dame Cathedral
- Local cuisine: Croissants, Escargot, Coq au Vin
### Tokyo, Japan
- Best time to visit: March to May, September to November
- Famous for: Cherry blossoms, Shibuya Crossing, Tokyo Skytree
- Local cuisine: Sushi, Ramen, Tempura
## Travel Tips
- Always check visa requirements before booking flights
- Purchase travel insurance for international trips
- Inform your bank of travel plans to avoid card issues
- Make copies of important documents
""")
# Upload the file
file_response = client.files.create(
file=open("travel_info.txt", "rb"),
purpose="assistants"
)
# Step 3: Create the assistant
assistant = client.beta.assistants.create(
name="Travel Assistant",
description="An assistant that helps with travel planning",
instructions="You are a travel assistant. Help users plan trips by providing weather information and travel recommendations. Use the provided knowledge base for travel information and the weather tool to get current weather data.",
model="gpt-4",
tools=tools + [{"type": "retrieval"}],  # retrieval lets the assistant search the uploaded travel guide
file_ids=[file_response.id]
)
# Step 4: Create a function to handle the assistant's tool calls
def handle_azure_tool_calls(tool_calls):
tool_results = []
for tool_call in tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
if function_name == "get_weather":
location = function_args.get("location", "")
# In a real implementation, you would call a weather API here
# For this example, we'll return mock data
weather_result = {
"temperature": 22.5,
"condition": "Partly Cloudy",
"humidity": 65
}
tool_results.append({
"tool_call_id": tool_call.id,
"output": json.dumps(weather_result)
})
return tool_results
# Step 5: Create a conversation helper function
def converse_with_assistant(prompt):
# Create a thread
thread = client.beta.threads.create()
# Add a message to the thread
client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content=prompt
)
# Run the assistant on the thread
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=assistant.id
)
# Wait for the run to complete
while True:
run_status = client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id
)
if run_status.status == "completed":
break
if run_status.status == "requires_action":
# Handle tool calls
tool_calls = run_status.required_action.submit_tool_outputs.tool_calls
tool_outputs = handle_azure_tool_calls(tool_calls)
# Submit tool outputs
client.beta.threads.runs.submit_tool_outputs(
thread_id=thread.id,
run_id=run.id,
tool_outputs=tool_outputs
)
if run_status.status in ["failed", "cancelled", "expired"]:
raise Exception(f"Run failed with status: {run_status.status}")
time.sleep(1)
# Get messages from the thread
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
# Return the assistant's response
for message in messages.data:
if message.role == "assistant":
return message.content[0].text.value
return "No response from assistant."
return {
"assistant_id": assistant.id,
"converse_function": converse_with_assistant
}
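With the assistant created, a conversation is a single call to the helper returned above (assumes the Azure OpenAI environment variables are set):
# Illustrative conversation with the travel assistant
assistant_setup = create_azure_openai_assistant()
reply = assistant_setup["converse_function"]("I'm planning a trip to Tokyo in April. What should I expect?")
print(reply)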
# Azure Function App for the Assistant
"""
import logging
import azure.functions as func
import json
import os
from openai import AzureOpenAI
# Initialize Azure OpenAI client
client = AzureOpenAI(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version="2023-12-01-preview",
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
)
# Get assistant ID from environment variable
ASSISTANT_ID = os.getenv("ASSISTANT_ID")
# Function to handle tool calls
def handle_tool_calls(tool_calls):
# ... (as defined above) ...
pass
app = func.FunctionApp()
@app.route(route="travelbot", auth_level=func.AuthLevel.FUNCTION)
def travelbot(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
try:
req_body = req.get_json()
prompt = req_body.get('prompt')
if not prompt:
return func.HttpResponse(
json.dumps({"error": "No prompt provided"}),
mimetype="application/json",
status_code=400
)
# Create a thread
thread = client.beta.threads.create()
# Add a message to the thread
client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content=prompt
)
# Run the assistant on the thread
run = client.beta.threads.runs.create(
thread_id=thread.id,
assistant_id=ASSISTANT_ID
)
# Wait for the run to complete (with tool call handling)
# ... (run completion logic) ...
# Get messages from the thread
messages = client.beta.threads.messages.list(
thread_id=thread.id
)
# Return the assistant's response
for message in messages.data:
if message.role == "assistant":
return func.HttpResponse(
json.dumps({"response": message.content[0].text.value}),
mimetype="application/json"
)
return func.HttpResponse(
json.dumps({"error": "No response from assistant"}),
mimetype="application/json",
status_code=500
)
except Exception as e:
return func.HttpResponse(
json.dumps({"error": str(e)}),
mimetype="application/json",
status_code=500
)
"""
# LangChain Agent Implementation (Cloud-Independent)
import os
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.prompts import MessagesPlaceholder
from langchain.tools import BaseTool
from langchain.utilities import SerpAPIWrapper
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from typing import Dict, List, Any, Optional
class WeatherTool(BaseTool):
name = "weather_tool"
description = "Get the current weather for a location"
def _run(self, location: str) -> Dict[str, Any]:
"""Get weather data for the specified location"""
# In a real implementation, you would call a weather API here
# For this example, we'll return mock data
return {
"temperature": 22.5,
"condition": "Partly Cloudy",
"humidity": 65
}
async def _arun(self, location: str) -> Dict[str, Any]:
"""Get weather data asynchronously (delegates to the sync implementation)"""
return self._run(location)
def create_langchain_agent():
# Step 1: Initialize tools
# Weather tool
weather_tool = WeatherTool()
# Search tool
search = SerpAPIWrapper(serpapi_api_key=os.getenv("SERPAPI_API_KEY"))
search_tool = Tool(
name="search",
func=search.run,
description="Useful for answering questions about current events or the current state of the world"
)
# Knowledge base tool using vector store
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
# Load or create vector store (this assumes you've already populated it)
vector_store = Chroma(
persist_directory="./chroma_db",
embedding_function=embeddings
)
retriever = vector_store.as_retriever(search_kwargs={"k": 5})
# Create QA chain for travel information
qa = RetrievalQA.from_chain_type(
llm=ChatOpenAI(temperature=0, model="gpt-4"),
chain_type="stuff",
retriever=retriever
)
travel_kb_tool = Tool(
name="travel_knowledge",
func=qa.run,
description="Useful for answering questions about travel destinations, tips, and recommendations."
)
tools = [weather_tool, search_tool, travel_kb_tool]
# Step 2: Set up memory
agent_kwargs = {
"extra_prompt_messages": [MessagesPlaceholder(variable_name="memory")],
}
memory = ConversationBufferMemory(memory_key="memory", return_messages=True)
# Step 3: Initialize the agent
llm = ChatOpenAI(temperature=0, model="gpt-4")
agent = initialize_agent(
tools,
llm,
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
agent_kwargs=agent_kwargs,
memory=memory
)
return {
"agent": agent,
"tools": tools,
"memory": memory
}
# Example usage of the LangChain agent
def process_query_with_langchain(agent, query):
try:
response = agent.run(query)
return response
except Exception as e:
return f"Error processing your request: {str(e)}"
Cost Comparison for AI Agent Solutions:
| Cloud Provider | Service | Component | Cost Structure | Est. Monthly Cost (Medium Usage) |
|---|---|---|---|---|
| AWS | Bedrock Agents | Base Agent | $0.00069/second of processing | $300-800 |
| AWS | Bedrock | Claude/LLM | $15/1M input tokens, $60/1M output tokens | $500-1,500 |
| AWS | Lambda | Tool Execution | $0.20/million invocations + compute time | $5-50 |
| AWS | Knowledge Base | Vector Store | $0.10/GB/month + query costs | $20-200 |
| GCP | Vertex AI | Agent Framework | Part of model cost | Included |
| GCP | Vertex AI | LLM (Gemini Pro) | $0.0025/1K input tokens, $0.00375/1K output tokens | $400-1,200 |
| GCP | Cloud Functions | Tool Execution | $0.40/million invocations + compute time | $5-50 |
| GCP | Vector Search | Knowledge Base | $0.45/1K queries | $45-100 |
| Azure | OpenAI Assistants | Base Assistant | $0.002/1K input tokens, $0.002/1K output tokens | $200-600 |
| Azure | OpenAI | GPT-4 | $0.03/1K input tokens, $0.06/1K output tokens | $500-1,500 |
| Azure | Azure Functions | Tool Execution | $0.20/million executions + compute time | $5-50 |
| Azure | Azure AI Search | Knowledge Base | $100/search unit | $100-200 |
| Self-hosted | LangChain/LlamaIndex | Agent Framework | Server costs | $100-300 |
| Self-hosted | Open Source Models | Llama/Mistral | Server + GPU costs | $200-800 |
| Self-hosted | Vector Database | Chroma/Qdrant | Server costs | $50-100 |
Conclusion
The future of generative AI is characterized by increasingly sophisticated and capable models, more specialized and domain-specific implementations, and deeper integration with existing systems and workflows. As cloud providers continue to compete and innovate in this space, we can expect costs to decrease while capabilities increase.
For organizations looking to implement generative AI solutions, choosing the right cloud platform depends on several factors:
- AWS offers the most mature and comprehensive set of tools for enterprise-grade generative AI deployments, with particularly strong offerings in Bedrock and SageMaker.
- GCP excels in AI research and specialized hardware (TPUs), making it attractive for organizations pushing the boundaries of what's possible with custom models.
- Azure provides the tightest integration with existing Microsoft enterprise applications and strong governance capabilities, making it ideal for regulated industries.
- Self-hosted/hybrid approaches remain valuable for organizations with specific privacy, compliance, or cost optimization needs, especially as open-source models continue to improve.
The implementation examples provided in this blog demonstrate practical applications across these platforms, giving you a starting point to explore these technologies for your specific use cases. By understanding these key trends and how to implement them across different cloud environments, you'll be well-equipped to leverage generative AI's transformative potential.
Whether you're building customer-facing applications, automating internal processes, or exploring entirely new business models, the generative AI capabilities available through cloud platforms have never been more powerful or accessible.