微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Snowplow Enrich设置问题

如何解决Snowplow Enrich设置问题

collector.conf

collector {
 
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 8181
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  paths {
    # "/com.acme/track" = "/com.sNowplowanalytics.sNowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.sNowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    # Domains that are granted access,*.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = zanui_collector_cookie
    name = ${?COLLECTOR_COOKIE_NAME}
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    fallbackDomain = ""
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  doNottrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
   # name = {{doNottrackCookieName}}
    name = zanui-collector-do-not-track-cookie
   # value = {{doNottrackCookieValue}}
    value = zanui-collector-do-not-track-cookie-value
  }

 
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }
  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional,defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},X-Custom = "something"
    }
    # Optional,defaults to empty string
    body = "302,redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }
  cors {
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    enabled = false
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = sNowplow-collected-good-events-stream
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = sNowplow-collected-bad-events-stream
    bad = ${?COLLECTOR_STREAMS_BAD}

    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    sink {
      enabled = kinesis
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      region = ap-southeast-2
      region = ${?COLLECTOR_STREAMS_SINK_REGION}

      threadPoolSize = 10
      threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}
      aws {
        accessKey = env
        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
        secretKey = env
        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      }

      # Minimum and maximum backoff periods,in milliseconds
      backoffPolicy {
        #minBackoff = {{minBackoffMillis}}
        minBackoff = 10
        #maxBackoff = {{maxBackoffMillis}}
        maxBackoff = 10      
        }

 }

    buffer {
      byteLimit = 4500000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit =500 # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 5000
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

akka {
  loglevel = DEBUG # 'OFF' for no logging,'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  http.server {
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

}

运行命令:

java -Dcom.amazonaws.sdk.disableCbor -jar sNowplow-stream-collector-kinesis-1.0.0.jar --config collector.conf

enricher.conf

enrich {

  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = sNowplow-collected-good-events-stream
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = sNowplow-collected-good-events-stream
     
      # Stream/topic where the event that Failed enrichment will be stored
      bad = sNowplow-collected-bad-events-stream
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii tranformation events will end up
     # pii = {{outPii}}
     # pii = ${?ENRICH_STREAMS_OUT_PII}

      partitionKey = event_id
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    sourceSink {

      enabled =  kinesis
      enabled =  ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

      region = ap-southeast-2

       aws {
         accessKey = env
         accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY}
         secretKey = env
         secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY}
       }

       maxRecords = 10000

      initialPosition = TRIM_HORIZON
      initialTimestamp = "2020-09-10T10:00:00Z"

      backoffPolicy {
        minBackoff = 1000
        minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF}
        maxBackoff = 5000
        maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF}
      }


    }


    buffer {
      byteLimit = 1000000000
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 10 # Not supported by Kafka; will be ignored
      recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 5000
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }

    appName = "zanui-enricher-app"
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }

}

运行命令:

java -jar sNowplow-stream-enrich-kinesis-1.0.0.jar --config enricher.conf --resolver file:resolver.json

S3加载程序配置

source = "kinesis"

sink = "kinesis"

aws {
  accessKey = "env"
  secretKey = "env"
}

# Config for NSQ
 nsq {
  channelName = "nsqSourceChannelName"
    
  # Host name for NSQ tools
  host = "127.0.0.1"

  # HTTP port for nsqd
   port = 4150

  # HTTP port for nsqlookupd
   lookupPort = 4161
}

kinesis {

  initialPosition = "TRIM_HORIZON"
  initialTimestamp = "2017-05-17T10:00:00Z"
  maxRecords = 10000
  region = "ap-southeast-2"
  appName = "zanui-enricher-app"
}

streams {
  inStreamName = "sNowplow-collected-good-events-stream"
  outStreamName = "sNowplow-collected-bad-events-stream"

  buffer {
    byteLimit = 1000000000 # Not supported by NSQ; will be ignored
    recordLimit = 10
    timeLimit = 5000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "ap-southeast-2"
  bucket = "sNowplow-enriched-good-events"
  partitionedBucket = "sNowplow-enriched-good-events/partitioned"
  dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
  outputDirectory = "zanui-enriched/good"
  filenamePrefix = "zanui-output"
  format = "gzip"
  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 300000 # 5 minutes
}

运行命令:

java -jar sNowplow-s3-loader-0.6.0.jar --config my.conf

但是此SNowplow S3加载程序没有执行任何操作,因此我使用Data Fireshose将流传输到S3存储桶。

当我尝试在Data Fireshose中使用Aws Lambda时,会给出错误消息

{"attemptsMade":4,"arrivalTimestamp":1600154159619,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"****","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:sNowplow-json-transformer-lambda:$LATEST"}
    {"attemptsMade":4,"arrivalTimestamp":1600154161523,"rawData":"*****=","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:sNowplow-json-transformer-lambda:$LATEST"}

如果我不使用lambda,则在S3优质浓缩桶中为页面查看事件创建日志,但与此同时,在S3不良浓缩桶中为相同的页面查看事件创建日志

{"schema":"iglu:com.sNowplowanalytics.sNowplow.badrows/collector_payload_format_violation/jsonschema/1-0-0","data":{"processor":{"artifact":"sNowplow-stream-enrich","version":"1.0.0"},"failure":{"timestamp":"2020-09-15T07:16:02.488Z","loader":"thrift","message":{"error":"error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes,but only got 1 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}},"payload":"****="}}

我反复遵循了文档,但是我对流丰富的设置感到困惑。我不明白的是,如果不使用自定义架构,是否需要设置数据库以实现流丰富?因为既然我试图用Javascript Tracker的Page View事件进行测试,所以我没有设置任何数据库。但是我已经提供了DynamoDb创建,IAM角色编辑的访问权限。

如果有人做过,请帮助我设置扫雪机。请:(

解决方法

我写了一个博客,介绍如何在AWS中设置Snowplow Analytics。

这里是link,希望对您有所帮助。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。