PublishApi
- Last Updated: Mar 27, 2025
- 2 minute read
The PublishApi mirrors the Publish REST API.
It provides a way to manage the publishing of data to a catalog. It supports publishing to versioned, volatile, and stream layer types.
For the full PublishApi specification, see PublishApi.
The following pages describe the per-request configuration and metrics.
Example
// One base client backs both service clients used in this example.
val client = BaseClient()
val publishApi = client.of[PublishApi]
val blobApi = client.of[BlobApi]

// Produces a fresh 16-character alphanumeric string on every call.
def generateHandle: String = Random.alphanumeric.take(16).mkString
// Payload source: Seq.fill(1) yields a single generated handle; a larger count concatenates more of them.
def hugeData: String = Seq.fill(1)(generateHandle).mkString

val someHrn = "hrn:here:data::olp-here-test:whatever"
val someLayer = "whateverLayer"
val contentType = "application/json"
val dataHandle = generateHandle
// Encode with an explicit charset so the uploaded bytes do not depend on the JVM default encoding.
val data = hugeData.getBytes(java.nio.charset.StandardCharsets.UTF_8)

val myPublicationId = "my-publication-xyz-123"
// Publication request carrying a caller-chosen id and the single layer that will receive data.
val publication = Publication(
  id = Some(myPublicationId),
  layerIds = Some(Seq(someLayer))
)
/** Blocks until the multipart upload identified by `dataHandle` reports the status "completed".
  *
  * The first status request is sent immediately; every following request is preceded by a
  * one-second sleep. The `.get` calls assume the status link and the status field are present.
  *
  * @param linksRefs  links returned when the multipart upload was started; its status link is polled
  * @param hrn        catalog HRN passed through to the status request
  * @param layer      layer id passed through to the status request
  * @param dataHandle data handle of the upload being checked
  * @throws RuntimeException once the attempt counter exceeds 100 without seeing "completed"
  */
def waitUploadToComplete(linksRefs: BlobInitResponseLinks,
                         hrn: String,
                         layer: String,
                         dataHandle: String): Unit = {
  // A production implementation should use a more elaborate retry strategy than this fixed poll.
  @scala.annotation.tailrec
  def poll(attempt: Int, lastStatus: String): Unit =
    if (lastStatus != "completed") {
      if (attempt > 100) throw new RuntimeException("upload did not complete within 100 seconds")
      // An empty status means nothing has been requested yet, so the first poll does not wait.
      if (lastStatus.nonEmpty) Thread.sleep(1000)
      val currentStatus = blobApi
        .getMultipartUploadStatus(hrn, linksRefs.status.get.href, layer, dataHandle, "", None)
        .executeToEntity()
        .await
        .status
        .get
        .value
      poll(attempt + 1, currentStatus)
    }

  poll(0, "")
}
// End-to-end flow: open a publication, upload the payload as a single-part multipart upload,
// wait for the upload to be reported complete, then submit the publication.
for {
  // 1. Open the publication.
  startedPublication: Publication <- publishApi
    .initPublication(someHrn, publication)
    .executeToEntity()
  // 2. Start a multipart upload for the data handle; the response carries the follow-up links.
  initResponse <- blobApi
    .startMultipartUpload(
      someHrn,
      someLayer,
      dataHandle,
      None,
      Some(MultipartUploadMetadata(None, contentType))
    )
    .executeToEntity()
  uploadLinks = initResponse.links.get
  // 3. Upload the whole payload as part number "1".
  partResponse <- blobApi
    .uploadPart(
      someHrn,
      uploadLinks.uploadPart.get.href,
      someLayer,
      dataHandle,
      "",
      "1",
      contentType,
      data.length,
      data
    )
    .execute()
  // The etag header returned for the part is echoed back in the completion request.
  partEtag = partResponse.headers("etag").head
  completeRequest = MultipartCompleteRequest(Some(Seq(MultipartCompletePart(partEtag, 1))))
  // 4. Complete the multipart upload.
  _ <- blobApi
    .completeMultipartUpload(
      someHrn,
      uploadLinks.complete.get.href,
      someLayer,
      dataHandle,
      "",
      None,
      Some(completeRequest)
    )
    .executeToEntity()
  // 5. Block until the upload status reads "completed".
  _ = waitUploadToComplete(uploadLinks, someHrn, someLayer, dataHandle)
  // 6. Submit the publication and capture the HTTP status code.
  submitStatus <- publishApi
    .submitPublication(someHrn, startedPublication.id.get)
    .executeToStatusCode()
} yield {
  assert(submitStatus == 204)
}
/**
 * Walks through a complete publication: initialises a publication, uploads 20 random bytes as a
 * single-part multipart upload, completes the upload, waits for it to be reported complete, and
 * finally submits the publication, asserting that the submit call returns status 204.
 */
