Module aws_lambda_powertools.utilities.parser.models
Expand source code
from .alb import AlbModel, AlbRequestContext, AlbRequestContextData
from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
from .event_bridge import EventBridgeModel
from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
from .s3 import S3Model, S3RecordModel
from .ses import SesModel, SesRecordModel
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
from .sqs import SqsModel, SqsRecordModel
__all__ = [
"CloudWatchLogsData",
"CloudWatchLogsDecode",
"CloudWatchLogsLogEvent",
"CloudWatchLogsModel",
"AlbModel",
"AlbRequestContext",
"AlbRequestContextData",
"DynamoDBStreamModel",
"EventBridgeModel",
"DynamoDBStreamChangedRecordModel",
"DynamoDBStreamRecordModel",
"KinesisDataStreamModel",
"KinesisDataStreamRecord",
"KinesisDataStreamRecordPayload",
"S3Model",
"S3RecordModel",
"SesModel",
"SesRecordModel",
"SnsModel",
"SnsNotificationModel",
"SnsRecordModel",
"SqsModel",
"SqsRecordModel",
]
Sub-modules
- aws_lambda_powertools.utilities.parser.models.alb
- aws_lambda_powertools.utilities.parser.models.cloudwatch
- aws_lambda_powertools.utilities.parser.models.dynamodb
- aws_lambda_powertools.utilities.parser.models.event_bridge
- aws_lambda_powertools.utilities.parser.models.kinesis
- aws_lambda_powertools.utilities.parser.models.s3
- aws_lambda_powertools.utilities.parser.models.ses
- aws_lambda_powertools.utilities.parser.models.sns
- aws_lambda_powertools.utilities.parser.models.sqs
Classes
class AlbModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class AlbModel(BaseModel):
    """Pydantic model for an ALB event payload.

    All fields are required; `requestContext` is validated as a nested
    `AlbRequestContext`.
    """

    httpMethod: str
    path: str
    body: str
    isBase64Encoded: bool
    headers: Dict[str, str]
    queryStringParameters: Dict[str, str]
    requestContext: AlbRequestContext
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var body : str
var headers : Dict[str, str]
var httpMethod : str
var isBase64Encoded : bool
var path : str
var queryStringParameters : Dict[str, str]
var requestContext : AlbRequestContext
class AlbRequestContext (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class AlbRequestContext(BaseModel):
    """Request-context section of an ALB event; holds the nested `elb` entry."""

    elb: AlbRequestContextData
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var elb : AlbRequestContextData
class AlbRequestContextData (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class AlbRequestContextData(BaseModel):
    """Payload of the `elb` request-context entry: a single target group ARN."""

    targetGroupArn: str
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var targetGroupArn : str
class CloudWatchLogsData (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class CloudWatchLogsData(BaseModel):
    """Wrapper around the encoded `data` field of a CloudWatch Logs event.

    The raw value is base64-decoded, zlib-decompressed and JSON-parsed by a
    pre-validator, then validated as a `CloudWatchLogsDecode`.
    """

    # Populated from the incoming "data" key (alias); the declared default is None.
    decoded_data: CloudWatchLogsDecode = Field(None, alias="data")

    @validator("decoded_data", pre=True)
    def prepare_data(cls, value):
        """Turn the raw encoded payload into a dict before field validation.

        Args:
            value: the raw "data" value (base64 text of compressed JSON).

        Returns:
            The object produced by `json.loads` on the decompressed payload.

        Raises:
            ValueError: if decoding, decompressing or JSON parsing fails; the
                underlying error is attached as the cause.
        """
        try:
            logger.debug("Decoding base64 cloudwatch log data before parsing")
            payload = base64.b64decode(value)
            logger.debug("Decompressing cloudwatch log data before parsing")
            # MAX_WBITS | 32 lets zlib auto-detect a zlib or gzip header.
            uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32)
            return json.loads(uncompressed.decode("utf-8"))
        except Exception as exc:
            # Broad catch kept from the original: any failure surfaces as ValueError.
            raise ValueError("unable to decompress data") from exc
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var decoded_data : CloudWatchLogsDecode
Static methods
def prepare_data(value)-
Expand source code
@validator("decoded_data", pre=True)
def prepare_data(cls, value):
    """Turn the raw encoded payload into a dict before field validation.

    Args:
        value: the raw "data" value (base64 text of compressed JSON).

    Returns:
        The object produced by `json.loads` on the decompressed payload.

    Raises:
        ValueError: if decoding, decompressing or JSON parsing fails; the
            underlying error is attached as the cause.
    """
    try:
        logger.debug("Decoding base64 cloudwatch log data before parsing")
        payload = base64.b64decode(value)
        logger.debug("Decompressing cloudwatch log data before parsing")
        # MAX_WBITS | 32 lets zlib auto-detect a zlib or gzip header.
        uncompressed = zlib.decompress(payload, zlib.MAX_WBITS | 32)
        return json.loads(uncompressed.decode("utf-8"))
    except Exception as exc:
        # Broad catch kept from the original: any failure surfaces as ValueError.
        raise ValueError("unable to decompress data") from exc
class CloudWatchLogsDecode (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class CloudWatchLogsDecode(BaseModel):
    """Decoded CloudWatch Logs payload: log group/stream metadata plus events."""

    messageType: str
    owner: str
    logGroup: str
    logStream: str
    subscriptionFilters: List[str]
    logEvents: List[CloudWatchLogsLogEvent]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var logEvents : List[CloudWatchLogsLogEvent]
var logGroup : str
var logStream : str
var messageType : str
var owner : str
var subscriptionFilters : List[str]
class CloudWatchLogsLogEvent (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class CloudWatchLogsLogEvent(BaseModel):
    """A single log event: identifier, timestamp and message text."""

    # Field name shadows the `id` builtin; suppress only those two lint codes.
    id: str  # noqa: A003,VNE003
    timestamp: datetime
    message: str
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var id : str
var message : str
var timestamp : datetime.datetime
class CloudWatchLogsModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class CloudWatchLogsModel(BaseModel):
    """Top-level CloudWatch Logs event; the payload sits under `awslogs`."""

    awslogs: CloudWatchLogsData
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var awslogs : CloudWatchLogsData
class DynamoDBStreamChangedRecordModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class DynamoDBStreamChangedRecordModel(BaseModel):
    """The `dynamodb` section of a stream record: keys, images and metadata.

    `NewImage` and `OldImage` are both optional; no cross-field check is
    enforced (see the commented-out validator below).
    """

    # NOTE(review): typed as `date`, so any time-of-day component is not kept - confirm intended.
    ApproximateCreationDateTime: Optional[date]
    Keys: Dict[str, Dict[str, Any]]
    NewImage: Optional[Dict[str, Any]]
    OldImage: Optional[Dict[str, Any]]
    SequenceNumber: str
    SizeBytes: int
    StreamViewType: Literal["NEW_AND_OLD_IMAGES", "KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE"]

    # context on why it's commented: https://github.com/awslabs/aws-lambda-powertools-python/pull/118
    # since both images are optional, they can both be None. However, at least one must
    # exist in a legal model of NEW_AND_OLD_IMAGES type
    # @root_validator
    # def check_one_image_exists(cls, values):  # noqa: E800
    #     new_img, old_img = values.get("NewImage"), values.get("OldImage")  # noqa: E800
    #     stream_type = values.get("StreamViewType")  # noqa: E800
    #     if stream_type == "NEW_AND_OLD_IMAGES" and not new_img and not old_img:  # noqa: E800
    #         raise TypeError("DynamoDB streams model failed validation, missing both new & old stream images")  # noqa: E800,E501
    #     return values  # noqa: E800
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var ApproximateCreationDateTime : Union[datetime.date, NoneType]
var Keys : Dict[str, Dict[str, Any]]
var NewImage : Union[Dict[str, Any], NoneType]
var OldImage : Union[Dict[str, Any], NoneType]
var SequenceNumber : str
var SizeBytes : int
var StreamViewType : Literal['NEW_AND_OLD_IMAGES', 'KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE']
class DynamoDBStreamModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class DynamoDBStreamModel(BaseModel):
    """Top-level DynamoDB stream event: a list of stream records."""

    Records: List[DynamoDBStreamRecordModel]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[DynamoDBStreamRecordModel]
class DynamoDBStreamRecordModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class DynamoDBStreamRecordModel(BaseModel):
    """One DynamoDB stream record.

    `eventName` is restricted to INSERT / MODIFY / REMOVE and `eventSource`
    must equal "aws:dynamodb"; `userIdentity` is optional.
    """

    eventID: str
    eventName: Literal["INSERT", "MODIFY", "REMOVE"]
    eventVersion: float
    eventSource: Literal["aws:dynamodb"]
    awsRegion: str
    eventSourceARN: str
    dynamodb: DynamoDBStreamChangedRecordModel
    userIdentity: Optional[UserIdentity]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var awsRegion : str
var dynamodb : DynamoDBStreamChangedRecordModel
var eventID : str
var eventName : Literal['INSERT', 'MODIFY', 'REMOVE']
var eventSource : Literal['aws:dynamodb']
var eventSourceARN : str
var eventVersion : float
var userIdentity : Union[UserIdentity, NoneType]
class EventBridgeModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class EventBridgeModel(BaseModel):
    """Pydantic model for an EventBridge event envelope.

    The hyphenated payload keys "detail-type" and "replay-name" are exposed as
    `detail_type` and `replay_name` through field aliases.
    """

    version: str
    id: str  # noqa: A003,VNE003
    source: str
    account: str
    time: datetime
    region: str
    resources: List[str]
    detail_type: str = Field(None, alias="detail-type")
    detail: Dict[str, Any]
    replay_name: Optional[str] = Field(None, alias="replay-name")
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var account : str
var detail : Dict[str, Any]
var detail_type : str
var id : str
var region : str
var replay_name : Union[str, NoneType]
var resources : List[str]
var source : str
var time : datetime.datetime
var version : str
class KinesisDataStreamModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisDataStreamModel(BaseModel):
    """Top-level Kinesis data stream event: a list of records."""

    Records: List[KinesisDataStreamRecord]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[KinesisDataStreamRecord]
class KinesisDataStreamRecord (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisDataStreamRecord(BaseModel):
    """One Kinesis stream record.

    `eventSource` must equal "aws:kinesis" and `eventName` must equal
    "aws:kinesis:record"; the record body is validated as the nested
    `kinesis` payload model.
    """

    eventSource: Literal["aws:kinesis"]
    eventVersion: str
    eventID: str
    eventName: Literal["aws:kinesis:record"]
    invokeIdentityArn: str
    awsRegion: str
    eventSourceARN: str
    kinesis: KinesisDataStreamRecordPayload
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var awsRegion : str
var eventID : str
var eventName : Literal['aws:kinesis:record']
var eventSource : Literal['aws:kinesis']
var eventSourceARN : str
var eventVersion : str
var invokeIdentityArn : str
var kinesis : KinesisDataStreamRecordPayload
class KinesisDataStreamRecordPayload (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisDataStreamRecordPayload(BaseModel):
    """The `kinesis` section of a stream record.

    `data` arrives base64-encoded and is decoded to bytes by a pre-validator.
    """

    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: PositiveInt
    data: bytes  # base64 encoded str is parsed into bytes
    approximateArrivalTimestamp: float

    @validator("data", pre=True)
    def data_base64_decode(cls, value):
        """Base64-decode the raw `data` value before field validation.

        Raises:
            ValueError: if base64 decoding fails; the underlying error is
                attached as the cause.
        """
        try:
            logger.debug("Decoding base64 Kinesis data record before parsing")
            return base64.b64decode(value)
        except (BinAsciiError, TypeError) as exc:
            raise ValueError("base64 decode failed") from exc
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var approximateArrivalTimestamp : float
var data : bytes
var kinesisSchemaVersion : str
var partitionKey : str
var sequenceNumber : pydantic.types.PositiveInt
Static methods
def data_base64_decode(value)-
Expand source code
@validator("data", pre=True)
def data_base64_decode(cls, value):
    """Base64-decode the raw `data` value before field validation.

    Raises:
        ValueError: if base64 decoding fails; the underlying error is
            attached as the cause.
    """
    try:
        logger.debug("Decoding base64 Kinesis data record before parsing")
        return base64.b64decode(value)
    except (BinAsciiError, TypeError) as exc:
        raise ValueError("base64 decode failed") from exc
class S3Model (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class S3Model(BaseModel):
    """Top-level S3 event: a list of S3 records."""

    Records: List[S3RecordModel]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[S3RecordModel]
class S3RecordModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class S3RecordModel(BaseModel):
    """One S3 event record.

    `eventSource` must equal "aws:s3"; `glacierEventData` is optional, every
    other field is required.
    """

    eventVersion: str
    eventSource: Literal["aws:s3"]
    awsRegion: str
    eventTime: datetime
    eventName: str
    userIdentity: S3Identity
    requestParameters: S3RequestParameters
    responseElements: S3ResponseElements
    s3: S3Message
    glacierEventData: Optional[S3EventRecordGlacierEventData]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var awsRegion : str
var eventName : str
var eventSource : Literal['aws:s3']
var eventTime : datetime.datetime
var eventVersion : str
var glacierEventData : Union[S3EventRecordGlacierEventData, NoneType]
var requestParameters : S3RequestParameters
var responseElements : S3ResponseElements
var s3 : S3Message
var userIdentity : S3Identity
class SesModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SesModel(BaseModel):
    """Top-level SES event: a list of SES records."""

    Records: List[SesRecordModel]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[SesRecordModel]
class SesRecordModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SesRecordModel(BaseModel):
    """One SES event record; `eventSource` must equal "aws:ses"."""

    eventSource: Literal["aws:ses"]
    eventVersion: str
    ses: SesMessage
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var eventSource : Literal['aws:ses']
var eventVersion : str
var ses : SesMessage
class SnsModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SnsModel(BaseModel):
    """Top-level SNS event: a list of SNS records."""

    Records: List[SnsRecordModel]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[SnsRecordModel]
class SnsNotificationModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SnsNotificationModel(BaseModel):
    """The SNS notification body carried by a record.

    A pre root-validator accepts the alternative upper-case key spellings
    `UnsubscribeURL` / `SigningCertURL` and maps them onto the model's
    `UnsubscribeUrl` / `SigningCertUrl` fields.
    """

    Subject: Optional[str]
    TopicArn: str
    UnsubscribeUrl: HttpUrl
    Type: Literal["Notification"]
    MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]]
    Message: str
    MessageId: str
    SigningCertUrl: HttpUrl
    Signature: str
    Timestamp: datetime
    SignatureVersion: str

    @root_validator(pre=True)
    def check_sqs_protocol(cls, values):
        """Rename the upper-case `...URL` keys to the field names used here.

        Each key is handled on its own, so a payload carrying only one of the
        two no longer raises `KeyError`; a genuinely missing field is then
        reported by normal field validation.

        Args:
            values: the raw input mapping (modified in place).

        Returns:
            The same mapping with any rewritten keys renamed.
        """
        # presumably the spelling used when the notification is delivered via SQS - verify
        sqs_rewritten_keys = (
            ("UnsubscribeURL", "UnsubscribeUrl"),
            ("SigningCertURL", "SigningCertUrl"),
        )
        for rewritten_key, field_name in sqs_rewritten_keys:
            if rewritten_key in values:
                values[field_name] = values.pop(rewritten_key)
        return values
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Message : str
var MessageAttributes : Union[Dict[str, SnsMsgAttributeModel], NoneType]
var MessageId : str
var Signature : str
var SignatureVersion : str
var SigningCertUrl : pydantic.networks.HttpUrl
var Subject : Union[str, NoneType]
var Timestamp : datetime.datetime
var TopicArn : str
var Type : Literal['Notification']
var UnsubscribeUrl : pydantic.networks.HttpUrl
Static methods
def check_sqs_protocol(values)-
Expand source code
@root_validator(pre=True)
def check_sqs_protocol(cls, values):
    """Rename the upper-case `...URL` keys to the field names used by the model.

    Each key is handled on its own, so a payload carrying only one of the
    two no longer raises `KeyError`; a genuinely missing field is then
    reported by normal field validation.

    Args:
        values: the raw input mapping (modified in place).

    Returns:
        The same mapping with any rewritten keys renamed.
    """
    # presumably the spelling used when the notification is delivered via SQS - verify
    sqs_rewritten_keys = (
        ("UnsubscribeURL", "UnsubscribeUrl"),
        ("SigningCertURL", "SigningCertUrl"),
    )
    for rewritten_key, field_name in sqs_rewritten_keys:
        if rewritten_key in values:
            values[field_name] = values.pop(rewritten_key)
    return values
class SnsRecordModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SnsRecordModel(BaseModel):
    """One SNS event record; `EventSource` must equal "aws:sns"."""

    EventSource: Literal["aws:sns"]
    EventVersion: str
    EventSubscriptionArn: str
    Sns: SnsNotificationModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var EventSource : Literal['aws:sns']
var EventSubscriptionArn : str
var EventVersion : str
var Sns : SnsNotificationModel
class SqsModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SqsModel(BaseModel):
    """Top-level SQS event: a list of SQS records."""

    Records: List[SqsRecordModel]
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[SqsRecordModel]
class SqsRecordModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class SqsRecordModel(BaseModel):
    """One SQS message record.

    `eventSource` must equal "aws:sqs"; `md5OfMessageAttributes` is optional,
    every other field is required.
    """

    messageId: str
    receiptHandle: str
    body: str
    attributes: SqsAttributesModel
    messageAttributes: Dict[str, SqsMsgAttributeModel]
    md5OfBody: str
    md5OfMessageAttributes: Optional[str]
    eventSource: Literal["aws:sqs"]
    eventSourceARN: str
    awsRegion: str
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var attributes : SqsAttributesModel
var awsRegion : str
var body : str
var eventSource : Literal['aws:sqs']
var eventSourceARN : str
var md5OfBody : str
var md5OfMessageAttributes : Union[str, NoneType]
var messageAttributes : Dict[str, SqsMsgAttributeModel]
var messageId : str
var receiptHandle : str