kafka-connect-datagen is a Kafka Connect connector for generating mock data. It is available in Confluent Hub. It is not suitable for production.
There are multiple released versions of this connector, starting with 0.1.0. The instructions below use version 0.1.1 as an example, but you can substitute any other released version. Unless specified otherwise, we recommend using the latest released version to get all of the features and bug fixes.
You may install the kafka-connect-datagen connector from Confluent Hub:
confluent-hub install confluentinc/kafka-connect-datagen:0.1.1
for the 0.1.1 version of the connector (you can use any released version), or
confluent-hub install confluentinc/kafka-connect-datagen:latest
for the latest released version of the connector.
Alternatively, you may build and install the kafka-connect-datagen connector from source. Here we use v0.1.1 to reference the git tag for the 0.1.1 version, but the same pattern works for all released versions.
git checkout v0.1.1
mvn clean package
confluent-hub install target/components/packages/confluentinc-kafka-connect-datagen-0.1.1.zip
Here is an example of how to run kafka-connect-datagen on a local install:
confluent start connect
confluent config datagen-pageviews -d config/connector_pageviews.config
confluent status connectors
confluent consume test1 --value-format avro --max-messages 5 --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --from-beginning
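The contents of config/connector_pageviews.config are not shown here. As a rough idea of what such a file can contain, the sketch below writes a hypothetical configuration in the same {"name": ..., "config": {...}} shape that the Kafka Connect REST API accepts. The file name, connector name, topic, and the max.interval and iterations values are assumptions for illustration; check each parameter against the kafka-connect-datagen configuration reference before relying on it.

```shell
#!/usr/bin/env bash
# Hypothetical example config; NOT the contents of config/connector_pageviews.config.
# Connector name, topic, and numeric values are illustrative assumptions.
cat > datagen-pageviews-example.config <<'EOF'
{
  "name": "datagen-pageviews-example",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "pageviews",
    "quickstart": "pageviews",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "max.interval": 100,
    "iterations": 1000,
    "tasks.max": "1"
  }
}
EOF
```

The file can then be loaded the same way as the bundled one, for example with confluent config datagen-pageviews-example -d datagen-pageviews-example.config.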
This project provides several Dockerfiles that you can use to create Docker images with this connector. The Dockerfiles differ slightly with each release, so be sure the connector version in the Dockerfile matches the version you want to use.
You may install the kafka-connect-datagen connector into your Docker image from Confluent Hub. The following command builds the image using the Dockerfile-confluenthub specification and tags that image with confluentinc/kafka-connect-datagen:0.1.1 (be sure to use the correct datagen connector version in the tag).
docker build . -f Dockerfile-confluenthub -t confluentinc/kafka-connect-datagen:0.1.1
Alternatively, you may build and install the kafka-connect-datagen connector from source. Here we use v0.1.1 to reference the git tag for the 0.1.1 version, but the same pattern works for all released versions. Be sure to use the same version in the Docker image tag (e.g., confluentinc/kafka-connect-datagen:0.1.1) that you checked out (e.g., v0.1.1).
git checkout v0.1.1
mvn clean package
docker build . -f Dockerfile-local -t confluentinc/kafka-connect-datagen:0.1.1
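One way to keep the checked-out git tag, the Docker image tag, and the Confluent Hub archive name in sync is to derive all of them from a single version string. The helper functions below are hypothetical (they are not part of the project); they only reproduce the naming patterns used in the commands above.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: derive every version-dependent name from one VERSION value
# so the git tag, image tag, and package archive cannot drift apart.
datagen_git_tag()   { printf 'v%s\n' "$1"; }
datagen_image_tag() { printf 'confluentinc/kafka-connect-datagen:%s\n' "$1"; }
datagen_zip_path()  { printf 'target/components/packages/confluentinc-kafka-connect-datagen-%s.zip\n' "$1"; }

# Example build from a clone of the repository (commented out, not executed here):
#   VERSION=0.1.1
#   git checkout "$(datagen_git_tag "$VERSION")"
#   mvn clean package
#   docker build . -f Dockerfile-local -t "$(datagen_image_tag "$VERSION")"
```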
Here is an example of how to run kafka-connect-datagen with Docker Compose. If you used a different Docker image tag, be sure to use that tag in the Docker Compose file instead of confluentinc/kafka-connect-datagen:0.1.1.
docker-compose up -d --build
curl -X POST -H "Content-Type: application/json" --data @config/connector_pageviews.config http://localhost:8083/connectors
docker-compose exec connect kafka-console-consumer --topic pageviews --bootstrap-server kafka:29092 --property print.key=true --max-messages 5 --from-beginning
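Because docker-compose up -d returns before the Connect worker is ready, the curl POST can fail if it runs too early. A minimal sketch, assuming the worker listens on http://localhost:8083 as above: the hypothetical wait_for_connect function polls the REST API root (GET /, which returns version information once the worker is up) and only then lets you submit the connector. The attempt count and delay are arbitrary defaults.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll the Kafka Connect REST API until the worker responds.
# Arguments: base URL, number of attempts, delay in seconds (defaults are assumptions).
wait_for_connect() {
  local url="${1:-http://localhost:8083}"
  local attempts="${2:-30}"
  local delay="${3:-2}"
  local i
  for ((i = 1; i <= attempts; i++)); do
    # -f makes curl fail on HTTP errors; --max-time avoids hanging on a stuck socket.
    if curl -sf --max-time 5 "${url}/" > /dev/null; then
      return 0
    fi
    sleep "$delay"
  done
  echo "Kafka Connect at ${url} did not respond after ${attempts} attempts" >&2
  return 1
}

# Example (commented out, not executed here):
#   docker-compose up -d --build
#   wait_for_connect http://localhost:8083 && \
#     curl -X POST -H "Content-Type: application/json" \
#       --data @config/connector_pageviews.config http://localhost:8083/connectors
```

After submitting, GET /connectors/<name>/status on the same REST API reports whether the connector and its tasks are running.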
See all Kafka Connect configuration parameters.
See kafka-connect-datagen configuration parameters and their defaults.
There are a few quickstart schema specifications bundled with kafka-connect-datagen, and they are listed in this directory. To use one of these bundled schemas, refer to this mapping and, in the configuration file, set the parameter quickstart to the associated name.
For example:
...
"quickstart": "users",
...
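For context, here is a hypothetical complete configuration built around the "quickstart": "users" line. The connector name, topic, and counts are assumptions. This sketch also chooses JSON output: with org.apache.kafka.connect.json.JsonConverter, setting value.converter.schemas.enable to false produces plain JSON values instead of the default envelope that embeds a schema alongside each payload.

```shell
#!/usr/bin/env bash
# Hypothetical full config around "quickstart": "users".
# Name, topic, max.interval, and iterations are illustrative assumptions.
cat > datagen-users-example.config <<'EOF'
{
  "name": "datagen-users-example",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 100,
    "tasks.max": "1"
  }
}
EOF
```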
You can also define your own schema specifications if you want to customize the fields and their values to be more domain specific or to match what your application is expecting.
Under the hood, kafka-connect-datagen uses Avro Random Generator, so the only constraint in writing your own schema specification is that it is compatible with Avro Random Generator.
To define your own schema:
- Create your own schema file /path/to/your_schema.avsc that is compatible with Avro Random Generator.
- In the connector configuration, remove the configuration parameter quickstart and add the parameters schema.filename and schema.keyfield:
...
"schema.filename": "your_schema.avsc",
"schema.keyfield": "<field representing the key>",
...
- Set CONNECT_CLASSPATH to the directory that contains the file your_schema.avsc. For example, if your file is /Users/alice/schemas/your_schema.avsc, then set schema.filename=your_schema.avsc in the connector configuration above and CONNECT_CLASSPATH=/Users/alice/schemas/ in the export command below. This must be done before starting Connect, so restart Connect if it is already running.
export CONNECT_CLASSPATH=</path/to/>
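The three steps above can be sketched end to end as follows. Everything here is a hypothetical example: the schemas/ directory, the record and field names, the topic, and the connector name are assumptions. The schema uses Avro Random Generator's "arg.properties" rules ("regex" for strings, "options" to pick from a fixed list, and "range" with "min"/"max" for numbers).

```shell
#!/usr/bin/env bash
# Hypothetical walkthrough of the custom-schema steps; all names are illustrative.
mkdir -p schemas

# Step 1: a small schema using Avro Random Generator "arg.properties" rules.
cat > schemas/your_schema.avsc <<'EOF'
{
  "namespace": "example",
  "name": "user_visit",
  "type": "record",
  "fields": [
    {"name": "userid", "type": {"type": "string", "arg.properties": {"regex": "User_[1-9]"}}},
    {"name": "regionid", "type": {"type": "string", "arg.properties": {"options": ["Region_1", "Region_2", "Region_3"]}}},
    {"name": "visits", "type": {"type": "int", "arg.properties": {"range": {"min": 1, "max": 100}}}}
  ]
}
EOF

# Step 2: reference the schema by file name and choose the key field (no "quickstart").
cat > datagen-custom-example.config <<'EOF'
{
  "name": "datagen-custom-example",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "user_visits",
    "schema.filename": "your_schema.avsc",
    "schema.keyfield": "userid",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "tasks.max": "1"
  }
}
EOF

# Step 3: put the schema directory on the classpath before (re)starting Connect.
export CONNECT_CLASSPATH="$(pwd)/schemas/"
```

Start or restart Connect from the same shell so that it inherits CONNECT_CLASSPATH, then submit datagen-custom-example.config the same way as the bundled configuration.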
The Avro schemas used by kafka-connect-datagen declare the "rules" for generating the data. These rules declare a list of primitive or more complex data types, the length of the data, and other properties of the generated data. Examples of these schema files are listed in this directory. However, these schemas are independent of the format of the data produced to Kafka and independent of the schema in Confluent Schema Registry.
- To define the format of the data produced to Kafka, you must set the format type in your connector configuration. For example, to produce Avro records to Kafka, set the value.converter and value.converter.schema.registry.url parameters in the connector configuration:
...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...
Or, to produce JSON records to Kafka, set the value.converter parameter in the connector configuration:
...
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
...
- The schema in Confluent Schema Registry declares the record fields and their types. As an example, consider the following "rule" in the schema specification to generate a field userid:
...
{"name": "userid", "type": {
"type": "string",
"arg.properties": {
"regex": "User_[1-9]{0,1}"
}
}},
...
Here is the corresponding field in the schema in Confluent Schema Registry (if you are using Avro format for producing data to Kafka):
{"name": "userid", "type": ["null", "string"], "default": null},
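To make the difference between the generation rule and the registered type concrete: the rule's regex User_[1-9]{0,1} allows "User_" followed by zero or one digit from 1 to 9, so the only possible values are "User_" and "User_1" through "User_9", while Schema Registry only records that userid is a nullable string. The sketch below is purely illustrative and is NOT Avro Random Generator; it simply emits values from that same value space, with its own arbitrary choice of probabilities.

```shell
#!/usr/bin/env bash
# Illustration only (not Avro Random Generator): emit strings matching
# the rule regex "User_[1-9]{0,1}", i.e. "User_" plus zero or one digit 1-9.
sample_userid() {
  # Pick 0..9: 0 means "no trailing digit", 1..9 become the optional digit.
  local d=$(( RANDOM % 10 ))
  if (( d == 0 )); then
    printf 'User_\n'
  else
    printf 'User_%d\n' "$d"
  fi
}

for _ in 1 2 3 4 5; do
  sample_userid
done
```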