Hudi connector

Parent document: Connectors

The BitSail Hudi connector supports reading from and writing to Hudi tables. Its main features are:

  • Support streaming writes to Hudi tables.
  • Support batch writes to Hudi tables.
  • Support batch reads from Hudi tables.

Supported Hudi versions

  • 0.11.1

Maven dependency

<dependency>
   <groupId>com.bytedance.bitsail</groupId>
   <artifactId>bitsail-connector-hudi</artifactId>
   <version>${revision}</version>
</dependency>

Hudi reader

Supported data types

  • Basic data types:
    • Integer type:
      • tinyint
      • smallint
      • int
      • bigint
    • Float type:
      • float
      • double
      • decimal
    • Time type:
      • timestamp
      • date
    • String type:
      • string
      • varchar
      • char
    • Bool type:
      • boolean
    • Binary type:
      • binary
  • Composite data types:
    • map
    • array

Parameters

The following parameters should be added to the job.reader block, for example:

{
  "job": {
    "reader":{
      "hoodie":{
        "datasource":{
          "query":{
            "type":"snapshot"
          }
        }
      },
      "path":"/path/to/table",
      "class":"com.bytedance.bitsail.connector.legacy.hudi.dag.HudiSourceFunctionDAGBuilder",
      "table":{
        "type":"MERGE_ON_READ"
      }
    }
  }
}

Necessary parameters

| Param name | Required | Optional value | Description |
| --- | --- | --- | --- |
| class | Yes | | Hudi read connector class name, com.bytedance.bitsail.connector.legacy.hudi.dag.HudiSourceFunctionDAGBuilder |
| path | Yes | | The path of the table; could be HDFS, S3, or another file system. |
| table.type | Yes | MERGE_ON_READ, COPY_ON_WRITE | The type of the Hudi table. |
| hoodie.datasource.query.type | Yes | snapshot, read_optimized | Query type. |

Optional parameters

| Param name | Required | Optional value | Description |
| --- | --- | --- | --- |
| reader_parallelism_num | No | | Read parallelism. |
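
For example, a minimal sketch of a read-optimized read with explicit parallelism; the path is a placeholder and the parallelism value of 2 is illustrative:

{
  "job": {
    "reader": {
      "hoodie": {
        "datasource": {
          "query": {
            "type": "read_optimized"
          }
        }
      },
      "path": "/path/to/table",
      "class": "com.bytedance.bitsail.connector.legacy.hudi.dag.HudiSourceFunctionDAGBuilder",
      "table": {
        "type": "MERGE_ON_READ"
      },
      "reader_parallelism_num": 2
    }
  }
}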

Hudi writer

Supported data types

  • Basic data types supported:
    • Integer type:
      • tinyint
      • smallint
      • int
      • bigint
    • Float type:
      • float
      • double
      • decimal
    • Time type:
      • timestamp
      • date
    • String type:
      • string
      • varchar
      • char
    • Bool type:
      • boolean
    • Binary type:
      • binary
  • Composite data types supported:
    • map
    • array

Parameters

The following parameters should be added to the job.writer block, for example:

{
  "job": {
    "writer": {
      "hoodie": {
        "bucket": {
          "index": {
            "num": {
              "buckets": "4"
            },
            "hash": {
              "field": "id"
            }
          }
        },
        "datasource": {
          "write": {
            "recordkey": {
              "field": "id"
            }
          }
        },
        "table": {
          "name": "test_table"
        }
      },
      "path": "/path/to/table",
      "index": {
        "type": "BUCKET"
      },
      "class": "com.bytedance.bitsail.connector.legacy.hudi.sink.HudiSinkFunctionDAGBuilder",
      "write": {
        "operation": "upsert"
      },
      "table": {
        "type": "MERGE_ON_READ"
      },
      "source_schema": "[{\"name\":\"id\",\"type\":\"bigint\"},{\"name\":\"test\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"string\"}]",
      "sink_schema": "[{\"name\":\"id\",\"type\":\"bigint\"},{\"name\":\"test\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"string\"}]"
    }
  }
}
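
The source_schema and sink_schema values are JSON arrays of column definitions with name and type fields, serialized into the job file as escaped strings. Unescaped, the schema used in the example above is:

[
  { "name": "id", "type": "bigint" },
  { "name": "test", "type": "string" },
  { "name": "timestamp", "type": "string" }
]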

Necessary parameters

| Param name | Required | Optional value | Description |
| --- | --- | --- | --- |
| class | Yes | | Hudi write connector class name, com.bytedance.bitsail.connector.legacy.hudi.sink.HudiSinkFunctionDAGBuilder |
| write.operation | Yes | upsert, insert, bulk_insert | The write operation to perform. |
| table.type | Yes | MERGE_ON_READ, COPY_ON_WRITE | The type of the Hudi table. |
| path | Yes | | Path to the Hudi table; could be HDFS, S3, or another file system. If the path does not exist, the table will be created at this path. |
| format_type | Yes | json | Format of the input data source; currently only json is supported. |
| source_schema | Yes | | Schema used to deserialize source data. |
| sink_schema | Yes | | Schema used to write Hudi data. |
| hoodie.table.name | Yes | | The name of the Hudi table. |

Optional parameters

For more advanced parameters, please check the FlinkOptions.java class.

| Param name | Required | Optional value | Description |
| --- | --- | --- | --- |
| hoodie.datasource.write.recordkey.field | No | | For the upsert operation, defines the primary key. |
| index.type | No | STATE, BUCKET | For the upsert operation, defines the index type. |
| hoodie.bucket.index.num.buckets | No | | When using the BUCKET index, defines the number of buckets. |
| hoodie.bucket.index.hash.field | No | | When using the BUCKET index, defines the field used to compute the hash. |
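
As a minimal sketch, an upsert writer can use the STATE index instead, in which case the bucket-specific options above should not be needed; the path and field names are placeholders, and the remaining required keys (format_type, source_schema, sink_schema) are the same as in the full example earlier:

{
  "job": {
    "writer": {
      "hoodie": {
        "datasource": {
          "write": {
            "recordkey": {
              "field": "id"
            }
          }
        },
        "table": {
          "name": "test_table"
        }
      },
      "path": "/path/to/table",
      "index": {
        "type": "STATE"
      },
      "class": "com.bytedance.bitsail.connector.legacy.hudi.sink.HudiSinkFunctionDAGBuilder",
      "write": {
        "operation": "upsert"
      },
      "table": {
        "type": "MERGE_ON_READ"
      }
    }
  }
}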

Hudi Compaction

Parameters

Compaction has pre-defined reader and writer parameters, for example:

{
  "job":{
    "reader":{
      "path":"/path/to/table",
      "class":"com.bytedance.bitsail.connector.legacy.hudi.dag.HudiCompactSourceDAGBuilder"
    },
    "writer":{
      "path":"/path/to/table",
      "class":"com.bytedance.bitsail.connector.legacy.hudi.dag.HudiCompactSinkDAGBuilder"
    }
  }
}

Necessary parameters

| Param name | Required | Optional value | Description |
| --- | --- | --- | --- |
| job.reader.class | Yes | | Hudi compaction read connector class name, com.bytedance.bitsail.connector.legacy.hudi.dag.HudiCompactSourceDAGBuilder |
| job.writer.class | Yes | | Hudi compaction write connector class name, com.bytedance.bitsail.connector.legacy.hudi.dag.HudiCompactSinkDAGBuilder |
| job.reader.path | Yes | | The path of the table; could be HDFS, S3, or another file system. |
| job.writer.path | Yes | | The path of the table; could be HDFS, S3, or another file system. |

Optional parameters

| Param name | Required | Optional value | Description |
| --- | --- | --- | --- |
| writer_parallelism_num | No | | Parallelism used to process the compaction. |
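
For example, a sketch of a compaction job with explicit compaction parallelism; it assumes writer_parallelism_num sits directly in the job.writer block, and the value 2 is illustrative:

{
  "job": {
    "reader": {
      "path": "/path/to/table",
      "class": "com.bytedance.bitsail.connector.legacy.hudi.dag.HudiCompactSourceDAGBuilder"
    },
    "writer": {
      "path": "/path/to/table",
      "class": "com.bytedance.bitsail.connector.legacy.hudi.dag.HudiCompactSinkDAGBuilder",
      "writer_parallelism_num": 2
    }
  }
}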

Configuration examples: see the Hudi connector example.