Powered by Zoomin Software. For more details, please contact Zoomin.

Data Client Base Library - Developer Guide

Product category
Technology
Doc type
Version
Product lifecycle
This publication

Data Client Base Library - Developer Guide: PublishApi

PublishApi

The PublishApi mirrors the Publish REST API.

It provides a way to manage the publishing of data to a catalog. It supports publishing to versioned, volatile, and stream layer types.

For the full PublishApi specification, see PublishApi.

The following pages describe the per-request configuration and metrics.

Example (Scala first, followed by the equivalent Java)


    // API entry points: the typed facades are both derived from one shared base client.
    val client = BaseClient()
    val publishApi = client.of[PublishApi]
    val blobApi = client.of[BlobApi]

    // Yields a new random 16-character alphanumeric string on every call.
    def generateHandle: String = Random.alphanumeric.take(16).mkString("")
    // Payload text: the concatenation of a single freshly generated handle.
    def hugeData: String = Seq.fill(1)(generateHandle).mkString("")

    // Fixed example coordinates for the catalog, layer and payload type.
    val contentType: String = "application/json"
    val someLayer: String = "whateverLayer"
    val someHrn: String = "hrn:here:data::olp-here-test:whatever"

    // Per-run values: the blob's handle and the bytes that get uploaded under it.
    val dataHandle: String = generateHandle
    val data: Array[Byte] = hugeData.getBytes

    // Publication request carrying a caller-chosen id and the single target layer.
    val myPublicationId: String = "my-publication-xyz-123"
    val publication = Publication(id = Some(myPublicationId), layerIds = Some(Seq(someLayer)))

    /** Polls the multipart-upload status endpoint until it reports "completed".
      *
      * The first poll happens immediately; every later poll is preceded by a one second
      * sleep (as long as the previously seen status was a non-empty string).
      *
      * @param linksRefs  links returned when the multipart upload was started; its `status`
      *                   link is the one that gets polled (must be defined)
      * @param hrn        catalog HRN passed through to the status request
      * @param layer      layer id passed through to the status request
      * @param dataHandle data handle of the blob being uploaded
      * @throws RuntimeException when the status is still not "completed" after the
      *                          maximum number of polls
      */
    def waitUploadToComplete(linksRefs: BlobInitResponseLinks,
                             hrn: String,
                             layer: String,
                             dataHandle: String): Unit = {
      // you should implement much fancier retry logic here
      @scala.annotation.tailrec
      def poll(attempts: Int, lastStatus: String): Unit =
        if (lastStatus != "completed") {
          if (attempts > 100)
            throw new RuntimeException("upload did not complete within 100 seconds")
          // Back off between polls, but never before the very first one.
          if (lastStatus.nonEmpty) Thread.sleep(1000)
          val statusUrl = linksRefs.status.get.href
          val latest = blobApi
            .getMultipartUploadStatus(hrn, statusUrl, layer, dataHandle, "", None)
            .executeToEntity()
            .await
            .status
            .get
            .value
          poll(attempts + 1, latest)
        }

      poll(0, "")
    }

    // End-to-end publish flow: initialise a publication, upload one blob through the
    // multipart endpoints, wait for the upload to finish, then submit the publication.
    // The generators are chained, so each request is issued only after the previous
    // step has produced its value.
    // NOTE(review): the value of this for-expression is neither assigned nor awaited
    // here, so a failed step or a failed assertion goes unnoticed unless the
    // surrounding code handles the result.
    for {
      // Step 1: initialise the publication described by `publication`.
      ongoingPublication: Publication <- publishApi
        .initPublication(someHrn, publication)
        .executeToEntity()
      // Step 2: start a multipart upload for `dataHandle`, declaring the content type.
      links <- blobApi
        .startMultipartUpload(someHrn,
                              someLayer,
                              dataHandle,
                              None,
                              Some(MultipartUploadMetadata(None, contentType)))
        .executeToEntity()
      // `.get` throws if the response carried no links.
      linksRefs = links.links.get
      // Step 3: upload the whole payload as a single part via the `uploadPart` link.
      // NOTE(review): "1" is presumably the part number (it matches the 1 used in
      // MultipartCompletePart below); the meaning of the empty-string argument is
      // not visible here — confirm against the BlobApi signature.
      part <- blobApi
        .uploadPart(someHrn,
                    linksRefs.uploadPart.get.href,
                    someLayer,
                    dataHandle,
                    "",
                    "1",
                    contentType,
                    data.length,
                    data)
        .execute()
      // The first "etag" header value of the part response identifies the uploaded part;
      // `.head` throws if the header is absent.
      etag = part.headers("etag").head
      // Step 4: complete the multipart upload, listing the single part by etag and number.
      _ <- blobApi
        .completeMultipartUpload(
          someHrn,
          linksRefs.complete.get.href,
          someLayer,
          dataHandle,
          "",
          None,
          Some(MultipartCompleteRequest(Some(Seq(MultipartCompletePart(etag, 1))))))
        .executeToEntity()
      // Step 5: block (polling the status link) until the upload reports completion.
      _ = waitUploadToComplete(linksRefs, someHrn, someLayer, dataHandle)
      // Step 6: submit the publication using the id returned by step 1.
      status <- publishApi
        .submitPublication(someHrn, ongoingPublication.id.get)
        .executeToStatusCode()
    } yield {
      // The example expects the submit call to answer with HTTP 204.
      assert(status == 204)
    }
    

  /**
   * End-to-end publish flow: initialises a publication, uploads one blob of 20 random bytes
   * through the multipart endpoints, waits for the upload to finish and then submits the
   * publication, expecting status code 204.
   *
   * <p>Every remote call is joined synchronously, so a failed request surfaces as an unchecked
   * exception from {@code join()}. Failures while waiting for the upload or submitting the
   * publication are propagated as well instead of being swallowed, so a broken publish cannot
   * pass silently.
   *
   * @throws IllegalStateException if the thread is interrupted while waiting for the upload;
   *     the interrupt status is restored before throwing
   */
  public void testPublication() {
    BaseClient client = BaseClientJava.instance();
    PublishApi publishApi = new PublishApi(client);
    BlobApi blobApi = new BlobApi(client);

    String someHrn = "hrn:here:data::olp-here-test:whatever";
    String someLayer = "whateverLayer";
    // 16 hex characters taken from a random UUID serve as the data handle.
    String dataHandle = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 16);
    byte[] data = new byte[20];
    new Random().nextBytes(data);
    String myPublicationId = "my-publication-xyz-123";
    Publication publication =
        new JPublication.Builder()
            .withId(myPublicationId)
            .withLayerIds(Arrays.asList(someLayer))
            .build();
    String contentType = "application/json";

    // Step 1: initialise the publication.
    Publication ongoingPublication =
        publishApi
            .initPublication(someHrn, publication, Optional.empty())
            .executeToEntity()
            .toCompletableFuture()
            .join();

    // Step 2: start a multipart upload for the data handle, declaring the content type.
    BlobInitResponse links =
        blobApi
            .startMultipartUpload(
                someHrn,
                someLayer,
                dataHandle,
                Optional.empty(),
                Optional.of(
                    new JMultipartUploadMetadata.Builder().withContentType(contentType).build()))
            .executeToEntity()
            .toCompletableFuture()
            .join();

    BlobInitResponseLinks linksRefs = links.getLinks().get();

    // Step 3: upload the whole payload as a single part via the uploadPart link.
    HttpResponse part =
        blobApi
            .uploadPart(
                someHrn,
                linksRefs.getUploadPart().get().getHref(),
                someLayer,
                dataHandle,
                "",
                "1",
                contentType,
                (long) data.length,
                data,
                Optional.empty(),
                Optional.empty())
            .execute()
            .toCompletableFuture()
            .join();

    // The first "etag" header value of the part response identifies the uploaded part.
    String etag = part.headers().get("etag").get().head();

    // Step 4: complete the multipart upload, listing the single part by etag and number.
    blobApi
        .completeMultipartUpload(
            someHrn,
            linksRefs.getComplete().get().getHref(),
            someLayer,
            dataHandle,
            "",
            Optional.empty(),
            Optional.of(
                new JMultipartCompleteRequest.Builder()
                    .withParts(
                        Arrays.asList(
                            new JMultipartCompletePart.Builder()
                                .withEtag(etag)
                                .withNumber(1)
                                .build()))
                    .build()))
        .executeToEntity()
        .toCompletableFuture()
        .join();

    // Step 5: block until the upload reports completion. Only the checked
    // InterruptedException needs handling here; a timeout or request failure is unchecked
    // and must reach the caller rather than be swallowed.
    try {
      waitUploadToComplete(blobApi, linksRefs, someHrn, someLayer, dataHandle);
    } catch (InterruptedException ex) {
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the upload to complete", ex);
    }

    // Step 6: submit the publication using the id returned by step 1.
    Integer status =
        publishApi
            .submitPublication(someHrn, ongoingPublication.getId().get(), Optional.empty())
            .executeToStatusCode()
            .toCompletableFuture()
            .join();

    assert (status == 204);
  }

  /**
   * Polls the multipart-upload status endpoint until it reports {@code "completed"}.
   *
   * <p>The first poll happens immediately; every later poll is preceded by a one second sleep
   * (as long as the previously seen status was a non-empty string).
   *
   * @param blobApi API used to query the upload status
   * @param linksRefs links returned when the multipart upload was started; its status link is
   *     the one that gets polled (must be present)
   * @param hrn catalog HRN passed through to the status request
   * @param layer layer id passed through to the status request
   * @param dataHandle data handle of the blob being uploaded
   * @throws InterruptedException if the thread is interrupted while sleeping between polls
   * @throws RuntimeException if the status is still not "completed" after the maximum number of
   *     polls
   */
  private void waitUploadToComplete(
      BlobApi blobApi, BlobInitResponseLinks linksRefs, String hrn, String layer, String dataHandle)
      throws InterruptedException {
    // you should implement much fancier retry logic here
    String response = "";
    int count = 0;
    // Compare by value, not by reference: the status string is produced at runtime from the
    // service response, so `!=` against the "completed" literal would never terminate the loop.
    while (!"completed".equals(response)) {
      if (count > 100) {
        throw new RuntimeException("upload did not complete within 100 seconds");
      }
      count += 1;
      // Back off between polls, but never before the very first one.
      if (!response.isEmpty()) {
        Thread.sleep(1000L);
      }
      response =
          blobApi
              .getMultipartUploadStatus(
                  hrn,
                  linksRefs.getStatus().get().getHref(),
                  layer,
                  dataHandle,
                  "",
                  Optional.empty())
              .executeToEntity()
              .toCompletableFuture()
              .join()
              .getStatus()
              .get();
    }
  }
  
In This Article
Was this article helpful?
TitleResults for “How to create a CRG?”Also Available inAlert