Skip to content

Commit

Permalink
Merge pull request #119 from zalando-incubator/bugs/fbenjamin/118
Browse files Browse the repository at this point in the history
Bugs/fbenjamin/118
  • Loading branch information
codechimp authored Oct 28, 2016
2 parents c88a5f2 + 07d7dfb commit d177875
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public String getEid() {
return eid;
}

@JsonProperty("event_type")
public String getEventTypeName() {
return eventTypeName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class EventType {
public EventType(@JsonProperty("name") String name, @JsonProperty("owning_application") String owningApplication, @JsonProperty("category") EventTypeCategory category,
@JsonProperty("enrichment_strategies") List<EventEnrichmentStrategy> enrichmentStrategies,
@JsonProperty("partition_strategy") PartitionStrategy partitionStrategy, @JsonProperty("schema") EventTypeSchema schema, @JsonProperty("data_key_fields") List<String> dataKeyFields,
@JsonProperty("partition_key_fields") List<String> partitionKeyFields, @JsonProperty("default_statistics") EventTypeStatistics statistics) {
@JsonProperty("partition_key_fields") List<String> partitionKeyFields, @JsonProperty("default_statistic") EventTypeStatistics statistics) {
this.name = name;
this.owningApplication = owningApplication;
this.category = category;
Expand Down Expand Up @@ -109,7 +109,7 @@ public List<String> getDataKeyFields() {
public List<String> getPartitionKeyFields() {
return unmodifiableList(partitionKeyFields);
}

@JsonProperty("default_statistic")
public EventTypeStatistics getStatistics() {
return statistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ case class EventType(
schema: EventTypeSchema,
dataKeyFields: Seq[String],
partitionKeyFields: Seq[String],
@JsonProperty("default_statistics") statistics: Option[EventTypeStatistics])
@JsonProperty("default_statistic") statistics: Option[EventTypeStatistics])

/**
* The schema for an EventType, expected to be a json schema in yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ sealed class ConnectionImpl(val host: String,
}

def executeCall(request: HttpRequest): Future[HttpResponse] = {
logger.debug("executingCall {}", request)
logger.debug("executingCall {}", request.method)
val response: Future[HttpResponse] = Source.single(request).via(requestFlow).runWith(Sink.head)(materializer())
logError(response)
response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public abstract class EventGeneratorBuilder {
private Event newEvent;
private String schemaDefinition;
private EventType eventType = null;
private String owner = "Nakadi-klients(java-integration-test-suite)";
private String owner = "stups_Nakadi-klients(java-integration-test-suite)";
private EventTypeCategory category = EventTypeCategory.UNDEFINED;
private List<EventEnrichmentStrategy> enrichmentStrategies = Collections.emptyList();
private PartitionStrategy partitionStrategy = PartitionStrategy.RANDOM;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,22 @@
package org.zalando.nakadi.client.scala

import org.zalando.nakadi.client.utils.ClientBuilder
import java.util.function.Supplier

object ClientFactory {
import sys.process._
import scala.language.postfixOps
def OAuth2Token(): Option[() => String] =
Option(() => "*")
def getJavaClient() =
builder().buildJavaClient();
def OAuth2Token(): Option[() => String] = Option(() => System.getProperty("OAUTH2_ACCESS_TOKENS", null));
def getJavaClient() = builder().buildJavaClient();
def host() = System.getProperty("NAKADI_HOST", "localhost")
def port() = System.getProperty("NAKADI_PORT", "8080").toInt
def verifySSLCertificate() = System.getProperty("NAKADI_VERIFY_SSL_CERTIFICATE", "false").toBoolean
def securedConnection() = System.getProperty("NAKADI_SECURED_CONNECTION", "false").toBoolean

def getScalaClient() = builder().build()

private def builder() = {
// useSandbox()
// useStaging()
useLocal()
}
private def useLocal() = {
new ClientBuilder() //
.withHost("localhost") //
.withPort(8080) //
.withSecuredConnection(false) // s
.withVerifiedSslCertificate(false) // s
}
private def useSandbox() = {
ClientBuilder()
.withHost("nakadi-sandbox.aruha-test.zalan.do")
.withPort(443)
.withSecuredConnection(true) //s
.withVerifiedSslCertificate(false) //s
.withTokenProvider(ClientFactory.OAuth2Token())
}
private def useStaging() = {
useSandbox().withHost("nakadi-staging.aruha-test.zalan.do")
.withHost(host()) //
.withPort(port()) //
.withSecuredConnection(securedConnection()) // s
.withVerifiedSslCertificate(verifySSLCertificate()) // s
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ trait EventGenerator {
/**
* Returns the owningApplication value. Default = "Nakadi-klients(integration-test-suite)"
*/
def owner: String = "Nakadi-klients(integration-test-suite)"
def owner: String = "stups_Nakadi-klients(integration-test-suite)"

/**
* Returns the category value. Default = UNDEFINED
Expand Down
47 changes: 43 additions & 4 deletions it/src/test/java/org/zalando/client/java/SimpleEventTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.zalando.client.java;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import java.util.List;
import java.util.Optional;
Expand All @@ -16,6 +15,7 @@
import org.zalando.nakadi.client.java.model.Event;
import org.zalando.nakadi.client.java.model.EventStreamBatch;
import org.zalando.nakadi.client.java.model.EventType;
import org.zalando.nakadi.client.java.model.EventTypeStatistics;
import org.zalando.nakadi.client.java.test.event.generator.EventGenerator;
import org.zalando.nakadi.client.java.test.event.generator.EventIntegrationHelper;
import org.zalando.nakadi.client.java.test.event.simple.MySimpleEvent;
Expand Down Expand Up @@ -58,7 +58,7 @@ public void validatePublishedNrOfEvents() throws InterruptedException, Execution
.withEventTypeId("SimpleEventTest-validatePublishedNrOfEvents").build();
EventIntegrationHelper it = new EventIntegrationHelper(gen, client);
assertTrue("EventType should be created", it.createEventType());
Thread.sleep(1000);// Creation can take time.
Thread.sleep(5000);// Creation can take time.
Optional<EventType> eventTypeOpt = it.getEventType();
assertTrue("Did not return the eventType", eventTypeOpt.isPresent());
List<Event> createdEvents = it.publishEvents(nrOfEvents);
Expand Down Expand Up @@ -133,7 +133,46 @@ public void validateCreatedEventType() throws InterruptedException {
assertEquals(eventType.getPartitionStrategy(), originalEventType.getPartitionStrategy());
assertEquals(eventType.getSchema().getSchema(), originalEventType.getSchema().getSchema());
assertEquals(eventType.getSchema().getType(), originalEventType.getSchema().getType());
assertEquals(eventType.getStatistics(), originalEventType.getStatistics());
assertNull(eventType.getStatistics());
}
@Test
public void validateCreatedEventTypeWithStatistics() throws InterruptedException {
Integer messagesPerMinute = 2400;
Integer messageSize = 20240;
Integer readParallelism = 8;
Integer writeParallelism = 7;
EventGenerator gen = new MySimpleEventGenerator(){
@Override
public EventTypeStatistics getStatistics() {
return new EventTypeStatistics(messagesPerMinute, messageSize, readParallelism, writeParallelism);
}
}
.withEventTypeId("SimpleEventTest-validateCreatedEventType").build();
EventIntegrationHelper it = new EventIntegrationHelper(gen, client);
assertTrue("EventType should be created", it.createEventType());
Thread.sleep(1000);// Creation can take time.
Optional<EventType> eventTypeOpt = it.getEventType();
assertTrue("Did not return the eventType", eventTypeOpt.isPresent());

EventType originalEventType = it.getEventType().get();
EventType eventType = eventTypeOpt.get();
assertEquals(eventType.getCategory(), originalEventType.getCategory());
assertEquals(eventType.getDataKeyFields(), originalEventType.getDataKeyFields());
assertEquals(eventType.getName(), originalEventType.getName());
assertEquals(eventType.getOwningApplication(), originalEventType.getOwningApplication());
assertEquals(eventType.getPartitionKeyFields(), originalEventType.getPartitionKeyFields());
assertEquals(eventType.getPartitionStrategy(), originalEventType.getPartitionStrategy());
assertEquals(eventType.getSchema().getSchema(), originalEventType.getSchema().getSchema());
assertEquals(eventType.getSchema().getType(), originalEventType.getSchema().getType());
assertNotNull(eventType.getStatistics());
assertEquals(messageSize, originalEventType.getStatistics().getMessageSize());
assertEquals(messagesPerMinute, originalEventType.getStatistics().getMessagesPerMinute());
assertEquals(readParallelism, originalEventType.getStatistics().getReadParallelism());
assertEquals(writeParallelism, originalEventType.getStatistics().getWriteParallelism());
assertEquals(eventType.getStatistics().getMessageSize(), originalEventType.getStatistics().getMessageSize());
assertEquals(eventType.getStatistics().getMessagesPerMinute(), originalEventType.getStatistics().getMessagesPerMinute());
assertEquals(eventType.getStatistics().getReadParallelism(), originalEventType.getStatistics().getReadParallelism());
assertEquals(eventType.getStatistics().getWriteParallelism(), originalEventType.getStatistics().getWriteParallelism());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package org.zalando.nakadi.client.scala

import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpec
import org.zalando.nakadi.client.scala.model.Cursor
import org.zalando.nakadi.client.scala.model.ScalaJacksonJsonMarshaller
import org.zalando.nakadi.client.scala.model.EventEnrichmentStrategy
import org.zalando.nakadi.client.scala.model.PartitionStrategy
import org.zalando.nakadi.client.scala.model.EventTypeStatistics
import org.zalando.nakadi.client.scala.model.PartitionStrategyType
import org.zalando.nakadi.client.scala.model.ScalaJacksonJsonMarshaller
import org.zalando.nakadi.client.scala.test.factory.EventIntegrationHelper
import org.zalando.nakadi.client.scala.test.factory.events.MySimpleEvent
import org.zalando.nakadi.client.scala.test.factory.events.SimpleEventListener
import org.zalando.nakadi.client.scala.model.PartitionStrategy
import org.scalatest.BeforeAndAfterAll
import org.zalando.nakadi.client.scala.model.EventEnrichmentStrategy

class SimpleEventTest extends WordSpec with Matchers with BeforeAndAfterAll {

import org.scalatest.Matchers._
import ClientFactory._
import ScalaJacksonJsonMarshaller._
import MySimpleEvent._
Expand Down Expand Up @@ -48,6 +48,7 @@ class SimpleEventTest extends WordSpec with Matchers with BeforeAndAfterAll {
val listener = new SimpleEventListener()

it.createEventType() shouldBe true
Thread.sleep(2000)
val events = it.publishEvents(nrOfEvents)
client.subscribe(eventGenerator.eventTypeName, StreamParameters(cursor = cursor), listener)
val receivedEvents = listener.waitToReceive(nrOfEvents)
Expand Down Expand Up @@ -121,6 +122,38 @@ class SimpleEventTest extends WordSpec with Matchers with BeforeAndAfterAll {
eventType.enrichmentStrategies shouldBe it.eventType.enrichmentStrategies
eventType.partitionKeyFields shouldBe it.eventType.partitionKeyFields
}
"Validate created EventType wiht Statistics" in {
val eventGenerator = new DefaultMySimpleEventGenerator() {
def eventTypeId = s"SimpleEventIntegrationTest-Validate-Created-EventType"
override def statistics: Option[EventTypeStatistics] = Option(EventTypeStatistics(2400, 20240, 4, 4))
}
val it = new EventIntegrationHelper(eventGenerator, client)

it.createEventType() shouldBe true

Thread.sleep(5000)

val optionalOfCreatedEventType = it.getEventType()

optionalOfCreatedEventType.isDefined shouldBe true

val Some(eventType) = optionalOfCreatedEventType
eventType.category shouldBe it.eventType.category
eventType.dataKeyFields shouldBe null
eventType.name shouldBe it.eventType.name
eventType.owningApplication shouldBe it.eventType.owningApplication
eventType.partitionStrategy shouldBe it.eventType.partitionStrategy
eventType.schema shouldBe it.eventType.schema
eventType.statistics shouldBe it.eventType.statistics
eventType.enrichmentStrategies shouldBe it.eventType.enrichmentStrategies
eventType.partitionKeyFields shouldBe it.eventType.partitionKeyFields
eventType.statistics should not be null
eventType.statistics.get.messageSize shouldBe it.eventType.statistics.get.messageSize
eventType.statistics.get.messagesPerMinute shouldBe it.eventType.statistics.get.messagesPerMinute
eventType.statistics.get.readParallelism shouldBe it.eventType.statistics.get.readParallelism
eventType.statistics.get.writeParallelism shouldBe it.eventType.statistics.get.writeParallelism

}

"Validate nr of partitions after Creation of EventType" in {
val eventGenerator = new DefaultMySimpleEventGenerator() {
Expand Down
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ lazy val client = withDefaults(
.settings(
name := projectName,
organization := "org.zalando.nakadi.client",
version := "2.0.1",
version := "2.0.2",
crossPaths := false,
scalaVersion := "2.11.8",
findbugsReportType := Some(ReportType.FancyHtml),
Expand Down

0 comments on commit d177875

Please sign in to comment.