mirror of
				https://github.com/1Password/onepassword-operator.git
				synced 2025-10-25 17:00:46 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			504 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			504 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) 2017-2018 Uber Technologies, Inc.
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package jaeger
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/opentracing/opentracing-go"
 | |
| 	"github.com/opentracing/opentracing-go/ext"
 | |
| 	"github.com/opentracing/opentracing-go/log"
 | |
| )
 | |
| 
 | |
| // Span implements opentracing.Span
 | |
| type Span struct {
 | |
| 	// referenceCounter used to increase the lifetime of
 | |
| 	// the object before return it into the pool.
 | |
| 	referenceCounter int32
 | |
| 
 | |
| 	sync.RWMutex
 | |
| 
 | |
| 	tracer *Tracer
 | |
| 
 | |
| 	// TODO: (breaking change) change to use a pointer
 | |
| 	context SpanContext
 | |
| 
 | |
| 	// The name of the "operation" this span is an instance of.
 | |
| 	// Known as a "span name" in some implementations.
 | |
| 	operationName string
 | |
| 
 | |
| 	// firstInProcess, if true, indicates that this span is the root of the (sub)tree
 | |
| 	// of spans in the current process. In other words it's true for the root spans,
 | |
| 	// and the ingress spans when the process joins another trace.
 | |
| 	firstInProcess bool
 | |
| 
 | |
| 	// startTime is the timestamp indicating when the span began, with microseconds precision.
 | |
| 	startTime time.Time
 | |
| 
 | |
| 	// duration returns duration of the span with microseconds precision.
 | |
| 	// Zero value means duration is unknown.
 | |
| 	duration time.Duration
 | |
| 
 | |
| 	// tags attached to this span
 | |
| 	tags []Tag
 | |
| 
 | |
| 	// The span's "micro-log"
 | |
| 	logs []opentracing.LogRecord
 | |
| 
 | |
| 	// The number of logs dropped because of MaxLogsPerSpan.
 | |
| 	numDroppedLogs int
 | |
| 
 | |
| 	// references for this span
 | |
| 	references []Reference
 | |
| 
 | |
| 	observer ContribSpanObserver
 | |
| }
 | |
| 
 | |
| // Tag is a simple key value wrapper.
 | |
| // TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
 | |
| type Tag struct {
 | |
| 	key   string
 | |
| 	value interface{}
 | |
| }
 | |
| 
 | |
| // NewTag creates a new Tag.
 | |
| // TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
 | |
| func NewTag(key string, value interface{}) Tag {
 | |
| 	return Tag{key: key, value: value}
 | |
| }
 | |
| 
 | |
| // SetOperationName sets or changes the operation name.
 | |
| func (s *Span) SetOperationName(operationName string) opentracing.Span {
 | |
| 	s.Lock()
 | |
| 	s.operationName = operationName
 | |
| 	ctx := s.context
 | |
| 	s.Unlock()
 | |
| 	if !ctx.isSamplingFinalized() {
 | |
| 		decision := s.tracer.sampler.OnSetOperationName(s, operationName)
 | |
| 		s.applySamplingDecision(decision, true)
 | |
| 	}
 | |
| 	s.observer.OnSetOperationName(operationName)
 | |
| 	return s
 | |
| }
 | |
| 
 | |
| // SetTag implements SetTag() of opentracing.Span
 | |
| func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
 | |
| 	return s.setTagInternal(key, value, true)
 | |
| }
 | |
| 
 | |
| func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
 | |
| 	var ctx SpanContext
 | |
| 	var operationName string
 | |
| 	if lock {
 | |
| 		ctx = s.SpanContext()
 | |
| 		operationName = s.OperationName()
 | |
| 	} else {
 | |
| 		ctx = s.context
 | |
| 		operationName = s.operationName
 | |
| 	}
 | |
| 
 | |
| 	s.observer.OnSetTag(key, value)
 | |
| 	if key == string(ext.SamplingPriority) && !setSamplingPriority(ctx.samplingState, operationName, s.tracer, value) {
 | |
| 		return s
 | |
| 	}
 | |
| 	if !ctx.isSamplingFinalized() {
 | |
| 		decision := s.tracer.sampler.OnSetTag(s, key, value)
 | |
| 		s.applySamplingDecision(decision, lock)
 | |
| 	}
 | |
| 	if ctx.isWriteable() {
 | |
| 		if lock {
 | |
| 			s.Lock()
 | |
| 			defer s.Unlock()
 | |
| 		}
 | |
| 		s.appendTagNoLocking(key, value)
 | |
| 	}
 | |
| 	return s
 | |
| }
 | |
| 
 | |
