Module aws_lambda_powertools.utilities.parser.models.kinesis
Expand source code
import base64
import logging
from binascii import Error as BinAsciiError
from typing import List
from pydantic import BaseModel, validator
from pydantic.types import PositiveInt
from ..types import Literal
logger = logging.getLogger(__name__)
class KinesisDataStreamRecordPayload(BaseModel):
kinesisSchemaVersion: str
partitionKey: str
sequenceNumber: PositiveInt
data: bytes # base64 encoded str is parsed into bytes
approximateArrivalTimestamp: float
@validator("data", pre=True)
def data_base64_decode(cls, value):
try:
logger.debug("Decoding base64 Kinesis data record before parsing")
return base64.b64decode(value)
except (BinAsciiError, TypeError):
raise ValueError("base64 decode failed")
class KinesisDataStreamRecord(BaseModel):
eventSource: Literal["aws:kinesis"]
eventVersion: str
eventID: str
eventName: Literal["aws:kinesis:record"]
invokeIdentityArn: str
awsRegion: str
eventSourceARN: str
kinesis: KinesisDataStreamRecordPayload
class KinesisDataStreamModel(BaseModel):
Records: List[KinesisDataStreamRecord]
Classes
class KinesisDataStreamModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisDataStreamModel(BaseModel): Records: List[KinesisDataStreamRecord]Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Records : List[KinesisDataStreamRecord]
class KinesisDataStreamRecord (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisDataStreamRecord(BaseModel): eventSource: Literal["aws:kinesis"] eventVersion: str eventID: str eventName: Literal["aws:kinesis:record"] invokeIdentityArn: str awsRegion: str eventSourceARN: str kinesis: KinesisDataStreamRecordPayloadAncestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var awsRegion : strvar eventID : strvar eventName : Literal['aws:kinesis:record']var eventSource : Literal['aws:kinesis']var eventSourceARN : strvar eventVersion : strvar invokeIdentityArn : strvar kinesis : KinesisDataStreamRecordPayload
class KinesisDataStreamRecordPayload (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisDataStreamRecordPayload(BaseModel): kinesisSchemaVersion: str partitionKey: str sequenceNumber: PositiveInt data: bytes # base64 encoded str is parsed into bytes approximateArrivalTimestamp: float @validator("data", pre=True) def data_base64_decode(cls, value): try: logger.debug("Decoding base64 Kinesis data record before parsing") return base64.b64decode(value) except (BinAsciiError, TypeError): raise ValueError("base64 decode failed")Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var approximateArrivalTimestamp : floatvar data : bytesvar kinesisSchemaVersion : strvar partitionKey : strvar sequenceNumber : pydantic.types.PositiveInt
Static methods
def data_base64_decode(value)-
Expand source code
@validator("data", pre=True) def data_base64_decode(cls, value): try: logger.debug("Decoding base64 Kinesis data record before parsing") return base64.b64decode(value) except (BinAsciiError, TypeError): raise ValueError("base64 decode failed")