On November 11, 2024, the Apache Flink community released a new version of AWS services connectors, an AWS open source contribution. This new release, version 5.0.0, introduces a new source connector to read data from Amazon Kinesis Data Streams. In this post, we explain how the new features of this connector can improve performance and reliability of your Apache Flink application.
Apache Flink has both a source and a sink connector, to read from and write to Kinesis Data Streams. In this post, we focus on the new source connector, because version 5.0.0 doesn’t introduce new functionality for the sink.
Apache Flink is a framework and distributed stream processing engine designed to perform computation at in-memory speed and at any scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless experience to run your Flink applications, implemented in Java, Python or SQL, and using all the APIs available in Flink: SQL, Table, DataStream, and ProcessFunction API.
Apache Flink connectors
Flink supports reading and writing data to external systems through connectors, which are components that allow your application to interact with stream-storage message brokers, databases, or object stores. Kinesis Data Streams is a popular source and destination for streaming applications. Flink provides both source and sink connectors for Kinesis Data Streams.
The following diagram illustrates a sample architecture.
Before proceeding further, it’s important to clarify three terms often used interchangeably in data streaming and in the Apache Flink documentation:
- Kinesis Data Streams refers to the Amazon service
- Kinesis source and Kinesis consumer refer to the Apache Flink components, in particular the source connectors, that allow reading data from Kinesis Data Streams
- In this post, we use the term stream to refer to a single Kinesis data stream
Introducing the new Flink Kinesis source connector
The launch of version 5.0.0 of the AWS connectors introduces a new connector for reading events from Kinesis Data Streams. The new connector is called Kinesis Streams Source and supersedes the Kinesis Consumer as the source connector for Kinesis Data Streams.
The new connector introduces several new features, adheres to the new Flink Source interface, and is compatible with Flink 2.x, the first major version release by the Flink community. Flink 2.x introduces a number of breaking changes, including removing the SourceFunction interface used by legacy connectors. The legacy Kinesis Consumer will no longer work with Flink 2.x.
Setting up the connector is slightly different than with the legacy Kinesis connector. Let’s start with the DataStream API.
How to use the new connector with the DataStream API
To add the new connector to your application, you need to update the connector dependency. For the DataStream API, the dependency has changed its name to flink-connector-aws-kinesis-streams.
At the time of writing, the latest connector version is 5.0.0 and it supports the most recent stable Flink versions, 1.19 and 1.20. The connector is also compatible with Flink 2.0, but no connector has been officially released for Flink 2.x yet. Assuming you are using Flink 1.20, the new dependency is the following:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-aws-kinesis-streams</artifactId>
  <version>5.0.0-1.20</version>
</dependency>
The connector uses the new Flink Source interface. This interface implements the new FLIP-27 standard, and replaces the legacy SourceFunction interface that has been deprecated. SourceFunction will be completely removed in Flink 2.x.
In your application, you can now use a fluent and expressive builder interface to instantiate and configure the source. The minimal setup only requires the stream Amazon Resource Name (ARN) and the deserialization schema:
KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
        .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
        .setDeserializationSchema(new SimpleStringSchema())
        .build();
The new source class is called KinesisStreamsSource, not to be confused with the legacy source, FlinkKinesisConsumer.
You can then add the source to the execution environment using the new fromSource() method. This method requires explicitly specifying the watermark strategy, along with a name for the source:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
        kdsSource,
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(1)),
        "Kinesis source");
These few lines of code introduce some of the main changes in the interface of the connector, which we discuss in the following sections.
Stream ARN
You can now define the Kinesis data stream ARN, as opposed to the stream name. This makes it simpler to consume from streams cross-Region and cross-account.
When running in Amazon Managed Service for Apache Flink, you only need to grant the application’s AWS Identity and Access Management (IAM) role permissions to access the stream. The ARN allows pointing to a stream located in a different AWS Region or account, without assuming roles or passing any external credentials.
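To illustrate why the ARN alone is enough to locate a stream in another Region or account, the following minimal sketch splits a stream ARN into its parts. The class StreamArnParts is hypothetical and not part of the connector; it only assumes the general ARN layout arn:partition:service:region:account-id:resource.

```java
import java.util.Objects;

/** Hypothetical helper, not part of the Flink connector: splits a Kinesis stream ARN into its parts. */
final class StreamArnParts {
    final String region;
    final String accountId;
    final String streamName;

    private StreamArnParts(String region, String accountId, String streamName) {
        this.region = region;
        this.accountId = accountId;
        this.streamName = streamName;
    }

    static StreamArnParts parse(String arn) {
        Objects.requireNonNull(arn, "arn");
        // General ARN layout: arn:partition:service:region:account-id:resource
        String[] parts = arn.split(":", 6);
        if (parts.length != 6 || !"arn".equals(parts[0]) || !"kinesis".equals(parts[2])) {
            throw new IllegalArgumentException("Not a Kinesis ARN: " + arn);
        }
        String resource = parts[5];
        if (!resource.startsWith("stream/")) {
            throw new IllegalArgumentException("Not a stream ARN: " + arn);
        }
        return new StreamArnParts(parts[3], parts[4], resource.substring("stream/".length()));
    }

    public static void main(String[] args) {
        StreamArnParts p = parse("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream");
        System.out.println(p.region + " / " + p.accountId + " / " + p.streamName);
    }
}
```

Because the Region and the account ID are embedded in the ARN, a stream name alone would not carry the same information.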
Explicit watermark
One of the most important characteristics of the new Source interface is that you have to explicitly define a watermark strategy when you attach the source to the execution environment. If your application only implements processing-time semantics, you can specify WatermarkStrategy.noWatermarks().
This is an improvement in terms of code readability. Looking at the source, you know immediately which kind of watermark you have, or whether you have any at all. Previously, many connectors provided some sort of default watermarks that the user could override. However, the default watermark of each connector was slightly different and confusing for the user.
With the new connector, you can achieve the same behavior as the legacy FlinkKinesisConsumer default watermarks, using WatermarkStrategy.forMonotonousTimestamps(), as shown in the previous example. This strategy generates watermarks based on the approximateArrivalTimestamp returned by Kinesis Data Streams. This timestamp corresponds to the time when the record was published to Kinesis Data Streams.
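As a rough illustration of what a monotonous-timestamps strategy does, the following sketch tracks the highest arrival timestamp seen so far and keeps the watermark one millisecond behind it. The class MonotonousWatermarkSketch is hypothetical and only mimics the idea; it is not the Flink implementation, and the timestamps are made-up values.

```java
/** Hypothetical sketch of a monotonous-timestamps watermark; not the Flink implementation. */
final class MonotonousWatermarkSketch {
    // Highest arrival timestamp (epoch millis) observed so far.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    /** Called for every record, with its approximate arrival timestamp. */
    void onRecord(long arrivalTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, arrivalTimestampMillis);
    }

    /** The watermark trails the highest timestamp by 1 ms and never moves backward. */
    long currentWatermark() {
        return maxTimestamp - 1;
    }

    public static void main(String[] args) {
        MonotonousWatermarkSketch w = new MonotonousWatermarkSketch();
        for (long ts : new long[] {1_000L, 1_500L, 1_200L}) {
            w.onRecord(ts);
            System.out.println("record ts=" + ts + " watermark=" + w.currentWatermark());
        }
    }
}
```

A record that arrives with a lower timestamp than one already seen does not move the watermark backward.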
Idleness and watermark alignment
With the watermark strategy, you can additionally define an idleness, which allows the watermark to progress even when some shards of the stream are idle and receiving no records. Refer to Dealing With Idle Sources for more details about idleness and watermark generators.
A feature introduced by the new Source interface, and fully supported by the new Kinesis source, is watermark alignment. Watermark alignment works in the opposite direction of idleness. It slows down consuming from a shard that is progressing faster than others. This is particularly useful when replaying data from a stream, to reduce the volume of data buffered in the application state. Refer to Watermark alignment for more details.
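The interplay between idleness and alignment can be sketched in a few lines. In this simplified model, which is an assumption for illustration and not the Flink implementation, idle shards are ignored when computing the combined watermark, and a shard is paused when its watermark is ahead of the combined watermark by more than a maximum drift. The class name, shard names, and values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical, simplified model of idleness and watermark alignment across shards. */
final class WatermarkAlignmentSketch {

    /** Idleness: the combined watermark is the minimum over the shards that are not idle. */
    static OptionalLong combinedWatermark(Map<String, Long> shardWatermarks, Set<String> idleShards) {
        return shardWatermarks.entrySet().stream()
                .filter(e -> !idleShards.contains(e.getKey()))
                .mapToLong(e -> e.getValue())
                .min();
    }

    /** Alignment: pause the shards that are ahead of the combined watermark by more than maxDriftMillis. */
    static List<String> shardsToPause(
            Map<String, Long> shardWatermarks, Set<String> idleShards, long maxDriftMillis) {
        List<String> toPause = new ArrayList<>();
        OptionalLong min = combinedWatermark(shardWatermarks, idleShards);
        if (!min.isPresent()) {
            return toPause; // every shard is idle, nothing to align
        }
        long limit = min.getAsLong() + maxDriftMillis;
        // TreeMap gives a deterministic (sorted) iteration order for the result.
        for (Map.Entry<String, Long> e : new TreeMap<>(shardWatermarks).entrySet()) {
            if (!idleShards.contains(e.getKey()) && e.getValue() > limit) {
                toPause.add(e.getKey());
            }
        }
        return toPause;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = Map.of("shard-a", 1_000L, "shard-b", 5_000L, "shard-c", 100L);
        Set<String> idle = Set.of("shard-c");
        System.out.println("combined: " + combinedWatermark(watermarks, idle));
        System.out.println("to pause: " + shardsToPause(watermarks, idle, 2_000L));
    }
}
```

Without idleness, the stalled shard would hold the combined watermark back; without alignment, the fastest shard would keep reading ahead and the application would have to buffer its records.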
Set up the connector with the Table API and SQL
Assuming you are using Flink 1.20, the dependency containing both Kinesis source and sink for the Table API and SQL is the following (both Flink 1.19 and 1.20 are supported; adjust the version accordingly):
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis</artifactId>
  <version>5.0.0-1.20</version>
</dependency>
This dependency contains both the new source and the legacy source. Refer to Versioning in case you are planning to use both in the same application.
When defining the source in SQL or the Table API, you use the connector name kinesis, as it was with the legacy source. However, many parameters have changed with the new source:
CREATE TABLE KinesisTable (
   `user_id` BIGINT,
   `item_id` BIGINT,
   `category_id` BIGINT,
   `behavior` STRING,
   `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
   'connector' = 'kinesis',
   'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
   'aws.region' = 'us-east-1',
   'source.init.position' = 'LATEST',
   'format' = 'csv'
);
A few notable connector options changed from the legacy source are:
- stream.arn specifies the stream ARN, as opposed to the stream name used in the legacy source.
- source.init.position defines the starting position. This option works similarly to the legacy source, but the option name is different. It was previously scan.stream.initpos.
For the full list of connector options, refer to Connector Options.
New features and improvements
In this section, we discuss the most important features introduced by the new connector. These features are available in the DataStream API, and also the Table API and SQL.
Ordering guarantees
The most important improvement introduced by the new connector is about ordering guarantees.
With Kinesis Data Streams, the order of the messages is retained per partitionId. This is achieved by putting all records with the same partitionId in the same shard. However, when the stream scales, splitting or merging shards, records with the same partitionId end up in a new shard. Kinesis keeps track of the parent-child lineage when resharding happens.
One known limitation of the legacy Kinesis source is that it was unable to follow the parent-child shard lineage. As a consequence, ordering could not be guaranteed when resharding happened. The problem was particularly relevant when the application replayed old messages from a stream that had been resharded, because ordering would be lost. This also made watermark generation and event-time processing non-deterministic.
With the new connector, ordering is retained also when resharding happens. This is achieved by following the parent-child shard lineage, and consuming all records from a parent shard before proceeding with the child shard.
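The idea of consuming a parent shard before its children can be sketched as a topological ordering over the shard lineage. The following is a deliberately sequential simplification with hypothetical class and shard names; the real connector reads many shards in parallel and this is not its code. It only assumes that each shard knows the IDs of its parent shards (one after a split, two after a merge).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: a shard becomes eligible only after all its known parents are fully consumed. */
final class ShardLineageSketch {

    static final class Shard {
        final String id;
        final List<String> parentIds;

        Shard(String id, List<String> parentIds) {
            this.id = id;
            this.parentIds = parentIds;
        }
    }

    static List<String> consumptionOrder(List<Shard> shards) {
        Set<String> known = new HashSet<>();
        for (Shard s : shards) {
            known.add(s.id);
        }
        Map<String, Integer> pendingParents = new LinkedHashMap<>();
        Map<String, List<String>> children = new HashMap<>();
        for (Shard s : shards) {
            int pending = 0;
            for (String parent : s.parentIds) {
                // Parents that are no longer listed (for example, already expired) are ignored.
                if (known.contains(parent)) {
                    pending++;
                    children.computeIfAbsent(parent, k -> new ArrayList<>()).add(s.id);
                }
            }
            pendingParents.put(s.id, pending);
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : pendingParents.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String finished = ready.poll();
            order.add(finished);
            // Finishing a parent may unblock its children.
            for (String child : children.getOrDefault(finished, Collections.emptyList())) {
                if (pendingParents.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<Shard> shards = List.of(
                new Shard("child-1", List.of("parent")),
                new Shard("child-2", List.of("parent")),
                new Shard("parent", List.of()));
        System.out.println(consumptionOrder(shards));
    }
}
```

Because records with the same partitionId move from the parent to one of its children, draining the parent first preserves their relative order.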
A better default shard assigner
Each Kinesis data stream is composed of many shards. Also, the Flink source operator runs in multiple parallel subtasks. The shard assigner is the component that decides how to assign the shards of the stream across the source subtasks. The shard assigner’s job is non-trivial, because shard split or merge operations (resharding) might happen when the stream scales up or down.
The new connector comes with a new default assigner, UniformShardAssigner. This assigner maintains uniform distribution of the stream partitionId across parallel subtasks, also when resharding happens. This is achieved by looking at the range of partition keys (HashKeyRange) of each shard.
This shard assigner was already available in the previous connector version, but for backward compatibility, it was not the default and you had to set it up explicitly. That is no longer the case with the new source. The old default shard assigner of the legacy FlinkKinesisConsumer evenly distributed shards (not partitionId) across subtasks. In this case, the actual data distribution might become uneven in the case of resharding, because of the mix of open and closed shards in the stream. Refer to Shard Assignment Strategy for more details.
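To give an intuition of how an assigner can use the HashKeyRange, the following sketch maps the midpoint of a shard’s hash key range proportionally onto the subtask indexes. This is an assumed simplification for illustration, not the code of UniformShardAssigner; the class and method names are hypothetical. It relies only on the fact that Kinesis hash keys are 128-bit values.

```java
import java.math.BigInteger;

/** Hypothetical sketch: assign a shard to a subtask based on where its hash key range sits. */
final class UniformAssignmentSketch {
    // Kinesis hash keys span 0 .. 2^128 - 1.
    private static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    static int subtaskFor(BigInteger startingHashKey, BigInteger endingHashKey, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Midpoint of the shard's range, scaled proportionally to [0, parallelism).
        BigInteger mid = startingHashKey.add(endingHashKey).shiftRight(1);
        return mid.multiply(BigInteger.valueOf(parallelism)).divide(KEY_SPACE).intValueExact();
    }

    public static void main(String[] args) {
        BigInteger half = BigInteger.ONE.shiftLeft(127);
        BigInteger max = KEY_SPACE.subtract(BigInteger.ONE);
        System.out.println("lower half -> subtask " + subtaskFor(BigInteger.ZERO, half.subtract(BigInteger.ONE), 2));
        System.out.println("upper half -> subtask " + subtaskFor(half, max, 2));
    }
}
```

With this kind of mapping, the children created by splitting a shard stay close to the subtask that owned the parent’s key range, regardless of how many closed shards are still listed in the stream.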
Reduced JAR size
The size of the JAR file has been reduced by 99%, from about 60 MB down to 200 KB. This significantly reduces the size of the fat-JAR of your application using the connector. A smaller JAR can speed up many operations that require redeploying the application.
AWS SDK for Java 2.x
The new connector is based on the newer AWS SDK for Java 2.x, which adds several features and improves support for non-blocking I/O. This makes the connector future-proof because the AWS SDK v1 will reach end-of-support by the end of 2025.
AWS SDK built-in retry strategy
The new connector relies on the AWS SDK built-in retry strategy, as opposed to a custom strategy implemented by the legacy connector. Relying on the AWS SDK improves the classification of some errors as retriable or non-retriable.
Removed dependency on the Kinesis Client Library and Kinesis Producer Library
The new connector package no longer includes the Kinesis Client Library (KCL) and Kinesis Producer Library (KPL), contributing to the substantial reduction of the JAR size that we have mentioned.
An implication of this change is that the new connector no longer supports de-aggregation out of the box. Unless you are publishing records to the stream using the KPL and you enabled aggregation, this will not make any difference for you. If your producers use KPL aggregation, you might consider implementing a custom DeserializationSchema to de-aggregate the records in the source.
Migrating from the legacy connector
Flink sources typically save the position in checkpoints and savepoints, called snapshots in Amazon Managed Service for Apache Flink. When you stop and restart the application, or when you update the application to deploy a change, the default behavior is saving the source position in the snapshot just before stopping the application, and restoring the position when the application restarts. This allows Flink to provide exactly-once guarantees on the source.
However, due to the major changes introduced by the new KinesisStreamsSource, the saved state is no longer compatible with the legacy FlinkKinesisConsumer. This means that when you upgrade the source of an existing application, you can’t directly restore the source position from the snapshot.
For this reason, migrating your application to the new source requires some consideration. The exact migration process depends on your use case. There are two general scenarios:
- Your application uses the DataStream API and you are following Flink best practices, defining a UID on each operator
- Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator
Let’s cover each of these scenarios.
Your application uses the DataStream API and you are defining a UID on each operator
In this case, you might consider selectively resetting the state of the source operator, retaining any other application state. The general approach is as follows:
- Update your application dependencies and code, replacing the FlinkKinesisConsumer with the new KinesisStreamsSource.
- Change the UID of the source operator (use a different string). Leave all other operators’ UIDs unchanged. This will selectively reset the state of the source while retaining the state of all other operators.
- Configure the source starting position using AT_TIMESTAMP and set the timestamp to just before the moment you will deploy the change. See Configuring Starting Position to learn how to set the starting position. We recommend passing the timestamp as a runtime property to make this more flexible. The configured source starting position is used only when the application can’t restore the state from a savepoint (or snapshot). In this case, we are deliberately forcing this by changing the UID of the source operator.
- Update the Amazon Managed Service for Apache Flink application, selecting the new JAR containing the modified application. Restart from the latest snapshot (default behavior) and select allowNonRestoredState = true. Without this flag, Flink would prevent restarting the application, because it would not be able to restore the state of the old source that was saved in the snapshot. See Savepointing for more details about allowNonRestoredState.
This approach will cause the reprocessing of some records from the source, and internal state exactly-once consistency might be broken. Carefully evaluate the impact of reprocessing on your application, and the impact of duplicates on the downstream systems.
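The effect of changing the source UID and enabling allowNonRestoredState can be modeled with a small sketch. The class UidRestoreSketch, the UIDs, and the string placeholders standing in for operator state are all hypothetical; the sketch only mirrors the matching rule described in the steps above and is not Flink code.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical model of how snapshot state is matched to operators by UID on restart. */
final class UidRestoreSketch {

    static Map<String, String> restore(
            Map<String, String> snapshotStateByUid,
            Set<String> jobOperatorUids,
            boolean allowNonRestoredState) {
        Map<String, String> restored = new TreeMap<>();
        for (Map.Entry<String, String> e : snapshotStateByUid.entrySet()) {
            if (jobOperatorUids.contains(e.getKey())) {
                // Same UID in the snapshot and in the new job: the state is restored.
                restored.put(e.getKey(), e.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Snapshot state for UID '" + e.getKey() + "' has no matching operator");
            }
            // Otherwise the unmatched state is dropped and that operator starts from scratch.
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = Map.of(
                "kinesis-source-legacy", "shard positions",
                "aggregator", "window state");
        Set<String> newJob = Set.of("kinesis-source-v5", "aggregator");
        System.out.println(restore(snapshot, newJob, true));
    }
}
```

In this model, the renamed source finds no state and falls back to its configured starting position, while the operator that kept its UID gets its state back.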
Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator
In this case, you can’t selectively reset the state of the source operator.
Why does this happen? When using the Table API or SQL, or the DataStream API without defining the operator’s UID explicitly, Flink automatically generates identifiers for all operators based on the structure of the job graph of your application. These identifiers are used to identify the state of each operator when saved in the snapshots, and to restore it to the correct operator when you restart the application.
Changes to the application might cause changes in the underlying data flow. This changes the auto-generated identifiers. If you are using the DataStream API and you are specifying the UID, Flink uses your identifiers instead of the auto-generated identifiers, and is able to map the state back to the operator, even when you make changes to the application. This is an intrinsic limitation of Flink, explained in Set UUIDs For All Operators. Enabling allowNonRestoredState doesn’t solve this problem, because Flink is not able to map the state saved in the snapshot to the actual operators after the changes.
In our migration scenario, the only option is resetting the state of your application. You can achieve this in Amazon Managed Service for Apache Flink by selecting Skip restore from snapshot (SKIP_RESTORE_FROM_SNAPSHOT) when you deploy the change that replaces the source connector.
After the application using the new source is up and running, you can switch back to the default behavior when restarting the application, using the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT). This way, no data loss happens when the application is restarted.
Choosing the right connector package and version
The dependency version you need to pick is normally <connector-version>-<flink-version>. For example, the latest Kinesis connector version is 5.0.0. If you are using a Flink runtime version 1.20.x, your dependency for the DataStream API is 5.0.0-1.20.
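The naming convention can be expressed as a tiny helper. The class ConnectorVersionSketch is hypothetical and only encodes the usual <connector-version>-<flink-version> pattern described above, where the Flink part is the major.minor of the runtime; always verify the result against the published artifacts.

```java
/** Hypothetical helper: builds the usual <connector-version>-<flink-version> dependency version. */
final class ConnectorVersionSketch {

    static String dependencyVersion(String connectorVersion, String flinkRuntimeVersion) {
        String[] parts = flinkRuntimeVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected a Flink version like 1.20 or 1.20.0");
        }
        // Only major.minor of the Flink runtime is part of the dependency version.
        return connectorVersion + "-" + parts[0] + "." + parts[1];
    }

    public static void main(String[] args) {
        System.out.println(dependencyVersion("5.0.0", "1.20.0"));
    }
}
```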
For the most up-to-date connector versions, see Use Apache Flink connectors with Managed Service for Apache Flink.
Connector artifact
In previous versions of the connector (4.x and before), there were separate packages for the source and sink. This additional level of complexity has been removed with version 5.x.
For your Java application, or Python applications where you package JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository, the following dependency contains the new version of both source and sink connectors:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-aws-kinesis-streams</artifactId>
  <version>5.0.0-1.20</version>
</dependency>
Make sure you’re using the latest available version. At the time of writing, this is 5.0.0. You can verify the available artifact versions in Maven Central. Also, use the correct version depending on your Flink runtime version. The previous example is for Flink 1.20.0.
Connector artifacts for Python application
If you use Python, we recommend packaging JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository. However, if you’re passing a single JAR directly to your Amazon Managed Service for Apache Flink application, you need to use the artifact that includes all transitive dependencies. In the case of the new Kinesis source and sink, this is called flink-sql-connector-aws-kinesis-streams. This artifact includes only the new source. Refer to Amazon Kinesis Data Streams SQL Connector for the right package, in case you want to use both the new and the legacy source.
Conclusion
The new Flink Kinesis source connector introduces many new features that improve stability and performance, and prepares your application for Flink 2.x. Support for watermark idleness and alignment is a particularly important feature if your application uses event-time semantics. The ability to retain record ordering improves data consistency, in particular when stream resharding happens, and when you replay old data from a stream that has been resharded.
You should carefully plan the change if you’re migrating your application from the legacy Kinesis source connector, and make sure you follow Flink’s best practices like specifying a UID on all DataStream operators.
You can find a working example of a Java DataStream API application using the new connector in the Amazon Managed Service for Apache Flink samples GitHub repository.
To learn more about the new Flink Kinesis source connector, refer to Amazon Kinesis Data Streams Connector and Amazon Kinesis Data Streams SQL Connector.
About the Author
Lorenzo Nicora works as a Senior Streaming Solutions Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open source technologies extensively and contributed to several projects, including Apache Flink.