| // SpanContext returns span context
 | |
| func (s *Span) SpanContext() SpanContext {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	return s.context
 | |
| }
 | |
| 
 | |
| // StartTime returns span start time
 | |
| func (s *Span) StartTime() time.Time {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	return s.startTime
 | |
| }
 | |
| 
 | |
| // Duration returns span duration
 | |
| func (s *Span) Duration() time.Duration {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	return s.duration
 | |
| }
 | |
| 
 | |
| // Tags returns tags for span
 | |
| func (s *Span) Tags() opentracing.Tags {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	var result = make(opentracing.Tags, len(s.tags))
 | |
| 	for _, tag := range s.tags {
 | |
| 		result[tag.key] = tag.value
 | |
| 	}
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| // Logs returns micro logs for span
 | |
| func (s *Span) Logs() []opentracing.LogRecord {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 
 | |
| 	logs := append([]opentracing.LogRecord(nil), s.logs...)
 | |
| 	if s.numDroppedLogs != 0 {
 | |
| 		fixLogs(logs, s.numDroppedLogs)
 | |
| 	}
 | |
| 
 | |
| 	return logs
 | |
| }
 | |
| 
 | |
| // References returns references for this span
 | |
| func (s *Span) References() []opentracing.SpanReference {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 
 | |
| 	if s.references == nil || len(s.references) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	result := make([]opentracing.SpanReference, len(s.references))
 | |
| 	for i, r := range s.references {
 | |
| 		result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
 | |
| 	}
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| func (s *Span) appendTagNoLocking(key string, value interface{}) {
 | |
| 	s.tags = append(s.tags, Tag{key: key, value: value})
 | |
| }
 | |
| 
 | |
| // LogFields implements opentracing.Span API
 | |
| func (s *Span) LogFields(fields ...log.Field) {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	if !s.context.IsSampled() {
 | |
| 		return
 | |
| 	}
 | |
| 	s.logFieldsNoLocking(fields...)
 | |
| }
 | |
| 
 | |
| // this function should only be called while holding a Write lock
 | |
| func (s *Span) logFieldsNoLocking(fields ...log.Field) {
 | |
| 	lr := opentracing.LogRecord{
 | |
| 		Fields:    fields,
 | |
| 		Timestamp: time.Now(),
 | |
| 	}
 | |
| 	s.appendLogNoLocking(lr)
 | |
| }
 | |
| 
 | |
| // LogKV implements opentracing.Span API
 | |
| func (s *Span) LogKV(alternatingKeyValues ...interface{}) {
 | |
| 	s.RLock()
 | |
| 	sampled := s.context.IsSampled()
 | |
| 	s.RUnlock()
 | |
| 	if !sampled {
 | |
| 		return
 | |
| 	}
 | |
| 	fields, err := log.InterleavedKVToFields(alternatingKeyValues...)
 | |
| 	if err != nil {
 | |
| 		s.LogFields(log.Error(err), log.String("function", "LogKV"))
 | |
| 		return
 | |
| 	}
 | |
| 	s.LogFields(fields...)
 | |
| }
 | |
| 
 | |
| // LogEvent implements opentracing.Span API
 | |
| func (s *Span) LogEvent(event string) {
 | |
| 	s.Log(opentracing.LogData{Event: event})
 | |
| }
 | |
| 
 | |
| // LogEventWithPayload implements opentracing.Span API
 | |
| func (s *Span) LogEventWithPayload(event string, payload interface{}) {
 | |
| 	s.Log(opentracing.LogData{Event: event, Payload: payload})
 | |
| }
 | |
| 
 | |
| // Log implements opentracing.Span API
 | |
| func (s *Span) Log(ld opentracing.LogData) {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	if s.context.IsSampled() {
 | |
| 		if ld.Timestamp.IsZero() {
 | |
| 			ld.Timestamp = s.tracer.timeNow()
 | |
| 		}
 | |
| 		s.appendLogNoLocking(ld.ToLogRecord())
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // this function should only be called while holding a Write lock
 | |
| func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
 | |
| 	maxLogs := s.tracer.options.maxLogsPerSpan
 | |
| 	if maxLogs == 0 || len(s.logs) < maxLogs {
 | |
| 		s.logs = append(s.logs, lr)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// We have too many logs. We don't touch the first numOld logs; we treat the
 | |
| 	// rest as a circular buffer and overwrite the oldest log among those.
 | |
| 	numOld := (maxLogs - 1) / 2
 | |
| 	numNew := maxLogs - numOld
 | |
| 	s.logs[numOld+s.numDroppedLogs%numNew] = lr
 | |
| 	s.numDroppedLogs++
 | |
| }
 | |
| 
 | |
| // rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
 | |
| // the end (i.e. pos circular left shifts).
 | |
| func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
 | |
| 	// This algorithm is described in:
 | |
| 	//    http://www.cplusplus.com/reference/algorithm/rotate
 | |
| 	for first, middle, next := 0, pos, pos; first != middle; {
 | |
| 		buf[first], buf[next] = buf[next], buf[first]
 | |
| 		first++
 | |
| 		next++
 | |
| 		if next == len(buf) {
 | |
| 			next = middle
 | |
| 		} else if first == middle {
 | |
| 			middle = next
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
 | |
| 	// We dropped some log events, which means that we used part of Logs as a
 | |
| 	// circular buffer (see appendLog). De-circularize it.
 | |
| 	numOld := (len(logs) - 1) / 2
 | |
| 	numNew := len(logs) - numOld
 | |
| 	rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
 | |
| 
 | |
| 	// Replace the log in the middle (the oldest "new" log) with information
 | |
| 	// about the dropped logs. This means that we are effectively dropping one
 | |
| 	// more "new" log.
 | |
| 	numDropped := numDroppedLogs + 1
 | |
| 	logs[numOld] = opentracing.LogRecord{
 | |
| 		// Keep the timestamp of the last dropped event.
 | |
| 		Timestamp: logs[numOld].Timestamp,
 | |
| 		Fields: []log.Field{
 | |
| 			log.String("event", "dropped Span logs"),
 | |
| 			log.Int("dropped_log_count", numDropped),
 | |
| 			log.String("component", "jaeger-client"),
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *Span) fixLogsIfDropped() {
 | |
| 	if s.numDroppedLogs == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	fixLogs(s.logs, s.numDroppedLogs)
 | |
| 	s.numDroppedLogs = 0
 | |
| }
 | |
| 
 | |
| // SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext.
 | |
| // The call is proxied via tracer.baggageSetter to allow policies to be applied
 | |
| // before allowing to set/replace baggage keys.
 | |
| // The setter eventually stores a new SpanContext with extended baggage:
 | |
| //
 | |
| //     span.context = span.context.WithBaggageItem(key, value)
 | |
| //
 | |
| //  See SpanContext.WithBaggageItem() for explanation why it's done this way.
 | |
| func (s *Span) SetBaggageItem(key, value string) opentracing.Span {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	s.tracer.setBaggage(s, key, value)
 | |
| 	return s
 | |
| }
 | |
| 
 | |
| // BaggageItem implements BaggageItem() of opentracing.SpanContext
 | |
| func (s *Span) BaggageItem(key string) string {
 | |
| 	s.RLock()
 | |
| 	defer s.RUnlock()
 | |
| 	return s.context.baggage[key]
 | |
| }
 | |
| 
 | |
| // Finish implements opentracing.Span API
 | |
| // After finishing the Span object it returns back to the allocator unless the reporter retains it again,
 | |
| // so after that, the Span object should no longer be used because it won't be valid anymore.
 | |
| func (s *Span) Finish() {
 | |
| 	s.FinishWithOptions(opentracing.FinishOptions{})
 | |
| }
 | |
| 
 | |
| // FinishWithOptions implements opentracing.Span API
 | |
| func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
 | |
| 	if options.FinishTime.IsZero() {
 | |
| 		options.FinishTime = s.tracer.timeNow()
 | |
| 	}
 | |
| 	s.observer.OnFinish(options)
 | |
| 	s.Lock()
 | |
| 	s.duration = options.FinishTime.Sub(s.startTime)
 | |
| 	ctx := s.context
 | |
| 	s.Unlock()
 | |
| 	if !ctx.isSamplingFinalized() {
 | |
| 		decision := s.tracer.sampler.OnFinishSpan(s)
 | |
| 		s.applySamplingDecision(decision, true)
 | |
| 	}
 | |
| 	if ctx.IsSampled() {
 | |
| 		s.Lock()
 | |
| 		s.fixLogsIfDropped()
 | |
| 		if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
 | |
| 			// Note: bulk logs are not subject to maxLogsPerSpan limit
 | |
| 			if options.LogRecords != nil {
 | |
| 				s.logs = append(s.logs, options.LogRecords...)
 | |
| 			}
 | |
| 			for _, ld := range options.BulkLogData {
 | |
| 				s.logs = append(s.logs, ld.ToLogRecord())
 | |
| 			}
 | |
| 		}
 | |
| 		s.Unlock()
 | |
| 	}
 | |
| 	// call reportSpan even for non-sampled traces, to return span to the pool
 | |
| 	// and update metrics counter
 | |
| 	s.tracer.reportSpan(s)
 | |
| }
 | |
| 
 | |
| // Context implements opentracing.Span API
 | |
| func (s *Span) Context() opentracing.SpanContext {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	return s.context
 | |
| }
 | |
| 
 | |
| // Tracer implements opentracing.Span API
 | |
| func (s *Span) Tracer() opentracing.Tracer {
 | |
| 	return s.tracer
 | |
| }
 | |
| 
 | |
| func (s *Span) String() string {
 | |
| 	s.RLock()
 | |
| 	defer s.RUnlock()
 | |
| 	return s.context.String()
 | |
| }
 | |
| 
 | |
| // OperationName allows retrieving current operation name.
 | |
| func (s *Span) OperationName() string {
 | |
| 	s.RLock()
 | |
| 	defer s.RUnlock()
 | |
| 	return s.operationName
 | |
| }
 | |
| 
 | |
| // Retain increases object counter to increase the lifetime of the object
 | |
| func (s *Span) Retain() *Span {
 | |
| 	atomic.AddInt32(&s.referenceCounter, 1)
 | |
| 	return s
 | |
| }
 | |
| 
 | |
| // Release decrements object counter and return to the
 | |
| // allocator manager  when counter will below zero
 | |
| func (s *Span) Release() {
 | |
| 	if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
 | |
| 		s.tracer.spanAllocator.Put(s)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // reset span state and release unused data
 | |
| func (s *Span) reset() {
 | |
| 	s.firstInProcess = false
 | |
| 	s.context = emptyContext
 | |
| 	s.operationName = ""
 | |
| 	s.tracer = nil
 | |
| 	s.startTime = time.Time{}
 | |
| 	s.duration = 0
 | |
| 	s.observer = nil
 | |
| 	atomic.StoreInt32(&s.referenceCounter, 0)
 | |
| 
 | |
| 	// Note: To reuse memory we can save the pointers on the heap
 | |
| 	s.tags = s.tags[:0]
 | |
| 	s.logs = s.logs[:0]
 | |
| 	s.numDroppedLogs = 0
 | |
| 	s.references = s.references[:0]
 | |
| }
 | |
| 
 | |
| func (s *Span) serviceName() string {
 | |
| 	return s.tracer.serviceName
 | |
| }
 | |
| 
 | |
| func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
 | |
| 	var ctx SpanContext
 | |
| 	if lock {
 | |
| 		ctx = s.SpanContext()
 | |
| 	} else {
 | |
| 		ctx = s.context
 | |
| 	}
 | |
| 
 | |
| 	if !decision.Retryable {
 | |
| 		ctx.samplingState.setFinal()
 | |
| 	}
 | |
| 	if decision.Sample {
 | |
| 		ctx.samplingState.setSampled()
 | |
| 		if len(decision.Tags) > 0 {
 | |
| 			if lock {
 | |
| 				s.Lock()
 | |
| 				defer s.Unlock()
 | |
| 			}
 | |
| 			for _, tag := range decision.Tags {
 | |
| 				s.appendTagNoLocking(tag.key, tag.value)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // setSamplingPriority returns true if the flag was updated successfully, false otherwise.
 | |
| // The behavior of setSamplingPriority is surprising
 | |
| // If noDebugFlagOnForcedSampling is set
 | |
| //     setSamplingPriority(..., 1) always sets only flagSampled
 | |
| // If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
 | |
| //     setSamplingPriority(..., 1) sets both flagSampled and flagDebug
 | |
| // However,
 | |
| //     setSamplingPriority(..., 0) always only resets flagSampled
 | |
| //
 | |
| // This means that doing a setSamplingPriority(..., 1) followed by setSamplingPriority(..., 0) can
 | |
| // leave flagDebug set
 | |
| func setSamplingPriority(state *samplingState, operationName string, tracer *Tracer, value interface{}) bool {
 | |
| 	val, ok := value.(uint16)
 | |
| 	if !ok {
 | |
| 		return false
 | |
| 	}
 | |
| 	if val == 0 {
 | |
| 		state.unsetSampled()
 | |
| 		state.setFinal()
 | |
| 		return true
 | |
| 	}
 | |
| 	if tracer.options.noDebugFlagOnForcedSampling {
 | |
| 		state.setSampled()
 | |
| 		state.setFinal()
 | |
| 		return true
 | |
| 	} else if tracer.isDebugAllowed(operationName) {
 | |
| 		state.setDebugAndSampled()
 | |
| 		state.setFinal()
 | |
| 		return true
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // EnableFirehose enables firehose flag on the span context
 | |
| func EnableFirehose(s *Span) {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 	s.context.samplingState.setFirehose()
 | |
| }
 | 
