Kafka connector
Parent document: Connectors
The Kafka connector supports the following functional points:
- At Least Once write in batch scenarios
- Exactly Once read in streaming scenarios
Maven dependency
The Kafka connector internally uses org.apache.kafka:kafka-clients (version 1.0.1) for writing data. So when using this connector to write to Kafka, make sure the target Kafka cluster is compatible with this version of kafka-clients.
```xml
<dependency>
   <groupId>com.bytedance.bitsail</groupId>
   <artifactId>bitsail-connector-kafka</artifactId>
   <version>${revision}</version>
</dependency>
```
Kafka reader
Parameters
Param name | Required | Default value | Description |
---|---|---|---|
class | Yes | | Reader class name for kafka connector: com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder |
child_connector_type | Yes | | Only could be kafka |
reader_parallelism_num | No | | Reader parallelism num |
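As an illustrative sketch (not taken from this document), a job.reader block combining these parameters might look like the following; the parallelism value is an arbitrary placeholder:

```json
{
  "job": {
    "reader": {
      "class": "com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder",
      "child_connector_type": "kafka",
      "reader_parallelism_num": 1  // placeholder value
    }
  }
}
```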
Parameters for KafkaConsumer
The underlying Kafka connector uses FlinkKafkaConsumer for reading. The properties used to initialize the FlinkKafkaConsumer are passed in through the option job.reader.connector. You can specify them as follows:
```json
{
  "job": {
    "reader": {
      "connector": {
        "prop_key": "prop_value"  // "prop_key" means property key, while "prop_value" means property value
      }
    }
  }
}
```
job.reader.connector supports KV configuration in the form of <string, string>, where:

- prop_key: FlinkKafkaConsumer property key
- prop_value: FlinkKafkaConsumer property value
Some commonly used properties are listed below:
1. Kafka cluster properties
Property key | Required | Default value | Optional value | Description |
---|---|---|---|---|
connector.bootstrap.servers | Yes | | | Kafka cluster address |
connector.topic | Yes | | | Topic to read |
connector.group.id | Yes | | | Kafka consumer group |
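For illustration, a sketch of setting these three required properties; the server, topic, and group values are placeholders:

```json
{
  "job": {
    "reader": {
      "connector": {
        "connector.bootstrap.servers": "broker-1:9092,broker-2:9092",  // placeholder addresses
        "connector.topic": "my_topic",                                 // placeholder topic
        "connector.group.id": "my_consumer_group"                      // placeholder consumer group
      }
    }
  }
}
```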
2. Where to start consuming
Property key | Required | Default value | Optional value | Description |
---|---|---|---|---|
connector.startup-mode | No | group-offsets | 1. earliest-offset: Consume from the earliest offset of the partition<br>2. latest-offset: Consume from the latest offset of the partition<br>3. group-offsets: Consume from the offset of the current consumer group<br>4. specific-offsets: Specify the offset for each partition, used together with connector.specific-offsets<br>5. specific-timestamp: Consume messages after a certain point in time, used together with connector.specific-timestamp | Decide from which offsets to consume |
connector.specific-offsets | No | | | Used with specific-offsets; the format is a standard json string, for example: [{"partition":1,"offset":100},{"partition":2,"offset":200}] |
connector.specific-timestamp | No | | | Used with specific-timestamp; specifies the point in time (ms) from which to consume |
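As a hedged sketch, starting consumption from explicit per-partition offsets could be configured as follows; the partitions and offsets are placeholders, and passing the offsets json as an escaped string follows from the <string, string> property format described above:

```json
{
  "job": {
    "reader": {
      "connector": {
        "connector.startup-mode": "specific-offsets",
        // placeholder partitions/offsets, encoded as a json string
        "connector.specific-offsets": "[{\"partition\":0,\"offset\":100},{\"partition\":1,\"offset\":200}]"
      }
    }
  }
}
```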
3. Other FlinkKafkaConsumer parameters
FlinkKafkaConsumer supports many parameters; please refer to the ConsumerConfig (2.1.0) API for details. If you need to set these parameters, they can be configured through connector.XXX.
For example, to set MAX_PARTITION_FETCH_BYTES_CONFIG to 1024, add the parameter:
```json
{
  "job": {
    "reader": {
      "connector": {
        "connector.max.partition.fetch.bytes": "1024"
      }
    }
  }
}
```
Parameters for debugging
The Kafka read connector is used in streaming scenarios and keeps consuming messages under normal circumstances. If you want to debug by consuming only a limited amount of data, the following parameters can be configured. Note that these parameters need to be added to the job.reader block.
Property key | Required | Default value | Description |
---|---|---|---|
enable_count_mode | No | false | Whether to end the current task after consuming a certain amount of data or running for a certain time, generally used for testing |
count_mode_record_threshold | No | 10000 | If enable_count_mode=true, the current task ends after consuming count_mode_record_threshold messages |
count_mode_run_time_threshold | No | 600 | If enable_count_mode=true, the current task ends after running for count_mode_run_time_threshold seconds |
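For example, a debugging configuration that stops after a bounded amount of data might look like this sketch; the threshold values are placeholders:

```json
{
  "job": {
    "reader": {
      "enable_count_mode": true,
      "count_mode_record_threshold": 1000,   // stop after 1000 messages (placeholder)
      "count_mode_run_time_threshold": 300   // or after 300 seconds (placeholder)
    }
  }
}
```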
Supported message parsing modes
Messages pulled from KafkaConsumer are in the form of ConsumerRecord. BitSail supports two ways to handle a ConsumerRecord, and the user can use job.reader.format_type to decide which method to use.
- job.reader.format_type="json": Parse according to json format
  - In this mode, BitSail parses the json format string represented by the value in the ConsumerRecord according to the parameter job.reader.columns set by the user (see the sketch at the end of this section).
  - Therefore, the parameter job.reader.columns is required in this mode.
- job.reader.format_type="streaming_file": Use raw byte values
  - In this mode, BitSail directly delivers the raw bytes value in the ConsumerRecord. The specific structure is as follows:
```json
[
  {"index":0, "name":"key", "type":"binary"},        // message key
  {"index":1, "name":"value", "type":"binary"},      // message value
  {"index":2, "name":"partition", "type":"string"},  // partition of the message
  {"index":3, "name":"offset", "type":"long"}        // offset of the message in the partition
]
```
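As mentioned above for the json format, a sketch of a reader block that sets format_type together with user-defined columns might look like this; the column names and types are placeholders, not values from this document:

```json
{
  "job": {
    "reader": {
      "format_type": "json",
      "columns": [                        // placeholder schema describing the json value
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
      ]
    }
  }
}
```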
Kafka Writer
Parameters
Note that these parameters should be added to job.writer
block.
Necessary parameters
Param name | Default value | Description |
---|---|---|
class | | Writer class name of kafka connector: com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat |
kafka_servers | | Kafka's bootstrap server address; multiple bootstrap server addresses are separated by ',' |
topic_name | | Kafka topic |
columns | | Describes the fields' names and data types |
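Putting the necessary parameters together, a minimal job.writer block might look like the following sketch; the server addresses, topic name, and columns are placeholders:

```json
{
  "job": {
    "writer": {
      "class": "com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat",
      "kafka_servers": "broker-1:9092,broker-2:9092",  // placeholder addresses
      "topic_name": "my_topic",                        // placeholder topic
      "columns": [                                     // placeholder schema
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
      ]
    }
  }
}
```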
Optional parameters
Param name | Default value | Description |
---|---|---|
writer_parallelism_num | | Writer parallelism num |
partition_field | | partition_field contains one or several fields from job.writer.columns, separated by commas (e.g. "id,timestamp"). If partition_field is not empty, when sending data to the kafka topic, the connector decides which partition to write to based on the hash values of these fields in the record |
log_failures_only | false | When KafkaProducer fails to perform an asynchronous send operation: 1. If log_failures_only=true, only log the failure information 2. If log_failures_only=false, throw an exception |
retries | 10 | Number of failed retries for KafkaProducer |
retry_backoff_ms | 1000 | KafkaProducer's failure retry interval (ms) |
linger_ms | 5000 | The maximum waiting time (ms) for KafkaProducer to create a single batch |
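These optional parameters are added to the same job.writer block. The following sketch mirrors the default values listed above; the partition_field value is a placeholder:

```json
{
  "job": {
    "writer": {
      "partition_field": "id",      // placeholder; must be a subset of job.writer.columns
      "log_failures_only": false,
      "retries": 10,
      "retry_backoff_ms": 1000,
      "linger_ms": 5000
    }
  }
}
```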
Other parameters
When initializing the KafkaProducer, the user can use job.common.optional
to pass initialization parameters, for example:
```json
{
  "job": {
    "common": {
      "optional": {
        "batch.size": 16384,
        "buffer.memory": 33554432
      }
    }
  }
}
```
Related documents
Configuration examples: Kafka connector example