Kafka connector

Parent document: Connectors

The Kafka connector supports the following functional points:

  • At Least Once write in batch scenarios
  • Exactly Once read in streaming scenarios

Maven dependency

The Kafka connector internally uses org.apache.kafka:kafka-clients (version 1.0.1) for writing data. So when using this connector to write to Kafka, make sure the target Kafka cluster can work with this version of kafka-clients.

<dependency>
   <groupId>com.bytedance.bitsail</groupId>
   <artifactId>bitsail-connector-kafka</artifactId>
   <version>${revision}</version>
</dependency>

Kafka reader

Parameters

| Param name | Required | Default value | Description |
|------------|----------|---------------|-------------|
| class | Yes | - | Reader class name for the kafka connector: com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder |
| child_connector_type | Yes | - | Can only be kafka |
| reader_parallelism_num | No | - | Reader parallelism num |

Parameters for KafkaConsumer

The underlying Kafka connector uses FlinkKafkaConsumer for reading. The properties and Kafka settings used to initialize the FlinkKafkaConsumer are passed in through the option job.reader.connector. You can specify them as follows:

{
  "job": {
    "reader": {
      "connector": {
        "prop_key": "prop_value"   // "prop_key" means property key, while "prop_val" means property value
      }
    }
  }
}

job.reader.connector supports KV configuration in the form of <string, string>, where:

  • prop_key: FlinkKafkaConsumer property key
  • prop_value: FlinkKafkaConsumer property value

Some commonly used properties are listed below:

1. Kafka cluster properties

| Property key | Required | Default value | Optional value | Description |
|--------------|----------|---------------|----------------|-------------|
| connector.bootstrap.servers | Yes | - | - | kafka cluster address |
| connector.topic | Yes | - | - | topic to read |
| connector.group.id | Yes | - | - | kafka consumer group |
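
For instance, a minimal reader configuration could look like the following sketch. The bootstrap server address, topic, and group id are placeholders to adapt to your own cluster.

{
  "job": {
    "reader": {
      "class": "com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder",
      "child_connector_type": "kafka",
      "connector": {
        "connector.bootstrap.servers": "localhost:9092",   // placeholder kafka cluster address
        "connector.topic": "test_topic",                   // placeholder topic to read
        "connector.group.id": "test_consumer_group"        // placeholder consumer group
      }
    }
  }
}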

2. Where to start consuming

| Property key | Required | Default value | Optional value | Description |
|--------------|----------|---------------|----------------|-------------|
| connector.startup-mode | No | group-offsets | 1. earliest-offset: consume from the earliest offset of the partition<br/>2. latest-offset: consume from the latest offset of the partition<br/>3. group-offsets: consume from the offset of the current consumer group<br/>4. specific-offsets: specify the offset for each partition, used with connector.specific-offsets<br/>5. specific-timestamp: consume messages after a certain point in time, used with connector.specific-timestamp | Decides from which offsets to consume |
| connector.specific-offsets | No | - | - | Used with the specific-offsets startup mode; the format is a standard json string, for example: [{"partition":1,"offset":100},{"partition":2,"offset":200}] |
| connector.specific-timestamp | No | - | - | Used with the specific-timestamp startup mode to specify the timestamp (ms) from which to consume |
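
For example, a reader starting from explicitly specified offsets could be configured as in the sketch below; the partition and offset values are illustrative. Note that connector.specific-offsets is passed as a single string, so the inner quotes are escaped here.

{
  "job": {
    "reader": {
      "connector": {
        "connector.startup-mode": "specific-offsets",
        "connector.specific-offsets": "[{\"partition\":1,\"offset\":100},{\"partition\":2,\"offset\":200}]"
      }
    }
  }
}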

3. Other FlinkKafkaConsumer parameters

FlinkKafkaConsumer supports many parameters; please refer to the ConsumerConfig (2.1.0) API for details. If you need to set any of these parameters, they can be configured through connector.XXX.

For example, to set MAX_PARTITION_FETCH_BYTES_CONFIG to 1024, add the parameter:

{
  "job": {
    "reader": {
      "connector": {
        "connector.max.partition.fetch.bytes": "1024"
      }
    }
  }
}

Parameters for debugging

The Kafka read connector is used in streaming scenarios and, under normal circumstances, consumes continuously. If you want to debug by consuming only a limited amount of data, the following parameters can be configured. Note that these parameters need to be added to the job.reader block.

| Property key | Required | Default value | Description |
|--------------|----------|---------------|-------------|
| enable_count_mode | No | false | Whether to end the current task after consuming a fixed amount of data; generally used for testing |
| count_mode_record_threshold | No | 10000 | If enable_count_mode=true, the current task ends after consuming count_mode_record_threshold messages |
| count_mode_run_time_threshold | No | 600 | If enable_count_mode=true, the current task ends after running for count_mode_run_time_threshold seconds |
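
For example, a debugging job that stops after consuming at most 1000 messages or after running for 60 seconds (both thresholds are illustrative) could set:

{
  "job": {
    "reader": {
      "enable_count_mode": true,
      "count_mode_record_threshold": 1000,
      "count_mode_run_time_threshold": 60
    }
  }
}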

Supported message parsing modes

Messages are pulled from the KafkaConsumer in the format of ConsumerRecord. BitSail supports two ways to handle a ConsumerRecord; the user can use job.reader.format_type to decide which one to use.

  • job.reader.format_type="json": Parse according to json format
    • In this mode, BitSail parses the json-format string in the ConsumerRecord's value according to the parameter job.reader.columns set by the user (a sketch follows this list).
    • Therefore, the parameter job.reader.columns is required in this mode
  • job.reader.format_type="streaming_file": Use raw byte value
    • In this mode, BitSail directly delivers the raw bytes of the ConsumerRecord. The specific structure is as follows:
   [
    {"index":0, "name":"key", "type":"binary"},       // message key
    {"index":1, "name":"value", "type":"binary"},     // message value
    {"index":2, "name":"partition", "type":"string"}, // partition of the message
    {"index":3, "name":"offset", "type":"long"}       // offset of the message in the partition
   ]
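
As an illustration of the json mode referenced above, suppose each message value is a json string such as {"id":1,"name":"foo"}. A sketch of the corresponding reader options, with illustrative column names and types, might be:

{
  "job": {
    "reader": {
      "format_type": "json",
      "columns": [
        {"name": "id", "type": "bigint"},
        {"name": "name", "type": "string"}
      ]
    }
  }
}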

Kafka Writer

Parameters

Note that these parameters should be added to the job.writer block.

Necessary parameters

| Param name | Default value | Description |
|------------|---------------|-------------|
| class | - | Writer class name of the kafka connector: com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat |
| kafka_servers | - | Kafka's bootstrap server addresses; multiple addresses are separated by ',' |
| topic_name | - | kafka topic to write to |
| columns | - | Describes the fields' names and data types |
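
Putting the necessary parameters together, a minimal writer configuration might look like the sketch below; the server address, topic, and column definitions are placeholders.

{
  "job": {
    "writer": {
      "class": "com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat",
      "kafka_servers": "localhost:9092",   // placeholder bootstrap servers
      "topic_name": "test_topic",          // placeholder topic to write
      "columns": [
        {"name": "id", "type": "bigint"},
        {"name": "name", "type": "string"}
      ]
    }
  }
}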

Optional parameters

| Param name | Default value | Description |
|------------|---------------|-------------|
| writer_parallelism_num | - | Writer parallelism num |
| partition_field | - | partition_field contains one or several fields from job.writer.columns, separated by commas (e.g. "id,timestamp"). If partition_field is not empty, the partition of the kafka topic each record is written to is decided by the hash values of these fields |
| log_failures_only | false | When KafkaProducer fails an asynchronous send operation:<br/>1. If log_failures_only=true, only log the failure information<br/>2. If log_failures_only=false, throw an exception |
| retries | 10 | Number of retries after a failed KafkaProducer send |
| retry_backoff_ms | 1000 | KafkaProducer's retry interval (ms) after a failure |
| linger_ms | 5000 | The maximum waiting time (ms) for KafkaProducer to fill a single batch |
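
For example, to partition records by the id field and merely log asynchronous send failures instead of failing the job (all values are illustrative), one could add:

{
  "job": {
    "writer": {
      "partition_field": "id",     // hash-partition records by the "id" column
      "log_failures_only": true,   // log send failures instead of throwing
      "retries": 20,
      "retry_backoff_ms": 2000
    }
  }
}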

Other parameters

When initializing the KafkaProducer, the user can use job.common.optional to pass initialization parameters, for example:

{
   "job": {
      "common": {
         "optional": {
            "batch.size": 16384,
            "buffer.memory": 33554432
         }
      }
   }
}

Configuration examples: see the Kafka connector example.