Spaces:
Sleeping
Sleeping
from fastapi import FastAPI | |
from opentelemetry import metrics, propagate, trace | |
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter | |
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | |
from opentelemetry.exporter.prometheus import PrometheusMetricReader | |
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor | |
from opentelemetry.propagators.aws import AwsXRayPropagator | |
from opentelemetry.sdk.extension.aws.resource.ec2 import AwsEc2ResourceDetector | |
from opentelemetry.sdk.extension.aws.trace import AwsXRayIdGenerator | |
from opentelemetry.sdk.metrics import MeterProvider | |
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader | |
from opentelemetry.sdk.resources import ( | |
SERVICE_NAME, | |
SERVICE_VERSION, | |
Resource, | |
get_aggregated_resources, | |
) | |
from opentelemetry.sdk.trace import TracerProvider | |
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter | |
from .config import MetricsConfig, TracingConfig | |
from .version import __version__ | |
def _configure_tracing(tracing_config: TracingConfig, resource: Resource) -> None: | |
if tracing_config is None: | |
return | |
if tracing_config.exporter == "xray": | |
propagate.set_global_textmap(AwsXRayPropagator()) | |
resource = resource.merge( | |
get_aggregated_resources( | |
[AwsEc2ResourceDetector()], | |
) | |
) | |
tracer_provider = TracerProvider(resource=resource) | |
if tracing_config.exporter == "xray": | |
tracer_provider.id_generator = AwsXRayIdGenerator() | |
exporter = OTLPSpanExporter(endpoint=tracing_config.endpoint) | |
elif tracing_config.exporter == "otel_http": | |
exporter = OTLPSpanExporter(endpoint=tracing_config.endpoint) | |
elif tracing_config.exporter == "console": | |
exporter = ConsoleSpanExporter() | |
tracer_provider.add_span_processor(BatchSpanProcessor(exporter)) | |
trace.set_tracer_provider(tracer_provider) | |
def _configure_metrics(metrics_config: MetricsConfig, resource: Resource) -> None: | |
if metrics_config is None: | |
return | |
if metrics_config.exporter == "console": | |
reader = PeriodicExportingMetricReader(ConsoleMetricExporter()) | |
elif metrics_config.exporter == "otel_http": | |
reader = PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=metrics_config.endpoint)) | |
elif metrics_config.exporter == "prometheus": | |
reader = PrometheusMetricReader() | |
meter_provider = MeterProvider(resource=resource, metric_readers=[reader]) | |
metrics.set_meter_provider(meter_provider) | |
def configure_otel( | |
app_name: str, tracing_config: TracingConfig, metrics_config: MetricsConfig | |
) -> None: | |
resource = Resource( | |
attributes={ | |
SERVICE_NAME: app_name, | |
SERVICE_VERSION: __version__, | |
} | |
) | |
_configure_tracing(tracing_config, resource) | |
_configure_metrics(metrics_config, resource) | |
def instrument_app(app: FastAPI) -> None: | |
FastAPIInstrumentor.instrument_app( | |
app, | |
excluded_urls="healthz,readyz,metrics", | |
meter_provider=metrics.get_meter_provider(), | |
tracer_provider=trace.get_tracer_provider(), | |
) | |