public void testPublication() {
  BaseClient client = BaseClientJava.instance();
  PublishApi publishApi = new PublishApi(client);
  BlobApi blobApi = new BlobApi(client);
  String someHrn = "hrn:here:data::olp-here-test:whatever";
  String someLayer = "whateverLayer";
  // 16-character handle derived from a random UUID with the dashes removed.
  String dataHandle = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 16);
  byte[] data = new byte[20];
  new Random().nextBytes(data);
  String myPublicationId = "my-publication-xyz-123";
  Publication publication =
      new JPublication.Builder()
          .withId(myPublicationId)
          .withLayerIds(Arrays.asList(someLayer))
          .build();
  String contentType = "application/json";

  // 1. Open the publication.
  Publication ongoingPublication =
      publishApi
          .initPublication(someHrn, publication, Optional.empty())
          .executeToEntity()
          .toCompletableFuture()
          .join();

  // 2. Start the multipart upload; the response carries the links used by the next steps.
  BlobInitResponse links =
      blobApi
          .startMultipartUpload(
              someHrn,
              someLayer,
              dataHandle,
              Optional.empty(),
              Optional.of(
                  new JMultipartUploadMetadata.Builder().withContentType(contentType).build()))
          .executeToEntity()
          .toCompletableFuture()
          .join();
  BlobInitResponseLinks linksRefs = links.getLinks().get();

  // 3. Upload the whole payload as part number "1".
  HttpResponse part =
      blobApi
          .uploadPart(
              someHrn,
              linksRefs.getUploadPart().get().getHref(),
              someLayer,
              dataHandle,
              "",
              "1",
              contentType,
              (long) data.length,
              data,
              Optional.empty(),
              Optional.empty())
          .execute()
          .toCompletableFuture()
          .join();
  // The etag header of the uploaded part is echoed back in the completion request.
  String etag = part.headers().get("etag").get().head();

  // 4. Complete the multipart upload.
  blobApi
      .completeMultipartUpload(
          someHrn,
          linksRefs.getComplete().get().getHref(),
          someLayer,
          dataHandle,
          "",
          Optional.empty(),
          Optional.of(
              new JMultipartCompleteRequest.Builder()
                  .withParts(
                      Arrays.asList(
                          new JMultipartCompletePart.Builder()
                              .withEtag(etag)
                              .withNumber(1)
                              .build()))
                  .build()))
      .executeToEntity()
      .toCompletableFuture()
      .join();

  try {
    // 5. Block until the upload is reported complete, then 6. submit the publication.
    waitUploadToComplete(blobApi, linksRefs, someHrn, someLayer, dataHandle);
    Integer status =
        publishApi
            .submitPublication(someHrn, ongoingPublication.getId().get(), Optional.empty())
            .executeToStatusCode()
            .toCompletableFuture()
            .join();
    assert (status == 204);
  } catch (InterruptedException ex) {
    // Restore the interrupt flag so callers further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
  } catch (Exception ex) {
    // log some problem
  }
}
/**
 * Blocks until the multipart upload identified by {@code dataHandle} reports the status
 * "completed".
 *
 * <p>The first status request is sent immediately; every following request is preceded by a
 * one-second sleep.
 *
 * @param blobApi client used to query the upload status
 * @param linksRefs links returned when the multipart upload was started; its status link is polled
 * @param hrn catalog HRN passed through to the status request
 * @param layer layer id passed through to the status request
 * @param dataHandle data handle of the upload being checked
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 * @throws RuntimeException once the attempt counter exceeds 100 without seeing "completed"
 */
private void waitUploadToComplete(
    BlobApi blobApi, BlobInitResponseLinks linksRefs, String hrn, String layer, String dataHandle)
    throws InterruptedException {
  // you should implement much fancier retry logic here
  String response = "";
  int count = 0;
  // Compare by value: != on String tests reference identity, so a "completed" value returned by
  // the service would not be guaranteed to end the loop.
  while (!"completed".equals(response)) {
    if (count > 100) throw new RuntimeException("upload did not complete within 100 seconds");
    else count += 1;
    // An empty response means nothing has been requested yet, so the first poll does not wait.
    if (!response.isEmpty()) Thread.sleep(1000L);
    response =
        blobApi
            .getMultipartUploadStatus(
                hrn,
                linksRefs.getStatus().get().getHref(),
                layer,
                dataHandle,
                "",
                Optional.empty())
            .executeToEntity()
            .toCompletableFuture()
            .join()
            .getStatus()
            .get();
  }
}