По Вашему запросу ничего не найдено.
Рекомендуем Вам попробовать сделать следующее:
The Oracle Cloud Infrastructure Streaming service provides a fully managed, scalable, and durable storage option for continuous, high-volume streams of data that you can consume and process in near real-time.
For more information, see the following topics in the documentation:
For a list of the regions that presently run the Streaming service, see the documentation.
The API endpoint is constructed as follow: https://streaming.$_REGION.oci.oraclecloud.com
As an example for the variable
OCI Streaming is fully managed; from the underlying infrastructure to provisioning, deployment, maintenance, security patches, replication and consumer groups, which makes application development easier.
When you create a stream inside OCI Streaming, Oracle automatically creates and manages 3 streaming nodes distributed across 3 different AD(s) (or fault domains for single AD-regions), ensuring that your streams stay highly available and your data highly durable.
OCI Streaming allows you to emit data and retrieve the data in near real time. The number of use cases are nearly unlimited, from messaging to complex data streams processing.
Here are some of the many possible uses for Streaming:
Start using OCI Streaming by:
Overall, the amount of throughput you can have access to doesn’t have any limits. You just need to proactively design your stream with the right number of partitions.
The hard limits of the system are:
In our APIs, a partition is represented as a string.
If you create a stream with five partitions, you can access them by using the strings "0", "1", "2", "3", or "4".
Don't rely on partition identifiers being represented as numeric value.
Offsets aren't dense. Expect to see message offsets always increment, but sometimes not by 1. Don't rely on that to make future offset calculations.
For example, if you publish two messages going on the same partition, the first message could have offset 42 and the second message could have offset 45 (offset 43 and 44 being non-existent.
A Stream can be viewed as an append-only log file that contains your messages.
Streams are divided into a number of partitions for scalability. Partitions allow you to distribute a stream by splitting the messages across multiple nodes (or brokers) — each partition can be placed on a separate machine to allow for multiple consumers to read from a topic in parallel.
A 64-bit encoded message is what you emit into a topic.
Each message within a partition has an identifier called its offset. Consumers can read messages starting from a specific offset and are allowed to read from any offset point they choose. Consumers can also commit the latest processed offset so they can resume their work without replaying or missing a message if they stop and then restart.
A key is an identifier used to group related messages.
You can create a new stream by using our Console or our API. See API here.
Your stream is created for a particular region and tenancy and optionally for a dedicated compartment. A steam's data is replicated across the entire region allowing it to tolerate AD loss or network splits without disrupting the service and offers built-in high availability in a region.
The time to provision depends on the number of partitions. Creating a new partition takes up to 10 seconds.
The number of partitions for your stream depends on the throughput expectations of your application (expected throughput = average recond size x maximum number of records written per sectond).
The throughput of a Oracle Cloud Infrastructure stream is defined by a partition. A partition provides 1MB/sec data input and 2MB/sec data output.
You can send 1,000 requests per second to a partition.
Streaming SDK will support the same languages as other OCI SDK implementations, there are no additional languages that will be supported for Streaming service specifically.
Once a stream is created and active you can publish messages. For publishing, you can use the Write API (putMessages). The message will be published to a partition in the stream. If there is more than one partition, the partition where the message will be published is calculated using the message's key.
If the key is null, the partition will be calculated using a subset of the value. For messages with a null key, do not expect messages with same value to go on the same partition, since the partitioning scheme may change; sending a null key will effectively put the message in a random partition.
If you want to make sure that messages with the same value go to the same partition, you should use the same key for those messages.
As soon as the OCI Streaming API acknowledges your putMessage without error, this messages is durable.
guaranteesOCI Streaming linear reads and writes to a partitioning key.
When client requests exceed the limits, OCI Streaming denies the request and send out an error exception message.
The throttling mechanism is activated when the following thresholds are exceeded:
We recommend message batching for the following reasons:
The size of a batch of messages shouldn't exceed 1 MB. If this limit is exceeded, the throttling mechanism is triggered.
You can use one of the following approaches: chunking or sending the message via Object Storage.
Large payloads can be split into multiple, smaller chunks that the Streaming service can accept.
The chunks are stored in the service in the same way that ordinary (not-chunked) messages are stored. The only difference is that the consumer must keep the chunks and combine them into the real message when all the chunks have been collected.
The chunks in the partition can be interwoven with ordinary message.
A large payload is put in Object Storage and only the pointer to that data is transferred. The receiver recognizes this type of pointer payload, transparently reads the data from Object Storage, and provides it to the end user.
A common mistake is providing the incorrect date format.
The Streaming service supports ISO-8601, including the time zone for all dates.
The PutMessagesResultsEntry class provides the following methods:
At this time, there's no way to see the latest published message without publishing a message.
There is a mechanism to see the latest committed offset, per group or partition. Look into the getGroup endpoint.
Consuming messages requires you to:
Refer to the technical documentation for step by step guide on consuming data from a stream.
OCI Streaming provides two kinds of consume API:
Consumers can be configured to consume messages as part of a group. Stream partitions are distributed among members of a group so that messages from any single partition will only be sent to a single consumer.
Partition assignments are re-balanced as consumers join or leave the group.
We recommend that consumer applications take care of duplicates.
If you want to know if your consumer is falling behind (you are producing faster than you are consuming), you can use the difference between timestamp of the message and the current time. If this number gets higher, you might want to spawn a new consumer to take over some of the partitions from your first consumer.
A single consumer is an entity that reads messages from one or more streams.
This entity could exist alone or be part of a consumer group.
A pointer to a location in a stream. This location could be a pointer to a specific offset or time in a partition, or to a groups' current location.
The following cursor types are available: TRIM_HORIZON, AT_OFFSET, AFTER_OFFSET, AT_TIME, and LATEST. For details, see the documentation.
No, the cursor should be created outside of the loop. After you create a cursor, you can start consuming messages by using the GetMessages method. Each call to GetMessages returns the cursor to use in the next GetMessages call.
The returned cursor is not null and expires in 5 minutes. As long as you keep consuming, you should never have to re-create a cursor.
GetMessages, Commit, and Heartbeat all return a new cursor to use for subsequent calls.
A Java code snippet is available in the documentation.
In a couple of error cases, it's necessary to create new cursors. We recommend that you handle that as part of the failure strategy.
This is possible through policies. Tenant A must create a policy that gives Tenant B stream-pull access
When you aren't using group-cursors, storing processed offsets must be managed by the consumer.
When you are using group-consumers, processed offsets can be committed, in case of failure.
When you create a cursor, specify which type of cursor to use. When the application starts consuming messages, it needs to store which offset it reached/stopped at.
This scenario is practical when doing a demo or proof-of-concept, using only one partition per stream. In a production environment with multiples partitions, we recommend using consumer groups.
he getLimit( ) method of the GetMessageRequest class returns the maximum number of messages. You can specify any value up to 10,000. By default, the service returns as many messages as possible.
Consider your average message size to avoid exceeding throughput on the stream.
Streaming service getMessage batch sizes are based on the average message size published to the particular stream.
For a detailed explanation, see the documentation.
Consumer groups provide the following advantages:
An instance is a member of a consumer group. It's defined when a group cursor is created.
Partition reads are balanced among instances in a consumer group.
The instance name identifies that member of the group for operations related to offset management.
We recommend that you use unique instance names for each member of the consumer group.
The best practice is to use a concatenated string of useful information.
The following components of the Streaming service have timeouts:
Each instance within the consumer group needs to heartbeat before the 30-second timeout. For example, if a message is taking too long to process, we recommend that the instance send a heartbeat.
When reaching the 30-second timeout, the instance is removed from the consumer group and its partition is reassigned to another instance (if possible). This event is called rebalancing.
Rebalancing is the process in which a group of instances (belonging to the same consumer group) coordinate to own a mutually exclusive set of partitions that belongs to a specific stream.
At the end of a successful rebalance operation for a consumer group, every partition within the stream is owned by a single or multiple consumer instances within the group.
To ensure uniform distribution, you want to create a good value for your message keys. To do so, consider the selectivity and cardinality of your streaming data.
Aim for high cardinality with low selectivity.
In the Streaming service, the key is hashed and then used to determine the partition. Messages with the same key go to the same partition. Messages with different keys might go to different or to the same partitions.
As a producer, there is no way for you to explicitly control the partition to which a message goes.
If the data is sent with keys, the producer can't force it to a particular partition.
Yes, StreamClient is thread-safe.
When an object is stateless, it doesn't have to retain any data between invocations. Because there's no state to modify, one thread can't affect the result of another thread that invokes the object's operations. For this reason, a stateless class is inherently thread-safe.
Consumer lag is not yet implemented in the Streaming service.
The produced offset for each message is returned after each successful putMessage call.
The message offset is included with every message returned by getMessage calls.
You can determine lag by tracking the delta between produced and consumed offsets, by partition.
To determine if your consumer is falling behind (you're producing faster than you're consuming), you can use the timestamp of the message. If the consumer is falling behind, consider spawning a new consumer to take over some of the partitions from your first consumer. If you're falling behind on a single partition, there's no way to recover.
Consider the following options:
If you want to know how many messages are left to consume in a given partition, use a cursor of type , get the offset of the next LATESTpublished message, and make the delta with the offset that you are currently consuming.
Because we don't have dense-offset, you will get a rough estimate only. However, if your producer stopped producing, you won't be able to get that information (because you'll never get the offset of the next published message).
Reassignment happens only on commit and timeout. We recommend using commitOnGet=true, and relying on a heartbeat if the processing takes longer than 30 seconds. Writing custom commit logic is complicated, and full of race conditions and considerations. There are many cases in which some internal state is changed, and the client is required to handle the situation.
Yes, StreamClient is thread-safe.
An instance of a consumer group is consider inactive if it doesn't send a heartbeat for more than 30 seconds, or the process is terminated.
When that happens, a rebalance within the consumer group occurs to handle the partitions previously consumed by the inactive instance.
Such an instance is considered a new instance. A rebalance is triggered, and the instance is assigned a partition to start consuming messages.
The Streaming service makes no guarantee about whether the same partition (the one assigned before termination) is reassigned to this instance.
The Streaming service provides "at-least-once" semantics with the consumer group. Consider when offsets are committed in a message loop. If a consumer crashes before committing a batch of messages, that batch might be given to another consumer. When a partition is given to another consumer, the consumer uses the latest committed offset to start consumption. The consumer doesn't get messages before the committed offset.
The Streaming service handles offset commits automatically for the consumer group when the getCommitOnGet is set to true. We recommend using this method because it reduces the application complexity; that is, the application should not implement any commit mechanism. To overwrite this setting and implement a custom offset commit mechanism, set getCommitOnGet to false during when creating the consumer group.
CommitOnGet means that offsets from the previous request are committed. To illustrate this feature, consider the following example:
For a consumer A:
The orchestration system starts a new consumer B:
In this example, no messages were lost, but offsets 101–115 were processed at least once, which mean that they could have been processed more than once.
In this example, a portion (15) of the messages might have been processed and might be redelivered to the new consumer B after a fault event, but no data is lost.
Currently, it's not possible to update an individual partition in a consumer group.
The current behavior of the updateGroup call is to reset committedOffset for all partitions, which causes unnecessary old message retrievals for the partitions that were assigned to the other healthy consumers.
In a consumer group, the instances that are consuming the messages need to send heartbeats before reaching the timeout of 30 seconds. If an instance fails to send a heartbeat, the Streaming service considers the instance inactive and triggers a reassignment of its partition.
A cursor retrieved from a committed call should have no offsets. Heartbeats extend the timeout of partitions in the cursor.
Doing a heartbeat against an empty cursor should do nothing. The previous committed cursor could trigger a rebalance.
If a cursor is committed, and then a heartbeat is done against the cursor (rather than the one returned by the commit call), it updates samethe timeouts for the offsets contained.
To recover from a failure, you must store the offset of the last message that you processed (for each partition) so that you can start consuming from that message if you need to restart your consumer.
Note: Do not store the cursor; they expire after 5 minutes.
We don't provide any guidance for storing the offset of the last message that you processed, so you can use whatever method you want (for example, another stream, Kiev, a file on your machine, or Object Storage).
When your consumer restarts, read the offset of the last message that you processed, and then create a cursor of type and AFTER_OFFSETspecify the offset that you just got.
We recommend customers allocate partitions slightly higher than their maximum throughput. This will help them to manage their application spikes as we currently don't support changing the number of partition once a stream is created.
By default, we store data for 24 hours. You can set up the retention period up to 7 days while creating a stream. Once retention period is defined, it can't be edited.
The OCI Streaming console provides both operational and performance metrics, such as throughput of data input and output. OCI Streaming also integrates with OCI Telemetry so that you can collect, view, and analyze telemetry metrics for your streams.
All streams in the same tenancy have unique immutable names. Every stream has a compartment assigned. So, all the power of Oracle Cloud Infrastructure access control policies may be used to describe fine-grained rules at the tenancy, compartment, or single stream level.
Access policy is specified in a form of "Allow to in where ".
Our internet API uses the Oracle Identity service. Oracle Identity Service provides convenient way to authenticate users and authorize an access to such APIs from both browser (Username/password) and code (API Key).
OCI Streaming is secure by default - User data is encrypted both at rest and in motion. Only the account and data stream owners have access to the stream resources they create. OCI Streaming supports user authentication to control access to data. You can use Oracle Cloud Infrastructure IAM policies to selectively grant permissions to users and groups of users. You can securely put and get your data from OCI Streaming through SSL endpoints using the HTTPS protocol.
You own the data you emit; you can encrypt your data before sending it to OSS.
Ingestion (your producer - Streaming gateway): Data encrypted in motion due to SSL (HTTPS).
Inside of streaming service: On the gateway SSL gets terminated, data is encrypted upon arriving with per-stream AES-128 key, and is sent to the storage layer for persistence.
On consumption: Encrypted data is read from OSS, decrypted by the gateway node, and sent to consumer over SSL.
On consumption: Encrypted data is read from OCI Streaming, decrypted by the gateway node, and sent to consumer over SSL.
OCI Streaming uses AES-GCM 128 algorithm for encryption.
The monitoring in the Streaming service focuses on producers and consumers. For a list of the metrics emitted by the Streaming service, see the documentation.
Each metric available in the Console provides the following statistics:
These statistics offer four time intervals
For producers, consider setting alarms on the following metrics:
For consumers, consider setting the same alarms based on the following metrics:
When an alarm is triggered, the responsible team member needs to investigate the alarm and assess the situation.
If the issue is related to the client (producer or consumer), then the team member needs to resolve it or investigate more with the Dev team.
If the issue is related to the server, then the team member should contact Streaming service support.
A healthy stream is a stream that is active: messages are received successfully, and messages are consumed successfully.
Writes to the service are durable. If you can produce to your stream, and if you get a successful response, then the stream is health.
After data is ingested, it is accessible to consumers for the configured retention period.
If Get Messages API calls return elevated levels of internal server errors, the service isn't healthy.
A healthy stream also has healthy metrics:
Details about the API errors are located in the documentation.
The Streaming service supports partial failures due to throttling, per partition. In the case of a partial failure, the service returns a 200 status code and indicates the failures in the response payload.
If an entire request is throttled, you get a 429 status code.
Please contact Oracle Streaming Service to increase the limit for your tenancy.
OCI Streaming uses simple pay-as-you-go pricing. There are no upfront costs or minimum fees, and you only pay for the resources you use.
Please refer to the pricing guide for actual pricing of OCI Streaming.
Let's consider a scenario where a data producer puts 500 records per second in aggregate and each record is 2kB. The customer wants to egress/retrieve data at a rate twice that of ingress. Also the customer wants to store this data for 7 days.
Price calculation/day (just as an example)
Each record size = 4kB (rounded to 4kB for any record less than 4kB)
OCI Streaming doesn't have a free tier.