Spring Cloud Stream

GCS source with Spring Cloud Stream.

Overview.

GCP is gaining traction in the enterprise world. More companies are looking at GCP and the products it provides, and one of its core products is Google Cloud Storage (GCS). GCS allows worldwide storage and retrieval of any amount of data at any time. With the rise in popularity of GCP (and GCS as a core product), there is a natural need to integrate this service with other systems and frameworks, such as the Spring Framework. However, complete examples that combine GCS with Spring Cloud are hard to find. So here I will share detailed steps for building a Spring Cloud Stream application with GCS as its source and Apache Kafka as its sink. Our application will read all files from a GCS bucket, keep polling the bucket for newly added files, transform their contents to Avro format, and push the records to Apache Kafka. Without further ado, let’s get started.

1. Getting started.

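Let’s start by creating a Spring Boot project and adding the Gradle dependencies. The avroVersion and confluentVersion properties are version numbers you define yourself, and the Confluent artifacts are served from Confluent’s own Maven repository (https://packages.confluent.io/maven/) rather than Maven Central.
build.gradle file: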

dependencies {
// Starter for using Spring Boot’s Actuator, which provides production-ready features to help you monitor and manage your application
implementation 'org.springframework.boot:spring-boot-starter-actuator'
// Provides integrations with Google Cloud Storage and Spring Resource
implementation 'org.springframework.cloud:spring-cloud-gcp-starter-storage'
// Provides message converters that can be used for Apache Avro serialization
implementation 'org.springframework.cloud:spring-cloud-stream-schema'
// Needed for using Kafka binder
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
// Spring Integration’s file support extends the Spring Integration core with a dedicated vocabulary to deal with reading, writing, and transforming files
implementation 'org.springframework.integration:spring-integration-file'
// Needed for Avro serialization
implementation "org.apache.avro:avro:${avroVersion}"
implementation("io.confluent:kafka-avro-serializer:${confluentVersion}") {
exclude(module: 'slf4j-log4j12')
exclude(module: 'slf4j-api')
}
// Needed for binding our custom configurations in *.yml/*.properties files to Java classes
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
testImplementation 'org.springframework.boot:spring-boot-configuration-processor-test'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support'
}

I also created a GCS bucket with one simple file that contains an array of JSON objects. Here is an example of such a file:

{
  "history":[
    {
      "_id": "5d65a996de2b0ec59209c8f0",
      "index": 0,
      "guid": "343a48e5-4a35-4224-94bb-0411b387dec8",
      "isActive": false,
      "balance": "$3,091.86",
      "picture": "http://placehold.it/32x32",
      "age": 27,
      "eyeColor": "brown",
      "firstName": "Ashlee",
      "lastName": "Ellison",
      "company": "EARTHMARK",
      "email": "undefined.ellison@earthmark.io",
      "phone": "+1 (950) 449-3361",
      "address": "515 Franklin Street, Greenfields, North Dakota, 6556"
    },
    {
      "_id": "5d65a9969a7841114112c00b",
      "index": 1,
      "guid": "e3e6572e-7405-43a3-8e9d-19cdd785b6c2",
      "isActive": true,
      "balance": "$2,584.03",
      "picture": "http://placehold.it/32x32",
      "age": 36,
      "eyeColor": "blue",
      "firstName": "Marta",
      "lastName": "Smith",
      "company": "ZENTIME",
      "email": "undefined.smith@zentime.info",
      "phone": "+1 (873) 465-3326",
      "address": "196 Ovington Avenue, Canterwood, Iowa, 3143"
    },
    {
      "_id": "5d65a996a86308f5b6791ef4",
      "index": 2,
      "guid": "99b5148e-97ad-43e5-a550-f13071f80324",
      "isActive": true,
      "balance": "$3,664.99",
      "picture": "http://placehold.it/32x32",
      "age": 30,
      "eyeColor": "brown",
      "firstName": "Katelyn",
      "lastName": "Petersen",
      "company": "MYOPIUM",
      "email": "undefined.petersen@myopium.biz",
      "phone": "+1 (835) 587-3884",
      "address": "241 Robert Street, Northridge, Missouri, 9848"
    },
    {
      "_id": "5d65a9963bbf580640f0bbda",
      "index": 3,
      "guid": "6732fe16-b488-4b4c-9d4a-b447a9cba5e4",
      "isActive": true,
      "balance": "$2,749.94",
      "picture": "http://placehold.it/32x32",
      "age": 37,
      "eyeColor": "green",
      "firstName": "Lilian",
      "lastName": "Diaz",
      "company": "COSMETEX",
      "email": "undefined.diaz@cosmetex.us",
      "phone": "+1 (877) 448-3986",
      "address": "679 Kane Street, Jennings, Rhode Island, 7505"
    },
    {
      "_id": "5d65a9963f70ef1debc83b8a",
      "index": 4,
      "guid": "2f2c06c2-1b62-4ad1-99a4-42113405bd9f",
      "isActive": false,
      "balance": "$3,493.47",
      "picture": "http://placehold.it/32x32",
      "age": 27,
      "eyeColor": "brown",
      "firstName": "Luna",
      "lastName": "Mcdaniel",
      "company": "CENTREE",
      "email": "undefined.mcdaniel@centree.org",
      "phone": "+1 (835) 560-2775",
      "address": "695 Claver Place, Harold, Delaware, 5992"
    },
    {
      "_id": "5d65a99679275d2d7de304b4",
      "index": 5,
      "guid": "0b3aea6e-c647-44ce-aced-84fb97109949",
      "isActive": true,
      "balance": "$2,881.79",
      "picture": "http://placehold.it/32x32",
      "age": 33,
      "eyeColor": "green",
      "firstName": "Carr",
      "lastName": "Hester",
      "company": "COMCUR",
      "email": "undefined.hester@comcur.com",
      "phone": "+1 (914) 437-3871",
      "address": "935 Ridge Court, Leeper, Federated States Of Micronesia, 4784"
    },
    {
      "_id": "5d65a996df442bb03a534b34",
      "index": 6,
      "guid": "16cbfb8e-ed60-40df-85f1-1462a15600be",
      "isActive": true,
      "balance": "$2,303.35",
      "picture": "http://placehold.it/32x32",
      "age": 38,
      "eyeColor": "blue",
      "firstName": "Nona",
      "lastName": "Paul",
      "company": "SNACKTION",
      "email": "undefined.paul@snacktion.co.uk",
      "phone": "+1 (841) 479-3822",
      "address": "761 Clinton Avenue, Lawrence, Pennsylvania, 2491"
    },
    {
      "_id": "5d65a9963f1bbcfdb97f4f2e",
      "index": 7,
      "guid": "f69fad7d-f1b2-4ddc-ba2f-ab3b63acb5d4",
      "isActive": true,
      "balance": "$2,681.02",
      "picture": "http://placehold.it/32x32",
      "age": 33,
      "eyeColor": "brown",
      "firstName": "Walls",
      "lastName": "Kim",
      "company": "DIGINETIC",
      "email": "undefined.kim@diginetic.tv",
      "phone": "+1 (908) 473-2406",
      "address": "839 Oak Street, Freeburn, Arkansas, 6023"
    },
    {
      "_id": "5d65a9965694de42ddccf709",
      "index": 8,
      "guid": "dc01c6fd-bdc4-47fd-a769-00eef76cf6af",
      "isActive": true,
      "balance": "$1,378.01",
      "picture": "http://placehold.it/32x32",
      "age": 29,
      "eyeColor": "blue",
      "firstName": "Lorraine",
      "lastName": "Cochran",
      "company": "FUELWORKS",
      "email": "undefined.cochran@fuelworks.net",
      "phone": "+1 (891) 515-2546",
      "address": "562 Wythe Avenue, Frierson, Washington, 3206"
    },
    {
      "_id": "5d65a99670316dda6c297dbb",
      "index": 9,
      "guid": "1f138527-d7fd-4e65-8c34-5349ce16a524",
      "isActive": true,
      "balance": "$2,832.18",
      "picture": "http://placehold.it/32x32",
      "age": 30,
      "eyeColor": "green",
      "firstName": "Melva",
      "lastName": "Owen",
      "company": "ZOLARITY",
      "email": "undefined.owen@zolarity.biz",
      "phone": "+1 (844) 504-3524",
      "address": "927 Dinsmore Place, Konterra, Georgia, 5719"
    },
    {
      "_id": "5d65a996423287e83c657283",
      "index": 10,
      "guid": "ba708081-a30a-42eb-804d-5d27e301bc5f",
      "isActive": false,
      "balance": "$3,847.32",
      "picture": "http://placehold.it/32x32",
      "age": 32,
      "eyeColor": "green",
      "firstName": "Francesca",
      "lastName": "Merritt",
      "company": "GORGANIC",
      "email": "undefined.merritt@gorganic.me",
      "phone": "+1 (879) 513-3525",
      "address": "179 Oriental Court, Yonah, Virginia, 5656"
    }
  ]
}

2. GCP and Kafka configuration

I won’t explain here how to create a GCP account and a GCS bucket, but these links will help you with it:
https://cloud.google.com/storage/docs/creating-buckets
https://cloud.google.com/
Generate and download a key for the service account of the project that contains the GCS bucket. Then encode the contents of the generated key file to Base64 on a single line (-A suppresses line breaks):
openssl base64 -A -in your-key-file.json
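If you would rather do the encoding from Java, here is a minimal sketch that encodes the contents of the key file (not its name) as single-line Base64. The class name ServiceAccountKeyEncoder is hypothetical and not part of the project; it uses only the JDK.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;

// Hypothetical helper for illustration; not part of the original project.
final class ServiceAccountKeyEncoder {

  private ServiceAccountKeyEncoder() {}

  // Encodes raw key bytes as Base64 without any line breaks.
  static String encode(byte[] keyBytes) {
    return Base64.getEncoder().encodeToString(keyBytes);
  }

  // Reads the key file and encodes its contents.
  static String encodeFile(Path keyFile) throws IOException {
    return encode(Files.readAllBytes(keyFile));
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: ServiceAccountKeyEncoder <path-to-key-file>");
      return;
    }
    System.out.println(encodeFile(Paths.get(args[0])));
  }
}
```

The printed value is what goes into the encoded-key property of application.yml.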
Create/update application.yml in the resources folder.

spring:
  application:
    name: gcs-source
  cloud:
    gcp: # configuring access to GCP.
      project-id: project-id
      credentials:
        encoded-key: "encoded-key" # encoded to Base64 GCP key
    stream:
      bindings:
        output:
          destination: ${kafka-properties.output-topic} # Target Kafka topic
          producer:
            useNativeEncoding: true # the outbound message is serialized directly by the client library
      kafka:
        bindings:
          output:
            producer:
              configuration: # passed straight to the Kafka producer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: ${kafka-properties.schema-registry-path}
        binder:
          brokers: ${kafka-properties.kafka-brokers}
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

kafka-properties:
  schema-registry-path: http://localhost:8081
  kafka-brokers: localhost:9092
  output-topic: client_info_with_json

gcs:
  bucket: olga-bucket

In the properties file, we configure GCP access and all the settings needed by the Spring Cloud Stream Kafka binder.
To launch a Kafka cluster locally, use this Docker image:
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
-p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 \
landoop/fast-data-dev:latest

3. Creating GCS source Configuration

Spring Cloud GCP provides two inbound adapters, GcsInboundFileSynchronizingMessageSource and GcsStreamingMessageSource, and one outbound adapter, GcsMessageHandler. Together they let you read files from and write files to GCS through MessageChannels. In my example I’m going to use GcsStreamingMessageSource.
In the code below we configure the inbound streaming channel adapter. The @InboundChannelAdapter annotation turns our streaming adapter into a SourcePollingChannelAdapter, an integration component that polls the specified GCS bucket. Its channel attribute takes the name of the MessageChannel bean that receives the polled messages.
GcsSourceConfiguration.java file:

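import com.google.cloud.storage.Storage;
import com.google.gson.Gson;
import java.io.InputStream;
import org.springframework.cloud.gcp.storage.integration.GcsRemoteFileTemplate;
import org.springframework.cloud.gcp.storage.integration.GcsSessionFactory;
import org.springframework.cloud.gcp.storage.integration.inbound.GcsStreamingMessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;

@Configuration
public class GcsSourceConfiguration {

  @Bean
  public Gson gson() {
    return new Gson();
  }

  // Polls the configured bucket and publishes one message per file to "streamChannel".
  @Bean
  @InboundChannelAdapter(channel = "streamChannel")
  public MessageSource<InputStream> streamingAdapter(
      Storage gcs, GcsConfigurationProperties props) {
    GcsStreamingMessageSource adapter =
        new GcsStreamingMessageSource(new GcsRemoteFileTemplate(new GcsSessionFactory(gcs)));
    // For GCS, the "remote directory" is the bucket name.
    adapter.setRemoteDirectory(props.getBucket());
    return adapter;
  }
}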
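Conceptually, "read everything once, then pick up only newly added files" comes down to remembering which file names have already been handed out. The plain-Java sketch below illustrates that idea only; it is a simplification and not how Spring Integration or Spring Cloud GCP implement it. NewFilePoller and lister are hypothetical names, and the lister stands in for listing the bucket.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Simplified illustration; not the adapter's real implementation.
final class NewFilePoller {

  private final Supplier<List<String>> lister; // stands in for listing the bucket
  private final Set<String> seen = new HashSet<>();

  NewFilePoller(Supplier<List<String>> lister) {
    this.lister = lister;
  }

  // Returns only the names that no earlier poll has returned.
  List<String> poll() {
    List<String> fresh = new ArrayList<>();
    for (String name : lister.get()) {
      if (seen.add(name)) { // add() returns false for names seen before
        fresh.add(name);
      }
    }
    return fresh;
  }
}
```

The first call to poll() returns every file in the listing, and later calls return only names that appeared since. Note that the set lives in memory, so in this sketch a restart would replay the whole listing.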

4. Creating a POJO that will contain information from JSON.

ClientInfo.java file:

public class ClientInfo {
  @SerializedName("_id")
  private String id;

  @SerializedName("index")
  private int index;

  @SerializedName("balance")
  private String balance;

  @SerializedName("age")
  private int age;

  @SerializedName("eyeColor")
  private String eyeColor;

  @SerializedName("firstName")
  private String firstName;

  @SerializedName("lastName")
  private String lastName;

  @SerializedName("company")
  private String company;

  @SerializedName("email")
  private String email;

  @SerializedName("phone")
  private String phone;

  @SerializedName("address")
  private String address;
  // getters, setters, and toString method omitted for brevity
}

The value specified in @SerializedName is the name of the corresponding JSON field. With the help of this annotation, we can quickly and easily transform our JSON into a ClientInfo object.
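To give a feel for how an annotation can drive this kind of mapping, here is a small JDK-only sketch. It is not Gson’s implementation: JsonName is a hypothetical stand-in for @SerializedName, the parsed JSON is represented as a Map, and only two fields are shown.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

final class SerializedNameSketch {

  // Hypothetical stand-in for Gson's @SerializedName.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface JsonName {
    String value();
  }

  static final class Client {
    @JsonName("_id")
    String id;

    @JsonName("firstName")
    String firstName;
  }

  // Copies values from a parsed JSON object (here a Map) into the annotated fields.
  static <T> T bind(Map<String, ?> json, Class<T> type) {
    try {
      T target = type.getDeclaredConstructor().newInstance();
      for (Field field : type.getDeclaredFields()) {
        JsonName name = field.getAnnotation(JsonName.class);
        if (name != null && json.containsKey(name.value())) {
          field.setAccessible(true);
          field.set(target, json.get(name.value()));
        }
      }
      return target;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot bind " + type.getName(), e);
    }
  }
}
```

The point is that the JSON key ("_id") and the Java field name (id) are free to differ, because the lookup uses the annotation value and not the field name.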

5. Creating an AVRO schema.

Apache Avro is a recommended choice of message format when you work with Apache Kafka. It’s a serialization system that converts your data to a compact binary format. Apache Avro relies on schemas: the binary-encoded data does not include field names or type information; that information is stored in the Avro schema, which is defined in JSON. Fields are defined as an array of objects, each of which contains a name and a type and, optionally, a doc string and a default value.
resources/avro/client-info-v1.json file:

{
  "type": "record",
  "namespace": "io.enfuse.gcssource",
  "name": "ClientInfo",
  "version": "1",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        "string"
      ],
      "doc": "id",
      "default": null
    },
    {
      "name": "index",
      "type": [
        "null",
        "string"
      ],
      "doc": "index",
      "default": null
    },
    {
      "name": "balance",
      "type": [
        "null",
        "string"
      ],
      "doc": "balance",
      "default": null
    },
    {
      "name": "age",
      "type": [
        "null",
        "string"
      ],
      "doc": "age",
      "default": null
    },
    {
      "name": "eyeColor",
      "type": [
        "null",
        "string"
      ],
      "doc": "eyeColor",
      "default": null
    },
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "doc": "firstName",
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "doc": "lastName",
      "default": null
    },
    {
      "name": "company",
      "type": [
        "null",
        "string"
      ],
      "doc": "company",
      "default": null
    },
    {
      "name": "email",
      "type": [
        "null",
        "string"
      ],
      "doc": "email",
      "default": null
    },
    {
      "name": "phone",
      "type": [
        "null",
        "string"
      ],
      "doc": "phone",
      "default": null
    },
    {
      "name": "address",
      "type": [
        "null",
        "string"
      ],
      "doc": "address",
      "default": null
    }
  ]
}
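To make the point about binary encoding concrete, here is a hand-rolled sketch of how a single ["null", "string"] field value is laid out according to the Avro specification: the index of the union branch, then the string length, then the UTF-8 bytes, with both numbers written as zig-zag variable-length longs. This is for illustration only; in the application the Avro library does the encoding. AvroEncodingSketch and its method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustration of the wire layout only; use the Avro library in real code.
final class AvroEncodingSketch {

  private AvroEncodingSketch() {}

  // Writes a long using Avro's zig-zag, variable-length encoding.
  static void writeLong(ByteArrayOutputStream out, long value) {
    long zigzag = (value << 1) ^ (value >> 63);
    while ((zigzag & ~0x7FL) != 0) {
      out.write((int) ((zigzag & 0x7F) | 0x80)); // 7 payload bits plus continuation bit
      zigzag >>>= 7;
    }
    out.write((int) zigzag);
  }

  // Encodes one value of the union ["null", "string"].
  static byte[] encodeNullableString(String value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    if (value == null) {
      writeLong(out, 0); // branch 0 is "null"; nothing else follows
    } else {
      byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
      writeLong(out, 1); // branch 1 is "string"
      writeLong(out, utf8.length);
      out.write(utf8, 0, utf8.length);
    }
    return out.toByteArray();
  }
}
```

Encoding "Ashlee" this way takes 8 bytes: one for the branch, one for the length, and six for the characters. The field name firstName never appears in the payload, which is why a reader needs the schema to make sense of the bytes.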

6. Creating ConfigurationProperties for GCS

GcsConfigurationProperties.java file:

@Configuration
@EnableConfigurationProperties
@ConfigurationProperties(prefix = "gcs")
public class GcsConfigurationProperties {
  private String bucket;

  public String getBucket() {
    return bucket;
  }

  public void setBucket(String bucket) {
    this.bucket = bucket;
  }

}

7. Creating MessageUtils

MessageUtils contains three conversion methods: convertStringToJsonArray() converts the file content to a JSONArray, since the file contains an array of JSON objects; convertJsonToClientInfo() converts each JSON object from the file to the ClientInfo POJO that we defined above; and convertToAvro() creates a GenericRecord from a ClientInfo object, using the Avro schema that was defined earlier. Because the schema declares every field as a nullable string, the numeric index and age fields are converted with String.valueOf().
MessageUtils.java file:

import com.google.gson.Gson;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class MessageUtils {
  private static final Gson gson = new Gson();
  // Parsed once at class-load time; a missing or unreadable schema fails fast.
  private static final Schema schema = loadSchema();

  private static Schema loadSchema() {
    try (InputStream in = new ClassPathResource("avro/client-info-v1.json").getInputStream()) {
      return new Schema.Parser().parse(in);
    } catch (IOException e) {
      throw new UncheckedIOException("Cannot read Avro schema avro/client-info-v1.json", e);
    }
  }

  public static <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
  }

  public static GenericRecord convertToAvro(ClientInfo clientInfo) {
    GenericRecord avroClientInfo = new GenericData.Record(schema);

    avroClientInfo.put("id", clientInfo.getId());
    // The schema declares index and age as strings, so convert the ints.
    avroClientInfo.put("index", String.valueOf(clientInfo.getIndex()));
    avroClientInfo.put("age", String.valueOf(clientInfo.getAge()));
    avroClientInfo.put("balance", clientInfo.getBalance());
    avroClientInfo.put("company", clientInfo.getCompany());
    avroClientInfo.put("email", clientInfo.getEmail());
    avroClientInfo.put("eyeColor", clientInfo.getEyeColor());
    avroClientInfo.put("firstName", clientInfo.getFirstName());
    avroClientInfo.put("lastName", clientInfo.getLastName());
    avroClientInfo.put("phone", clientInfo.getPhone());
    avroClientInfo.put("address", clientInfo.getAddress());

    return avroClientInfo;
  }

  // Extracts the "history" array; a missing key surfaces as a JSONException
  // instead of a null return value that would fail later.
  public static JSONArray convertStringToJsonArray(String file) throws JSONException {
    return new JSONObject(file).getJSONArray("history");
  }

  public static ClientInfo convertJsonToClientInfo(JSONObject json) {
    return gson.fromJson(json.toString(), ClientInfo.class);
  }
}

8. Creating an outbound channel adapter.

The @EnableBinding(Source.class) annotation turns our regular Spring application into a Spring Cloud Stream application and binds the OUTPUT channel declared in the Source interface.
In the GcsSourceRunner class we create an outboundChannelAdapter() method annotated with @ServiceActivator(inputChannel = "streamChannel"), using the input channel name that we configured earlier (GcsSourceConfiguration.java). outboundChannelAdapter() receives a Message<?>, whose headers we use to fetch our JSON file (an array of objects) from GCS. Once we have the JSON file, we iterate over it and extract the individual objects, converting each to a POJO. After that, we simply use our utility methods to convert the POJO to Avro and send it to Kafka. Please note that the Kafka producer library will automatically register the Avro schema (with Confluent Schema Registry) on your behalf when you send your first Avro message.

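import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import java.nio.charset.StandardCharsets;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
public class GcsSourceRunner {
  private static final Logger log = LoggerFactory.getLogger(GcsSourceRunner.class);

  private final Source source;
  private final GcsConfigurationProperties properties;
  private final Storage gcs;

  @Autowired
  public GcsSourceRunner(Source source, GcsConfigurationProperties properties, Storage gcs) {
    this.source = source;
    this.properties = properties;
    this.gcs = gcs;
  }

  @ServiceActivator(inputChannel = "streamChannel")
  public void outboundChannelAdapter(Message<?> message) throws JSONException {
    // The adapter puts the object name into the REMOTE_FILE header.
    String filePath = message.getHeaders().get(FileHeaders.REMOTE_FILE, String.class);
    log.info("Processing file {}", filePath);

    Blob blob = gcs.get(properties.getBucket(), filePath);
    if (blob == null) {
      log.warn("File {} no longer exists in the bucket, skipping", filePath);
      return;
    }
    String content = new String(blob.getContent(), StandardCharsets.UTF_8);

    JSONArray jsonArray = MessageUtils.convertStringToJsonArray(content);

    // Send one Avro record to Kafka per JSON object in the file.
    for (int i = 0; i < jsonArray.length(); i++) {
      JSONObject json = jsonArray.getJSONObject(i);
      ClientInfo clientInfo = MessageUtils.convertJsonToClientInfo(json);
      log.info("Client info {}", clientInfo);
      source.output().send(MessageUtils.message(MessageUtils.convertToAvro(clientInfo)));
    }
  }
}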
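The schema registration mentioned above also shapes the bytes that land in the topic. Confluent’s Avro serializer prefixes every message with a magic byte of 0 and the 4-byte, big-endian ID of the registered schema, followed by the Avro payload. The sketch below only illustrates that framing; it is not the serializer’s code. The class name is hypothetical, and schema ID 42 in the usage note is just an example value.

```java
import java.nio.ByteBuffer;

// Illustration of the Confluent wire format; the real work is done by KafkaAvroSerializer.
final class ConfluentWireFormatSketch {

  static final byte MAGIC_BYTE = 0x0;

  private ConfluentWireFormatSketch() {}

  // Prepends the magic byte and the schema ID to an Avro-encoded payload.
  static byte[] frame(int schemaId, byte[] avroPayload) {
    return ByteBuffer.allocate(1 + 4 + avroPayload.length)
        .put(MAGIC_BYTE)
        .putInt(schemaId) // ByteBuffer writes big-endian by default
        .put(avroPayload)
        .array();
  }

  // Reads the schema ID back from a framed message.
  static int schemaIdOf(byte[] framed) {
    ByteBuffer buffer = ByteBuffer.wrap(framed);
    if (buffer.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unknown magic byte");
    }
    return buffer.getInt();
  }
}
```

For example, frame(42, payload) produces a message five bytes longer than the payload, and schemaIdOf() on the result returns 42. A consumer uses that ID to fetch the schema from the registry before decoding the rest.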

Conclusion.

In eight simple steps, we’ve created a GCS source for Spring Cloud Stream that polls a GCS bucket for new files, converts each file to Avro records, and pushes them to the specified Kafka topic.
NOTE: Depending on the size of your bucket, there may be a delay before the application starts pushing Avro messages to the Kafka topic.

Author

Cahlen Humphreys